添加导出为csv函数

This commit is contained in:
xaoyaoo 2023-12-26 18:03:22 +08:00
parent 59a9878eb2
commit ee2ab5de80
3 changed files with 245 additions and 106 deletions

View File

@ -12,14 +12,15 @@
# Author: xaoyaoo # Author: xaoyaoo
# Date: 2023/11/10 # Date: 2023/11/10
# ------------------------------------------------------------------------------- # -------------------------------------------------------------------------------
import base64 import re
import sqlite3 import sqlite3
import os import os
import json import json
import time import time
from functools import wraps from functools import wraps
from .utils import get_md5, attach_databases, execute_sql from .utils import get_md5, attach_databases, execute_sql, get_type_name, match_BytesExtra
from .db_parsing import parse_xml_string, decompress_CompressContent, read_BytesExtra
def get_contact_list(MicroMsg_db_path): def get_contact_list(MicroMsg_db_path):
@ -50,118 +51,83 @@ def get_contact_list(MicroMsg_db_path):
return users return users
def msg_db_connect(func): def get_chatroom_list(MicroMsg_db_path):
@wraps(func)
def wrapper(MSG_db_path, *args, **kwargs):
# 连接 MSG.db 数据库,并执行查询
if isinstance(MSG_db_path, list):
# alias, file_path
databases = {f"MSG{i}": db_path for i, db_path in enumerate(MSG_db_path)}
elif isinstance(MSG_db_path, str):
databases = {"MSG": MSG_db_path}
else:
raise TypeError("MSG_db_path 类型错误")
# 连接 MSG_ALL.db 数据库,并执行查询
if len(databases) > 1:
db = sqlite3.connect(":memory:")
attach_databases(db, databases)
else:
db = sqlite3.connect(list(databases.values())[0])
result = func("", db=db, databases=databases, *args, **kwargs)
# 断开数据库连接
if len(databases) > 1:
for alias in databases:
db.execute(f"DETACH DATABASE {alias}")
db.close()
return result
return wrapper
@msg_db_connect
def get_chat_count(MSG_db_path: [str, list], db=None, databases=None):
""" """
获取聊天记录数量 获取群聊列表
:param MSG_db_path: MSG.db 文件路径 :param MicroMsg_db_path: MicroMsg.db 文件路径
:return: 聊天记录数量列表 :return: 群聊列表
""" """
# 构造 SQL 查询,使用 UNION ALL 联合不同数据库的 MSG 表 rooms = []
union_sql = " UNION ALL ".join( # 连接 MicroMsg.db 数据库,并执行查询
f"SELECT StrTalker, COUNT(*) AS ChatCount FROM {alias}.MSG GROUP BY StrTalker" for alias in databases) db = sqlite3.connect(MicroMsg_db_path)
sql = f"SELECT StrTalker, SUM(ChatCount) AS TotalChatCount FROM ({union_sql}) GROUP BY StrTalker ORDER BY TotalChatCount DESC" sql = ("SELECT A.ChatRoomName,A.UserNameList, A.DisplayNameList, B.Announcement,B.AnnouncementEditor "
"FROM ChatRoom A,ChatRoomInfo B "
"where A.ChatRoomName==B.ChatRoomName "
"ORDER BY A.ChatRoomName ASC;")
chat_counts = []
result = execute_sql(db, sql) result = execute_sql(db, sql)
db.close()
for row in result: for row in result:
username, chat_count = row # 获取用户名、昵称、备注和聊天记录数量
row_data = {"username": username, "chat_count": chat_count} ChatRoomName, UserNameList, DisplayNameList, Announcement, AnnouncementEditor = row
chat_counts.append(row_data) UserNameList = UserNameList.split("^G")
return chat_counts DisplayNameList = DisplayNameList.split("^G")
rooms.append(
{"ChatRoomName": ChatRoomName, "UserNameList": UserNameList, "DisplayNameList": DisplayNameList,
"Announcement": Announcement, "AnnouncementEditor": AnnouncementEditor})
return rooms
def load_chat_records(selected_talker, start_index, page_size, user_list, MSG_ALL_db_path, MediaMSG_all_db_path, def get_msg_list(MSG_db_path, selected_talker="", start_index=0, page_size=500):
FileStorage_path): """
username = user_list.get("username", "") 获取聊天记录列表
username_md5 = get_md5(username) :param MSG_db_path: MSG.db 文件路径
type_name_dict = { :return: 聊天记录列表
1: {0: "文本"}, """
3: {0: "图片"},
34: {0: "语音"},
43: {0: "视频"},
47: {0: "动画表情"},
49: {0: "文本", 1: "类似文字消息而不一样的消息", 5: "卡片式链接", 6: "文件", 8: "用户上传的 GIF 表情",
19: "合并转发的聊天记录", 33: "分享的小程序", 36: "分享的小程序", 57: "带有引用的文本消息",
63: "视频号直播或直播回放等",
87: "群公告", 88: "视频号直播或直播回放等", 2000: "转账消息", 2003: "赠送红包封面"},
50: {0: "语音通话"},
10000: {0: "系统通知", 4: "拍一拍", 8000: "系统通知"}
}
# 连接 MSG_ALL.db 数据库,并执行查询 # 连接 MSG_ALL.db 数据库,并执行查询
db1 = sqlite3.connect(MSG_ALL_db_path) db1 = sqlite3.connect(MSG_db_path)
cursor1 = db1.cursor() cursor1 = db1.cursor()
if selected_talker:
cursor1.execute( sql = (
"SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent FROM MSG WHERE StrTalker=? ORDER BY CreateTime ASC LIMIT ?,?", "SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent,BytesExtra "
(selected_talker, start_index, page_size)) "FROM MSG WHERE StrTalker=? "
"ORDER BY CreateTime ASC LIMIT ?,?")
cursor1.execute(sql, (selected_talker, start_index, page_size))
else:
sql = (
"SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent,BytesExtra "
"FROM MSG ORDER BY CreateTime ASC LIMIT ?,?")
cursor1.execute(sql, (start_index, page_size))
result1 = cursor1.fetchall() result1 = cursor1.fetchall()
cursor1.close() cursor1.close()
db1.close() db1.close()
img_md5_data = load_base64_img_data(result1[0][7], result1[-1][7], username_md5, FileStorage_path) # 获取图片的base64数据
data = [] data = []
for row in result1: for row in result1:
localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType, CreateTime, MsgSvrID, DisplayContent, CompressContent = row localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType, CreateTime, MsgSvrID, DisplayContent, CompressContent, BytesExtra = row
CreateTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(CreateTime)) CreateTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(CreateTime))
type_name = type_name_dict.get(Type, {}).get(SubType, "未知") type_id = (Type, SubType)
type_name = get_type_name(type_id)
content = {"src": "", "msg": "", "style": ""} content = {"src": "", "msg": StrContent}
if Type == 47 and SubType == 0: # 动画表情 if type_id == (1, 0): # 文本
content_tmp = parse_xml_string(StrContent) content["msg"] = StrContent
cdnurl = content_tmp.get("emoji", {}).get("cdnurl", "")
# md5 = content_tmp.get("emoji", {}).get("md5", "")
if cdnurl:
content = {"src": cdnurl, "msg": "表情", "style": "width: 100px; height: 100px;"}
elif Type == 49 and SubType == 57: # 带有引用的文本消息 elif type_id == (3, 0): # 图片
CompressContent = CompressContent.rsplit(b'\x00', 1)[0] BytesExtra = read_BytesExtra(BytesExtra)
content["msg"] = decompress_CompressContent(CompressContent) BytesExtra = str(BytesExtra)
try: match = re.search(r"FileStorage(.*?)'", BytesExtra)
content["msg"] = content["msg"].decode("utf-8") if match:
content["msg"] = parse_xml_string(content["msg"]) img_path = match.group(0).replace("'", "")
content["msg"] = json.dumps(content["msg"], ensure_ascii=False) content["src"] = img_path
except Exception as e: else:
content["msg"] = "[带有引用的文本消息]解析失败" content["src"] = ""
elif Type == 34 and SubType == 0: # 语音 content["msg"] = "图片"
elif type_id == (34, 0):
tmp_c = parse_xml_string(StrContent) tmp_c = parse_xml_string(StrContent)
voicelength = tmp_c.get("voicemsg", {}).get("voicelength", "") voicelength = tmp_c.get("voicemsg", {}).get("voicelength", "")
transtext = tmp_c.get("voicetrans", {}).get("transtext", "") transtext = tmp_c.get("voicetrans", {}).get("transtext", "")
@ -169,27 +135,113 @@ def load_chat_records(selected_talker, start_index, page_size, user_list, MSG_AL
voicelength = int(voicelength) / 1000 voicelength = int(voicelength) / 1000
voicelength = f"{voicelength:.2f}" voicelength = f"{voicelength:.2f}"
content["msg"] = f"语音时长:{voicelength}\n翻译结果:{transtext}" content["msg"] = f"语音时长:{voicelength}\n翻译结果:{transtext}"
content["src"] = os.path.join("audio", f"{StrTalker}", f"{CreateTime}_{MsgSvrID}.wav")
src = load_base64_audio_data(MsgSvrID, MediaMSG_all_db_path=MediaMSG_all_db_path) elif type_id == (43, 0): # 视频
content["src"] = src BytesExtra = read_BytesExtra(BytesExtra)
elif Type == 3 and SubType == 0: # 图片 BytesExtra = str(BytesExtra)
xml_content = parse_xml_string(StrContent) match = re.search(r"FileStorage(.*?)'", BytesExtra)
md5 = xml_content.get("img", {}).get("md5", "") if match:
if md5: video_path = match.group(0).replace("'", "")
content["src"] = img_md5_data.get(md5, "") content["src"] = video_path
else: else:
content["src"] = "" content["src"] = ""
content["msg"] = "图片" content["msg"] = "视频"
elif type_id == (47, 0): # 动画表情
content_tmp = parse_xml_string(StrContent)
cdnurl = content_tmp.get("emoji", {}).get("cdnurl", "")
# md5 = content_tmp.get("emoji", {}).get("md5", "")
if cdnurl:
content = {"src": cdnurl, "msg": "表情"}
elif type_id[0] == 49:
BytesExtra = read_BytesExtra(BytesExtra)
url = match_BytesExtra(BytesExtra)
content["src"] = url
content["msg"] = type_name
elif type_id == (50, 0): # 语音通话
BytesExtra = read_BytesExtra(BytesExtra)
# elif type_id == (10000, 0):
# content["msg"] = StrContent
# elif type_id == (10000, 4):
# content["msg"] = StrContent
# elif type_id == (10000, 8000):
# content["msg"] = StrContent
talker = "未知"
if IsSender == 1:
talker = ""
else: else:
content["msg"] = StrContent if StrTalker.endswith("@chatroom"):
bytes_extra = read_BytesExtra(BytesExtra)
if bytes_extra:
try:
talker = bytes_extra['3'][0]['2'].decode('utf-8', errors='ignore')
except:
pass
else:
talker = StrTalker
row_data = {"MsgSvrID": MsgSvrID, "type_name": type_name, "is_sender": IsSender, row_data = {"MsgSvrID": MsgSvrID, "type_name": type_name, "is_sender": IsSender, "talker": talker,
"content": content, "CreateTime": CreateTime} "room_name": StrTalker, "content": content, "CreateTime": CreateTime}
data.append(row_data) data.append(row_data)
return data return data
def get_chat_count(MSG_db_path: [str, list], username: str = ""):
    """
    Count chat records per talker in a MSG database.

    :param MSG_db_path: path of the MSG.db file. It is handed directly to
        ``sqlite3.connect``, so only a single path is supported here.
    :param username: when non-empty, only records whose ``StrTalker`` equals
        this value are counted; otherwise every talker is counted.
    :return: dict mapping ``StrTalker`` to its number of records. Without a
        ``username`` filter, entries are inserted in descending count order.
        A ``username`` that has no records yields an empty dict.
    """
    if username:
        # Bind the value instead of formatting it into the statement, so a
        # quote inside the name can neither break nor alter the query.
        # GROUP BY makes the query return no row at all (rather than a
        # single NULL/0 aggregate row) when nothing matches.
        sql = "SELECT StrTalker, COUNT(*) FROM MSG WHERE StrTalker=? GROUP BY StrTalker;"
        params = (username,)
    else:
        sql = "SELECT StrTalker, COUNT(*) FROM MSG GROUP BY StrTalker ORDER BY COUNT(*) DESC;"
        params = ()

    db1 = sqlite3.connect(MSG_db_path)
    try:
        rows = db1.execute(sql, params).fetchall()
    finally:
        # Always release the database handle, even if the query fails.
        db1.close()

    return {talker: chat_count for talker, chat_count in rows}
def export_csv(username, outpath, MSG_ALL_db_path, page_size=5000):
    """
    Export the chat records of one talker to a series of CSV files.

    Records are fetched page by page through ``get_msg_list`` and each page
    is written to ``<outpath>/<username>_<page number>.csv``.

    :param username: talker whose records are exported; also used in the
        output file names.
    :param outpath: target directory. If it does not exist, the export goes
        to ``<cwd>/export/<username>`` instead (created when missing).
    :param MSG_ALL_db_path: database path forwarded to ``get_chat_count``
        and ``get_msg_list``.
    :param page_size: maximum number of records per CSV file.
    :return: ``(False, message)`` when there is nothing to export, otherwise
        ``(True, message)`` where the message contains the output directory.
    """
    # Imported locally so this function does not depend on the module's
    # top-level import list.
    import csv

    if not os.path.exists(outpath):
        outpath = os.path.join(os.getcwd(), "export" + os.sep + username)
        os.makedirs(outpath, exist_ok=True)

    count = get_chat_count(MSG_ALL_db_path, username)
    chatCount = count.get(username, 0)
    if chatCount == 0:
        return False, "没有聊天记录"

    columns = ("MsgSvrID", "type_name", "is_sender", "talker", "room_name", "content", "CreateTime")

    for page_no, start_index in enumerate(range(0, chatCount, page_size)):
        data = get_msg_list(MSG_ALL_db_path, username, start_index, page_size)
        if not data:
            break

        save_path = os.path.join(outpath, f"{username}_{page_no}.csv")
        # newline="" lets the csv module control line endings itself;
        # lineterminator="\n" keeps the same row separator as before.
        with open(save_path, "w", encoding="utf-8", newline="") as f:
            writer = csv.writer(f, lineterminator="\n")
            writer.writerow(columns)
            for row in data:
                record = [row.get(column, "") for column in columns]
                # "content" is serialised to JSON text. It contains commas
                # and double quotes, which is why the row goes through
                # csv.writer (proper quoting) instead of a plain join.
                content_index = columns.index("content")
                record[content_index] = json.dumps(record[content_index], ensure_ascii=False)
                writer.writerow(record)

    return True, f"导出成功: {outpath}"
def export_html(user, outpath, MSG_ALL_db_path, MediaMSG_all_db_path, FileStorage_path, page_size=500): def export_html(user, outpath, MSG_ALL_db_path, MediaMSG_all_db_path, FileStorage_path, page_size=500):
name_save = user.get("remark", user.get("nickname", user.get("username", ""))) name_save = user.get("remark", user.get("nickname", user.get("username", "")))
username = user.get("username", "") username = user.get("username", "")

View File

@ -6,6 +6,94 @@
# Date: 2023/12/03 # Date: 2023/12/03
# ------------------------------------------------------------------------------- # -------------------------------------------------------------------------------
import hashlib import hashlib
import re
def read_dict_all_values(data):
    """
    Recursively collect every leaf value of a nested dict/list structure.

    Lists are walked element by element and dicts value by value (keys are
    ignored); anything else is treated as a leaf.

    :param data: a dict, a list, or a single leaf value.
    :return: flat list of ``str``, in traversal order. ``bytes`` leaves are
        decoded as UTF-8 with undecodable bytes dropped, ``str`` leaves are
        kept as they are, and every other leaf is converted with ``str()``.
    """
    if isinstance(data, list):
        children = data
    elif isinstance(data, dict):
        children = data.values()
    elif isinstance(data, bytes):
        # Binary payloads are not guaranteed to be valid UTF-8; skip the
        # undecodable bytes instead of raising UnicodeDecodeError.
        return [data.decode("utf-8", errors="ignore")]
    elif isinstance(data, str):
        return [data]
    else:
        # Always hand back text so callers can safely join the result.
        return [str(data)]

    result = []
    for child in children:
        result.extend(read_dict_all_values(child))
    return result
def match_BytesExtra(BytesExtra, pattern=r"FileStorage(.*?)'"):
    """
    Search the values of a parsed BytesExtra structure for a pattern.

    All leaf values are flattened with ``read_dict_all_values`` and joined
    into one string in which every value is wrapped in single quotes; the
    pattern is then searched in that string.

    :param BytesExtra: parsed BytesExtra data (any structure accepted by
        ``read_dict_all_values``).
    :param pattern: regular expression to search for. The default matches
        text starting at ``FileStorage`` up to the closing quote of the value.
    :return: the whole match with single quotes removed, or ``""`` when the
        input is empty or nothing matches.
    """
    if not BytesExtra:
        # Same "nothing found" value as the no-match case below, so callers
        # always receive a string.
        return ""

    values = read_dict_all_values(BytesExtra)
    # str() guards the join against any non-text item in the flattened list.
    joined = "'" + "'".join(str(value) for value in values) + "'"

    match = re.search(pattern, joined)
    if not match:
        return ""
    return match.group(0).replace("'", "")
# Display names of message types, keyed by (Type, SubType).
_TYPE_NAMES = {
    (1, 0): "文本",
    (3, 0): "图片",
    (34, 0): "语音",
    (43, 0): "视频",
    (47, 0): "动画表情",
    (49, 0): "文件",
    (49, 1): "类似文字消息而不一样的消息",
    (49, 5): "卡片式链接",
    (49, 6): "文件",
    (49, 8): "用户上传的 GIF 表情",
    (49, 19): "合并转发的聊天记录",
    (49, 33): "分享的小程序",
    (49, 36): "分享的小程序",
    (49, 57): "带有引用的文本消息",
    (49, 63): "视频号直播或直播回放等",
    (49, 87): "群公告",
    (49, 88): "视频号直播或直播回放等",
    (49, 2000): "转账消息",
    (49, 2003): "赠送红包封面",
    (50, 0): "语音通话",
    (10000, 0): "系统通知",
    (10000, 4): "拍一拍",
    (10000, 8000): "系统通知",
}


def get_type_name(type_id: tuple):
    """
    Return the display name of a message type.

    :param type_id: message type id as a ``(Type, SubType)`` tuple, e.g. ``(1, 0)``.
    :return: the name registered for ``type_id``, or ``"未知"`` when the
        combination is not in the table.
    """
    return _TYPE_NAMES.get(type_id, "未知")
def get_md5(data): def get_md5(data):

View File

@ -188,10 +188,9 @@ def load_chat_records(selected_talker, start_index, page_size, user_list, MSG_AL
else: else:
content["src"] = "" content["src"] = ""
content["msg"] = "图片" content["msg"] = "图片"
else: else:
content["msg"] = StrContent content["msg"] = StrContent
talker = "未知" talker = "未知"
if IsSender == 1: if IsSender == 1:
talker = "" talker = ""