Delete build/lib/pywxdump directory

Signed-off-by: xaoyaoo <37209452+xaoyaoo@users.noreply.github.com>
This commit is contained in:
xaoyaoo 2023-12-04 19:37:23 +08:00 committed by GitHub
parent 3139e2474f
commit 96a7a1116d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 0 additions and 3332 deletions

View File

@ -1,20 +0,0 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: __init__.py
# Description:
# Author: xaoyaoo
# Date: 2023/10/14
# -------------------------------------------------------------------------------
from .bias_addr.get_bias_addr import BiasAddr
from .wx_info.get_wx_info import read_info
from .wx_info.get_wx_db import get_wechat_db
from .decrypted.decrypt import batch_decrypt, decrypt,encrypt
from .decrypted.get_wx_decrypted_db import all_decrypt, merge_copy_msg_db, merge_msg_db, merge_media_msg_db
from .analyse.parse import read_img_dat, read_emoji, decompress_CompressContent, read_audio_buf, read_audio, parse_xml_string
from .show_chat import app_show_chat, get_user_list, export
import os,json
# Location of version_list.json, expected to sit in the same directory as this module.
VERSION_LIST_PATH = os.path.join(os.path.dirname(__file__), "version_list.json")
# Parse the JSON file once at import time and expose the result as VERSION_LIST.
with open(VERSION_LIST_PATH, encoding="utf-8") as f:
    VERSION_LIST = json.load(f)

View File

@ -1,8 +0,0 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: __init__.py
# Description:
# Author: xaoyaoo
# Date: 2023/09/27
# -------------------------------------------------------------------------------
from .parse import read_img_dat, read_emoji, decompress_CompressContent, read_audio_buf, read_audio, parse_xml_string

View File

@ -1,370 +0,0 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: analyser.py
# Description:
# Author: xaoyaoo
# Date: 2023/12/01
# -------------------------------------------------------------------------------
import sqlite3
import time
from collections import Counter
import pandas as pd
from pywxdump.analyse import parse_xml_string
def read_msgs(MSG_path, selected_talker=None, start_time=None, end_time=None):
    """
    Load chat messages from the ``MSG`` table of a MSG.db file into a DataFrame.

    :param MSG_path: path of the SQLite database that contains the ``MSG`` table
    :param selected_talker: ``StrTalker`` value to filter on; ``None`` or ``""``
        selects every conversation
    :param start_time: lower bound for ``CreateTime``, either a 10-digit Unix
        timestamp or a ``"%Y-%m-%d %H:%M:%S"`` string; ``None`` means one year
        before ``end_time``
    :param end_time: upper bound for ``CreateTime`` in the same formats;
        ``None`` means the moment of the call
    :return: ``(True, chat_data)`` where ``chat_data`` has the columns
        MsgSvrID, IsSender, StrTalker, type_name, content, content_len,
        CreateTime and AdjustedTime (both times as formatted strings)
    """
    # (Type, SubType) -> human readable message kind.
    type_name_dict = {
        1: {0: "文本"},
        3: {0: "图片"},
        34: {0: "语音"},
        43: {0: "视频"},
        47: {0: "动画表情"},
        49: {0: "文本", 1: "类文本消息", 5: "卡片式链接", 6: "文件", 8: "上传的GIF表情",
             19: "合并转发聊天记录", 33: "分享的小程序", 36: "分享的小程序", 57: "带有引用的文本",
             63: "视频号直播或回放等",
             87: "群公告", 88: "视频号直播或回放等", 2000: "转账消息", 2003: "红包封面"},
        50: {0: "语音通话"},
        10000: {0: "系统通知", 4: "拍一拍", 8000: "系统通知"}
    }

    def to_timestamp(value):
        # Accept either a numeric timestamp or a formatted local-time string.
        if isinstance(value, str):
            return time.mktime(time.strptime(value, "%Y-%m-%d %H:%M:%S"))
        return value

    # The bounds are resolved per call. The old signature evaluated
    # time.time() once at import time and multiplied it by a year's worth of
    # seconds, which put the default start far in the future.
    end_time = time.time() if end_time is None else to_timestamp(end_time)
    if start_time is None:
        start_time = end_time - 3600 * 24 * 365
    else:
        start_time = to_timestamp(start_time)

    if selected_talker is None or selected_talker == "":  # no talker filter: query every conversation
        sql = "SELECT MsgSvrID,IsSender, StrContent, StrTalker, Type, SubType,CreateTime FROM MSG WHERE CreateTime>=? AND CreateTime<=? ORDER BY CreateTime ASC"
        params = (start_time, end_time)
    else:
        sql = "SELECT MsgSvrID,IsSender, StrContent, StrTalker, Type, SubType,CreateTime FROM MSG WHERE StrTalker=? AND CreateTime>=? AND CreateTime<=? ORDER BY CreateTime ASC"
        params = (selected_talker, start_time, end_time)

    db1 = sqlite3.connect(MSG_path)
    try:
        result1 = db1.execute(sql, params).fetchall()
    finally:
        # Always release the database handle, even when the query fails.
        db1.close()

    def message_content(type_name, raw_content):
        # Text keeps its raw content, animated emoji expose their CDN url,
        # every other kind gets an empty string.
        if type_name == "文本":
            return raw_content
        if type_name == "动画表情":
            parsed_content = parse_xml_string(raw_content)
            if isinstance(parsed_content, dict) and "emoji" in parsed_content:
                emoji = parsed_content["emoji"]
                return emoji.get("cdnurl", "") if isinstance(emoji, dict) else ""
        return ""

    init_data = pd.DataFrame(result1, columns=["MsgSvrID", "IsSender", "StrContent", "StrTalker", "Type", "SubType",
                                               "CreateTime"])
    created = pd.to_datetime(init_data["CreateTime"], unit="s")
    # AdjustedTime is CreateTime shifted back by four hours.
    init_data["AdjustedTime"] = (created - pd.Timedelta(hours=4)).dt.strftime("%Y-%m-%d %H:%M:%S")
    init_data["CreateTime"] = created.dt.strftime("%Y-%m-%d %H:%M:%S")

    # Plain column-wise construction instead of DataFrame.apply(axis=1):
    # it behaves the same for populated frames and also works when the query
    # returned no rows.
    type_names = [type_name_dict.get(msg_type, {}).get(sub_type, "未知")
                  for msg_type, sub_type in zip(init_data["Type"], init_data["SubType"])]
    contents = [message_content(type_name, raw_content)
                for type_name, raw_content in zip(type_names, init_data["StrContent"])]
    init_data["type_name"] = type_names
    init_data["content"] = contents
    init_data["content_len"] = [len(content) if type_name == "文本" and isinstance(content, str) else 0
                                for type_name, content in zip(type_names, contents)]

    # Return an independent frame so later column assignments by callers do
    # not operate on a slice of init_data.
    chat_data = init_data[
        ["MsgSvrID", "IsSender", "StrTalker", "type_name", "content", "content_len", "CreateTime",
         "AdjustedTime"]].copy()
    return True, chat_data
# 绘制直方图
def draw_hist_all_count(chat_data, out_path="", is_show=False):
    """
    Draw a bar chart with the number of messages per message type.

    :param chat_data: table with a ``type_name`` column (as built by read_msgs)
    :param out_path: file to save the figure to; ``""`` skips saving
    :param is_show: when true, display the figure with ``plt.show()``
    :return: None
    :raises ImportError: if matplotlib cannot be imported
    """
    try:
        import matplotlib.pyplot as plt
    except ImportError as e:
        print("error", e)
        raise ImportError("请安装matplotlib库")

    # SimHei so the Chinese labels render; keep the minus sign drawable.
    plt.rcParams['font.sans-serif'] = ['SimHei']
    plt.rcParams['axes.unicode_minus'] = False

    # Message types ordered from most to least frequent.
    ranked = sorted(Counter(chat_data["type_name"]).items(), key=lambda pair: pair[1], reverse=True)
    labels = [type_name for type_name, _ in ranked]
    counts = [count for _, count in ranked]
    positions = range(len(ranked))

    plt.figure(figsize=(12, 8))
    plt.bar(positions, counts, tick_label=labels)
    plt.title("消息类型分布图")
    plt.xlabel("消息类型")
    plt.ylabel("数量")
    plt.xticks(rotation=-45)  # tilt the x tick labels

    # Write each bar's count just above it.
    for position, count in zip(positions, counts):
        plt.text(position, count, str(count), ha='center', va='bottom')

    if out_path != "":
        plt.savefig(out_path)
    if is_show:
        plt.show()
    plt.close()
# 按照interval绘制折线图
def draw_line_type_name(chat_data, interval="W", type_name_list=None, out_path="", is_show=False):
    """
    Draw a line chart of message statistics over time, one line per series.

    The x axis holds one label per time interval between the earliest and the
    latest ``AdjustedTime``; the y axis holds the value of each requested series.

    :param chat_data: table as built by read_msgs; the columns AdjustedTime,
        CreateTime, type_name, IsSender and content_len are read. The caller's
        object is not modified.
    :param interval: bucket size, one of ``day``/``d``, ``week``/``W``,
        ``month``/``m``, ``year``/``y`` (case-sensitive)
    :param type_name_list: series to draw, in drawing order. Besides any
        message type name, the derived series "全部类型", "发送", "接收",
        "总字数", "发送字数" and "接收字数" are available. Names absent from the
        data are drawn as zero. ``None`` selects the six derived series.
    :param out_path: file to save the figure to; ``""`` skips saving
    :param is_show: when true, display the figure with ``plt.show()``
    :return: None
    :raises ImportError: if matplotlib or pandas cannot be imported
    :raises ValueError: for an unknown ``interval`` or when ``chat_data``
        holds no usable ``AdjustedTime`` value
    """
    if type_name_list is None:
        type_name_list = ["全部类型", "发送", "接收"] + ["总字数", "发送字数", "接收字数"]

    try:
        import matplotlib.pyplot as plt
        import pandas as pd
    except ImportError as e:
        print("error", e)
        raise ImportError("请安装matplotlib库")

    interval_dict = {"day": "%Y-%m-%d", "month": "%Y-%m", "year": "%Y", "week": "%Y-%W",
                     "d": "%Y-%m-%d", "m": "%Y-%m", "y": "%Y", "W": "%Y-%W"
                     }
    if interval not in interval_dict:
        raise ValueError("interval参数错误可选值为day、month、year、week")
    label_format = interval_dict[interval]

    plt.rcParams['font.sans-serif'] = ['SimHei']
    plt.rcParams['axes.unicode_minus'] = False

    # Work on a copy so the caller's frame keeps its columns and dtypes.
    data = chat_data.copy()
    data["CreateTime"] = pd.to_datetime(data["CreateTime"])
    data["AdjustedTime"] = pd.to_datetime(data["AdjustedTime"])
    if not data["AdjustedTime"].notna().any():
        raise ValueError("chat_data中没有可用的时间数据")
    data["interval"] = data["AdjustedTime"].dt.strftime(label_format)

    # Build the axis labels from a daily range and collapse the duplicates.
    # This covers every bucket between the first and the last message for all
    # accepted interval names, including spans that contain no week-, month-
    # or year-end at all.
    days = pd.date_range(data["AdjustedTime"].min().normalize(),
                         data["AdjustedTime"].max().normalize(), freq="D")
    labels = pd.Index(days.strftime(label_format)).unique()

    # One row per interval label, one column per series; assignments below
    # align on the interval label.
    type_data = pd.DataFrame(index=pd.Index(labels, name="interval"))
    for type_name in data["type_name"].unique():
        type_data[type_name] = data[data["type_name"] == type_name].groupby("interval").size()
    type_data["全部类型"] = type_data.sum(axis=1)

    sent = data[data["IsSender"] == 1]
    received = data[data["IsSender"] == 0]
    type_data["发送"] = sent.groupby("interval").size()
    type_data["接收"] = received.groupby("interval").size()
    type_data["总字数"] = data.groupby("interval")["content_len"].sum()
    type_data["发送字数"] = sent.groupby("interval")["content_len"].sum()
    type_data["接收字数"] = received.groupby("interval")["content_len"].sum()

    # Empty buckets count as zero; requested series missing from the data
    # become all-zero columns instead of raising KeyError.
    type_data = type_data.fillna(0).reindex(columns=list(type_name_list), fill_value=0)

    plt.figure(figsize=(12, 8))
    for type_name in type_data.columns:
        plt.plot(type_data.index, type_data[type_name], label=type_name)

    plt.xticks(rotation=-45)  # tilt the x tick labels
    plt.title("消息类型分布图")
    plt.xlabel("时间")
    plt.ylabel("数量")
    plt.legend(loc="upper right")

    if out_path != "":
        plt.savefig(out_path)
    if is_show:
        plt.tight_layout()
        plt.show()
    plt.close()
def wordcloud_generator(chat_data, interval="m", stopwords=None, out_path="", is_show=False, bg_img=None,
                        font=r"C:\Windows\Fonts\simhei.ttf"):
    """
    Render one word cloud image per time interval from the text messages.

    For every interval that contains text messages, the messages are merged,
    bracketed emoji codes such as ``[xx]`` are removed, the text is segmented
    with jieba, filtered and drawn; the figure is saved as
    ``out/img_<interval>.png`` (the ``out`` directory is created if needed).
    Intervals without any usable word are skipped.

    NOTE(review): ``out_path``, ``is_show`` and ``font`` are accepted but, as
    in the previous implementation, not honoured: the output location, the
    "do not show" behaviour and the ``simhei.ttf`` font are fixed. Confirm the
    intended behaviour before wiring them up.

    :param chat_data: table as built by read_msgs; the columns AdjustedTime,
        type_name and content are read. The caller's object is not modified.
    :param interval: bucket size, one of ``day``/``d``, ``week``/``W``,
        ``month``/``m``, ``year``/``y`` (case-sensitive)
    :param stopwords: container of words to drop; ``None`` means no stop words
    :param out_path: currently unused (see note)
    :param is_show: currently unused (see note)
    :param bg_img: optional path of an image used as the word cloud mask
    :param font: currently unused (see note)
    :return: None
    :raises ImportError: if one of the plotting/segmentation packages is missing
    :raises ValueError: for an unknown ``interval`` or when ``chat_data``
        holds no usable ``AdjustedTime`` value
    """
    try:
        from wordcloud import WordCloud
        import jieba
        import matplotlib.pyplot as plt
        import pandas as pd
        import re
        from imageio import imread
    except ImportError as e:
        print("error", e)
        raise ImportError("请安装wordcloud,jieba,numpy,matplotlib,pillow库")
    import os

    interval_dict = {"day": "%Y-%m-%d", "month": "%Y-%m", "year": "%Y", "week": "%Y-%W",
                     "d": "%Y-%m-%d", "m": "%Y-%m", "y": "%Y", "W": "%Y-%W"
                     }
    if interval not in interval_dict:
        raise ValueError("interval参数错误可选值为day、month、year、week")

    plt.rcParams['font.sans-serif'] = ['SimHei']
    plt.rcParams['axes.unicode_minus'] = False

    # The default used to be None, which made every membership test fail.
    if stopwords is None:
        stopwords = ()

    # Work on a copy: the old code blanked the caller's "content" column.
    data = chat_data.copy()
    data["AdjustedTime"] = pd.to_datetime(data["AdjustedTime"])
    if not data["AdjustedTime"].notna().any():
        raise ValueError("chat_data中没有可用的时间数据")
    data["interval"] = data["AdjustedTime"].dt.strftime(interval_dict[interval])

    emoji_pattern = re.compile(r"(\[.+?\])")  # bracketed emoji codes

    def merge_text(messages):
        # Join with newlines first so an emoji pattern never spans two
        # messages, then flatten to a single space-separated line.
        joined = "\n".join(message for message in messages if isinstance(message, str))
        return emoji_pattern.sub('', joined).replace("\n", " ")

    # Only text messages contribute words; grouping them directly means
    # intervals without messages simply do not appear (no NaN text).
    text_rows = data[data["type_name"] == "文本"]
    text_by_interval = text_rows.groupby("interval")["content"].apply(merge_text)

    # Load the optional mask once and close the file afterwards.
    mask = None
    if bg_img:
        with open(bg_img, 'rb') as mask_file:
            mask = imread(mask_file)

    os.makedirs("out", exist_ok=True)

    for label, text in text_by_interval.items():
        words = jieba.lcut(text)
        kept = [word for word in words
                if word not in stopwords and word.replace(" ", "") != "" and len(word) > 1]
        count_dict = dict(Counter(kept))
        if not count_dict:
            # WordCloud cannot lay out an empty frequency table.
            continue

        if mask is not None:
            wc = WordCloud(background_color='white', mask=mask, font_path='simhei.ttf', mode='RGBA',
                           include_numbers=False, random_state=0)
        else:
            # A font with Chinese glyphs is required, otherwise the words render as boxes.
            wc = WordCloud(background_color='white', mode='RGBA', font_path='simhei.ttf', include_numbers=False,
                           random_state=0, width=500, height=500)
        wc = wc.fit_words(count_dict)

        fig = plt.figure(figsize=(8, 8))
        fig.suptitle(f"全部({label})", fontsize=26)
        ax = fig.subplots()
        ax.imshow(wc)
        ax.axis('off')
        plt.savefig(f"out/img_{label}.png")
        plt.close()
# 情感分析
def sentiment_analysis(chat_data, stopwords="", out_path="", is_show=False, bg_img=None):
    """
    Score every non-empty text message with SnowNLP and plot the score histogram.

    :param chat_data: either a DataFrame as built by read_msgs or an iterable
        of mappings; each row must provide ``type_name`` and ``content``
    :param stopwords: unused
    :param out_path: file to save the figure to; ``""`` skips saving
    :param is_show: when true, display the figure with ``plt.show()``
    :param bg_img: unused
    :return: None
    :raises ImportError: if snownlp, pandas, matplotlib or seaborn is missing
    """
    try:
        from snownlp import SnowNLP
        import pandas as pd
        import matplotlib.pyplot as plt
        import seaborn as sns
    except ImportError as e:
        print("error", e)
        raise ImportError("请安装snownlp,pandas,matplotlib,seaborn库")

    sns.set_style('white', {'font.sans-serif': ['simhei', 'FangSong']})

    # Iterating a DataFrame yields its column names, not its rows, so the
    # frame is converted to row dictionaries first.
    if isinstance(chat_data, pd.DataFrame):
        rows = chat_data.to_dict("records")
    else:
        rows = list(chat_data)

    texts = [row["content"] for row in rows
             if row["type_name"] == "文本" and isinstance(row["content"], str) and row["content"] != ""]
    scores = [SnowNLP(text).sentiments for text in texts]

    df = pd.DataFrame({'Sentiment Score': scores})
    plt.figure(figsize=(8, 6))
    sns.histplot(data=df, x='Sentiment Score', kde=True)
    plt.title("Sentiment Analysis")
    plt.xlabel("Sentiment Score")
    plt.ylabel("Frequency")

    if out_path != "":
        plt.savefig(out_path)
    if is_show:
        plt.show()
    plt.close()
if __name__ == '__main__':
    # Manual demo: read one conversation and generate its word clouds.
    MSG_PATH = ""
    selected_talker = "wxid_"
    start_time = time.time() - 3600 * 24 * 50000
    end_time = time.time()
    code, chat_data = read_msgs(MSG_PATH, selected_talker, start_time, end_time)

    # Words excluded from the word cloud.
    stopwords = [
        '', '', '', '', '', '', '', '', '', '', '', '',
        '一个',
        '', '', '', '', '', '',
        '', '', '', '',
        '没有',
        '', '',
        '自己',
        '',
    ]
    wordcloud_generator(chat_data, stopwords=stopwords, out_path="", is_show=True)

View File

@ -1,262 +0,0 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: parse.py
# Description: 解析数据库内容
# Author: xaoyaoo
# Date: 2023/09/27
# -------------------------------------------------------------------------------
import os.path
import sqlite3
import pysilk
from io import BytesIO
import wave
import pyaudio
import requests
import hashlib
import lz4.block
import blackboxprotobuf
from PIL import Image
#import xml.etree.ElementTree as ET
import lxml.etree as ET #这个模块更健壮些微信XML格式有时有非标格式会导致xml.etree.ElementTree处理失败
def get_md5(data):
    """
    Return the hexadecimal MD5 digest of ``data``.

    :param data: bytes-like object to hash
    :return: 32-character lowercase hex string
    """
    return hashlib.md5(data).hexdigest()
def parse_xml_string(xml_string):
    """
    Parse an XML string into nested dictionaries.

    :param xml_string: XML text to parse
    :return: ``None`` when ``xml_string`` is not a ``str``; the unchanged
        input when parsing raises; otherwise the converted root element
        (see ``to_dict`` below for the conversion rules)
    """

    def to_dict(node):
        """
        Convert one element recursively.

        Attributes become keys, each child is stored under its tag, and
        repeated tags are collected into a list. An element that produced no
        keys but has text is represented by that text.
        """
        if node is None or node.attrib is None:
            return {}

        converted = dict(node.attrib.items())
        for child in node:
            child_value = to_dict(child)
            tag = child.tag
            if tag not in converted:
                converted[tag] = child_value
                continue
            # The key is already taken: promote it to a list and append.
            if not isinstance(converted[tag], list):
                converted[tag] = [converted[tag]]
            converted[tag].append(child_value)

        if not converted and node.text:
            return node.text
        return converted

    if not isinstance(xml_string, str):
        return None

    try:
        # recover=True lets the parser tolerate malformed XML that shows up in chat records.
        tolerant_parser = ET.XMLParser(recover=True)
        root = ET.fromstring(xml_string, tolerant_parser)
    except Exception:
        return xml_string
    return to_dict(root)
def read_img_dat(input_data):
    """
    Decode a ``.dat`` image whose bytes are all XOR-ed with one single-byte key.

    The key is guessed per known image signature from the first byte and is
    accepted only when it reproduces the whole signature.

    :param input_data: path of the file (``str``) or its content as a
        bytes-like object
    :return: ``(extension, md5_hex, decoded_data)`` for the first matching
        signature, or ``False`` when the input is empty or no signature
        matches. ``decoded_data`` is a ``numpy.ndarray`` of ``uint8`` when
        numpy is importable and a ``bytearray`` otherwise.
    """
    # File signatures of common image formats, tried in this order.
    img_head = {
        b"\xFF\xD8\xFF": ".jpg",
        b"\x89\x50\x4E\x47": ".png",
        b"\x47\x49\x46\x38": ".gif",
        b"\x42\x4D": ".BMP",
        b"\x49\x49": ".TIFF",
        b"\x4D\x4D": ".TIFF",
        b"\x00\x00\x01\x00": ".ICO",
        b"\x52\x49\x46\x46": ".WebP",
        b"\x00\x00\x00\x18\x66\x74\x79\x70\x68\x65\x69\x63": ".HEIC",
    }

    if isinstance(input_data, str):
        with open(input_data, "rb") as f:
            input_bytes = f.read()
    else:
        input_bytes = input_data

    # Nothing to decode; also avoids indexing the first byte of empty input.
    if len(input_bytes) == 0:
        return False

    try:
        import numpy as np
    except ImportError:
        np = None

    if np is not None:
        data = np.frombuffer(input_bytes, dtype=np.uint8)
        for hcode, fomt in img_head.items():
            if len(data) < len(hcode):
                continue  # input too short to contain this signature
            key = int(data[0]) ^ hcode[0]
            signature = np.frombuffer(hcode, dtype=np.uint8)
            # Every signature byte must decode with the same key.
            if np.all(np.bitwise_xor(data[:len(hcode)], signature) == key):
                out_bytes = np.bitwise_xor(data, np.uint8(key))
                return fomt, hashlib.md5(out_bytes).hexdigest(), out_bytes
        return False

    # Pure-Python fallback.
    raw = bytes(input_bytes)
    for hcode, fomt in img_head.items():
        if len(raw) < len(hcode):
            continue
        key = raw[0] ^ hcode[0]
        # Compare the complete signature; the previous loop accepted a format
        # as soon as one single later byte happened to agree.
        if all(byte ^ expected == key for byte, expected in zip(raw, hcode)):
            # One C-level pass: map every byte value b to b ^ key.
            out_bytes = bytearray(raw.translate(bytes(value ^ key for value in range(256))))
            return fomt, hashlib.md5(out_bytes).hexdigest(), out_bytes
    return False
def read_emoji(cdnurl, is_show=False):
    """
    Download an emoji from its CDN URL.

    :param cdnurl: URL to fetch with an HTTP GET request
    :param is_show: when true, open the downloaded bytes as an image and show it
    :return: the raw response body
    """
    request_headers = {
        "User-Agent": "Mozilla/5.0 (Linux; Android 10; Redmi K30 Pro) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Mobile Safari/537.36"
    }
    emoji_bytes = requests.get(cdnurl, headers=request_headers).content
    if is_show:
        Image.open(BytesIO(emoji_bytes)).show()
    return emoji_bytes
def decompress_CompressContent(data):
    """
    Decompress the LZ4 block stored in a message's CompressContent field.

    :param data: compressed content as ``bytes``
    :return: the decompressed bytes with every NUL byte removed, or ``None``
        when ``data`` is not a ``bytes`` object
    """
    if data is None or not isinstance(data, bytes):
        return None
    # The real uncompressed size is not stored alongside the block here, so a
    # generous upper bound (256x the compressed size) is handed to lz4.
    dst = lz4.block.decompress(data, uncompressed_size=len(data) << 8)
    # Drop the 0x00 bytes left in the output; they break the XML parser later.
    # (The previous code referenced an undefined name, discarded the stripped
    # text and then called .encode() on a bytes object.)
    uncompressed_data = dst.replace(b"\x00", b"")
    return uncompressed_data
def read_audio_buf(buf_data, is_play=False, is_wave=False, rate=24000):
    """
    Decode a SILK audio buffer to PCM, optionally playing or WAV-wrapping it.

    :param buf_data: SILK-encoded audio bytes
    :param is_play: when true, play the decoded audio through PyAudio
        (16-bit, mono, ``rate`` Hz)
    :param is_wave: when true, return a complete WAV file image instead of raw PCM
    :param rate: sample rate passed to the decoder, the player and the WAV header
    :return: WAV bytes if ``is_wave`` is true, otherwise the raw PCM bytes
    """
    # SILK in, PCM out; both streams live only in memory.
    with BytesIO(buf_data) as silk_stream, BytesIO() as pcm_stream:
        pysilk.decode(silk_stream, pcm_stream, rate)
        pcm_data = pcm_stream.getvalue()

    if is_play:
        player = pyaudio.PyAudio()
        output = player.open(format=pyaudio.paInt16, channels=1, rate=rate, output=True)
        output.write(pcm_data)
        output.stop_stream()
        output.close()
        player.terminate()

    if not is_wave:
        return pcm_data

    # Wrap the PCM samples in a WAV container: 1 channel, 2 bytes per sample.
    wav_buffer = BytesIO()
    with wave.open(wav_buffer, 'wb') as wav_writer:
        wav_writer.setparams((1, 2, rate, 0, 'NONE', 'NONE'))
        wav_writer.writeframes(pcm_data)
    wav_bytes = wav_buffer.getvalue()
    wav_buffer.close()
    return wav_bytes
def read_audio(MsgSvrID, is_play=False, is_wave=False, DB_PATH: str = "", rate=24000):
    """
    Look up a voice message in the ``Media`` table and decode it.

    :param MsgSvrID: message id matched against the ``Reserved0`` column
    :param is_play: forwarded to read_audio_buf
    :param is_wave: forwarded to read_audio_buf
    :param DB_PATH: path of the SQLite database holding the ``Media`` table
    :param rate: forwarded to read_audio_buf
    :return: ``False`` when ``DB_PATH`` is empty or no row matches, otherwise
        whatever read_audio_buf returns for the stored buffer
    """
    if DB_PATH == "":
        return False

    DB = sqlite3.connect(DB_PATH)
    try:
        # Bound parameter instead of string formatting, so the id can never
        # alter the statement. It is bound as text, which compares the same
        # way as the quoted literal the old query produced.
        row = DB.execute("select Buf from Media where Reserved0=?", (str(MsgSvrID),)).fetchone()
    finally:
        # The connection used to stay open after every call.
        DB.close()

    if row is None:
        return False
    return read_audio_buf(row[0], is_play, is_wave, rate)
def wordcloud_generator(text, out_path="", is_show=False, img_path="", font=r"C:\Windows\Fonts\simhei.ttf"):
    """
    Build a word cloud from ``text``.

    :param text: text to segment with jieba and render
    :param out_path: file to save the image to; empty skips saving
    :param is_show: when true, open the rendered image in the default viewer
    :param img_path: optional image whose colours are used to recolour the words
    :param font: path of the font file handed to WordCloud
    :return: None
    :raises ImportError: if wordcloud, jieba or numpy cannot be imported
    """
    try:
        from wordcloud import WordCloud, ImageColorGenerator
        import jieba
        import numpy as np
    except ImportError as e:
        print("error", e)
        raise ImportError("请安装wordcloud,jieba,numpy,matplotlib,pillow库")

    words = jieba.lcut(text)  # precise-mode segmentation
    newtxt = ' '.join(words)  # WordCloud expects space-separated words

    wordcloud1 = WordCloud(width=800, height=400, background_color='white', font_path=font)
    wordcloud1.generate(newtxt)

    if img_path and os.path.exists(img_path):
        # recolor() needs a callable; ImageColorGenerator samples colours from
        # an RGB image that is at least as large as the canvas, so the image
        # is converted and resized to the canvas first. (Passing the raw
        # pixel array, as before, always failed.)
        with Image.open(img_path) as colour_image:
            colours = np.array(colour_image.convert("RGB").resize((wordcloud1.width, wordcloud1.height)))
        wordcloud1.recolor(color_func=ImageColorGenerator(colours))

    # Save after recolouring, and to the path the caller asked for (the old
    # code always wrote "wordcloud.png").
    if out_path:
        wordcloud1.to_file(out_path)

    if is_show:
        wordcloud1.to_image().show()
def read_BytesExtra(data):
    """
    Print a hex string both as raw bytes and as UTF-8 text.

    :param data: hexadecimal string, optionally prefixed with ``0x``
    :return: None
    """
    hex_text = data[2:] if data[0:2] == '0x' else data
    raw = bytes.fromhex(hex_text)
    separator = '*' * 50
    print(raw)
    print(separator)
    # Undecodable byte sequences are dropped rather than raising.
    print(raw.decode('utf-8', errors='ignore'))
if __name__ == '__main__':
    # Manual check: dump two (here empty) BytesExtra hex strings, separated by a ruler.
    data = ''
    data2 = ''
    read_BytesExtra(data)
    print('*' * 50)
    read_BytesExtra(data2)

View File

@ -1,8 +0,0 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: __init__.py
# Description:
# Author: xaoyaoo
# Date: 2023/10/14
# -------------------------------------------------------------------------------
from .get_bias_addr import BiasAddr

View File

@ -1,513 +0,0 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: get_bias_addr.py
# Description:
# Author: xaoyaoo
# Date: 2023/08/22
# -------------------------------------------------------------------------------
import argparse
import ctypes
import hashlib
import json
import multiprocessing
import os
import re
import sys
import psutil
from win32com.client import Dispatch
from pymem import Pymem
import pymem
import hmac
# kernel32 ReadProcessMemory, looked up once through ctypes (ctypes.windll is Windows-only).
ReadProcessMemory = ctypes.windll.kernel32.ReadProcessMemory
# Shorthand used to pass an integer address as a C void pointer.
void_p = ctypes.c_void_p
# Derived-key length (dklen) for both PBKDF2 calls in validate_key.
KEY_SIZE = 32
# End offset of the slice of MicroMsg.db that is checked when verifying a candidate key.
DEFAULT_PAGESIZE = 4096
# PBKDF2 iteration count for the first key derivation in validate_key.
DEFAULT_ITER = 64000
def validate_key(key, salt, first, mac_salt):
    """
    Check whether ``key`` reproduces the HMAC stored in a database page.

    :param key: candidate key bytes
    :param salt: salt for the first PBKDF2-HMAC-SHA1 derivation
    :param first: page bytes; everything except the last 32 bytes is
        authenticated, and bytes ``[-32:-12]`` hold the expected digest
    :param mac_salt: salt for the second (2-iteration) derivation
    :return: ``True`` when the computed HMAC-SHA1 digest equals the stored one
    """
    derived_key = hashlib.pbkdf2_hmac("sha1", key, salt, DEFAULT_ITER, KEY_SIZE)
    mac_key = hashlib.pbkdf2_hmac("sha1", derived_key, mac_salt, 2, KEY_SIZE)
    page_mac = hmac.new(mac_key, first[:-32], hashlib.sha1)
    # presumably the little-endian page number 1 -- TODO confirm
    page_mac.update(b'\x01\x00\x00\x00')
    return page_mac.digest() == first[-32:-12]
def get_exe_bit(file_path):
    """
    Report whether a PE file is a 32-bit or a 64-bit image.

    :param file_path: path of the PE (executable) file
    :return: 32 or 64; 64 is also returned, after printing a message, when
        the file cannot be opened, lacks the ``MZ`` signature or has an
        unrecognised machine type
    """
    bits_by_machine = {0x14c: 32, 0x8664: 64}
    try:
        with open(file_path, 'rb') as pe_file:
            if pe_file.read(2) != b'MZ':
                print('get exe bit error: Invalid PE file')
                return 64
            # Offset 60 of the DOS header stores where the PE signature starts.
            pe_file.seek(60)
            pe_offset = int.from_bytes(pe_file.read(4), byteorder='little')
            # The Machine field follows the 4-byte PE signature.
            pe_file.seek(pe_offset + 4)
            machine = int.from_bytes(pe_file.read(2), byteorder='little')
            if machine in bits_by_machine:
                return bits_by_machine[machine]
            print('get exe bit error: Unknown architecture: %s' % hex(machine))
            return 64
    except IOError:
        print('get exe bit error: File not found or cannot be opened')
        return 64
def get_exe_version(file_path):
    """
    Read the file version of a PE file through the Scripting.FileSystemObject COM object.

    :param file_path: path of the PE (executable) file
    :return: the value returned by ``GetFileVersion`` for that path
    """
    file_system = Dispatch("Scripting.FileSystemObject")
    return file_system.GetFileVersion(file_path)
def find_all(c: bytes, string: bytes, base_addr=0):
    """
    Return the positions of every non-overlapping occurrence of ``c`` in ``string``.

    :param c: sub-sequence to look for, e.g. ``b'123'`` (matched literally)
    :param string: data to search, e.g. ``b'123456789123'``
    :param base_addr: value added to every position
    :return: list of ``base_addr + offset`` in ascending order
    """
    positions = []
    # After a hit, resume behind it; an empty needle advances one byte at a time.
    step = len(c) or 1
    offset = string.find(c)
    while offset != -1:
        positions.append(base_addr + offset)
        offset = string.find(c, offset + step)
    return positions
class BiasAddr:
def __init__(self, account, mobile, name, key, db_path):
self.account = account.encode("utf-8")
self.mobile = mobile.encode("utf-8")
self.name = name.encode("utf-8")
self.key = bytes.fromhex(key) if key else b""
self.db_path = db_path if os.path.exists(db_path) else ""
self.process_name = "WeChat.exe"
self.module_name = "WeChatWin.dll"
self.pm = None # Pymem 对象
self.is_WoW64 = None # True: 32位进程运行在64位系统上 False: 64位进程运行在64位系统上
self.process_handle = None # 进程句柄
self.pid = None # 进程ID
self.version = None # 微信版本号
self.process = None # 进程对象
self.exe_path = None # 微信路径
self.address_len = None # 4 if self.bits == 32 else 8 # 4字节或8字节
self.bits = 64 if sys.maxsize > 2 ** 32 else 32 # 系统32位或64位
def get_process_handle(self):
try:
self.pm = Pymem(self.process_name)
self.pm.check_wow64()
self.is_WoW64 = self.pm.is_WoW64
self.process_handle = self.pm.process_handle
self.pid = self.pm.process_id
self.process = psutil.Process(self.pid)
self.exe_path = self.process.exe()
self.version = get_exe_version(self.exe_path)
version_nums = list(map(int, self.version.split("."))) # 将版本号拆分为数字列表
if version_nums[0] <= 3 and version_nums[1] <= 9 and version_nums[2] <= 2:
self.address_len = 4
else:
self.address_len = 8
return True, ""
except pymem.exception.ProcessNotFound:
return False, "[-] WeChat No Run"
def search_memory_value(self, value: bytes, module_name="WeChatWin.dll"):
# 创建 Pymem 对象
module = pymem.process.module_from_name(self.pm.process_handle, module_name)
ret = self.pm.pattern_scan_module(value, module, return_multiple=True)
ret = ret[-1] - module.lpBaseOfDll if len(ret) > 0 else 0
return ret
def get_key_bias1(self):
try:
byteLen = self.address_len # 4 if self.bits == 32 else 8 # 4字节或8字节
keyLenOffset = 0x8c if self.bits == 32 else 0xd0
keyWindllOffset = 0x90 if self.bits == 32 else 0xd8
module = pymem.process.module_from_name(self.process_handle, self.module_name)
keyBytes = b'-----BEGIN PUBLIC KEY-----\n...'
publicKeyList = pymem.pattern.pattern_scan_all(self.process_handle, keyBytes, return_multiple=True)
keyaddrs = []
for addr in publicKeyList:
keyBytes = addr.to_bytes(byteLen, byteorder="little", signed=True) # 低位在前
may_addrs = pymem.pattern.pattern_scan_module(self.process_handle, module, keyBytes,
return_multiple=True)
if may_addrs != 0 and len(may_addrs) > 0:
for addr in may_addrs:
keyLen = self.pm.read_uchar(addr - keyLenOffset)
if keyLen != 32:
continue
keyaddrs.append(addr - keyWindllOffset)
return keyaddrs[-1] - module.lpBaseOfDll if len(keyaddrs) > 0 else 0
except:
return 0
def search_key(self, key: bytes):
key = re.escape(key) # 转义特殊字符
key_addr = self.pm.pattern_scan_all(key, return_multiple=False)
key = key_addr.to_bytes(self.address_len, byteorder='little', signed=True)
result = self.search_memory_value(key, self.module_name)
return result
def get_key_bias2(self, wx_db_path, account_bias=0):
wx_db_path = os.path.join(wx_db_path, "Msg", "MicroMsg.db")
if not os.path.exists(wx_db_path):
return 0
def get_maybe_key(mem_data):
min_addr = 0xffffffffffffffffffffffff
max_addr = 0
for module1 in pm.list_modules():
if module1.lpBaseOfDll < min_addr:
min_addr = module1.lpBaseOfDll
if module1.lpBaseOfDll > max_addr:
max_addr = module1.lpBaseOfDll + module1.SizeOfImage
maybe_key = []
for i in range(0, len(mem_data), self.address_len):
addr = mem_data[i:i + self.address_len]
addr = int.from_bytes(addr, byteorder='little')
# 去掉不可能的地址
if min_addr < addr < max_addr:
key = read_key(addr)
if key == b"":
continue
maybe_key.append([key, i])
return maybe_key
def read_key(addr):
key = ctypes.create_string_buffer(35)
if ReadProcessMemory(pm.process_handle, void_p(addr - 1), key, 35, 0) == 0:
return b""
if b"\x00\x00" in key.raw[1:33]:
return b""
if b"\x00\x00" == key.raw[33:35] and b"\x90" == key.raw[0:1]:
return key.raw[1:33]
return b""
def verify_key(keys, wx_db_path):
with open(wx_db_path, "rb") as file:
blist = file.read(5000)
salt = blist[:16]
first = blist[16:DEFAULT_PAGESIZE]
mac_salt = bytes([(salt[i] ^ 58) for i in range(16)])
with multiprocessing.Pool(processes=8) as pool:
results = [pool.apply_async(validate_key, args=(key, salt, first, mac_salt)) for key, i in keys[-1::-1]]
results = [p.get() for p in results]
for i, result in enumerate(results[-1::-1]):
if result:
return keys[i]
return b"", 0
module_name = "WeChatWin.dll"
pm = self.pm
module = pymem.process.module_from_name(pm.process_handle, module_name)
start_addr = module.lpBaseOfDll
size = module.SizeOfImage
if account_bias > 1:
maybe_key = []
for i in [0x24, 0x40]:
addr = start_addr + account_bias - i
mem_data = pm.read_bytes(addr, self.address_len)
key = read_key(int.from_bytes(mem_data, byteorder='little'))
if key != b"":
maybe_key.append([key, addr - start_addr])
key, bais = verify_key(maybe_key, wx_db_path)
if bais != 0:
return bais
mem_data = pm.read_bytes(start_addr, size)
maybe_key = get_maybe_key(mem_data)
key, bais = verify_key(maybe_key, wx_db_path)
return bais
def run(self, logging_path=False, version_list_path=None):
    """Locate the name/account/mobile/key offsets and report them.

    :param logging_path: a path string to an existing log file (results are
        appended to it), or any other truthy value to print the results to
        stdout. Falsy values (``False``, ``None``, ``""``) disable reporting.
    :param version_list_path: optional path of an existing JSON version
        table; when given and present it is updated in place with the result.
    :return: ``{version: [name_bias, account_bias, mobile_bias, 0, key_bias]}``,
        or ``None`` when the process handle cannot be obtained.
    """
    if not self.get_process_handle()[0]:
        return None

    mobile_bias = self.search_memory_value(self.mobile, self.module_name)
    name_bias = self.search_memory_value(self.name, self.module_name)
    account_bias = self.search_memory_value(self.account, self.module_name)

    # Try the key strategies in order; each later one runs only while the
    # previous ones have not produced a positive offset.
    key_bias = self.get_key_bias1()
    if key_bias <= 0 and self.key:
        key_bias = self.search_key(self.key)
    if key_bias <= 0 and self.db_path:
        key_bias = self.get_key_bias2(self.db_path, account_bias)

    # The fourth slot is the e-mail column of the report header; it is always 0 here.
    rdata = {self.version: [name_bias, account_bias, mobile_bias, 0, key_bias]}

    if version_list_path and os.path.exists(version_list_path):
        with open(version_list_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        data.update(rdata)
        with open(version_list_path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=4)

    # Check the type first: os.path.exists() raises TypeError for None and
    # treats bools/ints as file descriptors, which is never what is meant here.
    if isinstance(logging_path, str) and os.path.exists(logging_path):
        with open(logging_path, "a", encoding="utf-8") as f:
            f.write("{版本号:昵称,账号,手机号,邮箱,KEY}" + "\n")
            f.write(str(rdata) + "\n")
    elif logging_path:
        print("{版本号:昵称,账号,手机号,邮箱,KEY}")
        print(rdata)
    return rdata
# class BiasAddr:
# def __init__(self, account, mobile, name, key, db_path):
# self.account = account.encode("utf-8")
# self.mobile = mobile.encode("utf-8")
# self.name = name.encode("utf-8")
# self.key = bytes.fromhex(key) if key else b""
# self.db_path = db_path if db_path else ""
#
# self.process_name = "WeChat.exe"
# self.module_name = "WeChatWin.dll"
#
# self.pm = Pymem("WeChat.exe")
#
# self.bits = self.get_osbits()
# self.version = self.get_file_version(self.process_name)
# self.address_len = self.get_addr_len()
#
# self.islogin = True
#
# def get_addr_len(self):
# version_nums = list(map(int, self.version.split("."))) # 将版本号拆分为数字列表
# if version_nums[0] <= 3 and version_nums[1] <= 9 and version_nums[2] <= 2:
# return 4
# else:
# return 8
#
# def find_all(self, c: bytes, string: bytes, base_addr=0):
# """
# 查找字符串中所有子串的位置
# :param c: 子串 b'123'
# :param string: 字符串 b'123456789123'
# :return:
# """
# return [base_addr + m.start() for m in re.finditer(re.escape(c), string)]
#
# def get_file_version(self, process_name):
# for process in psutil.process_iter(['pid', 'name', 'exe']):
# if process.name() == process_name:
# file_version = Dispatch("Scripting.FileSystemObject").GetFileVersion(process.exe())
# return file_version
# self.islogin = False
#
# def get_osbits(self):
# return int(platform.architecture()[0][:-3])
#
# def search_memory_value(self, value: bytes, module_name="WeChatWin.dll"):
# # 创建 Pymem 对象
# pm = self.pm
# module = pymem.process.module_from_name(pm.process_handle, module_name)
#
# # result = pymem.pattern.pattern_scan_module(pm.process_handle, module, value, return_multiple=True)
# # result = result[-1]-module.lpBaseOfDll if len(result) > 0 else 0
# mem_data = pm.read_bytes(module.lpBaseOfDll, module.SizeOfImage)
# result = self.find_all(value, mem_data)
# result = result[-1] if len(result) > 0 else 0
# return result
#
# def search_key(self, key: bytes):
# byteLen = self.address_len # if self.bits == 32 else 8 # 4字节或8字节
# key = re.escape(key) # 转义特殊字符
# key_addr = self.pm.pattern_scan_all(key, return_multiple=True)[-1] if len(key) > 0 else 0
# key = key_addr.to_bytes(byteLen, byteorder='little', signed=True)
# result = self.search_memory_value(key, self.module_name)
# return result
#
# def get_key_bias_test(self):
# byteLen = self.address_len # 4 if self.bits == 32 else 8 # 4字节或8字节
# keyLenOffset = 0x8c if self.bits == 32 else 0xd0
# keyWindllOffset = 0x90 if self.bits == 32 else 0xd8
#
# pm = self.pm
#
# module = pymem.process.module_from_name(pm.process_handle, "WeChatWin.dll")
# keyBytes = b'-----BEGIN PUBLIC KEY-----\n...'
# publicKeyList = pymem.pattern.pattern_scan_all(self.pm.process_handle, keyBytes, return_multiple=True)
#
# keyaddrs = []
# for addr in publicKeyList:
# keyBytes = addr.to_bytes(byteLen, byteorder="little", signed=True) # 低位在前
# addrs = pymem.pattern.pattern_scan_module(pm.process_handle, module, keyBytes, return_multiple=True)
# if addrs != 0:
# keyaddrs += addrs
#
# keyWinAddr = 0
# for addr in keyaddrs:
# keyLen = pm.read_uchar(addr - keyLenOffset)
# if keyLen != 32:
# continue
# keyWinAddr = addr - keyWindllOffset
# # keyaddr = int.from_bytes(pm.read_bytes(keyWinAddr, byteLen), byteorder='little')
# # key = pm.read_bytes(keyaddr, 32)
# # print("key", key.hex())
#
# return keyWinAddr - module.lpBaseOfDll
#
# def get_key_bias(self, wx_db_path, account_bias=0):
# wx_db_path = os.path.join(wx_db_path, "Msg", "MicroMsg.db")
# if not os.path.exists(wx_db_path):
# return 0
#
# def get_maybe_key(mem_data):
# maybe_key = []
# for i in range(0, len(mem_data), self.address_len):
# addr = mem_data[i:i + self.address_len]
# addr = int.from_bytes(addr, byteorder='little')
# # 去掉不可能的地址
# if min_addr < addr < max_addr:
# key = read_key(addr)
# if key == b"":
# continue
# maybe_key.append([key, i])
# return maybe_key
#
# def read_key(addr):
# key = ctypes.create_string_buffer(35)
# if ReadProcessMemory(pm.process_handle, void_p(addr - 1), key, 35, 0) == 0:
# return b""
#
# if b"\x00\x00" in key.raw[1:33]:
# return b""
#
# if b"\x00\x00" == key.raw[33:35] and b"\x90" == key.raw[0:1]:
# return key.raw[1:33]
# return b""
#
# def verify_key(keys, wx_db_path):
# with open(wx_db_path, "rb") as file:
# blist = file.read(5000)
# salt = blist[:16]
# first = blist[16:DEFAULT_PAGESIZE]
# mac_salt = bytes([(salt[i] ^ 58) for i in range(16)])
#
# with multiprocessing.Pool(processes=8) as pool:
# results = [pool.apply_async(validate_key, args=(key, salt, first, mac_salt)) for key, i in keys[-1::-1]]
# results = [p.get() for p in results]
# for i, result in enumerate(results[-1::-1]):
# if result:
# return keys[i]
# return b"", 0
#
# module_name = "WeChatWin.dll"
# pm = self.pm
# module = pymem.process.module_from_name(pm.process_handle, module_name)
# start_addr = module.lpBaseOfDll
# size = module.SizeOfImage
#
# if account_bias > 1:
# maybe_key = []
# for i in [0x24, 0x40]:
# addr = start_addr + account_bias - i
# mem_data = pm.read_bytes(addr, self.address_len)
# key = read_key(int.from_bytes(mem_data, byteorder='little'))
# if key != b"":
# maybe_key.append([key, addr - start_addr])
# key, bais = verify_key(maybe_key, wx_db_path)
# if bais != 0:
# return bais
#
# min_addr = 0xffffffffffffffffffffffff
# max_addr = 0
# for module1 in pm.list_modules():
# if module1.lpBaseOfDll < min_addr:
# min_addr = module1.lpBaseOfDll
# if module1.lpBaseOfDll > max_addr:
# max_addr = module1.lpBaseOfDll + module1.SizeOfImage
#
# mem_data = pm.read_bytes(start_addr, size)
# maybe_key = get_maybe_key(mem_data)
# key, bais = verify_key(maybe_key, wx_db_path)
# return bais
#
# def run(self, is_logging=False, version_list_path=None):
# self.version = self.get_file_version(self.process_name)
# if not self.islogin:
# error = "[-] WeChat No Run"
# if is_logging: print(error)
# return error
# mobile_bias = self.search_memory_value(self.mobile)
# name_bias = self.search_memory_value(self.name)
# account_bias = self.search_memory_value(self.account)
# # version_bias = self.search_memory_value(self.version.encode("utf-8"))
#
# try:
# key_bias = self.get_key_bias_test()
# except:
# key_bias = 0
#
# if key_bias <= 0:
# if self.key:
# key_bias = self.search_key(self.key)
# elif self.db_path:
# key_bias = self.get_key_bias(self.db_path, account_bias)
# else:
# key_bias = 0
# rdata = {self.version: [name_bias, account_bias, mobile_bias, 0, key_bias]}
# if version_list_path and os.path.exists(version_list_path):
# with open(version_list_path, "r", encoding="utf-8") as f:
# data = json.load(f)
# data.update(rdata)
# with open(version_list_path, "w", encoding="utf-8") as f:
# json.dump(data, f, ensure_ascii=False, indent=4)
# if is_logging:
# print("{版本号:昵称,账号,手机号,邮箱,KEY}")
# print(rdata)
# return rdata
if __name__ == '__main__':
    # Script entry point: collect the account details from the command line
    # and run the offset search, printing the result and updating the
    # version table one directory up.
    cli = argparse.ArgumentParser()
    for flag, text in (("--mobile", "手机号"), ("--name", "微信昵称"), ("--account", "微信账号")):
        cli.add_argument(flag, type=str, help=text, required=True)
    cli.add_argument("--key", type=str, help="(可选)密钥")
    cli.add_argument("--db_path", type=str, help="(可选)已登录账号的微信文件夹路径")

    opts = cli.parse_args()

    # Reject empty values for the three mandatory identifiers.
    if not (opts.mobile and opts.name and opts.account):
        raise ValueError("缺少必要的命令行参数!请提供手机号、微信昵称、微信账号。")

    finder = BiasAddr(opts.account, opts.mobile, opts.name, opts.key, opts.db_path)
    rdata = finder.run(True, "../version_list.json")

View File

@ -1,398 +0,0 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: main.py.py
# Description:
# Author: xaoyaoo
# Date: 2023/10/14
# -------------------------------------------------------------------------------
import argparse
import importlib.metadata
import sys
from pywxdump import *
wxdump_ascii = r"""
"""
class MainBiasAddr():
    """Handler for the ``bias`` sub-command (search for the base-address offsets)."""

    def init_parses(self, parser):
        """Register the ``bias`` sub-parser on *parser* and return it.

        The created sub-parser is also kept on ``self.sb_bias_addr``.
        """
        self.mode = "bias"
        sub = parser.add_parser(self.mode, help="获取微信基址偏移")

        # Mandatory account identifiers.
        for flag, text in (("--mobile", "手机号"), ("--name", "微信昵称"), ("--account", "微信账号")):
            sub.add_argument(flag, type=str, help=text, metavar="", required=True)

        # Optional hints used by the key search.
        sub.add_argument("--key", type=str, metavar="", help="(可选)密钥")
        sub.add_argument("--db_path", type=str, metavar="", help="(可选)已登录账号的微信文件夹路径")
        sub.add_argument("-vlp", '--version_list_path', type=str, metavar="",
                         help="(可选)微信版本偏移文件路径,如有,则自动更新",
                         default=None)

        self.sb_bias_addr = sub
        return sub

    def run(self, args):
        """Run the offset search with the parsed *args* and return its result."""
        finder = BiasAddr(args.account, args.mobile, args.name, args.key, args.db_path)
        return finder.run(True, args.version_list_path)
class MainWxInfo():
    """Handler for the ``info`` sub-command (read information from running WeChat)."""

    def init_parses(self, parser):
        """Register the ``info`` sub-parser on *parser* and return it."""
        self.mode = "info"
        sb_wx_info = parser.add_parser(self.mode, help="获取微信信息")
        sb_wx_info.add_argument("-vlp", '--version_list_path', metavar="", type=str,
                                help="(可选)微信版本偏移文件路径", default=VERSION_LIST_PATH)
        return sb_wx_info

    def run(self, args):
        """Load the version-offset table from ``args.version_list_path`` and read the info.

        :return: whatever ``read_info`` returns for the loaded table.
        """
        path = args.version_list_path
        # Use a context manager so the file handle is closed deterministically.
        with open(path, "r", encoding="utf-8") as f:
            version_list = json.load(f)
        return read_info(version_list, True)
class MainWxDbPath():
    """Handler for the ``db_path`` sub-command (locate the WeChat database files)."""

    def init_parses(self, parser):
        """Register the ``db_path`` sub-parser on *parser* and return it."""
        self.mode = "db_path"
        sub = parser.add_parser(self.mode, help="获取微信文件夹路径")
        options = (
            (("-r", "--require_list"),
             "(可选)需要的数据库名称(eg: -r MediaMSG;MicroMsg;FTSMSG;MSG;Sns;Emotion )", "all"),
            (("-wf", "--wx_files"), "(可选)'WeChat Files'路径", None),
            (("-id", "--wxid"), "(可选)wxid_,用于确认用户文件夹", None),
        )
        for flags, text, fallback in options:
            sub.add_argument(*flags, type=str, help=text, default=fallback, metavar="")
        return sub

    def run(self, args):
        """Look up the database paths for the parsed *args* and return them."""
        return get_wechat_db(args.require_list, args.wx_files, args.wxid, True)
class MainDecrypt():
    """Handler for the ``decrypt`` sub-command (decrypt database files with a key)."""

    def init_parses(self, parser):
        """Register the ``decrypt`` sub-parser on *parser* and return it."""
        self.mode = "decrypt"
        sub = parser.add_parser(self.mode, help="解密微信数据库")
        sub.add_argument("-k", "--key", type=str, help="密钥", required=True, metavar="")
        sub.add_argument("-i", "--db_path", type=str, help="数据库路径(目录or文件)", required=True, metavar="")
        # The default output directory is resolved when the parser is built.
        default_out = os.path.join(os.getcwd(), "decrypted")
        sub.add_argument("-o", "--out_path", type=str, default=default_out,
                         help="输出路径(必须是目录)[默认为当前路径下decrypted文件夹]", required=False,
                         metavar="")
        return sub

    def run(self, args):
        """Decrypt ``args.db_path`` into ``args.out_path``.

        :return: the ``batch_decrypt`` result, or ``None`` when the input
            path does not exist.
        """
        source, target = args.db_path, args.out_path

        if not os.path.exists(source):
            print(f"[-] 数据库路径不存在:{source}")
            return

        if not os.path.exists(target):
            os.makedirs(target)
            print(f"[+] 创建输出文件夹:{target}")

        return batch_decrypt(args.key, source, target, True)
class MainShowChatRecords():
    """Handler for the ``dbshow`` sub-command (browse chat records in a local web page)."""

    def init_parses(self, parser):
        """Register the ``dbshow`` sub-parser on *parser* and return it."""
        self.mode = "dbshow"
        # Add the 'dbshow' sub-command parser.
        sb_decrypt = parser.add_parser(self.mode, help="聊天记录查看")
        sb_decrypt.add_argument("-msg", "--msg_path", type=str, help="解密后的 MSG.db 的路径", required=True,
                                metavar="")
        sb_decrypt.add_argument("-micro", "--micro_path", type=str, help="解密后的 MicroMsg.db 的路径", required=True,
                                metavar="")
        sb_decrypt.add_argument("-media", "--media_path", type=str, help="解密后的 MediaMSG.db 的路径", required=True,
                                metavar="")
        sb_decrypt.add_argument("-fs", "--filestorage_path", type=str,
                                help="(可选)文件夹FileStorage的路径用于显示图片", required=False,
                                metavar="")
        return sb_decrypt

    def run(self, args):
        """Start a Flask server that serves the chat records named in *args*.

        Reads ``msg_path``, ``micro_path``, ``media_path`` and
        ``filestorage_path`` from *args*. Returns ``None`` (after printing a
        message) when Flask cannot be imported or when any of the three
        database paths is unset or does not exist; otherwise blocks in
        ``app.run``.
        """
        try:
            from flask import Flask, request, jsonify, render_template, g
            import logging
        except Exception as e:
            print(e)
            print("[-] 请安装flask( pip install flask )")
            return

        # A path may be None when the caller could not find that database;
        # os.path.exists(None) raises TypeError, so treat unset as missing.
        db_paths = (args.msg_path, args.micro_path, args.media_path)
        found = [bool(p) and os.path.exists(p) for p in db_paths]
        if not all(found):
            print(*found)
            print("[-] 输入数据库路径不存在")
            return

        app = Flask(__name__, template_folder='./show_chat/templates')
        app.logger.setLevel(logging.ERROR)

        @app.before_request
        def before_request():
            # Expose the database locations to the blueprint's views.
            g.MSG_ALL_db_path = args.msg_path
            g.MicroMsg_db_path = args.micro_path
            g.MediaMSG_all_db_path = args.media_path
            g.FileStorage_path = args.filestorage_path
            g.USER_LIST = get_user_list(args.msg_path, args.micro_path)

        app.register_blueprint(app_show_chat)

        print("[+] 请使用浏览器访问 http://127.0.0.1:5000/ 查看聊天记录")
        app.run(debug=False)
class MainExportChatRecords():
    """Handler for the ``export`` sub-command (export one conversation as HTML)."""

    def init_parses(self, parser):
        """Register the ``export`` sub-parser on *parser* and return it."""
        self.mode = "export"
        sub = parser.add_parser(self.mode, help="聊天记录导出为html")
        required = (
            (("-u", "--username"), "微信账号(聊天对象账号)"),
            (("-o", "--outpath"), "导出路径"),
            (("-msg", "--msg_path"), "解密后的 MSG.db 的路径"),
            (("-micro", "--micro_path"), "解密后的 MicroMsg.db 的路径"),
            (("-media", "--media_path"), "解密后的 MediaMSG.db 的路径"),
        )
        for flags, text in required:
            sub.add_argument(*flags, type=str, help=text, required=True, metavar="")
        sub.add_argument("-fs", "--filestorage_path", type=str,
                         help="(可选)文件夹FileStorage的路径用于显示图片", required=False,
                         metavar="")
        return sub

    def run(self, args):
        """Export the conversation selected by *args* into ``args.outpath``.

        Returns ``None`` early (after printing a message) when Flask cannot be
        imported or when any of the three database paths does not exist.
        """
        # The import only verifies that Flask is available before exporting.
        try:
            from flask import Flask, request, jsonify, render_template, g
            import logging
        except Exception as e:
            print(e)
            print("[-] 请安装flask( pip install flask)")
            return

        have_msg = os.path.exists(args.msg_path)
        if not have_msg or not os.path.exists(args.micro_path) or not os.path.exists(args.media_path):
            print(os.path.exists(args.msg_path), os.path.exists(args.micro_path), os.path.exists(args.media_path))
            print("[-] 输入数据库路径不存在")
            return

        if not os.path.exists(args.outpath):
            os.makedirs(args.outpath)
            print(f"[+] 创建输出文件夹:{args.outpath}")

        export(args.username, args.outpath, args.msg_path, args.micro_path, args.media_path, args.filestorage_path)
        print(f"[+] 导出成功{args.outpath}")
class MainAll():
    """Handler for the ``all`` sub-command: read info, decrypt the databases, open the viewer."""

    def init_parses(self, parser):
        """Register the ``all`` sub-parser on *parser* and return it."""
        self.mode = "all"
        return parser.add_parser(self.mode, help="获取微信信息,解密微信数据库,查看聊天记录")

    def run(self, args):
        """Run the complete pipeline for the logged-in account(s).

        For each entry from ``read_info``: find its databases, decrypt them
        into ``./decrypted[/<wxid>]``, write the failures to a text file
        there, then launch the chat viewer on the decrypted files.
        """
        # NOTE(review): the nesting here was reconstructed; everything down to
        # launching the viewer is treated as the per-user loop body - confirm
        # against the packaged source.
        for user in read_info(VERSION_LIST, True):
            key = user.get("key", "")
            if not key:
                print("[-] 未获取到密钥")
                return
            wxid = user.get("wxid", None)

            found = get_wechat_db('all', None, wxid=wxid, is_logging=True)
            if isinstance(found, str):
                # A string result carries an error message.
                print(found)
                return

            wxdbpaths = []
            for user_dir in found.values():
                for paths in user_dir.values():
                    wxdbpaths.extend(paths)
            if len(wxdbpaths) == 0:
                print("[-] 未获取到数据库路径")
                return

            print(f"[+] 共发现 {len(wxdbpaths)} 个微信数据库")
            print("=" * 32)

            if wxid:
                out_path = os.path.join(os.getcwd(), "decrypted", wxid)
            else:
                out_path = os.path.join(os.getcwd(), "decrypted")
            print(f"[*] 解密后文件夹:{out_path} ")
            print(f"[*] 解密中...(用时较久,耐心等待)")
            if not os.path.exists(out_path):
                os.makedirs(out_path)

            # Offer to empty a non-empty output directory first.
            if os.listdir(out_path):
                answer = input(f"[*] 输出文件夹不为空({out_path})\n 是否删除?(y/n):")
                if answer.lower() in ('y', 'yes'):
                    for root, dirs, files in os.walk(out_path, topdown=False):
                        for name in files:
                            os.remove(os.path.join(root, name))
                        for name in dirs:
                            os.rmdir(os.path.join(root, name))

            code, ret = batch_decrypt(key, wxdbpaths, out_path, False)
            if not code:
                print(ret)
                return
            print("[+] 解密完成")
            print("-" * 32)

            errors = []
            out_dbs = []
            for ok, detail in ret:
                if ok == False:
                    errors.append(detail)
                    continue
                print(
                    f'[+] success "{os.path.relpath(detail[0], os.path.commonprefix(wxdbpaths))}" -> "{os.path.relpath(detail[1], os.getcwd())}"')
                out_dbs.append(detail[1])
            print("-" * 32)
            print(
                "[-] " + f"{len(errors)} 个文件解密失败(可能原因:非当前登录用户数据库;非加密数据库),详见{out_path}下‘解密失败.txt;")
            with open(os.path.join(out_path, "解密失败.txt"), "w", encoding="utf-8") as f:
                f.write("\n".join(f'{item}' for item in errors))
            print("=" * 32)

            if len(out_dbs) <= 0:
                print("[-] 未获取到解密后的数据库路径")
                return

            # Text before the first "MSG" in the first output path is used as
            # the user folder that holds FileStorage.
            file_storage = os.path.join(out_dbs[0].split("MSG")[0], "FileStorage")

            def last_with(tag):
                # Last decrypted path whose text contains *tag*, or None.
                hits = [db for db in out_dbs if tag in db]
                return hits[-1] if hits else None

            args.msg_path = last_with("de_MSG")
            args.micro_path = last_with("de_MicroMsg")
            args.media_path = last_with("de_MediaMSG")
            args.filestorage_path = file_storage
            MainShowChatRecords().run(args)
# Version string of the installed ``pywxdump`` distribution, looked up from the package metadata when this module is imported.
PYWXDUMP_VERSION = importlib.metadata.version('pywxdump')
class CustomArgumentParser(argparse.ArgumentParser):
    """Argument parser whose help output is wrapped in a coloured PyWxDump banner."""

    def format_help(self):
        """Return the standard help text framed by the banner, version line and project link."""
        width = 70
        cyan, reset = '\033[36m', '\033[0m'
        version = importlib.metadata.version('pywxdump')

        # Centre every non-empty line of the ASCII-art logo.
        logo_rows = [f'{cyan}{row:^{width}}{reset}' for row in wxdump_ascii.split('\n') if row]
        logo = '\n'.join(logo_rows)

        title = " PyWxDump v" + version + " "
        title_bar = f'{cyan}{title:=^{width}}{reset}'
        summary = 'PyWxDump功能获取账号信息、解密数据库、查看聊天记录、导出聊天记录为html等'
        link = '更多详情请查看: \033[4m\033[1mhttps://github.com/xaoyaoo/PyWxDump\033[0m'
        rule = f'{cyan}{" options ":-^{width}}{reset}'

        body = super().format_help().strip()

        # Leading/trailing empty items produce the surrounding newlines.
        pieces = ['', logo, '', title_bar, summary, rule, body, rule, link, title_bar, '']
        return '\n'.join(pieces)
def console_run():
    """Console entry point: build the sub-command parser and dispatch to the chosen mode."""
    parser = CustomArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
    version = importlib.metadata.version('pywxdump')
    parser.add_argument('-V', '--version', action='version', version=f"PyWxDump v{version}")

    subparsers = parser.add_subparsers(dest="mode", help="""运行模式:""", required=True, metavar="mode")

    # Handlers are registered in the order they appear in the help output.
    bias = MainBiasAddr()
    info = MainWxInfo()
    db_path = MainWxDbPath()
    decrypt_cmd = MainDecrypt()
    show = MainShowChatRecords()
    export_cmd = MainExportChatRecords()
    everything = MainAll()

    modes = {}
    for handler in (bias, info, db_path, decrypt_cmd, show, export_cmd, everything):
        handler.init_parses(subparsers)
        modes[handler.mode] = handler

    # Show help when nothing is given, or when a mode that needs arguments
    # is given without any.
    runs_without_args = [everything.mode, info.mode, db_path.mode]
    argc = len(sys.argv)
    if argc == 1:
        sys.argv.append('-h')
    elif argc == 2 and sys.argv[1] in modes and sys.argv[1] not in runs_without_args:
        sys.argv.append('-h')

    args = parser.parse_args()

    if not any(vars(args).values()):
        parser.print_help()

    modes[args.mode].run(args)
if __name__ == '__main__':
console_run()

View File

@ -1,9 +0,0 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: __init__.py.py
# Description:
# Author: xaoyaoo
# Date: 2023/08/21
# -------------------------------------------------------------------------------
from .decrypt import batch_decrypt, encrypt
from .get_wx_decrypted_db import all_decrypt, merge_copy_msg_db, merge_msg_db, merge_media_msg_db

View File

@ -1,228 +0,0 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: getwxinfo.py
# Description:
# Author: xaoyaoo
# Date: 2023/08/21
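# WeChat databases are encrypted with 256-bit AES-CBC. The default page size is 4096 bytes (4 KB), and every page is encrypted and decrypted independently.
# Every page of the encrypted file has its own random initialization vector (IV), stored at the end of the page.
# Every page also carries a message authentication code computed with HMAC-SHA1 (Android databases use SHA512); it is stored at the end of the page as well.
# The first 16 bytes of each database file hold a unique random salt, used both for HMAC verification and for decrypting the data.
# The HMAC key differs from the decryption key. The decryption key is derived from the master key and the 16-byte salt with PKCS5_PBKDF2_HMAC1 using 64000 iterations. The HMAC key is derived from that decryption key and the salt XORed with 0x3a, using PKCS5_PBKDF2_HMAC1 with 2 iterations.
# To keep the data part a multiple of 16 bytes (the AES block size), the end of every page is padded with zero bytes so that the reserved area is 48 bytes long.
# In summary: the first 4 KB page is the 16-byte salt, then 4032 bytes of data, then a 16-byte IV, a 20-byte HMAC and 12 zero bytes; every later page is 4048 bytes of encrypted data followed by a 48-byte reserved area.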
# -------------------------------------------------------------------------------
import argparse
import hmac
import hashlib
import os
from typing import Union, List
from Cryptodome.Cipher import AES
# from Crypto.Cipher import AES # 如果上面的导入失败,可以尝试使用这个
SQLITE_FILE_HEADER = "SQLite format 3\x00" # SQLite文件头
KEY_SIZE = 32
DEFAULT_PAGESIZE = 4096
DEFAULT_ITER = 64000
# 通过密钥解密数据库
def decrypt(key: str, db_path, out_path):
    """Decrypt one database file with the given key.

    :param key: the key as a 64-character hex string (surrounding whitespace is ignored)
    :param db_path: path of the encrypted database (must be a file)
    :param out_path: path of the decrypted output file (its directory must exist)
    :return: ``(True, [db_path, out_path, key])`` on success, otherwise
        ``(False, message)`` describing what was wrong.
    """
    if not os.path.exists(db_path) or not os.path.isfile(db_path):
        return False, f"[-] db_path:'{db_path}' File not found!"
    # A bare file name has an empty dirname, which means the current directory.
    if not os.path.exists(os.path.dirname(out_path) or "."):
        return False, f"[-] out_path:'{out_path}' File not found!"

    # Strip before validating so a trailing newline does not fail the length check.
    hex_key = key.strip()
    if len(hex_key) != 64:
        return False, f"[-] key:'{key}' Len Error!"
    try:
        password = bytes.fromhex(hex_key)
    except ValueError:
        return False, f"[-] key:'{key}' Hex Error!"

    with open(db_path, "rb") as file:
        blist = file.read()

    salt = blist[:16]
    # Reject truncated files before paying for the key derivation.
    if len(salt) != 16:
        return False, f"[-] db_path:'{db_path}' File Error!"

    byteKey = hashlib.pbkdf2_hmac("sha1", password, salt, DEFAULT_ITER, KEY_SIZE)
    first = blist[16:DEFAULT_PAGESIZE]

    # Verify the key against the HMAC stored in the first page's reserved area.
    mac_salt = bytes(b ^ 58 for b in salt)
    mac_key = hashlib.pbkdf2_hmac("sha1", byteKey, mac_salt, 2, KEY_SIZE)
    hash_mac = hmac.new(mac_key, first[:-32], hashlib.sha1)
    hash_mac.update(b'\x01\x00\x00\x00')  # page number 1, little-endian
    if not hmac.compare_digest(hash_mac.digest(), first[-32:-12]):
        return False, f"[-] Key Error! (key:'{key}'; db_path:'{db_path}'; out_path:'{out_path}' )"

    with open(out_path, "wb") as deFile:
        # The plain header takes the place of the 16-byte salt.
        deFile.write(SQLITE_FILE_HEADER.encode())
        pages = [first]
        pages.extend(blist[i:i + DEFAULT_PAGESIZE] for i in range(DEFAULT_PAGESIZE, len(blist), DEFAULT_PAGESIZE))
        for page in pages:
            # Each page ends with a 48-byte reserved area whose first 16 bytes
            # are the IV; the reserved area is copied through unchanged.
            cipher = AES.new(byteKey, AES.MODE_CBC, page[-48:-32])
            deFile.write(cipher.decrypt(page[:-48]))
            deFile.write(page[-48:])
    return True, [db_path, out_path, key]
def batch_decrypt(key: str, db_path: Union[str, List[str]], out_path: str, is_logging: bool = False):
    """Decrypt one file, a directory tree, or a list of files into *out_path*.

    Each output file is named ``de_<original name>`` and keeps its position
    relative to the input root. Directories left empty under *out_path* are
    removed afterwards.

    :param key: the key as a 64-character hex string
    :param db_path: a file path, a directory path, or a list of file paths
    :param out_path: existing output directory
    :param is_logging: print errors and a per-file summary when true
    :return: ``(False, message)`` when the arguments or paths are invalid,
        otherwise ``(True, results)`` where ``results`` holds one ``decrypt``
        return value per input file.
    """

    def fail(message):
        # Report an argument or path problem in the shape callers expect.
        if is_logging:
            print(message)
        return False, message

    if not isinstance(key, str) or not isinstance(out_path, str) or not os.path.exists(out_path) or len(key) != 64:
        return fail(f"[-] (key:'{key}' or out_path:'{out_path}') Error!")

    process_list = []

    if isinstance(db_path, str):
        if not os.path.exists(db_path):
            return fail(f"[-] db_path:'{db_path}' not found!")

        if os.path.isfile(db_path):
            outpath = os.path.join(out_path, 'de_' + os.path.basename(db_path))
            process_list.append([key, db_path, outpath])
        elif os.path.isdir(db_path):
            for root, _dirs, files in os.walk(db_path):
                rel = os.path.relpath(root, db_path)
                for file in files:
                    inpath = os.path.join(root, file)
                    outpath = os.path.join(out_path, rel, 'de_' + file)
                    os.makedirs(os.path.dirname(outpath), exist_ok=True)
                    process_list.append([key, inpath, outpath])
        else:
            return fail(f"[-] db_path:'{db_path}' Error ")

    elif isinstance(db_path, list):
        # commonprefix works on characters, so it can end mid-name; fall back
        # to the containing directory in that case.
        rt_path = os.path.commonprefix(db_path)
        if not os.path.exists(rt_path):
            rt_path = os.path.dirname(rt_path)

        for inpath in db_path:
            if not os.path.exists(inpath):
                # Name the entry that is actually missing, not the whole list.
                return fail(f"[-] db_path:'{inpath}' not found!")

            inpath = os.path.normpath(inpath)
            rel = os.path.relpath(os.path.dirname(inpath), rt_path)
            outpath = os.path.join(out_path, rel, 'de_' + os.path.basename(inpath))
            os.makedirs(os.path.dirname(outpath), exist_ok=True)
            process_list.append([key, inpath, outpath])
    else:
        return fail(f"[-] db_path:'{db_path}' Error ")

    result = [decrypt(*job) for job in process_list]

    # Remove directories that ended up empty, deepest first.
    for root, dirs, _files in os.walk(out_path, topdown=False):
        for name in dirs:
            candidate = os.path.join(root, name)
            if not os.listdir(candidate):
                os.rmdir(candidate)

    if is_logging:
        print("=" * 32)
        success_count = 0
        fail_count = 0
        for code, ret in result:
            if not code:
                print(ret)
                fail_count += 1
            else:
                print(f'[+] "{ret[0]}" -> "{ret[1]}"')
                success_count += 1
        print("-" * 32)
        print(f"[+] 共 {len(result)} 个文件, 成功 {success_count} 个, 失败 {fail_count}")
        print("=" * 32)
    return True, result
def encrypt(key: str, db_path, out_path):
    """Encrypt a database file with the given key.

    :param key: the key as a 64-character hex string
    :param db_path: path of the plain database (must be a file)
    :param out_path: path of the encrypted output file (its directory must exist)
    :return: ``(True, [db_path, out_path, key])`` on success, otherwise
        ``(False, message)``.
    """
    # NOTE(review): the bytes written here are salt + one HMAC + the pages
    # after the first, each encrypted with an IV that is not stored. That is
    # not the per-page layout the decrypt routine in this file reads - confirm
    # the intended format before relying on this.
    if not (os.path.exists(db_path) and os.path.isfile(db_path)):
        return False, f"[-] db_path:'{db_path}' File not found!"
    target_dir = os.path.dirname(out_path)
    if not os.path.exists(target_dir):
        return False, f"[-] out_path:'{out_path}' File not found!"
    if len(key) != 64:
        return False, f"[-] key:'{key}' Len Error!"

    password = bytes.fromhex(key.strip())
    with open(db_path, "rb") as source:
        blist = source.read()

    # Fresh random salt, then the same two-step key derivation used for decryption.
    salt = os.urandom(16)
    byteKey = hashlib.pbkdf2_hmac("sha1", password, salt, DEFAULT_ITER, KEY_SIZE)
    mac_salt = bytes(b ^ 58 for b in salt)
    mac_key = hashlib.pbkdf2_hmac("sha1", byteKey, mac_salt, 2, KEY_SIZE)

    # HMAC over the input minus its last 32 bytes, followed by the page number.
    signer = hmac.new(mac_key, blist[:-32], hashlib.sha1)
    signer.update(b'\x01\x00\x00\x00')
    mac_digest = signer.digest()

    # Pages after the first one.
    offsets = range(DEFAULT_PAGESIZE, len(blist), DEFAULT_PAGESIZE)
    later_pages = [blist[start:start + DEFAULT_PAGESIZE] for start in offsets]

    with open(out_path, "wb") as enFile:
        enFile.write(salt)
        enFile.write(mac_digest)
        for page in later_pages:
            cipher = AES.new(byteKey, AES.MODE_CBC, os.urandom(16))
            enFile.write(cipher.encrypt(page))

    return True, [db_path, out_path, key]
if __name__ == '__main__':
    # Script entry point: decrypt a file or directory given on the command
    # line and print a per-file summary.
    cli = argparse.ArgumentParser()
    cli.add_argument("-k", "--key", type=str, help="密钥", required=True)
    cli.add_argument("-i", "--db_path", type=str, help="数据库路径(目录or文件)", required=True)
    cli.add_argument("-o", "--out_path", type=str,
                     help="输出路径(必须是目录),输出文件为 out_path/de_{original_name}", required=True)

    opts = cli.parse_args()

    result = batch_decrypt(opts.key, opts.db_path, opts.out_path, is_logging=True)

View File

@ -1,315 +0,0 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: get_wx_decrypted_db.py
# Description:
# Author: xaoyaoo
# Date: 2023/08/25
# -------------------------------------------------------------------------------
import argparse
import os
import re
import shutil
import sqlite3
# import sys
import winreg
# sys.path.append(os.path.dirname(os.path.abspath(__file__)))
try:
from decrypted.decrypt import decrypt
except ImportError:
from .decrypt import decrypt
# 开始获取微信数据库
def get_wechat_db():
    """Collect the database files of every account folder under "WeChat Files".

    The storage root is read from the ``FileSavePath`` registry value; if
    that lookup fails for any reason, ``~/Documents`` is used instead.

    :return: ``{folder_name: {"MicroMsg": [...], "Msg": [...], "MediaMSG": [...],
        "Sns": [...], "Emotion": [...]}}``, or a ``FileNotFoundError``
        instance (returned, not raised) when the "WeChat Files" folder does
        not exist.
    """
    try:
        reg = winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Software\Tencent\WeChat", 0, winreg.KEY_READ)
        w_dir, _ = winreg.QueryValueEx(reg, "FileSavePath")
        winreg.CloseKey(reg)
    except Exception:
        # Any failure falls back to the marker for the default location.
        w_dir = "MyDocument:"

    if w_dir == "MyDocument:":
        base = os.path.join(os.path.expanduser("~"), "Documents")
    else:
        base = w_dir
    msg_dir = os.path.join(base, "WeChat Files")

    if not os.path.exists(msg_dir):
        return FileNotFoundError("目录不存在")

    # Every entry except the shared folders is treated as an account folder.
    shared = ("All Users", "Applet", "WMPF")
    user_dirs = {entry: os.path.join(msg_dir, entry) for entry in os.listdir(msg_dir) if entry not in shared}

    # Checked in this order; the first matching pattern decides the category.
    # FTSMSG is matched before the generic MSG pattern and is not reported.
    rules = (
        ("MediaMSG", r".*MediaMSG.*\.db$"),
        ("MicroMsg", r".*MicroMsg.*\.db$"),
        ("FTSMSG", r".*FTSMSG.*\.db$"),
        ("Msg", r".*MSG.*\.db$"),
        ("Sns", r".*Sns.*\.db$"),
        ("Emotion", r".*Emotion.*\.db$"),
    )

    for user, user_dir in list(user_dirs.items()):
        found = {label: [] for label, _ in rules}
        for root, dirs, files in os.walk(user_dir):
            for file_name in files:
                for label, pattern in rules:
                    if re.match(pattern, file_name):
                        found[label].append(os.path.join(root, file_name))
                        break

        # Only these three categories are returned sorted.
        for label in ("MediaMSG", "Msg", "MicroMsg"):
            found[label].sort()

        user_dirs[user] = {"MicroMsg": found["MicroMsg"], "Msg": found["Msg"], "MediaMSG": found["MediaMSG"],
                           "Sns": found["Sns"], "Emotion": found["Emotion"]}
    return user_dirs
# 解密所有数据库 paths文件 到 decrypted_path目录
def all_decrypt(keys, paths, decrypted_path):
    """Decrypt every file in *paths* into *decrypted_path* using the first key that works.

    Keys are tried in order; a key is accepted only when it decrypts all of
    *paths*. Output files keep their original base names.

    :param keys: candidate keys (hex strings)
    :param paths: paths of the encrypted database files
    :param decrypted_path: existing directory that receives the decrypted files
    :return: the list of decrypted file paths for the accepted key, or
        ``False`` when no key decrypts every file.
    """
    for key in keys:
        # Start a fresh list per key so a failed attempt leaves nothing behind.
        decrypted_paths = []
        for path in paths:
            dtp = os.path.join(decrypted_path, os.path.basename(path))
            # decrypt() returns (ok, detail); the tuple itself is always
            # truthy, so the status flag has to be unpacked.
            ok, _detail = decrypt(key, path, dtp)
            if not ok:
                break
            decrypted_paths.append(dtp)
        else:
            # Every file decrypted with this key.
            return decrypted_paths
    return False
def merge_copy_msg_db(db_path, save_path):
    """Move a single database file to *save_path*.

    :param db_path: a path, or a one-element list holding the path
    :param save_path: destination passed to ``shutil.move``
    :raises FileNotFoundError: when the source path does not exist
    """
    # Unwrap a single-element list.
    source = db_path[0] if isinstance(db_path, list) and len(db_path) == 1 else db_path

    if not os.path.exists(source):
        raise FileNotFoundError("目录不存在")

    shutil.move(source, save_path)
# Merge databases that share the same layout into one file
def merge_msg_db(db_path: list, save_path: str, CreateTime: int = 0):
    """Merge several MSG databases into the SQLite file at *save_path*.

    For every source file, missing tables are created in the merged database
    (only while it holds fewer than four tables), then rows are copied:

    * ``MSG``: rows newer than *CreateTime* whose ``MsgSvrID`` is not already
      present in the merged database.
    * any other table: rows whose first non-leading column value is not
      already present (the first column is skipped when the table has more
      than one column).

    Finally, duplicate ``MSG`` rows sharing a non-zero ``MsgSvrID`` are
    removed, keeping the one with the smallest ``localId``.

    :param db_path: list of source database paths.
    :param save_path: path of the merged database (created if missing).
    :param CreateTime: only MSG rows with ``CreateTime`` greater than this
        value are copied (10-digit timestamp).
    :return: *save_path*.
    """
    list_tables_sql = "select tbl_name from sqlite_master where type='table' and tbl_name!='sqlite_sequence'"
    merged_conn = sqlite3.connect(save_path)
    try:
        merged_cursor = merged_conn.cursor()
        for db_file in db_path:
            tabels_all = [row[0] for row in merged_cursor.execute(list_tables_sql).fetchall()]
            conn = sqlite3.connect(db_file)
            try:
                cursor = conn.cursor()

                # Create tables that the merged database does not have yet.
                if len(tabels_all) < 4:
                    cursor.execute(
                        "select tbl_name,sql from sqlite_master where type='table' and tbl_name!='sqlite_sequence'")
                    for tbl_name, sql in cursor.fetchall():
                        if tbl_name in tabels_all:
                            continue
                        try:
                            merged_cursor.execute(sql)
                            tabels_all.append(tbl_name)
                        except Exception as e:
                            print(f"error: {db_file}\n{tbl_name}\n{sql}\n{e}\n**********")
                            raise
                    merged_conn.commit()

                # Copy the data table by table.
                for tbl_name in tabels_all:
                    if tbl_name == "MSG":
                        known_ids = merged_cursor.execute(
                            "select MsgSvrID from MSG where CreateTime>? and MsgSvrID!=0",
                            (CreateTime,)).fetchall()
                        table_info = cursor.execute(f"PRAGMA table_info({tbl_name})").fetchall()
                        columns = [column[1] for column in table_info[1:]]  # skip the leading id column
                        id_list = ','.join(str(row[0]) for row in known_ids)
                        rows = cursor.execute(
                            f"select {','.join(columns)} from {tbl_name} "
                            f"where CreateTime>? and MsgSvrID not in ({id_list})",
                            (CreateTime,)).fetchall()
                        insert_sql = (f"INSERT INTO {tbl_name} ({','.join(columns)}) "
                                      f"VALUES ({','.join('?' * len(columns))})")
                        try:
                            merged_cursor.executemany(insert_sql, rows)
                        except Exception as e:
                            print(f"error: {db_file}\n{tbl_name}\n{insert_sql}\n{len(rows)}\n{e}\n**********")
                            raise
                    else:
                        rows = cursor.execute(f"select * from {tbl_name}").fetchall()
                        # The column layout is the same for every row: read it once.
                        table_info = cursor.execute(f"PRAGMA table_info({tbl_name})").fetchall()
                        multi_column = len(table_info) > 1
                        if multi_column:
                            columns = [column[1] for column in table_info[1:]]
                        elif rows:
                            columns = [table_info[0][1]]
                        else:
                            columns = []
                        if rows:
                            exists_sql = "select * from " + tbl_name + " where " + columns[0] + "=?"
                            query = ("INSERT INTO " + tbl_name + " (" + ",".join(columns) + ") VALUES ("
                                     + ",".join("?" * len(columns)) + ")")
                        for r in rows:
                            values = r[1:] if multi_column else [r[0]]
                            # Exactly one placeholder, so bind exactly one value.
                            if merged_cursor.execute(exists_sql, (values[0],)).fetchall():
                                continue  # already present
                            try:
                                merged_cursor.execute(query, values)
                            except Exception as e:
                                print(f"error: {db_file}\n{tbl_name}\n{query}\n{values}\n{len(values)}\n{e}\n**********")
                                raise
                    merged_conn.commit()
            finally:
                conn.close()

        # Remove duplicated messages, keeping the smallest localId per MsgSvrID.
        has_msg = merged_cursor.execute(
            "select 1 from sqlite_master where type='table' and tbl_name='MSG'").fetchone()
        if has_msg:
            sql = '''delete from MSG where localId in (SELECT localId from MSG
            where MsgSvrID != 0 and MsgSvrID in (select MsgSvrID from MSG
            where MsgSvrID != 0 GROUP BY MsgSvrID HAVING COUNT(*) > 1)
            and localId not in (select min(localId) from MSG
            where MsgSvrID != 0 GROUP BY MsgSvrID HAVING COUNT(*) > 1))'''
            merged_cursor.execute(sql)
            merged_conn.commit()
    finally:
        merged_conn.close()
    return save_path
def merge_media_msg_db(db_path: list, save_path: str):
    """Merge several MediaMSG databases into the SQLite file at *save_path*.

    The merged database gets a ``Media`` table when it has no table yet.
    Rows of each source are copied unless their ``Reserved0`` value is already
    present in the merged table. Source files that contain no table are
    skipped.

    :param db_path: list of source database paths.
    :param save_path: path of the merged database (created if missing).
    :return: *save_path*.
    """
    list_tables_sql = "select tbl_name,sql from sqlite_master where type='table' and tbl_name!='sqlite_sequence'"
    create_sql = "CREATE TABLE Media(localId INTEGER PRIMARY KEY AUTOINCREMENT,Key TEXT,Reserved0 INT,Buf BLOB,Reserved1 INT,Reserved2 TEXT)"
    merged_conn = sqlite3.connect(save_path)
    try:
        merged_cursor = merged_conn.cursor()
        for db_file in db_path:
            have_tables = [row[0] for row in merged_cursor.execute(list_tables_sql).fetchall()]
            conn_part = sqlite3.connect(db_file)
            try:
                cursor = conn_part.cursor()

                if len(have_tables) < 1:
                    table_part = cursor.execute(list_tables_sql).fetchall()
                    if not table_part:
                        continue  # nothing to merge from an empty source
                    tblname = table_part[0][0]
                    try:
                        merged_cursor.execute(create_sql)
                        have_tables.append(tblname)
                    except Exception as e:
                        print(f"error: {db_file}\n{tblname}\n{create_sql}\n{e}\n**********")
                        raise
                    merged_conn.commit()

                for tblname in have_tables:
                    r0 = merged_cursor.execute("select Reserved0 from " + tblname).fetchall()
                    known = ','.join(str(r[0]) for r in r0)
                    ex_sql = f"select `Key`,Reserved0,Buf,Reserved1,Reserved2 from {tblname} where Reserved0 not in ({known})"
                    data = cursor.execute(ex_sql).fetchall()
                    insert_sql = f"INSERT INTO {tblname} (Key,Reserved0,Buf,Reserved1,Reserved2) VALUES ({','.join('?' * 5)})"
                    try:
                        merged_cursor.executemany(insert_sql, data)
                    except Exception as e:
                        print(f"error: {db_file}\n{tblname}\n{insert_sql}\n{data}\n{len(data)}\n{e}\n**********")
                        raise
                    merged_conn.commit()
            finally:
                # always release the source connection, even when a copy step fails
                conn_part.close()
    finally:
        merged_conn.close()
    return save_path
if __name__ == '__main__':
    # Command-line entry point: decrypt every WeChat database found for each
    # account with the supplied key(s) and merge the results under ./decrypted.
    parser = argparse.ArgumentParser()
    parser.add_argument("-k", "--key", help="解密密钥", nargs="+", required=True)
    args = parser.parse_args()

    # required=True together with nargs="+" guarantees a non-empty list here.
    keys = args.key
    decrypted_ROOT = os.path.join(os.getcwd(), "decrypted")

    user_dirs = get_wechat_db()
    if not isinstance(user_dirs, dict):
        # get_wechat_db reports failure by returning an error value, not a mapping.
        print(user_dirs)
        raise SystemExit(1)

    for user, db_path in user_dirs.items():  # one iteration per account
        decrypted_path = os.path.join(decrypted_ROOT, user)  # final output directory
        decrypted_path_tmp = os.path.join(decrypted_path, "tmp")  # scratch directory
        os.makedirs(decrypted_path_tmp, exist_ok=True)

        decrypted = {
            name: all_decrypt(keys, db_path[name], decrypted_path_tmp)
            for name in ("MicroMsg", "Msg", "MediaMSG", "Sns", "Emotion")
        }
        # all_decrypt returns False when no key fits; do not feed that to the mergers.
        failed = [name for name, result in decrypted.items() if result is False]
        if failed:
            print(f"解密失败:{user}, {failed}")
            shutil.rmtree(decrypted_path_tmp)
            continue

        # Merge / move the decrypted files; categories with no file are skipped.
        if decrypted["MicroMsg"]:
            merge_copy_msg_db(decrypted["MicroMsg"], os.path.join(decrypted_path, "MicroMsg.db"))
        if decrypted["Msg"]:
            merge_msg_db(decrypted["Msg"], os.path.join(decrypted_path, "MSG_all.db"), 0)
        if decrypted["MediaMSG"]:
            merge_media_msg_db(decrypted["MediaMSG"], os.path.join(decrypted_path, "MediaMSG_all.db"))
        if decrypted["Sns"]:
            merge_copy_msg_db(decrypted["Sns"], os.path.join(decrypted_path, "Sns_all.db"))
        if decrypted["Emotion"]:
            merge_copy_msg_db(decrypted["Emotion"], os.path.join(decrypted_path, "Emotion_all.db"))

        shutil.rmtree(decrypted_path_tmp)  # remove the scratch files
        print(f"解密完成:{user}, {decrypted_path}")

View File

@ -1,8 +0,0 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: __init__.py
# Description:
# Author: xaoyaoo
# Date: 2023/11/10
# -------------------------------------------------------------------------------
from .main_window import app_show_chat, get_user_list, export

View File

@ -1,269 +0,0 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: GUI.py
# Description:
# Author: xaoyaoo
# Date: 2023/11/10
# -------------------------------------------------------------------------------
import base64
import sqlite3
import os
import json
import time
import hashlib
from pywxdump.analyse import read_img_dat, decompress_CompressContent, read_audio, parse_xml_string
from flask import Flask, request, render_template, g, Blueprint
def get_md5(s):
    """Return the hexadecimal MD5 digest of the string *s* (encoded as UTF-8)."""
    return hashlib.md5(s.encode("utf-8")).hexdigest()
def get_user_list(MSG_ALL_db_path, MicroMsg_db_path):
    """Return the contacts that have chat records, most active first.

    Talkers are read from the ``MSG`` table of *MSG_ALL_db_path*, grouped and
    ordered by message count, then looked up in the ``Contact`` table of
    *MicroMsg_db_path*. Talkers without a contact entry are left out.

    :param MSG_ALL_db_path: path of the merged message database.
    :param MicroMsg_db_path: path of the contacts database.
    :return: list of dicts with the keys ``username``, ``nickname``,
        ``remark``, ``chat_count`` and ``isChatRoom``.
    """
    users = []

    db1 = sqlite3.connect(MSG_ALL_db_path)
    try:
        talkers = db1.execute(
            "SELECT StrTalker, COUNT(*) AS ChatCount FROM MSG GROUP BY StrTalker ORDER BY ChatCount DESC"
        ).fetchall()
    finally:
        db1.close()
    if not talkers:
        return users  # do not touch the contacts database when there is nothing to look up

    # One connection for all lookups instead of reconnecting for every talker.
    db2 = sqlite3.connect(MicroMsg_db_path)
    try:
        cursor2 = db2.cursor()
        for talker, chat_count in talkers:
            cursor2.execute("SELECT UserName, NickName, Remark FROM Contact WHERE UserName=?", (talker,))
            contact = cursor2.fetchone()
            if not contact:
                continue
            username, nickname, remark = contact
            users.append({
                "username": username,
                "nickname": nickname,
                "remark": remark,
                "chat_count": chat_count,
                # group identifiers carry "@chatroom" as a suffix, not a prefix
                "isChatRoom": username.endswith("@chatroom"),
            })
        cursor2.close()
    finally:
        db2.close()
    return users
def load_base64_audio_data(MsgSvrID, MediaMSG_all_db_path):
    """Return the voice message *MsgSvrID* as a base64 ``data:audio/wav`` URI.

    The audio is fetched through ``read_audio`` (requested as wave data) from
    *MediaMSG_all_db_path*; an empty string is returned when nothing comes back.
    """
    wav_bytes = read_audio(MsgSvrID, is_wave=True, DB_PATH=MediaMSG_all_db_path)
    if wav_bytes:
        encoded = base64.b64encode(wav_bytes).decode("utf-8")
        return f"data:audio/wav;base64,{encoded}"
    return ""
def load_base64_img_data(start_time, end_time, username_md5, FileStorage_path):
    """
    Collect the images of one conversation as base64 data URIs.

    Directories below ``<FileStorage_path>/MsgAttach/<username_md5>/Image``
    whose name lies between the "%Y-%m" forms of *start_time* and *end_time*
    (string comparison) are scanned for ``.dat`` files, each of which is
    decoded with ``read_img_dat``.

    :param start_time: start timestamp
    :param end_time: end timestamp
    :param username_md5: md5 of the user name
    :param FileStorage_path: root of the FileStorage directory
    :return: dict mapping the md5 reported by ``read_img_dat`` to a data URI;
        empty when the image directory does not exist
    """
    first_month = time.strftime("%Y-%m", time.localtime(start_time))
    last_month = time.strftime("%Y-%m", time.localtime(end_time))

    img_path = os.path.join(FileStorage_path, "MsgAttach", username_md5, "Image")
    if not os.path.exists(img_path):
        return {}

    # Every directory (at any depth) whose name falls inside the month range.
    month_dirs = [
        os.path.join(parent, dirname)
        for parent, dirnames, _ in os.walk(img_path)
        for dirname in dirnames
        if first_month <= dirname <= last_month
    ]

    img_md5_data = {}
    for month_dir in month_dirs:
        for parent, _, filenames in os.walk(month_dir):
            for filename in filenames:
                if not filename.endswith(".dat"):
                    continue
                fomt, md5, out_bytes = read_img_dat(os.path.join(parent, filename))
                encoded = base64.b64encode(out_bytes).decode("utf-8")
                img_md5_data[md5] = f"data:{fomt};base64,{encoded}"
    return img_md5_data
def load_chat_records(selected_talker, start_index, page_size, user_list, MSG_ALL_db_path, MediaMSG_all_db_path,
                      FileStorage_path):
    """Load one page of chat records for a talker, ready for the chat template.

    :param selected_talker: ``StrTalker`` value to filter the MSG table on.
    :param start_index: offset of the first record (ordered by CreateTime).
    :param page_size: maximum number of records to load.
    :param user_list: dict describing the user; only ``username`` is read.
    :param MSG_ALL_db_path: path of the merged message database.
    :param MediaMSG_all_db_path: path of the merged media database (voice).
    :param FileStorage_path: FileStorage directory used to resolve images.
    :return: list of dicts with the keys ``MsgSvrID``, ``type_name``,
        ``is_sender``, ``content`` (``src``/``msg``/``style``) and
        ``CreateTime``; empty when the page holds no record.
    """
    type_name_dict = {
        1: {0: "文本"},
        3: {0: "图片"},
        34: {0: "语音"},
        43: {0: "视频"},
        47: {0: "动画表情"},
        49: {0: "文本", 1: "类似文字消息而不一样的消息", 5: "卡片式链接", 6: "文件", 8: "用户上传的 GIF 表情",
             19: "合并转发的聊天记录", 33: "分享的小程序", 36: "分享的小程序", 57: "带有引用的文本消息",
             63: "视频号直播或直播回放等",
             87: "群公告", 88: "视频号直播或直播回放等", 2000: "转账消息", 2003: "赠送红包封面"},
        50: {0: "语音通话"},
        10000: {0: "系统通知", 4: "拍一拍", 8000: "系统通知"}
    }

    db1 = sqlite3.connect(MSG_ALL_db_path)
    try:
        cursor1 = db1.cursor()
        cursor1.execute(
            "SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent FROM MSG WHERE StrTalker=? ORDER BY CreateTime ASC LIMIT ?,?",
            (selected_talker, start_index, page_size))
        result1 = cursor1.fetchall()
        cursor1.close()
    finally:
        db1.close()

    if not result1:
        # An empty page has no first/last timestamp to derive the image range from.
        return []

    username_md5 = get_md5(user_list.get("username", ""))
    # Images for the time span covered by this page (index 7 is CreateTime).
    img_md5_data = load_base64_img_data(result1[0][7], result1[-1][7], username_md5, FileStorage_path)

    data = []
    for row in result1:
        localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType, CreateTime, MsgSvrID, DisplayContent, CompressContent = row
        CreateTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(CreateTime))

        type_name = type_name_dict.get(Type, {}).get(SubType, "未知")

        content = {"src": "", "msg": "", "style": ""}

        if Type == 47 and SubType == 0:  # animated sticker
            content_tmp = parse_xml_string(StrContent)
            cdnurl = content_tmp.get("emoji", {}).get("cdnurl", "")
            if cdnurl:
                content = {"src": cdnurl, "msg": "表情", "style": "width: 100px; height: 100px;"}

        elif Type == 49 and SubType == 57:  # text message quoting another one
            try:
                # CompressContent may be NULL or malformed; any failure falls back to the notice below.
                compressed = CompressContent.rsplit(b'\x00', 1)[0]
                quoted = decompress_CompressContent(compressed).decode("utf-8")
                content["msg"] = json.dumps(parse_xml_string(quoted), ensure_ascii=False)
            except Exception:
                content["msg"] = "[带有引用的文本消息]解析失败"

        elif Type == 34 and SubType == 0:  # voice
            tmp_c = parse_xml_string(StrContent)
            voicelength = tmp_c.get("voicemsg", {}).get("voicelength", "")
            transtext = tmp_c.get("voicetrans", {}).get("transtext", "")
            if voicelength.isdigit():
                voicelength = f"{int(voicelength) / 1000:.2f}"  # milliseconds -> seconds
            content["msg"] = f"语音时长:{voicelength}\n翻译结果:{transtext}"
            content["src"] = load_base64_audio_data(MsgSvrID, MediaMSG_all_db_path=MediaMSG_all_db_path)

        elif Type == 3 and SubType == 0:  # image
            xml_content = parse_xml_string(StrContent)
            md5 = xml_content.get("img", {}).get("md5", "")
            content["src"] = img_md5_data.get(md5, "") if md5 else ""
            content["msg"] = "图片"

        else:
            content["msg"] = StrContent

        data.append({"MsgSvrID": MsgSvrID, "type_name": type_name, "is_sender": IsSender,
                     "content": content, "CreateTime": CreateTime})
    return data
def export_html(user, outpath, MSG_ALL_db_path, MediaMSG_all_db_path, FileStorage_path, page_size=500):
    """Export a user's chat records as paged HTML files into *outpath*.

    Files are named ``<name>_<page>.html`` where *name* is the first
    non-empty value among remark, nickname and username, with characters
    that are not allowed in file names replaced by ``_``.

    :param user: dict with ``username``, ``nickname``, ``remark`` and
        ``chat_count`` keys.
    :param outpath: existing directory that receives the files.
    :param page_size: number of records per HTML file.
    :return: ``(ok, message)`` tuple.
    """
    username = user.get("username", "")
    chatCount = user.get("chat_count", 0)
    if chatCount == 0:
        return False, "没有聊天记录"

    # An empty or missing remark must not win over the nickname.
    name_save = user.get("remark") or user.get("nickname") or username
    # Path separators and other reserved characters would break the file path.
    name_save = "".join("_" if ch in '\\/:*?"<>|' else ch for ch in name_save)

    for page_index, start_index in enumerate(range(0, chatCount, page_size)):
        data = load_chat_records(username, start_index, page_size, user, MSG_ALL_db_path, MediaMSG_all_db_path,
                                 FileStorage_path)
        if len(data) == 0:
            break
        save_path = os.path.join(outpath, f"{name_save}_{page_index}.html")
        with open(save_path, "w", encoding="utf-8") as f:
            f.write(render_template("chat.html", msgs=data))
    return True, f"导出成功{outpath}"
def export(username, outpath, MSG_ALL_db_path, MicroMsg_db_path, MediaMSG_all_db_path, FileStorage_path):
    """Export the chat records of *username* as HTML files.

    :param username: user name to export.
    :param outpath: target directory; when it is empty or does not exist,
        ``./export/<username>`` is created and used instead.
    :return: ``(ok, message)`` tuple; ``ok`` is ``False`` when the user is
        unknown or has no chat record.
    """
    if not outpath or not os.path.exists(outpath):
        outpath = os.path.join(os.getcwd(), "export" + os.sep + username)
        os.makedirs(outpath, exist_ok=True)

    USER_LIST = get_user_list(MSG_ALL_db_path, MicroMsg_db_path)
    matches = [u for u in USER_LIST if u["username"] == username]
    if not username or not matches:
        # Previously the unmatched (list) value was handed to export_html and crashed there.
        return False, f"未找到用户:{username}"
    return export_html(matches[0], outpath, MSG_ALL_db_path, MediaMSG_all_db_path, FileStorage_path)
# Flask blueprint that carries the chat-viewer routes defined below;
# its templates are looked up in the "templates" folder next to this module.
app_show_chat = Blueprint('show_chat_main', __name__, template_folder='templates')
app_show_chat.debug = False
# Home page - shows the list of users
@app_show_chat.route('/')
def index():
    """Render index.html with the user list, also storing the list on ``g.USER_LIST``."""
    users = get_user_list(g.MSG_ALL_db_path, g.MicroMsg_db_path)
    g.USER_LIST = users
    return render_template("index.html", users=users)
# Fetch one page of chat records
@app_show_chat.route('/get_chat_data', methods=["GET", 'POST'])
def get_chat_data():
    """Render one page of chat records for the user named in the query string.

    Query parameters: ``username`` (required), ``limit`` (records per page,
    default 100) and ``page`` (1-based, default: the last page).
    Returns the text ``"error"`` for an unknown user or invalid paging values.
    """
    username = request.args.get("username", "")
    user = [u for u in g.USER_LIST if u["username"] == username]
    if not username or not user:
        return "error"
    user = user[0]

    try:
        limit = int(request.args.get("limit", 100))  # records per page
        if limit < 1:
            return "error"  # would divide by zero / produce a negative window
        # Last page = ceil(chat_count / limit); plain int() truncated a partial last page away.
        last_page = max(1, -(-int(user.get("chat_count", limit)) // limit))
        page = int(request.args.get("page", last_page))  # current page
    except (TypeError, ValueError):
        return "error"
    page = max(page, 1)  # never produce a negative offset

    start_index = (page - 1) * limit
    data = load_chat_records(username, start_index, limit, user, g.MSG_ALL_db_path, g.MediaMSG_all_db_path,
                             g.FileStorage_path)
    return render_template("chat.html", msgs=data)
# Export chat records as HTML
@app_show_chat.route('/export_chat_data', methods=["GET", 'POST'])
def get_export():
    """Export the chat records of the user named in the query string.

    The files go to ``./export/<username>_<nickname>_<remark>`` (reserved
    path characters replaced by ``_``). Returns the message produced by
    ``export_html``, or ``"error"`` when the user is unknown.
    """
    username = request.args.get("username", "")
    user = [u for u in g.USER_LIST if u["username"] == username]
    if not username or not user:
        return "error"
    user = user[0]

    n = f"{user.get('username', '')}_{user.get('nickname', '')}_{user.get('remark', '')}"
    # Nicknames/remarks are free text: strip separators so the name stays one directory level.
    n = "".join("_" if ch in '\\/:*?"<>|' else ch for ch in n)
    outpath = os.path.join(os.getcwd(), "export" + os.sep + n)
    os.makedirs(outpath, exist_ok=True)

    ret = export_html(user, outpath, g.MSG_ALL_db_path, g.MediaMSG_all_db_path, g.FileStorage_path, page_size=200)
    # Success and failure both answer with the message part of the result.
    return ret[1]

View File

@ -1,71 +0,0 @@
<!DOCTYPE html>
<html lang="zh">
<head>
    <meta charset="UTF-8">
    <title>chat</title>
    <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/4.0.0/css/bootstrap.min.css">
    <style>
        img {
            max-width: 400px;
        }
    </style>
</head>
<body>
<div>
    <div class="row" style="background-color: #d3eed3">
        <div style="overflow-y: auto;height:90vh;">
            <table class="table">
                <tbody>
                {# One row per message. Sent and received messages share the same
                   markup and differ only in colours and the direction tag. #}
                {% for msg in msgs %}
                {% set sent = msg.is_sender == 1 %}
                <tr id="{{ msg.MsgSvrID }}">
                    {# A <tr> may only contain cells, so the message box lives inside a <td>. #}
                    <td>
                        <div style="background-color: {{ '#f3e9c1' if sent else '#d3eed3' }};">
                            <label style="color:{{ '#A13A50' if sent else '#f54f71' }}">[{{ '发' if sent else '收' }}][{{msg.type_name}}] {{msg.CreateTime}}</label><br>
                            {% if msg.type_name == '语音' %}
                            <audio controls>
                                <source src="{{msg.content.src}}" type="audio/wav">
                            </audio>
                            {% elif msg.type_name == '图片' or msg.type_name == '动画表情' %}
                            <img src="{{msg.content.src}}" alt="{{msg.content.msg}}" style="{{msg.content.style}}"/>
                            {% else %}
                            <p>{{msg.content.msg}}</p>
                            {% endif %}
                        </div>
                    </td>
                </tr>
                {% endfor %}
                </tbody>
            </table>
        </div>
    </div>
</div>
<script src="https://code.jquery.com/jquery-3.2.1.slim.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/popper.js/1.12.9/umd/popper.min.js"></script>
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/4.0.0/js/bootstrap.min.js"></script>
</body>
</html>

View File

@ -1,197 +0,0 @@
<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
<title>聊天记录显示</title>
<link rel="stylesheet" href="https://cdn.staticfile.org/bootstrap/4.5.3/css/bootstrap.min.css">
<style>
.left-area {
background-color: #f2f2f2;
height: 100vh;
}
</style>
</head>
<body>
<div class="container-fluid">
<div class="row">
<div class="col-3 left-area">
<div style="height:100vh; overflow-y: auto;">
<table class="table">
<thead>
<tr>
<th scope="col">名称</th>
<th scope="col">数量</th>
</tr>
</thead>
<tbody>
{% for user in users %}
<tr id="{{ user.username }}">
<td style="display: none;">
<username id="username1">{{user.username}}</username>
<nickname id="nickname1">{{user.nickname}}</nickname>
<remark id="remark1">{{user.remark}}</remark>
<chat_count id="chat_count1">{{user.chat_count}}</chat_count>
</td>
<td>
{% if user.remark not in [None, '']%}
{{user.remark}}
{% else %}
{{user.nickname}}
{% endif %}
</td>
<td>{{user.chat_count}}</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
</div>
<div class="col-9 right-area">
<div id="topdiv" class="row" style="background-color: #ccdcef; max-height: 120px;display: none;">
<div class="col-3">
账号:<span id="username" style="color: #2f6006;word-wrap: break-word;"></span>
</div>
<div class="col-4">
昵称:<span id="nickname" style="color: #4a5905;word-wrap: break-word;"></span>
</div>
<div class="col-3">
备注:<span id="remark" style="color: #b66a2f;word-wrap: break-word;"></span>
</div>
<div class="col-2">
消息数:<span id="chat_count" style="color: #f6062a;"></span>
</div>
</div>
<div id="pagination" class="row"
style="background-color: #ccdcef; max-height: 120px; display: flex; align-items: center; display: none;">
<div class="col-9" style="display: flex;">
<label class="page-link">
<a id="pre_page" class="" href="#">上一页</a>&nbsp;
<a id="next_page" class="" href="#">下一页</a>
&nbsp; &nbsp; &nbsp;
<input id="ipt_go" type="number" min="1" max="1000"
style="width: 80px; margin-right: 10px;"/>/<a id="all_pages"></a>&nbsp;
<a id="goButton" href="#">跳转</a></label>
</div>
<div class="col-3" style="display: flex; justify-content: flex-end;">
<button id="btn_export" type="button" class="btn btn-primary">导出</button>
</div>
</div>
<div class="init-right-area"
style="background-color: #e6e6e6; height: 100vh; display: grid; place-items: center; ">
<h2 style="text-align: center">欢迎使用<a href="https://github.com/xaoyaoo/PyWxDump.git">PyWxDump</a>聊天记录查看工具!
</h2>
</div>
</div>
</div>
</div>
<script src="https://code.jquery.com/jquery-3.2.1.slim.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/popper.js/1.12.9/umd/popper.min.js"></script>
<script src="https://cdn.staticfile.org/bootstrap/4.5.3/js/bootstrap.min.js"></script>
<script>
var globalUsername = ''; // 全局变量
var globalNickname = ''; // 全局变量
var globalRemark = ''; // 全局变量
var globalChatCount = 0; // 全局变量
var globalLimit = 100; // 全局变量
var globalPages = Math.ceil(globalChatCount / globalLimit); // 全局变量
var globalCurrentPage = globalPages; // 全局变量
// 发送请求并更新右侧区域内容
var request_function = function (url) {
fetch(url, {
method: 'GET',
headers: {
'Content-Type': 'text/plain'
}
})
.then(response => response.text())
.then(data => {
document.querySelector('.init-right-area').style = '';
document.querySelector('.init-right-area').innerHTML = ''; // 清空右侧区域内容
document.querySelector('.init-right-area').innerHTML = data; // 更新右侧区域内容
document.getElementById('topdiv').style.display = "";
document.getElementById('username').innerHTML = globalUsername
document.getElementById('nickname').innerHTML = globalNickname;
document.getElementById('remark').innerHTML = globalRemark;
document.getElementById('chat_count').innerHTML = globalChatCount;
document.getElementById('pagination').style.display = 'flex';
document.getElementById('ipt_go').max = globalPages;
document.getElementById('ipt_go').value =globalCurrentPage;
document.getElementById('all_pages').innerHTML = globalPages;
});
};
// 为每行添加点击事件监听器
document.querySelectorAll('.left-area tbody tr').forEach(function (row) {
row.addEventListener('click', function () {
globalUsername = row.id; // 获取用户名
globalNickname = row.querySelector('#nickname1').innerHTML; // 获取昵称
globalRemark = row.querySelector('#remark1').innerHTML; // 获取备注
globalChatCount = row.querySelector('#chat_count1').innerHTML; // 获取消息数
globalLimit = 100; // 设置全局变量
globalPages = Math.ceil(globalChatCount / globalLimit); // 设置全局变量
globalCurrentPage = globalPages; // 设置全局变量
var requestUrl = '/get_chat_data?username=' + encodeURIComponent(globalUsername) + '&page=' + globalCurrentPage + '&limit=' + globalLimit;
// 发送请求并更新右侧区域内容
request_function(requestUrl);
})
;
});
// 上一页按钮点击事件
document.getElementById('pre_page').addEventListener('click', function () {
if (globalCurrentPage > 1) {
globalCurrentPage -= 1;
var requestUrl = '/get_chat_data?username=' + encodeURIComponent(globalUsername) + '&page=' + globalCurrentPage + '&limit=' + globalLimit;
// 发送请求并更新右侧区域内容
request_function(requestUrl);
}
});
// 下一页按钮点击事件
document.getElementById('next_page').addEventListener('click', function () {
if (globalCurrentPage < globalPages) {
globalCurrentPage += 1;
var requestUrl = '/get_chat_data?username=' + encodeURIComponent(globalUsername) + '&page=' + globalCurrentPage + '&limit=' + globalLimit;
// 发送请求并更新右侧区域内容
request_function(requestUrl);
}
});
// 跳转按钮点击事件
document.getElementById('goButton').addEventListener('click', function () {
var page = document.getElementById('ipt_go').value;
if (page > 0 && page <= globalPages) {
globalCurrentPage = page;
var requestUrl = '/get_chat_data?username=' + encodeURIComponent(globalUsername) + '&page=' + globalCurrentPage + '&limit=' + globalLimit;
// 发送请求并更新右侧区域内容
request_function(requestUrl);
}
});
// 导出按钮点击事件
document.getElementById('btn_export').addEventListener('click', function () {
var requestUrl = '/export_chat_data?username=' + encodeURIComponent(globalUsername);
window.open(requestUrl);
});
</script>
</body>
</html>

View File

@ -1,338 +0,0 @@
{
"3.2.1.154": [
328121948,
328122328,
328123056,
328121976,
328123020
],
"3.3.0.115": [
31323364,
31323744,
31324472,
31323392,
31324436
],
"3.3.0.84": [
31315212,
31315592,
31316320,
31315240,
31316284
],
"3.3.0.93": [
31323364,
31323744,
31324472,
31323392,
31324436
],
"3.3.5.34": [
30603028,
30603408,
30604120,
30603056,
30604100
],
"3.3.5.42": [
30603012,
30603392,
30604120,
30603040,
30604084
],
"3.3.5.46": [
30578372,
30578752,
30579480,
30578400,
30579444
],
"3.4.0.37": [
31608116,
31608496,
31609224,
31608144,
31609188
],
"3.4.0.38": [
31604044,
31604424,
31605152,
31604072,
31605116
],
"3.4.0.50": [
31688500,
31688880,
31689608,
31688528,
31689572
],
"3.4.0.54": [
31700852,
31701248,
31700920,
31700880,
31701924
],
"3.4.5.27": [
32133788,
32134168,
32134896,
32133816,
32134860
],
"3.4.5.45": [
32147012,
32147392,
32147064,
32147040,
32148084
],
"3.5.0.20": [
35494484,
35494864,
35494536,
35494512,
35495556
],
"3.5.0.29": [
35507980,
35508360,
35508032,
35508008,
35509052
],
"3.5.0.33": [
35512140,
35512520,
35512192,
35512168,
35513212
],
"3.5.0.39": [
35516236,
35516616,
35516288,
35516264,
35517308
],
"3.5.0.42": [
35512140,
35512520,
35512192,
35512168,
35513212
],
"3.5.0.44": [
35510836,
35511216,
35510896,
35510864,
35511908
],
"3.5.0.46": [
35506740,
35507120,
35506800,
35506768,
35507812
],
"3.6.0.18": [
35842996,
35843376,
35843048,
35843024,
35844068
],
"3.6.5.7": [
35864356,
35864736,
35864408,
35864384,
35865428
],
"3.6.5.16": [
35909428,
35909808,
35909480,
35909456,
35910500
],
"3.7.0.26": [
37105908,
37106288,
37105960,
37105936,
37106980
],
"3.7.0.29": [
37105908,
37106288,
37105960,
37105936,
37106980
],
"3.7.0.30": [
37118196,
37118576,
37118248,
37118224,
37119268
],
"3.7.5.11": [
37883280,
37884088,
37883136,
37883008,
37884052
],
"3.7.5.23": [
37895736,
37896544,
37895592,
37883008,
37896508
],
"3.7.5.27": [
37895736,
37896544,
37895592,
37895464,
37896508
],
"3.7.5.31": [
37903928,
37904736,
37903784,
37903656,
37904700
],
"3.7.6.24": [
38978840,
38979648,
38978696,
38978604,
38979612
],
"3.7.6.29": [
38986376,
38987184,
38986232,
38986104,
38987148
],
"3.7.6.44": [
39016520,
39017328,
39016376,
38986104,
39017292
],
"3.8.0.31": [
46064088,
46064912,
46063944,
38986104,
46064876
],
"3.8.0.33": [
46059992,
46060816,
46059848,
38986104,
46060780
],
"3.8.0.41": [
46064024,
46064848,
46063880,
38986104,
46064812
],
"3.8.1.26": [
46409448,
46410272,
46409304,
38986104,
46410236
],
"3.9.0.28": [
48418376,
48419280,
48418232,
38986104,
48419244
],
"3.9.2.23": [
50320784,
50321712,
50320640,
38986104,
50321676
],
"3.9.2.26": [
50329040,
50329968,
50328896,
38986104,
50329932
],
"3.9.5.81": [
61650872,
61652208,
61650680,
0,
61652144
],
"3.9.5.91": [
61654904,
61656240,
61654712,
38986104,
61656176
],
"3.9.6.19": [
61997688,
61997464,
61997496,
38986104,
61998960
],
"3.9.6.33": [
62030600,
62031936,
62030408,
0,
62031872
],
"3.9.7.15": [
63482696,
63484032,
63482504,
0,
63483968
],
"3.9.7.25": [
63482760,
63484096,
63482568,
0,
63484032
],
"3.9.7.29": [
63486984,
63488320,
63486792,
0,
63488256
],
"3.9.8.15": [
64996632,
64997968,
64996440,
0,
64997904
]
}

View File

@ -1,9 +0,0 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: __init__.py
# Description:
# Author: xaoyaoo
# Date: 2023/08/21
# -------------------------------------------------------------------------------
from .get_wx_info import read_info
from .get_wx_db import get_wechat_db

View File

@ -1,101 +0,0 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: get_wx_db.py
# Description:
# Author: xaoyaoo
# Date: 2023/10/14
# -------------------------------------------------------------------------------
import os
import re
import winreg
from typing import List, Union
def _locate_wechat_files_dir():
    """Return the default "WeChat Files" directory derived from the Windows registry."""
    w_dir = None
    try:
        with winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Software\Tencent\WeChat", 0, winreg.KEY_READ) as key:
            w_dir, _ = winreg.QueryValueEx(key, "FileSavePath")
    except Exception:
        w_dir = None  # best effort: fall through to the documents folder

    # "MyDocument:" is a placeholder for the user's documents folder, not a real path.
    if not w_dir or w_dir == "MyDocument:":
        documents_dir = None
        try:
            with winreg.OpenKey(winreg.HKEY_CURRENT_USER,
                                r"Software\Microsoft\Windows\CurrentVersion\Explorer\User Shell Folders") as key:
                documents_path = winreg.QueryValueEx(key, "Personal")[0]  # may contain %VAR% references
            expanded = os.path.expandvars(documents_path)
            if "%" not in expanded:  # every variable could be resolved
                documents_dir = expanded
        except Exception:
            documents_dir = None  # best effort: use the conventional location below
        w_dir = documents_dir or os.path.join(os.path.expanduser("~"), "Documents")
    return os.path.join(w_dir, "WeChat Files")


def get_wechat_db(require_list: Union[List[str], str] = "all", msg_dir: str = None, wxid: Union[List[str], str] = None,
                  is_logging: bool = False):
    """Find the WeChat database files of every (or the selected) account.

    :param require_list: database name prefixes to look for, as a list or a
        ``;``-separated string; ``"all"`` selects every ``*.db`` file.
    :param msg_dir: the "WeChat Files" directory; located through the
        registry when omitted.
    :param wxid: account directory names to restrict the search to, as a list
        or a ``;``-separated string; all accounts when omitted.
    :param is_logging: print the findings (and errors) when true.
    :return: dict mapping each account directory path to
        ``{requirement: [database paths]}``, or an error message string when
        the directory is missing or *require_list* is invalid.
    """
    if not msg_dir:
        msg_dir = _locate_wechat_files_dir()

    if not os.path.exists(msg_dir):
        error = f"[-] 目录不存在: {msg_dir}"
        if is_logging: print(error)
        return error

    # Select the account directories.
    entries = os.listdir(msg_dir)
    if wxid:
        if isinstance(wxid, str):
            wxid = wxid.split(";")
        selected = [name for name in entries if name in wxid]
    else:
        shared_dirs = ("All Users", "Applet", "WMPF")  # not account directories
        selected = [name for name in entries if name not in shared_dirs]
    user_dirs = {os.path.join(msg_dir, name): os.path.join(msg_dir, name) for name in selected}

    if isinstance(require_list, str):
        require_list = require_list.split(";")

    # Build one file-name pattern per requirement.
    if "all" in require_list:
        pattern = {"all": re.compile(r".*\.db$")}
    elif isinstance(require_list, list):
        pattern = {require: re.compile(r"%s.*\.db$" % require) for require in require_list}
    else:
        error = f"[-] 参数错误: {require_list}"
        if is_logging: print(error)
        return error

    # Collect the database paths of every account.
    for user, user_dir in user_dirs.items():
        found = {n: [] for n in pattern}
        for root, dirs, files in os.walk(user_dir):
            for file_name in files:
                for n, p in pattern.items():
                    if p.match(file_name):
                        found[n].append(os.path.join(root, file_name))
        user_dirs[user] = found  # replaces the value of an existing key only

    if is_logging:
        for user, user_dir in user_dirs.items():
            print(f"[+] user_path: {user}")
            for n, paths in user_dir.items():
                print(f" {n}:")
                for path in paths:
                    print(f" {path.replace(user, '')}")
        print("-" * 32)
        print(f"[+] 共 {len(user_dirs)} 个微信账号")

    return user_dirs
if __name__ == '__main__':
    # Manual check: list the databases of the usual categories and print them.
    # Pass "all" instead of the list to report every *.db file.
    require_list = [
        "MediaMSG",
        "MicroMsg",
        "FTSMSG",
        "MSG",
        "Sns",
        "Emotion",
    ]
    user_dirs = get_wechat_db(require_list, is_logging=True)

View File

@ -1,208 +0,0 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: getwxinfo.py
# Description:
# Author: xaoyaoo
# Date: 2023/08/21
# -------------------------------------------------------------------------------
import json
import ctypes
import os
import winreg
import pymem
from win32com.client import Dispatch
import psutil
import sys
# kernel32 ReadProcessMemory entry point; the functions below call it with a
# process handle, an address, a ctypes buffer and a byte count.
ReadProcessMemory = ctypes.windll.kernel32.ReadProcessMemory
# Shorthand used to pass integer addresses as C void pointers.
void_p = ctypes.c_void_p
# Read a string from process memory (anything but the key)
def get_info_without_key(h_process, address, n_size=64):
    """Read up to *n_size* bytes at *address* and return them as stripped text.

    The bytes are cut at the first NUL and decoded as UTF-8, ignoring
    undecodable bytes. The string ``"None"`` is returned when the read fails
    or the text is empty.
    """
    buf = ctypes.create_string_buffer(n_size)
    if ReadProcessMemory(h_process, void_p(address), buf, n_size, 0) == 0:
        return "None"
    # split() returns the whole buffer unchanged when it holds no NUL byte
    head = bytes(buf).split(b"\x00")[0]
    text = head.decode('utf-8', errors='ignore').strip()
    if text != "":
        return text
    return "None"
def pattern_scan_all(handle, pattern, *, return_multiple=False, find_num=100):
    """Scan the memory of *handle* page by page for *pattern*.

    :param handle: process handle passed to ``pymem.pattern.scan_pattern_page``.
    :param pattern: byte pattern to look for.
    :param return_multiple: collect every match instead of returning the first.
    :param find_num: stop scanning once more than this many matches are collected.
    :return: the first page result when *return_multiple* is false and
        something was found, otherwise the list of collected matches.
    """
    if sys.maxsize > 2 ** 32:
        user_space_limit = 0x7FFFFFFF0000
    else:
        user_space_limit = 0x7fff0000

    matches = []
    region = 0
    while region < user_space_limit:
        try:
            region, hits = pymem.pattern.scan_pattern_page(
                handle,
                region,
                pattern,
                return_multiple=return_multiple
            )
        except Exception as e:
            # scanning stops at the first error; the error is only reported
            print(e)
            break

        if hits:
            if not return_multiple:
                return hits
            matches += hits

        if len(matches) > find_num:
            break

    return matches
def get_info_wxid(h_process):
    """Return the most frequent wxid candidate found in front of ``\\FileStorage`` in memory.

    Up to roughly 100 occurrences of the marker are inspected; for each, the
    33 bytes starting 21 bytes before the match are read and the text between
    a delimiter (``}``, ``\\x7f`` or a backslash) and the marker is taken as a
    candidate. ``"None"`` is returned when a read fails or nothing is found.
    """
    marker = br'\FileStorage'
    delimiters = (b'}', b'\x7f', b'\\')

    candidates = []
    for addr in pattern_scan_all(h_process, br'\\FileStorage', return_multiple=True, find_num=100):
        buf = ctypes.create_string_buffer(33)
        if ReadProcessMemory(h_process, void_p(addr - 21), buf, 33, 0) == 0:
            return "None"
        prefix = bytes(buf).split(marker)[0]
        # only the first delimiter present (in priority order) is used
        delimiter = next((d for d in delimiters if d in prefix), None)
        if delimiter is not None:
            candidates.append(prefix.split(delimiter)[1].decode('utf-8', errors='ignore'))

    if not candidates:
        return "None"
    return max(candidates, key=candidates.count)
def _query_hkcu_value(sub_key, value_name):
    """Return a value stored under HKEY_CURRENT_USER, or None if it cannot be read."""
    try:
        # The context manager closes the key even when QueryValueEx raises.
        with winreg.OpenKey(winreg.HKEY_CURRENT_USER, sub_key, 0, winreg.KEY_READ) as key:
            return winreg.QueryValueEx(key, value_name)[0]
    except OSError:
        return None


def get_info_filePath(wxid="all"):
    """Locate the "WeChat Files" directory, or one account's folder inside it.

    The base directory is taken from WeChat's ``FileSavePath`` registry value;
    if that is unavailable, from the user's "Personal" shell folder; and as a
    last resort from ``~/Documents``.

    :param wxid: account folder name, or "all" for the "WeChat Files" root
    :return: the existing path, or the string "None" when ``wxid`` is empty or
        the path does not exist
    """
    if not wxid:
        return "None"

    w_dir = _query_hkcu_value(r"Software\Tencent\WeChat", "FileSavePath")
    if w_dir is None:
        # Fall back to the real location of the user's Documents folder.
        documents_path = _query_hkcu_value(
            r"Software\Microsoft\Windows\CurrentVersion\Explorer\User Shell Folders",
            "Personal",
        )
        w_dir = os.path.expandvars(documents_path) if isinstance(documents_path, str) else ""
        if not w_dir or "%" in w_dir:
            # Value missing or containing a variable that could not be expanded.
            w_dir = os.path.join(os.path.expanduser("~"), "Documents")

    msg_dir = os.path.join(w_dir, "WeChat Files")
    if wxid == "all" and os.path.exists(msg_dir):
        return msg_dir

    filePath = os.path.join(msg_dir, wxid)
    return filePath if os.path.exists(filePath) else "None"
# Read the key from process memory
def get_key(h_process, address, address_len=8):
    """Follow the pointer stored at ``address`` and return the 32 bytes it points to.

    :param h_process: process handle passed straight to ReadProcessMemory
    :param address: address holding a little-endian pointer to the key
    :param address_len: size of that pointer in bytes
    :return: the 32 key bytes as a hex string, or the string "None" when
        either read fails
    """
    pointer_buf = ctypes.create_string_buffer(address_len)
    pointer_read = ReadProcessMemory(h_process, void_p(address), pointer_buf, address_len, 0)
    if pointer_read == 0:
        return "None"

    # The stored bytes are little-endian; convert them into the key's address.
    key_address = int.from_bytes(pointer_buf, byteorder='little')

    key_buf = ctypes.create_string_buffer(32)
    key_read = ReadProcessMemory(h_process, void_p(key_address), key_buf, 32, 0)
    if key_read == 0:
        return "None"
    return bytes(key_buf).hex()
# Read WeChat info (account, mobile, name, mail, wxid, key)
def read_info(version_list, is_logging=False):
    """Collect account details from every running ``WeChat.exe`` process.

    :param version_list: dict mapping a WeChat version string to a list of five
        offsets into WeChatWin.dll, in the order name, account, mobile, mail,
        key; an offset of 0 means "not available"
    :param is_logging: when True, print errors and the collected results
    :return: a list with one dict per process (keys: pid, version, account,
        mobile, name, mail, wxid, filePath, key), or an error string when
        WeChat is not running, a version is unsupported, or WeChatWin.dll is
        not found in a process
    """
    wechat_process = [
        process
        for process in psutil.process_iter(['name', 'exe', 'pid', 'cmdline'])
        if process.info['name'] == 'WeChat.exe'
    ]
    if not wechat_process:
        error = "[-] WeChat No Run"
        if is_logging:
            print(error)
        return error

    kernel32 = ctypes.windll.kernel32
    result = []
    for process in wechat_process:
        tmp_rd = {'pid': process.pid}
        tmp_rd['version'] = Dispatch("Scripting.FileSystemObject").GetFileVersion(process.exe())

        bias_list = version_list.get(tmp_rd['version'], None)
        if not isinstance(bias_list, list):
            error = f"[-] WeChat Current Version {tmp_rd['version']} Is Not Supported"
            if is_logging:
                print(error)
            return error

        wechat_base_address = 0
        for module in process.memory_maps(grouped=False):
            if module.path and 'WeChatWin.dll' in module.path:
                wechat_base_address = int(module.addr, 16)
                break
        if wechat_base_address == 0:
            error = "[-] WeChat WeChatWin.dll Not Found"
            if is_logging:
                print(error)
            return error

        Handle = kernel32.OpenProcess(0x1F0FFF, False, process.pid)
        try:
            name_baseaddr = wechat_base_address + bias_list[0]
            account__baseaddr = wechat_base_address + bias_list[1]
            mobile_baseaddr = wechat_base_address + bias_list[2]
            mail_baseaddr = wechat_base_address + bias_list[3]
            key_baseaddr = wechat_base_address + bias_list[4]

            # These two versions store the key pointer in 4 bytes instead of 8.
            addrLen = 4 if tmp_rd['version'] in ["3.9.2.23", "3.9.2.26"] else 8

            tmp_rd['account'] = get_info_without_key(Handle, account__baseaddr, 32) if bias_list[1] != 0 else "None"
            tmp_rd['mobile'] = get_info_without_key(Handle, mobile_baseaddr, 64) if bias_list[2] != 0 else "None"
            tmp_rd['name'] = get_info_without_key(Handle, name_baseaddr, 64) if bias_list[0] != 0 else "None"
            tmp_rd['mail'] = get_info_without_key(Handle, mail_baseaddr, 64) if bias_list[3] != 0 else "None"
            tmp_rd['wxid'] = get_info_wxid(Handle)
            tmp_rd['filePath'] = get_info_filePath(tmp_rd['wxid'])
            tmp_rd['key'] = get_key(Handle, key_baseaddr, addrLen) if bias_list[4] != 0 else "None"
        finally:
            # Release the process handle; OpenProcess yields 0 on failure, which must not be closed.
            if Handle:
                kernel32.CloseHandle(Handle)
        result.append(tmp_rd)

    if is_logging:
        print("=" * 32)
        for i, rlt in enumerate(result):
            for k, v in rlt.items():
                print(f"[+] {k:>8}: {v}")
            # Separator between processes, but not after the last one.
            if i != len(result) - 1:
                print("-" * 32)
        print("=" * 32)

    return result
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--vlfile", type=str, required=False,
                        help="path to a JSON file with the per-version WeChat offsets")
    parser.add_argument("--vldict", type=str, required=False,
                        help="per-version WeChat offsets as an inline JSON string")
    args = parser.parse_args()

    # Load the per-version offsets: an inline dict takes precedence over a file.
    if args.vldict:
        VERSION_LIST = json.loads(args.vldict)
    else:
        # Default to the list one directory above this script, independent of
        # the current working directory.
        VERSION_LIST_PATH = args.vlfile or os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "..", "version_list.json"
        )
        with open(VERSION_LIST_PATH, "r", encoding="utf-8") as f:
            VERSION_LIST = json.load(f)

    result = read_info(VERSION_LIST, True)  # read and print the WeChat info