清理已经废弃的代码(可能会存在有用代码被清除)

This commit is contained in:
xaoyaoo 2024-04-20 19:17:40 +08:00
parent 8f436223b3
commit e45716805c
8 changed files with 253 additions and 1299 deletions

View File

@ -5,17 +5,15 @@
# Author: xaoyaoo
# Date: 2023/10/14
# -------------------------------------------------------------------------------
# from .analyzer.db_parsing import read_img_dat, read_emoji, decompress_CompressContent, read_audio_buf, read_audio, \
# parse_xml_string, read_BytesExtra
# from .ui import app_show_chat, get_user_list, export
from .wx_info import BiasAddr, read_info, get_wechat_db, batch_decrypt, decrypt, get_core_db
from .wx_info import merge_copy_db, merge_msg_db, merge_media_msg_db, merge_db, decrypt_merge, merge_real_time_db
from .analyzer.db_parsing import read_img_dat, read_emoji, decompress_CompressContent, read_audio_buf, read_audio, \
parse_xml_string, read_BytesExtra
from .analyzer import export_csv, export_json, DBPool
from .ui import app_show_chat, get_user_list, export
from .analyzer import DBPool
from .dbpreprocess import get_user_list, get_recent_user_list, wxid2userinfo, ParsingMSG, ParsingMicroMsg, \
ParsingMediaMSG, ParsingOpenIMContact
from .server import start_falsk
import os, json
try:
@ -26,7 +24,7 @@ except:
VERSION_LIST = {}
VERSION_LIST_PATH = None
PYWXDUMP_ROOT_PATH = os.path.dirname(__file__)
db_init = DBPool("DBPOOL_INIT")
# PYWXDUMP_ROOT_PATH = os.path.dirname(__file__)
# db_init = DBPool("DBPOOL_INIT")
__version__ = "3.0.10"

View File

@ -5,8 +5,4 @@
# Author: xaoyaoo
# Date: 2023/09/27
# -------------------------------------------------------------------------------
from .db_parsing import read_img_dat, read_emoji, decompress_CompressContent, read_audio_buf, read_audio, \
parse_xml_string, read_BytesExtra
from .export_chat import export_csv, get_contact_list, get_chatroom_list, get_msg_list, get_chat_count, export_json, \
get_all_chat_count
from .utils import get_type_name, get_name_typeid,DBPool
from .utils import DBPool

View File

@ -1,349 +0,0 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: parse.py
# Description: 解析数据库内容
# Author: xaoyaoo
# Date: 2023/09/27
# -------------------------------------------------------------------------------
import os.path
import sqlite3
import pysilk
from io import BytesIO
import wave
import pyaudio
import requests
import hashlib
import lz4.block
import blackboxprotobuf
from PIL import Image
# import xml.etree.ElementTree as ET
import lxml.etree as ET # 这个模块更健壮些微信XML格式有时有非标格式会导致xml.etree.ElementTree处理失败
def get_md5(data):
    """
    Return the hexadecimal MD5 digest of ``data``.

    :param data: bytes-like object to hash
    :return: 32-character lowercase hex string
    """
    return hashlib.md5(data).hexdigest()
def parse_xml_string(xml_string):
    """
    Parse an XML string into nested dictionaries.

    :param xml_string: XML text to parse
    :return: ``None`` when the input is not a ``str``; the input string itself
             when parsing raises; otherwise the converted root element
    """

    def _element_to_dict(node):
        """
        Recursively convert one XML element.

        Attributes become keys, child elements are stored under their tag
        (repeated tags are collected into a list), and an element that yields
        no keys at all is represented by its text.
        """
        # An element may arrive without attributes; treat it as empty.
        if node is None or node.attrib is None:
            return {}

        parsed = dict(node.attrib.items())

        for sub in node:
            sub_value = _element_to_dict(sub)
            tag = sub.tag
            if tag not in parsed:
                parsed[tag] = sub_value
                continue
            # Tag already present: promote the stored value to a list.
            if not isinstance(parsed[tag], list):
                parsed[tag] = [parsed[tag]]
            parsed[tag].append(sub_value)

        # Leaf element with nothing collected: fall back to its text.
        if parsed or not node.text:
            return parsed
        return node.text

    if not isinstance(xml_string, str):
        return None

    try:
        # recover=True lets the parser tolerate malformed XML in chat records.
        root = ET.fromstring(xml_string, ET.XMLParser(recover=True))
    except Exception:
        return xml_string
    return _element_to_dict(root)
def read_img_dat(input_data):
    """
    Decode a single-byte-XOR obfuscated image ``.dat`` payload.

    The XOR key is not stored anywhere; it is recovered by assuming the
    decoded data starts with one of the known image signatures below and
    checking that the same key reproduces the *entire* signature.

    :param input_data: path to the file (``str``) or the raw bytes-like data
    :return: ``(extension, md5_hex, decoded_bytes)`` on success, ``False`` when
             the data is empty or no known signature matches.
             ``decoded_bytes`` is a ``numpy.ndarray`` of ``uint8`` when NumPy
             is importable and a ``bytearray`` otherwise.
    """
    # File signatures of common image formats.
    img_head = {
        b"\xFF\xD8\xFF": ".jpg",
        b"\x89\x50\x4E\x47": ".png",
        b"\x47\x49\x46\x38": ".gif",
        b"\x42\x4D": ".BMP",
        b"\x49\x49": ".TIFF",
        b"\x4D\x4D": ".TIFF",
        b"\x00\x00\x01\x00": ".ICO",
        b"\x52\x49\x46\x46": ".WebP",
        b"\x00\x00\x00\x18\x66\x74\x79\x70\x68\x65\x69\x63": ".HEIC",
    }

    if isinstance(input_data, str):
        with open(input_data, "rb") as f:
            input_bytes = f.read()
    else:
        input_bytes = input_data

    # Empty input has no first byte to derive a key from.
    if len(input_bytes) == 0:
        return False

    # Only the first few bytes are needed to identify the format.
    longest = max(len(hcode) for hcode in img_head)
    header = bytes(input_bytes[:longest])

    for hcode, fomt in img_head.items():
        if len(header) < len(hcode):
            continue  # data too short to contain this signature
        t = header[0] ^ hcode[0]  # candidate XOR key
        # Every signature byte must decode with the same key; checking a
        # single byte would accept almost any input.
        if not all(raw ^ t == expected for raw, expected in zip(header, hcode)):
            continue

        try:
            import numpy as np
        except ImportError:
            # Pure-Python path: a 256-entry translation table applies the XOR
            # in a single pass.
            out_bytes = bytearray(input_bytes).translate(bytes(i ^ t for i in range(256)))
        else:
            out_bytes = np.bitwise_xor(np.frombuffer(input_bytes, dtype=np.uint8), np.uint8(t))
        md5 = hashlib.md5(out_bytes).hexdigest()
        return fomt, md5, out_bytes
    return False
def read_emoji(cdnurl, is_show=False):
    """
    Download an emoji image from its CDN URL.

    :param cdnurl: URL of the emoji resource
    :param is_show: when true, open the downloaded image in the default viewer
    :return: raw response body (bytes)
    """
    user_agent = "Mozilla/5.0 (Linux; Android 10; Redmi K30 Pro) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Mobile Safari/537.36"
    response = requests.get(cdnurl, headers={"User-Agent": user_agent})
    payload = response.content

    if is_show:  # display the emoji
        Image.open(BytesIO(payload)).show()
    return payload
def decompress_CompressContent(data):
    """
    Decompress an LZ4-block compressed ``CompressContent`` value to text.

    :param data: compressed bytes
    :return: ``None`` when ``data`` is not ``bytes``; otherwise the decoded
             text. If decompression raises, the raw input decoded as UTF-8
             (undecodable bytes ignored) is returned instead.
    """
    if not isinstance(data, bytes):
        return None

    try:
        raw = lz4.block.decompress(data, uncompressed_size=len(data) << 8)
        # Strip NUL bytes left after decompression; they break XML parsing later.
        return raw.replace(b'\x00', b'').decode('utf-8', errors='ignore')
    except Exception:
        return data.decode('utf-8', errors='ignore')
def read_audio_buf(buf_data, is_play=False, is_wave=False, rate=24000):
    """
    Decode a SILK audio buffer to PCM, optionally playing it or wrapping it
    in a WAV container.

    :param buf_data: SILK-encoded audio bytes
    :param is_play: when true, play the decoded audio through PyAudio
    :param is_wave: when true, return WAV file bytes instead of raw PCM
    :param rate: sample rate passed to the decoder, player and WAV header
    :return: WAV bytes if ``is_wave`` is true, otherwise the PCM bytes
    """
    # SILK -> PCM, both held in memory.
    with BytesIO(buf_data) as silk_stream, BytesIO() as pcm_stream:
        pysilk.decode(silk_stream, pcm_stream, rate)
        pcm_data = pcm_stream.getvalue()

    if is_play:
        player = pyaudio.PyAudio()
        stream = player.open(format=pyaudio.paInt16, channels=1, rate=rate, output=True)
        stream.write(pcm_data)
        stream.stop_stream()
        stream.close()
        player.terminate()

    if not is_wave:
        return pcm_data

    # Wrap the PCM samples in a WAV container: 1 channel, 2 bytes per sample.
    wav_stream = BytesIO()
    with wave.open(wav_stream, 'wb') as wf:
        wf.setparams((1, 2, rate, 0, 'NONE', 'NONE'))
        wf.writeframes(pcm_data)
    wav_bytes = wav_stream.getvalue()
    wav_stream.close()
    return wav_bytes
def read_audio(MsgSvrID, is_play=False, is_wave=False, DB_PATH: str = "", rate=24000):
    """
    Load the audio buffer stored for one message and decode it.

    :param MsgSvrID: message server id; matched against ``Media.Reserved0``.
                     Must be convertible to ``int``.
    :param is_play: forwarded to ``read_audio_buf``
    :param is_wave: forwarded to ``read_audio_buf``
    :param DB_PATH: path of the SQLite database containing the ``Media`` table
    :param rate: sample rate forwarded to ``read_audio_buf``
    :return: decoded audio bytes, or ``False`` when ``DB_PATH`` is empty, the
             id is not numeric, no row matches, or decoding fails
    """
    if DB_PATH == "":
        return False

    # The id is bound as a parameter instead of being formatted into the SQL
    # text, so a crafted value cannot alter the statement.
    try:
        msg_id = int(MsgSvrID)
    except (TypeError, ValueError):
        return False

    DB = sqlite3.connect(DB_PATH)
    try:
        DBdata = DB.execute("select Buf from Media where Reserved0=?", (msg_id,)).fetchall()
    finally:
        DB.close()  # always release the connection

    if not DBdata:
        return False

    data = DBdata[0][0]
    try:
        return read_audio_buf(data, is_play, is_wave, rate)
    except Exception:
        # Decoding is best-effort; callers treat False as "no audio".
        return False
def wordcloud_generator(text, out_path="", is_show=False, img_path="", font=r"C:\Windows\Fonts\simhei.ttf"):
    """
    Build a word cloud from ``text``.

    :param text: text to segment and render
    :param out_path: when non-empty, the image is saved to this path
    :param is_show: when true, open the rendered image in the default viewer
    :param img_path: optional image whose pixels are passed to ``recolor``
    :param font: path of the font file handed to ``WordCloud``
    :return: None
    :raises ImportError: when one of the optional dependencies is missing
    """
    try:
        from wordcloud import WordCloud
        import jieba
        import numpy as np
        # Imported only to verify that matplotlib is installed.
        import matplotlib.pyplot as plt
        from matplotlib.font_manager import fontManager
    except ImportError as e:
        print("error", e)
        raise ImportError("请安装wordcloud,jieba,numpy,matplotlib,pillow库") from e

    words = jieba.lcut(text)  # segment into words
    newtxt = ' '.join(words)  # WordCloud expects space-separated tokens

    wordcloud1 = WordCloud(width=800, height=400, background_color='white', font_path=font)
    wordcloud1.generate(newtxt)

    if out_path:
        # Save to the requested location rather than a fixed file name.
        wordcloud1.to_file(out_path)

    if img_path and os.path.exists(img_path):
        img_color = np.array(Image.open(img_path))
        img_color = img_color.reshape((img_color.shape[0] * img_color.shape[1], 3))
        # NOTE(review): an array is passed as color_func here; this looks like
        # it should be a callable (e.g. an image-based color generator) -
        # confirm against the wordcloud API before relying on this branch.
        wordcloud1.recolor(color_func=img_color)

    if is_show:
        wordcloud_img = wordcloud1.to_image()
        wordcloud_img.show()
def convert_bytes_to_str(d):
    """
    Recursively decode every ``bytes`` value in ``d`` to ``str``, in place.

    Nested dictionaries, and dictionaries or bytes found directly inside list
    values, are processed as well.

    :param d: dictionary to convert (mutated)
    :return: None
    :raises UnicodeDecodeError: when a value is not valid UTF-8
    """
    for k, v in d.items():
        if isinstance(v, dict):
            convert_bytes_to_str(v)
        elif isinstance(v, list):
            for idx, item in enumerate(v):
                if isinstance(item, dict):
                    convert_bytes_to_str(item)
                elif isinstance(item, bytes):
                    # Write back by index; rebinding the loop variable would
                    # leave the list unchanged.
                    v[idx] = item.decode('utf-8')
        elif isinstance(v, bytes):
            d[k] = v.decode('utf-8')
def read_BytesExtra(BytesExtra):
    """
    Decode a ``BytesExtra`` protobuf blob without a schema.

    :param BytesExtra: raw protobuf bytes
    :return: the decoded message, or ``None`` when the input is not ``bytes``
             or decoding raises
    """
    if not isinstance(BytesExtra, bytes):
        return None

    try:
        decoded, _typedef = blackboxprotobuf.decode_message(BytesExtra)
    except Exception:
        return None
    return decoded
def read_ChatRoom_RoomData(RoomData):
    """
    Decode a chat room ``RoomData`` blob and convert its bytes values to text.

    :param RoomData: raw protobuf bytes
    :return: decoded dictionary with bytes converted to ``str``, or ``None``
             when the input is not ``bytes`` or any step raises
    """
    if not isinstance(RoomData, bytes):
        return None

    try:
        room = read_BytesExtra(RoomData)
        convert_bytes_to_str(room)
    except Exception:
        return None
    return room
def read_ExtraBuf(ExtraBuf: bytes):
    """
    Extract tagged fields from a contact ``ExtraBuf`` blob.

    Each known 4-byte (one is 10-byte) tag is searched for in the buffer. The
    byte following the tag selects how the value is read:
    ``0x04`` a 4-byte little-endian integer, ``0x18`` a length-prefixed UTF-16
    string, ``0x17`` a length-prefixed UTF-8 string, ``0x05`` eight raw bytes
    rendered as hex. Tags that are not found map to ``""``.

    :param ExtraBuf: raw buffer
    :return: dict of field name -> value, or ``None`` when the buffer is empty
             or parsing raises
    """
    if not ExtraBuf:
        return None

    # Hex tag -> output field name.
    # NOTE(review): three tags map to the same empty-string name, so their
    # values overwrite one another - confirm whether distinct names were intended.
    tag_names = {
        'DDF32683': '0', '74752C06': '性别[1男2女]', '88E28FCE': '2', '761A1D2D': '3', '0263A0CB': '4',
        '0451FF12': '5',
        '228C66A8': '6', '46CF10C4': '个性签名', 'A4D9024A': '', 'E2EAA8D1': '', '1D025BBF': '',
        '4D6C4570': '11',
        'F917BCC0': '公司名称', '759378AD': '手机号', '4335DFDD': '14', 'DE4CDAEB': '15', 'A72BC20A': '16',
        '069FED52': '17',
        '9B0F4299': '18', '3D641E22': '19', '1249822C': '20', '4EB96D85': '企微属性', 'B4F73ACB': '22',
        '0959EB92': '23',
        '3CF4A315': '24', 'C9477AC60201E44CD0E8': '26', 'B7ACF0F5': '28', '57A7B5A8': '29',
        '81AE19B4': '朋友圈背景',
        '695F3170': '31', 'FB083DD9': '32', '0240E37F': '33', '315D02A3': '34', '7DEC0BC3': '35',
        '0E719F13': '备注图片',
        '16791C90': '37'
    }

    try:
        fields = {}
        for tag_hex, field_name in tag_names.items():
            tag = bytes.fromhex(tag_hex)
            pos = ExtraBuf.find(tag)
            if pos == -1:
                fields[field_name] = ""
                continue

            kind_at = pos + len(tag)
            kind = ExtraBuf[kind_at:kind_at + 1]  # one-byte value type marker
            body = kind_at + 1

            if kind == b"\x04":
                fields[field_name] = int.from_bytes(ExtraBuf[body:body + 4], "little")
            elif kind in (b"\x18", b"\x17"):
                size = int.from_bytes(ExtraBuf[body:body + 4], "little")
                codec = "utf-16" if kind == b"\x18" else "utf-8"
                fields[field_name] = ExtraBuf[body + 4:body + 4 + size].decode(codec).rstrip("\x00")
            elif kind == b"\x05":
                fields[field_name] = f"0x{ExtraBuf[body:body + 8].hex()}"
        return fields
    except Exception as e:
        print(f'解析错误:\n{e}')
        return None

View File

@ -1,429 +0,0 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: export_chat.py
# Description:
# Author: xaoyaoo
# Date: 2023/12/03
# -------------------------------------------------------------------------------
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: GUI.py
# Description:
# Author: xaoyaoo
# Date: 2023/11/10
# -------------------------------------------------------------------------------
import csv
import re
import sqlite3
import os
import json
import time
from functools import wraps
from .utils import get_md5, attach_databases, execute_sql, get_type_name, match_BytesExtra, DBPool, time_int2str
from .db_parsing import parse_xml_string, decompress_CompressContent, read_BytesExtra
def get_contact(MicroMsg_db_path, wx_id):
    """
    Look up one contact by its WeChat id.

    :param MicroMsg_db_path: path of the database holding the ``Contact`` and
                             ``ContactHeadImgUrl`` tables
    :param wx_id: WeChat id to match against ``Contact.UserName``
    :return: dict with ``username``, ``nickname``, ``remark``, ``account``,
             ``describe`` and ``headImgUrl``, or ``None`` when no row matches
    """
    # wx_id is bound as a parameter so it cannot alter the SQL statement.
    sql = ("SELECT A.UserName, A.NickName, A.Remark,A.Alias,A.Reserved6,B.bigHeadImgUrl "
           "FROM Contact A,ContactHeadImgUrl B "
           "WHERE A.UserName = ? AND A.UserName = B.usrName "
           "ORDER BY NickName ASC;")
    with DBPool(MicroMsg_db_path) as db:
        result = execute_sql(db, sql, (wx_id,))
        if not result:
            return None
        # The query returns a list of rows; the fields live in the first row,
        # not at the top level of the result.
        username, nickname, remark, alias, describe, head_img_url = result[0]
        return {"username": username, "nickname": nickname, "remark": remark, "account": alias,
                "describe": describe, "headImgUrl": head_img_url}
def get_contact_list(MicroMsg_db_path, OpenIMContact_db_path=None):
    """
    Return all contacts, optionally followed by OpenIM contacts.

    :param MicroMsg_db_path: path of the database holding ``Contact`` and
                             ``ContactHeadImgUrl``
    :param OpenIMContact_db_path: optional path of the database holding
                                  ``OpenIMContact``
    :return: list of dicts with ``username``, ``nickname``, ``remark``,
             ``account``, ``describe`` and ``headImgUrl``
    """
    users = []

    contact_sql = ("SELECT A.UserName, A.NickName, A.Remark,A.Alias,A.Reserved6,B.bigHeadImgUrl "
                   "FROM Contact A,ContactHeadImgUrl B "
                   "where UserName==usrName "
                   "ORDER BY NickName ASC;")
    with DBPool(MicroMsg_db_path) as db:
        rows = execute_sql(db, contact_sql)
        users.extend(
            {"username": uname, "nickname": nick, "remark": remark, "account": alias, "describe": desc,
             "headImgUrl": head_url}
            for uname, nick, remark, alias, desc, head_url in rows
        )

    if OpenIMContact_db_path:
        openim_sql = ("SELECT A.UserName, A.NickName, A.Remark,A.BigHeadImgUrl FROM OpenIMContact A "
                      "ORDER BY NickName ASC;")
        with DBPool(OpenIMContact_db_path) as db:
            rows = execute_sql(db, openim_sql)
            # OpenIM rows carry no alias or description; those fields stay empty.
            users.extend(
                {"username": uname, "nickname": nick, "remark": remark, "account": "", "describe": "",
                 "headImgUrl": head_url}
                for uname, nick, remark, head_url in rows
            )
    return users
def get_chatroom_list(MicroMsg_db_path):
    """
    Return all chat rooms with their member lists and announcements.

    :param MicroMsg_db_path: path of the database holding ``ChatRoom`` and
                             ``ChatRoomInfo``
    :return: list of dicts with ``ChatRoomName``, ``UserNameList``,
             ``DisplayNameList``, ``Announcement`` and ``AnnouncementEditor``
    """
    query = ("SELECT A.ChatRoomName,A.UserNameList, A.DisplayNameList, B.Announcement,B.AnnouncementEditor "
             "FROM ChatRoom A,ChatRoomInfo B "
             "where A.ChatRoomName==B.ChatRoomName "
             "ORDER BY A.ChatRoomName ASC;")
    rooms = []
    with DBPool(MicroMsg_db_path) as db:
        for room_name, members, display_names, notice, notice_editor in execute_sql(db, query):
            # Both list columns are split on the literal two-character "^G".
            members = members.split("^G")
            display_names = display_names.split("^G")
            rooms.append({
                "ChatRoomName": room_name,
                "UserNameList": members,
                "DisplayNameList": display_names,
                "Announcement": notice,
                "AnnouncementEditor": notice_editor,
            })
    return rooms
def get_room_user_list(MSG_db_path, selected_talker):
    """
    Collect the distinct senders that appear in one conversation.

    Every message of ``selected_talker`` is scanned; the sender id is read
    from the decoded ``BytesExtra`` and resolved through ``get_contact``.

    :param MSG_db_path: path of the database holding the ``MSG`` table (also
                        passed to ``get_contact``)
    :param selected_talker: conversation id matched against ``StrTalker``
    :return: list of contact dicts, one per distinct sender that could be
             resolved, in order of first appearance
    """
    sql = (
        "SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent,BytesExtra,ROW_NUMBER() OVER (ORDER BY CreateTime ASC) AS id "
        "FROM MSG WHERE StrTalker=? "
        "ORDER BY CreateTime ASC")
    bytes_extra_col = 11  # position of BytesExtra in the SELECT list

    with DBPool(MSG_db_path) as db1:
        result1 = execute_sql(db1, sql, (selected_talker,))

        user_list = []
        seen_wx_ids = set()  # ids already looked up, found or not
        for row in result1:
            bytes_extra = read_BytesExtra(row[bytes_extra_col])
            if not bytes_extra:
                continue
            try:
                talker = bytes_extra['3'][0]['2'].decode('utf-8', errors='ignore')
            except Exception:
                # Message without the expected sender field; skip it.
                continue
            if talker in seen_wx_ids:
                continue
            # Remember misses too, so an unknown sender is queried only once.
            seen_wx_ids.add(talker)
            user = get_contact(MSG_db_path, talker)
            if user:
                user_list.append(user)
        return user_list
def get_msg_list(MSG_db_path, selected_talker="", start_index=0, page_size=500):
    """
    Return one page of chat messages, converted to display dictionaries.

    :param MSG_db_path: path of the database holding the ``MSG`` table
    :param selected_talker: conversation id to filter on; empty means all
    :param start_index: offset of the first row (SQL ``LIMIT`` offset)
    :param page_size: maximum number of rows to return
    :return: list of dicts with ``MsgSvrID``, ``type_name``, ``is_sender``,
             ``talker``, ``room_name``, ``content`` (``{"src", "msg"}``),
             ``CreateTime`` and ``id``
    """
    # Query the MSG table, ordered by creation time.
    with DBPool(MSG_db_path) as db1:
        if selected_talker:
            sql = (
                "SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent,BytesExtra,ROW_NUMBER() OVER (ORDER BY CreateTime ASC) AS id "
                "FROM MSG WHERE StrTalker=? "
                "ORDER BY CreateTime ASC LIMIT ?,?")
            result1 = execute_sql(db1, sql, (selected_talker, start_index, page_size))
        else:
            sql = (
                "SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent,BytesExtra,ROW_NUMBER() OVER (ORDER BY CreateTime ASC) AS id "
                "FROM MSG ORDER BY CreateTime ASC LIMIT ?,?")
            result1 = execute_sql(db1, sql, (start_index, page_size))
        data = []
        for row in result1:
            localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType, CreateTime, MsgSvrID, DisplayContent, CompressContent, BytesExtra, id = row
            CreateTime = time_int2str(CreateTime)
            # The (Type, SubType) pair selects how the content is rendered.
            type_id = (Type, SubType)
            type_name = get_type_name(type_id)
            content = {"src": "", "msg": StrContent}
            if type_id == (1, 0):  # text
                content["msg"] = StrContent
            elif type_id == (3, 0):  # image
                DictExtra = read_BytesExtra(BytesExtra)
                DictExtra_str = str(DictExtra)
                # Pull every "FileStorage..." path out of the stringified extra data.
                img_paths = [i for i in re.findall(r"(FileStorage.*?)'", DictExtra_str)]
                # Paths containing "Image" sort first.
                img_paths = sorted(img_paths, key=lambda p: "Image" in p, reverse=True)
                if img_paths:
                    img_path = img_paths[0].replace("'", "")
                    # Re-join the backslash-separated parts with the local separator.
                    img_path = [i for i in img_path.split("\\") if i]
                    img_path = os.path.join(*img_path)
                    content["src"] = img_path
                else:
                    content["src"] = ""
                content["msg"] = "图片"
            elif type_id == (34, 0):  # voice
                # NOTE(review): parse_xml_string can return None or a str, in
                # which case .get below raises - confirm inputs are always XML.
                tmp_c = parse_xml_string(StrContent)
                voicelength = tmp_c.get("voicemsg", {}).get("voicelength", "")
                transtext = tmp_c.get("voicetrans", {}).get("transtext", "")
                if voicelength.isdigit():
                    # Divide by 1000 and format with two decimals.
                    voicelength = int(voicelength) / 1000
                    voicelength = f"{voicelength:.2f}"
                content[
                    "msg"] = f"语音时长:{voicelength}\n翻译结果:{transtext}" if transtext else f"语音时长:{voicelength}"
                content["src"] = os.path.join("audio", f"{StrTalker}",
                                              f"{CreateTime.replace(':', '-').replace(' ', '_')}_{IsSender}_{MsgSvrID}.wav")
            elif type_id == (43, 0):  # video
                DictExtra = read_BytesExtra(BytesExtra)
                DictExtra = str(DictExtra)
                DictExtra_str = str(DictExtra)
                video_paths = [i for i in re.findall(r"(FileStorage.*?)'", DictExtra_str)]
                # Paths containing "mp4" sort first.
                video_paths = sorted(video_paths, key=lambda p: "mp4" in p, reverse=True)
                if video_paths:
                    video_path = video_paths[0].replace("'", "")
                    video_path = [i for i in video_path.split("\\") if i]
                    video_path = os.path.join(*video_path)
                    content["src"] = video_path
                else:
                    content["src"] = ""
                content["msg"] = "视频"
            elif type_id == (47, 0):  # animated emoji
                content_tmp = parse_xml_string(StrContent)
                cdnurl = content_tmp.get("emoji", {}).get("cdnurl", "")
                if cdnurl:
                    content = {"src": cdnurl, "msg": "表情"}
            elif type_id == (49, 0):
                DictExtra = read_BytesExtra(BytesExtra)
                url = match_BytesExtra(DictExtra)
                content["src"] = url
                # NOTE(review): assumes match_BytesExtra returns a str - verify.
                file_name = os.path.basename(url)
                content["msg"] = file_name
            elif type_id == (49, 19):  # merged/forwarded chat records
                CompressContent = decompress_CompressContent(CompressContent)
                content_tmp = parse_xml_string(CompressContent)
                title = content_tmp.get("appmsg", {}).get("title", "")
                des = content_tmp.get("appmsg", {}).get("des", "")
                recorditem = content_tmp.get("appmsg", {}).get("recorditem", "")
                recorditem = parse_xml_string(recorditem)
                content["msg"] = f"{title}\n{des}"
                content["src"] = recorditem
            elif type_id == (49, 57):  # text message quoting another message
                CompressContent = decompress_CompressContent(CompressContent)
                content_tmp = parse_xml_string(CompressContent)
                appmsg = content_tmp.get("appmsg", {})
                title = appmsg.get("title", "")
                refermsg = appmsg.get("refermsg", {})
                displayname = refermsg.get("displayname", "")
                display_content = refermsg.get("content", "")
                display_createtime = refermsg.get("createtime", "")
                # Format the quoted timestamp only when it is purely numeric.
                display_createtime = time_int2str(
                    int(display_createtime)) if display_createtime.isdigit() else display_createtime
                content["msg"] = f"{title}\n\n[引用]({display_createtime}){displayname}:{display_content}"
                content["src"] = ""
            elif type_id == (49, 2000):  # money transfer
                CompressContent = decompress_CompressContent(CompressContent)
                content_tmp = parse_xml_string(CompressContent)
                feedesc = content_tmp.get("appmsg", {}).get("wcpayinfo", {}).get("feedesc", "")
                content["msg"] = f"转账:{feedesc}"
                content["src"] = ""
            elif type_id[0] == 49 and type_id[1] != 0:
                # Any other type-49 subtype: show the type name with the matched URL.
                DictExtra = read_BytesExtra(BytesExtra)
                url = match_BytesExtra(DictExtra)
                content["src"] = url
                content["msg"] = type_name
            elif type_id == (50, 0):  # voice call
                content["msg"] = "语音/视频通话[%s]" % DisplayContent
            # elif type_id == (10000, 0):
            #     content["msg"] = StrContent
            # elif type_id == (10000, 4):
            #     content["msg"] = StrContent
            # elif type_id == (10000, 8000):
            #     content["msg"] = StrContent
            # Work out who sent the message.
            talker = "未知"
            if IsSender == 1:
                talker = ""
            else:
                if StrTalker.endswith("@chatroom"):
                    # In a chat room the sender id is read from BytesExtra.
                    bytes_extra = read_BytesExtra(BytesExtra)
                    if bytes_extra:
                        try:
                            talker = bytes_extra['3'][0]['2'].decode('utf-8', errors='ignore')
                            if "publisher-id" in talker:
                                talker = "系统"
                        except:
                            pass
                else:
                    talker = StrTalker
            row_data = {"MsgSvrID": str(MsgSvrID), "type_name": type_name, "is_sender": IsSender, "talker": talker,
                        "room_name": StrTalker, "content": content, "CreateTime": CreateTime, "id": id}
            data.append(row_data)
        return data
def get_chat_count(MSG_db_path: [str, list], username: str = ""):
    """
    Count messages per conversation.

    :param MSG_db_path: path (or paths) of the database holding ``MSG``
    :param username: when non-empty, only this conversation is counted
    :return: dict mapping ``StrTalker`` to its message count
    """
    with DBPool(MSG_db_path) as db1:
        if username:
            # Bind the id as a parameter instead of formatting it into the SQL.
            sql = "SELECT StrTalker,COUNT(*) FROM MSG WHERE StrTalker=?;"
            result = execute_sql(db1, sql, (username,))
        else:
            sql = "SELECT StrTalker, COUNT(*) FROM MSG GROUP BY StrTalker ORDER BY COUNT(*) DESC;"
            result = execute_sql(db1, sql)

        # An empty or missing result yields an empty mapping.
        return {talker: chat_count for talker, chat_count in (result or [])}
def get_all_chat_count(MSG_db_path: [str, list]):
    """
    Return the total number of rows in the ``MSG`` table.

    :param MSG_db_path: path (or paths) of the database holding ``MSG``
    :return: row count, or 0 when the query yields nothing
    """
    with DBPool(MSG_db_path) as db1:
        rows = execute_sql(db1, "SELECT COUNT(*) FROM MSG;")
        if not rows or len(rows) <= 0:
            return 0
        return rows[0][0]
def export_csv(username, outpath, MSG_ALL_db_path, page_size=5000):
    """
    Export one conversation to CSV files, one file per page of messages.

    :param username: conversation id to export
    :param outpath: output directory; when it does not exist,
                    ``<cwd>/export/<username>`` is used (and created)
    :param MSG_ALL_db_path: path of the database holding ``MSG``
    :param page_size: number of messages per file
    :return: ``(True, message)`` on success, ``(False, message)`` when there
             is nothing to export
    """
    if not os.path.exists(outpath):
        outpath = os.path.join(os.getcwd(), "export" + os.sep + username)
        if not os.path.exists(outpath):
            os.makedirs(outpath)

    total = get_chat_count(MSG_ALL_db_path, username).get(username, 0)
    if total == 0:
        return False, "没有聊天记录"

    # A page larger than the conversation collapses to a single file.
    if page_size > total:
        page_size = total + 1

    columns = ["id", "MsgSvrID", "type_name", "is_sender", "talker", "room_name", "content",
               "CreateTime"]
    content_col = columns.index("content")

    for offset in range(0, total, page_size):
        page = get_msg_list(MSG_ALL_db_path, username, offset, page_size)
        if len(page) == 0:
            return False, "没有聊天记录"

        save_path = os.path.join(outpath, f"{username}_{offset}_{offset + page_size}.csv")
        with open(save_path, "w", encoding="utf-8", newline='') as f:
            writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
            writer.writerow(columns)
            for msg in page:
                record = [msg.get(col, "") for col in columns]
                # The content dict is stored as a JSON string in its cell.
                record[content_col] = json.dumps(record[content_col], ensure_ascii=False)
                writer.writerow(record)
    return True, f"导出成功: {outpath}"
def export_json(username, outpath, MSG_ALL_db_path):
    """
    Export one conversation to a JSON file.

    :param username: conversation id to export
    :param outpath: output directory; when it does not exist,
                    ``<cwd>/export/<username>`` is used (and created)
    :param MSG_ALL_db_path: path of the database holding ``MSG``
    :return: ``(True, message)`` on success, ``(False, message)`` when there
             is nothing to export
    """
    if not os.path.exists(outpath):
        outpath = os.path.join(os.getcwd(), "export" + os.sep + username)
        if not os.path.exists(outpath):
            os.makedirs(outpath)

    total = get_chat_count(MSG_ALL_db_path, username).get(username, 0)
    if total == 0:
        return False, "没有聊天记录"

    # The page is one larger than the conversation, so the loop runs once.
    page_size = total + 1
    for offset in range(0, total, page_size):
        messages = get_msg_list(MSG_ALL_db_path, username, offset, page_size)
        if len(messages) == 0:
            return False, "没有聊天记录"

        target = os.path.join(outpath, f"{username}_{offset}_{offset + page_size}.json")
        with open(target, "w", encoding="utf-8") as f:
            json.dump(messages, f, ensure_ascii=False, indent=4)
    return True, f"导出成功: {outpath}"
def export_html(user, outpath, MSG_ALL_db_path, MediaMSG_all_db_path, FileStorage_path, page_size=500):
    """
    Render a user's chat records to paged HTML files.

    :param user: dict describing the user; ``remark``/``nickname``/``username``
                 pick the file name prefix, ``chat_count`` the number of
                 messages
    :param outpath: directory the HTML files are written to
    :param MSG_ALL_db_path: forwarded to ``load_chat_records``
    :param MediaMSG_all_db_path: forwarded to ``load_chat_records``
    :param FileStorage_path: forwarded to ``load_chat_records``
    :param page_size: number of messages per HTML file
    :return: ``(True, message)`` on success, ``(False, message)`` when the
             user has no messages
    """
    # NOTE(review): load_chat_records and render_template are not among the
    # imports visible in this file - confirm where they are expected to come from.
    fallback_name = user.get("nickname", user.get("username", ""))
    name_save = user.get("remark", fallback_name)
    username = user.get("username", "")

    total = user.get("chat_count", 0)
    if total == 0:
        return False, "没有聊天记录"

    for offset in range(0, total, page_size):
        records = load_chat_records(username, offset, page_size, user, MSG_ALL_db_path, MediaMSG_all_db_path,
                                    FileStorage_path)
        if len(records) == 0:
            break
        page_no = int(offset / page_size)
        html_path = os.path.join(outpath, f"{name_save}_{page_no}.html")
        with open(html_path, "w", encoding="utf-8") as f:
            f.write(render_template("chat.html", msgs=records))
    return True, f"导出成功{outpath}"
def export(username, outpath, MSG_ALL_db_path, MicroMsg_db_path, MediaMSG_all_db_path, FileStorage_path):
    """
    Export one user's chat records as HTML.

    :param username: id of the user to export
    :param outpath: output directory; when it does not exist,
                    ``<cwd>/export/<username>`` is used (and created)
    :param MSG_ALL_db_path: forwarded to ``get_user_list`` and ``export_html``
    :param MicroMsg_db_path: forwarded to ``get_user_list``
    :param MediaMSG_all_db_path: forwarded to ``export_html``
    :param FileStorage_path: forwarded to ``export_html``
    :return: the result of ``export_html`` when the user is found, otherwise
             ``None``
    """
    # NOTE(review): get_user_list is not among the imports visible in this
    # file - confirm where it is expected to come from.
    if not os.path.exists(outpath):
        outpath = os.path.join(os.getcwd(), "export" + os.sep + username)
        if not os.path.exists(outpath):
            os.makedirs(outpath)

    all_users = get_user_list(MSG_ALL_db_path, MicroMsg_db_path)
    matches = [entry for entry in all_users if entry["username"] == username]
    if username and len(matches) > 0:
        return export_html(matches[0], outpath, MSG_ALL_db_path, MediaMSG_all_db_path, FileStorage_path)

View File

@ -12,18 +12,18 @@ import os
import re
import time
import shutil
import pythoncom
import pywxdump
from flask import Flask, request, render_template, g, Blueprint, send_file, make_response, session
from pywxdump import analyzer, read_img_dat, read_audio, get_wechat_db, get_core_db
from pywxdump.analyzer.export_chat import get_contact, get_room_user_list
from pywxdump import get_core_db
from pywxdump.api.rjson import ReJson, RqJson
from pywxdump.api.utils import read_session, get_session_wxids, save_session, error9999, gen_base64, validate_title
from pywxdump import read_info, VERSION_LIST, batch_decrypt, BiasAddr, merge_db, decrypt_merge, merge_real_time_db
import pywxdump
from pywxdump.dbpreprocess import wxid2userinfo, ParsingMSG, get_user_list, get_recent_user_list, ParsingMediaMSG, \
download_file
from pywxdump.dbpreprocess import export_csv,export_json
download_file,export_csv, export_json
from pywxdump.dbpreprocess.utils import dat2img
# app = Flask(__name__, static_folder='../ui/web/dist', static_url_path='/')
@ -345,7 +345,7 @@ def get_img(img_path):
original_img_path = os.path.join(wx_path, img_path)
if os.path.exists(original_img_path):
fomt, md5, out_bytes = read_img_dat(original_img_path)
fomt, md5, out_bytes = dat2img(original_img_path)
imgsavepath = os.path.join(img_tmp_path, img_path + "_" + ".".join([md5, fomt]))
if not os.path.exists(os.path.dirname(imgsavepath)):
os.makedirs(os.path.dirname(imgsavepath))
@ -559,187 +559,187 @@ def get_export_json():
return ReJson(2001, body=ret)
@api.route('/api/export', methods=["GET", 'POST'])
@error9999
def export():
"""
导出聊天记录
:return:
"""
export_type = request.json.get("export_type")
start_time = request.json.get("start_time", 0)
end_time = request.json.get("end_time", 0)
chat_type = request.json.get("chat_type")
username = request.json.get("username")
wx_path = request.json.get("wx_path", read_session(g.sf, "wx_path"))
key = request.json.get("key", read_session(g.sf, "key"))
if not export_type or not isinstance(export_type, str):
return ReJson(1002)
# 导出路径
outpath = os.path.join(g.tmp_path, "export", export_type)
if not os.path.exists(outpath):
os.makedirs(outpath)
if export_type == "endb": # 导出加密数据库
# 获取微信文件夹路径
if not wx_path:
return ReJson(1002)
if not os.path.exists(wx_path):
return ReJson(1001, body=wx_path)
# 分割wx_path的文件名和父目录
code, wxdbpaths = get_core_db(wx_path)
if not code:
return ReJson(2001, body=wxdbpaths)
for wxdb in wxdbpaths:
# 复制wxdb->outpath, os.path.basename(wxdb)
shutil.copy(wxdb, os.path.join(outpath, os.path.basename(wxdb)))
return ReJson(0, body=outpath)
elif export_type == "dedb":
if isinstance(start_time, int) and isinstance(end_time, int):
msg_path = read_session(g.sf, "msg_path")
micro_path = read_session(g.sf, "micro_path")
media_path = read_session(g.sf, "media_path")
dbpaths = [msg_path, media_path, micro_path]
dbpaths = list(set(dbpaths))
mergepath = merge_db(dbpaths, os.path.join(outpath, "merge.db"), start_time, end_time)
return ReJson(0, body=mergepath)
# if msg_path == media_path and msg_path == media_path:
# shutil.copy(msg_path, os.path.join(outpath, "merge.db"))
# return ReJson(0, body=msg_path)
# else:
# dbpaths = [msg_path, msg_path, micro_path]
# @api.route('/api/export', methods=["GET", 'POST'])
# @error9999
# def export():
# """
# 导出聊天记录
# :return:
# """
# export_type = request.json.get("export_type")
# start_time = request.json.get("start_time", 0)
# end_time = request.json.get("end_time", 0)
# chat_type = request.json.get("chat_type")
# username = request.json.get("username")
#
# wx_path = request.json.get("wx_path", read_session(g.sf, "wx_path"))
# key = request.json.get("key", read_session(g.sf, "key"))
#
# if not export_type or not isinstance(export_type, str):
# return ReJson(1002)
#
# # 导出路径
# outpath = os.path.join(g.tmp_path, "export", export_type)
# if not os.path.exists(outpath):
# os.makedirs(outpath)
#
# if export_type == "endb": # 导出加密数据库
# # 获取微信文件夹路径
# if not wx_path:
# return ReJson(1002)
# if not os.path.exists(wx_path):
# return ReJson(1001, body=wx_path)
#
# # 分割wx_path的文件名和父目录
# code, wxdbpaths = get_core_db(wx_path)
# if not code:
# return ReJson(2001, body=wxdbpaths)
#
# for wxdb in wxdbpaths:
# # 复制wxdb->outpath, os.path.basename(wxdb)
# shutil.copy(wxdb, os.path.join(outpath, os.path.basename(wxdb)))
# return ReJson(0, body=outpath)
#
# elif export_type == "dedb":
# if isinstance(start_time, int) and isinstance(end_time, int):
# msg_path = read_session(g.sf, "msg_path")
# micro_path = read_session(g.sf, "micro_path")
# media_path = read_session(g.sf, "media_path")
# dbpaths = [msg_path, media_path, micro_path]
# dbpaths = list(set(dbpaths))
# mergepath = merge_db(dbpaths, os.path.join(outpath, "merge.db"), start_time, end_time)
# return ReJson(0, body=mergepath)
else:
return ReJson(1002, body={"start_time": start_time, "end_time": end_time})
elif export_type == "csv":
outpath = os.path.join(outpath, username)
if not os.path.exists(outpath):
os.makedirs(outpath)
code, ret = analyzer.export_csv(username, outpath, read_session(g.sf, "msg_path"))
if code:
return ReJson(0, ret)
else:
return ReJson(2001, body=ret)
elif export_type == "json":
outpath = os.path.join(outpath, username)
if not os.path.exists(outpath):
os.makedirs(outpath)
code, ret = analyzer.export_json(username, outpath, read_session(g.sf, "msg_path"))
if code:
return ReJson(0, ret)
else:
return ReJson(2001, body=ret)
elif export_type == "html":
outpath = os.path.join(outpath, username)
if os.path.exists(outpath):
shutil.rmtree(outpath)
if not os.path.exists(outpath):
os.makedirs(outpath)
# chat_type_tups = []
# for ct in chat_type:
# tup = analyzer.get_name_typeid(ct)
# if tup:
# chat_type_tups += tup
# if not chat_type_tups:
# # if msg_path == media_path and msg_path == media_path:
# # shutil.copy(msg_path, os.path.join(outpath, "merge.db"))
# # return ReJson(0, body=msg_path)
# # else:
# # dbpaths = [msg_path, msg_path, micro_path]
# # dbpaths = list(set(dbpaths))
# # mergepath = merge_db(dbpaths, os.path.join(outpath, "merge.db"), start_time, end_time)
# # return ReJson(0, body=mergepath)
# else:
# return ReJson(1002, body={"start_time": start_time, "end_time": end_time})
#
# elif export_type == "csv":
# outpath = os.path.join(outpath, username)
# if not os.path.exists(outpath):
# os.makedirs(outpath)
# code, ret = analyzer.export_csv(username, outpath, read_session(g.sf, "msg_path"))
# if code:
# return ReJson(0, ret)
# else:
# return ReJson(2001, body=ret)
# elif export_type == "json":
# outpath = os.path.join(outpath, username)
# if not os.path.exists(outpath):
# os.makedirs(outpath)
# code, ret = analyzer.export_json(username, outpath, read_session(g.sf, "msg_path"))
# if code:
# return ReJson(0, ret)
# else:
# return ReJson(2001, body=ret)
# elif export_type == "html":
# outpath = os.path.join(outpath, username)
# if os.path.exists(outpath):
# shutil.rmtree(outpath)
# if not os.path.exists(outpath):
# os.makedirs(outpath)
# # chat_type_tups = []
# # for ct in chat_type:
# # tup = analyzer.get_name_typeid(ct)
# # if tup:
# # chat_type_tups += tup
# # if not chat_type_tups:
# # return ReJson(1002)
#
# # 复制文件 html
# export_html = os.path.join(os.path.dirname(pywxdump.VERSION_LIST_PATH), "ui", "export")
# indexhtml_path = os.path.join(export_html, "index.html")
# assets_path = os.path.join(export_html, "assets")
# if not os.path.exists(indexhtml_path) or not os.path.exists(assets_path):
# return ReJson(1001)
# js_path = ""
# css_path = ""
# for file in os.listdir(assets_path):
# if file.endswith('.js'):
# js_path = os.path.join(assets_path, file)
# elif file.endswith('.css'):
# css_path = os.path.join(assets_path, file)
# else:
# continue
# # 读取html,js,css
# with open(indexhtml_path, 'r', encoding='utf-8') as f:
# html = f.read()
# with open(js_path, 'r', encoding='utf-8') as f:
# js = f.read()
# with open(css_path, 'r', encoding='utf-8') as f:
# css = f.read()
#
# html = re.sub(r'<script .*?></script>', '', html) # 删除所有的script标签
# html = re.sub(r'<link rel="stylesheet" .*?>', '', html) # 删除所有的link标签
#
# html = html.replace('</head>', f'<style>{css}</style></head>')
# html = html.replace('</head>', f'<script type="module" crossorigin>{js}</script></head>')
# # END 生成index.html
#
# rdata = func_get_msgs(0, 10000000, username, "", "")
#
# msg_list = rdata["msg_list"]
# for i in range(len(msg_list)):
# if msg_list[i]["type_name"] == "语音":
# savePath = msg_list[i]["content"]["src"]
# MsgSvrID = savePath.split("_")[-1].replace(".wav", "")
# if not savePath:
# continue
# media_path = read_session(g.sf, "media_path")
# wave_data = read_audio(MsgSvrID, is_wave=True, DB_PATH=media_path)
# if not wave_data:
# continue
# # 判断savePath路径的文件夹是否存在
# savePath = os.path.join(outpath, savePath)
# if not os.path.exists(os.path.dirname(savePath)):
# os.makedirs(os.path.dirname(savePath))
# with open(savePath, "wb") as f:
# f.write(wave_data)
# elif msg_list[i]["type_name"] == "图片":
# img_path = msg_list[i]["content"]["src"]
# wx_path = read_session(g.sf, "wx_path")
# img_path_all = os.path.join(wx_path, img_path)
#
# if os.path.exists(img_path_all):
# fomt, md5, out_bytes = read_img_dat(img_path_all)
# imgsavepath = os.path.join(outpath, "img", img_path + "_" + ".".join([md5, fomt]))
# if not os.path.exists(os.path.dirname(imgsavepath)):
# os.makedirs(os.path.dirname(imgsavepath))
# with open(imgsavepath, "wb") as f:
# f.write(out_bytes)
# msg_list[i]["content"]["src"] = os.path.join("img", img_path + "_" + ".".join([md5, fomt]))
#
# rdata["msg_list"] = msg_list
# rdata["myuserdata"] = rdata["user_list"][rdata["my_wxid"]]
# rdata["myuserdata"]["chat_count"] = len(rdata["msg_list"])
# save_data = rdata
# save_json_path = os.path.join(outpath, "data")
# if not os.path.exists(save_json_path):
# os.makedirs(save_json_path)
# with open(os.path.join(save_json_path, "msg_user.json"), "w", encoding="utf-8") as f:
# json.dump(save_data, f, ensure_ascii=False)
#
# json_base64 = gen_base64(os.path.join(save_json_path, "msg_user.json"))
# html = html.replace('"./data/msg_user.json"', f'"{json_base64}"')
#
# with open(os.path.join(outpath, "index.html"), 'w', encoding='utf-8') as f:
# f.write(html)
# return ReJson(0, outpath)
#
# elif export_type == "pdf":
# pass
# elif export_type == "docx":
# pass
# else:
# return ReJson(1002)
# 复制文件 html
export_html = os.path.join(os.path.dirname(pywxdump.VERSION_LIST_PATH), "ui", "export")
indexhtml_path = os.path.join(export_html, "index.html")
assets_path = os.path.join(export_html, "assets")
if not os.path.exists(indexhtml_path) or not os.path.exists(assets_path):
return ReJson(1001)
js_path = ""
css_path = ""
for file in os.listdir(assets_path):
if file.endswith('.js'):
js_path = os.path.join(assets_path, file)
elif file.endswith('.css'):
css_path = os.path.join(assets_path, file)
else:
continue
# 读取html,js,css
with open(indexhtml_path, 'r', encoding='utf-8') as f:
html = f.read()
with open(js_path, 'r', encoding='utf-8') as f:
js = f.read()
with open(css_path, 'r', encoding='utf-8') as f:
css = f.read()
html = re.sub(r'<script .*?></script>', '', html) # 删除所有的script标签
html = re.sub(r'<link rel="stylesheet" .*?>', '', html) # 删除所有的link标签
html = html.replace('</head>', f'<style>{css}</style></head>')
html = html.replace('</head>', f'<script type="module" crossorigin>{js}</script></head>')
# END 生成index.html
rdata = func_get_msgs(0, 10000000, username, "", "")
msg_list = rdata["msg_list"]
for i in range(len(msg_list)):
if msg_list[i]["type_name"] == "语音":
savePath = msg_list[i]["content"]["src"]
MsgSvrID = savePath.split("_")[-1].replace(".wav", "")
if not savePath:
continue
media_path = read_session(g.sf, "media_path")
wave_data = read_audio(MsgSvrID, is_wave=True, DB_PATH=media_path)
if not wave_data:
continue
# 判断savePath路径的文件夹是否存在
savePath = os.path.join(outpath, savePath)
if not os.path.exists(os.path.dirname(savePath)):
os.makedirs(os.path.dirname(savePath))
with open(savePath, "wb") as f:
f.write(wave_data)
elif msg_list[i]["type_name"] == "图片":
img_path = msg_list[i]["content"]["src"]
wx_path = read_session(g.sf, "wx_path")
img_path_all = os.path.join(wx_path, img_path)
if os.path.exists(img_path_all):
fomt, md5, out_bytes = read_img_dat(img_path_all)
imgsavepath = os.path.join(outpath, "img", img_path + "_" + ".".join([md5, fomt]))
if not os.path.exists(os.path.dirname(imgsavepath)):
os.makedirs(os.path.dirname(imgsavepath))
with open(imgsavepath, "wb") as f:
f.write(out_bytes)
msg_list[i]["content"]["src"] = os.path.join("img", img_path + "_" + ".".join([md5, fomt]))
rdata["msg_list"] = msg_list
rdata["myuserdata"] = rdata["user_list"][rdata["my_wxid"]]
rdata["myuserdata"]["chat_count"] = len(rdata["msg_list"])
save_data = rdata
save_json_path = os.path.join(outpath, "data")
if not os.path.exists(save_json_path):
os.makedirs(save_json_path)
with open(os.path.join(save_json_path, "msg_user.json"), "w", encoding="utf-8") as f:
json.dump(save_data, f, ensure_ascii=False)
json_base64 = gen_base64(os.path.join(save_json_path, "msg_user.json"))
html = html.replace('"./data/msg_user.json"', f'"{json_base64}"')
with open(os.path.join(outpath, "index.html"), 'w', encoding='utf-8') as f:
f.write(html)
return ReJson(0, outpath)
elif export_type == "pdf":
pass
elif export_type == "docx":
pass
else:
return ReJson(1002)
return ReJson(9999, "")
#
# return ReJson(9999, "")
# end 导出聊天记录 *******************************************************************************************************

View File

@ -164,3 +164,59 @@ class ParsingMicroMsg(DatabaseBase):
{"ChatRoomName": ChatRoomName, "UserNameList": UserNameList, "DisplayNameList": DisplayNameList,
"Announcement": Announcement, "AnnouncementEditor": AnnouncementEditor, "wxid2remark": wxid2remark})
return rooms
def get_ExtraBuf(self, ExtraBuf: bytes):
    """
    Parse the ``ExtraBuf`` blob of the contact table into a dict of labelled fields.

    The blob is searched for each marker in a fixed table (markers are written as hex).
    The single byte that follows a marker is a type tag selecting how the payload that
    comes next is decoded:

    * ``0x04`` - 4-byte little-endian integer
    * ``0x18`` - 4-byte little-endian length, followed by that many bytes of UTF-16 text
    * ``0x17`` - 4-byte little-endian length, followed by that many bytes of UTF-8 text
    * ``0x05`` - 8 raw bytes, returned as a ``0x``-prefixed hex string

    :param ExtraBuf: raw blob; a falsy value (``None`` / ``b""``) yields ``None``
    :return: dict mapping field label -> decoded value. A field whose marker is absent, or
        whose type tag is not one of the four above, is reported as ``""``.
        ``None`` is returned when the input is empty or decoding raises.
    """
    if not ExtraBuf:
        return None
    try:
        # marker (hex) -> field label. Numeric labels are placeholders for fields whose
        # meaning is not named here. NOTE: three markers share the label '' so they write
        # to the same result key and the last one processed wins.
        buf_dict = {
            'DDF32683': '0', '74752C06': '性别[1男2女]', '88E28FCE': '2', '761A1D2D': '3', '0263A0CB': '4',
            '0451FF12': '5',
            '228C66A8': '6', '46CF10C4': '个性签名', 'A4D9024A': '', 'E2EAA8D1': '', '1D025BBF': '',
            '4D6C4570': '11',
            'F917BCC0': '公司名称', '759378AD': '手机号', '4335DFDD': '14', 'DE4CDAEB': '15', 'A72BC20A': '16',
            '069FED52': '17',
            '9B0F4299': '18', '3D641E22': '19', '1249822C': '20', '4EB96D85': '企微属性', 'B4F73ACB': '22',
            '0959EB92': '23',
            '3CF4A315': '24', 'C9477AC60201E44CD0E8': '26', 'B7ACF0F5': '28', '57A7B5A8': '29',
            '81AE19B4': '朋友圈背景',
            '695F3170': '31', 'FB083DD9': '32', '0240E37F': '33', '315D02A3': '34', '7DEC0BC3': '35',
            '0E719F13': '备注图片',
            '16791C90': '37'
        }
        rdata = {}
        for marker_hex, label in buf_dict.items():
            marker = bytes.fromhex(marker_hex)
            offset = ExtraBuf.find(marker)
            if offset == -1:
                rdata[label] = ""
                continue
            offset += len(marker)
            type_id = ExtraBuf[offset: offset + 1]
            offset += 1  # payload starts right after the type tag
            if type_id == b"\x04":
                rdata[label] = int.from_bytes(ExtraBuf[offset: offset + 4], "little")
            elif type_id == b"\x18":
                length = int.from_bytes(ExtraBuf[offset: offset + 4], "little")
                rdata[label] = ExtraBuf[offset + 4: offset + 4 + length].decode("utf-16").rstrip("\x00")
            elif type_id == b"\x17":
                length = int.from_bytes(ExtraBuf[offset: offset + 4], "little")
                rdata[label] = ExtraBuf[offset + 4: offset + 4 + length].decode("utf-8").rstrip("\x00")
            elif type_id == b"\x05":
                rdata[label] = f"0x{ExtraBuf[offset: offset + 8].hex()}"
            else:
                # Unknown (or truncated) type tag: report the field as empty, exactly like a
                # missing marker, so every label is always present in the result.
                rdata[label] = ""
        return rdata
    except Exception as e:
        # Best-effort parser: any failure is reported and the caller gets None.
        print(f'解析错误:\n{e}')
        return None

View File

@ -5,7 +5,7 @@
# Author: xaoyaoo
# Date: 2023/12/03
# -------------------------------------------------------------------------------
from .view_chat import app_show_chat, get_user_list, export
# from .view_chat import app_show_chat, get_user_list, export
if __name__ == '__main__':
pass

View File

@ -1,318 +0,0 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: GUI.py
# Description:
# Author: xaoyaoo
# Date: 2023/11/10
# -------------------------------------------------------------------------------
import base64
import re
import sqlite3
import os
import json
import time
import hashlib
from pywxdump.analyzer import read_img_dat, decompress_CompressContent, read_audio, parse_xml_string, read_BytesExtra
from flask import Flask, request, render_template, g, Blueprint
def get_md5(s):
    """Return the hexadecimal MD5 digest of the UTF-8 encoding of ``s``."""
    return hashlib.md5(s.encode("utf-8")).hexdigest()
def get_user_list(MSG_ALL_db_path, MicroMsg_db_path):
    """
    Build the contact list, annotated with each contact's message count.

    :param MSG_ALL_db_path: path of the SQLite database containing the ``MSG`` table
    :param MicroMsg_db_path: path of the SQLite database containing the ``Contact`` table
    :return: list of dicts with keys ``username``, ``nickname``, ``remark``, ``chat_count``
        and ``isChatRoom``, sorted by ``chat_count`` descending (ties keep ``Contact`` order)
    """
    # Number of messages per talker.
    msg_db = sqlite3.connect(MSG_ALL_db_path)
    try:
        count_rows = msg_db.execute(
            "SELECT StrTalker, COUNT(*) AS ChatCount FROM MSG GROUP BY StrTalker ORDER BY ChatCount DESC"
        ).fetchall()
    finally:
        # Close even when the query fails so the database file is not left locked/open.
        msg_db.close()
    chat_counts = {talker: count for talker, count in count_rows}

    contact_db = sqlite3.connect(MicroMsg_db_path)
    try:
        contacts = contact_db.execute("SELECT UserName, NickName, Remark FROM Contact;").fetchall()
    finally:
        contact_db.close()

    users = [
        {"username": username, "nickname": nickname, "remark": remark,
         "chat_count": chat_counts.get(username, 0),
         # "@chatroom" is a suffix of room ids; load_chat_records in this module tests it
         # with endswith() as well.
         "isChatRoom": username.endswith("@chatroom")}
        for username, nickname, remark in contacts
    ]
    users.sort(key=lambda u: u["chat_count"], reverse=True)  # most active conversations first
    return users
def load_base64_audio_data(MsgSvrID, MediaMSG_all_db_path):
    """
    Return a voice message as a ``data:audio/wav;base64,...`` URI.

    :param MsgSvrID: message id handed to ``read_audio``
    :param MediaMSG_all_db_path: database path handed to ``read_audio`` as ``DB_PATH``
    :return: the data URI, or ``""`` when ``read_audio`` returns nothing
    """
    wav_bytes = read_audio(MsgSvrID, is_wave=True, DB_PATH=MediaMSG_all_db_path)
    if wav_bytes:
        encoded = base64.b64encode(wav_bytes).decode("utf-8")
        return f"data:audio/wav;base64,{encoded}"
    return ""
def load_base64_img_data(start_time, end_time, username_md5, FileStorage_path):
    """
    Collect the images of one conversation as base64 data URIs.

    Directories below ``<FileStorage_path>/MsgAttach/<username_md5>/Image`` whose name lies
    between the ``YYYY-MM`` of ``start_time`` and of ``end_time`` (string comparison,
    inclusive) are scanned recursively for ``.dat`` files; each is decoded with ``read_img_dat``.

    :param start_time: start timestamp
    :param end_time: end timestamp
    :param username_md5: md5 of the user name (directory name under ``MsgAttach``)
    :param FileStorage_path: storage root; when falsy nothing is scanned
    :return: dict mapping the md5 returned by ``read_img_dat`` to a ``data:...;base64,...`` string;
        ``{}`` when the image directory does not exist
    """
    first_month = time.strftime("%Y-%m", time.localtime(start_time))
    last_month = time.strftime("%Y-%m", time.localtime(end_time))
    image_root = os.path.join(FileStorage_path, "MsgAttach", username_md5, "Image") if FileStorage_path else ""
    if not os.path.exists(image_root):
        return {}

    # Every directory (at any depth) whose name falls inside the month window.
    month_dirs = [
        os.path.join(parent, dirname)
        for parent, dirnames, _ in os.walk(image_root)
        for dirname in dirnames
        if first_month <= dirname <= last_month
    ]

    img_md5_data = {}
    for month_dir in month_dirs:
        for parent, _, filenames in os.walk(month_dir):
            for filename in filenames:
                if not filename.endswith(".dat"):
                    continue
                fomt, md5, out_bytes = read_img_dat(os.path.join(parent, filename))
                encoded = base64.b64encode(out_bytes).decode("utf-8")
                img_md5_data[md5] = f"data:{fomt};base64,{encoded}"
    return img_md5_data
def load_chat_records(selected_talker, start_index, page_size, user_list, MSG_ALL_db_path, MediaMSG_all_db_path,
                      FileStorage_path, USER_LIST):
    """
    Load one page of the messages exchanged with ``selected_talker``, ready for rendering.

    :param selected_talker: ``StrTalker`` value whose messages are read
    :param start_index: row offset of the page (SQL ``LIMIT offset, count``)
    :param page_size: maximum number of rows returned
    :param user_list: dict describing the conversation partner (``username`` / ``nickname`` /
        ``remark``); despite the name this is a single user, not a list
    :param MSG_ALL_db_path: path of the SQLite database containing the ``MSG`` table
    :param MediaMSG_all_db_path: database path used to load voice messages
    :param FileStorage_path: directory the image paths found in ``BytesExtra`` are joined to;
        when falsy, images are not embedded
    :param USER_LIST: list of user dicts used to resolve the sender of chat-room messages
    :return: list (oldest first) of dicts with keys ``MsgSvrID``, ``type_name``, ``is_sender``,
        ``talker``, ``content`` (``src`` / ``msg`` / ``style``) and ``CreateTime``
    """
    type_name_dict = {
        1: {0: "文本"},
        3: {0: "图片"},
        34: {0: "语音"},
        43: {0: "视频"},
        47: {0: "动画表情"},
        49: {0: "文本", 1: "类似文字消息而不一样的消息", 5: "卡片式链接", 6: "文件", 8: "用户上传的 GIF 表情",
             19: "合并转发的聊天记录", 33: "分享的小程序", 36: "分享的小程序", 57: "带有引用的文本消息",
             63: "视频号直播或直播回放等",
             87: "群公告", 88: "视频号直播或直播回放等", 2000: "转账消息", 2003: "赠送红包封面"},
        50: {0: "语音通话"},
        10000: {0: "系统通知", 4: "拍一拍", 8000: "系统通知"}
    }

    db = sqlite3.connect(MSG_ALL_db_path)
    try:
        rows = db.execute(
            "SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent,BytesExtra FROM MSG WHERE StrTalker=? ORDER BY CreateTime ASC LIMIT ?,?",
            (selected_talker, start_index, page_size)).fetchall()
    finally:
        # Close even when the query fails.
        db.close()

    data = []
    for row in rows:
        (localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType, CreateTime, MsgSvrID,
         DisplayContent, CompressContent, BytesExtra) = row
        CreateTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(CreateTime))
        type_name = type_name_dict.get(Type, {}).get(SubType, "未知")
        content = {"src": "", "msg": "", "style": ""}
        kind = (Type, SubType)

        if kind == (47, 0):  # animated sticker
            cdnurl = parse_xml_string(StrContent).get("emoji", {}).get("cdnurl", "")
            if cdnurl:
                content = {"src": cdnurl, "msg": "表情", "style": "width: 100px; height: 100px;"}
        elif kind == (49, 57):  # text message quoting another message
            try:
                # Everything after the last NUL byte is dropped before decompressing.
                compressed = CompressContent.rsplit(b'\x00', 1)[0]
                xml_text = decompress_CompressContent(compressed).decode("utf-8")
                content["msg"] = json.dumps(parse_xml_string(xml_text), ensure_ascii=False)
            except Exception:
                # Best effort: a missing/corrupt payload must not abort the whole page.
                content["msg"] = "[带有引用的文本消息]解析失败"
        elif kind == (34, 0):  # voice
            voice = parse_xml_string(StrContent)
            voicelength = voice.get("voicemsg", {}).get("voicelength", "")
            transtext = voice.get("voicetrans", {}).get("transtext", "")
            if voicelength.isdigit():
                voicelength = f"{int(voicelength) / 1000:.2f}"  # divided by 1000, two decimals
            content["msg"] = f"语音时长:{voicelength}\n翻译结果:{transtext}"
            content["src"] = load_base64_audio_data(MsgSvrID, MediaMSG_all_db_path=MediaMSG_all_db_path)
        elif kind == (3, 0):  # image
            content["msg"] = "图片"
            # The relative image path is searched for in the textual form of the parsed
            # BytesExtra. The raw BytesExtra value is left untouched so the chat-room sender
            # lookup below still parses the original bytes.
            match = re.search(r"MsgAttach(.*?)'", str(read_BytesExtra(BytesExtra)))
            if match and FileStorage_path:
                parts = [p for p in match.group(0).replace("'", "").split("\\") if p != ""]
                img_path = os.path.join(FileStorage_path, *parts)
                if os.path.exists(img_path):
                    fomt, md5, out_bytes = read_img_dat(img_path)
                    encoded = base64.b64encode(out_bytes).decode("utf-8")
                    content["src"] = f"data:{fomt};base64,{encoded}"
        else:
            content["msg"] = StrContent

        if IsSender == 1:
            talker = ""
        elif StrTalker.endswith("@chatroom"):
            talker = "未知"
            bytes_extra = read_BytesExtra(BytesExtra)
            if bytes_extra:
                try:
                    sender_id = bytes_extra['3'][0]['2'].decode('utf-8', errors='ignore')
                    member = next((u for u in USER_LIST if u["username"] == sender_id), None)
                    if member is None:
                        talker = sender_id
                    else:
                        talker = member.get("remark") or member.get("nickname") or member.get("username", "")
                except Exception:
                    # Unexpected BytesExtra layout: keep the "unknown" placeholder.
                    pass
        else:
            # Prefer remark, then nickname, then username, skipping empty values (a remark
            # that is present but empty must not hide the nickname).
            talker = user_list.get("remark") or user_list.get("nickname") or user_list.get("username", "")

        data.append({"MsgSvrID": MsgSvrID, "type_name": type_name, "is_sender": IsSender, "talker": talker,
                     "content": content, "CreateTime": CreateTime})
    return data
def export_html(user, outpath, MSG_ALL_db_path, MediaMSG_all_db_path, FileStorage_path, page_size=500):
    """
    Export a user's chat history as a series of HTML pages rendered from ``chat.html``.

    Pages are written to ``outpath`` as ``<name>_<page index>.html``, where ``<name>`` is the
    user's remark, else nickname, else username (first non-empty one).

    :param user: dict with ``username`` / ``nickname`` / ``remark`` / ``chat_count``
    :param outpath: existing directory the pages are written to
    :param MSG_ALL_db_path: passed through to ``load_chat_records``
    :param MediaMSG_all_db_path: passed through to ``load_chat_records``
    :param FileStorage_path: passed through to ``load_chat_records``
    :param page_size: number of messages per HTML page
    :return: ``(False, message)`` when the user has no messages, otherwise ``(True, message)``
    """
    # Skip empty values: a remark that exists but is empty would otherwise produce "_0.html".
    name_save = user.get("remark") or user.get("nickname") or user.get("username", "")
    username = user.get("username", "")
    chat_count = user.get("chat_count", 0)
    if not chat_count:
        return False, "没有聊天记录"
    for page_no, start_index in enumerate(range(0, chat_count, page_size)):
        data = load_chat_records(username, start_index, page_size, user, MSG_ALL_db_path, MediaMSG_all_db_path,
                                 FileStorage_path, [user])
        if not data:
            # chat_count may overstate what the database actually returns; stop at the first empty page.
            break
        save_path = os.path.join(outpath, f"{name_save}_{page_no}.html")
        with open(save_path, "w", encoding="utf-8") as f:
            f.write(render_template("chat.html", msgs=data))
    return True, f"导出成功{outpath}"
def export(username, outpath, MSG_ALL_db_path, MicroMsg_db_path, MediaMSG_all_db_path, FileStorage_path):
    """
    Export the chat history of ``username`` as HTML pages.

    :param username: user name to export (matched against the ``username`` of the user list)
    :param outpath: output directory; when it does not exist, ``<cwd>/export/<username>`` is
        created and used instead
    :param MSG_ALL_db_path: message database, passed to ``get_user_list`` / ``export_html``
    :param MicroMsg_db_path: contact database, passed to ``get_user_list``
    :param MediaMSG_all_db_path: passed through to ``export_html``
    :param FileStorage_path: passed through to ``export_html``
    :return: ``(ok, message)`` - the result of ``export_html``, or ``(False, message)`` when
        ``username`` is empty or not in the user list
    """
    if not os.path.exists(outpath):
        outpath = os.path.join(os.getcwd(), "export" + os.sep + username)
        os.makedirs(outpath, exist_ok=True)

    users = get_user_list(MSG_ALL_db_path, MicroMsg_db_path)
    user = next((u for u in users if u["username"] == username), None)
    if not username or user is None:
        # Always hand back an (ok, message) pair, like export_html does, instead of
        # falling through to an implicit None.
        return False, f"未找到用户: {username}"
    return export_html(user, outpath, MSG_ALL_db_path, MediaMSG_all_db_path, FileStorage_path)
# Blueprint named 'show_chat_main' on which the chat-viewing routes below are registered;
# 'templates' is given as its template folder.
app_show_chat = Blueprint('show_chat_main', __name__, template_folder='templates')
# NOTE(review): this sets a plain `debug` attribute on the blueprint object; whether anything
# reads it is not visible in this file - confirm it has an effect.
app_show_chat.debug = False
# Home page - show the user list
@app_show_chat.route('/')
def index():
    """Reload the user list into ``g.USER_LIST`` and render ``index.html`` with its first 500 entries."""
    all_users = get_user_list(g.MSG_ALL_db_path, g.MicroMsg_db_path)
    g.USER_LIST = all_users
    # get_user_list sorts by message count (descending), so only the 500 users with the
    # most messages are shown.
    return render_template("index.html", users=all_users[:500])
# Fetch one page of chat records
@app_show_chat.route('/get_chat_data', methods=["GET", 'POST'])
def get_chat_data():
    """
    Render one page of a user's chat history with ``chat.html``.

    Query parameters:
      * ``username`` - user whose messages are shown (must be in ``g.USER_LIST``)
      * ``limit``    - messages per page, default 100
      * ``page``     - 1-based page number; defaults to the last page, i.e. the newest messages

    :return: the rendered page, or the string ``"error"`` for an unknown user or invalid
        ``limit`` / ``page`` values
    """
    username = request.args.get("username", "")
    matches = [u for u in g.USER_LIST if u["username"] == username]
    if not username or not matches:
        return "error"
    user = matches[0]

    try:
        limit = int(request.args.get("limit", 100))
        if limit <= 0:
            # A zero limit would divide by zero below; a negative one makes no sense.
            return "error"
        chat_count = user.get("chat_count", limit)
        # Ceiling division so that a trailing partial page (the newest messages) is the
        # default, and a chat shorter than one page still starts at page 1.
        last_page = max(1, -(-chat_count // limit))
        page = int(request.args.get("page", last_page))
    except (TypeError, ValueError):
        return "error"

    start_index = max(0, (page - 1) * limit)  # page < 1 would otherwise give a negative offset
    data = load_chat_records(username, start_index, limit, user, g.MSG_ALL_db_path, g.MediaMSG_all_db_path,
                             g.FileStorage_path, g.USER_LIST)
    return render_template("chat.html", msgs=data)
# Export the chat history as HTML
@app_show_chat.route('/export_chat_data', methods=["GET", 'POST'])
def get_export():
    """
    Export the chat history of the ``username`` query parameter as HTML pages.

    The pages go to ``<cwd>/export/<username>_<nickname>_<remark>`` (created when missing),
    200 messages per page.

    :return: the message returned by ``export_html`` (same for success and failure), or the
        string ``"error"`` when the user is not in ``g.USER_LIST``
    """
    username = request.args.get("username", "")
    candidates = [u for u in g.USER_LIST if u["username"] == username]
    if not (username and candidates):
        return "error"

    user = candidates[0]
    folder = f"{user.get('username', '')}_{user.get('nickname', '')}_{user.get('remark', '')}"
    outpath = os.path.join(os.getcwd(), "export" + os.sep + folder)
    if not os.path.exists(outpath):
        os.makedirs(outpath)

    outcome = export_html(user, outpath, g.MSG_ALL_db_path, g.MediaMSG_all_db_path, g.FileStorage_path,
                          page_size=200)
    # Both the success and the failure case answer with the message part of the result.
    return outcome[1]