From e45716805c40460e2df35e7ccf7403b43030fc86 Mon Sep 17 00:00:00 2001 From: xaoyaoo Date: Sat, 20 Apr 2024 19:17:40 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B8=85=E7=90=86=E5=B7=B2=E7=BB=8F=E5=BA=9F?= =?UTF-8?q?=E5=BC=83=E7=9A=84=E4=BB=A3=E7=A0=81=EF=BC=88=E5=8F=AF=E8=83=BD?= =?UTF-8?q?=E4=BC=9A=E5=AD=98=E5=9C=A8=E6=9C=89=E7=94=A8=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E8=A2=AB=E6=B8=85=E9=99=A4=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pywxdump/__init__.py | 14 +- pywxdump/analyzer/__init__.py | 6 +- pywxdump/analyzer/db_parsing.py | 349 ------------------ pywxdump/analyzer/export_chat.py | 429 ----------------------- pywxdump/api/api.py | 376 ++++++++++---------- pywxdump/dbpreprocess/parsingMicroMsg.py | 58 ++- pywxdump/ui/__init__.py | 2 +- pywxdump/ui/view_chat.py | 318 ----------------- 8 files changed, 253 insertions(+), 1299 deletions(-) delete mode 100644 pywxdump/analyzer/db_parsing.py delete mode 100644 pywxdump/analyzer/export_chat.py delete mode 100644 pywxdump/ui/view_chat.py diff --git a/pywxdump/__init__.py b/pywxdump/__init__.py index f4438ed..2edac25 100644 --- a/pywxdump/__init__.py +++ b/pywxdump/__init__.py @@ -5,17 +5,15 @@ # Author: xaoyaoo # Date: 2023/10/14 # ------------------------------------------------------------------------------- +# from .analyzer.db_parsing import read_img_dat, read_emoji, decompress_CompressContent, read_audio_buf, read_audio, \ +# parse_xml_string, read_BytesExtra +# from .ui import app_show_chat, get_user_list, export from .wx_info import BiasAddr, read_info, get_wechat_db, batch_decrypt, decrypt, get_core_db from .wx_info import merge_copy_db, merge_msg_db, merge_media_msg_db, merge_db, decrypt_merge, merge_real_time_db -from .analyzer.db_parsing import read_img_dat, read_emoji, decompress_CompressContent, read_audio_buf, read_audio, \ - parse_xml_string, read_BytesExtra -from .analyzer import export_csv, export_json, DBPool -from .ui import app_show_chat, get_user_list, export +from .analyzer import DBPool from .dbpreprocess import get_user_list, get_recent_user_list, wxid2userinfo, ParsingMSG, ParsingMicroMsg, \ ParsingMediaMSG, ParsingOpenIMContact - from .server import start_falsk - import os, json try: @@ -26,7 +24,7 @@ except: VERSION_LIST = {} VERSION_LIST_PATH = None -PYWXDUMP_ROOT_PATH = os.path.dirname(__file__) -db_init = DBPool("DBPOOL_INIT") +# PYWXDUMP_ROOT_PATH = os.path.dirname(__file__) +# db_init = DBPool("DBPOOL_INIT") __version__ = "3.0.10" diff --git a/pywxdump/analyzer/__init__.py b/pywxdump/analyzer/__init__.py index fdf84aa..e352f37 100644 --- a/pywxdump/analyzer/__init__.py +++ b/pywxdump/analyzer/__init__.py @@ -5,8 +5,4 @@ # Author: xaoyaoo # Date: 2023/09/27 # ------------------------------------------------------------------------------- -from .db_parsing import read_img_dat, read_emoji, decompress_CompressContent, read_audio_buf, read_audio, \ - parse_xml_string, read_BytesExtra -from .export_chat import export_csv, get_contact_list, get_chatroom_list, get_msg_list, get_chat_count, export_json, \ - get_all_chat_count -from .utils import get_type_name, get_name_typeid,DBPool +from .utils import DBPool diff --git a/pywxdump/analyzer/db_parsing.py b/pywxdump/analyzer/db_parsing.py deleted file mode 100644 index b1314ea..0000000 --- a/pywxdump/analyzer/db_parsing.py +++ /dev/null @@ -1,349 +0,0 @@ -# -*- coding: utf-8 -*-# -# ------------------------------------------------------------------------------- -# Name: parse.py -# Description: 解析数据库内容 -# Author: xaoyaoo -# Date: 2023/09/27 -# ------------------------------------------------------------------------------- -import os.path -import sqlite3 -import pysilk -from io import BytesIO -import wave -import pyaudio -import requests -import hashlib -import lz4.block -import blackboxprotobuf - -from PIL import Image -# import xml.etree.ElementTree as ET -import lxml.etree as ET # 这个模块更健壮些,微信XML格式有时有非标格式,会导致xml.etree.ElementTree处理失败 - - -def get_md5(data): - md5 = hashlib.md5() - md5.update(data) - return md5.hexdigest() - - -def parse_xml_string(xml_string): - """ - 解析 XML 字符串 - :param xml_string: 要解析的 XML 字符串 - :return: 解析结果,以字典形式返回 - """ - - def parse_xml(element): - """ - 递归解析 XML 元素 - :param element: 要解析的 XML 元素 - :return: 解析结果,以字典形式返回 - """ - result = {} - - # 解析当前元素的属性 - if element is None or element.attrib is None: # 有时可能会遇到没有属性,要处理下 - return result - for key, value in element.attrib.items(): - result[key] = value - - # 解析当前元素的子元素 - for child in element: - child_result = parse_xml(child) - - # 如果子元素的标签已经在结果中存在,则将其转换为列表 - if child.tag in result: - if not isinstance(result[child.tag], list): - result[child.tag] = [result[child.tag]] - result[child.tag].append(child_result) - else: - result[child.tag] = child_result - - # 如果当前元素没有子元素,则将其文本内容作为值保存 - if not result and element.text: - result = element.text - - return result - - if xml_string is None or not isinstance(xml_string, str): - return None - try: - parser = ET.XMLParser(recover=True) # 有时微信的聊天记录里面,会冒出来xml格式不对的情况,这里把parser设置成忽略错误 - root = ET.fromstring(xml_string, parser) - except Exception as e: - return xml_string - return parse_xml(root) - - -def read_img_dat(input_data): - """ - 读取图片文件dat格式 - :param input_data: 图片文件路径或者图片文件数据 - :return: 图片格式,图片md5,图片数据 - """ - # 常见图片格式的文件头 - img_head = { - b"\xFF\xD8\xFF": ".jpg", - b"\x89\x50\x4E\x47": ".png", - b"\x47\x49\x46\x38": ".gif", - b"\x42\x4D": ".BMP", - b"\x49\x49": ".TIFF", - b"\x4D\x4D": ".TIFF", - b"\x00\x00\x01\x00": ".ICO", - b"\x52\x49\x46\x46": ".WebP", - b"\x00\x00\x00\x18\x66\x74\x79\x70\x68\x65\x69\x63": ".HEIC", - } - - if isinstance(input_data, str): - with open(input_data, "rb") as f: - input_bytes = f.read() - else: - input_bytes = input_data - - try: - import numpy as np - input_bytes = np.frombuffer(input_bytes, dtype=np.uint8) - for hcode in img_head: # 遍历文件头 - t = input_bytes[0] ^ hcode[0] # 异或解密 - if np.all(t == np.bitwise_xor(np.frombuffer(input_bytes[:len(hcode)], dtype=np.uint8), - np.frombuffer(hcode, dtype=np.uint8))): # 使用NumPy进行向量化的异或解密操作,并进行类型转换 - fomt = img_head[hcode] # 获取文件格式 - - out_bytes = np.bitwise_xor(input_bytes, t) # 使用NumPy进行向量化的异或解密操作 - md5 = get_md5(out_bytes) - return fomt, md5, out_bytes - return False - except ImportError: - pass - - for hcode in img_head: - t = input_bytes[0] ^ hcode[0] - for i in range(1, len(hcode)): - if t == input_bytes[i] ^ hcode[i]: - fomt = img_head[hcode] - out_bytes = bytearray() - for nowByte in input_bytes: # 读取文件 - newByte = nowByte ^ t # 异或解密 - out_bytes.append(newByte) - md5 = get_md5(out_bytes) - return fomt, md5, out_bytes - return False - - -def read_emoji(cdnurl, is_show=False): - headers = { - "User-Agent": "Mozilla/5.0 (Linux; Android 10; Redmi K30 Pro) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Mobile Safari/537.36" - - } - r1 = requests.get(cdnurl, headers=headers) - rdata = r1.content - - if is_show: # 显示表情 - img = Image.open(BytesIO(rdata)) - img.show() - return rdata - - -def decompress_CompressContent(data): - """ - 解压缩Msg:CompressContent内容 - :param data: - :return: - """ - if data is None or not isinstance(data, bytes): - return None - try: - dst = lz4.block.decompress(data, uncompressed_size=len(data) << 8) - dst = dst.replace(b'\x00', b'') # 已经解码完成后,还含有0x00的部分,要删掉,要不后面ET识别的时候会报错 - uncompressed_data = dst.decode('utf-8', errors='ignore') - return uncompressed_data - except Exception as e: - return data.decode('utf-8', errors='ignore') - - -def read_audio_buf(buf_data, is_play=False, is_wave=False, rate=24000): - silk_file = BytesIO(buf_data) # 读取silk文件 - pcm_file = BytesIO() # 创建pcm文件 - - pysilk.decode(silk_file, pcm_file, rate) # 解码silk文件->pcm文件 - pcm_data = pcm_file.getvalue() # 获取pcm文件数据 - - silk_file.close() # 关闭silk文件 - pcm_file.close() # 关闭pcm文件 - if is_play: # 播放音频 - def play_audio(pcm_data, rate): - p = pyaudio.PyAudio() # 实例化pyaudio - stream = p.open(format=pyaudio.paInt16, channels=1, rate=rate, output=True) # 创建音频流对象 - stream.write(pcm_data) # 写入音频流 - stream.stop_stream() # 停止音频流 - stream.close() # 关闭音频流 - p.terminate() # 关闭pyaudio - - play_audio(pcm_data, rate) - - if is_wave: # 转换为wav文件 - wave_file = BytesIO() # 创建wav文件 - with wave.open(wave_file, 'wb') as wf: - wf.setparams((1, 2, rate, 0, 'NONE', 'NONE')) # 设置wav文件参数 - wf.writeframes(pcm_data) # 写入wav文件 - rdata = wave_file.getvalue() # 获取wav文件数据 - wave_file.close() # 关闭wav文件 - return rdata - - return pcm_data - - -def read_audio(MsgSvrID, is_play=False, is_wave=False, DB_PATH: str = "", rate=24000): - if DB_PATH == "": - return False - - DB = sqlite3.connect(DB_PATH) - cursor = DB.cursor() - sql = "select Buf from Media where Reserved0={}".format(MsgSvrID) - DBdata = cursor.execute(sql).fetchall() - - if len(DBdata) == 0: - return False - data = DBdata[0][0] # [1:] + b'\xFF\xFF' - try: - pcm_data = read_audio_buf(data, is_play, is_wave, rate) - return pcm_data - except Exception as e: - return False - - -def wordcloud_generator(text, out_path="", is_show=False, img_path="", font="C:\Windows\Fonts\simhei.ttf"): - """ - 词云 - :param is_show: 是否显示 - :param img_path: 背景图片路径 - :param text: 文本 - :param font: 字体路径 - :return: - """ - try: - from wordcloud import WordCloud - import jieba - import numpy as np - import matplotlib.pyplot as plt - from matplotlib.font_manager import fontManager - except ImportError as e: - print("error", e) - raise ImportError("请安装wordcloud,jieba,numpy,matplotlib,pillow库") - words = jieba.lcut(text) # 精确分词 - newtxt = ' '.join(words) # 空格拼接 - # 字体路径 - - # 创建WordCloud对象 - wordcloud1 = WordCloud(width=800, height=400, background_color='white', font_path=font) - wordcloud1.generate(newtxt) - - if out_path and out_path != "": - wordcloud1.to_file("wordcloud.png") # 保存图片 - if img_path and os.path.exists(img_path): # 设置背景图片 - img_color = np.array(Image.open(img_path)) # 读取背景图片 - img_color = img_color.reshape((img_color.shape[0] * img_color.shape[1], 3)) - wordcloud1.recolor(color_func=img_color) # 设置背景图片颜色 - if is_show: - # 显示词云 - wordcloud_img = wordcloud1.to_image() - wordcloud_img.show() - - -def convert_bytes_to_str(d): - """ - 遍历字典并将bytes转换为字符串 - :param d: - :return: - """ - for k, v in d.items(): - if isinstance(v, dict): - convert_bytes_to_str(v) - elif isinstance(v, list): - for item in v: - if isinstance(item, dict): - convert_bytes_to_str(item) - elif isinstance(item, bytes): - item = item.decode('utf-8') # 将bytes转换为字符串 - elif isinstance(v, bytes): - d[k] = v.decode('utf-8') - - -def read_BytesExtra(BytesExtra): - if BytesExtra is None or not isinstance(BytesExtra, bytes): - return None - try: - deserialize_data, message_type = blackboxprotobuf.decode_message(BytesExtra) - return deserialize_data - except Exception as e: - return None - - -def read_ChatRoom_RoomData(RoomData): - # 读取群聊数据,主要为 wxid,以及对应昵称 - if RoomData is None or not isinstance(RoomData, bytes): - return None - try: - data = read_BytesExtra(RoomData) - convert_bytes_to_str(data) - return data - except Exception as e: - return None - - -def read_ExtraBuf(ExtraBuf: bytes): - """ - 读取ExtraBuf(联系人表) - :param ExtraBuf: - :return: - """ - if not ExtraBuf: - return None - try: - buf_dict = { - 'DDF32683': '0', '74752C06': '性别[1男2女]', '88E28FCE': '2', '761A1D2D': '3', '0263A0CB': '4', - '0451FF12': '5', - '228C66A8': '6', '46CF10C4': '个性签名', 'A4D9024A': '国', 'E2EAA8D1': '省', '1D025BBF': '市', - '4D6C4570': '11', - 'F917BCC0': '公司名称', '759378AD': '手机号', '4335DFDD': '14', 'DE4CDAEB': '15', 'A72BC20A': '16', - '069FED52': '17', - '9B0F4299': '18', '3D641E22': '19', '1249822C': '20', '4EB96D85': '企微属性', 'B4F73ACB': '22', - '0959EB92': '23', - '3CF4A315': '24', 'C9477AC60201E44CD0E8': '26', 'B7ACF0F5': '28', '57A7B5A8': '29', - '81AE19B4': '朋友圈背景', - '695F3170': '31', 'FB083DD9': '32', '0240E37F': '33', '315D02A3': '34', '7DEC0BC3': '35', - '0E719F13': '备注图片', - '16791C90': '37' - } - - rdata = {} - for buf_name in buf_dict: - rdata_name = buf_dict[buf_name] - buf_name = bytes.fromhex(buf_name) - offset = ExtraBuf.find(buf_name) - if offset == -1: - rdata[rdata_name] = "" - continue - offset += len(buf_name) - type_id = ExtraBuf[offset: offset + 1] - offset += 1 - - if type_id == b"\x04": - rdata[rdata_name] = int.from_bytes(ExtraBuf[offset: offset + 4], "little") - - elif type_id == b"\x18": - length = int.from_bytes(ExtraBuf[offset: offset + 4], "little") - rdata[rdata_name] = ExtraBuf[offset + 4: offset + 4 + length].decode("utf-16").rstrip("\x00") - - elif type_id == b"\x17": - length = int.from_bytes(ExtraBuf[offset: offset + 4], "little") - rdata[rdata_name] = ExtraBuf[offset + 4: offset + 4 + length].decode("utf-8").rstrip("\x00") - - elif type_id == b"\x05": - rdata[rdata_name] = f"0x{ExtraBuf[offset: offset + 8].hex()}" - return rdata - - except Exception as e: - print(f'解析错误:\n{e}') - return None diff --git a/pywxdump/analyzer/export_chat.py b/pywxdump/analyzer/export_chat.py deleted file mode 100644 index 766f7d9..0000000 --- a/pywxdump/analyzer/export_chat.py +++ /dev/null @@ -1,429 +0,0 @@ -# -*- coding: utf-8 -*-# -# ------------------------------------------------------------------------------- -# Name: export_chat.py -# Description: -# Author: xaoyaoo -# Date: 2023/12/03 -# ------------------------------------------------------------------------------- -# -*- coding: utf-8 -*-# -# ------------------------------------------------------------------------------- -# Name: GUI.py -# Description: -# Author: xaoyaoo -# Date: 2023/11/10 -# ------------------------------------------------------------------------------- -import csv -import re -import sqlite3 -import os -import json -import time -from functools import wraps - -from .utils import get_md5, attach_databases, execute_sql, get_type_name, match_BytesExtra, DBPool, time_int2str -from .db_parsing import parse_xml_string, decompress_CompressContent, read_BytesExtra - - -def get_contact(MicroMsg_db_path, wx_id): - """ - 获取联系人信息 - :param MicroMsg_db_path: MicroMsg.db 文件路径 - :param wx_id: 微信id - :return: 联系人信息 - """ - with DBPool(MicroMsg_db_path) as db: - # 获取username是wx_id的用户 - sql = ("SELECT A.UserName, A.NickName, A.Remark,A.Alias,A.Reserved6,B.bigHeadImgUrl " - "FROM Contact A,ContactHeadImgUrl B " - f"WHERE A.UserName = '{wx_id}' AND A.UserName = B.usrName " - "ORDER BY NickName ASC;") - result = execute_sql(db, sql) - print('联系人信息:', result) - if not result: - print('居然没找到!') - print(wx_id) - return None - return {"username": result[0], "nickname": result[1], "remark": result[2], "account": result[3], - "describe": result[4], "headImgUrl": result[5]} - - -def get_contact_list(MicroMsg_db_path, OpenIMContact_db_path=None): - """ - 获取联系人列表 - :param MicroMsg_db_path: MicroMsg.db 文件路径 - :return: 联系人列表 - """ - users = [] - # 连接 MicroMsg.db 数据库,并执行查询 - with DBPool(MicroMsg_db_path) as db: - sql = ("SELECT A.UserName, A.NickName, A.Remark,A.Alias,A.Reserved6,B.bigHeadImgUrl " - "FROM Contact A,ContactHeadImgUrl B " - "where UserName==usrName " - "ORDER BY NickName ASC;") - result = execute_sql(db, sql) - for row in result: - # 获取用户名、昵称、备注和聊天记录数量 - username, nickname, remark, Alias, describe, headImgUrl = row - users.append( - {"username": username, "nickname": nickname, "remark": remark, "account": Alias, "describe": describe, - "headImgUrl": headImgUrl}) - # return users - if OpenIMContact_db_path: - with DBPool(OpenIMContact_db_path) as db: - sql = ("SELECT A.UserName, A.NickName, A.Remark,A.BigHeadImgUrl FROM OpenIMContact A " - "ORDER BY NickName ASC;") - result = execute_sql(db, sql) - for row in result: - # 获取用户名、昵称、备注和聊天记录数量 - username, nickname, remark, headImgUrl = row - users.append( - {"username": username, "nickname": nickname, "remark": remark, "account": "", "describe": "", - "headImgUrl": headImgUrl}) - return users - - -def get_chatroom_list(MicroMsg_db_path): - """ - 获取群聊列表 - :param MicroMsg_db_path: MicroMsg.db 文件路径 - :return: 群聊列表 - """ - rooms = [] - # 连接 MicroMsg.db 数据库,并执行查询 - with DBPool(MicroMsg_db_path) as db: - sql = ("SELECT A.ChatRoomName,A.UserNameList, A.DisplayNameList, B.Announcement,B.AnnouncementEditor " - "FROM ChatRoom A,ChatRoomInfo B " - "where A.ChatRoomName==B.ChatRoomName " - "ORDER BY A.ChatRoomName ASC;") - result = execute_sql(db, sql) - for row in result: - # 获取用户名、昵称、备注和聊天记录数量 - ChatRoomName, UserNameList, DisplayNameList, Announcement, AnnouncementEditor = row - UserNameList = UserNameList.split("^G") - DisplayNameList = DisplayNameList.split("^G") - rooms.append( - {"ChatRoomName": ChatRoomName, "UserNameList": UserNameList, "DisplayNameList": DisplayNameList, - "Announcement": Announcement, "AnnouncementEditor": AnnouncementEditor}) - return rooms - - -def get_room_user_list(MSG_db_path, selected_talker): - """ - 获取群聊中包含的所有用户列表 - :param MSG_db_path: MSG.db 文件路径 - :param selected_talker: 选中的聊天对象 wxid - :return: 聊天用户列表 - """ - - # 连接 MSG_ALL.db 数据库,并执行查询 - with DBPool(MSG_db_path) as db1: - sql = ( - "SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent,BytesExtra,ROW_NUMBER() OVER (ORDER BY CreateTime ASC) AS id " - "FROM MSG WHERE StrTalker=? " - "ORDER BY CreateTime ASC") - - result1 = execute_sql(db1, sql, (selected_talker,)) - user_list = [] - read_user_wx_id = [] - for row in result1: - localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType, CreateTime, MsgSvrID, DisplayContent, CompressContent, BytesExtra, id = row - bytes_extra = read_BytesExtra(BytesExtra) - if bytes_extra: - try: - talker = bytes_extra['3'][0]['2'].decode('utf-8', errors='ignore') - except: - continue - if talker in read_user_wx_id: - continue - user = get_contact(MSG_db_path, talker) - if not user: - continue - user_list.append(user) - read_user_wx_id.append(talker) - return user_list - - -def get_msg_list(MSG_db_path, selected_talker="", start_index=0, page_size=500): - """ - 获取聊天记录列表 - :param MSG_db_path: MSG.db 文件路径 - :param selected_talker: 选中的聊天对象 wxid - :param start_index: 开始索引 - :param page_size: 每页数量 - :return: 聊天记录列表 - """ - - # 连接 MSG_ALL.db 数据库,并执行查询 - with DBPool(MSG_db_path) as db1: - if selected_talker: - sql = ( - "SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent,BytesExtra,ROW_NUMBER() OVER (ORDER BY CreateTime ASC) AS id " - "FROM MSG WHERE StrTalker=? " - "ORDER BY CreateTime ASC LIMIT ?,?") - result1 = execute_sql(db1, sql, (selected_talker, start_index, page_size)) - else: - sql = ( - "SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent,BytesExtra,ROW_NUMBER() OVER (ORDER BY CreateTime ASC) AS id " - "FROM MSG ORDER BY CreateTime ASC LIMIT ?,?") - result1 = execute_sql(db1, sql, (start_index, page_size)) - - data = [] - for row in result1: - localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType, CreateTime, MsgSvrID, DisplayContent, CompressContent, BytesExtra, id = row - CreateTime = time_int2str(CreateTime) - - type_id = (Type, SubType) - type_name = get_type_name(type_id) - - content = {"src": "", "msg": StrContent} - - if type_id == (1, 0): # 文本 - content["msg"] = StrContent - - elif type_id == (3, 0): # 图片 - DictExtra = read_BytesExtra(BytesExtra) - DictExtra_str = str(DictExtra) - img_paths = [i for i in re.findall(r"(FileStorage.*?)'", DictExtra_str)] - img_paths = sorted(img_paths, key=lambda p: "Image" in p, reverse=True) - if img_paths: - img_path = img_paths[0].replace("'", "") - img_path = [i for i in img_path.split("\\") if i] - img_path = os.path.join(*img_path) - content["src"] = img_path - else: - content["src"] = "" - content["msg"] = "图片" - elif type_id == (34, 0): # 语音 - tmp_c = parse_xml_string(StrContent) - voicelength = tmp_c.get("voicemsg", {}).get("voicelength", "") - transtext = tmp_c.get("voicetrans", {}).get("transtext", "") - if voicelength.isdigit(): - voicelength = int(voicelength) / 1000 - voicelength = f"{voicelength:.2f}" - content[ - "msg"] = f"语音时长:{voicelength}秒\n翻译结果:{transtext}" if transtext else f"语音时长:{voicelength}秒" - content["src"] = os.path.join("audio", f"{StrTalker}", - f"{CreateTime.replace(':', '-').replace(' ', '_')}_{IsSender}_{MsgSvrID}.wav") - elif type_id == (43, 0): # 视频 - DictExtra = read_BytesExtra(BytesExtra) - DictExtra = str(DictExtra) - - DictExtra_str = str(DictExtra) - video_paths = [i for i in re.findall(r"(FileStorage.*?)'", DictExtra_str)] - video_paths = sorted(video_paths, key=lambda p: "mp4" in p, reverse=True) - if video_paths: - video_path = video_paths[0].replace("'", "") - video_path = [i for i in video_path.split("\\") if i] - video_path = os.path.join(*video_path) - content["src"] = video_path - else: - content["src"] = "" - content["msg"] = "视频" - - elif type_id == (47, 0): # 动画表情 - content_tmp = parse_xml_string(StrContent) - cdnurl = content_tmp.get("emoji", {}).get("cdnurl", "") - if cdnurl: - content = {"src": cdnurl, "msg": "表情"} - - elif type_id == (49, 0): - DictExtra = read_BytesExtra(BytesExtra) - url = match_BytesExtra(DictExtra) - content["src"] = url - file_name = os.path.basename(url) - content["msg"] = file_name - - elif type_id == (49, 19): # 合并转发的聊天记录 - CompressContent = decompress_CompressContent(CompressContent) - content_tmp = parse_xml_string(CompressContent) - title = content_tmp.get("appmsg", {}).get("title", "") - des = content_tmp.get("appmsg", {}).get("des", "") - recorditem = content_tmp.get("appmsg", {}).get("recorditem", "") - recorditem = parse_xml_string(recorditem) - content["msg"] = f"{title}\n{des}" - content["src"] = recorditem - - elif type_id == (49, 57): # 带有引用的文本消息 - CompressContent = decompress_CompressContent(CompressContent) - content_tmp = parse_xml_string(CompressContent) - appmsg = content_tmp.get("appmsg", {}) - title = appmsg.get("title", "") - refermsg = appmsg.get("refermsg", {}) - displayname = refermsg.get("displayname", "") - display_content = refermsg.get("content", "") - display_createtime = refermsg.get("createtime", "") - display_createtime = time_int2str( - int(display_createtime)) if display_createtime.isdigit() else display_createtime - content["msg"] = f"{title}\n\n[引用]({display_createtime}){displayname}:{display_content}" - content["src"] = "" - - elif type_id == (49, 2000): # 转账消息 - CompressContent = decompress_CompressContent(CompressContent) - content_tmp = parse_xml_string(CompressContent) - feedesc = content_tmp.get("appmsg", {}).get("wcpayinfo", {}).get("feedesc", "") - content["msg"] = f"转账:{feedesc}" - content["src"] = "" - - elif type_id[0] == 49 and type_id[1] != 0: - DictExtra = read_BytesExtra(BytesExtra) - url = match_BytesExtra(DictExtra) - content["src"] = url - content["msg"] = type_name - - elif type_id == (50, 0): # 语音通话 - content["msg"] = "语音/视频通话[%s]" % DisplayContent - - # elif type_id == (10000, 0): - # content["msg"] = StrContent - # elif type_id == (10000, 4): - # content["msg"] = StrContent - # elif type_id == (10000, 8000): - # content["msg"] = StrContent - - talker = "未知" - if IsSender == 1: - talker = "我" - else: - if StrTalker.endswith("@chatroom"): - bytes_extra = read_BytesExtra(BytesExtra) - if bytes_extra: - try: - talker = bytes_extra['3'][0]['2'].decode('utf-8', errors='ignore') - if "publisher-id" in talker: - talker = "系统" - except: - pass - else: - talker = StrTalker - - row_data = {"MsgSvrID": str(MsgSvrID), "type_name": type_name, "is_sender": IsSender, "talker": talker, - "room_name": StrTalker, "content": content, "CreateTime": CreateTime, "id": id} - data.append(row_data) - return data - - -def get_chat_count(MSG_db_path: [str, list], username: str = ""): - """ - 获取聊天记录数量 - :param MSG_db_path: MSG.db 文件路径 - :return: 聊天记录数量列表 - """ - if username: - sql = f"SELECT StrTalker,COUNT(*) FROM MSG WHERE StrTalker='{username}';" - else: - sql = f"SELECT StrTalker, COUNT(*) FROM MSG GROUP BY StrTalker ORDER BY COUNT(*) DESC;" - - with DBPool(MSG_db_path) as db1: - result = execute_sql(db1, sql) - chat_counts = {} - for row in result: - username, chat_count = row - chat_counts[username] = chat_count - return chat_counts - - -def get_all_chat_count(MSG_db_path: [str, list]): - """ - 获取聊天记录总数量 - :param MSG_db_path: MSG.db 文件路径 - :return: 聊天记录数量 - """ - sql = f"SELECT COUNT(*) FROM MSG;" - with DBPool(MSG_db_path) as db1: - result = execute_sql(db1, sql) - if result and len(result) > 0: - chat_counts = result[0][0] - return chat_counts - return 0 - - -def export_csv(username, outpath, MSG_ALL_db_path, page_size=5000): - if not os.path.exists(outpath): - outpath = os.path.join(os.getcwd(), "export" + os.sep + username) - if not os.path.exists(outpath): - os.makedirs(outpath) - count = get_chat_count(MSG_ALL_db_path, username) - chatCount = count.get(username, 0) - if chatCount == 0: - return False, "没有聊天记录" - if page_size > chatCount: - page_size = chatCount + 1 - for i in range(0, chatCount, page_size): - start_index = i - data = get_msg_list(MSG_ALL_db_path, username, start_index, page_size) - if len(data) == 0: - return False, "没有聊天记录" - save_path = os.path.join(outpath, f"{username}_{i}_{i + page_size}.csv") - with open(save_path, "w", encoding="utf-8", newline='') as f: - csv_writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL) - csv_writer.writerow(["id", "MsgSvrID", "type_name", "is_sender", "talker", "room_name", "content", - "CreateTime"]) - for row in data: - id = row.get("id", "") - MsgSvrID = row.get("MsgSvrID", "") - type_name = row.get("type_name", "") - is_sender = row.get("is_sender", "") - talker = row.get("talker", "") - room_name = row.get("room_name", "") - content = row.get("content", "") - CreateTime = row.get("CreateTime", "") - - content = json.dumps(content, ensure_ascii=False) - csv_writer.writerow([id, MsgSvrID, type_name, is_sender, talker, room_name, content, CreateTime]) - - return True, f"导出成功: {outpath}" - - -def export_json(username, outpath, MSG_ALL_db_path): - if not os.path.exists(outpath): - outpath = os.path.join(os.getcwd(), "export" + os.sep + username) - if not os.path.exists(outpath): - os.makedirs(outpath) - count = get_chat_count(MSG_ALL_db_path, username) - chatCount = count.get(username, 0) - if chatCount == 0: - return False, "没有聊天记录" - page_size = chatCount + 1 - for i in range(0, chatCount, page_size): - start_index = i - data = get_msg_list(MSG_ALL_db_path, username, start_index, page_size) - if len(data) == 0: - return False, "没有聊天记录" - save_path = os.path.join(outpath, f"{username}_{i}_{i + page_size}.json") - with open(save_path, "w", encoding="utf-8") as f: - json.dump(data, f, ensure_ascii=False, indent=4) - return True, f"导出成功: {outpath}" - - -def export_html(user, outpath, MSG_ALL_db_path, MediaMSG_all_db_path, FileStorage_path, page_size=500): - name_save = user.get("remark", user.get("nickname", user.get("username", ""))) - username = user.get("username", "") - - chatCount = user.get("chat_count", 0) - if chatCount == 0: - return False, "没有聊天记录" - - for i in range(0, chatCount, page_size): - start_index = i - data = load_chat_records(username, start_index, page_size, user, MSG_ALL_db_path, MediaMSG_all_db_path, - FileStorage_path) - if len(data) == 0: - break - save_path = os.path.join(outpath, f"{name_save}_{int(i / page_size)}.html") - with open(save_path, "w", encoding="utf-8") as f: - f.write(render_template("chat.html", msgs=data)) - return True, f"导出成功{outpath}" - - -def export(username, outpath, MSG_ALL_db_path, MicroMsg_db_path, MediaMSG_all_db_path, FileStorage_path): - if not os.path.exists(outpath): - outpath = os.path.join(os.getcwd(), "export" + os.sep + username) - if not os.path.exists(outpath): - os.makedirs(outpath) - - USER_LIST = get_user_list(MSG_ALL_db_path, MicroMsg_db_path) - user = list(filter(lambda x: x["username"] == username, USER_LIST)) - - if username and len(user) > 0: - user = user[0] - return export_html(user, outpath, MSG_ALL_db_path, MediaMSG_all_db_path, FileStorage_path) diff --git a/pywxdump/api/api.py b/pywxdump/api/api.py index 8830c40..bb85fb1 100644 --- a/pywxdump/api/api.py +++ b/pywxdump/api/api.py @@ -12,18 +12,18 @@ import os import re import time import shutil - import pythoncom +import pywxdump + from flask import Flask, request, render_template, g, Blueprint, send_file, make_response, session -from pywxdump import analyzer, read_img_dat, read_audio, get_wechat_db, get_core_db -from pywxdump.analyzer.export_chat import get_contact, get_room_user_list +from pywxdump import get_core_db from pywxdump.api.rjson import ReJson, RqJson from pywxdump.api.utils import read_session, get_session_wxids, save_session, error9999, gen_base64, validate_title from pywxdump import read_info, VERSION_LIST, batch_decrypt, BiasAddr, merge_db, decrypt_merge, merge_real_time_db -import pywxdump + from pywxdump.dbpreprocess import wxid2userinfo, ParsingMSG, get_user_list, get_recent_user_list, ParsingMediaMSG, \ - download_file -from pywxdump.dbpreprocess import export_csv,export_json + download_file,export_csv, export_json +from pywxdump.dbpreprocess.utils import dat2img # app = Flask(__name__, static_folder='../ui/web/dist', static_url_path='/') @@ -345,7 +345,7 @@ def get_img(img_path): original_img_path = os.path.join(wx_path, img_path) if os.path.exists(original_img_path): - fomt, md5, out_bytes = read_img_dat(original_img_path) + fomt, md5, out_bytes = dat2img(original_img_path) imgsavepath = os.path.join(img_tmp_path, img_path + "_" + ".".join([md5, fomt])) if not os.path.exists(os.path.dirname(imgsavepath)): os.makedirs(os.path.dirname(imgsavepath)) @@ -559,187 +559,187 @@ def get_export_json(): return ReJson(2001, body=ret) -@api.route('/api/export', methods=["GET", 'POST']) -@error9999 -def export(): - """ - 导出聊天记录 - :return: - """ - export_type = request.json.get("export_type") - start_time = request.json.get("start_time", 0) - end_time = request.json.get("end_time", 0) - chat_type = request.json.get("chat_type") - username = request.json.get("username") - - wx_path = request.json.get("wx_path", read_session(g.sf, "wx_path")) - key = request.json.get("key", read_session(g.sf, "key")) - - if not export_type or not isinstance(export_type, str): - return ReJson(1002) - - # 导出路径 - outpath = os.path.join(g.tmp_path, "export", export_type) - if not os.path.exists(outpath): - os.makedirs(outpath) - - if export_type == "endb": # 导出加密数据库 - # 获取微信文件夹路径 - if not wx_path: - return ReJson(1002) - if not os.path.exists(wx_path): - return ReJson(1001, body=wx_path) - - # 分割wx_path的文件名和父目录 - code, wxdbpaths = get_core_db(wx_path) - if not code: - return ReJson(2001, body=wxdbpaths) - - for wxdb in wxdbpaths: - # 复制wxdb->outpath, os.path.basename(wxdb) - shutil.copy(wxdb, os.path.join(outpath, os.path.basename(wxdb))) - return ReJson(0, body=outpath) - - elif export_type == "dedb": - if isinstance(start_time, int) and isinstance(end_time, int): - msg_path = read_session(g.sf, "msg_path") - micro_path = read_session(g.sf, "micro_path") - media_path = read_session(g.sf, "media_path") - dbpaths = [msg_path, media_path, micro_path] - dbpaths = list(set(dbpaths)) - mergepath = merge_db(dbpaths, os.path.join(outpath, "merge.db"), start_time, end_time) - return ReJson(0, body=mergepath) - # if msg_path == media_path and msg_path == media_path: - # shutil.copy(msg_path, os.path.join(outpath, "merge.db")) - # return ReJson(0, body=msg_path) - # else: - # dbpaths = [msg_path, msg_path, micro_path] - # dbpaths = list(set(dbpaths)) - # mergepath = merge_db(dbpaths, os.path.join(outpath, "merge.db"), start_time, end_time) - # return ReJson(0, body=mergepath) - else: - return ReJson(1002, body={"start_time": start_time, "end_time": end_time}) - - elif export_type == "csv": - outpath = os.path.join(outpath, username) - if not os.path.exists(outpath): - os.makedirs(outpath) - code, ret = analyzer.export_csv(username, outpath, read_session(g.sf, "msg_path")) - if code: - return ReJson(0, ret) - else: - return ReJson(2001, body=ret) - elif export_type == "json": - outpath = os.path.join(outpath, username) - if not os.path.exists(outpath): - os.makedirs(outpath) - code, ret = analyzer.export_json(username, outpath, read_session(g.sf, "msg_path")) - if code: - return ReJson(0, ret) - else: - return ReJson(2001, body=ret) - elif export_type == "html": - outpath = os.path.join(outpath, username) - if os.path.exists(outpath): - shutil.rmtree(outpath) - if not os.path.exists(outpath): - os.makedirs(outpath) - # chat_type_tups = [] - # for ct in chat_type: - # tup = analyzer.get_name_typeid(ct) - # if tup: - # chat_type_tups += tup - # if not chat_type_tups: - # return ReJson(1002) - - # 复制文件 html - export_html = os.path.join(os.path.dirname(pywxdump.VERSION_LIST_PATH), "ui", "export") - indexhtml_path = os.path.join(export_html, "index.html") - assets_path = os.path.join(export_html, "assets") - if not os.path.exists(indexhtml_path) or not os.path.exists(assets_path): - return ReJson(1001) - js_path = "" - css_path = "" - for file in os.listdir(assets_path): - if file.endswith('.js'): - js_path = os.path.join(assets_path, file) - elif file.endswith('.css'): - css_path = os.path.join(assets_path, file) - else: - continue - # 读取html,js,css - with open(indexhtml_path, 'r', encoding='utf-8') as f: - html = f.read() - with open(js_path, 'r', encoding='utf-8') as f: - js = f.read() - with open(css_path, 'r', encoding='utf-8') as f: - css = f.read() - - html = re.sub(r'', '', html) # 删除所有的script标签 - html = re.sub(r'', '', html) # 删除所有的link标签 - - html = html.replace('', f'') - html = html.replace('', f'') - # END 生成index.html - - rdata = func_get_msgs(0, 10000000, username, "", "") - - msg_list = rdata["msg_list"] - for i in range(len(msg_list)): - if msg_list[i]["type_name"] == "语音": - savePath = msg_list[i]["content"]["src"] - MsgSvrID = savePath.split("_")[-1].replace(".wav", "") - if not savePath: - continue - media_path = read_session(g.sf, "media_path") - wave_data = read_audio(MsgSvrID, is_wave=True, DB_PATH=media_path) - if not wave_data: - continue - # 判断savePath路径的文件夹是否存在 - savePath = os.path.join(outpath, savePath) - if not os.path.exists(os.path.dirname(savePath)): - os.makedirs(os.path.dirname(savePath)) - with open(savePath, "wb") as f: - f.write(wave_data) - elif msg_list[i]["type_name"] == "图片": - img_path = msg_list[i]["content"]["src"] - wx_path = read_session(g.sf, "wx_path") - img_path_all = os.path.join(wx_path, img_path) - - if os.path.exists(img_path_all): - fomt, md5, out_bytes = read_img_dat(img_path_all) - imgsavepath = os.path.join(outpath, "img", img_path + "_" + ".".join([md5, fomt])) - if not os.path.exists(os.path.dirname(imgsavepath)): - os.makedirs(os.path.dirname(imgsavepath)) - with open(imgsavepath, "wb") as f: - f.write(out_bytes) - msg_list[i]["content"]["src"] = os.path.join("img", img_path + "_" + ".".join([md5, fomt])) - - rdata["msg_list"] = msg_list - rdata["myuserdata"] = rdata["user_list"][rdata["my_wxid"]] - rdata["myuserdata"]["chat_count"] = len(rdata["msg_list"]) - save_data = rdata - save_json_path = os.path.join(outpath, "data") - if not os.path.exists(save_json_path): - os.makedirs(save_json_path) - with open(os.path.join(save_json_path, "msg_user.json"), "w", encoding="utf-8") as f: - json.dump(save_data, f, ensure_ascii=False) - - json_base64 = gen_base64(os.path.join(save_json_path, "msg_user.json")) - html = html.replace('"./data/msg_user.json"', f'"{json_base64}"') - - with open(os.path.join(outpath, "index.html"), 'w', encoding='utf-8') as f: - f.write(html) - return ReJson(0, outpath) - - elif export_type == "pdf": - pass - elif export_type == "docx": - pass - else: - return ReJson(1002) - - return ReJson(9999, "") +# @api.route('/api/export', methods=["GET", 'POST']) +# @error9999 +# def export(): +# """ +# 导出聊天记录 +# :return: +# """ +# export_type = request.json.get("export_type") +# start_time = request.json.get("start_time", 0) +# end_time = request.json.get("end_time", 0) +# chat_type = request.json.get("chat_type") +# username = request.json.get("username") +# +# wx_path = request.json.get("wx_path", read_session(g.sf, "wx_path")) +# key = request.json.get("key", read_session(g.sf, "key")) +# +# if not export_type or not isinstance(export_type, str): +# return ReJson(1002) +# +# # 导出路径 +# outpath = os.path.join(g.tmp_path, "export", export_type) +# if not os.path.exists(outpath): +# os.makedirs(outpath) +# +# if export_type == "endb": # 导出加密数据库 +# # 获取微信文件夹路径 +# if not wx_path: +# return ReJson(1002) +# if not os.path.exists(wx_path): +# return ReJson(1001, body=wx_path) +# +# # 分割wx_path的文件名和父目录 +# code, wxdbpaths = get_core_db(wx_path) +# if not code: +# return ReJson(2001, body=wxdbpaths) +# +# for wxdb in wxdbpaths: +# # 复制wxdb->outpath, os.path.basename(wxdb) +# shutil.copy(wxdb, os.path.join(outpath, os.path.basename(wxdb))) +# return ReJson(0, body=outpath) +# +# elif export_type == "dedb": +# if isinstance(start_time, int) and isinstance(end_time, int): +# msg_path = read_session(g.sf, "msg_path") +# micro_path = read_session(g.sf, "micro_path") +# media_path = read_session(g.sf, "media_path") +# dbpaths = [msg_path, media_path, micro_path] +# dbpaths = list(set(dbpaths)) +# mergepath = merge_db(dbpaths, os.path.join(outpath, "merge.db"), start_time, end_time) +# return ReJson(0, body=mergepath) +# # if msg_path == media_path and msg_path == media_path: +# # shutil.copy(msg_path, os.path.join(outpath, "merge.db")) +# # return ReJson(0, body=msg_path) +# # else: +# # dbpaths = [msg_path, msg_path, micro_path] +# # dbpaths = list(set(dbpaths)) +# # mergepath = merge_db(dbpaths, os.path.join(outpath, "merge.db"), start_time, end_time) +# # return ReJson(0, body=mergepath) +# else: +# return ReJson(1002, body={"start_time": start_time, "end_time": end_time}) +# +# elif export_type == "csv": +# outpath = os.path.join(outpath, username) +# if not os.path.exists(outpath): +# os.makedirs(outpath) +# code, ret = analyzer.export_csv(username, outpath, read_session(g.sf, "msg_path")) +# if code: +# return ReJson(0, ret) +# else: +# return ReJson(2001, body=ret) +# elif export_type == "json": +# outpath = os.path.join(outpath, username) +# if not os.path.exists(outpath): +# os.makedirs(outpath) +# code, ret = analyzer.export_json(username, outpath, read_session(g.sf, "msg_path")) +# if code: +# return ReJson(0, ret) +# else: +# return ReJson(2001, body=ret) +# elif export_type == "html": +# outpath = os.path.join(outpath, username) +# if os.path.exists(outpath): +# shutil.rmtree(outpath) +# if not os.path.exists(outpath): +# os.makedirs(outpath) +# # chat_type_tups = [] +# # for ct in chat_type: +# # tup = analyzer.get_name_typeid(ct) +# # if tup: +# # chat_type_tups += tup +# # if not chat_type_tups: +# # return ReJson(1002) +# +# # 复制文件 html +# export_html = os.path.join(os.path.dirname(pywxdump.VERSION_LIST_PATH), "ui", "export") +# indexhtml_path = os.path.join(export_html, "index.html") +# assets_path = os.path.join(export_html, "assets") +# if not os.path.exists(indexhtml_path) or not os.path.exists(assets_path): +# return ReJson(1001) +# js_path = "" +# css_path = "" +# for file in os.listdir(assets_path): +# if file.endswith('.js'): +# js_path = os.path.join(assets_path, file) +# elif file.endswith('.css'): +# css_path = os.path.join(assets_path, file) +# else: +# continue +# # 读取html,js,css +# with open(indexhtml_path, 'r', encoding='utf-8') as f: +# html = f.read() +# with open(js_path, 'r', encoding='utf-8') as f: +# js = f.read() +# with open(css_path, 'r', encoding='utf-8') as f: +# css = f.read() +# +# html = re.sub(r'', '', html) # 删除所有的script标签 +# html = re.sub(r'', '', html) # 删除所有的link标签 +# +# html = html.replace('', f'') +# html = html.replace('', f'') +# # END 生成index.html +# +# rdata = func_get_msgs(0, 10000000, username, "", "") +# +# msg_list = rdata["msg_list"] +# for i in range(len(msg_list)): +# if msg_list[i]["type_name"] == "语音": +# savePath = msg_list[i]["content"]["src"] +# MsgSvrID = savePath.split("_")[-1].replace(".wav", "") +# if not savePath: +# continue +# media_path = read_session(g.sf, "media_path") +# wave_data = read_audio(MsgSvrID, is_wave=True, DB_PATH=media_path) +# if not wave_data: +# continue +# # 判断savePath路径的文件夹是否存在 +# savePath = os.path.join(outpath, savePath) +# if not os.path.exists(os.path.dirname(savePath)): +# os.makedirs(os.path.dirname(savePath)) +# with open(savePath, "wb") as f: +# f.write(wave_data) +# elif msg_list[i]["type_name"] == "图片": +# img_path = msg_list[i]["content"]["src"] +# wx_path = read_session(g.sf, "wx_path") +# img_path_all = os.path.join(wx_path, img_path) +# +# if os.path.exists(img_path_all): +# fomt, md5, out_bytes = read_img_dat(img_path_all) +# imgsavepath = os.path.join(outpath, "img", img_path + "_" + ".".join([md5, fomt])) +# if not os.path.exists(os.path.dirname(imgsavepath)): +# os.makedirs(os.path.dirname(imgsavepath)) +# with open(imgsavepath, "wb") as f: +# f.write(out_bytes) +# msg_list[i]["content"]["src"] = os.path.join("img", img_path + "_" + ".".join([md5, fomt])) +# +# rdata["msg_list"] = msg_list +# rdata["myuserdata"] = rdata["user_list"][rdata["my_wxid"]] +# rdata["myuserdata"]["chat_count"] = len(rdata["msg_list"]) +# save_data = rdata +# save_json_path = os.path.join(outpath, "data") +# if not os.path.exists(save_json_path): +# os.makedirs(save_json_path) +# with open(os.path.join(save_json_path, "msg_user.json"), "w", encoding="utf-8") as f: +# json.dump(save_data, f, ensure_ascii=False) +# +# json_base64 = gen_base64(os.path.join(save_json_path, "msg_user.json")) +# html = html.replace('"./data/msg_user.json"', f'"{json_base64}"') +# +# with open(os.path.join(outpath, "index.html"), 'w', encoding='utf-8') as f: +# f.write(html) +# return ReJson(0, outpath) +# +# elif export_type == "pdf": +# pass +# elif export_type == "docx": +# pass +# else: +# return ReJson(1002) +# +# return ReJson(9999, "") # end 导出聊天记录 ******************************************************************************************************* diff --git a/pywxdump/dbpreprocess/parsingMicroMsg.py b/pywxdump/dbpreprocess/parsingMicroMsg.py index 4d3da3b..f4f453a 100644 --- a/pywxdump/dbpreprocess/parsingMicroMsg.py +++ b/pywxdump/dbpreprocess/parsingMicroMsg.py @@ -156,7 +156,7 @@ class ParsingMicroMsg(DatabaseBase): rd += v for i in rd: try: - if isinstance(i, dict) and isinstance(i.get('1'),str) and i.get('2'): + if isinstance(i, dict) and isinstance(i.get('1'), str) and i.get('2'): wxid2remark[i['1']] = i["2"] except Exception as e: logging.error(f"wxid2remark: ChatRoomName:{ChatRoomName}, {i} error:{e}") @@ -164,3 +164,59 @@ class ParsingMicroMsg(DatabaseBase): {"ChatRoomName": ChatRoomName, "UserNameList": UserNameList, "DisplayNameList": DisplayNameList, "Announcement": Announcement, "AnnouncementEditor": AnnouncementEditor, "wxid2remark": wxid2remark}) return rooms + + def get_ExtraBuf(self, ExtraBuf: bytes): + """ + 读取ExtraBuf(联系人表) + :param ExtraBuf: + :return: + """ + if not ExtraBuf: + return None + try: + buf_dict = { + 'DDF32683': '0', '74752C06': '性别[1男2女]', '88E28FCE': '2', '761A1D2D': '3', '0263A0CB': '4', + '0451FF12': '5', + '228C66A8': '6', '46CF10C4': '个性签名', 'A4D9024A': '国', 'E2EAA8D1': '省', '1D025BBF': '市', + '4D6C4570': '11', + 'F917BCC0': '公司名称', '759378AD': '手机号', '4335DFDD': '14', 'DE4CDAEB': '15', 'A72BC20A': '16', + '069FED52': '17', + '9B0F4299': '18', '3D641E22': '19', '1249822C': '20', '4EB96D85': '企微属性', 'B4F73ACB': '22', + '0959EB92': '23', + '3CF4A315': '24', 'C9477AC60201E44CD0E8': '26', 'B7ACF0F5': '28', '57A7B5A8': '29', + '81AE19B4': '朋友圈背景', + '695F3170': '31', 'FB083DD9': '32', '0240E37F': '33', '315D02A3': '34', '7DEC0BC3': '35', + '0E719F13': '备注图片', + '16791C90': '37' + } + + rdata = {} + for buf_name in buf_dict: + rdata_name = buf_dict[buf_name] + buf_name = bytes.fromhex(buf_name) + offset = ExtraBuf.find(buf_name) + if offset == -1: + rdata[rdata_name] = "" + continue + offset += len(buf_name) + type_id = ExtraBuf[offset: offset + 1] + offset += 1 + + if type_id == b"\x04": + rdata[rdata_name] = int.from_bytes(ExtraBuf[offset: offset + 4], "little") + + elif type_id == b"\x18": + length = int.from_bytes(ExtraBuf[offset: offset + 4], "little") + rdata[rdata_name] = ExtraBuf[offset + 4: offset + 4 + length].decode("utf-16").rstrip("\x00") + + elif type_id == b"\x17": + length = int.from_bytes(ExtraBuf[offset: offset + 4], "little") + rdata[rdata_name] = ExtraBuf[offset + 4: offset + 4 + length].decode("utf-8").rstrip("\x00") + + elif type_id == b"\x05": + rdata[rdata_name] = f"0x{ExtraBuf[offset: offset + 8].hex()}" + return rdata + + except Exception as e: + print(f'解析错误:\n{e}') + return None diff --git a/pywxdump/ui/__init__.py b/pywxdump/ui/__init__.py index 5d59823..6dab261 100644 --- a/pywxdump/ui/__init__.py +++ b/pywxdump/ui/__init__.py @@ -5,7 +5,7 @@ # Author: xaoyaoo # Date: 2023/12/03 # ------------------------------------------------------------------------------- -from .view_chat import app_show_chat, get_user_list, export +# from .view_chat import app_show_chat, get_user_list, export if __name__ == '__main__': pass diff --git a/pywxdump/ui/view_chat.py b/pywxdump/ui/view_chat.py deleted file mode 100644 index e603107..0000000 --- a/pywxdump/ui/view_chat.py +++ /dev/null @@ -1,318 +0,0 @@ -# -*- coding: utf-8 -*-# -# ------------------------------------------------------------------------------- -# Name: GUI.py -# Description: -# Author: xaoyaoo -# Date: 2023/11/10 -# ------------------------------------------------------------------------------- -import base64 -import re -import sqlite3 -import os -import json -import time -import hashlib -from pywxdump.analyzer import read_img_dat, decompress_CompressContent, read_audio, parse_xml_string, read_BytesExtra - -from flask import Flask, request, render_template, g, Blueprint - - -def get_md5(s): - m = hashlib.md5() - m.update(s.encode("utf-8")) - return m.hexdigest() - - -def get_user_list(MSG_ALL_db_path, MicroMsg_db_path): - users = [] - - # 连接 MSG_ALL.db 数据库,并执行查询 - db1 = sqlite3.connect(MSG_ALL_db_path) - cursor1 = db1.cursor() - cursor1.execute("SELECT StrTalker, COUNT(*) AS ChatCount FROM MSG GROUP BY StrTalker ORDER BY ChatCount DESC") - result = cursor1.fetchall() - - dict_user_count = {} - # 将结果转换为字典 - for row in result: - dict_user_count[row[0]] = row[1] - - db2 = sqlite3.connect(MicroMsg_db_path) - cursor2 = db2.cursor() - cursor2.execute("SELECT UserName, NickName, Remark FROM Contact;") - result2 = cursor2.fetchall() - for row in result2: - username, nickname, remark = row - # 拼接四列数据为元组 - row_data = {"username": username, "nickname": nickname, "remark": remark, - "chat_count": dict_user_count.get(username, 0), - "isChatRoom": username.startswith("@chatroom")} - users.append(row_data) - - users.sort(key=lambda x: x["chat_count"], reverse=True) # 按照聊天记录数量排序 - cursor2.close() - db2.close() - cursor1.close() - db1.close() - return users - - -def load_base64_audio_data(MsgSvrID, MediaMSG_all_db_path): - wave_data = read_audio(MsgSvrID, is_wave=True, DB_PATH=MediaMSG_all_db_path) - if not wave_data: - return "" - video_base64 = base64.b64encode(wave_data).decode("utf-8") - video_data = f"data:audio/wav;base64,{video_base64}" - return video_data - - -def load_base64_img_data(start_time, end_time, username_md5, FileStorage_path): - """ - 获取图片的base64数据 - :param start_time: 开始时间戳 - :param end_time: 结束时间戳 - :param username_md5: 用户名的md5值 - :return: - """ - # 获取CreateTime的最大值日期 - min_time = time.strftime("%Y-%m", time.localtime(start_time)) - max_time = time.strftime("%Y-%m", time.localtime(end_time)) - img_path = os.path.join(FileStorage_path, "MsgAttach", username_md5, "Image") if FileStorage_path else "" - if not os.path.exists(img_path): - return {} - # print(min_time, max_time, img_path) - paths = [] - for root, path, files in os.walk(img_path): - for p in path: - if p >= min_time and p <= max_time: - paths.append(os.path.join(root, p)) - # print(paths) - img_md5_data = {} - for path in paths: - for root, path, files in os.walk(path): - for file in files: - if file.endswith(".dat"): - file_path = os.path.join(root, file) - fomt, md5, out_bytes = read_img_dat(file_path) - out_bytes = base64.b64encode(out_bytes).decode("utf-8") - img_md5_data[md5] = f"data:{fomt};base64,{out_bytes}" - return img_md5_data - - -def load_chat_records(selected_talker, start_index, page_size, user_list, MSG_ALL_db_path, MediaMSG_all_db_path, - FileStorage_path, USER_LIST): - username = user_list.get("username", "") - username_md5 = get_md5(username) - type_name_dict = { - 1: {0: "文本"}, - 3: {0: "图片"}, - 34: {0: "语音"}, - 43: {0: "视频"}, - 47: {0: "动画表情"}, - 49: {0: "文本", 1: "类似文字消息而不一样的消息", 5: "卡片式链接", 6: "文件", 8: "用户上传的 GIF 表情", - 19: "合并转发的聊天记录", 33: "分享的小程序", 36: "分享的小程序", 57: "带有引用的文本消息", - 63: "视频号直播或直播回放等", - 87: "群公告", 88: "视频号直播或直播回放等", 2000: "转账消息", 2003: "赠送红包封面"}, - 50: {0: "语音通话"}, - 10000: {0: "系统通知", 4: "拍一拍", 8000: "系统通知"} - } - - # 连接 MSG_ALL.db 数据库,并执行查询 - db1 = sqlite3.connect(MSG_ALL_db_path) - cursor1 = db1.cursor() - - cursor1.execute( - "SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent,BytesExtra FROM MSG WHERE StrTalker=? ORDER BY CreateTime ASC LIMIT ?,?", - (selected_talker, start_index, page_size)) - result1 = cursor1.fetchall() - - cursor1.close() - db1.close() - # 获取图片的base64数据 - # img_md5_data = load_base64_img_data(result1[0][7], result1[-1][7], username_md5, FileStorage_path) if len( - # result1) > 0 else {} - - data = [] - room_username_count = {} - for row in result1: - localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType, CreateTime, MsgSvrID, DisplayContent, CompressContent, BytesExtra = row - CreateTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(CreateTime)) - - type_name = type_name_dict.get(Type, {}).get(SubType, "未知") - - content = {"src": "", "msg": "", "style": ""} - - if Type == 47 and SubType == 0: # 动画表情 - content_tmp = parse_xml_string(StrContent) - cdnurl = content_tmp.get("emoji", {}).get("cdnurl", "") - # md5 = content_tmp.get("emoji", {}).get("md5", "") - if cdnurl: - content = {"src": cdnurl, "msg": "表情", "style": "width: 100px; height: 100px;"} - - elif Type == 49 and SubType == 57: # 带有引用的文本消息 - CompressContent = CompressContent.rsplit(b'\x00', 1)[0] - content["msg"] = decompress_CompressContent(CompressContent) - try: - content["msg"] = content["msg"].decode("utf-8") - content["msg"] = parse_xml_string(content["msg"]) - content["msg"] = json.dumps(content["msg"], ensure_ascii=False) - except Exception as e: - content["msg"] = "[带有引用的文本消息]解析失败" - elif Type == 34 and SubType == 0: # 语音 - tmp_c = parse_xml_string(StrContent) - voicelength = tmp_c.get("voicemsg", {}).get("voicelength", "") - transtext = tmp_c.get("voicetrans", {}).get("transtext", "") - if voicelength.isdigit(): - voicelength = int(voicelength) / 1000 - voicelength = f"{voicelength:.2f}" - content["msg"] = f"语音时长:{voicelength}秒\n翻译结果:{transtext}" - - src = load_base64_audio_data(MsgSvrID, MediaMSG_all_db_path=MediaMSG_all_db_path) - content["src"] = src - elif Type == 3 and SubType == 0: # 图片 - xml_content = parse_xml_string(StrContent) - BytesExtra = read_BytesExtra(BytesExtra) - BytesExtra = str(BytesExtra) - match = re.search(r"MsgAttach(.*?)'", BytesExtra) - if match: - img_path = match.group(0).replace("'", "") - # print(FileStorage_path) - # print(img_path) - img_path = img_path.split("\\") - img_path = [i for i in img_path if i != ""] - img_path = os.path.join(*img_path) - if FileStorage_path: - img_path = os.path.join(FileStorage_path, img_path) - if os.path.exists(img_path): - fomt, md5, out_bytes = read_img_dat(img_path) - out_bytes = base64.b64encode(out_bytes).decode("utf-8") - content["src"] = f"data:{fomt};base64,{out_bytes}" - else: - content["src"] = "" - else: - content["src"] = "" - else: - content["src"] = "" - content["msg"] = "图片" - else: - content["msg"] = StrContent - - talker = "未知" - if IsSender == 1: - talker = "我" - else: - if StrTalker.endswith("@chatroom"): - bytes_extra = read_BytesExtra(BytesExtra) - if bytes_extra: - try: - matched_string = bytes_extra['3'][0]['2'].decode('utf-8', errors='ignore') - talker_dicts = list(filter(lambda x: x["username"] == matched_string, USER_LIST)) - if len(talker_dicts) > 0: - talker_dict = talker_dicts[0] - room_username = talker_dict.get("username", "") - room_nickname = talker_dict.get("nickname", "") - room_remark = talker_dict.get("remark", "") - talker = room_remark if room_remark else room_nickname if room_nickname else room_username - else: - talker = matched_string - except: - pass - else: - talker = user_list.get("remark", user_list.get("nickname", user_list.get("username", ""))) - - row_data = {"MsgSvrID": MsgSvrID, "type_name": type_name, "is_sender": IsSender, "talker": talker, - "content": content, "CreateTime": CreateTime} - data.append(row_data) - return data - - -def export_html(user, outpath, MSG_ALL_db_path, MediaMSG_all_db_path, FileStorage_path, page_size=500): - name_save = user.get("remark", user.get("nickname", user.get("username", ""))) - username = user.get("username", "") - - chatCount = user.get("chat_count", 0) - if chatCount == 0: - return False, "没有聊天记录" - - for i in range(0, chatCount, page_size): - start_index = i - data = load_chat_records(username, start_index, page_size, user, MSG_ALL_db_path, MediaMSG_all_db_path, - FileStorage_path, [user]) - if len(data) == 0: - break - save_path = os.path.join(outpath, f"{name_save}_{int(i / page_size)}.html") - with open(save_path, "w", encoding="utf-8") as f: - f.write(render_template("chat.html", msgs=data)) - return True, f"导出成功{outpath}" - - -def export(username, outpath, MSG_ALL_db_path, MicroMsg_db_path, MediaMSG_all_db_path, FileStorage_path): - if not os.path.exists(outpath): - outpath = os.path.join(os.getcwd(), "export" + os.sep + username) - if not os.path.exists(outpath): - os.makedirs(outpath) - - USER_LIST = get_user_list(MSG_ALL_db_path, MicroMsg_db_path) - user = list(filter(lambda x: x["username"] == username, USER_LIST)) - - if username and len(user) > 0: - user = user[0] - return export_html(user, outpath, MSG_ALL_db_path, MediaMSG_all_db_path, FileStorage_path) - - -app_show_chat = Blueprint('show_chat_main', __name__, template_folder='templates') -app_show_chat.debug = False - - -# 主页 - 显示用户列表 -@app_show_chat.route('/') -def index(): - g.USER_LIST = get_user_list(g.MSG_ALL_db_path, g.MicroMsg_db_path) - # 只去前面500个有聊天记录的用户 - USER_LIST = g.USER_LIST[:500] - return render_template("index.html", users=USER_LIST) - - -# 获取聊天记录 -@app_show_chat.route('/get_chat_data', methods=["GET", 'POST']) -def get_chat_data(): - username = request.args.get("username", "") - user = list(filter(lambda x: x["username"] == username, g.USER_LIST)) - - if username and len(user) > 0: - user = user[0] - - limit = int(request.args.get("limit", 100)) # 每页显示的条数 - page = int(request.args.get("page", user.get("chat_count", limit) / limit)) # 当前页数 - - start_index = (page - 1) * limit - page_size = limit - - data = load_chat_records(username, start_index, page_size, user, g.MSG_ALL_db_path, g.MediaMSG_all_db_path, - g.FileStorage_path, g.USER_LIST) - return render_template("chat.html", msgs=data) - else: - return "error" - - -# 聊天记录导出为html -@app_show_chat.route('/export_chat_data', methods=["GET", 'POST']) -def get_export(): - username = request.args.get("username", "") - - user = list(filter(lambda x: x["username"] == username, g.USER_LIST)) - - if username and len(user) > 0: - user = user[0] - n = f"{user.get('username', '')}_{user.get('nickname', '')}_{user.get('remark', '')}" - outpath = os.path.join(os.getcwd(), "export" + os.sep + n) - if not os.path.exists(outpath): - os.makedirs(outpath) - - ret = export_html(user, outpath, g.MSG_ALL_db_path, g.MediaMSG_all_db_path, g.FileStorage_path, page_size=200) - if ret[0]: - return ret[1] - else: - return ret[1] - else: - return "error"