diff --git a/build/lib/pywxdump/__init__.py b/build/lib/pywxdump/__init__.py deleted file mode 100644 index 903a5cf..0000000 --- a/build/lib/pywxdump/__init__.py +++ /dev/null @@ -1,20 +0,0 @@ -# -*- coding: utf-8 -*-# -# ------------------------------------------------------------------------------- -# Name: __init__.py.py -# Description: -# Author: xaoyaoo -# Date: 2023/10/14 -# ------------------------------------------------------------------------------- -from .bias_addr.get_bias_addr import BiasAddr -from .wx_info.get_wx_info import read_info -from .wx_info.get_wx_db import get_wechat_db -from .decrypted.decrypt import batch_decrypt, decrypt,encrypt -from .decrypted.get_wx_decrypted_db import all_decrypt, merge_copy_msg_db, merge_msg_db, merge_media_msg_db -from .analyse.parse import read_img_dat, read_emoji, decompress_CompressContent, read_audio_buf, read_audio, parse_xml_string -from .show_chat import app_show_chat, get_user_list, export - -import os,json - -VERSION_LIST_PATH = os.path.join(os.path.dirname(__file__), "version_list.json") -with open(VERSION_LIST_PATH, "r", encoding="utf-8") as f: - VERSION_LIST = json.load(f) diff --git a/build/lib/pywxdump/analyse/__init__.py b/build/lib/pywxdump/analyse/__init__.py deleted file mode 100644 index 790c6b4..0000000 --- a/build/lib/pywxdump/analyse/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -# -*- coding: utf-8 -*-# -# ------------------------------------------------------------------------------- -# Name: __init__.py.py -# Description: -# Author: xaoyaoo -# Date: 2023/09/27 -# ------------------------------------------------------------------------------- -from .parse import read_img_dat, read_emoji, decompress_CompressContent, read_audio_buf, read_audio, parse_xml_string diff --git a/build/lib/pywxdump/analyse/analyser.py b/build/lib/pywxdump/analyse/analyser.py deleted file mode 100644 index 93838e6..0000000 --- a/build/lib/pywxdump/analyse/analyser.py +++ /dev/null @@ -1,370 +0,0 @@ -# -*- coding: utf-8 -*-# -# ------------------------------------------------------------------------------- -# Name: analyser.py -# Description: -# Author: xaoyaoo -# Date: 2023/12/01 -# ------------------------------------------------------------------------------- -import sqlite3 -import time -from collections import Counter -import pandas as pd - -from pywxdump.analyse import parse_xml_string - - -def read_msgs(MSG_path, selected_talker=None, start_time=time.time() * 3600 * 24 * 365, end_time=time.time()): - """ - 读取消息内容-MSG.db 包含IsSender,StrContent,StrTalker,ype,SubType,CreateTime,MsgSvrID - :param MSG_path: MSG.db 路径 - :param selected_talker: 选中的聊天对象 - :param start_time: 开始时间 时间戳10位 - :param end_time: 结束时间 时间戳10位 - :return: - """ - type_name_dict = { - 1: {0: "文本"}, - 3: {0: "图片"}, - 34: {0: "语音"}, - 43: {0: "视频"}, - 47: {0: "动画表情"}, - 49: {0: "文本", 1: "类文本消息", 5: "卡片式链接", 6: "文件", 8: "上传的GIF表情", - 19: "合并转发聊天记录", 33: "分享的小程序", 36: "分享的小程序", 57: "带有引用的文本", - 63: "视频号直播或回放等", - 87: "群公告", 88: "视频号直播或回放等", 2000: "转账消息", 2003: "红包封面"}, - 50: {0: "语音通话"}, - 10000: {0: "系统通知", 4: "拍一拍", 8000: "系统通知"} - } - - # 连接 MSG_ALL.db 数据库,并执行查询 - db1 = sqlite3.connect(MSG_path) - cursor1 = db1.cursor() - - if isinstance(start_time, str): - start_time = time.mktime(time.strptime(start_time, "%Y-%m-%d %H:%M:%S")) - if isinstance(end_time, str): - end_time = time.mktime(time.strptime(end_time, "%Y-%m-%d %H:%M:%S")) - - if selected_talker is None or selected_talker == "": # 如果 selected_talker 为 None,则查询全部对话 - cursor1.execute( - "SELECT MsgSvrID,IsSender, StrContent, StrTalker, Type, SubType,CreateTime FROM MSG WHERE CreateTime>=? AND CreateTime<=? ORDER BY CreateTime ASC", - (start_time, end_time)) - else: - cursor1.execute( - "SELECT MsgSvrID,IsSender, StrContent, StrTalker, Type, SubType,CreateTime FROM MSG WHERE StrTalker=? AND CreateTime>=? AND CreateTime<=? ORDER BY CreateTime ASC", - (selected_talker, start_time, end_time)) - result1 = cursor1.fetchall() - cursor1.close() - db1.close() - - def get_emoji_cdnurl(row): - if row["type_name"] == "动画表情": - parsed_content = parse_xml_string(row["StrContent"]) - if isinstance(parsed_content, dict) and "emoji" in parsed_content: - return parsed_content["emoji"].get("cdnurl", "") - return row["content"] - - init_data = pd.DataFrame(result1, columns=["MsgSvrID", "IsSender", "StrContent", "StrTalker", "Type", "SubType", - "CreateTime"]) - init_data["CreateTime"] = pd.to_datetime(init_data["CreateTime"], unit="s") - init_data["AdjustedTime"] = init_data["CreateTime"] - pd.Timedelta(hours=4) - init_data["AdjustedTime"] = init_data["AdjustedTime"].dt.strftime("%Y-%m-%d %H:%M:%S") - init_data["CreateTime"] = init_data["CreateTime"].dt.strftime("%Y-%m-%d %H:%M:%S") - init_data["type_name"] = init_data.apply(lambda x: type_name_dict.get(x["Type"], {}).get(x["SubType"], "未知"), - axis=1) - init_data["content"] = init_data.apply(lambda x: x["StrContent"] if x["type_name"] == "文本" else "", axis=1) - init_data["content"] = init_data.apply(get_emoji_cdnurl, axis=1) - - init_data["content_len"] = init_data.apply(lambda x: len(x["content"]) if x["type_name"] == "文本" else 0, axis=1) - - chat_data = init_data[ - ["MsgSvrID", "IsSender", "StrTalker", "type_name", "content", "content_len", "CreateTime", "AdjustedTime"]] - - return True, chat_data - - -# 绘制直方图 -def draw_hist_all_count(chat_data, out_path="", is_show=False): - try: - import matplotlib.pyplot as plt - except ImportError as e: - print("error", e) - raise ImportError("请安装matplotlib库") - plt.rcParams['font.sans-serif'] = ['SimHei'] - plt.rcParams['axes.unicode_minus'] = False - - type_count = Counter(chat_data["type_name"]) - - # 对type_count按值进行排序,并返回排序后的结果 - sorted_type_count = dict(sorted(type_count.items(), key=lambda item: item[1], reverse=True)) - - plt.figure(figsize=(12, 8)) - plt.bar(range(len(sorted_type_count)), list(sorted_type_count.values()), tick_label=list(sorted_type_count.keys())) - plt.title("消息类型分布图") - plt.xlabel("消息类型") - plt.ylabel("数量") - - # 设置x轴标签的旋转角度为45度 - plt.xticks(rotation=-45) - - # 在每个柱上添加数字标签 - for i, v in enumerate(list(sorted_type_count.values())): - plt.text(i, v, str(v), ha='center', va='bottom') - - if out_path != "": - plt.savefig(out_path) - if is_show: - plt.show() - plt.close() - - -# 按照interval绘制折线图 -def draw_line_type_name(chat_data, interval="W", type_name_list=None, out_path="", is_show=False): - """ - 绘制折线图,横轴为时间,纵轴为消息数量,不同类型的消息用不同的颜色表示 - :param chat_data: - :param interval: - :param type_name_list: 消息类型列表,按照列表中的顺序绘制折线图 可选:全部类型、发送、接收、总字数、发送字数、接收字数、其他类型 - :param out_path: - :param is_show: - :return: - """ - if type_name_list is None: - type_name_list = ["全部类型", "发送", "接收"] + ["总字数", "发送字数", "接收字数"] - # type_name_list = ["总字数", "发送字数", "接收字数"] - - try: - import matplotlib.pyplot as plt - import pandas as pd - except ImportError as e: - print("error", e) - raise ImportError("请安装matplotlib库") - plt.rcParams['font.sans-serif'] = ['SimHei'] - plt.rcParams['axes.unicode_minus'] = False - - chat_data["CreateTime"] = pd.to_datetime(chat_data["CreateTime"]) - chat_data["AdjustedTime"] = pd.to_datetime(chat_data["AdjustedTime"]) - - # interval = interval.lower() - interval_dict = {"day": "%Y-%m-%d", "month": "%Y-%m", "year": "%Y", "week": "%Y-%W", - "d": "%Y-%m-%d", "m": "%Y-%m", "y": "%Y", "W": "%Y-%W" - } - if interval not in interval_dict: - raise ValueError("interval参数错误,可选值为day、month、year、week") - chat_data["interval"] = chat_data["AdjustedTime"].dt.strftime(interval_dict[interval]) - - # 根据chat_data["interval"]最大值和最小值,生成一个时间间隔列表 - interval_list = pd.date_range(chat_data["AdjustedTime"].min(), chat_data["AdjustedTime"].max(), freq=interval) - interval_list = interval_list.append(pd.Index([interval_list[-1] + pd.Timedelta(days=1)])) # 最后一天加一天 - - # 构建数据集 - # interval type_name1 type_name2 type_name3 - # 2021-01 文本数量 其他类型数量 其他类型数量 - # 2021-02 文本数量 其他类型数量 其他类型数量 - type_data = pd.DataFrame(columns=["interval"] + list(chat_data["type_name"].unique())) - type_data["interval"] = interval_list.strftime(interval_dict[interval]) - type_data = type_data.set_index("interval") - for type_name in chat_data["type_name"].unique(): - type_data[type_name] = chat_data[chat_data["type_name"] == type_name].groupby("interval").size() - type_data["全部类型"] = type_data.sum(axis=1) - type_data["发送"] = chat_data[chat_data["IsSender"] == 1].groupby("interval").size() - type_data["接收"] = chat_data[chat_data["IsSender"] == 0].groupby("interval").size() - - type_data["总字数"] = chat_data.groupby("interval")["content_len"].sum() - type_data["发送字数"] = chat_data[chat_data["IsSender"] == 1].groupby("interval")["content_len"].sum() - type_data["接收字数"] = chat_data[chat_data["IsSender"] == 0].groupby("interval")["content_len"].sum() - - type_data = type_data.fillna(0) - # 调整typename顺序,使其按照总数量排序,只要最大的5个 - type_data = type_data.reindex(type_data.sum().sort_values(ascending=False).index, axis=1) - if type_name_list is not None: - type_data = type_data[type_name_list] - else: - type_data = type_data.iloc[:, :5] - - # if interval == "W" or interval == "week": # 改为当前周的周一的日期 - # # - - plt.figure(figsize=(12, 8)) - - # 绘制折线图 - for type_name in type_data.columns: - plt.plot(type_data.index, type_data[type_name], label=type_name) - - # 设置x轴标签的旋转角度为45度 - plt.xticks(rotation=-45) - # 设置标题、坐标轴标签、图例等信息 - plt.title("消息类型分布图") - plt.xlabel("时间") - plt.ylabel("数量") - - plt.legend(loc="upper right") # 设置图例位置 - - # 显示图形 - if out_path != "": - plt.savefig(out_path) - if is_show: - plt.tight_layout() - plt.show() - plt.close() - - - -def wordcloud_generator(chat_data, interval="m", stopwords=None, out_path="", is_show=False, bg_img=None, - font="C:\Windows\Fonts\simhei.ttf"): - """ - 词云 - :param is_show: 是否显示 - :param img_path: 背景图片路径 - :param text: 文本 - :param font: 字体路径 - :return: - """ - try: - from wordcloud import WordCloud, ImageColorGenerator - import wordcloud - import jieba - import numpy as np - import matplotlib.pyplot as plt - from matplotlib.font_manager import fontManager - import pandas as pd - import codecs - import re - from imageio import imread - except ImportError as e: - print("error", e) - raise ImportError("请安装wordcloud,jieba,numpy,matplotlib,pillow库") - - plt.rcParams['font.sans-serif'] = ['SimHei'] - plt.rcParams['axes.unicode_minus'] = False - - chat_data["CreateTime"] = pd.to_datetime(chat_data["CreateTime"]) - chat_data["AdjustedTime"] = pd.to_datetime(chat_data["AdjustedTime"]) - - # interval = interval.lower() - interval_dict = {"day": "%Y-%m-%d", "month": "%Y-%m", "year": "%Y", "week": "%Y-%W", - "d": "%Y-%m-%d", "m": "%Y-%m", "y": "%Y", "W": "%Y-%W" - } - if interval not in interval_dict: - raise ValueError("interval参数错误,可选值为day、month、year、week") - chat_data["interval"] = chat_data["AdjustedTime"].dt.strftime(interval_dict[interval]) - - # 根据chat_data["interval"]最大值和最小值,生成一个时间间隔列表 - interval_list = pd.date_range(chat_data["AdjustedTime"].min(), chat_data["AdjustedTime"].max(), freq=interval) - interval_list = interval_list.append(pd.Index([interval_list[-1] + pd.Timedelta(days=1)])) # 最后一天加一天 - - # 构建数据集 - # interval text_all text_sender text_receiver - # 2021-01 文本\n合并 聊天记录\n文本\n合并 聊天记录\n文本\n合并 聊天记录\n - def merage_text(x): - pattern = re.compile("(\[.+?\])") # 匹配表情 - rt = "\n".join(x) - rt = pattern.sub('', rt).replace("\n", " ") - return rt - - chat_data["content"] = chat_data.apply(lambda x: x["content"] if x["type_name"] == "文本" else "", axis=1) - - text_data = pd.DataFrame(columns=["interval", "text_all", "text_sender", "text_receiver"]) - text_data["interval"] = interval_list.strftime(interval_dict[interval]) - text_data = text_data.set_index("interval") - # 使用“\n”合并 - text_data["text_all"] = chat_data.groupby("interval")["content"].apply(merage_text) - text_data["text_sender"] = chat_data[chat_data["IsSender"] == 1].groupby("interval")["content"].apply(merage_text) - text_data["text_receiver"] = chat_data[chat_data["IsSender"] == 0].groupby("interval")["content"].apply(merage_text) - - def gen_img(texts,out_path,is_show,bg_img,title=""): - words = jieba.lcut(texts) - res = [word for word in words if word not in stopwords and word.replace(" ", "") != "" and len(word) > 1] - count_dict = dict(Counter(res)) - - if bg_img: - bgimg = imread(open(bg_img, 'rb')) - # 获得词云对象,设定词云背景颜色及其图片和字体 - wc = WordCloud(background_color='white', mask=bgimg, font_path='simhei.ttf', mode='RGBA', include_numbers=False, - random_state=0) - else: - # 如果你的背景色是透明的,请用这两条语句替换上面两条 - bgimg = None - wc = WordCloud(background_color='white', mode='RGBA', font_path='simhei.ttf', include_numbers=False, - random_state=0,width=500, height=500) # 如果不指定中文字体路径,词云会乱码 - wc = wc.fit_words(count_dict) - - fig = plt.figure(figsize=(8, 8)) - fig.suptitle(title, fontsize=26) - ax = fig.subplots() - - ax.imshow(wc) - ax.axis('off') - - if out_path != "": - plt.savefig(out_path) - if is_show: - plt.show() - plt.close() - - for i in text_data.index: - out_path = f"out/img_{i}.png" - gen_img(text_data["text_all"][i], out_path=out_path, is_show=False, bg_img=bg_img, title=f"全部({i})") - # gen_img(text_data["text_sender"][i], out_path="", is_show=is_show, bg_img=bg_img, title=f"发送_{i}") - # gen_img(text_data["text_receiver"][i], out_path="", is_show=is_show, bg_img=bg_img, title=f"接收_{i}") - # time.sleep(1) - -# 情感分析 -def sentiment_analysis(chat_data, stopwords="", out_path="", is_show=False, bg_img=None): - try: - from snownlp import SnowNLP - import pandas as pd - import matplotlib.pyplot as plt - import seaborn as sns - - except ImportError as e: - print("error", e) - raise ImportError("请安装snownlp,pandas,matplotlib,seaborn库") - - sns.set_style('white', {'font.sans-serif': ['simhei', 'FangSong']}) - - chats = [] - for row in chat_data: - if row["type_name"] != "文本" or row["content"] == "": - continue - chats.append(row) - - scores = [] - for row in chats: - s = SnowNLP(row["content"]) - scores.append(s.sentiments) - - def draw(data): - df = pd.DataFrame({'Sentiment Score': data}) - plt.figure(figsize=(8, 6)) - sns.histplot(data=df, x='Sentiment Score', kde=True) - plt.title("Sentiment Analysis") - plt.xlabel("Sentiment Score") - plt.ylabel("Frequency") - - if out_path != "": - plt.savefig(out_path) - if is_show: - plt.show() - plt.close() - - draw(scores) - - -if __name__ == '__main__': - MSG_PATH = r"" - selected_talker = "wxid_" - start_time = time.time() - 3600 * 24 * 50000 - end_time = time.time() - code, chat_data = read_msgs(MSG_PATH, selected_talker, start_time, end_time) - # print(chat_data) - # code, data, classify_count, all_type_count = merge_chat_data(chat_data, interval="month") - # draw_hist_all_count(chat_data, is_show=True) # 绘制直方图 消息类型分布图 - # draw_line_type_name(chat_data, is_show=True) # 绘制折线图 消息类型分布图 - - # bg_img = 'img.png' - stopwords = ['的', '了', '是', '在', '我', '有', '和', '就', '不', '人', '都', '一', '一个', '上', '也', '很', '到', - '说', '要', - '去', '你', '会', '着', '没有', '看', '好', '自己', '这'] - wordcloud_generator(chat_data, stopwords=stopwords, out_path="", is_show=True) - # sentiment_analysis(chat_data) diff --git a/build/lib/pywxdump/analyse/parse.py b/build/lib/pywxdump/analyse/parse.py deleted file mode 100644 index e5f47d8..0000000 --- a/build/lib/pywxdump/analyse/parse.py +++ /dev/null @@ -1,262 +0,0 @@ -# -*- coding: utf-8 -*-# -# ------------------------------------------------------------------------------- -# Name: parse.py -# Description: 解析数据库内容 -# Author: xaoyaoo -# Date: 2023/09/27 -# ------------------------------------------------------------------------------- -import os.path -import sqlite3 -import pysilk -from io import BytesIO -import wave -import pyaudio -import requests -import hashlib -import lz4.block -import blackboxprotobuf - -from PIL import Image -#import xml.etree.ElementTree as ET -import lxml.etree as ET #这个模块更健壮些,微信XML格式有时有非标格式,会导致xml.etree.ElementTree处理失败 - - -def get_md5(data): - md5 = hashlib.md5() - md5.update(data) - return md5.hexdigest() - - -def parse_xml_string(xml_string): - """ - 解析 XML 字符串 - :param xml_string: 要解析的 XML 字符串 - :return: 解析结果,以字典形式返回 - """ - - def parse_xml(element): - """ - 递归解析 XML 元素 - :param element: 要解析的 XML 元素 - :return: 解析结果,以字典形式返回 - """ - result = {} - - # 解析当前元素的属性 - if element is None or element.attrib is None: - return result - for key, value in element.attrib.items(): - result[key] = value - - # 解析当前元素的子元素 - for child in element: - child_result = parse_xml(child) - - # 如果子元素的标签已经在结果中存在,则将其转换为列表 - if child.tag in result: - if not isinstance(result[child.tag], list): - result[child.tag] = [result[child.tag]] - result[child.tag].append(child_result) - else: - result[child.tag] = child_result - - # 如果当前元素没有子元素,则将其文本内容作为值保存 - if not result and element.text: - result = element.text - - return result - - if xml_string is None or not isinstance(xml_string, str): - return None - try: - parser = ET.XMLParser(recover=True) # 有时微信的聊天记录里面,会冒出来xml格式不对的情况,这里把parser设置成忽略错误 - root = ET.fromstring(xml_string,parser) - except Exception as e: - return xml_string - return parse_xml(root) - - -def read_img_dat(input_data): - """ - 读取图片文件dat格式 - :param input_data: 图片文件路径或者图片文件数据 - :return: 图片格式,图片md5,图片数据 - """ - # 常见图片格式的文件头 - img_head = { - b"\xFF\xD8\xFF": ".jpg", - b"\x89\x50\x4E\x47": ".png", - b"\x47\x49\x46\x38": ".gif", - b"\x42\x4D": ".BMP", - b"\x49\x49": ".TIFF", - b"\x4D\x4D": ".TIFF", - b"\x00\x00\x01\x00": ".ICO", - b"\x52\x49\x46\x46": ".WebP", - b"\x00\x00\x00\x18\x66\x74\x79\x70\x68\x65\x69\x63": ".HEIC", - } - - if isinstance(input_data, str): - with open(input_data, "rb") as f: - input_bytes = f.read() - else: - input_bytes = input_data - - try: - import numpy as np - input_bytes = np.frombuffer(input_bytes, dtype=np.uint8) - for hcode in img_head: # 遍历文件头 - t = input_bytes[0] ^ hcode[0] # 异或解密 - if np.all(t == np.bitwise_xor(np.frombuffer(input_bytes[:len(hcode)], dtype=np.uint8), - np.frombuffer(hcode, dtype=np.uint8))): # 使用NumPy进行向量化的异或解密操作,并进行类型转换 - fomt = img_head[hcode] # 获取文件格式 - - out_bytes = np.bitwise_xor(input_bytes, t) # 使用NumPy进行向量化的异或解密操作 - md5 = get_md5(out_bytes) - return fomt, md5, out_bytes - return False - except ImportError: - pass - - for hcode in img_head: - t = input_bytes[0] ^ hcode[0] - for i in range(1, len(hcode)): - if t == input_bytes[i] ^ hcode[i]: - fomt = img_head[hcode] - out_bytes = bytearray() - for nowByte in input_bytes: # 读取文件 - newByte = nowByte ^ t # 异或解密 - out_bytes.append(newByte) - md5 = get_md5(out_bytes) - return fomt, md5, out_bytes - return False - - -def read_emoji(cdnurl, is_show=False): - headers = { - "User-Agent": "Mozilla/5.0 (Linux; Android 10; Redmi K30 Pro) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Mobile Safari/537.36" - - } - r1 = requests.get(cdnurl, headers=headers) - rdata = r1.content - - if is_show: # 显示表情 - img = Image.open(BytesIO(rdata)) - img.show() - return rdata - - -def decompress_CompressContent(data): - """ - 解压缩Msg:CompressContent内容 - :param data: - :return: - """ - if data is None or not isinstance(data, bytes): - return None - dst = lz4.block.decompress(compress_content, uncompressed_size=len(compress_content) << 8) - dst.decode().replace('\x00', '') # 已经解码完成后,还含有0x00的部分,要删掉,要不后面ET识别的时候会报错 - uncompressed_data = dst.encode() - return uncompressed_data - - -def read_audio_buf(buf_data, is_play=False, is_wave=False, rate=24000): - silk_file = BytesIO(buf_data) # 读取silk文件 - pcm_file = BytesIO() # 创建pcm文件 - - pysilk.decode(silk_file, pcm_file, rate) # 解码silk文件->pcm文件 - pcm_data = pcm_file.getvalue() # 获取pcm文件数据 - - silk_file.close() # 关闭silk文件 - pcm_file.close() # 关闭pcm文件 - if is_play: # 播放音频 - def play_audio(pcm_data, rate): - p = pyaudio.PyAudio() # 实例化pyaudio - stream = p.open(format=pyaudio.paInt16, channels=1, rate=rate, output=True) # 创建音频流对象 - stream.write(pcm_data) # 写入音频流 - stream.stop_stream() # 停止音频流 - stream.close() # 关闭音频流 - p.terminate() # 关闭pyaudio - - play_audio(pcm_data, rate) - - if is_wave: # 转换为wav文件 - wave_file = BytesIO() # 创建wav文件 - with wave.open(wave_file, 'wb') as wf: - wf.setparams((1, 2, rate, 0, 'NONE', 'NONE')) # 设置wav文件参数 - wf.writeframes(pcm_data) # 写入wav文件 - rdata = wave_file.getvalue() # 获取wav文件数据 - wave_file.close() # 关闭wav文件 - return rdata - - return pcm_data - - -def read_audio(MsgSvrID, is_play=False, is_wave=False, DB_PATH: str = "", rate=24000): - if DB_PATH == "": - return False - - DB = sqlite3.connect(DB_PATH) - cursor = DB.cursor() - sql = "select Buf from Media where Reserved0='{}'".format(MsgSvrID) - DBdata = cursor.execute(sql).fetchall() - - if len(DBdata) == 0: - return False - data = DBdata[0][0] # [1:] + b'\xFF\xFF' - pcm_data = read_audio_buf(data, is_play, is_wave, rate) - return pcm_data - - -def wordcloud_generator(text, out_path="", is_show=False, img_path="", font="C:\Windows\Fonts\simhei.ttf"): - """ - 词云 - :param is_show: 是否显示 - :param img_path: 背景图片路径 - :param text: 文本 - :param font: 字体路径 - :return: - """ - try: - from wordcloud import WordCloud - import jieba - import numpy as np - import matplotlib.pyplot as plt - from matplotlib.font_manager import fontManager - except ImportError as e: - print("error", e) - raise ImportError("请安装wordcloud,jieba,numpy,matplotlib,pillow库") - words = jieba.lcut(text) # 精确分词 - newtxt = ' '.join(words) # 空格拼接 - # 字体路径 - - # 创建WordCloud对象 - wordcloud1 = WordCloud(width=800, height=400, background_color='white', font_path=font) - wordcloud1.generate(newtxt) - - if out_path and out_path != "": - wordcloud1.to_file("wordcloud.png") # 保存图片 - if img_path and os.path.exists(img_path): # 设置背景图片 - img_color = np.array(Image.open(img_path)) # 读取背景图片 - img_color = img_color.reshape((img_color.shape[0] * img_color.shape[1], 3)) - wordcloud1.recolor(color_func=img_color) # 设置背景图片颜色 - if is_show: - # 显示词云 - wordcloud_img = wordcloud1.to_image() - wordcloud_img.show() - - -def read_BytesExtra(data): - if data[0:2] == '0x': - data = data[2:] - data = bytes.fromhex(data) - print(data) - print('*' * 50) - print(data.decode('utf-8', errors='ignore')) - - -if __name__ == '__main__': - data = '' - read_BytesExtra(data) - print('*' * 50) - data2 = '' - read_BytesExtra(data2) diff --git a/build/lib/pywxdump/bias_addr/__init__.py b/build/lib/pywxdump/bias_addr/__init__.py deleted file mode 100644 index 5c0359f..0000000 --- a/build/lib/pywxdump/bias_addr/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -# -*- coding: utf-8 -*-# -# ------------------------------------------------------------------------------- -# Name: __init__.py.py -# Description: -# Author: xaoyaoo -# Date: 2023/10/14 -# ------------------------------------------------------------------------------- -from .get_bias_addr import BiasAddr \ No newline at end of file diff --git a/build/lib/pywxdump/bias_addr/get_bias_addr.py b/build/lib/pywxdump/bias_addr/get_bias_addr.py deleted file mode 100644 index 3a070f9..0000000 --- a/build/lib/pywxdump/bias_addr/get_bias_addr.py +++ /dev/null @@ -1,513 +0,0 @@ -# -*- coding: utf-8 -*-# -# ------------------------------------------------------------------------------- -# Name: get_base_addr.py -# Description: -# Author: xaoyaoo -# Date: 2023/08/22 -# ------------------------------------------------------------------------------- -import argparse -import ctypes -import hashlib -import json -import multiprocessing -import os -import re -import sys - -import psutil -from win32com.client import Dispatch -from pymem import Pymem -import pymem -import hmac - -ReadProcessMemory = ctypes.windll.kernel32.ReadProcessMemory -void_p = ctypes.c_void_p -KEY_SIZE = 32 -DEFAULT_PAGESIZE = 4096 -DEFAULT_ITER = 64000 - - -def validate_key(key, salt, first, mac_salt): - byteKey = hashlib.pbkdf2_hmac("sha1", key, salt, DEFAULT_ITER, KEY_SIZE) - mac_key = hashlib.pbkdf2_hmac("sha1", byteKey, mac_salt, 2, KEY_SIZE) - hash_mac = hmac.new(mac_key, first[:-32], hashlib.sha1) - hash_mac.update(b'\x01\x00\x00\x00') - - if hash_mac.digest() == first[-32:-12]: - return True - else: - return False - - -def get_exe_bit(file_path): - """ - 获取 PE 文件的位数: 32 位或 64 位 - :param file_path: PE 文件路径(可执行文件) - :return: 如果遇到错误则返回 64 - """ - try: - with open(file_path, 'rb') as f: - dos_header = f.read(2) - if dos_header != b'MZ': - print('get exe bit error: Invalid PE file') - return 64 - # Seek to the offset of the PE signature - f.seek(60) - pe_offset_bytes = f.read(4) - pe_offset = int.from_bytes(pe_offset_bytes, byteorder='little') - - # Seek to the Machine field in the PE header - f.seek(pe_offset + 4) - machine_bytes = f.read(2) - machine = int.from_bytes(machine_bytes, byteorder='little') - - if machine == 0x14c: - return 32 - elif machine == 0x8664: - return 64 - else: - print('get exe bit error: Unknown architecture: %s' % hex(machine)) - return 64 - except IOError: - print('get exe bit error: File not found or cannot be opened') - return 64 - - -def get_exe_version(file_path): - """ - 获取 PE 文件的版本号 - :param file_path: PE 文件路径(可执行文件) - :return: 如果遇到错误则返回 - """ - file_version = Dispatch("Scripting.FileSystemObject").GetFileVersion(file_path) - return file_version - - -def find_all(c: bytes, string: bytes, base_addr=0): - """ - 查找字符串中所有子串的位置 - :param c: 子串 b'123' - :param string: 字符串 b'123456789123' - :return: - """ - return [base_addr + m.start() for m in re.finditer(re.escape(c), string)] - - -class BiasAddr: - def __init__(self, account, mobile, name, key, db_path): - self.account = account.encode("utf-8") - self.mobile = mobile.encode("utf-8") - self.name = name.encode("utf-8") - self.key = bytes.fromhex(key) if key else b"" - self.db_path = db_path if os.path.exists(db_path) else "" - - self.process_name = "WeChat.exe" - self.module_name = "WeChatWin.dll" - - self.pm = None # Pymem 对象 - self.is_WoW64 = None # True: 32位进程运行在64位系统上 False: 64位进程运行在64位系统上 - self.process_handle = None # 进程句柄 - self.pid = None # 进程ID - self.version = None # 微信版本号 - self.process = None # 进程对象 - self.exe_path = None # 微信路径 - self.address_len = None # 4 if self.bits == 32 else 8 # 4字节或8字节 - self.bits = 64 if sys.maxsize > 2 ** 32 else 32 # 系统:32位或64位 - - def get_process_handle(self): - try: - self.pm = Pymem(self.process_name) - self.pm.check_wow64() - self.is_WoW64 = self.pm.is_WoW64 - self.process_handle = self.pm.process_handle - self.pid = self.pm.process_id - self.process = psutil.Process(self.pid) - self.exe_path = self.process.exe() - self.version = get_exe_version(self.exe_path) - - version_nums = list(map(int, self.version.split("."))) # 将版本号拆分为数字列表 - if version_nums[0] <= 3 and version_nums[1] <= 9 and version_nums[2] <= 2: - self.address_len = 4 - else: - self.address_len = 8 - return True, "" - except pymem.exception.ProcessNotFound: - return False, "[-] WeChat No Run" - - def search_memory_value(self, value: bytes, module_name="WeChatWin.dll"): - # 创建 Pymem 对象 - module = pymem.process.module_from_name(self.pm.process_handle, module_name) - ret = self.pm.pattern_scan_module(value, module, return_multiple=True) - ret = ret[-1] - module.lpBaseOfDll if len(ret) > 0 else 0 - return ret - - def get_key_bias1(self): - try: - byteLen = self.address_len # 4 if self.bits == 32 else 8 # 4字节或8字节 - - keyLenOffset = 0x8c if self.bits == 32 else 0xd0 - keyWindllOffset = 0x90 if self.bits == 32 else 0xd8 - - module = pymem.process.module_from_name(self.process_handle, self.module_name) - keyBytes = b'-----BEGIN PUBLIC KEY-----\n...' - publicKeyList = pymem.pattern.pattern_scan_all(self.process_handle, keyBytes, return_multiple=True) - - keyaddrs = [] - for addr in publicKeyList: - keyBytes = addr.to_bytes(byteLen, byteorder="little", signed=True) # 低位在前 - may_addrs = pymem.pattern.pattern_scan_module(self.process_handle, module, keyBytes, - return_multiple=True) - if may_addrs != 0 and len(may_addrs) > 0: - for addr in may_addrs: - keyLen = self.pm.read_uchar(addr - keyLenOffset) - if keyLen != 32: - continue - keyaddrs.append(addr - keyWindllOffset) - - return keyaddrs[-1] - module.lpBaseOfDll if len(keyaddrs) > 0 else 0 - except: - return 0 - - def search_key(self, key: bytes): - key = re.escape(key) # 转义特殊字符 - key_addr = self.pm.pattern_scan_all(key, return_multiple=False) - key = key_addr.to_bytes(self.address_len, byteorder='little', signed=True) - result = self.search_memory_value(key, self.module_name) - return result - - def get_key_bias2(self, wx_db_path, account_bias=0): - wx_db_path = os.path.join(wx_db_path, "Msg", "MicroMsg.db") - if not os.path.exists(wx_db_path): - return 0 - - def get_maybe_key(mem_data): - min_addr = 0xffffffffffffffffffffffff - max_addr = 0 - for module1 in pm.list_modules(): - if module1.lpBaseOfDll < min_addr: - min_addr = module1.lpBaseOfDll - if module1.lpBaseOfDll > max_addr: - max_addr = module1.lpBaseOfDll + module1.SizeOfImage - - maybe_key = [] - for i in range(0, len(mem_data), self.address_len): - addr = mem_data[i:i + self.address_len] - addr = int.from_bytes(addr, byteorder='little') - # 去掉不可能的地址 - if min_addr < addr < max_addr: - key = read_key(addr) - if key == b"": - continue - maybe_key.append([key, i]) - return maybe_key - - def read_key(addr): - key = ctypes.create_string_buffer(35) - if ReadProcessMemory(pm.process_handle, void_p(addr - 1), key, 35, 0) == 0: - return b"" - - if b"\x00\x00" in key.raw[1:33]: - return b"" - - if b"\x00\x00" == key.raw[33:35] and b"\x90" == key.raw[0:1]: - return key.raw[1:33] - return b"" - - def verify_key(keys, wx_db_path): - with open(wx_db_path, "rb") as file: - blist = file.read(5000) - salt = blist[:16] - first = blist[16:DEFAULT_PAGESIZE] - mac_salt = bytes([(salt[i] ^ 58) for i in range(16)]) - - with multiprocessing.Pool(processes=8) as pool: - results = [pool.apply_async(validate_key, args=(key, salt, first, mac_salt)) for key, i in keys[-1::-1]] - results = [p.get() for p in results] - for i, result in enumerate(results[-1::-1]): - if result: - return keys[i] - return b"", 0 - - module_name = "WeChatWin.dll" - pm = self.pm - module = pymem.process.module_from_name(pm.process_handle, module_name) - start_addr = module.lpBaseOfDll - size = module.SizeOfImage - - if account_bias > 1: - maybe_key = [] - for i in [0x24, 0x40]: - addr = start_addr + account_bias - i - mem_data = pm.read_bytes(addr, self.address_len) - key = read_key(int.from_bytes(mem_data, byteorder='little')) - if key != b"": - maybe_key.append([key, addr - start_addr]) - key, bais = verify_key(maybe_key, wx_db_path) - if bais != 0: - return bais - - mem_data = pm.read_bytes(start_addr, size) - maybe_key = get_maybe_key(mem_data) - key, bais = verify_key(maybe_key, wx_db_path) - return bais - - def run(self, logging_path=False, version_list_path=None): - if not self.get_process_handle()[0]: - return None - mobile_bias = self.search_memory_value(self.mobile, self.module_name) - name_bias = self.search_memory_value(self.name, self.module_name) - account_bias = self.search_memory_value(self.account, self.module_name) - key_bias = 0 - key_bias = self.get_key_bias1() - key_bias = self.search_key(self.key) if key_bias <= 0 and self.key else key_bias - key_bias = self.get_key_bias2(self.db_path, account_bias) if key_bias <= 0 and self.db_path else key_bias - - rdata = {self.version: [name_bias, account_bias, mobile_bias, 0, key_bias]} - if version_list_path and os.path.exists(version_list_path): - with open(version_list_path, "r", encoding="utf-8") as f: - data = json.load(f) - data.update(rdata) - with open(version_list_path, "w", encoding="utf-8") as f: - json.dump(data, f, ensure_ascii=False, indent=4) - if os.path.exists(logging_path) and isinstance(logging_path, str): - with open(logging_path, "a", encoding="utf-8") as f: - f.write("{版本号:昵称,账号,手机号,邮箱,KEY}" + "\n") - f.write(str(rdata) + "\n") - elif logging_path: - print("{版本号:昵称,账号,手机号,邮箱,KEY}") - print(rdata) - return rdata - - -# class BiasAddr: -# def __init__(self, account, mobile, name, key, db_path): -# self.account = account.encode("utf-8") -# self.mobile = mobile.encode("utf-8") -# self.name = name.encode("utf-8") -# self.key = bytes.fromhex(key) if key else b"" -# self.db_path = db_path if db_path else "" -# -# self.process_name = "WeChat.exe" -# self.module_name = "WeChatWin.dll" -# -# self.pm = Pymem("WeChat.exe") -# -# self.bits = self.get_osbits() -# self.version = self.get_file_version(self.process_name) -# self.address_len = self.get_addr_len() -# -# self.islogin = True -# -# def get_addr_len(self): -# version_nums = list(map(int, self.version.split("."))) # 将版本号拆分为数字列表 -# if version_nums[0] <= 3 and version_nums[1] <= 9 and version_nums[2] <= 2: -# return 4 -# else: -# return 8 -# -# def find_all(self, c: bytes, string: bytes, base_addr=0): -# """ -# 查找字符串中所有子串的位置 -# :param c: 子串 b'123' -# :param string: 字符串 b'123456789123' -# :return: -# """ -# return [base_addr + m.start() for m in re.finditer(re.escape(c), string)] -# -# def get_file_version(self, process_name): -# for process in psutil.process_iter(['pid', 'name', 'exe']): -# if process.name() == process_name: -# file_version = Dispatch("Scripting.FileSystemObject").GetFileVersion(process.exe()) -# return file_version -# self.islogin = False -# -# def get_osbits(self): -# return int(platform.architecture()[0][:-3]) -# -# def search_memory_value(self, value: bytes, module_name="WeChatWin.dll"): -# # 创建 Pymem 对象 -# pm = self.pm -# module = pymem.process.module_from_name(pm.process_handle, module_name) -# -# # result = pymem.pattern.pattern_scan_module(pm.process_handle, module, value, return_multiple=True) -# # result = result[-1]-module.lpBaseOfDll if len(result) > 0 else 0 -# mem_data = pm.read_bytes(module.lpBaseOfDll, module.SizeOfImage) -# result = self.find_all(value, mem_data) -# result = result[-1] if len(result) > 0 else 0 -# return result -# -# def search_key(self, key: bytes): -# byteLen = self.address_len # if self.bits == 32 else 8 # 4字节或8字节 -# key = re.escape(key) # 转义特殊字符 -# key_addr = self.pm.pattern_scan_all(key, return_multiple=True)[-1] if len(key) > 0 else 0 -# key = key_addr.to_bytes(byteLen, byteorder='little', signed=True) -# result = self.search_memory_value(key, self.module_name) -# return result -# -# def get_key_bias_test(self): -# byteLen = self.address_len # 4 if self.bits == 32 else 8 # 4字节或8字节 -# keyLenOffset = 0x8c if self.bits == 32 else 0xd0 -# keyWindllOffset = 0x90 if self.bits == 32 else 0xd8 -# -# pm = self.pm -# -# module = pymem.process.module_from_name(pm.process_handle, "WeChatWin.dll") -# keyBytes = b'-----BEGIN PUBLIC KEY-----\n...' -# publicKeyList = pymem.pattern.pattern_scan_all(self.pm.process_handle, keyBytes, return_multiple=True) -# -# keyaddrs = [] -# for addr in publicKeyList: -# keyBytes = addr.to_bytes(byteLen, byteorder="little", signed=True) # 低位在前 -# addrs = pymem.pattern.pattern_scan_module(pm.process_handle, module, keyBytes, return_multiple=True) -# if addrs != 0: -# keyaddrs += addrs -# -# keyWinAddr = 0 -# for addr in keyaddrs: -# keyLen = pm.read_uchar(addr - keyLenOffset) -# if keyLen != 32: -# continue -# keyWinAddr = addr - keyWindllOffset -# # keyaddr = int.from_bytes(pm.read_bytes(keyWinAddr, byteLen), byteorder='little') -# # key = pm.read_bytes(keyaddr, 32) -# # print("key", key.hex()) -# -# return keyWinAddr - module.lpBaseOfDll -# -# def get_key_bias(self, wx_db_path, account_bias=0): -# wx_db_path = os.path.join(wx_db_path, "Msg", "MicroMsg.db") -# if not os.path.exists(wx_db_path): -# return 0 -# -# def get_maybe_key(mem_data): -# maybe_key = [] -# for i in range(0, len(mem_data), self.address_len): -# addr = mem_data[i:i + self.address_len] -# addr = int.from_bytes(addr, byteorder='little') -# # 去掉不可能的地址 -# if min_addr < addr < max_addr: -# key = read_key(addr) -# if key == b"": -# continue -# maybe_key.append([key, i]) -# return maybe_key -# -# def read_key(addr): -# key = ctypes.create_string_buffer(35) -# if ReadProcessMemory(pm.process_handle, void_p(addr - 1), key, 35, 0) == 0: -# return b"" -# -# if b"\x00\x00" in key.raw[1:33]: -# return b"" -# -# if b"\x00\x00" == key.raw[33:35] and b"\x90" == key.raw[0:1]: -# return key.raw[1:33] -# return b"" -# -# def verify_key(keys, wx_db_path): -# with open(wx_db_path, "rb") as file: -# blist = file.read(5000) -# salt = blist[:16] -# first = blist[16:DEFAULT_PAGESIZE] -# mac_salt = bytes([(salt[i] ^ 58) for i in range(16)]) -# -# with multiprocessing.Pool(processes=8) as pool: -# results = [pool.apply_async(validate_key, args=(key, salt, first, mac_salt)) for key, i in keys[-1::-1]] -# results = [p.get() for p in results] -# for i, result in enumerate(results[-1::-1]): -# if result: -# return keys[i] -# return b"", 0 -# -# module_name = "WeChatWin.dll" -# pm = self.pm -# module = pymem.process.module_from_name(pm.process_handle, module_name) -# start_addr = module.lpBaseOfDll -# size = module.SizeOfImage -# -# if account_bias > 1: -# maybe_key = [] -# for i in [0x24, 0x40]: -# addr = start_addr + account_bias - i -# mem_data = pm.read_bytes(addr, self.address_len) -# key = read_key(int.from_bytes(mem_data, byteorder='little')) -# if key != b"": -# maybe_key.append([key, addr - start_addr]) -# key, bais = verify_key(maybe_key, wx_db_path) -# if bais != 0: -# return bais -# -# min_addr = 0xffffffffffffffffffffffff -# max_addr = 0 -# for module1 in pm.list_modules(): -# if module1.lpBaseOfDll < min_addr: -# min_addr = module1.lpBaseOfDll -# if module1.lpBaseOfDll > max_addr: -# max_addr = module1.lpBaseOfDll + module1.SizeOfImage -# -# mem_data = pm.read_bytes(start_addr, size) -# maybe_key = get_maybe_key(mem_data) -# key, bais = verify_key(maybe_key, wx_db_path) -# return bais -# -# def run(self, is_logging=False, version_list_path=None): -# self.version = self.get_file_version(self.process_name) -# if not self.islogin: -# error = "[-] WeChat No Run" -# if is_logging: print(error) -# return error -# mobile_bias = self.search_memory_value(self.mobile) -# name_bias = self.search_memory_value(self.name) -# account_bias = self.search_memory_value(self.account) -# # version_bias = self.search_memory_value(self.version.encode("utf-8")) -# -# try: -# key_bias = self.get_key_bias_test() -# except: -# key_bias = 0 -# -# if key_bias <= 0: -# if self.key: -# key_bias = self.search_key(self.key) -# elif self.db_path: -# key_bias = self.get_key_bias(self.db_path, account_bias) -# else: -# key_bias = 0 -# rdata = {self.version: [name_bias, account_bias, mobile_bias, 0, key_bias]} -# if version_list_path and os.path.exists(version_list_path): -# with open(version_list_path, "r", encoding="utf-8") as f: -# data = json.load(f) -# data.update(rdata) -# with open(version_list_path, "w", encoding="utf-8") as f: -# json.dump(data, f, ensure_ascii=False, indent=4) -# if is_logging: -# print("{版本号:昵称,账号,手机号,邮箱,KEY}") -# print(rdata) -# return rdata - - -if __name__ == '__main__': - # 创建命令行参数解析器 - parser = argparse.ArgumentParser() - parser.add_argument("--mobile", type=str, help="手机号", required=True) - parser.add_argument("--name", type=str, help="微信昵称", required=True) - parser.add_argument("--account", type=str, help="微信账号", required=True) - parser.add_argument("--key", type=str, help="(可选)密钥") - parser.add_argument("--db_path", type=str, help="(可选)已登录账号的微信文件夹路径") - - # 解析命令行参数 - args = parser.parse_args() - - # 检查是否缺少必要参数,并抛出错误 - if not args.mobile or not args.name or not args.account: - raise ValueError("缺少必要的命令行参数!请提供手机号、微信昵称、微信账号。") - - # 从命令行参数获取值 - mobile = args.mobile - name = args.name - account = args.account - key = args.key - db_path = args.db_path - - # 调用 run 函数,并传入参数 - rdata = BiasAddr(account, mobile, name, key, db_path).run(True, "../version_list.json") diff --git a/build/lib/pywxdump/command.py b/build/lib/pywxdump/command.py deleted file mode 100644 index 1bc762b..0000000 --- a/build/lib/pywxdump/command.py +++ /dev/null @@ -1,398 +0,0 @@ -# -*- coding: utf-8 -*-# -# ------------------------------------------------------------------------------- -# Name: main.py.py -# Description: -# Author: xaoyaoo -# Date: 2023/10/14 -# ------------------------------------------------------------------------------- -import argparse -import importlib.metadata -import sys - -from pywxdump import * - -wxdump_ascii = r""" -██████╗ ██╗ ██╗██╗ ██╗██╗ ██╗██████╗ ██╗ ██╗███╗ ███╗██████╗ -██╔══██╗╚██╗ ██╔╝██║ ██║╚██╗██╔╝██╔══██╗██║ ██║████╗ ████║██╔══██╗ -██████╔╝ ╚████╔╝ ██║ █╗ ██║ ╚███╔╝ ██║ ██║██║ ██║██╔████╔██║██████╔╝ -██╔═══╝ ╚██╔╝ ██║███╗██║ ██╔██╗ ██║ ██║██║ ██║██║╚██╔╝██║██╔═══╝ -██║ ██║ ╚███╔███╔╝██╔╝ ██╗██████╔╝╚██████╔╝██║ ╚═╝ ██║██║ -╚═╝ ╚═╝ ╚══╝╚══╝ ╚═╝ ╚═╝╚═════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ -""" - - -class MainBiasAddr(): - def init_parses(self, parser): - self.mode = "bias" - # 添加 'bias_addr' 子命令解析器 - sb_bias_addr = parser.add_parser(self.mode, help="获取微信基址偏移") - sb_bias_addr.add_argument("--mobile", type=str, help="手机号", metavar="", required=True) - sb_bias_addr.add_argument("--name", type=str, help="微信昵称", metavar="", required=True) - sb_bias_addr.add_argument("--account", type=str, help="微信账号", metavar="", required=True) - sb_bias_addr.add_argument("--key", type=str, metavar="", help="(可选)密钥") - sb_bias_addr.add_argument("--db_path", type=str, metavar="", help="(可选)已登录账号的微信文件夹路径") - sb_bias_addr.add_argument("-vlp", '--version_list_path', type=str, metavar="", - help="(可选)微信版本偏移文件路径,如有,则自动更新", - default=None) - self.sb_bias_addr = sb_bias_addr - return sb_bias_addr - - def run(self, args): - # 判断是否至少输入一个参数 - # if not args.key and not args.db_path: - # self.sb_bias_addr.error("必须至少指定 --key 或 --db_path 参数中的一个") - - # 从命令行参数获取值 - mobile = args.mobile - name = args.name - account = args.account - key = args.key - db_path = args.db_path - vlp = args.version_list_path - # 调用 run 函数,并传入参数 - rdata = BiasAddr(account, mobile, name, key, db_path).run(True, vlp) - return rdata - - -class MainWxInfo(): - def init_parses(self, parser): - self.mode = "info" - # 添加 'wx_info' 子命令解析器 - sb_wx_info = parser.add_parser(self.mode, help="获取微信信息") - sb_wx_info.add_argument("-vlp", '--version_list_path', metavar="", type=str, - help="(可选)微信版本偏移文件路径", default=VERSION_LIST_PATH) - return sb_wx_info - - def run(self, args): - # 读取微信各版本偏移 - path = args.version_list_path - version_list = json.load(open(path, "r", encoding="utf-8")) - result = read_info(version_list, True) # 读取微信信息 - return result - - -class MainWxDbPath(): - def init_parses(self, parser): - self.mode = "db_path" - # 添加 'wx_db_path' 子命令解析器 - sb_wx_db_path = parser.add_parser(self.mode, help="获取微信文件夹路径") - sb_wx_db_path.add_argument("-r", "--require_list", type=str, - help="(可选)需要的数据库名称(eg: -r MediaMSG;MicroMsg;FTSMSG;MSG;Sns;Emotion )", - default="all", metavar="") - sb_wx_db_path.add_argument("-wf", "--wx_files", type=str, help="(可选)'WeChat Files'路径", default=None, - metavar="") - sb_wx_db_path.add_argument("-id", "--wxid", type=str, help="(可选)wxid_,用于确认用户文件夹", - default=None, metavar="") - return sb_wx_db_path - - def run(self, args): - # 从命令行参数获取值 - require_list = args.require_list - msg_dir = args.wx_files - wxid = args.wxid - - user_dirs = get_wechat_db(require_list, msg_dir, wxid, True) # 获取微信数据库路径 - return user_dirs - - -class MainDecrypt(): - def init_parses(self, parser): - self.mode = "decrypt" - # 添加 'decrypt' 子命令解析器 - sb_decrypt = parser.add_parser(self.mode, help="解密微信数据库") - sb_decrypt.add_argument("-k", "--key", type=str, help="密钥", required=True, metavar="") - sb_decrypt.add_argument("-i", "--db_path", type=str, help="数据库路径(目录or文件)", required=True, metavar="") - sb_decrypt.add_argument("-o", "--out_path", type=str, default=os.path.join(os.getcwd(), "decrypted"), - help="输出路径(必须是目录)[默认为当前路径下decrypted文件夹]", required=False, - metavar="") - return sb_decrypt - - def run(self, args): - # 从命令行参数获取值 - key = args.key - db_path = args.db_path - out_path = args.out_path - - if not os.path.exists(db_path): - print(f"[-] 数据库路径不存在:{db_path}") - return - - if not os.path.exists(out_path): - os.makedirs(out_path) - print(f"[+] 创建输出文件夹:{out_path}") - - # 调用 decrypt 函数,并传入参数 - result = batch_decrypt(key, db_path, out_path, True) - return result - - -class MainShowChatRecords(): - def init_parses(self, parser): - self.mode = "dbshow" - # 添加 'decrypt' 子命令解析器 - sb_decrypt = parser.add_parser(self.mode, help="聊天记录查看") - sb_decrypt.add_argument("-msg", "--msg_path", type=str, help="解密后的 MSG.db 的路径", required=True, - metavar="") - sb_decrypt.add_argument("-micro", "--micro_path", type=str, help="解密后的 MicroMsg.db 的路径", required=True, - metavar="") - sb_decrypt.add_argument("-media", "--media_path", type=str, help="解密后的 MediaMSG.db 的路径", required=True, - metavar="") - sb_decrypt.add_argument("-fs", "--filestorage_path", type=str, - help="(可选)文件夹FileStorage的路径(用于显示图片)", required=False, - metavar="") - return sb_decrypt - - def run(self, args): - # 从命令行参数获取值 - try: - from flask import Flask, request, jsonify, render_template, g - import logging - except Exception as e: - print(e) - print("[-] 请安装flask( pip install flask )") - return - - if not os.path.exists(args.msg_path) or not os.path.exists(args.micro_path) or not os.path.exists( - args.media_path): - print(os.path.exists(args.msg_path), os.path.exists(args.micro_path), os.path.exists(args.media_path)) - print("[-] 输入数据库路径不存在") - return - - app = Flask(__name__, template_folder='./show_chat/templates') - app.logger.setLevel(logging.ERROR) - - @app.before_request - def before_request(): - - g.MSG_ALL_db_path = args.msg_path - g.MicroMsg_db_path = args.micro_path - g.MediaMSG_all_db_path = args.media_path - g.FileStorage_path = args.filestorage_path - g.USER_LIST = get_user_list(args.msg_path, args.micro_path) - - app.register_blueprint(app_show_chat) - - print("[+] 请使用浏览器访问 http://127.0.0.1:5000/ 查看聊天记录") - app.run(debug=False) - - -class MainExportChatRecords(): - def init_parses(self, parser): - self.mode = "export" - # 添加 'decrypt' 子命令解析器 - sb_decrypt = parser.add_parser(self.mode, help="聊天记录导出为html") - sb_decrypt.add_argument("-u", "--username", type=str, help="微信账号(聊天对象账号)", required=True, metavar="") - sb_decrypt.add_argument("-o", "--outpath", type=str, help="导出路径", required=True, metavar="") - sb_decrypt.add_argument("-msg", "--msg_path", type=str, help="解密后的 MSG.db 的路径", required=True, - metavar="") - sb_decrypt.add_argument("-micro", "--micro_path", type=str, help="解密后的 MicroMsg.db 的路径", required=True, - metavar="") - sb_decrypt.add_argument("-media", "--media_path", type=str, help="解密后的 MediaMSG.db 的路径", required=True, - metavar="") - sb_decrypt.add_argument("-fs", "--filestorage_path", type=str, - help="(可选)文件夹FileStorage的路径(用于显示图片)", required=False, - metavar="") - return sb_decrypt - - def run(self, args): - # 从命令行参数获取值 - try: - from flask import Flask, request, jsonify, render_template, g - import logging - except Exception as e: - print(e) - print("[-] 请安装flask( pip install flask)") - return - - if not os.path.exists(args.msg_path) or not os.path.exists(args.micro_path) or not os.path.exists( - args.media_path): - print(os.path.exists(args.msg_path), os.path.exists(args.micro_path), os.path.exists(args.media_path)) - print("[-] 输入数据库路径不存在") - return - - if not os.path.exists(args.outpath): - os.makedirs(args.outpath) - print(f"[+] 创建输出文件夹:{args.outpath}") - - export(args.username, args.outpath, args.msg_path, args.micro_path, args.media_path, args.filestorage_path) - print(f"[+] 导出成功{args.outpath}") - - -class MainAll(): - def init_parses(self, parser): - self.mode = "all" - # 添加 'all' 子命令解析器 - sb_all = parser.add_parser(self.mode, help="获取微信信息,解密微信数据库,查看聊天记录") - return sb_all - - def run(self, args): - # 获取微信信息 - WxInfo = read_info(VERSION_LIST, True) - - for user in WxInfo: - key = user.get("key", "") - if not key: - print("[-] 未获取到密钥") - return - wxid = user.get("wxid", None) - - WxDbPath = get_wechat_db('all', None, wxid=wxid, is_logging=True) # 获取微信数据库路径 - if isinstance(WxDbPath, str): # 如果返回的是字符串,则表示出错 - print(WxDbPath) - return - wxdbpaths = [path for user_dir in WxDbPath.values() for paths in user_dir.values() for path in paths] - if len(wxdbpaths) == 0: - print("[-] 未获取到数据库路径") - return - - wxdblen = len(wxdbpaths) - print(f"[+] 共发现 {wxdblen} 个微信数据库") - print("=" * 32) - - out_path = os.path.join(os.getcwd(), "decrypted", wxid) if wxid else os.path.join(os.getcwd(), "decrypted") - print(f"[*] 解密后文件夹:{out_path} ") - print(f"[*] 解密中...(用时较久,耐心等待)") - if not os.path.exists(out_path): - os.makedirs(out_path) - - # 判断out_path是否为空目录 - if os.listdir(out_path): - isdel = input(f"[*] 输出文件夹不为空({out_path})\n 是否删除?(y/n):") - if isdel.lower() == 'y' or isdel.lower() == 'yes': - for root, dirs, files in os.walk(out_path, topdown=False): - for name in files: - os.remove(os.path.join(root, name)) - for name in dirs: - os.rmdir(os.path.join(root, name)) - - # 调用 decrypt 函数,并传入参数 # 解密 - code, ret = batch_decrypt(key, wxdbpaths, out_path, False) - if not code: - print(ret) - return - print("[+] 解密完成") - print("-" * 32) - errors = [] - out_dbs = [] - for code1, ret1 in ret: - if code1 == False: - errors.append(ret1) - else: - print( - f'[+] success "{os.path.relpath(ret1[0], os.path.commonprefix(wxdbpaths))}" -> "{os.path.relpath(ret1[1], os.getcwd())}"') - out_dbs.append(ret1[1]) - print("-" * 32) - print( - "[-] " + f"共 {len(errors)} 个文件解密失败(可能原因:非当前登录用户数据库;非加密数据库),详见{out_path}下‘解密失败.txt’;") - # print("; ".join([f'"{wxdbpaths[i]}"' for i in errors])) - with open(os.path.join(out_path, "解密失败.txt"), "w", encoding="utf-8") as f: - f.write("\n".join([f'{i}' for i in errors])) - print("=" * 32) - - if len(out_dbs) <= 0: - print("[-] 未获取到解密后的数据库路径") - return - - user_path = out_dbs[0].split("MSG") - FileStorage_path = os.path.join(user_path[0], "FileStorage") - - # 查看聊天记录 - MSGDB = [i for i in out_dbs if "de_MSG" in i] - MSGDB = MSGDB[-1] if MSGDB else None - MicroMsgDB = [i for i in out_dbs if "de_MicroMsg" in i] - MicroMsgDB = MicroMsgDB[-1] if MicroMsgDB else None - MediaMSGDB = [i for i in out_dbs if "de_MediaMSG" in i] - MediaMSGDB = MediaMSGDB[-1] if MediaMSGDB else None - - args.msg_path = MSGDB - args.micro_path = MicroMsgDB - args.media_path = MediaMSGDB - args.filestorage_path = FileStorage_path - MainShowChatRecords().run(args) - - -PYWXDUMP_VERSION = importlib.metadata.version('pywxdump') - - -class CustomArgumentParser(argparse.ArgumentParser): - def format_help(self): - # 首先显示软件简介 - # 定义软件简介文本并进行格式化 - line_len = 70 - PYWXDUMP_VERSION = importlib.metadata.version('pywxdump') - wxdump_line = '\n'.join([f'\033[36m{line:^{line_len}}\033[0m' for line in wxdump_ascii.split('\n') if line]) - first_line = f'\033[36m{" PyWxDump v" + PYWXDUMP_VERSION + " ":=^{line_len}}\033[0m' - brief = 'PyWxDump功能:获取账号信息、解密数据库、查看聊天记录、导出聊天记录为html等' - other = '更多详情请查看: \033[4m\033[1mhttps://github.com/xaoyaoo/PyWxDump\033[0m' - - separator = f'\033[36m{" options ":-^{line_len}}\033[0m' - - # 获取帮助信息并添加到软件简介下方 - help_text = super().format_help().strip() - - return f'\n{wxdump_line}\n\n{first_line}\n{brief}\n{separator}\n{help_text}\n{separator}\n{other}\n{first_line}\n' - - -def console_run(): - # 创建命令行参数解析器 - parser = CustomArgumentParser(formatter_class=argparse.RawTextHelpFormatter) - PYWXDUMP_VERSION = importlib.metadata.version('pywxdump') - parser.add_argument('-V', '--version', action='version', version=f"PyWxDump v{PYWXDUMP_VERSION}") - - # 添加子命令解析器 - subparsers = parser.add_subparsers(dest="mode", help="""运行模式:""", required=True, metavar="mode") - - modes = {} - # 添加 'bias' 子命令解析器 - main_bias_addr = MainBiasAddr() - sb_bias_addr = main_bias_addr.init_parses(subparsers) - modes[main_bias_addr.mode] = main_bias_addr - - # 添加 'info' 子命令解析器 - main_wx_info = MainWxInfo() - sb_wx_info = main_wx_info.init_parses(subparsers) - modes[main_wx_info.mode] = main_wx_info - - # 添加 'db_path' 子命令解析器 - main_wx_db_path = MainWxDbPath() - sb_wx_db_path = main_wx_db_path.init_parses(subparsers) - modes[main_wx_db_path.mode] = main_wx_db_path - - # 添加 'decrypt' 子命令解析器 - main_decrypt = MainDecrypt() - sb_decrypt = main_decrypt.init_parses(subparsers) - modes[main_decrypt.mode] = main_decrypt - - # 添加 '' 子命令解析器 - main_show_chat_records = MainShowChatRecords() - sb_dbshow = main_show_chat_records.init_parses(subparsers) - modes[main_show_chat_records.mode] = main_show_chat_records - - # 添加 'export' 子命令解析器 - main_export_chat_records = MainExportChatRecords() - sb_export = main_export_chat_records.init_parses(subparsers) - modes[main_export_chat_records.mode] = main_export_chat_records - - # 添加 'all' 子命令解析器 - main_all = MainAll() - sb_all = main_all.init_parses(subparsers) - modes[main_all.mode] = main_all - - # 检查是否需要显示帮助信息 - if len(sys.argv) == 1: - sys.argv.append('-h') - elif len(sys.argv) == 2 and sys.argv[1] in modes.keys() and sys.argv[1] not in [main_all.mode, main_wx_info.mode, - main_wx_db_path.mode]: - sys.argv.append('-h') - - args = parser.parse_args() # 解析命令行参数 - - if not any(vars(args).values()): - parser.print_help() - - # 根据不同的 'mode' 参数,执行不同的操作 - modes[args.mode].run(args) - - -if __name__ == '__main__': - console_run() diff --git a/build/lib/pywxdump/decrypted/__init__.py b/build/lib/pywxdump/decrypted/__init__.py deleted file mode 100644 index 6a3db0f..0000000 --- a/build/lib/pywxdump/decrypted/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -# -*- coding: utf-8 -*-# -# ------------------------------------------------------------------------------- -# Name: __init__.py.py -# Description: -# Author: xaoyaoo -# Date: 2023/08/21 -# ------------------------------------------------------------------------------- -from .decrypt import batch_decrypt, encrypt -from .get_wx_decrypted_db import all_decrypt, merge_copy_msg_db, merge_msg_db, merge_media_msg_db \ No newline at end of file diff --git a/build/lib/pywxdump/decrypted/decrypt.py b/build/lib/pywxdump/decrypted/decrypt.py deleted file mode 100644 index 1e1d820..0000000 --- a/build/lib/pywxdump/decrypted/decrypt.py +++ /dev/null @@ -1,228 +0,0 @@ -# -*- coding: utf-8 -*-# -# ------------------------------------------------------------------------------- -# Name: getwxinfo.py -# Description: -# Author: xaoyaoo -# Date: 2023/08/21 -# 微信数据库采用的加密算法是256位的AES-CBC。数据库的默认的页大小是4096字节即4KB,其中每一个页都是被单独加解密的。 -# 加密文件的每一个页都有一个随机的初始化向量,它被保存在每一页的末尾。 -# 加密文件的每一页都存有着消息认证码,算法使用的是HMAC-SHA1(安卓数据库使用的是SHA512)。它也被保存在每一页的末尾。 -# 每一个数据库文件的开头16字节都保存了一段唯一且随机的盐值,作为HMAC的验证和数据的解密。 -# 用来计算HMAC的key与解密的key是不同的,解密用的密钥是主密钥和之前提到的16字节的盐值通过PKCS5_PBKF2_HMAC1密钥扩展算法迭代64000次计算得到的。而计算HMAC的密钥是刚提到的解密密钥和16字节盐值异或0x3a的值通过PKCS5_PBKF2_HMAC1密钥扩展算法迭代2次计算得到的。 -# 为了保证数据部分长度是16字节即AES块大小的整倍数,每一页的末尾将填充一段空字节,使得保留字段的长度为48字节。 -# 综上,加密文件结构为第一页4KB数据前16字节为盐值,紧接着4032字节数据,再加上16字节IV和20字节HMAC以及12字节空字节;而后的页均是4048字节长度的加密数据段和48字节的保留段。 -# ------------------------------------------------------------------------------- - -import argparse -import hmac -import hashlib -import os -from typing import Union, List -from Cryptodome.Cipher import AES - -# from Crypto.Cipher import AES # 如果上面的导入失败,可以尝试使用这个 - -SQLITE_FILE_HEADER = "SQLite format 3\x00" # SQLite文件头 - -KEY_SIZE = 32 -DEFAULT_PAGESIZE = 4096 -DEFAULT_ITER = 64000 - - -# 通过密钥解密数据库 -def decrypt(key: str, db_path, out_path): - """ - 通过密钥解密数据库 - :param key: 密钥 64位16进制字符串 - :param db_path: 待解密的数据库路径(必须是文件) - :param out_path: 解密后的数据库输出路径(必须是文件) - :return: - """ - if not os.path.exists(db_path) or not os.path.isfile(db_path): - return False, f"[-] db_path:'{db_path}' File not found!" - if not os.path.exists(os.path.dirname(out_path)): - return False, f"[-] out_path:'{out_path}' File not found!" - - if len(key) != 64: - return False, f"[-] key:'{key}' Len Error!" - - password = bytes.fromhex(key.strip()) - with open(db_path, "rb") as file: - blist = file.read() - - salt = blist[:16] - byteKey = hashlib.pbkdf2_hmac("sha1", password, salt, DEFAULT_ITER, KEY_SIZE) - first = blist[16:DEFAULT_PAGESIZE] - if len(salt) != 16: - return False, f"[-] db_path:'{db_path}' File Error!" - - mac_salt = bytes([(salt[i] ^ 58) for i in range(16)]) - mac_key = hashlib.pbkdf2_hmac("sha1", byteKey, mac_salt, 2, KEY_SIZE) - hash_mac = hmac.new(mac_key, first[:-32], hashlib.sha1) - hash_mac.update(b'\x01\x00\x00\x00') - - if hash_mac.digest() != first[-32:-12]: - return False, f"[-] Key Error! (key:'{key}'; db_path:'{db_path}'; out_path:'{out_path}' )" - - newblist = [blist[i:i + DEFAULT_PAGESIZE] for i in range(DEFAULT_PAGESIZE, len(blist), DEFAULT_PAGESIZE)] - - with open(out_path, "wb") as deFile: - deFile.write(SQLITE_FILE_HEADER.encode()) - t = AES.new(byteKey, AES.MODE_CBC, first[-48:-32]) - decrypted = t.decrypt(first[:-48]) - deFile.write(decrypted) - deFile.write(first[-48:]) - - for i in newblist: - t = AES.new(byteKey, AES.MODE_CBC, i[-48:-32]) - decrypted = t.decrypt(i[:-48]) - deFile.write(decrypted) - deFile.write(i[-48:]) - return True, [db_path, out_path, key] - - -def batch_decrypt(key: str, db_path: Union[str, List[str]], out_path: str, is_logging: bool = False): - if not isinstance(key, str) or not isinstance(out_path, str) or not os.path.exists(out_path) or len(key) != 64: - error = f"[-] (key:'{key}' or out_path:'{out_path}') Error!" - if is_logging: print(error) - return False, error - - process_list = [] - - if isinstance(db_path, str): - if not os.path.exists(db_path): - error = f"[-] db_path:'{db_path}' not found!" - if is_logging: print(error) - return False, error - - if os.path.isfile(db_path): - inpath = db_path - outpath = os.path.join(out_path, 'de_' + os.path.basename(db_path)) - process_list.append([key, inpath, outpath]) - - elif os.path.isdir(db_path): - for root, dirs, files in os.walk(db_path): - for file in files: - inpath = os.path.join(root, file) - rel = os.path.relpath(root, db_path) - outpath = os.path.join(out_path, rel, 'de_' + file) - - if not os.path.exists(os.path.dirname(outpath)): - os.makedirs(os.path.dirname(outpath)) - process_list.append([key, inpath, outpath]) - else: - error = f"[-] db_path:'{db_path}' Error " - if is_logging: print(error) - return False, error - - elif isinstance(db_path, list): - rt_path = os.path.commonprefix(db_path) - if not os.path.exists(rt_path): - rt_path = os.path.dirname(rt_path) - - for inpath in db_path: - if not os.path.exists(inpath): - erreor = f"[-] db_path:'{db_path}' not found!" - if is_logging: print(erreor) - return False, erreor - - inpath = os.path.normpath(inpath) - rel = os.path.relpath(os.path.dirname(inpath), rt_path) - outpath = os.path.join(out_path, rel, 'de_' + os.path.basename(inpath)) - if not os.path.exists(os.path.dirname(outpath)): - os.makedirs(os.path.dirname(outpath)) - process_list.append([key, inpath, outpath]) - else: - error = f"[-] db_path:'{db_path}' Error " - if is_logging: print(error) - return False, error - - result = [] - for i in process_list: - result.append(decrypt(*i)) # 解密 - - # 删除空文件夹 - for root, dirs, files in os.walk(out_path, topdown=False): - for dir in dirs: - if not os.listdir(os.path.join(root, dir)): - os.rmdir(os.path.join(root, dir)) - - if is_logging: - print("=" * 32) - success_count = 0 - fail_count = 0 - for code, ret in result: - if code == False: - print(ret) - fail_count += 1 - else: - print(f'[+] "{ret[0]}" -> "{ret[1]}"') - success_count += 1 - print("-" * 32) - print(f"[+] 共 {len(result)} 个文件, 成功 {success_count} 个, 失败 {fail_count} 个") - print("=" * 32) - return True, result - - -def encrypt(key: str, db_path, out_path): - """ - 通过密钥加密数据库 - :param key: 密钥 64位16进制字符串 - :param db_path: 待加密的数据库路径(必须是文件) - :param out_path: 加密后的数据库输出路径(必须是文件) - :return: - """ - if not os.path.exists(db_path) or not os.path.isfile(db_path): - return False, f"[-] db_path:'{db_path}' File not found!" - if not os.path.exists(os.path.dirname(out_path)): - return False, f"[-] out_path:'{out_path}' File not found!" - - if len(key) != 64: - return False, f"[-] key:'{key}' Len Error!" - - password = bytes.fromhex(key.strip()) - with open(db_path, "rb") as file: - blist = file.read() - - salt = os.urandom(16) # 生成随机盐值 - byteKey = hashlib.pbkdf2_hmac("sha1", password, salt, DEFAULT_ITER, KEY_SIZE) - - # 计算消息认证码 - mac_salt = bytes([(salt[i] ^ 58) for i in range(16)]) - mac_key = hashlib.pbkdf2_hmac("sha1", byteKey, mac_salt, 2, KEY_SIZE) - hash_mac = hmac.new(mac_key, blist[:-32], hashlib.sha1) - hash_mac.update(b'\x01\x00\x00\x00') - mac_digest = hash_mac.digest() - - newblist = [blist[i:i + DEFAULT_PAGESIZE] for i in range(DEFAULT_PAGESIZE, len(blist), DEFAULT_PAGESIZE)] - - with open(out_path, "wb") as enFile: - enFile.write(salt) # 写入盐值 - enFile.write(mac_digest) # 写入消息认证码 - - for i in newblist: - t = AES.new(byteKey, AES.MODE_CBC, os.urandom(16)) # 生成随机的初始向量 - encrypted = t.encrypt(i) # 加密数据块 - enFile.write(encrypted) - - return True, [db_path, out_path, key] - - -if __name__ == '__main__': - # 创建命令行参数解析器 - parser = argparse.ArgumentParser() - parser.add_argument("-k", "--key", type=str, help="密钥", required=True) - parser.add_argument("-i", "--db_path", type=str, help="数据库路径(目录or文件)", required=True) - parser.add_argument("-o", "--out_path", type=str, - help="输出路径(必须是目录),输出文件为 out_path/de_{original_name}", required=True) - - # 解析命令行参数 - args = parser.parse_args() - - # 从命令行参数获取值 - key = args.key - db_path = args.db_path - out_path = args.out_path - - # 调用 decrypt 函数,并传入参数 - result = batch_decrypt(key, db_path, out_path, is_logging=True) diff --git a/build/lib/pywxdump/decrypted/get_wx_decrypted_db.py b/build/lib/pywxdump/decrypted/get_wx_decrypted_db.py deleted file mode 100644 index e77e321..0000000 --- a/build/lib/pywxdump/decrypted/get_wx_decrypted_db.py +++ /dev/null @@ -1,315 +0,0 @@ -# -*- coding: utf-8 -*-# -# ------------------------------------------------------------------------------- -# Name: get_wx_decrypted_db.py -# Description: -# Author: xaoyaoo -# Date: 2023/08/25 -# ------------------------------------------------------------------------------- -import argparse -import os -import re -import shutil -import sqlite3 -# import sys -import winreg - -# sys.path.append(os.path.dirname(os.path.abspath(__file__))) -try: - from decrypted.decrypt import decrypt -except ImportError: - from .decrypt import decrypt - - - -# 开始获取微信数据库 -def get_wechat_db(): - try: - key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Software\Tencent\WeChat", 0, winreg.KEY_READ) - value, _ = winreg.QueryValueEx(key, "FileSavePath") - winreg.CloseKey(key) - w_dir = value - except Exception as e: - try: - w_dir = "MyDocument:" - except Exception as e: - print("读取注册表错误:", str(e)) - return str(e) - - if w_dir == "MyDocument:": - profile = os.path.expanduser("~") - msg_dir = os.path.join(profile, "Documents", "WeChat Files") - else: - msg_dir = os.path.join(w_dir, "WeChat Files") - if not os.path.exists(msg_dir): - return FileNotFoundError("目录不存在") - user_dirs = {} # wx用户目录 - files = os.listdir(msg_dir) - for file_name in files: - if file_name == "All Users" or file_name == "Applet" or file_name == "WMPF": - continue - user_dirs[file_name] = os.path.join(msg_dir, file_name) - - # 获取数据库路径 - for user, user_dir in user_dirs.items(): - Media_p = [] - Micro_p = [] - FTS_p = [] - Sns_p = [] - Msg = [] - Emotion_p = [] - for root, dirs, files in os.walk(user_dir): - for file_name in files: - if re.match(r".*MediaMSG.*\.db$", file_name): - src_path = os.path.join(root, file_name) - Media_p.append(src_path) - elif re.match(r".*MicroMsg.*\.db$", file_name): - src_path = os.path.join(root, file_name) - Micro_p.append(src_path) - elif re.match(r".*FTSMSG.*\.db$", file_name): - src_path = os.path.join(root, file_name) - FTS_p.append(src_path) - elif re.match(r".*MSG.*\.db$", file_name): - src_path = os.path.join(root, file_name) - Msg.append(src_path) - elif re.match(r".*Sns.*\.db$", file_name): - src_path = os.path.join(root, file_name) - Sns_p.append(src_path) - elif re.match(r".*Emotion.*\.db$", file_name): - src_path = os.path.join(root, file_name) - Emotion_p.append(src_path) - Media_p.sort() - Msg.sort() - Micro_p.sort() - # FTS_p.sort() - user_dirs[user] = {"MicroMsg": Micro_p, "Msg": Msg, "MediaMSG": Media_p, "Sns": Sns_p, "Emotion": Emotion_p} - return user_dirs - - -# 解密所有数据库 paths(文件) 到 decrypted_path(目录) -def all_decrypt(keys, paths, decrypted_path): - decrypted_paths = [] - - for key in keys: - for path in paths: - - name = os.path.basename(path) # 文件名 - dtp = os.path.join(decrypted_path, name) # 解密后的路径 - if not decrypt(key, path, dtp): - break - decrypted_paths.append(dtp) - else: # for循环正常结束,没有break - break # 跳出while循环 - else: - return False # while循环正常结束,没有break 解密失败 - return decrypted_paths - - -def merge_copy_msg_db(db_path, save_path): - if isinstance(db_path, list) and len(db_path) == 1: - db_path = db_path[0] - if not os.path.exists(db_path): - raise FileNotFoundError("目录不存在") - shutil.move(db_path, save_path) - - -# 合并相同名称的数据库 -def merge_msg_db(db_path: list, save_path: str, CreateTime: int = 0): # CreateTime: 从这个时间开始的消息 10位时间戳 - - merged_conn = sqlite3.connect(save_path) - merged_cursor = merged_conn.cursor() - - for db_file in db_path: - c_tabels = merged_cursor.execute( - "select tbl_name from sqlite_master where type='table' and tbl_name!='sqlite_sequence'") - tabels_all = c_tabels.fetchall() # 所有表名 - tabels_all = [row[0] for row in tabels_all] - - conn = sqlite3.connect(db_file) - cursor = conn.cursor() - - # 创建表 - if len(tabels_all) < 4: - cursor.execute( - "select tbl_name,sql from sqlite_master where type='table' and tbl_name!='sqlite_sequence'") - c_part = cursor.fetchall() - - for tbl_name, sql in c_part: - if tbl_name in tabels_all: - continue - try: - merged_cursor.execute(sql) - tabels_all.append(tbl_name) - except Exception as e: - print(f"error: {db_file}\n{tbl_name}\n{sql}\n{e}\n**********") - raise e - merged_conn.commit() - - # 写入数据 - for tbl_name in tabels_all: - if tbl_name == "MSG": - MsgSvrIDs = merged_cursor.execute( - f"select MsgSvrID from MSG where CreateTime>{CreateTime} and MsgSvrID!=0").fetchall() - - cursor.execute(f"PRAGMA table_info({tbl_name})") - columns = cursor.fetchall() - columns = [column[1] for column in columns[1:]] - - ex_sql = f"select {','.join(columns)} from {tbl_name} where CreateTime>{CreateTime} and MsgSvrID not in ({','.join([str(MsgSvrID[0]) for MsgSvrID in MsgSvrIDs])})" - cursor.execute(ex_sql) - - insert_sql = f"INSERT INTO {tbl_name} ({','.join(columns)}) VALUES ({','.join(['?' for _ in range(len(columns))])})" - try: - merged_cursor.executemany(insert_sql, cursor.fetchall()) - except Exception as e: - print( - f"error: {db_file}\n{tbl_name}\n{insert_sql}\n{cursor.fetchall()}\n{len(cursor.fetchall())}\n{e}\n**********") - raise e - merged_conn.commit() - else: - ex_sql = f"select * from {tbl_name}" - cursor.execute(ex_sql) - - for r in cursor.fetchall(): - cursor.execute(f"PRAGMA table_info({tbl_name})") - columns = cursor.fetchall() - if len(columns) > 1: - columns = [column[1] for column in columns[1:]] - values = r[1:] - else: - columns = [columns[0][1]] - values = [r[0]] - - query_1 = "select * from " + tbl_name + " where " + columns[0] + "=?" # 查询语句 用于判断是否存在 - c2 = merged_cursor.execute(query_1, values) - if len(c2.fetchall()) > 0: # 已存在 - continue - query = "INSERT INTO " + tbl_name + " (" + ",".join(columns) + ") VALUES (" + ",".join( - ["?" for _ in range(len(values))]) + ")" - - try: - merged_cursor.execute(query, values) - except Exception as e: - print(f"error: {db_file}\n{tbl_name}\n{query}\n{values}\n{len(values)}\n{e}\n**********") - raise e - merged_conn.commit() - - conn.close() - sql = '''delete from MSG where localId in (SELECT localId from MSG - where MsgSvrID != 0 and MsgSvrID in (select MsgSvrID from MSG - where MsgSvrID != 0 GROUP BY MsgSvrID HAVING COUNT(*) > 1) - and localId not in (select min(localId) from MSG - where MsgSvrID != 0 GROUP BY MsgSvrID HAVING COUNT(*) > 1))''' - c = merged_cursor.execute(sql) - merged_conn.commit() - merged_conn.close() - return save_path - - -def merge_media_msg_db(db_path: list, save_path: str): - merged_conn = sqlite3.connect(save_path) - merged_cursor = merged_conn.cursor() - - for db_file in db_path: - - s = "select tbl_name,sql from sqlite_master where type='table' and tbl_name!='sqlite_sequence'" - have_tables = merged_cursor.execute(s).fetchall() - have_tables = [row[0] for row in have_tables] - - conn_part = sqlite3.connect(db_file) - cursor = conn_part.cursor() - - if len(have_tables) < 1: - cursor.execute(s) - table_part = cursor.fetchall() - tblname, sql = table_part[0] - - sql = "CREATE TABLE Media(localId INTEGER PRIMARY KEY AUTOINCREMENT,Key TEXT,Reserved0 INT,Buf BLOB,Reserved1 INT,Reserved2 TEXT)" - try: - merged_cursor.execute(sql) - have_tables.append(tblname) - except Exception as e: - print(f"error: {db_file}\n{tblname}\n{sql}\n{e}\n**********") - raise e - merged_conn.commit() - - for tblname in have_tables: - s = "select Reserved0 from " + tblname - merged_cursor.execute(s) - r0 = merged_cursor.fetchall() - - ex_sql = f"select `Key`,Reserved0,Buf,Reserved1,Reserved2 from {tblname} where Reserved0 not in ({','.join([str(r[0]) for r in r0])})" - cursor.execute(ex_sql) - data = cursor.fetchall() - - insert_sql = f"INSERT INTO {tblname} (Key,Reserved0,Buf,Reserved1,Reserved2) VALUES ({','.join(['?' for _ in range(5)])})" - try: - merged_cursor.executemany(insert_sql, data) - except Exception as e: - print(f"error: {db_file}\n{tblname}\n{insert_sql}\n{data}\n{len(data)}\n{e}\n**********") - raise e - merged_conn.commit() - conn_part.close() - - merged_conn.close() - return save_path - - -if __name__ == '__main__': - # 创建命令行参数解析器 - parser = argparse.ArgumentParser() - parser.add_argument("-k", "--key", help="解密密钥", nargs="+", required=True) - - # 解析命令行参数 - args = parser.parse_args() - - # 检查是否缺少必要参数,并抛出错误 - if not args.key: - raise ValueError("缺少必要的命令行参数!请提供密钥。") - - # 从命令行参数获取值 - keys = args.key - - decrypted_ROOT = os.path.join(os.getcwd(), "decrypted") - - if keys is None: - print("keys is None") - exit(0) - if isinstance(keys, str): - keys = [keys] - - user_dirs = get_wechat_db() - for user, db_path in user_dirs.items(): # 遍历用户 - MicroMsgPaths = db_path["MicroMsg"] - MsgPaths = db_path["Msg"] - MediaMSGPaths = db_path["MediaMSG"] - # FTSMSGPaths = db_path["FTSMSG"] - SnsPaths = db_path["Sns"] - EmotionPaths = db_path["Emotion"] - - decrypted_path_tmp = os.path.join(decrypted_ROOT, user, "tmp") # 解密后的目录 - if not os.path.exists(decrypted_path_tmp): - os.makedirs(decrypted_path_tmp) - - MicroMsgDecryptPaths = all_decrypt(keys, MicroMsgPaths, decrypted_path_tmp) - MsgDecryptPaths = all_decrypt(keys, MsgPaths, decrypted_path_tmp) - MediaMSGDecryptPaths = all_decrypt(keys, MediaMSGPaths, decrypted_path_tmp) - SnsDecryptPaths = all_decrypt(keys, SnsPaths, decrypted_path_tmp) - EmotionDecryptPaths = all_decrypt(keys, EmotionPaths, decrypted_path_tmp) - - # 合并数据库 - decrypted_path = os.path.join(decrypted_ROOT, user) # 解密后的目录 - - MicroMsgDbPath = os.path.join(decrypted_path, "MicroMsg.db") - MsgDbPath = os.path.join(decrypted_path, "MSG_all.db") - MediaMSGDbPath = os.path.join(decrypted_path, "MediaMSG_all.db") - SnsDbPath = os.path.join(decrypted_path, "Sns_all.db") - EmmotionDbPath = os.path.join(decrypted_path, "Emotion_all.db") - - merge_copy_msg_db(MicroMsgDecryptPaths, MicroMsgDbPath) - merge_msg_db(MsgDecryptPaths, MsgDbPath, 0) - merge_media_msg_db(MediaMSGDecryptPaths, MediaMSGDbPath) - merge_copy_msg_db(SnsDecryptPaths, SnsDbPath) - merge_copy_msg_db(EmotionDecryptPaths, EmmotionDbPath) - - shutil.rmtree(decrypted_path_tmp) # 删除临时文件 - print(f"解密完成:{user}, {decrypted_path}") diff --git a/build/lib/pywxdump/show_chat/__init__.py b/build/lib/pywxdump/show_chat/__init__.py deleted file mode 100644 index 2a14789..0000000 --- a/build/lib/pywxdump/show_chat/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -# -*- coding: utf-8 -*-# -# ------------------------------------------------------------------------------- -# Name: __init__.py.py -# Description: -# Author: xaoyaoo -# Date: 2023/11/10 -# ------------------------------------------------------------------------------- -from .main_window import app_show_chat, get_user_list, export diff --git a/build/lib/pywxdump/show_chat/main_window.py b/build/lib/pywxdump/show_chat/main_window.py deleted file mode 100644 index aee6496..0000000 --- a/build/lib/pywxdump/show_chat/main_window.py +++ /dev/null @@ -1,269 +0,0 @@ -# -*- coding: utf-8 -*-# -# ------------------------------------------------------------------------------- -# Name: GUI.py -# Description: -# Author: xaoyaoo -# Date: 2023/11/10 -# ------------------------------------------------------------------------------- -import base64 -import sqlite3 -import os -import json -import time -import hashlib -from pywxdump.analyse import read_img_dat, decompress_CompressContent, read_audio, parse_xml_string - -from flask import Flask, request, render_template, g, Blueprint - - -def get_md5(s): - m = hashlib.md5() - m.update(s.encode("utf-8")) - return m.hexdigest() - - -def get_user_list(MSG_ALL_db_path, MicroMsg_db_path): - users = [] - # 连接 MSG_ALL.db 数据库,并执行查询 - db1 = sqlite3.connect(MSG_ALL_db_path) - cursor1 = db1.cursor() - cursor1.execute("SELECT StrTalker, COUNT(*) AS ChatCount FROM MSG GROUP BY StrTalker ORDER BY ChatCount DESC") - result = cursor1.fetchall() - - for row in result: - # 获取用户名、昵称、备注和聊天记录数量 - db2 = sqlite3.connect(MicroMsg_db_path) - cursor2 = db2.cursor() - cursor2.execute("SELECT UserName, NickName, Remark FROM Contact WHERE UserName=?", (row[0],)) - result2 = cursor2.fetchone() - if result2: - username, nickname, remark = result2 - chat_count = row[1] - - # 拼接四列数据为元组 - row_data = {"username": username, "nickname": nickname, "remark": remark, "chat_count": chat_count, - "isChatRoom": username.startswith("@chatroom")} - users.append(row_data) - cursor2.close() - db2.close() - cursor1.close() - db1.close() - return users - - -def load_base64_audio_data(MsgSvrID, MediaMSG_all_db_path): - wave_data = read_audio(MsgSvrID, is_wave=True, DB_PATH=MediaMSG_all_db_path) - if not wave_data: - return "" - video_base64 = base64.b64encode(wave_data).decode("utf-8") - video_data = f"data:audio/wav;base64,{video_base64}" - return video_data - - -def load_base64_img_data(start_time, end_time, username_md5, FileStorage_path): - """ - 获取图片的base64数据 - :param start_time: 开始时间戳 - :param end_time: 结束时间戳 - :param username_md5: 用户名的md5值 - :return: - """ - # 获取CreateTime的最大值日期 - min_time = time.strftime("%Y-%m", time.localtime(start_time)) - max_time = time.strftime("%Y-%m", time.localtime(end_time)) - img_path = os.path.join(FileStorage_path, "MsgAttach", username_md5, "Image") - if not os.path.exists(img_path): - return {} - # print(min_time, max_time, img_path) - paths = [] - for root, path, files in os.walk(img_path): - for p in path: - if p >= min_time and p <= max_time: - paths.append(os.path.join(root, p)) - # print(paths) - img_md5_data = {} - for path in paths: - for root, path, files in os.walk(path): - for file in files: - if file.endswith(".dat"): - file_path = os.path.join(root, file) - fomt, md5, out_bytes = read_img_dat(file_path) - out_bytes = base64.b64encode(out_bytes).decode("utf-8") - img_md5_data[md5] = f"data:{fomt};base64,{out_bytes}" - return img_md5_data - - -def load_chat_records(selected_talker, start_index, page_size, user_list, MSG_ALL_db_path, MediaMSG_all_db_path, - FileStorage_path): - username = user_list.get("username", "") - username_md5 = get_md5(username) - type_name_dict = { - 1: {0: "文本"}, - 3: {0: "图片"}, - 34: {0: "语音"}, - 43: {0: "视频"}, - 47: {0: "动画表情"}, - 49: {0: "文本", 1: "类似文字消息而不一样的消息", 5: "卡片式链接", 6: "文件", 8: "用户上传的 GIF 表情", - 19: "合并转发的聊天记录", 33: "分享的小程序", 36: "分享的小程序", 57: "带有引用的文本消息", - 63: "视频号直播或直播回放等", - 87: "群公告", 88: "视频号直播或直播回放等", 2000: "转账消息", 2003: "赠送红包封面"}, - 50: {0: "语音通话"}, - 10000: {0: "系统通知", 4: "拍一拍", 8000: "系统通知"} - } - - # 连接 MSG_ALL.db 数据库,并执行查询 - db1 = sqlite3.connect(MSG_ALL_db_path) - cursor1 = db1.cursor() - - cursor1.execute( - "SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent FROM MSG WHERE StrTalker=? ORDER BY CreateTime ASC LIMIT ?,?", - (selected_talker, start_index, page_size)) - result1 = cursor1.fetchall() - - cursor1.close() - db1.close() - - img_md5_data = load_base64_img_data(result1[0][7], result1[-1][7], username_md5, FileStorage_path) # 获取图片的base64数据 - - data = [] - for row in result1: - localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType, CreateTime, MsgSvrID, DisplayContent, CompressContent = row - CreateTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(CreateTime)) - - type_name = type_name_dict.get(Type, {}).get(SubType, "未知") - - content = {"src": "", "msg": "", "style": ""} - - if Type == 47 and SubType == 0: # 动画表情 - content_tmp = parse_xml_string(StrContent) - cdnurl = content_tmp.get("emoji", {}).get("cdnurl", "") - # md5 = content_tmp.get("emoji", {}).get("md5", "") - if cdnurl: - content = {"src": cdnurl, "msg": "表情", "style": "width: 100px; height: 100px;"} - - elif Type == 49 and SubType == 57: # 带有引用的文本消息 - CompressContent = CompressContent.rsplit(b'\x00', 1)[0] - content["msg"] = decompress_CompressContent(CompressContent) - try: - content["msg"] = content["msg"].decode("utf-8") - content["msg"] = parse_xml_string(content["msg"]) - content["msg"] = json.dumps(content["msg"], ensure_ascii=False) - except Exception as e: - content["msg"] = "[带有引用的文本消息]解析失败" - elif Type == 34 and SubType == 0: # 语音 - tmp_c = parse_xml_string(StrContent) - voicelength = tmp_c.get("voicemsg", {}).get("voicelength", "") - transtext = tmp_c.get("voicetrans", {}).get("transtext", "") - if voicelength.isdigit(): - voicelength = int(voicelength) / 1000 - voicelength = f"{voicelength:.2f}" - content["msg"] = f"语音时长:{voicelength}秒\n翻译结果:{transtext}" - - src = load_base64_audio_data(MsgSvrID, MediaMSG_all_db_path=MediaMSG_all_db_path) - content["src"] = src - elif Type == 3 and SubType == 0: # 图片 - xml_content = parse_xml_string(StrContent) - md5 = xml_content.get("img", {}).get("md5", "") - if md5: - content["src"] = img_md5_data.get(md5, "") - else: - content["src"] = "" - content["msg"] = "图片" - - else: - content["msg"] = StrContent - - row_data = {"MsgSvrID": MsgSvrID, "type_name": type_name, "is_sender": IsSender, - "content": content, "CreateTime": CreateTime} - data.append(row_data) - return data - - -def export_html(user, outpath, MSG_ALL_db_path, MediaMSG_all_db_path, FileStorage_path, page_size=500): - name_save = user.get("remark", user.get("nickname", user.get("username", ""))) - username = user.get("username", "") - - chatCount = user.get("chat_count", 0) - if chatCount == 0: - return False, "没有聊天记录" - - for i in range(0, chatCount, page_size): - start_index = i - data = load_chat_records(username, start_index, page_size, user, MSG_ALL_db_path, MediaMSG_all_db_path, - FileStorage_path) - if len(data) == 0: - break - save_path = os.path.join(outpath, f"{name_save}_{int(i / page_size)}.html") - with open(save_path, "w", encoding="utf-8") as f: - f.write(render_template("chat.html", msgs=data)) - return True, f"导出成功{outpath}" - - -def export(username, outpath, MSG_ALL_db_path, MicroMsg_db_path, MediaMSG_all_db_path, FileStorage_path): - if not os.path.exists(outpath): - outpath = os.path.join(os.getcwd(), "export" + os.sep + username) - if not os.path.exists(outpath): - os.makedirs(outpath) - - USER_LIST = get_user_list(MSG_ALL_db_path, MicroMsg_db_path) - user = list(filter(lambda x: x["username"] == username, USER_LIST)) - - if username and len(user) > 0: - user = user[0] - return export_html(user, outpath, MSG_ALL_db_path, MediaMSG_all_db_path, FileStorage_path) - - -app_show_chat = Blueprint('show_chat_main', __name__, template_folder='templates') -app_show_chat.debug = False - - -# 主页 - 显示用户列表 -@app_show_chat.route('/') -def index(): - g.USER_LIST = get_user_list(g.MSG_ALL_db_path, g.MicroMsg_db_path) - return render_template("index.html", users=g.USER_LIST) - - -# 获取聊天记录 -@app_show_chat.route('/get_chat_data', methods=["GET", 'POST']) -def get_chat_data(): - username = request.args.get("username", "") - user = list(filter(lambda x: x["username"] == username, g.USER_LIST)) - - if username and len(user) > 0: - user = user[0] - - limit = int(request.args.get("limit", 100)) # 每页显示的条数 - page = int(request.args.get("page", user.get("chat_count", limit) / limit)) # 当前页数 - - start_index = (page - 1) * limit - page_size = limit - - data = load_chat_records(username, start_index, page_size, user, g.MSG_ALL_db_path, g.MediaMSG_all_db_path, - g.FileStorage_path) - return render_template("chat.html", msgs=data) - else: - return "error" - - -# 聊天记录导出为html -@app_show_chat.route('/export_chat_data', methods=["GET", 'POST']) -def get_export(): - username = request.args.get("username", "") - - user = list(filter(lambda x: x["username"] == username, g.USER_LIST)) - - if username and len(user) > 0: - user = user[0] - n = f"{user.get('username', '')}_{user.get('nickname', '')}_{user.get('remark', '')}" - outpath = os.path.join(os.getcwd(), "export" + os.sep + n) - if not os.path.exists(outpath): - os.makedirs(outpath) - - ret = export_html(user, outpath, g.MSG_ALL_db_path, g.MediaMSG_all_db_path, g.FileStorage_path, page_size=200) - if ret[0]: - return ret[1] - else: - return ret[1] - else: - return "error" diff --git a/build/lib/pywxdump/show_chat/templates/chat.html b/build/lib/pywxdump/show_chat/templates/chat.html deleted file mode 100644 index cb2861d..0000000 --- a/build/lib/pywxdump/show_chat/templates/chat.html +++ /dev/null @@ -1,71 +0,0 @@ - - -
- -名称 | -数量 | -
---|---|
- {% if user.remark not in [None, '']%} - {{user.remark}} - {% else %} - {{user.nickname}} - {% endif %} - | -{{user.chat_count}} | -