diff --git a/README.md b/README.md index 55d3660..1b61730 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,7 @@ * 6.查看群聊中具体发言成员的ID [#31](https://github.com/xaoyaoo/PyWxDump/issues/31) * 7.增加数据库合并功能,方便查看 * 8.增加企业微信的支持 +* 9.增加获取实时聊天记录的功能 注: 欢迎大家提供更多的想法,或者提供代码,一起完善这个项目。 diff --git a/doc/python1.0_README.md b/doc/python1.0_README.md index 03ef925..8e61f4e 100644 --- a/doc/python1.0_README.md +++ b/doc/python1.0_README.md @@ -189,7 +189,7 @@ python get_wx_decrypted_db.py --key ******** ## 四、解析数据库 -* [parse.py](../pywxdump/analyse/parse.py) : 数据库解析脚本,可以解析语音、图片、聊天记录等 +* [parse.py](../pywxdump/analyzer/parse.py) : 数据库解析脚本,可以解析语音、图片、聊天记录等 * 关于各个数据库的说明文档,请查看[wx数据库简述.md](./wx数据库简述.md) 未完待续... diff --git a/pywxdump/__init__.py b/pywxdump/__init__.py index 903a5cf..159680b 100644 --- a/pywxdump/__init__.py +++ b/pywxdump/__init__.py @@ -5,13 +5,10 @@ # Author: xaoyaoo # Date: 2023/10/14 # ------------------------------------------------------------------------------- -from .bias_addr.get_bias_addr import BiasAddr -from .wx_info.get_wx_info import read_info -from .wx_info.get_wx_db import get_wechat_db -from .decrypted.decrypt import batch_decrypt, decrypt,encrypt -from .decrypted.get_wx_decrypted_db import all_decrypt, merge_copy_msg_db, merge_msg_db, merge_media_msg_db -from .analyse.parse import read_img_dat, read_emoji, decompress_CompressContent, read_audio_buf, read_audio, parse_xml_string -from .show_chat import app_show_chat, get_user_list, export +from .wx_info import BiasAddr,read_info, get_wechat_db,encrypt,batch_decrypt,decrypt +from .wx_info import merge_copy_db, merge_msg_db, merge_media_msg_db +from .analyzer.db_parsing import read_img_dat, read_emoji, decompress_CompressContent, read_audio_buf, read_audio, parse_xml_string +from .ui.view_chat import app_show_chat, get_user_list, export import os,json diff --git a/pywxdump/analyse/__init__.py b/pywxdump/analyzer/__init__.py similarity index 100% rename from pywxdump/analyse/__init__.py rename to pywxdump/analyzer/__init__.py diff --git a/pywxdump/analyse/analyser.py b/pywxdump/analyzer/chat_analysis.py similarity index 100% rename from pywxdump/analyse/analyser.py rename to pywxdump/analyzer/chat_analysis.py diff --git a/pywxdump/analyse/parse.py b/pywxdump/analyzer/db_parsing.py similarity index 100% rename from pywxdump/analyse/parse.py rename to pywxdump/analyzer/db_parsing.py diff --git a/pywxdump/analyzer/export_chat.py b/pywxdump/analyzer/export_chat.py new file mode 100644 index 0000000..199810b --- /dev/null +++ b/pywxdump/analyzer/export_chat.py @@ -0,0 +1,274 @@ +# -*- coding: utf-8 -*-# +# ------------------------------------------------------------------------------- +# Name: export_chat.py +# Description: +# Author: xaoyaoo +# Date: 2023/12/03 +# ------------------------------------------------------------------------------- +# -*- coding: utf-8 -*-# +# ------------------------------------------------------------------------------- +# Name: GUI.py +# Description: +# Author: xaoyaoo +# Date: 2023/11/10 +# ------------------------------------------------------------------------------- +import base64 +import sqlite3 +import os +import json +import time +from .utils import get_md5 +from .db_parsing import read_img_dat, decompress_CompressContent, read_audio, parse_xml_string + +from flask import Flask, request, render_template, g, Blueprint + + +def get_user_list(MSG_ALL_db_path, MicroMsg_db_path): + users = [] + # 连接 MSG_ALL.db 数据库,并执行查询 + db1 = sqlite3.connect(MSG_ALL_db_path) + cursor1 = db1.cursor() + cursor1.execute("SELECT StrTalker, COUNT(*) AS ChatCount FROM MSG GROUP BY StrTalker ORDER BY ChatCount DESC") + result = cursor1.fetchall() + + for row in result: + # 获取用户名、昵称、备注和聊天记录数量 + db2 = sqlite3.connect(MicroMsg_db_path) + cursor2 = db2.cursor() + cursor2.execute("SELECT UserName, NickName, Remark FROM Contact WHERE UserName=?", (row[0],)) + result2 = cursor2.fetchone() + if result2: + username, nickname, remark = result2 + chat_count = row[1] + + # 拼接四列数据为元组 + row_data = {"username": username, "nickname": nickname, "remark": remark, "chat_count": chat_count, + "isChatRoom": username.startswith("@chatroom")} + users.append(row_data) + cursor2.close() + db2.close() + cursor1.close() + db1.close() + return users + + +def load_base64_audio_data(MsgSvrID, MediaMSG_all_db_path): + wave_data = read_audio(MsgSvrID, is_wave=True, DB_PATH=MediaMSG_all_db_path) + if not wave_data: + return "" + video_base64 = base64.b64encode(wave_data).decode("utf-8") + video_data = f"data:audio/wav;base64,{video_base64}" + return video_data + + +def load_base64_img_data(start_time, end_time, username_md5, FileStorage_path): + """ + 获取图片的base64数据 + :param start_time: 开始时间戳 + :param end_time: 结束时间戳 + :param username_md5: 用户名的md5值 + :return: + """ + # 获取CreateTime的最大值日期 + min_time = time.strftime("%Y-%m", time.localtime(start_time)) + max_time = time.strftime("%Y-%m", time.localtime(end_time)) + img_path = os.path.join(FileStorage_path, "MsgAttach", username_md5, "Image") + if not os.path.exists(img_path): + return {} + # print(min_time, max_time, img_path) + paths = [] + for root, path, files in os.walk(img_path): + for p in path: + if p >= min_time and p <= max_time: + paths.append(os.path.join(root, p)) + # print(paths) + img_md5_data = {} + for path in paths: + for root, path, files in os.walk(path): + for file in files: + if file.endswith(".dat"): + file_path = os.path.join(root, file) + fomt, md5, out_bytes = read_img_dat(file_path) + out_bytes = base64.b64encode(out_bytes).decode("utf-8") + img_md5_data[md5] = f"data:{fomt};base64,{out_bytes}" + return img_md5_data + + +def load_chat_records(selected_talker, start_index, page_size, user_list, MSG_ALL_db_path, MediaMSG_all_db_path, + FileStorage_path): + username = user_list.get("username", "") + username_md5 = get_md5(username) + type_name_dict = { + 1: {0: "文本"}, + 3: {0: "图片"}, + 34: {0: "语音"}, + 43: {0: "视频"}, + 47: {0: "动画表情"}, + 49: {0: "文本", 1: "类似文字消息而不一样的消息", 5: "卡片式链接", 6: "文件", 8: "用户上传的 GIF 表情", + 19: "合并转发的聊天记录", 33: "分享的小程序", 36: "分享的小程序", 57: "带有引用的文本消息", + 63: "视频号直播或直播回放等", + 87: "群公告", 88: "视频号直播或直播回放等", 2000: "转账消息", 2003: "赠送红包封面"}, + 50: {0: "语音通话"}, + 10000: {0: "系统通知", 4: "拍一拍", 8000: "系统通知"} + } + + # 连接 MSG_ALL.db 数据库,并执行查询 + db1 = sqlite3.connect(MSG_ALL_db_path) + cursor1 = db1.cursor() + + cursor1.execute( + "SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent FROM MSG WHERE StrTalker=? ORDER BY CreateTime ASC LIMIT ?,?", + (selected_talker, start_index, page_size)) + result1 = cursor1.fetchall() + + cursor1.close() + db1.close() + + img_md5_data = load_base64_img_data(result1[0][7], result1[-1][7], username_md5, FileStorage_path) # 获取图片的base64数据 + + data = [] + for row in result1: + localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType, CreateTime, MsgSvrID, DisplayContent, CompressContent = row + CreateTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(CreateTime)) + + type_name = type_name_dict.get(Type, {}).get(SubType, "未知") + + content = {"src": "", "msg": "", "style": ""} + + if Type == 47 and SubType == 0: # 动画表情 + content_tmp = parse_xml_string(StrContent) + cdnurl = content_tmp.get("emoji", {}).get("cdnurl", "") + # md5 = content_tmp.get("emoji", {}).get("md5", "") + if cdnurl: + content = {"src": cdnurl, "msg": "表情", "style": "width: 100px; height: 100px;"} + + elif Type == 49 and SubType == 57: # 带有引用的文本消息 + CompressContent = CompressContent.rsplit(b'\x00', 1)[0] + content["msg"] = decompress_CompressContent(CompressContent) + try: + content["msg"] = content["msg"].decode("utf-8") + content["msg"] = parse_xml_string(content["msg"]) + content["msg"] = json.dumps(content["msg"], ensure_ascii=False) + except Exception as e: + content["msg"] = "[带有引用的文本消息]解析失败" + elif Type == 34 and SubType == 0: # 语音 + tmp_c = parse_xml_string(StrContent) + voicelength = tmp_c.get("voicemsg", {}).get("voicelength", "") + transtext = tmp_c.get("voicetrans", {}).get("transtext", "") + if voicelength.isdigit(): + voicelength = int(voicelength) / 1000 + voicelength = f"{voicelength:.2f}" + content["msg"] = f"语音时长:{voicelength}秒\n翻译结果:{transtext}" + + src = load_base64_audio_data(MsgSvrID, MediaMSG_all_db_path=MediaMSG_all_db_path) + content["src"] = src + elif Type == 3 and SubType == 0: # 图片 + xml_content = parse_xml_string(StrContent) + md5 = xml_content.get("img", {}).get("md5", "") + if md5: + content["src"] = img_md5_data.get(md5, "") + else: + content["src"] = "" + content["msg"] = "图片" + + else: + content["msg"] = StrContent + + row_data = {"MsgSvrID": MsgSvrID, "type_name": type_name, "is_sender": IsSender, + "content": content, "CreateTime": CreateTime} + data.append(row_data) + return data + + +def export_html(user, outpath, MSG_ALL_db_path, MediaMSG_all_db_path, FileStorage_path, page_size=500): + name_save = user.get("remark", user.get("nickname", user.get("username", ""))) + username = user.get("username", "") + + chatCount = user.get("chat_count", 0) + if chatCount == 0: + return False, "没有聊天记录" + + for i in range(0, chatCount, page_size): + start_index = i + data = load_chat_records(username, start_index, page_size, user, MSG_ALL_db_path, MediaMSG_all_db_path, + FileStorage_path) + if len(data) == 0: + break + save_path = os.path.join(outpath, f"{name_save}_{int(i / page_size)}.html") + with open(save_path, "w", encoding="utf-8") as f: + f.write(render_template("chat.html", msgs=data)) + return True, f"导出成功{outpath}" + + +def export(username, outpath, MSG_ALL_db_path, MicroMsg_db_path, MediaMSG_all_db_path, FileStorage_path): + if not os.path.exists(outpath): + outpath = os.path.join(os.getcwd(), "export" + os.sep + username) + if not os.path.exists(outpath): + os.makedirs(outpath) + + USER_LIST = get_user_list(MSG_ALL_db_path, MicroMsg_db_path) + user = list(filter(lambda x: x["username"] == username, USER_LIST)) + + if username and len(user) > 0: + user = user[0] + return export_html(user, outpath, MSG_ALL_db_path, MediaMSG_all_db_path, FileStorage_path) + + +app_show_chat = Blueprint('show_chat_main', __name__, template_folder='templates') +app_show_chat.debug = False + + +# 主页 - 显示用户列表 +@app_show_chat.route('/') +def index(): + g.USER_LIST = get_user_list(g.MSG_ALL_db_path, g.MicroMsg_db_path) + return render_template("index.html", users=g.USER_LIST) + + +# 获取聊天记录 +@app_show_chat.route('/get_chat_data', methods=["GET", 'POST']) +def get_chat_data(): + username = request.args.get("username", "") + user = list(filter(lambda x: x["username"] == username, g.USER_LIST)) + + if username and len(user) > 0: + user = user[0] + + limit = int(request.args.get("limit", 100)) # 每页显示的条数 + page = int(request.args.get("page", user.get("chat_count", limit) / limit)) # 当前页数 + + start_index = (page - 1) * limit + page_size = limit + + data = load_chat_records(username, start_index, page_size, user, g.MSG_ALL_db_path, g.MediaMSG_all_db_path, + g.FileStorage_path) + return render_template("chat.html", msgs=data) + else: + return "error" + + +# 聊天记录导出为html +@app_show_chat.route('/export_chat_data', methods=["GET", 'POST']) +def get_export(): + username = request.args.get("username", "") + + user = list(filter(lambda x: x["username"] == username, g.USER_LIST)) + + if username and len(user) > 0: + user = user[0] + n = f"{user.get('username', '')}_{user.get('nickname', '')}_{user.get('remark', '')}" + outpath = os.path.join(os.getcwd(), "export" + os.sep + n) + if not os.path.exists(outpath): + os.makedirs(outpath) + + ret = export_html(user, outpath, g.MSG_ALL_db_path, g.MediaMSG_all_db_path, g.FileStorage_path, page_size=200) + if ret[0]: + return ret[1] + else: + return ret[1] + else: + return "error" + + +if __name__ == '__main__': + pass diff --git a/pywxdump/analyzer/img.png b/pywxdump/analyzer/img.png new file mode 100644 index 0000000..05cfeb7 Binary files /dev/null and b/pywxdump/analyzer/img.png differ diff --git a/pywxdump/analyzer/utils.py b/pywxdump/analyzer/utils.py new file mode 100644 index 0000000..acd334b --- /dev/null +++ b/pywxdump/analyzer/utils.py @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*-# +# ------------------------------------------------------------------------------- +# Name: utils.py +# Description: +# Author: xaoyaoo +# Date: 2023/12/03 +# ------------------------------------------------------------------------------- +import hashlib + + +def get_md5(data): + """ + 获取数据的 MD5 值 + :param data: 数据(bytes) + :return: + """ + md5 = hashlib.md5() + md5.update(data) + return md5.hexdigest() + +if __name__ == '__main__': + pass diff --git a/pywxdump/command.py b/pywxdump/cli.py similarity index 100% rename from pywxdump/command.py rename to pywxdump/cli.py diff --git a/pywxdump/decrypted/__init__.py b/pywxdump/decrypted/__init__.py deleted file mode 100644 index 6a3db0f..0000000 --- a/pywxdump/decrypted/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -# -*- coding: utf-8 -*-# -# ------------------------------------------------------------------------------- -# Name: __init__.py.py -# Description: -# Author: xaoyaoo -# Date: 2023/08/21 -# ------------------------------------------------------------------------------- -from .decrypt import batch_decrypt, encrypt -from .get_wx_decrypted_db import all_decrypt, merge_copy_msg_db, merge_msg_db, merge_media_msg_db \ No newline at end of file diff --git a/pywxdump/bias_addr/__init__.py b/pywxdump/ui/__init__.py similarity index 74% rename from pywxdump/bias_addr/__init__.py rename to pywxdump/ui/__init__.py index 5c0359f..9874cd3 100644 --- a/pywxdump/bias_addr/__init__.py +++ b/pywxdump/ui/__init__.py @@ -1,8 +1,11 @@ # -*- coding: utf-8 -*-# # ------------------------------------------------------------------------------- # Name: __init__.py.py -# Description: +# Description: # Author: xaoyaoo -# Date: 2023/10/14 +# Date: 2023/12/03 # ------------------------------------------------------------------------------- -from .get_bias_addr import BiasAddr \ No newline at end of file + + +if __name__ == '__main__': + pass diff --git a/pywxdump/show_chat/__init__.py b/pywxdump/ui/view_chat/__init__.py similarity index 100% rename from pywxdump/show_chat/__init__.py rename to pywxdump/ui/view_chat/__init__.py diff --git a/pywxdump/show_chat/main_window.py b/pywxdump/ui/view_chat/main_window.py similarity index 100% rename from pywxdump/show_chat/main_window.py rename to pywxdump/ui/view_chat/main_window.py diff --git a/pywxdump/show_chat/templates/chat.html b/pywxdump/ui/view_chat/templates/chat.html similarity index 100% rename from pywxdump/show_chat/templates/chat.html rename to pywxdump/ui/view_chat/templates/chat.html diff --git a/pywxdump/show_chat/templates/index.html b/pywxdump/ui/view_chat/templates/index.html similarity index 100% rename from pywxdump/show_chat/templates/index.html rename to pywxdump/ui/view_chat/templates/index.html diff --git a/pywxdump/wx_info/__init__.py b/pywxdump/wx_info/__init__.py index 2e9671c..e910160 100644 --- a/pywxdump/wx_info/__init__.py +++ b/pywxdump/wx_info/__init__.py @@ -5,5 +5,7 @@ # Author: xaoyaoo # Date: 2023/08/21 # ------------------------------------------------------------------------------- -from .get_wx_info import read_info -from .get_wx_db import get_wechat_db \ No newline at end of file +from .get_wx_info import read_info, get_wechat_db +from .get_bias_addr import BiasAddr +from .decryption import batch_decrypt, encrypt, decrypt +from .merge_db import merge_msg_db, merge_copy_db, merge_media_msg_db diff --git a/pywxdump/decrypted/decrypt.py b/pywxdump/wx_info/decryption.py similarity index 99% rename from pywxdump/decrypted/decrypt.py rename to pywxdump/wx_info/decryption.py index 1e1d820..f0689c0 100644 --- a/pywxdump/decrypted/decrypt.py +++ b/pywxdump/wx_info/decryption.py @@ -12,7 +12,6 @@ # 为了保证数据部分长度是16字节即AES块大小的整倍数,每一页的末尾将填充一段空字节,使得保留字段的长度为48字节。 # 综上,加密文件结构为第一页4KB数据前16字节为盐值,紧接着4032字节数据,再加上16字节IV和20字节HMAC以及12字节空字节;而后的页均是4048字节长度的加密数据段和48字节的保留段。 # ------------------------------------------------------------------------------- - import argparse import hmac import hashlib diff --git a/pywxdump/bias_addr/get_bias_addr.py b/pywxdump/wx_info/get_bias_addr.py similarity index 52% rename from pywxdump/bias_addr/get_bias_addr.py rename to pywxdump/wx_info/get_bias_addr.py index 3a070f9..6e53120 100644 --- a/pywxdump/bias_addr/get_bias_addr.py +++ b/pywxdump/wx_info/get_bias_addr.py @@ -276,238 +276,4 @@ class BiasAddr: elif logging_path: print("{版本号:昵称,账号,手机号,邮箱,KEY}") print(rdata) - return rdata - - -# class BiasAddr: -# def __init__(self, account, mobile, name, key, db_path): -# self.account = account.encode("utf-8") -# self.mobile = mobile.encode("utf-8") -# self.name = name.encode("utf-8") -# self.key = bytes.fromhex(key) if key else b"" -# self.db_path = db_path if db_path else "" -# -# self.process_name = "WeChat.exe" -# self.module_name = "WeChatWin.dll" -# -# self.pm = Pymem("WeChat.exe") -# -# self.bits = self.get_osbits() -# self.version = self.get_file_version(self.process_name) -# self.address_len = self.get_addr_len() -# -# self.islogin = True -# -# def get_addr_len(self): -# version_nums = list(map(int, self.version.split("."))) # 将版本号拆分为数字列表 -# if version_nums[0] <= 3 and version_nums[1] <= 9 and version_nums[2] <= 2: -# return 4 -# else: -# return 8 -# -# def find_all(self, c: bytes, string: bytes, base_addr=0): -# """ -# 查找字符串中所有子串的位置 -# :param c: 子串 b'123' -# :param string: 字符串 b'123456789123' -# :return: -# """ -# return [base_addr + m.start() for m in re.finditer(re.escape(c), string)] -# -# def get_file_version(self, process_name): -# for process in psutil.process_iter(['pid', 'name', 'exe']): -# if process.name() == process_name: -# file_version = Dispatch("Scripting.FileSystemObject").GetFileVersion(process.exe()) -# return file_version -# self.islogin = False -# -# def get_osbits(self): -# return int(platform.architecture()[0][:-3]) -# -# def search_memory_value(self, value: bytes, module_name="WeChatWin.dll"): -# # 创建 Pymem 对象 -# pm = self.pm -# module = pymem.process.module_from_name(pm.process_handle, module_name) -# -# # result = pymem.pattern.pattern_scan_module(pm.process_handle, module, value, return_multiple=True) -# # result = result[-1]-module.lpBaseOfDll if len(result) > 0 else 0 -# mem_data = pm.read_bytes(module.lpBaseOfDll, module.SizeOfImage) -# result = self.find_all(value, mem_data) -# result = result[-1] if len(result) > 0 else 0 -# return result -# -# def search_key(self, key: bytes): -# byteLen = self.address_len # if self.bits == 32 else 8 # 4字节或8字节 -# key = re.escape(key) # 转义特殊字符 -# key_addr = self.pm.pattern_scan_all(key, return_multiple=True)[-1] if len(key) > 0 else 0 -# key = key_addr.to_bytes(byteLen, byteorder='little', signed=True) -# result = self.search_memory_value(key, self.module_name) -# return result -# -# def get_key_bias_test(self): -# byteLen = self.address_len # 4 if self.bits == 32 else 8 # 4字节或8字节 -# keyLenOffset = 0x8c if self.bits == 32 else 0xd0 -# keyWindllOffset = 0x90 if self.bits == 32 else 0xd8 -# -# pm = self.pm -# -# module = pymem.process.module_from_name(pm.process_handle, "WeChatWin.dll") -# keyBytes = b'-----BEGIN PUBLIC KEY-----\n...' -# publicKeyList = pymem.pattern.pattern_scan_all(self.pm.process_handle, keyBytes, return_multiple=True) -# -# keyaddrs = [] -# for addr in publicKeyList: -# keyBytes = addr.to_bytes(byteLen, byteorder="little", signed=True) # 低位在前 -# addrs = pymem.pattern.pattern_scan_module(pm.process_handle, module, keyBytes, return_multiple=True) -# if addrs != 0: -# keyaddrs += addrs -# -# keyWinAddr = 0 -# for addr in keyaddrs: -# keyLen = pm.read_uchar(addr - keyLenOffset) -# if keyLen != 32: -# continue -# keyWinAddr = addr - keyWindllOffset -# # keyaddr = int.from_bytes(pm.read_bytes(keyWinAddr, byteLen), byteorder='little') -# # key = pm.read_bytes(keyaddr, 32) -# # print("key", key.hex()) -# -# return keyWinAddr - module.lpBaseOfDll -# -# def get_key_bias(self, wx_db_path, account_bias=0): -# wx_db_path = os.path.join(wx_db_path, "Msg", "MicroMsg.db") -# if not os.path.exists(wx_db_path): -# return 0 -# -# def get_maybe_key(mem_data): -# maybe_key = [] -# for i in range(0, len(mem_data), self.address_len): -# addr = mem_data[i:i + self.address_len] -# addr = int.from_bytes(addr, byteorder='little') -# # 去掉不可能的地址 -# if min_addr < addr < max_addr: -# key = read_key(addr) -# if key == b"": -# continue -# maybe_key.append([key, i]) -# return maybe_key -# -# def read_key(addr): -# key = ctypes.create_string_buffer(35) -# if ReadProcessMemory(pm.process_handle, void_p(addr - 1), key, 35, 0) == 0: -# return b"" -# -# if b"\x00\x00" in key.raw[1:33]: -# return b"" -# -# if b"\x00\x00" == key.raw[33:35] and b"\x90" == key.raw[0:1]: -# return key.raw[1:33] -# return b"" -# -# def verify_key(keys, wx_db_path): -# with open(wx_db_path, "rb") as file: -# blist = file.read(5000) -# salt = blist[:16] -# first = blist[16:DEFAULT_PAGESIZE] -# mac_salt = bytes([(salt[i] ^ 58) for i in range(16)]) -# -# with multiprocessing.Pool(processes=8) as pool: -# results = [pool.apply_async(validate_key, args=(key, salt, first, mac_salt)) for key, i in keys[-1::-1]] -# results = [p.get() for p in results] -# for i, result in enumerate(results[-1::-1]): -# if result: -# return keys[i] -# return b"", 0 -# -# module_name = "WeChatWin.dll" -# pm = self.pm -# module = pymem.process.module_from_name(pm.process_handle, module_name) -# start_addr = module.lpBaseOfDll -# size = module.SizeOfImage -# -# if account_bias > 1: -# maybe_key = [] -# for i in [0x24, 0x40]: -# addr = start_addr + account_bias - i -# mem_data = pm.read_bytes(addr, self.address_len) -# key = read_key(int.from_bytes(mem_data, byteorder='little')) -# if key != b"": -# maybe_key.append([key, addr - start_addr]) -# key, bais = verify_key(maybe_key, wx_db_path) -# if bais != 0: -# return bais -# -# min_addr = 0xffffffffffffffffffffffff -# max_addr = 0 -# for module1 in pm.list_modules(): -# if module1.lpBaseOfDll < min_addr: -# min_addr = module1.lpBaseOfDll -# if module1.lpBaseOfDll > max_addr: -# max_addr = module1.lpBaseOfDll + module1.SizeOfImage -# -# mem_data = pm.read_bytes(start_addr, size) -# maybe_key = get_maybe_key(mem_data) -# key, bais = verify_key(maybe_key, wx_db_path) -# return bais -# -# def run(self, is_logging=False, version_list_path=None): -# self.version = self.get_file_version(self.process_name) -# if not self.islogin: -# error = "[-] WeChat No Run" -# if is_logging: print(error) -# return error -# mobile_bias = self.search_memory_value(self.mobile) -# name_bias = self.search_memory_value(self.name) -# account_bias = self.search_memory_value(self.account) -# # version_bias = self.search_memory_value(self.version.encode("utf-8")) -# -# try: -# key_bias = self.get_key_bias_test() -# except: -# key_bias = 0 -# -# if key_bias <= 0: -# if self.key: -# key_bias = self.search_key(self.key) -# elif self.db_path: -# key_bias = self.get_key_bias(self.db_path, account_bias) -# else: -# key_bias = 0 -# rdata = {self.version: [name_bias, account_bias, mobile_bias, 0, key_bias]} -# if version_list_path and os.path.exists(version_list_path): -# with open(version_list_path, "r", encoding="utf-8") as f: -# data = json.load(f) -# data.update(rdata) -# with open(version_list_path, "w", encoding="utf-8") as f: -# json.dump(data, f, ensure_ascii=False, indent=4) -# if is_logging: -# print("{版本号:昵称,账号,手机号,邮箱,KEY}") -# print(rdata) -# return rdata - - -if __name__ == '__main__': - # 创建命令行参数解析器 - parser = argparse.ArgumentParser() - parser.add_argument("--mobile", type=str, help="手机号", required=True) - parser.add_argument("--name", type=str, help="微信昵称", required=True) - parser.add_argument("--account", type=str, help="微信账号", required=True) - parser.add_argument("--key", type=str, help="(可选)密钥") - parser.add_argument("--db_path", type=str, help="(可选)已登录账号的微信文件夹路径") - - # 解析命令行参数 - args = parser.parse_args() - - # 检查是否缺少必要参数,并抛出错误 - if not args.mobile or not args.name or not args.account: - raise ValueError("缺少必要的命令行参数!请提供手机号、微信昵称、微信账号。") - - # 从命令行参数获取值 - mobile = args.mobile - name = args.name - account = args.account - key = args.key - db_path = args.db_path - - # 调用 run 函数,并传入参数 - rdata = BiasAddr(account, mobile, name, key, db_path).run(True, "../version_list.json") + return rdata \ No newline at end of file diff --git a/pywxdump/wx_info/get_wx_db.py b/pywxdump/wx_info/get_wx_db.py deleted file mode 100644 index e30798b..0000000 --- a/pywxdump/wx_info/get_wx_db.py +++ /dev/null @@ -1,101 +0,0 @@ -# -*- coding: utf-8 -*-# -# ------------------------------------------------------------------------------- -# Name: get_wx_db.py -# Description: -# Author: xaoyaoo -# Date: 2023/10/14 -# ------------------------------------------------------------------------------- -import os -import re -import winreg -from typing import List, Union - - -def get_wechat_db(require_list: Union[List[str], str] = "all", msg_dir: str = None, wxid: Union[List[str], str] = None, - is_logging: bool = False): - if not msg_dir: - try: - key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Software\Tencent\WeChat", 0, winreg.KEY_READ) - value, _ = winreg.QueryValueEx(key, "FileSavePath") - winreg.CloseKey(key) - w_dir = value - except Exception as e: - # 获取文档实际目录 - try: - # 打开注册表路径 - key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, - r"Software\Microsoft\Windows\CurrentVersion\Explorer\User Shell Folders") - documents_path = winreg.QueryValueEx(key, "Personal")[0] # 读取文档实际目录路径 - winreg.CloseKey(key) # 关闭注册表 - documents_paths = os.path.split(documents_path) - if "%" in documents_paths[0]: - w_dir = os.environ.get(documents_paths[0].replace("%", "")) - w_dir = os.path.join(w_dir, os.path.join(*documents_paths[1:])) - else: - w_dir = documents_path - except Exception as e: - profile = os.path.expanduser("~") - w_dir = os.path.join(profile, "Documents") - msg_dir = os.path.join(w_dir, "WeChat Files") - - if not os.path.exists(msg_dir): - error = f"[-] 目录不存在: {msg_dir}" - if is_logging: print(error) - return error - - user_dirs = {} # wx用户目录 - files = os.listdir(msg_dir) - if wxid: # 如果指定wxid - if isinstance(wxid, str): - wxid = wxid.split(";") - for file_name in files: - if file_name in wxid: - user_dirs[os.path.join(msg_dir, file_name)] = os.path.join(msg_dir, file_name) - else: # 如果未指定wxid - for file_name in files: - if file_name == "All Users" or file_name == "Applet" or file_name == "WMPF": - continue - user_dirs[os.path.join(msg_dir, file_name)] = os.path.join(msg_dir, file_name) - - if isinstance(require_list, str): - require_list = require_list.split(";") - - # generate pattern - if "all" in require_list: - pattern = {"all": re.compile(r".*\.db$")} - elif isinstance(require_list, list): - pattern = {} - for require in require_list: - pattern[require] = re.compile(r"%s.*\.db$" % require) - else: - error = f"[-] 参数错误: {require_list}" - if is_logging: print(error) - return error - - # 获取数据库路径 - for user, user_dir in user_dirs.items(): # 遍历用户目录 - user_dirs[user] = {n: [] for n in pattern.keys()} - for root, dirs, files in os.walk(user_dir): - for file_name in files: - for n, p in pattern.items(): - if p.match(file_name): - src_path = os.path.join(root, file_name) - user_dirs[user][n].append(src_path) - - if is_logging: - for user, user_dir in user_dirs.items(): - print(f"[+] user_path: {user}") - for n, paths in user_dir.items(): - print(f" {n}:") - for path in paths: - print(f" {path.replace(user, '')}") - print("-" * 32) - print(f"[+] 共 {len(user_dirs)} 个微信账号") - - return user_dirs - - -if __name__ == '__main__': - require_list = ["MediaMSG", "MicroMsg", "FTSMSG", "MSG", "Sns", "Emotion"] - # require_list = "all" - user_dirs = get_wechat_db(require_list, is_logging=True) diff --git a/pywxdump/wx_info/get_wx_info.py b/pywxdump/wx_info/get_wx_info.py index 4c723bd..49b9316 100644 --- a/pywxdump/wx_info/get_wx_info.py +++ b/pywxdump/wx_info/get_wx_info.py @@ -13,6 +13,7 @@ import pymem from win32com.client import Dispatch import psutil import sys +from typing import List, Union ReadProcessMemory = ctypes.windll.kernel32.ReadProcessMemory void_p = ctypes.c_void_p @@ -181,28 +182,63 @@ def read_info(version_list, is_logging=False): return result +def get_wechat_db(require_list: Union[List[str], str] = "all", msg_dir: str = None, wxid: Union[List[str], str] = None, + is_logging: bool = False): + if not msg_dir: + msg_dir = get_info_filePath(wxid="all") -if __name__ == "__main__": - import argparse + if not os.path.exists(msg_dir): + error = f"[-] 目录不存在: {msg_dir}" + if is_logging: print(error) + return error - parser = argparse.ArgumentParser() - parser.add_argument("--vlfile", type=str, help="手机号", required=False) - parser.add_argument("--vldict", type=str, help="微信昵称", required=False) + user_dirs = {} # wx用户目录 + files = os.listdir(msg_dir) + if wxid: # 如果指定wxid + if isinstance(wxid, str): + wxid = wxid.split(";") + for file_name in files: + if file_name in wxid: + user_dirs[os.path.join(msg_dir, file_name)] = os.path.join(msg_dir, file_name) + else: # 如果未指定wxid + for file_name in files: + if file_name == "All Users" or file_name == "Applet" or file_name == "WMPF": + continue + user_dirs[os.path.join(msg_dir, file_name)] = os.path.join(msg_dir, file_name) - args = parser.parse_args() + if isinstance(require_list, str): + require_list = require_list.split(";") - # 读取微信各版本偏移 - if args.vlfile: - VERSION_LIST_PATH = args.vlfile - with open(VERSION_LIST_PATH, "r", encoding="utf-8") as f: - VERSION_LIST = json.load(f) - if args.vldict: - VERSION_LIST = json.loads(args.vldict) + # generate pattern + if "all" in require_list: + pattern = {"all": re.compile(r".*\.db$")} + elif isinstance(require_list, list): + pattern = {} + for require in require_list: + pattern[require] = re.compile(r"%s.*\.db$" % require) + else: + error = f"[-] 参数错误: {require_list}" + if is_logging: print(error) + return error - if not args.vlfile and not args.vldict: - VERSION_LIST_PATH = "../version_list.json" + # 获取数据库路径 + for user, user_dir in user_dirs.items(): # 遍历用户目录 + user_dirs[user] = {n: [] for n in pattern.keys()} + for root, dirs, files in os.walk(user_dir): + for file_name in files: + for n, p in pattern.items(): + if p.match(file_name): + src_path = os.path.join(root, file_name) + user_dirs[user][n].append(src_path) - with open(VERSION_LIST_PATH, "r", encoding="utf-8") as f: - VERSION_LIST = json.load(f) + if is_logging: + for user, user_dir in user_dirs.items(): + print(f"[+] user_path: {user}") + for n, paths in user_dir.items(): + print(f" {n}:") + for path in paths: + print(f" {path.replace(user, '')}") + print("-" * 32) + print(f"[+] 共 {len(user_dirs)} 个微信账号") - result = read_info(VERSION_LIST, True) # 读取微信信息 + return user_dirs \ No newline at end of file diff --git a/pywxdump/decrypted/get_wx_decrypted_db.py b/pywxdump/wx_info/merge_db.py similarity index 52% rename from pywxdump/decrypted/get_wx_decrypted_db.py rename to pywxdump/wx_info/merge_db.py index e77e321..30da3cd 100644 --- a/pywxdump/decrypted/get_wx_decrypted_db.py +++ b/pywxdump/wx_info/merge_db.py @@ -1,110 +1,16 @@ # -*- coding: utf-8 -*-# # ------------------------------------------------------------------------------- -# Name: get_wx_decrypted_db.py +# Name: merge_db.py # Description: # Author: xaoyaoo -# Date: 2023/08/25 +# Date: 2023/12/03 # ------------------------------------------------------------------------------- -import argparse import os -import re import shutil import sqlite3 -# import sys -import winreg - -# sys.path.append(os.path.dirname(os.path.abspath(__file__))) -try: - from decrypted.decrypt import decrypt -except ImportError: - from .decrypt import decrypt - -# 开始获取微信数据库 -def get_wechat_db(): - try: - key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Software\Tencent\WeChat", 0, winreg.KEY_READ) - value, _ = winreg.QueryValueEx(key, "FileSavePath") - winreg.CloseKey(key) - w_dir = value - except Exception as e: - try: - w_dir = "MyDocument:" - except Exception as e: - print("读取注册表错误:", str(e)) - return str(e) - - if w_dir == "MyDocument:": - profile = os.path.expanduser("~") - msg_dir = os.path.join(profile, "Documents", "WeChat Files") - else: - msg_dir = os.path.join(w_dir, "WeChat Files") - if not os.path.exists(msg_dir): - return FileNotFoundError("目录不存在") - user_dirs = {} # wx用户目录 - files = os.listdir(msg_dir) - for file_name in files: - if file_name == "All Users" or file_name == "Applet" or file_name == "WMPF": - continue - user_dirs[file_name] = os.path.join(msg_dir, file_name) - - # 获取数据库路径 - for user, user_dir in user_dirs.items(): - Media_p = [] - Micro_p = [] - FTS_p = [] - Sns_p = [] - Msg = [] - Emotion_p = [] - for root, dirs, files in os.walk(user_dir): - for file_name in files: - if re.match(r".*MediaMSG.*\.db$", file_name): - src_path = os.path.join(root, file_name) - Media_p.append(src_path) - elif re.match(r".*MicroMsg.*\.db$", file_name): - src_path = os.path.join(root, file_name) - Micro_p.append(src_path) - elif re.match(r".*FTSMSG.*\.db$", file_name): - src_path = os.path.join(root, file_name) - FTS_p.append(src_path) - elif re.match(r".*MSG.*\.db$", file_name): - src_path = os.path.join(root, file_name) - Msg.append(src_path) - elif re.match(r".*Sns.*\.db$", file_name): - src_path = os.path.join(root, file_name) - Sns_p.append(src_path) - elif re.match(r".*Emotion.*\.db$", file_name): - src_path = os.path.join(root, file_name) - Emotion_p.append(src_path) - Media_p.sort() - Msg.sort() - Micro_p.sort() - # FTS_p.sort() - user_dirs[user] = {"MicroMsg": Micro_p, "Msg": Msg, "MediaMSG": Media_p, "Sns": Sns_p, "Emotion": Emotion_p} - return user_dirs - - -# 解密所有数据库 paths(文件) 到 decrypted_path(目录) -def all_decrypt(keys, paths, decrypted_path): - decrypted_paths = [] - - for key in keys: - for path in paths: - - name = os.path.basename(path) # 文件名 - dtp = os.path.join(decrypted_path, name) # 解密后的路径 - if not decrypt(key, path, dtp): - break - decrypted_paths.append(dtp) - else: # for循环正常结束,没有break - break # 跳出while循环 - else: - return False # while循环正常结束,没有break 解密失败 - return decrypted_paths - - -def merge_copy_msg_db(db_path, save_path): +def merge_copy_db(db_path, save_path): if isinstance(db_path, list) and len(db_path) == 1: db_path = db_path[0] if not os.path.exists(db_path): @@ -112,7 +18,7 @@ def merge_copy_msg_db(db_path, save_path): shutil.move(db_path, save_path) -# 合并相同名称的数据库 +# 合并相同名称的数据库 MSG0-MSG9.db def merge_msg_db(db_path: list, save_path: str, CreateTime: int = 0): # CreateTime: 从这个时间开始的消息 10位时间戳 merged_conn = sqlite3.connect(save_path) @@ -252,64 +158,3 @@ def merge_media_msg_db(db_path: list, save_path: str): merged_conn.close() return save_path - - -if __name__ == '__main__': - # 创建命令行参数解析器 - parser = argparse.ArgumentParser() - parser.add_argument("-k", "--key", help="解密密钥", nargs="+", required=True) - - # 解析命令行参数 - args = parser.parse_args() - - # 检查是否缺少必要参数,并抛出错误 - if not args.key: - raise ValueError("缺少必要的命令行参数!请提供密钥。") - - # 从命令行参数获取值 - keys = args.key - - decrypted_ROOT = os.path.join(os.getcwd(), "decrypted") - - if keys is None: - print("keys is None") - exit(0) - if isinstance(keys, str): - keys = [keys] - - user_dirs = get_wechat_db() - for user, db_path in user_dirs.items(): # 遍历用户 - MicroMsgPaths = db_path["MicroMsg"] - MsgPaths = db_path["Msg"] - MediaMSGPaths = db_path["MediaMSG"] - # FTSMSGPaths = db_path["FTSMSG"] - SnsPaths = db_path["Sns"] - EmotionPaths = db_path["Emotion"] - - decrypted_path_tmp = os.path.join(decrypted_ROOT, user, "tmp") # 解密后的目录 - if not os.path.exists(decrypted_path_tmp): - os.makedirs(decrypted_path_tmp) - - MicroMsgDecryptPaths = all_decrypt(keys, MicroMsgPaths, decrypted_path_tmp) - MsgDecryptPaths = all_decrypt(keys, MsgPaths, decrypted_path_tmp) - MediaMSGDecryptPaths = all_decrypt(keys, MediaMSGPaths, decrypted_path_tmp) - SnsDecryptPaths = all_decrypt(keys, SnsPaths, decrypted_path_tmp) - EmotionDecryptPaths = all_decrypt(keys, EmotionPaths, decrypted_path_tmp) - - # 合并数据库 - decrypted_path = os.path.join(decrypted_ROOT, user) # 解密后的目录 - - MicroMsgDbPath = os.path.join(decrypted_path, "MicroMsg.db") - MsgDbPath = os.path.join(decrypted_path, "MSG_all.db") - MediaMSGDbPath = os.path.join(decrypted_path, "MediaMSG_all.db") - SnsDbPath = os.path.join(decrypted_path, "Sns_all.db") - EmmotionDbPath = os.path.join(decrypted_path, "Emotion_all.db") - - merge_copy_msg_db(MicroMsgDecryptPaths, MicroMsgDbPath) - merge_msg_db(MsgDecryptPaths, MsgDbPath, 0) - merge_media_msg_db(MediaMSGDecryptPaths, MediaMSGDbPath) - merge_copy_msg_db(SnsDecryptPaths, SnsDbPath) - merge_copy_msg_db(EmotionDecryptPaths, EmmotionDbPath) - - shutil.rmtree(decrypted_path_tmp) # 删除临时文件 - print(f"解密完成:{user}, {decrypted_path}")