diff --git a/pywxdump/__init__.py b/pywxdump/__init__.py
index f4438ed..2edac25 100644
--- a/pywxdump/__init__.py
+++ b/pywxdump/__init__.py
@@ -5,17 +5,15 @@
# Author: xaoyaoo
# Date: 2023/10/14
# -------------------------------------------------------------------------------
+# from .analyzer.db_parsing import read_img_dat, read_emoji, decompress_CompressContent, read_audio_buf, read_audio, \
+# parse_xml_string, read_BytesExtra
+# from .ui import app_show_chat, get_user_list, export
from .wx_info import BiasAddr, read_info, get_wechat_db, batch_decrypt, decrypt, get_core_db
from .wx_info import merge_copy_db, merge_msg_db, merge_media_msg_db, merge_db, decrypt_merge, merge_real_time_db
-from .analyzer.db_parsing import read_img_dat, read_emoji, decompress_CompressContent, read_audio_buf, read_audio, \
- parse_xml_string, read_BytesExtra
-from .analyzer import export_csv, export_json, DBPool
-from .ui import app_show_chat, get_user_list, export
+from .analyzer import DBPool
from .dbpreprocess import get_user_list, get_recent_user_list, wxid2userinfo, ParsingMSG, ParsingMicroMsg, \
ParsingMediaMSG, ParsingOpenIMContact
-
from .server import start_falsk
-
import os, json
try:
@@ -26,7 +24,7 @@ except:
VERSION_LIST = {}
VERSION_LIST_PATH = None
-PYWXDUMP_ROOT_PATH = os.path.dirname(__file__)
-db_init = DBPool("DBPOOL_INIT")
+# PYWXDUMP_ROOT_PATH = os.path.dirname(__file__)
+# db_init = DBPool("DBPOOL_INIT")
__version__ = "3.0.10"
diff --git a/pywxdump/analyzer/__init__.py b/pywxdump/analyzer/__init__.py
index fdf84aa..e352f37 100644
--- a/pywxdump/analyzer/__init__.py
+++ b/pywxdump/analyzer/__init__.py
@@ -5,8 +5,4 @@
# Author: xaoyaoo
# Date: 2023/09/27
# -------------------------------------------------------------------------------
-from .db_parsing import read_img_dat, read_emoji, decompress_CompressContent, read_audio_buf, read_audio, \
- parse_xml_string, read_BytesExtra
-from .export_chat import export_csv, get_contact_list, get_chatroom_list, get_msg_list, get_chat_count, export_json, \
- get_all_chat_count
-from .utils import get_type_name, get_name_typeid,DBPool
+from .utils import DBPool
diff --git a/pywxdump/analyzer/db_parsing.py b/pywxdump/analyzer/db_parsing.py
deleted file mode 100644
index b1314ea..0000000
--- a/pywxdump/analyzer/db_parsing.py
+++ /dev/null
@@ -1,349 +0,0 @@
-# -*- coding: utf-8 -*-#
-# -------------------------------------------------------------------------------
-# Name: parse.py
-# Description: 解析数据库内容
-# Author: xaoyaoo
-# Date: 2023/09/27
-# -------------------------------------------------------------------------------
-import os.path
-import sqlite3
-import pysilk
-from io import BytesIO
-import wave
-import pyaudio
-import requests
-import hashlib
-import lz4.block
-import blackboxprotobuf
-
-from PIL import Image
-# import xml.etree.ElementTree as ET
-import lxml.etree as ET # 这个模块更健壮些,微信XML格式有时有非标格式,会导致xml.etree.ElementTree处理失败
-
-
-def get_md5(data):
- md5 = hashlib.md5()
- md5.update(data)
- return md5.hexdigest()
-
-
-def parse_xml_string(xml_string):
- """
- 解析 XML 字符串
- :param xml_string: 要解析的 XML 字符串
- :return: 解析结果,以字典形式返回
- """
-
- def parse_xml(element):
- """
- 递归解析 XML 元素
- :param element: 要解析的 XML 元素
- :return: 解析结果,以字典形式返回
- """
- result = {}
-
- # 解析当前元素的属性
- if element is None or element.attrib is None: # 有时可能会遇到没有属性,要处理下
- return result
- for key, value in element.attrib.items():
- result[key] = value
-
- # 解析当前元素的子元素
- for child in element:
- child_result = parse_xml(child)
-
- # 如果子元素的标签已经在结果中存在,则将其转换为列表
- if child.tag in result:
- if not isinstance(result[child.tag], list):
- result[child.tag] = [result[child.tag]]
- result[child.tag].append(child_result)
- else:
- result[child.tag] = child_result
-
- # 如果当前元素没有子元素,则将其文本内容作为值保存
- if not result and element.text:
- result = element.text
-
- return result
-
- if xml_string is None or not isinstance(xml_string, str):
- return None
- try:
- parser = ET.XMLParser(recover=True) # 有时微信的聊天记录里面,会冒出来xml格式不对的情况,这里把parser设置成忽略错误
- root = ET.fromstring(xml_string, parser)
- except Exception as e:
- return xml_string
- return parse_xml(root)
-
-
-def read_img_dat(input_data):
- """
- 读取图片文件dat格式
- :param input_data: 图片文件路径或者图片文件数据
- :return: 图片格式,图片md5,图片数据
- """
- # 常见图片格式的文件头
- img_head = {
- b"\xFF\xD8\xFF": ".jpg",
- b"\x89\x50\x4E\x47": ".png",
- b"\x47\x49\x46\x38": ".gif",
- b"\x42\x4D": ".BMP",
- b"\x49\x49": ".TIFF",
- b"\x4D\x4D": ".TIFF",
- b"\x00\x00\x01\x00": ".ICO",
- b"\x52\x49\x46\x46": ".WebP",
- b"\x00\x00\x00\x18\x66\x74\x79\x70\x68\x65\x69\x63": ".HEIC",
- }
-
- if isinstance(input_data, str):
- with open(input_data, "rb") as f:
- input_bytes = f.read()
- else:
- input_bytes = input_data
-
- try:
- import numpy as np
- input_bytes = np.frombuffer(input_bytes, dtype=np.uint8)
- for hcode in img_head: # 遍历文件头
- t = input_bytes[0] ^ hcode[0] # 异或解密
- if np.all(t == np.bitwise_xor(np.frombuffer(input_bytes[:len(hcode)], dtype=np.uint8),
- np.frombuffer(hcode, dtype=np.uint8))): # 使用NumPy进行向量化的异或解密操作,并进行类型转换
- fomt = img_head[hcode] # 获取文件格式
-
- out_bytes = np.bitwise_xor(input_bytes, t) # 使用NumPy进行向量化的异或解密操作
- md5 = get_md5(out_bytes)
- return fomt, md5, out_bytes
- return False
- except ImportError:
- pass
-
- for hcode in img_head:
- t = input_bytes[0] ^ hcode[0]
- for i in range(1, len(hcode)):
- if t == input_bytes[i] ^ hcode[i]:
- fomt = img_head[hcode]
- out_bytes = bytearray()
- for nowByte in input_bytes: # 读取文件
- newByte = nowByte ^ t # 异或解密
- out_bytes.append(newByte)
- md5 = get_md5(out_bytes)
- return fomt, md5, out_bytes
- return False
-
-
-def read_emoji(cdnurl, is_show=False):
- headers = {
- "User-Agent": "Mozilla/5.0 (Linux; Android 10; Redmi K30 Pro) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Mobile Safari/537.36"
-
- }
- r1 = requests.get(cdnurl, headers=headers)
- rdata = r1.content
-
- if is_show: # 显示表情
- img = Image.open(BytesIO(rdata))
- img.show()
- return rdata
-
-
-def decompress_CompressContent(data):
- """
- 解压缩Msg:CompressContent内容
- :param data:
- :return:
- """
- if data is None or not isinstance(data, bytes):
- return None
- try:
- dst = lz4.block.decompress(data, uncompressed_size=len(data) << 8)
- dst = dst.replace(b'\x00', b'') # 已经解码完成后,还含有0x00的部分,要删掉,要不后面ET识别的时候会报错
- uncompressed_data = dst.decode('utf-8', errors='ignore')
- return uncompressed_data
- except Exception as e:
- return data.decode('utf-8', errors='ignore')
-
-
-def read_audio_buf(buf_data, is_play=False, is_wave=False, rate=24000):
- silk_file = BytesIO(buf_data) # 读取silk文件
- pcm_file = BytesIO() # 创建pcm文件
-
- pysilk.decode(silk_file, pcm_file, rate) # 解码silk文件->pcm文件
- pcm_data = pcm_file.getvalue() # 获取pcm文件数据
-
- silk_file.close() # 关闭silk文件
- pcm_file.close() # 关闭pcm文件
- if is_play: # 播放音频
- def play_audio(pcm_data, rate):
- p = pyaudio.PyAudio() # 实例化pyaudio
- stream = p.open(format=pyaudio.paInt16, channels=1, rate=rate, output=True) # 创建音频流对象
- stream.write(pcm_data) # 写入音频流
- stream.stop_stream() # 停止音频流
- stream.close() # 关闭音频流
- p.terminate() # 关闭pyaudio
-
- play_audio(pcm_data, rate)
-
- if is_wave: # 转换为wav文件
- wave_file = BytesIO() # 创建wav文件
- with wave.open(wave_file, 'wb') as wf:
- wf.setparams((1, 2, rate, 0, 'NONE', 'NONE')) # 设置wav文件参数
- wf.writeframes(pcm_data) # 写入wav文件
- rdata = wave_file.getvalue() # 获取wav文件数据
- wave_file.close() # 关闭wav文件
- return rdata
-
- return pcm_data
-
-
-def read_audio(MsgSvrID, is_play=False, is_wave=False, DB_PATH: str = "", rate=24000):
- if DB_PATH == "":
- return False
-
- DB = sqlite3.connect(DB_PATH)
- cursor = DB.cursor()
- sql = "select Buf from Media where Reserved0={}".format(MsgSvrID)
- DBdata = cursor.execute(sql).fetchall()
-
- if len(DBdata) == 0:
- return False
- data = DBdata[0][0] # [1:] + b'\xFF\xFF'
- try:
- pcm_data = read_audio_buf(data, is_play, is_wave, rate)
- return pcm_data
- except Exception as e:
- return False
-
-
-def wordcloud_generator(text, out_path="", is_show=False, img_path="", font="C:\Windows\Fonts\simhei.ttf"):
- """
- 词云
- :param is_show: 是否显示
- :param img_path: 背景图片路径
- :param text: 文本
- :param font: 字体路径
- :return:
- """
- try:
- from wordcloud import WordCloud
- import jieba
- import numpy as np
- import matplotlib.pyplot as plt
- from matplotlib.font_manager import fontManager
- except ImportError as e:
- print("error", e)
- raise ImportError("请安装wordcloud,jieba,numpy,matplotlib,pillow库")
- words = jieba.lcut(text) # 精确分词
- newtxt = ' '.join(words) # 空格拼接
- # 字体路径
-
- # 创建WordCloud对象
- wordcloud1 = WordCloud(width=800, height=400, background_color='white', font_path=font)
- wordcloud1.generate(newtxt)
-
- if out_path and out_path != "":
- wordcloud1.to_file("wordcloud.png") # 保存图片
- if img_path and os.path.exists(img_path): # 设置背景图片
- img_color = np.array(Image.open(img_path)) # 读取背景图片
- img_color = img_color.reshape((img_color.shape[0] * img_color.shape[1], 3))
- wordcloud1.recolor(color_func=img_color) # 设置背景图片颜色
- if is_show:
- # 显示词云
- wordcloud_img = wordcloud1.to_image()
- wordcloud_img.show()
-
-
-def convert_bytes_to_str(d):
- """
- 遍历字典并将bytes转换为字符串
- :param d:
- :return:
- """
- for k, v in d.items():
- if isinstance(v, dict):
- convert_bytes_to_str(v)
- elif isinstance(v, list):
- for item in v:
- if isinstance(item, dict):
- convert_bytes_to_str(item)
- elif isinstance(item, bytes):
- item = item.decode('utf-8') # 将bytes转换为字符串
- elif isinstance(v, bytes):
- d[k] = v.decode('utf-8')
-
-
-def read_BytesExtra(BytesExtra):
- if BytesExtra is None or not isinstance(BytesExtra, bytes):
- return None
- try:
- deserialize_data, message_type = blackboxprotobuf.decode_message(BytesExtra)
- return deserialize_data
- except Exception as e:
- return None
-
-
-def read_ChatRoom_RoomData(RoomData):
- # 读取群聊数据,主要为 wxid,以及对应昵称
- if RoomData is None or not isinstance(RoomData, bytes):
- return None
- try:
- data = read_BytesExtra(RoomData)
- convert_bytes_to_str(data)
- return data
- except Exception as e:
- return None
-
-
-def read_ExtraBuf(ExtraBuf: bytes):
- """
- 读取ExtraBuf(联系人表)
- :param ExtraBuf:
- :return:
- """
- if not ExtraBuf:
- return None
- try:
- buf_dict = {
- 'DDF32683': '0', '74752C06': '性别[1男2女]', '88E28FCE': '2', '761A1D2D': '3', '0263A0CB': '4',
- '0451FF12': '5',
- '228C66A8': '6', '46CF10C4': '个性签名', 'A4D9024A': '国', 'E2EAA8D1': '省', '1D025BBF': '市',
- '4D6C4570': '11',
- 'F917BCC0': '公司名称', '759378AD': '手机号', '4335DFDD': '14', 'DE4CDAEB': '15', 'A72BC20A': '16',
- '069FED52': '17',
- '9B0F4299': '18', '3D641E22': '19', '1249822C': '20', '4EB96D85': '企微属性', 'B4F73ACB': '22',
- '0959EB92': '23',
- '3CF4A315': '24', 'C9477AC60201E44CD0E8': '26', 'B7ACF0F5': '28', '57A7B5A8': '29',
- '81AE19B4': '朋友圈背景',
- '695F3170': '31', 'FB083DD9': '32', '0240E37F': '33', '315D02A3': '34', '7DEC0BC3': '35',
- '0E719F13': '备注图片',
- '16791C90': '37'
- }
-
- rdata = {}
- for buf_name in buf_dict:
- rdata_name = buf_dict[buf_name]
- buf_name = bytes.fromhex(buf_name)
- offset = ExtraBuf.find(buf_name)
- if offset == -1:
- rdata[rdata_name] = ""
- continue
- offset += len(buf_name)
- type_id = ExtraBuf[offset: offset + 1]
- offset += 1
-
- if type_id == b"\x04":
- rdata[rdata_name] = int.from_bytes(ExtraBuf[offset: offset + 4], "little")
-
- elif type_id == b"\x18":
- length = int.from_bytes(ExtraBuf[offset: offset + 4], "little")
- rdata[rdata_name] = ExtraBuf[offset + 4: offset + 4 + length].decode("utf-16").rstrip("\x00")
-
- elif type_id == b"\x17":
- length = int.from_bytes(ExtraBuf[offset: offset + 4], "little")
- rdata[rdata_name] = ExtraBuf[offset + 4: offset + 4 + length].decode("utf-8").rstrip("\x00")
-
- elif type_id == b"\x05":
- rdata[rdata_name] = f"0x{ExtraBuf[offset: offset + 8].hex()}"
- return rdata
-
- except Exception as e:
- print(f'解析错误:\n{e}')
- return None
diff --git a/pywxdump/analyzer/export_chat.py b/pywxdump/analyzer/export_chat.py
deleted file mode 100644
index 766f7d9..0000000
--- a/pywxdump/analyzer/export_chat.py
+++ /dev/null
@@ -1,429 +0,0 @@
-# -*- coding: utf-8 -*-#
-# -------------------------------------------------------------------------------
-# Name: export_chat.py
-# Description:
-# Author: xaoyaoo
-# Date: 2023/12/03
-# -------------------------------------------------------------------------------
-# -*- coding: utf-8 -*-#
-# -------------------------------------------------------------------------------
-# Name: GUI.py
-# Description:
-# Author: xaoyaoo
-# Date: 2023/11/10
-# -------------------------------------------------------------------------------
-import csv
-import re
-import sqlite3
-import os
-import json
-import time
-from functools import wraps
-
-from .utils import get_md5, attach_databases, execute_sql, get_type_name, match_BytesExtra, DBPool, time_int2str
-from .db_parsing import parse_xml_string, decompress_CompressContent, read_BytesExtra
-
-
-def get_contact(MicroMsg_db_path, wx_id):
- """
- 获取联系人信息
- :param MicroMsg_db_path: MicroMsg.db 文件路径
- :param wx_id: 微信id
- :return: 联系人信息
- """
- with DBPool(MicroMsg_db_path) as db:
- # 获取username是wx_id的用户
- sql = ("SELECT A.UserName, A.NickName, A.Remark,A.Alias,A.Reserved6,B.bigHeadImgUrl "
- "FROM Contact A,ContactHeadImgUrl B "
- f"WHERE A.UserName = '{wx_id}' AND A.UserName = B.usrName "
- "ORDER BY NickName ASC;")
- result = execute_sql(db, sql)
- print('联系人信息:', result)
- if not result:
- print('居然没找到!')
- print(wx_id)
- return None
- return {"username": result[0], "nickname": result[1], "remark": result[2], "account": result[3],
- "describe": result[4], "headImgUrl": result[5]}
-
-
-def get_contact_list(MicroMsg_db_path, OpenIMContact_db_path=None):
- """
- 获取联系人列表
- :param MicroMsg_db_path: MicroMsg.db 文件路径
- :return: 联系人列表
- """
- users = []
- # 连接 MicroMsg.db 数据库,并执行查询
- with DBPool(MicroMsg_db_path) as db:
- sql = ("SELECT A.UserName, A.NickName, A.Remark,A.Alias,A.Reserved6,B.bigHeadImgUrl "
- "FROM Contact A,ContactHeadImgUrl B "
- "where UserName==usrName "
- "ORDER BY NickName ASC;")
- result = execute_sql(db, sql)
- for row in result:
- # 获取用户名、昵称、备注和聊天记录数量
- username, nickname, remark, Alias, describe, headImgUrl = row
- users.append(
- {"username": username, "nickname": nickname, "remark": remark, "account": Alias, "describe": describe,
- "headImgUrl": headImgUrl})
- # return users
- if OpenIMContact_db_path:
- with DBPool(OpenIMContact_db_path) as db:
- sql = ("SELECT A.UserName, A.NickName, A.Remark,A.BigHeadImgUrl FROM OpenIMContact A "
- "ORDER BY NickName ASC;")
- result = execute_sql(db, sql)
- for row in result:
- # 获取用户名、昵称、备注和聊天记录数量
- username, nickname, remark, headImgUrl = row
- users.append(
- {"username": username, "nickname": nickname, "remark": remark, "account": "", "describe": "",
- "headImgUrl": headImgUrl})
- return users
-
-
-def get_chatroom_list(MicroMsg_db_path):
- """
- 获取群聊列表
- :param MicroMsg_db_path: MicroMsg.db 文件路径
- :return: 群聊列表
- """
- rooms = []
- # 连接 MicroMsg.db 数据库,并执行查询
- with DBPool(MicroMsg_db_path) as db:
- sql = ("SELECT A.ChatRoomName,A.UserNameList, A.DisplayNameList, B.Announcement,B.AnnouncementEditor "
- "FROM ChatRoom A,ChatRoomInfo B "
- "where A.ChatRoomName==B.ChatRoomName "
- "ORDER BY A.ChatRoomName ASC;")
- result = execute_sql(db, sql)
- for row in result:
- # 获取用户名、昵称、备注和聊天记录数量
- ChatRoomName, UserNameList, DisplayNameList, Announcement, AnnouncementEditor = row
- UserNameList = UserNameList.split("^G")
- DisplayNameList = DisplayNameList.split("^G")
- rooms.append(
- {"ChatRoomName": ChatRoomName, "UserNameList": UserNameList, "DisplayNameList": DisplayNameList,
- "Announcement": Announcement, "AnnouncementEditor": AnnouncementEditor})
- return rooms
-
-
-def get_room_user_list(MSG_db_path, selected_talker):
- """
- 获取群聊中包含的所有用户列表
- :param MSG_db_path: MSG.db 文件路径
- :param selected_talker: 选中的聊天对象 wxid
- :return: 聊天用户列表
- """
-
- # 连接 MSG_ALL.db 数据库,并执行查询
- with DBPool(MSG_db_path) as db1:
- sql = (
- "SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent,BytesExtra,ROW_NUMBER() OVER (ORDER BY CreateTime ASC) AS id "
- "FROM MSG WHERE StrTalker=? "
- "ORDER BY CreateTime ASC")
-
- result1 = execute_sql(db1, sql, (selected_talker,))
- user_list = []
- read_user_wx_id = []
- for row in result1:
- localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType, CreateTime, MsgSvrID, DisplayContent, CompressContent, BytesExtra, id = row
- bytes_extra = read_BytesExtra(BytesExtra)
- if bytes_extra:
- try:
- talker = bytes_extra['3'][0]['2'].decode('utf-8', errors='ignore')
- except:
- continue
- if talker in read_user_wx_id:
- continue
- user = get_contact(MSG_db_path, talker)
- if not user:
- continue
- user_list.append(user)
- read_user_wx_id.append(talker)
- return user_list
-
-
-def get_msg_list(MSG_db_path, selected_talker="", start_index=0, page_size=500):
- """
- 获取聊天记录列表
- :param MSG_db_path: MSG.db 文件路径
- :param selected_talker: 选中的聊天对象 wxid
- :param start_index: 开始索引
- :param page_size: 每页数量
- :return: 聊天记录列表
- """
-
- # 连接 MSG_ALL.db 数据库,并执行查询
- with DBPool(MSG_db_path) as db1:
- if selected_talker:
- sql = (
- "SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent,BytesExtra,ROW_NUMBER() OVER (ORDER BY CreateTime ASC) AS id "
- "FROM MSG WHERE StrTalker=? "
- "ORDER BY CreateTime ASC LIMIT ?,?")
- result1 = execute_sql(db1, sql, (selected_talker, start_index, page_size))
- else:
- sql = (
- "SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent,BytesExtra,ROW_NUMBER() OVER (ORDER BY CreateTime ASC) AS id "
- "FROM MSG ORDER BY CreateTime ASC LIMIT ?,?")
- result1 = execute_sql(db1, sql, (start_index, page_size))
-
- data = []
- for row in result1:
- localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType, CreateTime, MsgSvrID, DisplayContent, CompressContent, BytesExtra, id = row
- CreateTime = time_int2str(CreateTime)
-
- type_id = (Type, SubType)
- type_name = get_type_name(type_id)
-
- content = {"src": "", "msg": StrContent}
-
- if type_id == (1, 0): # 文本
- content["msg"] = StrContent
-
- elif type_id == (3, 0): # 图片
- DictExtra = read_BytesExtra(BytesExtra)
- DictExtra_str = str(DictExtra)
- img_paths = [i for i in re.findall(r"(FileStorage.*?)'", DictExtra_str)]
- img_paths = sorted(img_paths, key=lambda p: "Image" in p, reverse=True)
- if img_paths:
- img_path = img_paths[0].replace("'", "")
- img_path = [i for i in img_path.split("\\") if i]
- img_path = os.path.join(*img_path)
- content["src"] = img_path
- else:
- content["src"] = ""
- content["msg"] = "图片"
- elif type_id == (34, 0): # 语音
- tmp_c = parse_xml_string(StrContent)
- voicelength = tmp_c.get("voicemsg", {}).get("voicelength", "")
- transtext = tmp_c.get("voicetrans", {}).get("transtext", "")
- if voicelength.isdigit():
- voicelength = int(voicelength) / 1000
- voicelength = f"{voicelength:.2f}"
- content[
- "msg"] = f"语音时长:{voicelength}秒\n翻译结果:{transtext}" if transtext else f"语音时长:{voicelength}秒"
- content["src"] = os.path.join("audio", f"{StrTalker}",
- f"{CreateTime.replace(':', '-').replace(' ', '_')}_{IsSender}_{MsgSvrID}.wav")
- elif type_id == (43, 0): # 视频
- DictExtra = read_BytesExtra(BytesExtra)
- DictExtra = str(DictExtra)
-
- DictExtra_str = str(DictExtra)
- video_paths = [i for i in re.findall(r"(FileStorage.*?)'", DictExtra_str)]
- video_paths = sorted(video_paths, key=lambda p: "mp4" in p, reverse=True)
- if video_paths:
- video_path = video_paths[0].replace("'", "")
- video_path = [i for i in video_path.split("\\") if i]
- video_path = os.path.join(*video_path)
- content["src"] = video_path
- else:
- content["src"] = ""
- content["msg"] = "视频"
-
- elif type_id == (47, 0): # 动画表情
- content_tmp = parse_xml_string(StrContent)
- cdnurl = content_tmp.get("emoji", {}).get("cdnurl", "")
- if cdnurl:
- content = {"src": cdnurl, "msg": "表情"}
-
- elif type_id == (49, 0):
- DictExtra = read_BytesExtra(BytesExtra)
- url = match_BytesExtra(DictExtra)
- content["src"] = url
- file_name = os.path.basename(url)
- content["msg"] = file_name
-
- elif type_id == (49, 19): # 合并转发的聊天记录
- CompressContent = decompress_CompressContent(CompressContent)
- content_tmp = parse_xml_string(CompressContent)
- title = content_tmp.get("appmsg", {}).get("title", "")
- des = content_tmp.get("appmsg", {}).get("des", "")
- recorditem = content_tmp.get("appmsg", {}).get("recorditem", "")
- recorditem = parse_xml_string(recorditem)
- content["msg"] = f"{title}\n{des}"
- content["src"] = recorditem
-
- elif type_id == (49, 57): # 带有引用的文本消息
- CompressContent = decompress_CompressContent(CompressContent)
- content_tmp = parse_xml_string(CompressContent)
- appmsg = content_tmp.get("appmsg", {})
- title = appmsg.get("title", "")
- refermsg = appmsg.get("refermsg", {})
- displayname = refermsg.get("displayname", "")
- display_content = refermsg.get("content", "")
- display_createtime = refermsg.get("createtime", "")
- display_createtime = time_int2str(
- int(display_createtime)) if display_createtime.isdigit() else display_createtime
- content["msg"] = f"{title}\n\n[引用]({display_createtime}){displayname}:{display_content}"
- content["src"] = ""
-
- elif type_id == (49, 2000): # 转账消息
- CompressContent = decompress_CompressContent(CompressContent)
- content_tmp = parse_xml_string(CompressContent)
- feedesc = content_tmp.get("appmsg", {}).get("wcpayinfo", {}).get("feedesc", "")
- content["msg"] = f"转账:{feedesc}"
- content["src"] = ""
-
- elif type_id[0] == 49 and type_id[1] != 0:
- DictExtra = read_BytesExtra(BytesExtra)
- url = match_BytesExtra(DictExtra)
- content["src"] = url
- content["msg"] = type_name
-
- elif type_id == (50, 0): # 语音通话
- content["msg"] = "语音/视频通话[%s]" % DisplayContent
-
- # elif type_id == (10000, 0):
- # content["msg"] = StrContent
- # elif type_id == (10000, 4):
- # content["msg"] = StrContent
- # elif type_id == (10000, 8000):
- # content["msg"] = StrContent
-
- talker = "未知"
- if IsSender == 1:
- talker = "我"
- else:
- if StrTalker.endswith("@chatroom"):
- bytes_extra = read_BytesExtra(BytesExtra)
- if bytes_extra:
- try:
- talker = bytes_extra['3'][0]['2'].decode('utf-8', errors='ignore')
- if "publisher-id" in talker:
- talker = "系统"
- except:
- pass
- else:
- talker = StrTalker
-
- row_data = {"MsgSvrID": str(MsgSvrID), "type_name": type_name, "is_sender": IsSender, "talker": talker,
- "room_name": StrTalker, "content": content, "CreateTime": CreateTime, "id": id}
- data.append(row_data)
- return data
-
-
-def get_chat_count(MSG_db_path: [str, list], username: str = ""):
- """
- 获取聊天记录数量
- :param MSG_db_path: MSG.db 文件路径
- :return: 聊天记录数量列表
- """
- if username:
- sql = f"SELECT StrTalker,COUNT(*) FROM MSG WHERE StrTalker='{username}';"
- else:
- sql = f"SELECT StrTalker, COUNT(*) FROM MSG GROUP BY StrTalker ORDER BY COUNT(*) DESC;"
-
- with DBPool(MSG_db_path) as db1:
- result = execute_sql(db1, sql)
- chat_counts = {}
- for row in result:
- username, chat_count = row
- chat_counts[username] = chat_count
- return chat_counts
-
-
-def get_all_chat_count(MSG_db_path: [str, list]):
- """
- 获取聊天记录总数量
- :param MSG_db_path: MSG.db 文件路径
- :return: 聊天记录数量
- """
- sql = f"SELECT COUNT(*) FROM MSG;"
- with DBPool(MSG_db_path) as db1:
- result = execute_sql(db1, sql)
- if result and len(result) > 0:
- chat_counts = result[0][0]
- return chat_counts
- return 0
-
-
-def export_csv(username, outpath, MSG_ALL_db_path, page_size=5000):
- if not os.path.exists(outpath):
- outpath = os.path.join(os.getcwd(), "export" + os.sep + username)
- if not os.path.exists(outpath):
- os.makedirs(outpath)
- count = get_chat_count(MSG_ALL_db_path, username)
- chatCount = count.get(username, 0)
- if chatCount == 0:
- return False, "没有聊天记录"
- if page_size > chatCount:
- page_size = chatCount + 1
- for i in range(0, chatCount, page_size):
- start_index = i
- data = get_msg_list(MSG_ALL_db_path, username, start_index, page_size)
- if len(data) == 0:
- return False, "没有聊天记录"
- save_path = os.path.join(outpath, f"{username}_{i}_{i + page_size}.csv")
- with open(save_path, "w", encoding="utf-8", newline='') as f:
- csv_writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
- csv_writer.writerow(["id", "MsgSvrID", "type_name", "is_sender", "talker", "room_name", "content",
- "CreateTime"])
- for row in data:
- id = row.get("id", "")
- MsgSvrID = row.get("MsgSvrID", "")
- type_name = row.get("type_name", "")
- is_sender = row.get("is_sender", "")
- talker = row.get("talker", "")
- room_name = row.get("room_name", "")
- content = row.get("content", "")
- CreateTime = row.get("CreateTime", "")
-
- content = json.dumps(content, ensure_ascii=False)
- csv_writer.writerow([id, MsgSvrID, type_name, is_sender, talker, room_name, content, CreateTime])
-
- return True, f"导出成功: {outpath}"
-
-
-def export_json(username, outpath, MSG_ALL_db_path):
- if not os.path.exists(outpath):
- outpath = os.path.join(os.getcwd(), "export" + os.sep + username)
- if not os.path.exists(outpath):
- os.makedirs(outpath)
- count = get_chat_count(MSG_ALL_db_path, username)
- chatCount = count.get(username, 0)
- if chatCount == 0:
- return False, "没有聊天记录"
- page_size = chatCount + 1
- for i in range(0, chatCount, page_size):
- start_index = i
- data = get_msg_list(MSG_ALL_db_path, username, start_index, page_size)
- if len(data) == 0:
- return False, "没有聊天记录"
- save_path = os.path.join(outpath, f"{username}_{i}_{i + page_size}.json")
- with open(save_path, "w", encoding="utf-8") as f:
- json.dump(data, f, ensure_ascii=False, indent=4)
- return True, f"导出成功: {outpath}"
-
-
-def export_html(user, outpath, MSG_ALL_db_path, MediaMSG_all_db_path, FileStorage_path, page_size=500):
- name_save = user.get("remark", user.get("nickname", user.get("username", "")))
- username = user.get("username", "")
-
- chatCount = user.get("chat_count", 0)
- if chatCount == 0:
- return False, "没有聊天记录"
-
- for i in range(0, chatCount, page_size):
- start_index = i
- data = load_chat_records(username, start_index, page_size, user, MSG_ALL_db_path, MediaMSG_all_db_path,
- FileStorage_path)
- if len(data) == 0:
- break
- save_path = os.path.join(outpath, f"{name_save}_{int(i / page_size)}.html")
- with open(save_path, "w", encoding="utf-8") as f:
- f.write(render_template("chat.html", msgs=data))
- return True, f"导出成功{outpath}"
-
-
-def export(username, outpath, MSG_ALL_db_path, MicroMsg_db_path, MediaMSG_all_db_path, FileStorage_path):
- if not os.path.exists(outpath):
- outpath = os.path.join(os.getcwd(), "export" + os.sep + username)
- if not os.path.exists(outpath):
- os.makedirs(outpath)
-
- USER_LIST = get_user_list(MSG_ALL_db_path, MicroMsg_db_path)
- user = list(filter(lambda x: x["username"] == username, USER_LIST))
-
- if username and len(user) > 0:
- user = user[0]
- return export_html(user, outpath, MSG_ALL_db_path, MediaMSG_all_db_path, FileStorage_path)
diff --git a/pywxdump/api/api.py b/pywxdump/api/api.py
index 8830c40..bb85fb1 100644
--- a/pywxdump/api/api.py
+++ b/pywxdump/api/api.py
@@ -12,18 +12,18 @@ import os
import re
import time
import shutil
-
import pythoncom
+import pywxdump
+
from flask import Flask, request, render_template, g, Blueprint, send_file, make_response, session
-from pywxdump import analyzer, read_img_dat, read_audio, get_wechat_db, get_core_db
-from pywxdump.analyzer.export_chat import get_contact, get_room_user_list
+from pywxdump import get_core_db
from pywxdump.api.rjson import ReJson, RqJson
from pywxdump.api.utils import read_session, get_session_wxids, save_session, error9999, gen_base64, validate_title
from pywxdump import read_info, VERSION_LIST, batch_decrypt, BiasAddr, merge_db, decrypt_merge, merge_real_time_db
-import pywxdump
+
from pywxdump.dbpreprocess import wxid2userinfo, ParsingMSG, get_user_list, get_recent_user_list, ParsingMediaMSG, \
- download_file
-from pywxdump.dbpreprocess import export_csv,export_json
+ download_file,export_csv, export_json
+from pywxdump.dbpreprocess.utils import dat2img
# app = Flask(__name__, static_folder='../ui/web/dist', static_url_path='/')
@@ -345,7 +345,7 @@ def get_img(img_path):
original_img_path = os.path.join(wx_path, img_path)
if os.path.exists(original_img_path):
- fomt, md5, out_bytes = read_img_dat(original_img_path)
+ fomt, md5, out_bytes = dat2img(original_img_path)
imgsavepath = os.path.join(img_tmp_path, img_path + "_" + ".".join([md5, fomt]))
if not os.path.exists(os.path.dirname(imgsavepath)):
os.makedirs(os.path.dirname(imgsavepath))
@@ -559,187 +559,187 @@ def get_export_json():
return ReJson(2001, body=ret)
-@api.route('/api/export', methods=["GET", 'POST'])
-@error9999
-def export():
- """
- 导出聊天记录
- :return:
- """
- export_type = request.json.get("export_type")
- start_time = request.json.get("start_time", 0)
- end_time = request.json.get("end_time", 0)
- chat_type = request.json.get("chat_type")
- username = request.json.get("username")
-
- wx_path = request.json.get("wx_path", read_session(g.sf, "wx_path"))
- key = request.json.get("key", read_session(g.sf, "key"))
-
- if not export_type or not isinstance(export_type, str):
- return ReJson(1002)
-
- # 导出路径
- outpath = os.path.join(g.tmp_path, "export", export_type)
- if not os.path.exists(outpath):
- os.makedirs(outpath)
-
- if export_type == "endb": # 导出加密数据库
- # 获取微信文件夹路径
- if not wx_path:
- return ReJson(1002)
- if not os.path.exists(wx_path):
- return ReJson(1001, body=wx_path)
-
- # 分割wx_path的文件名和父目录
- code, wxdbpaths = get_core_db(wx_path)
- if not code:
- return ReJson(2001, body=wxdbpaths)
-
- for wxdb in wxdbpaths:
- # 复制wxdb->outpath, os.path.basename(wxdb)
- shutil.copy(wxdb, os.path.join(outpath, os.path.basename(wxdb)))
- return ReJson(0, body=outpath)
-
- elif export_type == "dedb":
- if isinstance(start_time, int) and isinstance(end_time, int):
- msg_path = read_session(g.sf, "msg_path")
- micro_path = read_session(g.sf, "micro_path")
- media_path = read_session(g.sf, "media_path")
- dbpaths = [msg_path, media_path, micro_path]
- dbpaths = list(set(dbpaths))
- mergepath = merge_db(dbpaths, os.path.join(outpath, "merge.db"), start_time, end_time)
- return ReJson(0, body=mergepath)
- # if msg_path == media_path and msg_path == media_path:
- # shutil.copy(msg_path, os.path.join(outpath, "merge.db"))
- # return ReJson(0, body=msg_path)
- # else:
- # dbpaths = [msg_path, msg_path, micro_path]
- # dbpaths = list(set(dbpaths))
- # mergepath = merge_db(dbpaths, os.path.join(outpath, "merge.db"), start_time, end_time)
- # return ReJson(0, body=mergepath)
- else:
- return ReJson(1002, body={"start_time": start_time, "end_time": end_time})
-
- elif export_type == "csv":
- outpath = os.path.join(outpath, username)
- if not os.path.exists(outpath):
- os.makedirs(outpath)
- code, ret = analyzer.export_csv(username, outpath, read_session(g.sf, "msg_path"))
- if code:
- return ReJson(0, ret)
- else:
- return ReJson(2001, body=ret)
- elif export_type == "json":
- outpath = os.path.join(outpath, username)
- if not os.path.exists(outpath):
- os.makedirs(outpath)
- code, ret = analyzer.export_json(username, outpath, read_session(g.sf, "msg_path"))
- if code:
- return ReJson(0, ret)
- else:
- return ReJson(2001, body=ret)
- elif export_type == "html":
- outpath = os.path.join(outpath, username)
- if os.path.exists(outpath):
- shutil.rmtree(outpath)
- if not os.path.exists(outpath):
- os.makedirs(outpath)
- # chat_type_tups = []
- # for ct in chat_type:
- # tup = analyzer.get_name_typeid(ct)
- # if tup:
- # chat_type_tups += tup
- # if not chat_type_tups:
- # return ReJson(1002)
-
- # 复制文件 html
- export_html = os.path.join(os.path.dirname(pywxdump.VERSION_LIST_PATH), "ui", "export")
- indexhtml_path = os.path.join(export_html, "index.html")
- assets_path = os.path.join(export_html, "assets")
- if not os.path.exists(indexhtml_path) or not os.path.exists(assets_path):
- return ReJson(1001)
- js_path = ""
- css_path = ""
- for file in os.listdir(assets_path):
- if file.endswith('.js'):
- js_path = os.path.join(assets_path, file)
- elif file.endswith('.css'):
- css_path = os.path.join(assets_path, file)
- else:
- continue
- # 读取html,js,css
- with open(indexhtml_path, 'r', encoding='utf-8') as f:
- html = f.read()
- with open(js_path, 'r', encoding='utf-8') as f:
- js = f.read()
- with open(css_path, 'r', encoding='utf-8') as f:
- css = f.read()
-
- html = re.sub(r'', '', html) # 删除所有的script标签
- html = re.sub(r'', '', html) # 删除所有的link标签
-
- html = html.replace('', f'')
- html = html.replace('', f'')
- # END 生成index.html
-
- rdata = func_get_msgs(0, 10000000, username, "", "")
-
- msg_list = rdata["msg_list"]
- for i in range(len(msg_list)):
- if msg_list[i]["type_name"] == "语音":
- savePath = msg_list[i]["content"]["src"]
- MsgSvrID = savePath.split("_")[-1].replace(".wav", "")
- if not savePath:
- continue
- media_path = read_session(g.sf, "media_path")
- wave_data = read_audio(MsgSvrID, is_wave=True, DB_PATH=media_path)
- if not wave_data:
- continue
- # 判断savePath路径的文件夹是否存在
- savePath = os.path.join(outpath, savePath)
- if not os.path.exists(os.path.dirname(savePath)):
- os.makedirs(os.path.dirname(savePath))
- with open(savePath, "wb") as f:
- f.write(wave_data)
- elif msg_list[i]["type_name"] == "图片":
- img_path = msg_list[i]["content"]["src"]
- wx_path = read_session(g.sf, "wx_path")
- img_path_all = os.path.join(wx_path, img_path)
-
- if os.path.exists(img_path_all):
- fomt, md5, out_bytes = read_img_dat(img_path_all)
- imgsavepath = os.path.join(outpath, "img", img_path + "_" + ".".join([md5, fomt]))
- if not os.path.exists(os.path.dirname(imgsavepath)):
- os.makedirs(os.path.dirname(imgsavepath))
- with open(imgsavepath, "wb") as f:
- f.write(out_bytes)
- msg_list[i]["content"]["src"] = os.path.join("img", img_path + "_" + ".".join([md5, fomt]))
-
- rdata["msg_list"] = msg_list
- rdata["myuserdata"] = rdata["user_list"][rdata["my_wxid"]]
- rdata["myuserdata"]["chat_count"] = len(rdata["msg_list"])
- save_data = rdata
- save_json_path = os.path.join(outpath, "data")
- if not os.path.exists(save_json_path):
- os.makedirs(save_json_path)
- with open(os.path.join(save_json_path, "msg_user.json"), "w", encoding="utf-8") as f:
- json.dump(save_data, f, ensure_ascii=False)
-
- json_base64 = gen_base64(os.path.join(save_json_path, "msg_user.json"))
- html = html.replace('"./data/msg_user.json"', f'"{json_base64}"')
-
- with open(os.path.join(outpath, "index.html"), 'w', encoding='utf-8') as f:
- f.write(html)
- return ReJson(0, outpath)
-
- elif export_type == "pdf":
- pass
- elif export_type == "docx":
- pass
- else:
- return ReJson(1002)
-
- return ReJson(9999, "")
+# @api.route('/api/export', methods=["GET", 'POST'])
+# @error9999
+# def export():
+# """
+# 导出聊天记录
+# :return:
+# """
+# export_type = request.json.get("export_type")
+# start_time = request.json.get("start_time", 0)
+# end_time = request.json.get("end_time", 0)
+# chat_type = request.json.get("chat_type")
+# username = request.json.get("username")
+#
+# wx_path = request.json.get("wx_path", read_session(g.sf, "wx_path"))
+# key = request.json.get("key", read_session(g.sf, "key"))
+#
+# if not export_type or not isinstance(export_type, str):
+# return ReJson(1002)
+#
+# # 导出路径
+# outpath = os.path.join(g.tmp_path, "export", export_type)
+# if not os.path.exists(outpath):
+# os.makedirs(outpath)
+#
+# if export_type == "endb": # 导出加密数据库
+# # 获取微信文件夹路径
+# if not wx_path:
+# return ReJson(1002)
+# if not os.path.exists(wx_path):
+# return ReJson(1001, body=wx_path)
+#
+# # 分割wx_path的文件名和父目录
+# code, wxdbpaths = get_core_db(wx_path)
+# if not code:
+# return ReJson(2001, body=wxdbpaths)
+#
+# for wxdb in wxdbpaths:
+# # 复制wxdb->outpath, os.path.basename(wxdb)
+# shutil.copy(wxdb, os.path.join(outpath, os.path.basename(wxdb)))
+# return ReJson(0, body=outpath)
+#
+# elif export_type == "dedb":
+# if isinstance(start_time, int) and isinstance(end_time, int):
+# msg_path = read_session(g.sf, "msg_path")
+# micro_path = read_session(g.sf, "micro_path")
+# media_path = read_session(g.sf, "media_path")
+# dbpaths = [msg_path, media_path, micro_path]
+# dbpaths = list(set(dbpaths))
+# mergepath = merge_db(dbpaths, os.path.join(outpath, "merge.db"), start_time, end_time)
+# return ReJson(0, body=mergepath)
+# # if msg_path == media_path and msg_path == media_path:
+# # shutil.copy(msg_path, os.path.join(outpath, "merge.db"))
+# # return ReJson(0, body=msg_path)
+# # else:
+# # dbpaths = [msg_path, msg_path, micro_path]
+# # dbpaths = list(set(dbpaths))
+# # mergepath = merge_db(dbpaths, os.path.join(outpath, "merge.db"), start_time, end_time)
+# # return ReJson(0, body=mergepath)
+# else:
+# return ReJson(1002, body={"start_time": start_time, "end_time": end_time})
+#
+# elif export_type == "csv":
+# outpath = os.path.join(outpath, username)
+# if not os.path.exists(outpath):
+# os.makedirs(outpath)
+# code, ret = analyzer.export_csv(username, outpath, read_session(g.sf, "msg_path"))
+# if code:
+# return ReJson(0, ret)
+# else:
+# return ReJson(2001, body=ret)
+# elif export_type == "json":
+# outpath = os.path.join(outpath, username)
+# if not os.path.exists(outpath):
+# os.makedirs(outpath)
+# code, ret = analyzer.export_json(username, outpath, read_session(g.sf, "msg_path"))
+# if code:
+# return ReJson(0, ret)
+# else:
+# return ReJson(2001, body=ret)
+# elif export_type == "html":
+# outpath = os.path.join(outpath, username)
+# if os.path.exists(outpath):
+# shutil.rmtree(outpath)
+# if not os.path.exists(outpath):
+# os.makedirs(outpath)
+# # chat_type_tups = []
+# # for ct in chat_type:
+# # tup = analyzer.get_name_typeid(ct)
+# # if tup:
+# # chat_type_tups += tup
+# # if not chat_type_tups:
+# # return ReJson(1002)
+#
+# # 复制文件 html
+# export_html = os.path.join(os.path.dirname(pywxdump.VERSION_LIST_PATH), "ui", "export")
+# indexhtml_path = os.path.join(export_html, "index.html")
+# assets_path = os.path.join(export_html, "assets")
+# if not os.path.exists(indexhtml_path) or not os.path.exists(assets_path):
+# return ReJson(1001)
+# js_path = ""
+# css_path = ""
+# for file in os.listdir(assets_path):
+# if file.endswith('.js'):
+# js_path = os.path.join(assets_path, file)
+# elif file.endswith('.css'):
+# css_path = os.path.join(assets_path, file)
+# else:
+# continue
+# # 读取html,js,css
+# with open(indexhtml_path, 'r', encoding='utf-8') as f:
+# html = f.read()
+# with open(js_path, 'r', encoding='utf-8') as f:
+# js = f.read()
+# with open(css_path, 'r', encoding='utf-8') as f:
+# css = f.read()
+#
+# html = re.sub(r'', '', html) # 删除所有的script标签
+# html = re.sub(r'', '', html) # 删除所有的link标签
+#
+# html = html.replace('', f'')
+# html = html.replace('', f'')
+# # END 生成index.html
+#
+# rdata = func_get_msgs(0, 10000000, username, "", "")
+#
+# msg_list = rdata["msg_list"]
+# for i in range(len(msg_list)):
+# if msg_list[i]["type_name"] == "语音":
+# savePath = msg_list[i]["content"]["src"]
+# MsgSvrID = savePath.split("_")[-1].replace(".wav", "")
+# if not savePath:
+# continue
+# media_path = read_session(g.sf, "media_path")
+# wave_data = read_audio(MsgSvrID, is_wave=True, DB_PATH=media_path)
+# if not wave_data:
+# continue
+# # 判断savePath路径的文件夹是否存在
+# savePath = os.path.join(outpath, savePath)
+# if not os.path.exists(os.path.dirname(savePath)):
+# os.makedirs(os.path.dirname(savePath))
+# with open(savePath, "wb") as f:
+# f.write(wave_data)
+# elif msg_list[i]["type_name"] == "图片":
+# img_path = msg_list[i]["content"]["src"]
+# wx_path = read_session(g.sf, "wx_path")
+# img_path_all = os.path.join(wx_path, img_path)
+#
+# if os.path.exists(img_path_all):
+# fomt, md5, out_bytes = read_img_dat(img_path_all)
+# imgsavepath = os.path.join(outpath, "img", img_path + "_" + ".".join([md5, fomt]))
+# if not os.path.exists(os.path.dirname(imgsavepath)):
+# os.makedirs(os.path.dirname(imgsavepath))
+# with open(imgsavepath, "wb") as f:
+# f.write(out_bytes)
+# msg_list[i]["content"]["src"] = os.path.join("img", img_path + "_" + ".".join([md5, fomt]))
+#
+# rdata["msg_list"] = msg_list
+# rdata["myuserdata"] = rdata["user_list"][rdata["my_wxid"]]
+# rdata["myuserdata"]["chat_count"] = len(rdata["msg_list"])
+# save_data = rdata
+# save_json_path = os.path.join(outpath, "data")
+# if not os.path.exists(save_json_path):
+# os.makedirs(save_json_path)
+# with open(os.path.join(save_json_path, "msg_user.json"), "w", encoding="utf-8") as f:
+# json.dump(save_data, f, ensure_ascii=False)
+#
+# json_base64 = gen_base64(os.path.join(save_json_path, "msg_user.json"))
+# html = html.replace('"./data/msg_user.json"', f'"{json_base64}"')
+#
+# with open(os.path.join(outpath, "index.html"), 'w', encoding='utf-8') as f:
+# f.write(html)
+# return ReJson(0, outpath)
+#
+# elif export_type == "pdf":
+# pass
+# elif export_type == "docx":
+# pass
+# else:
+# return ReJson(1002)
+#
+# return ReJson(9999, "")
# end 导出聊天记录 *******************************************************************************************************
diff --git a/pywxdump/dbpreprocess/parsingMicroMsg.py b/pywxdump/dbpreprocess/parsingMicroMsg.py
index 4d3da3b..f4f453a 100644
--- a/pywxdump/dbpreprocess/parsingMicroMsg.py
+++ b/pywxdump/dbpreprocess/parsingMicroMsg.py
@@ -156,7 +156,7 @@ class ParsingMicroMsg(DatabaseBase):
rd += v
for i in rd:
try:
- if isinstance(i, dict) and isinstance(i.get('1'),str) and i.get('2'):
+ if isinstance(i, dict) and isinstance(i.get('1'), str) and i.get('2'):
wxid2remark[i['1']] = i["2"]
except Exception as e:
logging.error(f"wxid2remark: ChatRoomName:{ChatRoomName}, {i} error:{e}")
@@ -164,3 +164,59 @@ class ParsingMicroMsg(DatabaseBase):
{"ChatRoomName": ChatRoomName, "UserNameList": UserNameList, "DisplayNameList": DisplayNameList,
"Announcement": Announcement, "AnnouncementEditor": AnnouncementEditor, "wxid2remark": wxid2remark})
return rooms
+
+    def get_ExtraBuf(self, ExtraBuf: bytes):
+        """
+        Parse the ExtraBuf blob of a contact row (Contact table) into a dict of named fields.
+        :param ExtraBuf: raw ExtraBuf bytes; a falsy value (None / b"") returns None.
+        :return: dict of label -> value ("" when a marker is absent), or None if parsing raises.
+        """
+        if not ExtraBuf:
+            return None
+        try:
+            # Marker bytes (hex) -> output label; numeric labels presumably mark fields not yet identified.
+            buf_dict = {
+                'DDF32683': '0', '74752C06': '性别[1男2女]', '88E28FCE': '2', '761A1D2D': '3', '0263A0CB': '4',
+                '0451FF12': '5',
+                '228C66A8': '6', '46CF10C4': '个性签名', 'A4D9024A': '国', 'E2EAA8D1': '省', '1D025BBF': '市',
+                '4D6C4570': '11',
+                'F917BCC0': '公司名称', '759378AD': '手机号', '4335DFDD': '14', 'DE4CDAEB': '15', 'A72BC20A': '16',
+                '069FED52': '17',
+                '9B0F4299': '18', '3D641E22': '19', '1249822C': '20', '4EB96D85': '企微属性', 'B4F73ACB': '22',
+                '0959EB92': '23',
+                '3CF4A315': '24', 'C9477AC60201E44CD0E8': '26', 'B7ACF0F5': '28', '57A7B5A8': '29',
+                '81AE19B4': '朋友圈背景',
+                '695F3170': '31', 'FB083DD9': '32', '0240E37F': '33', '315D02A3': '34', '7DEC0BC3': '35',
+                '0E719F13': '备注图片',
+                '16791C90': '37'
+            }
+            rdata = {}
+            for hex_marker, rdata_name in buf_dict.items():
+                buf_name = bytes.fromhex(hex_marker)
+                offset = ExtraBuf.find(buf_name)  # first occurrence of the marker wins
+                if offset == -1:  # marker not present in this blob
+                    rdata[rdata_name] = ""
+                    continue
+                offset += len(buf_name)
+                type_id = ExtraBuf[offset: offset + 1]  # one-byte type tag right after the marker
+                offset += 1
+
+                if type_id == b"\x04":  # 4-byte little-endian integer
+                    rdata[rdata_name] = int.from_bytes(ExtraBuf[offset: offset + 4], "little")
+
+                elif type_id == b"\x18":  # 4-byte length, then text decoded as utf-16
+                    length = int.from_bytes(ExtraBuf[offset: offset + 4], "little")
+                    rdata[rdata_name] = ExtraBuf[offset + 4: offset + 4 + length].decode("utf-16").rstrip("\x00")
+
+                elif type_id == b"\x17":  # 4-byte length, then text decoded as utf-8
+                    length = int.from_bytes(ExtraBuf[offset: offset + 4], "little")
+                    rdata[rdata_name] = ExtraBuf[offset + 4: offset + 4 + length].decode("utf-8").rstrip("\x00")
+
+                elif type_id == b"\x05":  # 8 raw bytes rendered as a "0x..." hex string
+                    rdata[rdata_name] = f"0x{ExtraBuf[offset: offset + 8].hex()}"
+                # any other type tag leaves this label out of the result
+            return rdata
+
+        except Exception as e:
+            logging.error(f"get_ExtraBuf: error:{e}")  # log like the sibling parsers instead of print()
+            return None
diff --git a/pywxdump/ui/__init__.py b/pywxdump/ui/__init__.py
index 5d59823..6dab261 100644
--- a/pywxdump/ui/__init__.py
+++ b/pywxdump/ui/__init__.py
@@ -5,7 +5,7 @@
# Author: xaoyaoo
# Date: 2023/12/03
# -------------------------------------------------------------------------------
-from .view_chat import app_show_chat, get_user_list, export
+# from .view_chat import app_show_chat, get_user_list, export
if __name__ == '__main__':
pass
diff --git a/pywxdump/ui/view_chat.py b/pywxdump/ui/view_chat.py
deleted file mode 100644
index e603107..0000000
--- a/pywxdump/ui/view_chat.py
+++ /dev/null
@@ -1,318 +0,0 @@
-# -*- coding: utf-8 -*-#
-# -------------------------------------------------------------------------------
-# Name: GUI.py
-# Description:
-# Author: xaoyaoo
-# Date: 2023/11/10
-# -------------------------------------------------------------------------------
-import base64
-import re
-import sqlite3
-import os
-import json
-import time
-import hashlib
-from pywxdump.analyzer import read_img_dat, decompress_CompressContent, read_audio, parse_xml_string, read_BytesExtra
-
-from flask import Flask, request, render_template, g, Blueprint
-
-
-def get_md5(s):
- m = hashlib.md5()
- m.update(s.encode("utf-8"))
- return m.hexdigest()
-
-
-def get_user_list(MSG_ALL_db_path, MicroMsg_db_path):
- users = []
-
- # 连接 MSG_ALL.db 数据库,并执行查询
- db1 = sqlite3.connect(MSG_ALL_db_path)
- cursor1 = db1.cursor()
- cursor1.execute("SELECT StrTalker, COUNT(*) AS ChatCount FROM MSG GROUP BY StrTalker ORDER BY ChatCount DESC")
- result = cursor1.fetchall()
-
- dict_user_count = {}
- # 将结果转换为字典
- for row in result:
- dict_user_count[row[0]] = row[1]
-
- db2 = sqlite3.connect(MicroMsg_db_path)
- cursor2 = db2.cursor()
- cursor2.execute("SELECT UserName, NickName, Remark FROM Contact;")
- result2 = cursor2.fetchall()
- for row in result2:
- username, nickname, remark = row
- # 拼接四列数据为元组
- row_data = {"username": username, "nickname": nickname, "remark": remark,
- "chat_count": dict_user_count.get(username, 0),
- "isChatRoom": username.startswith("@chatroom")}
- users.append(row_data)
-
- users.sort(key=lambda x: x["chat_count"], reverse=True) # 按照聊天记录数量排序
- cursor2.close()
- db2.close()
- cursor1.close()
- db1.close()
- return users
-
-
-def load_base64_audio_data(MsgSvrID, MediaMSG_all_db_path):
- wave_data = read_audio(MsgSvrID, is_wave=True, DB_PATH=MediaMSG_all_db_path)
- if not wave_data:
- return ""
- video_base64 = base64.b64encode(wave_data).decode("utf-8")
- video_data = f"data:audio/wav;base64,{video_base64}"
- return video_data
-
-
-def load_base64_img_data(start_time, end_time, username_md5, FileStorage_path):
- """
- 获取图片的base64数据
- :param start_time: 开始时间戳
- :param end_time: 结束时间戳
- :param username_md5: 用户名的md5值
- :return:
- """
- # 获取CreateTime的最大值日期
- min_time = time.strftime("%Y-%m", time.localtime(start_time))
- max_time = time.strftime("%Y-%m", time.localtime(end_time))
- img_path = os.path.join(FileStorage_path, "MsgAttach", username_md5, "Image") if FileStorage_path else ""
- if not os.path.exists(img_path):
- return {}
- # print(min_time, max_time, img_path)
- paths = []
- for root, path, files in os.walk(img_path):
- for p in path:
- if p >= min_time and p <= max_time:
- paths.append(os.path.join(root, p))
- # print(paths)
- img_md5_data = {}
- for path in paths:
- for root, path, files in os.walk(path):
- for file in files:
- if file.endswith(".dat"):
- file_path = os.path.join(root, file)
- fomt, md5, out_bytes = read_img_dat(file_path)
- out_bytes = base64.b64encode(out_bytes).decode("utf-8")
- img_md5_data[md5] = f"data:{fomt};base64,{out_bytes}"
- return img_md5_data
-
-
-def load_chat_records(selected_talker, start_index, page_size, user_list, MSG_ALL_db_path, MediaMSG_all_db_path,
- FileStorage_path, USER_LIST):
- username = user_list.get("username", "")
- username_md5 = get_md5(username)
- type_name_dict = {
- 1: {0: "文本"},
- 3: {0: "图片"},
- 34: {0: "语音"},
- 43: {0: "视频"},
- 47: {0: "动画表情"},
- 49: {0: "文本", 1: "类似文字消息而不一样的消息", 5: "卡片式链接", 6: "文件", 8: "用户上传的 GIF 表情",
- 19: "合并转发的聊天记录", 33: "分享的小程序", 36: "分享的小程序", 57: "带有引用的文本消息",
- 63: "视频号直播或直播回放等",
- 87: "群公告", 88: "视频号直播或直播回放等", 2000: "转账消息", 2003: "赠送红包封面"},
- 50: {0: "语音通话"},
- 10000: {0: "系统通知", 4: "拍一拍", 8000: "系统通知"}
- }
-
- # 连接 MSG_ALL.db 数据库,并执行查询
- db1 = sqlite3.connect(MSG_ALL_db_path)
- cursor1 = db1.cursor()
-
- cursor1.execute(
- "SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent,BytesExtra FROM MSG WHERE StrTalker=? ORDER BY CreateTime ASC LIMIT ?,?",
- (selected_talker, start_index, page_size))
- result1 = cursor1.fetchall()
-
- cursor1.close()
- db1.close()
- # 获取图片的base64数据
- # img_md5_data = load_base64_img_data(result1[0][7], result1[-1][7], username_md5, FileStorage_path) if len(
- # result1) > 0 else {}
-
- data = []
- room_username_count = {}
- for row in result1:
- localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType, CreateTime, MsgSvrID, DisplayContent, CompressContent, BytesExtra = row
- CreateTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(CreateTime))
-
- type_name = type_name_dict.get(Type, {}).get(SubType, "未知")
-
- content = {"src": "", "msg": "", "style": ""}
-
- if Type == 47 and SubType == 0: # 动画表情
- content_tmp = parse_xml_string(StrContent)
- cdnurl = content_tmp.get("emoji", {}).get("cdnurl", "")
- # md5 = content_tmp.get("emoji", {}).get("md5", "")
- if cdnurl:
- content = {"src": cdnurl, "msg": "表情", "style": "width: 100px; height: 100px;"}
-
- elif Type == 49 and SubType == 57: # 带有引用的文本消息
- CompressContent = CompressContent.rsplit(b'\x00', 1)[0]
- content["msg"] = decompress_CompressContent(CompressContent)
- try:
- content["msg"] = content["msg"].decode("utf-8")
- content["msg"] = parse_xml_string(content["msg"])
- content["msg"] = json.dumps(content["msg"], ensure_ascii=False)
- except Exception as e:
- content["msg"] = "[带有引用的文本消息]解析失败"
- elif Type == 34 and SubType == 0: # 语音
- tmp_c = parse_xml_string(StrContent)
- voicelength = tmp_c.get("voicemsg", {}).get("voicelength", "")
- transtext = tmp_c.get("voicetrans", {}).get("transtext", "")
- if voicelength.isdigit():
- voicelength = int(voicelength) / 1000
- voicelength = f"{voicelength:.2f}"
- content["msg"] = f"语音时长:{voicelength}秒\n翻译结果:{transtext}"
-
- src = load_base64_audio_data(MsgSvrID, MediaMSG_all_db_path=MediaMSG_all_db_path)
- content["src"] = src
- elif Type == 3 and SubType == 0: # 图片
- xml_content = parse_xml_string(StrContent)
- BytesExtra = read_BytesExtra(BytesExtra)
- BytesExtra = str(BytesExtra)
- match = re.search(r"MsgAttach(.*?)'", BytesExtra)
- if match:
- img_path = match.group(0).replace("'", "")
- # print(FileStorage_path)
- # print(img_path)
- img_path = img_path.split("\\")
- img_path = [i for i in img_path if i != ""]
- img_path = os.path.join(*img_path)
- if FileStorage_path:
- img_path = os.path.join(FileStorage_path, img_path)
- if os.path.exists(img_path):
- fomt, md5, out_bytes = read_img_dat(img_path)
- out_bytes = base64.b64encode(out_bytes).decode("utf-8")
- content["src"] = f"data:{fomt};base64,{out_bytes}"
- else:
- content["src"] = ""
- else:
- content["src"] = ""
- else:
- content["src"] = ""
- content["msg"] = "图片"
- else:
- content["msg"] = StrContent
-
- talker = "未知"
- if IsSender == 1:
- talker = "我"
- else:
- if StrTalker.endswith("@chatroom"):
- bytes_extra = read_BytesExtra(BytesExtra)
- if bytes_extra:
- try:
- matched_string = bytes_extra['3'][0]['2'].decode('utf-8', errors='ignore')
- talker_dicts = list(filter(lambda x: x["username"] == matched_string, USER_LIST))
- if len(talker_dicts) > 0:
- talker_dict = talker_dicts[0]
- room_username = talker_dict.get("username", "")
- room_nickname = talker_dict.get("nickname", "")
- room_remark = talker_dict.get("remark", "")
- talker = room_remark if room_remark else room_nickname if room_nickname else room_username
- else:
- talker = matched_string
- except:
- pass
- else:
- talker = user_list.get("remark", user_list.get("nickname", user_list.get("username", "")))
-
- row_data = {"MsgSvrID": MsgSvrID, "type_name": type_name, "is_sender": IsSender, "talker": talker,
- "content": content, "CreateTime": CreateTime}
- data.append(row_data)
- return data
-
-
-def export_html(user, outpath, MSG_ALL_db_path, MediaMSG_all_db_path, FileStorage_path, page_size=500):
- name_save = user.get("remark", user.get("nickname", user.get("username", "")))
- username = user.get("username", "")
-
- chatCount = user.get("chat_count", 0)
- if chatCount == 0:
- return False, "没有聊天记录"
-
- for i in range(0, chatCount, page_size):
- start_index = i
- data = load_chat_records(username, start_index, page_size, user, MSG_ALL_db_path, MediaMSG_all_db_path,
- FileStorage_path, [user])
- if len(data) == 0:
- break
- save_path = os.path.join(outpath, f"{name_save}_{int(i / page_size)}.html")
- with open(save_path, "w", encoding="utf-8") as f:
- f.write(render_template("chat.html", msgs=data))
- return True, f"导出成功{outpath}"
-
-
-def export(username, outpath, MSG_ALL_db_path, MicroMsg_db_path, MediaMSG_all_db_path, FileStorage_path):
- if not os.path.exists(outpath):
- outpath = os.path.join(os.getcwd(), "export" + os.sep + username)
- if not os.path.exists(outpath):
- os.makedirs(outpath)
-
- USER_LIST = get_user_list(MSG_ALL_db_path, MicroMsg_db_path)
- user = list(filter(lambda x: x["username"] == username, USER_LIST))
-
- if username and len(user) > 0:
- user = user[0]
- return export_html(user, outpath, MSG_ALL_db_path, MediaMSG_all_db_path, FileStorage_path)
-
-
-app_show_chat = Blueprint('show_chat_main', __name__, template_folder='templates')
-app_show_chat.debug = False
-
-
-# 主页 - 显示用户列表
-@app_show_chat.route('/')
-def index():
- g.USER_LIST = get_user_list(g.MSG_ALL_db_path, g.MicroMsg_db_path)
- # 只去前面500个有聊天记录的用户
- USER_LIST = g.USER_LIST[:500]
- return render_template("index.html", users=USER_LIST)
-
-
-# 获取聊天记录
-@app_show_chat.route('/get_chat_data', methods=["GET", 'POST'])
-def get_chat_data():
- username = request.args.get("username", "")
- user = list(filter(lambda x: x["username"] == username, g.USER_LIST))
-
- if username and len(user) > 0:
- user = user[0]
-
- limit = int(request.args.get("limit", 100)) # 每页显示的条数
- page = int(request.args.get("page", user.get("chat_count", limit) / limit)) # 当前页数
-
- start_index = (page - 1) * limit
- page_size = limit
-
- data = load_chat_records(username, start_index, page_size, user, g.MSG_ALL_db_path, g.MediaMSG_all_db_path,
- g.FileStorage_path, g.USER_LIST)
- return render_template("chat.html", msgs=data)
- else:
- return "error"
-
-
-# 聊天记录导出为html
-@app_show_chat.route('/export_chat_data', methods=["GET", 'POST'])
-def get_export():
- username = request.args.get("username", "")
-
- user = list(filter(lambda x: x["username"] == username, g.USER_LIST))
-
- if username and len(user) > 0:
- user = user[0]
- n = f"{user.get('username', '')}_{user.get('nickname', '')}_{user.get('remark', '')}"
- outpath = os.path.join(os.getcwd(), "export" + os.sep + n)
- if not os.path.exists(outpath):
- os.makedirs(outpath)
-
- ret = export_html(user, outpath, g.MSG_ALL_db_path, g.MediaMSG_all_db_path, g.FileStorage_path, page_size=200)
- if ret[0]:
- return ret[1]
- else:
- return ret[1]
- else:
- return "error"