From ee2ab5de809adbf0b624cf0c0204d564689ff4ba Mon Sep 17 00:00:00 2001 From: xaoyaoo Date: Tue, 26 Dec 2023 18:03:22 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=AF=BC=E5=87=BA=E4=B8=BAcs?= =?UTF-8?q?v=E5=87=BD=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pywxdump/analyzer/export_chat.py | 260 ++++++++++++++++++------------- pywxdump/analyzer/utils.py | 88 +++++++++++ pywxdump/ui/view_chat.py | 3 +- 3 files changed, 245 insertions(+), 106 deletions(-) diff --git a/pywxdump/analyzer/export_chat.py b/pywxdump/analyzer/export_chat.py index 1249a29..01dcd52 100644 --- a/pywxdump/analyzer/export_chat.py +++ b/pywxdump/analyzer/export_chat.py @@ -12,14 +12,15 @@ # Author: xaoyaoo # Date: 2023/11/10 # ------------------------------------------------------------------------------- -import base64 +import re import sqlite3 import os import json import time from functools import wraps -from .utils import get_md5, attach_databases, execute_sql +from .utils import get_md5, attach_databases, execute_sql, get_type_name, match_BytesExtra +from .db_parsing import parse_xml_string, decompress_CompressContent, read_BytesExtra def get_contact_list(MicroMsg_db_path): @@ -50,118 +51,83 @@ def get_contact_list(MicroMsg_db_path): return users -def msg_db_connect(func): - @wraps(func) - def wrapper(MSG_db_path, *args, **kwargs): - # 连接 MSG.db 数据库,并执行查询 - if isinstance(MSG_db_path, list): - # alias, file_path - databases = {f"MSG{i}": db_path for i, db_path in enumerate(MSG_db_path)} - elif isinstance(MSG_db_path, str): - databases = {"MSG": MSG_db_path} - else: - raise TypeError("MSG_db_path 类型错误") - - # 连接 MSG_ALL.db 数据库,并执行查询 - if len(databases) > 1: - db = sqlite3.connect(":memory:") - attach_databases(db, databases) - else: - db = sqlite3.connect(list(databases.values())[0]) - - result = func("", db=db, databases=databases, *args, **kwargs) - - # 断开数据库连接 - if len(databases) > 1: - for alias in databases: - db.execute(f"DETACH DATABASE {alias}") - db.close() - - return result - - return wrapper - - -@msg_db_connect -def get_chat_count(MSG_db_path: [str, list], db=None, databases=None): +def get_chatroom_list(MicroMsg_db_path): """ - 获取聊天记录数量 - :param MSG_db_path: MSG.db 文件路径 - :return: 聊天记录数量列表 + 获取群聊列表 + :param MicroMsg_db_path: MicroMsg.db 文件路径 + :return: 群聊列表 """ - # 构造 SQL 查询,使用 UNION ALL 联合不同数据库的 MSG 表 - union_sql = " UNION ALL ".join( - f"SELECT StrTalker, COUNT(*) AS ChatCount FROM {alias}.MSG GROUP BY StrTalker" for alias in databases) + rooms = [] + # 连接 MicroMsg.db 数据库,并执行查询 + db = sqlite3.connect(MicroMsg_db_path) - sql = f"SELECT StrTalker, SUM(ChatCount) AS TotalChatCount FROM ({union_sql}) GROUP BY StrTalker ORDER BY TotalChatCount DESC" + sql = ("SELECT A.ChatRoomName,A.UserNameList, A.DisplayNameList, B.Announcement,B.AnnouncementEditor " + "FROM ChatRoom A,ChatRoomInfo B " + "where A.ChatRoomName==B.ChatRoomName " + "ORDER BY A.ChatRoomName ASC;") - chat_counts = [] result = execute_sql(db, sql) + db.close() for row in result: - username, chat_count = row - row_data = {"username": username, "chat_count": chat_count} - chat_counts.append(row_data) - return chat_counts + # 获取用户名、昵称、备注和聊天记录数量 + ChatRoomName, UserNameList, DisplayNameList, Announcement, AnnouncementEditor = row + UserNameList = UserNameList.split("^G") + DisplayNameList = DisplayNameList.split("^G") + rooms.append( + {"ChatRoomName": ChatRoomName, "UserNameList": UserNameList, "DisplayNameList": DisplayNameList, + "Announcement": Announcement, "AnnouncementEditor": AnnouncementEditor}) + return rooms -def load_chat_records(selected_talker, start_index, page_size, user_list, MSG_ALL_db_path, MediaMSG_all_db_path, - FileStorage_path): - username = user_list.get("username", "") - username_md5 = get_md5(username) - type_name_dict = { - 1: {0: "文本"}, - 3: {0: "图片"}, - 34: {0: "语音"}, - 43: {0: "视频"}, - 47: {0: "动画表情"}, - 49: {0: "文本", 1: "类似文字消息而不一样的消息", 5: "卡片式链接", 6: "文件", 8: "用户上传的 GIF 表情", - 19: "合并转发的聊天记录", 33: "分享的小程序", 36: "分享的小程序", 57: "带有引用的文本消息", - 63: "视频号直播或直播回放等", - 87: "群公告", 88: "视频号直播或直播回放等", 2000: "转账消息", 2003: "赠送红包封面"}, - 50: {0: "语音通话"}, - 10000: {0: "系统通知", 4: "拍一拍", 8000: "系统通知"} - } +def get_msg_list(MSG_db_path, selected_talker="", start_index=0, page_size=500): + """ + 获取聊天记录列表 + :param MSG_db_path: MSG.db 文件路径 + :return: 聊天记录列表 + """ # 连接 MSG_ALL.db 数据库,并执行查询 - db1 = sqlite3.connect(MSG_ALL_db_path) + db1 = sqlite3.connect(MSG_db_path) cursor1 = db1.cursor() - - cursor1.execute( - "SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent FROM MSG WHERE StrTalker=? ORDER BY CreateTime ASC LIMIT ?,?", - (selected_talker, start_index, page_size)) + if selected_talker: + sql = ( + "SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent,BytesExtra " + "FROM MSG WHERE StrTalker=? " + "ORDER BY CreateTime ASC LIMIT ?,?") + cursor1.execute(sql, (selected_talker, start_index, page_size)) + else: + sql = ( + "SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent,BytesExtra " + "FROM MSG ORDER BY CreateTime ASC LIMIT ?,?") + cursor1.execute(sql, (start_index, page_size)) result1 = cursor1.fetchall() - cursor1.close() db1.close() - img_md5_data = load_base64_img_data(result1[0][7], result1[-1][7], username_md5, FileStorage_path) # 获取图片的base64数据 - data = [] for row in result1: - localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType, CreateTime, MsgSvrID, DisplayContent, CompressContent = row + localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType, CreateTime, MsgSvrID, DisplayContent, CompressContent, BytesExtra = row CreateTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(CreateTime)) - type_name = type_name_dict.get(Type, {}).get(SubType, "未知") + type_id = (Type, SubType) + type_name = get_type_name(type_id) - content = {"src": "", "msg": "", "style": ""} + content = {"src": "", "msg": StrContent} - if Type == 47 and SubType == 0: # 动画表情 - content_tmp = parse_xml_string(StrContent) - cdnurl = content_tmp.get("emoji", {}).get("cdnurl", "") - # md5 = content_tmp.get("emoji", {}).get("md5", "") - if cdnurl: - content = {"src": cdnurl, "msg": "表情", "style": "width: 100px; height: 100px;"} + if type_id == (1, 0): # 文本 + content["msg"] = StrContent - elif Type == 49 and SubType == 57: # 带有引用的文本消息 - CompressContent = CompressContent.rsplit(b'\x00', 1)[0] - content["msg"] = decompress_CompressContent(CompressContent) - try: - content["msg"] = content["msg"].decode("utf-8") - content["msg"] = parse_xml_string(content["msg"]) - content["msg"] = json.dumps(content["msg"], ensure_ascii=False) - except Exception as e: - content["msg"] = "[带有引用的文本消息]解析失败" - elif Type == 34 and SubType == 0: # 语音 + elif type_id == (3, 0): # 图片 + BytesExtra = read_BytesExtra(BytesExtra) + BytesExtra = str(BytesExtra) + match = re.search(r"FileStorage(.*?)'", BytesExtra) + if match: + img_path = match.group(0).replace("'", "") + content["src"] = img_path + else: + content["src"] = "" + content["msg"] = "图片" + elif type_id == (34, 0): tmp_c = parse_xml_string(StrContent) voicelength = tmp_c.get("voicemsg", {}).get("voicelength", "") transtext = tmp_c.get("voicetrans", {}).get("transtext", "") @@ -169,27 +135,113 @@ def load_chat_records(selected_talker, start_index, page_size, user_list, MSG_AL voicelength = int(voicelength) / 1000 voicelength = f"{voicelength:.2f}" content["msg"] = f"语音时长:{voicelength}秒\n翻译结果:{transtext}" - - src = load_base64_audio_data(MsgSvrID, MediaMSG_all_db_path=MediaMSG_all_db_path) - content["src"] = src - elif Type == 3 and SubType == 0: # 图片 - xml_content = parse_xml_string(StrContent) - md5 = xml_content.get("img", {}).get("md5", "") - if md5: - content["src"] = img_md5_data.get(md5, "") + content["src"] = os.path.join("audio", f"{StrTalker}", f"{CreateTime}_{MsgSvrID}.wav") + elif type_id == (43, 0): # 视频 + BytesExtra = read_BytesExtra(BytesExtra) + BytesExtra = str(BytesExtra) + match = re.search(r"FileStorage(.*?)'", BytesExtra) + if match: + video_path = match.group(0).replace("'", "") + content["src"] = video_path else: content["src"] = "" - content["msg"] = "图片" + content["msg"] = "视频" + elif type_id == (47, 0): # 动画表情 + content_tmp = parse_xml_string(StrContent) + cdnurl = content_tmp.get("emoji", {}).get("cdnurl", "") + # md5 = content_tmp.get("emoji", {}).get("md5", "") + if cdnurl: + content = {"src": cdnurl, "msg": "表情"} + + elif type_id[0] == 49: + BytesExtra = read_BytesExtra(BytesExtra) + url = match_BytesExtra(BytesExtra) + content["src"] = url + content["msg"] = type_name + + elif type_id == (50, 0): # 语音通话 + BytesExtra = read_BytesExtra(BytesExtra) + + # elif type_id == (10000, 0): + # content["msg"] = StrContent + # elif type_id == (10000, 4): + # content["msg"] = StrContent + # elif type_id == (10000, 8000): + # content["msg"] = StrContent + + talker = "未知" + if IsSender == 1: + talker = "我" else: - content["msg"] = StrContent + if StrTalker.endswith("@chatroom"): + bytes_extra = read_BytesExtra(BytesExtra) + if bytes_extra: + try: + talker = bytes_extra['3'][0]['2'].decode('utf-8', errors='ignore') + except: + pass + else: + talker = StrTalker - row_data = {"MsgSvrID": MsgSvrID, "type_name": type_name, "is_sender": IsSender, - "content": content, "CreateTime": CreateTime} + row_data = {"MsgSvrID": MsgSvrID, "type_name": type_name, "is_sender": IsSender, "talker": talker, + "room_name": StrTalker, "content": content, "CreateTime": CreateTime} data.append(row_data) return data +def get_chat_count(MSG_db_path: [str, list], username: str = ""): + """ + 获取聊天记录数量 + :param MSG_db_path: MSG.db 文件路径 + :return: 聊天记录数量列表 + """ + if username: + sql = f"SELECT StrTalker,COUNT(*) FROM MSG WHERE StrTalker='{username}';" + else: + sql = f"SELECT StrTalker, COUNT(*) FROM MSG GROUP BY StrTalker ORDER BY COUNT(*) DESC;" + db1 = sqlite3.connect(MSG_db_path) + result = execute_sql(db1, sql) + + chat_counts = {} + for row in result: + username, chat_count = row + chat_counts[username] = chat_count + return chat_counts + + +def export_csv(username, outpath, MSG_ALL_db_path, page_size=5000): + if not os.path.exists(outpath): + outpath = os.path.join(os.getcwd(), "export" + os.sep + username) + if not os.path.exists(outpath): + os.makedirs(outpath) + count = get_chat_count(MSG_ALL_db_path, username) + chatCount = count.get(username, 0) + if chatCount == 0: + return False, "没有聊天记录" + for i in range(0, chatCount, page_size): + start_index = i + data = get_msg_list(MSG_ALL_db_path, username, start_index, page_size) + if len(data) == 0: + break + save_path = os.path.join(outpath, f"{username}_{int(i / page_size)}.csv") + with open(save_path, "w", encoding="utf-8") as f: + f.write("MsgSvrID,type_name,is_sender,talker,room_name,content,CreateTime\n") + for row in data: + MsgSvrID = row.get("MsgSvrID", "") + type_name = row.get("type_name", "") + is_sender = row.get("is_sender", "") + talker = row.get("talker", "") + room_name = row.get("room_name", "") + content = row.get("content", "") + CreateTime = row.get("CreateTime", "") + + content = json.dumps(content, ensure_ascii=False) + + f.write(f"{MsgSvrID},{type_name},{is_sender},{talker},{room_name},{content},{CreateTime}\n") + return True, f"导出成功: {outpath}" + + def export_html(user, outpath, MSG_ALL_db_path, MediaMSG_all_db_path, FileStorage_path, page_size=500): name_save = user.get("remark", user.get("nickname", user.get("username", ""))) username = user.get("username", "") diff --git a/pywxdump/analyzer/utils.py b/pywxdump/analyzer/utils.py index c7accdb..120877d 100644 --- a/pywxdump/analyzer/utils.py +++ b/pywxdump/analyzer/utils.py @@ -6,6 +6,94 @@ # Date: 2023/12/03 # ------------------------------------------------------------------------------- import hashlib +import re + + +def read_dict_all_values(data): + """ + 读取字典中所有的值(单层) + :param dict_data: 字典 + :return: 所有值的list + """ + result = [] + if isinstance(data, list): + for item in data: + result.extend(read_dict_all_values(item)) + elif isinstance(data, dict): + for key, value in data.items(): + result.extend(read_dict_all_values(value)) + else: + if isinstance(data, bytes): + tmp = data.decode("utf-8") + else: + tmp = str(data) if isinstance(data, int) else data + result.append(tmp) + + for i in range(len(result)): + if isinstance(result[i], bytes): + result[i] = result[i].decode("utf-8") + return result + + +def match_BytesExtra(BytesExtra, pattern=r"FileStorage(.*?)'"): + """ + 匹配 BytesExtra + :param BytesExtra: BytesExtra + :param pattern: 匹配模式 + :return: + """ + if not BytesExtra: + return False + BytesExtra = read_dict_all_values(BytesExtra) + BytesExtra = "'" + "'".join(BytesExtra) + "'" + # print(BytesExtra) + + match = re.search(pattern, BytesExtra) + if match: + video_path = match.group(0).replace("'", "") + return video_path + else: + return "" + + +def get_type_name(type_id: tuple): + """ + 获取消息类型名称 + :param type_id: 消息类型ID 元组 eg: (1, 0) + :return: + """ + type_name_dict = { + (1, 0): "文本", + (3, 0): "图片", + (34, 0): "语音", + (43, 0): "视频", + (47, 0): "动画表情", + + (49, 0): "文件", + (49, 1): "类似文字消息而不一样的消息", + (49, 5): "卡片式链接", + (49, 6): "文件", + (49, 8): "用户上传的 GIF 表情", + (49, 19): "合并转发的聊天记录", + (49, 33): "分享的小程序", + (49, 36): "分享的小程序", + (49, 57): "带有引用的文本消息", + (49, 63): "视频号直播或直播回放等", + (49, 87): "群公告", + (49, 88): "视频号直播或直播回放等", + (49, 2000): "转账消息", + (49, 2003): "赠送红包封面", + + (50, 0): "语音通话", + (10000, 0): "系统通知", + (10000, 4): "拍一拍", + (10000, 8000): "系统通知" + } + + if type_id in type_name_dict: + return type_name_dict[type_id] + else: + return "未知" def get_md5(data): diff --git a/pywxdump/ui/view_chat.py b/pywxdump/ui/view_chat.py index fb4ec2d..aeee009 100644 --- a/pywxdump/ui/view_chat.py +++ b/pywxdump/ui/view_chat.py @@ -188,10 +188,9 @@ def load_chat_records(selected_talker, start_index, page_size, user_list, MSG_AL else: content["src"] = "" content["msg"] = "图片" - - else: content["msg"] = StrContent + talker = "未知" if IsSender == 1: talker = "我"