重构文件结构,增加合并数据库功能,修复部分bug

This commit is contained in:
xaoyaoo 2023-12-06 13:42:23 +08:00
commit ca0b442d81
27 changed files with 939 additions and 930 deletions

330
README.md
View File

@ -13,48 +13,6 @@
[![GitHub license](https://img.shields.io/pypi/l/pywxdump)](https://github.com/xaoyaoo/PyWxDump/blob/master/LICENSE) [![GitHub license](https://img.shields.io/pypi/l/pywxdump)](https://github.com/xaoyaoo/PyWxDump/blob/master/LICENSE)
[![Publish](https://github.com/xaoyaoo/PyWxDump/actions/workflows/publish.yml/badge.svg)](https://github.com/xaoyaoo/PyWxDump/actions/workflows/publish.yml) [![Publish](https://github.com/xaoyaoo/PyWxDump/actions/workflows/publish.yml/badge.svg)](https://github.com/xaoyaoo/PyWxDump/actions/workflows/publish.yml)
<details>
<summary><strong>更新日志(点击展开)</strong></summary>
* 2023.12.03 增加分析聊天记录的功能,生成词云、绘制折线图等
* 2023.12.03 修复部分bug,更改获取wx文件夹方式 [#34](https://github.com/xaoyaoo/PyWxDump/issues/34)
* 2023.12.01 为exe添加图标
* 2023.11.30 优化命令行界面
* 2023.11.29 添加异形wxid获取方式添加用户路径自动获取重建说明文档对新手更友好
* 2023.11.28 修改wxid获取方式修复部分bug
* 2023.11.27 解决相对导入包的问题,完善错误提示
* 2023.11.25 聊天记录查看工具bootstrap更换国内cdn
* 2023.11.22 添加all命令中解密错误数据日志写入文件,修复部分bug
* 2023.11.16 增加聊天记录导出为html
* 2023.11.15 添加test文件添加自动构建可执行文件的脚本,添加版本描述
* 2023.11.15 [v2.2.5变化较大]重构解密脚本的返回值,重构命令行参数
* 2023.11.15 修复无法获取wxid的bug
* 2023.11.14 修复部分bug
* 2023.11.11 添加聊天记录解析,查看工具,修复部分bug
* 2023.11.10 修复wxdump wx_db命令行参数错误 [#19](https://github.com/xaoyaoo/PyWxDump/issues/19)
* 2023.11.08 增加3.9.8.15版本支持
* 2023.10.31 修复3.9.2.*版本无法正常运行
* 2023.10.28 添加自动发布到pypi的github action
* 2023.10.28 修复3.9.5.91版本的偏移
* 2023.10.24 add auto get bias addr ,not need input key or wx folder path.
* 2023.10.17 add LICENSE
* 2023.10.16 添加"3.9.7.15"版本的偏移[#12](https://github.com/xaoyaoo/PyWxDump/issues/12)
,感谢@[GentlemanII](https://github.com/GentlemanII)
* 2023.10.15 将整个项目作为包安装,增加命令行统一操作
* 2023.10.14 整体重构项目,优化代码,增加命令行统一操作
* 2023.10.11 添加"3.9.5.81"版本的偏移[#10](https://github.com/xaoyaoo/PyWxDump/issues/10)
,感谢@[sv3nbeast](https://github.com/sv3nbeast)
* 2023.10.09 获取key基址偏移可以根据微信文件夹获取不需要输入key
* 2023.10.09 优化代码,删减没必要代码,重新修改获取基址代码,加快运行速度(需要安装新的库 pymem
* 2023.10.07 修改获取基址内存搜索方式,防止进入死循环
* 2023.10.07 增加了3.9.7.29版本的偏移地址
* 2023.10.06 增加命令行解密数据库
* 2023.09.28 增加了数据库部分解析
* 2023.09.15 增加了3.9.7.25版本的偏移地址
</details>
**更新计划【由于家里有事,这些计划(除8、9、10)将会在12.30号前统一更新】** **更新计划【由于家里有事,这些计划(除8、9、10)将会在12.30号前统一更新】**
* 1.每个人聊天记录分析,生成词云。 * 1.每个人聊天记录分析,生成词云。
@ -63,38 +21,13 @@
* 4.生成年度可视化报告 * 4.生成年度可视化报告
* 5.创建GUI图形界面方便使用 * 5.创建GUI图形界面方便使用
* 6.查看群聊中具体发言成员的ID [#31](https://github.com/xaoyaoo/PyWxDump/issues/31) * 6.查看群聊中具体发言成员的ID [#31](https://github.com/xaoyaoo/PyWxDump/issues/31)
* 7.增加数据库合并功能,方便查看 * 7.增加数据库合并功能,方便查看(已完成,待测试)
* 8.增加企业微信的支持 * 8.增加企业微信的支持
* 9.增加获取实时聊天记录的功能 * 9.增加获取实时聊天记录的功能
* 10.聊天记录关键字搜索 或者按时间点搜索列出所有的联系人记录就nice了 * 10.聊天记录关键字搜索 或者按时间点搜索列出所有的联系人记录就nice了
注: 欢迎大家提供更多的想法,或者提供代码,一起完善这个项目。 注: 欢迎大家提供更多的想法,或者提供代码,一起完善这个项目。
<details>
<summary>贡献代码方法(点击展开)</summary>
提交拉取请求Pull Request请按照以下步骤进行操作
1. Fork 仓库:首先,在项目的 GitHub 页面上点击 "Fork" 按钮,将项目的代码仓库 fork 到你自己的 GitHub 账号下。
2. 克隆仓库:在你自己的 GitHub 账号下找到 fork 后的项目,点击 `Clone or download`按钮,获取仓库的 URL。然后在本地使用 Git
命令克隆仓库到你的电脑上:`git clone 仓库的URL`
3. 创建分支:在本地仓库中创建一个新的分支,用于进行你的修改:`git checkout -b 你的分支名`
4. 进行修改:在新创建的分支中进行你需要的修改,包括修复错误、改进现有功能或添加新功能。
5. 提交修改:使用 `git add``git commit` 命令将修改提交到本地仓库中:
```
git add .
git commit -m "提交信息"
```
6. 推送分支:使用 `git push` 命令将你的本地分支推送到你的 GitHub 仓库中:`git push origin 你的分支名`
7. 提交拉取请求:在你的 GitHub 仓库页面上切换到你刚刚推送的分支,点击 "New pull request"
按钮,填写一些说明信息,然后点击 `Create pull request`
按钮,即可提交拉取请求。
8. 等待审核:等待项目维护者审核你的拉取请求,如果通过审核,你的修改将会被合并到项目的主分支中
9. 接着你就可以在右边的`contributors`中看到你的名字了。
</details>
欢迎加入交流qq群577704006 or 欢迎加入交流qq群577704006 or
点击链接加入群聊[pywxdump功能交流](https://s.xaoyo.top/gOLUDl)。 点击链接加入群聊[pywxdump功能交流](https://s.xaoyo.top/gOLUDl)。
@ -122,11 +55,7 @@
* 7支持微信多开场景获取多用户信息等 * 7支持微信多开场景获取多用户信息等
* 8微信需要登录状态才能获取数据库密钥 * 8微信需要登录状态才能获取数据库密钥
* 9支持导出聊天记录为html,备份微信聊天记录,方便查看 * 9支持导出聊天记录为html,备份微信聊天记录,方便查看
* 10合并多个数据库方便查看
**版本差异**
1. 版本 < 3.7.0.30 只运行不登录能获取个人信息登录后可以获取数据库密钥
2. 版本 > 3.7.0.30 只运行不登录不能获取个人信息,登录后都能获取
**利用场景** **利用场景**
@ -136,41 +65,7 @@
4. 自行备份(日常备份自己留存) 4. 自行备份(日常备份自己留存)
5. 等等............... 5. 等等...............
## 3. 项目结构 ## 3. 其他
<details>
<summary>点击展开</summary>
```
PyWxDump
├─ pywxdump # 项目代码,存放各个模块
│ ├─ analyse # 解析数据库
│ │ └─ parse.py # 解析数据库脚本,可以解析语音、图片、聊天记录等
│ ├─ bias_addr # 获取偏移地址
│ │ └─ get_bias_addr.py # 获取偏移地址脚本
│ ├─ decrypted # 解密数据库
│ │ ├─ decrypt.py # 解密数据库脚本
│ │ └─ get_wx_decrypted_db.py # 直接读取当前登录微信的数据库解密后保存到当前目录下的decrypted文件夹中
│ ├─ wx_info # 获取微信基本信息
│ │ ├─ get_wx_info.py # 获取微信基本信息脚本
│ │ └─ get_wx_db.py # 获取本地所有的微信相关数据库
│ ├─ show_records # 显示聊天记录
│ │ ├─ main_window.py # 显示聊天记录的窗口
│ │ └─ templates # 显示聊天记录的html模板
│ ├─ command.py # 命令行入口
│ └─ version_list.json # 微信版本列表 (十进制)按顺序代表微信昵称、微信账号、微信手机号、微信邮箱默认0、微信KEY、微信原始IDwxid_******
├─ doc # 项目文档
│ ├─ python1.0_README.md # python1.0版本的README
│ ├─ wx数据库简述.md # wx数据库简述
│ └─ CE获取基址.md # CE获取基址
├─ README.md
├─ setup.py # 安装脚本
└─ requirements.txt
```
</details>
## 4. 其他
[PyWxDump](https://github.com/xaoyaoo/PyWxDump)是[SharpWxDump](https://github.com/AdminTest0/SharpWxDump) [PyWxDump](https://github.com/xaoyaoo/PyWxDump)是[SharpWxDump](https://github.com/AdminTest0/SharpWxDump)
的经过重构python语言版本同时添加了一些新的功能。 的经过重构python语言版本同时添加了一些新的功能。
@ -182,15 +77,7 @@ PyWxDump
* 如发现bug或有改进意见, 请提交[issues](https://github.com/xaoyaoo/PyWxDump/issues). * 如发现bug或有改进意见, 请提交[issues](https://github.com/xaoyaoo/PyWxDump/issues).
* 如有其他想要的功能, 请提交[issues](https://github.com/xaoyaoo/PyWxDump/issues). * 如有其他想要的功能, 请提交[issues](https://github.com/xaoyaoo/PyWxDump/issues).
* 常见问题请参考[FAQ](./doc/FAQ.md) * 常见问题请参考[FAQ](./doc/FAQ.md)
* 更新日志请参考[CHANGELOG](./doc/CHANGELOG.md)
<details>
<summary>提交issues方法(点击展开)</summary>
[![image](https://github.com/xaoyaoo/PyWxDump/assets/37209452/22d15ea6-05d6-4f30-8b24-04a51a59d56d)](https://github.com/xaoyaoo/PyWxDump/issues)
[![image](https://github.com/xaoyaoo/PyWxDump/assets/37209452/9bdc2961-694a-4104-a1c7-05403220c0fe)](https://github.com/xaoyaoo/PyWxDump/issues)
[![image](https://github.com/xaoyaoo/PyWxDump/assets/37209452/be1d8913-5a6e-4fff-9fcd-00edb33d255b)](https://github.com/xaoyaoo/PyWxDump/issues)
</details>
<details> <details>
<summary><strong>Star History(click to expand)</strong></summary> <summary><strong>Star History(click to expand)</strong></summary>
@ -201,214 +88,7 @@ PyWxDump
# 二、使用说明 # 二、使用说明
## 1. 安装 详细使用说明见[UserGuide.md](./doc/UserGuide.md)
### 1.1 从pypi安装(安装稳定版)
```shell script
pip install -U pywxdump
```
### 1.2 从源码安装(安装最新版)
<details>
<summary>点击展开</summary>
```shell script
pip install -U git+git://github.com/xaoyaoo/PyWxDump.git
```
```shell script
git clone https://github.com/xaoyaoo/PyWxDump.git
cd PyWxDump
python -m pip install -U .
```
</details>
### 1.3 使用可执行文件exe
<details>
<summary>点击展开</summary>
* 1.下载[release](https://github.com/xaoyaoo/PyWxDump/releases)中的exe文件
* 2.或者自行打包,打包脚本见: [build_exe.py](./tests/build_exe.py)
```shell
cd tests
python build_exe.py
```
</details>
## 2. 使用
### 2.1 命令行
激活虚拟环境后(如果有的话),在项目根目录下运行:
```shell script
wxdump 模式 [参数]
# 运行模式(mode):
# bias 获取微信基址偏移
# info 获取微信信息
# db_path 获取微信文件夹路径
# decrypt 解密微信数据库
# dbshow 聊天记录查看
# export 聊天记录导出为html
# all 获取微信信息,解密微信数据库,查看聊天记录
```
*示例*
<details>
<summary>点击展开示例</summary>
以下是示例命令:
##### 获取微信基址偏移
```bash
pywxdump bias --mobile <手机号> --name <微信昵称> --account <微信账号> [--key <密钥>] [--db_path <已登录账号的微信文件夹路径>] [--version_list_path <微信版本偏移文件路径>]
```
##### 获取微信信息
```bash
pywxdump info [--version_list_path <微信版本偏移文件路径>]
```
##### 获取微信文件夹路径
```bash
pywxdump db_path [-r <需要的数据库名称>] [-wf <WeChat Files 路径>] [-id <wxid_>]
```
##### 解密微信数据库
```bash
pywxdump decrypt -k <密钥> -i <数据库路径(目录or文件)> [-o <输出路径>]
```
##### 查看聊天记录
```bash
pywxdump dbshow -msg <解密后的 MSG.db 的路径> -micro <解密后的 MicroMsg.db 的路径> -media <解密后的 MediaMSG.db 的路径> [-fs <FileStorage 路径>]
```
##### 导出聊天记录为 HTML
```bash
pywxdump export -u <微信账号> -o <导出路径> -msg <解密后的 MSG.db 的路径> -micro <解密后的 MicroMsg.db 的路径> -media <解密后的 MediaMSG.db 的路径> [-fs <FileStorage 路径>]
```
##### 获取微信信息、解密数据库、查看聊天记录一条命令搞定开放端口5000浏览器访问查看聊天记录支持局域网其他机器访问
```bash
pywxdump all
```
</details>
### 2.2 python API
*import调用示例*
<details>
<summary>点击展开示例</summary>
```python
# 单独使用各模块,返回值一般为字典,参数参考命令行
from pywxdump import *
# ************************************************************************************************ #
# 获取微信基址偏移
args = {
"mode": "bias",
"mobile": "13800138000", # 手机号
"name": "微信昵称", # 微信昵称
"account": "微信账号", # 微信账号
"key": "密钥", # 密钥(可选)
"db_path": "已登录账号的微信文件夹路径", # 微信文件夹路径(可选)
"version_list_path": "微信版本偏移文件路径" # 微信版本偏移文件路径(可选)
}
bias_addr = BiasAddr(args["account"], args["mobile"], args["name"], args["key"], args["db_path"])
result = bias_addr.run(True, args["version_list_path"])
# ************************************************************************************************ #
# 获取微信信息
wx_info = read_info(VERSION_LIST, True)
# 获取微信文件夹路径
args = {
"mode": "db_path",
"require_list": "all", # 需要的数据库名称(可选)
"wx_files": "WeChat Files", # 'WeChat Files'路径(可选)
"wxid": "wxid_", # wxid_用于确认用户文件夹可选
}
user_dirs = get_wechat_db(args["require_list"], args["wx_files"], args["wxid"], True)
# ************************************************************************************************ #
# 解密微信数据库
args = {
"mode": "decrypt",
"key": "密钥", # 密钥
"db_path": "数据库路径(目录or文件)", # 数据库路径
"out_path": "/path/to/decrypted" # 输出路径(必须是目录)[默认为当前路径下decrypted文件夹]
}
result = batch_decrypt(args["key"], args["db_path"], args["out_path"], True)
# ************************************************************************************************ #
# 查看聊天记录
args = {
"mode": "dbshow",
"msg_path": "解密后的 MSG.db 的路径", # 解密后的 MSG.db 的路径
"micro_path": "解密后的 MicroMsg.db 的路径", # 解密后的 MicroMsg.db 的路径
"media_path": "解密后的 MediaMSG.db 的路径", # 解密后的 MediaMSG.db 的路径
"filestorage_path": "文件夹FileStorage的路径" # 文件夹 FileStorage 的路径(用于显示图片)
}
from flask import Flask, request, jsonify, render_template, g
import logging
app = Flask(__name__, template_folder='./show_chat/templates')
app.logger.setLevel(logging.ERROR)
@app.before_request
def before_request():
g.MSG_ALL_db_path = args["msg_path"]
g.MicroMsg_db_path = args["micro_path"]
g.MediaMSG_all_db_path = args["media_path"]
g.FileStorage_path = args["filestorage_path"]
g.USER_LIST = get_user_list(args["msg_path"], args["micro_path"])
app.register_blueprint(app_show_chat)
print("[+] 请使用浏览器访问 http://127.0.0.1:5000/ 查看聊天记录")
app.run(debug=False)
# ************************************************************************************************ #
# 导出聊天记录为 HTML
args = {
"mode": "export",
"username": "微信账号", # 微信账号(聊天对象账号)
"outpath": "/path/to/export", # 导出路径
"msg_path": "解密后的 MSG.db 的路径", # 解密后的 MSG.db 的路径
"micro_path": "解密后的 MicroMsg.db 的路径", # 解密后的 MicroMsg.db 的路径
"media_path": "解密后的 MediaMSG.db 的路径", # 解密后的 MediaMSG.db 的路径
"filestorage_path": "文件夹FileStorage的路径" # 文件夹 FileStorage 的路径(用于显示图片)
}
export(args["username"], args["outpath"], args["msg_path"], args["micro_path"], args["media_path"],
args["filestorage_path"])
```
</details>
更多使用方法参考[tests](./tests)文件夹下的[test_*.py](./tests/)文件
### 2.3 可执行文件exe
进入exe文件所在目录运行 `wxdump.exe 模式 [参数]`,方法同[命令行](#21-命令行)
### 2.4 其他说明
【注】: 【注】:

53
doc/CHANGELOG.md Normal file
View File

@ -0,0 +1,53 @@
# 更新日志
## v2.3.0 (2023-12-06)
### 新功能
- 增加数据库合并功能,方便查看
- 重新整合代码,优化代码结构
- 优化命令行参数
### 优化
- 修复部分bug
## version < v2.2.18 (2023-12-06)
### 优化/新功能
* 2023.12.03 增加分析聊天记录的功能,生成词云、绘制折线图等
* 2023.12.03 修复部分bug,更改获取wx文件夹方式 [#34](https://github.com/xaoyaoo/PyWxDump/issues/34)
* 2023.12.01 为exe添加图标
* 2023.11.30 优化命令行界面
* 2023.11.29 添加异形wxid获取方式添加用户路径自动获取重建说明文档对新手更友好
* 2023.11.28 修改wxid获取方式修复部分bug
* 2023.11.27 解决相对导入包的问题,完善错误提示
* 2023.11.25 聊天记录查看工具bootstrap更换国内cdn
* 2023.11.22 添加all命令中解密错误数据日志写入文件,修复部分bug
* 2023.11.16 增加聊天记录导出为html
* 2023.11.15 添加test文件添加自动构建可执行文件的脚本,添加版本描述
* 2023.11.15 [v2.2.5变化较大]重构解密脚本的返回值,重构命令行参数
* 2023.11.15 修复无法获取wxid的bug
* 2023.11.14 修复部分bug
* 2023.11.11 添加聊天记录解析,查看工具,修复部分bug
* 2023.11.10 修复wxdump wx_db命令行参数错误 [#19](https://github.com/xaoyaoo/PyWxDump/issues/19)
* 2023.11.08 增加3.9.8.15版本支持
* 2023.10.31 修复3.9.2.*版本无法正常运行
* 2023.10.28 添加自动发布到pypi的github action
* 2023.10.28 修复3.9.5.91版本的偏移
* 2023.10.24 add auto get bias addr ,not need input key or wx folder path.
* 2023.10.17 add LICENSE
* 2023.10.16 添加"3.9.7.15"版本的偏移[#12](https://github.com/xaoyaoo/PyWxDump/issues/12)
,感谢@[GentlemanII](https://github.com/GentlemanII)
* 2023.10.15 将整个项目作为包安装,增加命令行统一操作
* 2023.10.14 整体重构项目,优化代码,增加命令行统一操作
* 2023.10.11 添加"3.9.5.81"版本的偏移[#10](https://github.com/xaoyaoo/PyWxDump/issues/10)
,感谢@[sv3nbeast](https://github.com/sv3nbeast)
* 2023.10.09 获取key基址偏移可以根据微信文件夹获取不需要输入key
* 2023.10.09 优化代码,删减没必要代码,重新修改获取基址代码,加快运行速度(需要安装新的库 pymem
* 2023.10.07 修改获取基址内存搜索方式,防止进入死循环
* 2023.10.07 增加了3.9.7.29版本的偏移地址
* 2023.10.06 增加命令行解密数据库
* 2023.09.28 增加了数据库部分解析
* 2023.09.15 增加了3.9.7.25版本的偏移地址

View File

@ -1,4 +1,6 @@
## 怎么下载 # FAQ
- ### 一、怎么下载/怎么安装?
方法一:进入链接[releases](https://github.com/xaoyaoo/PyWxDump/releases)下载最新版本exe文件 方法一:进入链接[releases](https://github.com/xaoyaoo/PyWxDump/releases)下载最新版本exe文件
@ -7,7 +9,8 @@
pip install PyWxDump pip install PyWxDump
``` ```
## 怎么使用 - ### 二、怎么使用
1. 打开微信电脑版,登录微信 1. 打开微信电脑版,登录微信
2. 进入下载的exe文件所在目录,使用pip安装跳过此步 2. 进入下载的exe文件所在目录,使用pip安装跳过此步
@ -15,17 +18,55 @@ pip install PyWxDump
4. 在命令窗口中输入`PyWxDump`按回车键pip安装输入`wxdump` 4. 在命令窗口中输入`PyWxDump`按回车键pip安装输入`wxdump`
5. 接着根据提示输入参数,回车键确认 5. 接着根据提示输入参数,回车键确认
## 每台电脑上微信账户的key是不是永远不会变 - ### 三、每台电脑上微信账户的key是不是永远不会变
同一设备同一微信不删除数据情况下key密钥相同 1. 同一设备同一微信不删除数据情况下key密钥相同
- ### 四、刚打开就闪退的问题
## 刚打开就闪退的问题
1. 请检查是否由cmd或powershell打开不要直接双击exe文件 1. 请检查是否由cmd或powershell打开不要直接双击exe文件
2. 如果使用方法二安装请检查是否已经安装了python环境如果使用pip安装命令行直接输入wxdump即可 2. 如果使用方法二安装请检查是否已经安装了python环境如果使用pip安装命令行直接输入wxdump即可
3. 如果使用方法二安装检查是否将python安装目录添加到了环境变量中如果没有请添加 3. 如果使用方法二安装检查是否将python安装目录添加到了环境变量中如果没有请添加
## 如果遇到其他问题 - ### 五、如果遇到其他问题
1. 截图或复制错误信息,请全截图或全复制,不要只截一部分或复制部分信息。
2. 通过issue反馈问题或者加入QQ群[加入QQ群](https://s.xaoyo.top/gOLUDl)
- ### 六、如何为PyWxDump贡献代码提交pr
提交拉取请求Pull Request请按照以下步骤进行操作
1. Fork 仓库:首先,在项目的 GitHub 页面上点击 "Fork" 按钮,将项目的代码仓库 fork 到你自己的 GitHub 账号下。
2. 克隆仓库:在你自己的 GitHub 账号下找到 fork 后的项目,点击 `Clone or download`按钮,获取仓库的 URL。然后在本地使用 Git
命令克隆仓库到你的电脑上:`git clone 仓库的URL`
3. 创建分支:在本地仓库中创建一个新的分支,用于进行你的修改:`git checkout -b 你的分支名`
4. 进行修改:在新创建的分支中进行你需要的修改,包括修复错误、改进现有功能或添加新功能。
5. 提交修改:使用 `git add``git commit` 命令将修改提交到本地仓库中:
```
git add .
git commit -m "提交信息"
```
6. 推送分支:使用 `git push` 命令将你的本地分支推送到你的 GitHub 仓库中:`git push origin 你的分支名`
7. 提交拉取请求:在你的 GitHub 仓库页面上切换到你刚刚推送的分支,点击 "New pull request"
按钮,填写一些说明信息,然后点击 `Create pull request`
按钮,即可提交拉取请求。
8. 等待审核:等待项目维护者审核你的拉取请求,如果通过审核,你的修改将会被合并到项目的主分支中
9. 接着你就可以在右边的`contributors`中看到你的名字了。
- ### 七、为什么要提交issues
1. 提交issues可以帮助我们更好的改进项目提高项目的质量
- ### 八、提交issues方法
[![image](https://github.com/xaoyaoo/PyWxDump/assets/37209452/22d15ea6-05d6-4f30-8b24-04a51a59d56d)](https://github.com/xaoyaoo/PyWxDump/issues)
[![image](https://github.com/xaoyaoo/PyWxDump/assets/37209452/9bdc2961-694a-4104-a1c7-05403220c0fe)](https://github.com/xaoyaoo/PyWxDump/issues)
[![image](https://github.com/xaoyaoo/PyWxDump/assets/37209452/be1d8913-5a6e-4fff-9fcd-00edb33d255b)](https://github.com/xaoyaoo/PyWxDump/issues)
- ### 九、版本差异
1. 版本 < 3.7.0.30 只运行不登录能获取个人信息登录后可以获取数据库密钥
2. 版本 > 3.7.0.30 只运行不登录不能获取个人信息,登录后都能获取
截图或复制错误信息,请全截图或全复制,不要只截一部分或复制部分信息。
通过issue反馈问题或者加入QQ群[加入QQ群](https://s.xaoyo.top/gOLUDl)

237
doc/UserGuide.md Normal file
View File

@ -0,0 +1,237 @@
# 用户指南
## 小白教程
### 1. 安装
下载[release](https://github.com/xaoyaoo/PyWxDump/releases)中的exe文件
### 2. 使用
* 1.打开微信电脑版,登录微信
* 2.进入下载的exe文件所在目录
* 3.按住shift键同时鼠标右键选择“在此处打开命令窗口”或者“在此处打开powershell窗口”
* 4.在命令窗口中输入`wxdump`,按回车键
* 5.接着根据提示输入参数,回车键确认
eg:
```shell script
wxdump info # 获取微信信息
wxdump decrypt -k "密钥" -i "数据库路径(目录or文件)" # 解密微信数据库,引号必须在英文状态下输入
wxdump dbshow -msg "解密后的 MSG.db 的路径" -micro "解密后的 MicroMsg.db 的路径" -media "解密后的 MediaMSG.db 的路径" # 接着打开浏览器访问 http://127.0.0.1:5000/ 查看聊天记录
wxdump export -u "微信账号" -o "导出路径" -msg "解密后的 MSG.db 的路径" -micro "解密后的 MicroMsg.db 的路径" -media "解密后的 MediaMSG.db 的路径" # 导出聊天记录为html
wxdump all # 获取微信信息,解密微信数据库,查看聊天记录
```
* 6.查看聊天记录后,按`ctrl+c`退出
## 详细教程(小白请看上面)
### 1. 安装
#### 1.1 从pypi安装(安装稳定版)
```shell script
pip install -U pywxdump
```
#### 1.2 从源码安装(安装最新版)
```shell script
pip install -U git+git://github.com/xaoyaoo/PyWxDump.git
```
```shell script
git clone https://github.com/xaoyaoo/PyWxDump.git
cd PyWxDump
python -m pip install -U .
```
#### 1.3 打包可执行文件exe
* 自行打包,打包脚本见: [build_exe.py](./tests/build_exe.py)
```shell
cd tests
python build_exe.py
# 接着执行输出的打包脚本
```
* 直接下载打包好的exe文件[release](https://github.com/xaoyaoo/PyWxDump/releases)
### 2. 使用
#### 2.1 命令行
激活虚拟环境后(如果有的话),在项目根目录下运行:
```shell script
wxdump 模式 [参数]
# 运行模式(mode):
# bias 获取微信基址偏移
# info 获取微信信息
# db_path 获取微信文件夹路径
# decrypt 解密微信数据库
# dbshow 聊天记录查看
# export 聊天记录导出为html
# all 获取微信信息,解密微信数据库,查看聊天记录
```
*示例*
<details>
<summary>点击展开示例</summary>
以下是示例命令:
##### 获取微信基址偏移
```bash
pywxdump bias --mobile <手机号> --name <微信昵称> --account <微信账号> [--key <密钥>] [--db_path <已登录账号的微信文件夹路径>] [--version_list_path <微信版本偏移文件路径>]
```
##### 获取微信信息
```bash
pywxdump info [--version_list_path <微信版本偏移文件路径>]
```
##### 获取微信文件夹路径
```bash
pywxdump db_path [-r <需要的数据库名称>] [-wf <WeChat Files 路径>] [-id <wxid_>]
```
##### 解密微信数据库
```bash
pywxdump decrypt -k <密钥> -i <数据库路径(目录or文件)> [-o <输出路径>]
```
##### 查看聊天记录
```bash
pywxdump dbshow -msg <解密后的 MSG.db 的路径> -micro <解密后的 MicroMsg.db 的路径> -media <解密后的 MediaMSG.db 的路径> [-fs <FileStorage 路径>]
```
##### 导出聊天记录为 HTML
```bash
pywxdump export -u <微信账号> -o <导出路径> -msg <解密后的 MSG.db 的路径> -micro <解密后的 MicroMsg.db 的路径> -media <解密后的 MediaMSG.db 的路径> [-fs <FileStorage 路径>]
```
##### 获取微信信息、解密数据库、查看聊天记录一条命令搞定开放端口5000浏览器访问查看聊天记录支持局域网其他机器访问
```bash
pywxdump all
```
</details>
#### 2.2 python API
*import调用示例*
<details>
<summary>点击展开示例</summary>
```python
# 单独使用各模块,返回值一般为字典,参数参考命令行
from pywxdump import *
# ************************************************************************************************ #
# 获取微信基址偏移
args = {
"mode": "bias",
"mobile": "13800138000", # 手机号
"name": "微信昵称", # 微信昵称
"account": "微信账号", # 微信账号
"key": "密钥", # 密钥(可选)
"db_path": "已登录账号的微信文件夹路径", # 微信文件夹路径(可选)
"version_list_path": "微信版本偏移文件路径" # 微信版本偏移文件路径(可选)
}
bias_addr = BiasAddr(args["account"], args["mobile"], args["name"], args["key"], args["db_path"])
result = bias_addr.run(True, args["version_list_path"])
# ************************************************************************************************ #
# 获取微信信息
wx_info = read_info(VERSION_LIST, True)
# 获取微信文件夹路径
args = {
"mode": "db_path",
"require_list": "all", # 需要的数据库名称(可选)
"wx_files": "WeChat Files", # 'WeChat Files'路径(可选)
"wxid": "wxid_", # wxid_用于确认用户文件夹可选
}
user_dirs = get_wechat_db(args["require_list"], args["wx_files"], args["wxid"], True)
# ************************************************************************************************ #
# 解密微信数据库
args = {
"mode": "decrypt",
"key": "密钥", # 密钥
"db_path": "数据库路径(目录or文件)", # 数据库路径
"out_path": "/path/to/decrypted" # 输出路径(必须是目录)[默认为当前路径下decrypted文件夹]
}
result = batch_decrypt(args["key"], args["db_path"], args["out_path"], True)
# ************************************************************************************************ #
# 查看聊天记录
args = {
"mode": "dbshow",
"msg_path": "解密后的 MSG.db 的路径", # 解密后的 MSG.db 的路径
"micro_path": "解密后的 MicroMsg.db 的路径", # 解密后的 MicroMsg.db 的路径
"media_path": "解密后的 MediaMSG.db 的路径", # 解密后的 MediaMSG.db 的路径
"filestorage_path": "文件夹FileStorage的路径" # 文件夹 FileStorage 的路径(用于显示图片)
}
from flask import Flask, request, jsonify, render_template, g
import logging
app = Flask(__name__, template_folder='./show_chat/templates')
app.logger.setLevel(logging.ERROR)
@app.before_request
def before_request():
g.MSG_ALL_db_path = args["msg_path"]
g.MicroMsg_db_path = args["micro_path"]
g.MediaMSG_all_db_path = args["media_path"]
g.FileStorage_path = args["filestorage_path"]
g.USER_LIST = get_user_list(args["msg_path"], args["micro_path"])
app.register_blueprint(app_show_chat)
print("[+] 请使用浏览器访问 http://127.0.0.1:5000/ 查看聊天记录")
app.run(debug=False)
# ************************************************************************************************ #
# 导出聊天记录为 HTML
args = {
"mode": "export",
"username": "微信账号", # 微信账号(聊天对象账号)
"outpath": "/path/to/export", # 导出路径
"msg_path": "解密后的 MSG.db 的路径", # 解密后的 MSG.db 的路径
"micro_path": "解密后的 MicroMsg.db 的路径", # 解密后的 MicroMsg.db 的路径
"media_path": "解密后的 MediaMSG.db 的路径", # 解密后的 MediaMSG.db 的路径
"filestorage_path": "文件夹FileStorage的路径" # 文件夹 FileStorage 的路径(用于显示图片)
}
export(args["username"], args["outpath"], args["msg_path"], args["micro_path"], args["media_path"],
args["filestorage_path"])
```
</details>
更多使用方法参考[tests](../tests)文件夹下的[test_*.py](../tests/)文件
#### 2.3 可执行文件exe
进入exe文件所在目录运行 `wxdump.exe 模式 [参数]`,方法同[命令行](#21-命令行)
### 3. FAQ
详见[FAQ](./FAQ.md)
### 4. 更新日志
详见[更新日志](./CHANGELOG.md)

View File

@ -189,7 +189,7 @@ python get_wx_decrypted_db.py --key ********
## 四、解析数据库 ## 四、解析数据库
* [parse.py](../pywxdump/analyse/parse.py) : 数据库解析脚本,可以解析语音、图片、聊天记录等 * [parse.py](../pywxdump/analyzer/parse.py) : 数据库解析脚本,可以解析语音、图片、聊天记录等
* 关于各个数据库的说明文档,请查看[wx数据库简述.md](./wx数据库简述.md) * 关于各个数据库的说明文档,请查看[wx数据库简述.md](./wx数据库简述.md)
未完待续... 未完待续...

View File

@ -5,13 +5,10 @@
# Author: xaoyaoo # Author: xaoyaoo
# Date: 2023/10/14 # Date: 2023/10/14
# ------------------------------------------------------------------------------- # -------------------------------------------------------------------------------
from .bias_addr.get_bias_addr import BiasAddr from .wx_info import BiasAddr,read_info, get_wechat_db,encrypt,batch_decrypt,decrypt
from .wx_info.get_wx_info import read_info from .wx_info import merge_copy_db, merge_msg_db, merge_media_msg_db
from .wx_info.get_wx_db import get_wechat_db from .analyzer.db_parsing import read_img_dat, read_emoji, decompress_CompressContent, read_audio_buf, read_audio, parse_xml_string
from .decrypted.decrypt import batch_decrypt, decrypt,encrypt from .ui import app_show_chat, get_user_list, export
from .decrypted.get_wx_decrypted_db import all_decrypt, merge_copy_msg_db, merge_msg_db, merge_media_msg_db
from .analyse.parse import read_img_dat, read_emoji, decompress_CompressContent, read_audio_buf, read_audio, parse_xml_string
from .show_chat import app_show_chat, get_user_list, export
import os,json import os,json

View File

@ -5,4 +5,4 @@
# Author: xaoyaoo # Author: xaoyaoo
# Date: 2023/09/27 # Date: 2023/09/27
# ------------------------------------------------------------------------------- # -------------------------------------------------------------------------------
from .parse import read_img_dat, read_emoji, decompress_CompressContent, read_audio_buf, read_audio, parse_xml_string from .db_parsing import read_img_dat, read_emoji, decompress_CompressContent, read_audio_buf, read_audio, parse_xml_string

View File

@ -153,9 +153,10 @@ def decompress_CompressContent(data):
""" """
if data is None or not isinstance(data, bytes): if data is None or not isinstance(data, bytes):
return None return None
dst = lz4.block.decompress(data, uncompressed_size=len(data) << 8) dst = lz4.block.decompress(data, uncompressed_size=len(data) << 8)
dst.decode().replace('\x00', '') # 已经解码完成后还含有0x00的部分要删掉要不后面ET识别的时候会报错 dst.decode().replace('\x00', '') # 已经解码完成后还含有0x00的部分要删掉要不后面ET识别的时候会报错
uncompressed_data = dst.encode() uncompressed_data = dst.encode('utf-8', errors='ignore')
return uncompressed_data return uncompressed_data
@ -244,19 +245,25 @@ def wordcloud_generator(text, out_path="", is_show=False, img_path="", font="C:\
wordcloud_img = wordcloud1.to_image() wordcloud_img = wordcloud1.to_image()
wordcloud_img.show() wordcloud_img.show()
def read_BytesExtra(bytes_extra):
if bytes_extra is None: def read_BytesExtra(BytesExtra):
if BytesExtra is None or not isinstance(BytesExtra, bytes):
return None return None
try: try:
deserialize_data, message_type = blackboxprotobuf.decode_message(bytes_extra) deserialize_data, message_type = blackboxprotobuf.decode_message(BytesExtra)
return deserialize_data return deserialize_data
except Exception as e: except Exception as e:
# print(f"can not decode bytes_extra:{e}")
return None return None
if __name__ == '__main__': if __name__ == '__main__':
data = '' DB = sqlite3.connect(r"D:\_code\py_code\test\a2023\b0821wxdb\merge_wfwx_db\kkWxMsg\MSG_all.db")
read_BytesExtra(data) cursor = DB.cursor()
print('*' * 50) sql = "select MsgSvrID,BytesExtra from MSG where BytesExtra is not null and StrTalker='24724392255@chatroom' order by CreateTime desc limit 10"
data2 = '' DBdata = cursor.execute(sql).fetchall()
read_BytesExtra(data2) for i in DBdata:
MsgSvrID, BytesExtra = i
data = read_BytesExtra(BytesExtra)
# 提取特定键的信息
print(MsgSvrID,"\n",data)
print("-" * 64)

View File

@ -0,0 +1,324 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: export_chat.py
# Description:
# Author: xaoyaoo
# Date: 2023/12/03
# -------------------------------------------------------------------------------
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: GUI.py
# Description:
# Author: xaoyaoo
# Date: 2023/11/10
# -------------------------------------------------------------------------------
import base64
import sqlite3
import os
import json
import time
from functools import wraps
from .utils import get_md5, detach_databases, attach_databases, execute_sql
from .db_parsing import read_img_dat, decompress_CompressContent, read_audio, parse_xml_string
from flask import Flask, request, render_template, g, Blueprint
def get_contact_list(MicroMsg_db_path):
"""
获取联系人列表
:param MicroMsg_db_path: MicroMsg.db 文件路径
:return: 联系人列表
"""
users = []
# 连接 MicroMsg.db 数据库,并执行查询
db = sqlite3.connect(MicroMsg_db_path)
cursor = db.cursor()
cursor.execute(
"SELECT A.UserName, A.NickName, A.Remark,B.bigHeadImgUrl FROM Contact A,ContactHeadImgUrl B ORDER BY NickName ASC")
result = cursor.fetchall()
for row in result:
# 获取用户名、昵称、备注和聊天记录数量
username, nickname, remark, headImgUrl = row
users.append({"username": username, "nickname": nickname, "remark": remark, "headImgUrl": headImgUrl})
cursor.close()
db.close()
return users
def msg_db_connect(func):
@wraps(func)
def wrapper(MSG_db_path, *args, **kwargs):
# 连接 MSG.db 数据库,并执行查询
if isinstance(MSG_db_path, list):
# alias, file_path
databases = {f"MSG{i}": db_path for i, db_path in enumerate(MSG_db_path)}
elif isinstance(MSG_db_path, str):
databases = {"MSG": MSG_db_path}
else:
raise TypeError("MSG_db_path 类型错误")
# 连接 MSG_ALL.db 数据库,并执行查询
if len(databases) > 1:
db = sqlite3.connect(":memory:")
attach_databases(db, databases)
else:
db = sqlite3.connect(list(databases.values())[0])
result = func("", db=db, databases=databases, *args, **kwargs)
# 断开数据库连接
if len(databases) > 1:
for alias in databases:
db.execute(f"DETACH DATABASE {alias}")
db.close()
return result
return wrapper
@msg_db_connect
def get_chat_count(MSG_db_path: [str, list], db=None, databases=None):
"""
获取聊天记录数量
:param MSG_db_path: MSG.db 文件路径
:return: 聊天记录数量列表
"""
# 构造 SQL 查询,使用 UNION ALL 联合不同数据库的 MSG 表
union_sql = " UNION ALL ".join(
f"SELECT StrTalker, COUNT(*) AS ChatCount FROM {alias}.MSG GROUP BY StrTalker" for alias in databases)
sql = f"SELECT StrTalker, SUM(ChatCount) AS TotalChatCount FROM ({union_sql}) GROUP BY StrTalker ORDER BY TotalChatCount DESC"
chat_counts = []
result = execute_sql(db, sql)
for row in result:
username, chat_count = row
row_data = {"username": username, "chat_count": chat_count}
chat_counts.append(row_data)
return chat_counts
def load_base64_audio_data(MsgSvrID, MediaMSG_all_db_path):
wave_data = read_audio(MsgSvrID, is_wave=True, DB_PATH=MediaMSG_all_db_path)
if not wave_data:
return ""
video_base64 = base64.b64encode(wave_data).decode("utf-8")
video_data = f"data:audio/wav;base64,{video_base64}"
return video_data
def load_base64_img_data(start_time, end_time, username_md5, FileStorage_path):
"""
获取图片的base64数据
:param start_time: 开始时间戳
:param end_time: 结束时间戳
:param username_md5: 用户名的md5值
:return:
"""
# 获取CreateTime的最大值日期
min_time = time.strftime("%Y-%m", time.localtime(start_time))
max_time = time.strftime("%Y-%m", time.localtime(end_time))
img_path = os.path.join(FileStorage_path, "MsgAttach", username_md5, "Image")
if not os.path.exists(img_path):
return {}
# print(min_time, max_time, img_path)
paths = []
for root, path, files in os.walk(img_path):
for p in path:
if p >= min_time and p <= max_time:
paths.append(os.path.join(root, p))
# print(paths)
img_md5_data = {}
for path in paths:
for root, path, files in os.walk(path):
for file in files:
if file.endswith(".dat"):
file_path = os.path.join(root, file)
fomt, md5, out_bytes = read_img_dat(file_path)
out_bytes = base64.b64encode(out_bytes).decode("utf-8")
img_md5_data[md5] = f"data:{fomt};base64,{out_bytes}"
return img_md5_data
def load_chat_records(selected_talker, start_index, page_size, user_list, MSG_ALL_db_path, MediaMSG_all_db_path,
FileStorage_path):
username = user_list.get("username", "")
username_md5 = get_md5(username)
type_name_dict = {
1: {0: "文本"},
3: {0: "图片"},
34: {0: "语音"},
43: {0: "视频"},
47: {0: "动画表情"},
49: {0: "文本", 1: "类似文字消息而不一样的消息", 5: "卡片式链接", 6: "文件", 8: "用户上传的 GIF 表情",
19: "合并转发的聊天记录", 33: "分享的小程序", 36: "分享的小程序", 57: "带有引用的文本消息",
63: "视频号直播或直播回放等",
87: "群公告", 88: "视频号直播或直播回放等", 2000: "转账消息", 2003: "赠送红包封面"},
50: {0: "语音通话"},
10000: {0: "系统通知", 4: "拍一拍", 8000: "系统通知"}
}
# 连接 MSG_ALL.db 数据库,并执行查询
db1 = sqlite3.connect(MSG_ALL_db_path)
cursor1 = db1.cursor()
cursor1.execute(
"SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent FROM MSG WHERE StrTalker=? ORDER BY CreateTime ASC LIMIT ?,?",
(selected_talker, start_index, page_size))
result1 = cursor1.fetchall()
cursor1.close()
db1.close()
img_md5_data = load_base64_img_data(result1[0][7], result1[-1][7], username_md5, FileStorage_path) # 获取图片的base64数据
data = []
for row in result1:
localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType, CreateTime, MsgSvrID, DisplayContent, CompressContent = row
CreateTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(CreateTime))
type_name = type_name_dict.get(Type, {}).get(SubType, "未知")
content = {"src": "", "msg": "", "style": ""}
if Type == 47 and SubType == 0: # 动画表情
content_tmp = parse_xml_string(StrContent)
cdnurl = content_tmp.get("emoji", {}).get("cdnurl", "")
# md5 = content_tmp.get("emoji", {}).get("md5", "")
if cdnurl:
content = {"src": cdnurl, "msg": "表情", "style": "width: 100px; height: 100px;"}
elif Type == 49 and SubType == 57: # 带有引用的文本消息
CompressContent = CompressContent.rsplit(b'\x00', 1)[0]
content["msg"] = decompress_CompressContent(CompressContent)
try:
content["msg"] = content["msg"].decode("utf-8")
content["msg"] = parse_xml_string(content["msg"])
content["msg"] = json.dumps(content["msg"], ensure_ascii=False)
except Exception as e:
content["msg"] = "[带有引用的文本消息]解析失败"
elif Type == 34 and SubType == 0: # 语音
tmp_c = parse_xml_string(StrContent)
voicelength = tmp_c.get("voicemsg", {}).get("voicelength", "")
transtext = tmp_c.get("voicetrans", {}).get("transtext", "")
if voicelength.isdigit():
voicelength = int(voicelength) / 1000
voicelength = f"{voicelength:.2f}"
content["msg"] = f"语音时长:{voicelength}\n翻译结果:{transtext}"
src = load_base64_audio_data(MsgSvrID, MediaMSG_all_db_path=MediaMSG_all_db_path)
content["src"] = src
elif Type == 3 and SubType == 0: # 图片
xml_content = parse_xml_string(StrContent)
md5 = xml_content.get("img", {}).get("md5", "")
if md5:
content["src"] = img_md5_data.get(md5, "")
else:
content["src"] = ""
content["msg"] = "图片"
else:
content["msg"] = StrContent
row_data = {"MsgSvrID": MsgSvrID, "type_name": type_name, "is_sender": IsSender,
"content": content, "CreateTime": CreateTime}
data.append(row_data)
return data
def export_html(user, outpath, MSG_ALL_db_path, MediaMSG_all_db_path, FileStorage_path, page_size=500):
name_save = user.get("remark", user.get("nickname", user.get("username", "")))
username = user.get("username", "")
chatCount = user.get("chat_count", 0)
if chatCount == 0:
return False, "没有聊天记录"
for i in range(0, chatCount, page_size):
start_index = i
data = load_chat_records(username, start_index, page_size, user, MSG_ALL_db_path, MediaMSG_all_db_path,
FileStorage_path)
if len(data) == 0:
break
save_path = os.path.join(outpath, f"{name_save}_{int(i / page_size)}.html")
with open(save_path, "w", encoding="utf-8") as f:
f.write(render_template("chat.html", msgs=data))
return True, f"导出成功{outpath}"
def export(username, outpath, MSG_ALL_db_path, MicroMsg_db_path, MediaMSG_all_db_path, FileStorage_path):
if not os.path.exists(outpath):
outpath = os.path.join(os.getcwd(), "export" + os.sep + username)
if not os.path.exists(outpath):
os.makedirs(outpath)
USER_LIST = get_user_list(MSG_ALL_db_path, MicroMsg_db_path)
user = list(filter(lambda x: x["username"] == username, USER_LIST))
if username and len(user) > 0:
user = user[0]
return export_html(user, outpath, MSG_ALL_db_path, MediaMSG_all_db_path, FileStorage_path)
app_show_chat = Blueprint('show_chat_main', __name__, template_folder='templates')
app_show_chat.debug = False
# 主页 - 显示用户列表
@app_show_chat.route('/')
def index():
g.USER_LIST = get_user_list(g.MSG_ALL_db_path, g.MicroMsg_db_path)
return render_template("index.html", users=g.USER_LIST)
# 获取聊天记录
@app_show_chat.route('/get_chat_data', methods=["GET", 'POST'])
def get_chat_data():
username = request.args.get("username", "")
user = list(filter(lambda x: x["username"] == username, g.USER_LIST))
if username and len(user) > 0:
user = user[0]
limit = int(request.args.get("limit", 100)) # 每页显示的条数
page = int(request.args.get("page", user.get("chat_count", limit) / limit)) # 当前页数
start_index = (page - 1) * limit
page_size = limit
data = load_chat_records(username, start_index, page_size, user, g.MSG_ALL_db_path, g.MediaMSG_all_db_path,
g.FileStorage_path)
return render_template("chat.html", msgs=data)
else:
return "error"
# 聊天记录导出为html
@app_show_chat.route('/export_chat_data', methods=["GET", 'POST'])
def get_export():
username = request.args.get("username", "")
user = list(filter(lambda x: x["username"] == username, g.USER_LIST))
if username and len(user) > 0:
user = user[0]
n = f"{user.get('username', '')}_{user.get('nickname', '')}_{user.get('remark', '')}"
outpath = os.path.join(os.getcwd(), "export" + os.sep + n)
if not os.path.exists(outpath):
os.makedirs(outpath)
ret = export_html(user, outpath, g.MSG_ALL_db_path, g.MediaMSG_all_db_path, g.FileStorage_path, page_size=200)
if ret[0]:
return ret[1]
else:
return ret[1]
else:
return "error"
if __name__ == '__main__':
pass

BIN
pywxdump/analyzer/img.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 331 KiB

View File

@ -0,0 +1,67 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: utils.py
# Description:
# Author: xaoyaoo
# Date: 2023/12/03
# -------------------------------------------------------------------------------
import hashlib
def get_md5(data):
"""
获取数据的 MD5
:param data: 数据bytes
:return:
"""
md5 = hashlib.md5()
md5.update(data)
return md5.hexdigest()
def attach_databases(connection, databases):
"""
将多个数据库附加到给定的SQLite连接
参数
-连接SQLite连接
-数据库包含数据库别名和文件路径的词典
"""
cursor = connection.cursor()
for alias, file_path in databases.items():
attach_command = f"ATTACH DATABASE '{file_path}' AS {alias};"
cursor.execute(attach_command)
connection.commit()
def detach_databases(connection, aliases):
"""
从给定的 SQLite 连接中分离多个数据库
参数
- connection SQLite连接
- aliases要分离的数据库别名列表
"""
cursor = connection.cursor()
for alias in aliases:
detach_command = f"DETACH DATABASE {alias};"
cursor.execute(detach_command)
connection.commit()
def execute_sql(connection, sql, params=None):
"""
执行给定的SQL语句返回结果
参数
- connection SQLite连接
- sql要执行的SQL语句
- paramsSQL语句中的参数
"""
cursor = connection.cursor()
if params:
cursor.execute(sql, params)
else:
cursor.execute(sql)
return cursor.fetchall()
if __name__ == '__main__':
pass

View File

@ -1,8 +0,0 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: __init__.py.py
# Description:
# Author: xaoyaoo
# Date: 2023/10/14
# -------------------------------------------------------------------------------
from .get_bias_addr import BiasAddr

View File

@ -126,6 +126,45 @@ class MainDecrypt():
return result return result
class MainMerge():
def init_parses(self, parser):
self.mode = "merge"
# 添加 'decrypt' 子命令解析器
sb_merge = parser.add_parser(self.mode, help="合并微信数据库(MSG.db or MediaMSG.db)")
sb_merge.add_argument("-t", "--dbtype", type=str, help="数据库类型(可选值)[msg,media]", required=True, metavar="")
sb_merge.add_argument("-i", "--db_path", type=str, help="数据库路径(文件路径,使用英文[,]分割)", required=True, metavar="")
sb_merge.add_argument("-o", "--out_path", type=str, default=os.path.join(os.getcwd(), "decrypted"),
help="输出路径(必须是目录)[默认为当前路径下decrypted文件夹]", required=False,
metavar="")
return sb_merge
def run(self, args):
# 从命令行参数获取值
dbtype = args.dbtype
db_path = args.db_path
out_path = args.out_path
db_path = db_path.split(",")
for i in db_path:
if not os.path.exists(i):
print(f"[-] 数据库路径不存在:{i}")
return
if not os.path.exists(out_path):
os.makedirs(out_path)
print(f"[+] 创建输出文件夹:{out_path}")
if dbtype == "msg":
result = merge_msg_db(db_path, out_path)
elif dbtype == "media":
result = merge_media_msg_db(db_path, out_path)
else:
print(f"[-] 未知数据库类型:{dbtype}")
return
return result
class MainShowChatRecords(): class MainShowChatRecords():
def init_parses(self, parser): def init_parses(self, parser):
self.mode = "dbshow" self.mode = "dbshow"
@ -363,6 +402,11 @@ def console_run():
sb_decrypt = main_decrypt.init_parses(subparsers) sb_decrypt = main_decrypt.init_parses(subparsers)
modes[main_decrypt.mode] = main_decrypt modes[main_decrypt.mode] = main_decrypt
# 添加 'merge' 子命令解析器
main_merge = MainMerge()
sb_merge = main_merge.init_parses(subparsers)
modes[main_merge.mode] = main_merge
# 添加 '' 子命令解析器 # 添加 '' 子命令解析器
main_show_chat_records = MainShowChatRecords() main_show_chat_records = MainShowChatRecords()
sb_dbshow = main_show_chat_records.init_parses(subparsers) sb_dbshow = main_show_chat_records.init_parses(subparsers)

View File

@ -1,9 +0,0 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: __init__.py.py
# Description:
# Author: xaoyaoo
# Date: 2023/08/21
# -------------------------------------------------------------------------------
from .decrypt import batch_decrypt, encrypt
from .get_wx_decrypted_db import all_decrypt, merge_copy_msg_db, merge_msg_db, merge_media_msg_db

View File

@ -3,6 +3,9 @@
# Name: __init__.py.py # Name: __init__.py.py
# Description: # Description:
# Author: xaoyaoo # Author: xaoyaoo
# Date: 2023/11/10 # Date: 2023/12/03
# ------------------------------------------------------------------------------- # -------------------------------------------------------------------------------
from .main_window import app_show_chat, get_user_list, export from .view_chat import app_show_chat, get_user_list, export
if __name__ == '__main__':
pass

View File

@ -11,7 +11,7 @@ import os
import json import json
import time import time
import hashlib import hashlib
from pywxdump.analyse import read_img_dat, decompress_CompressContent, read_audio, parse_xml_string from pywxdump.analyzer import read_img_dat, decompress_CompressContent, read_audio, parse_xml_string
from flask import Flask, request, render_template, g, Blueprint from flask import Flask, request, render_template, g, Blueprint
@ -71,7 +71,7 @@ def load_base64_img_data(start_time, end_time, username_md5, FileStorage_path):
# 获取CreateTime的最大值日期 # 获取CreateTime的最大值日期
min_time = time.strftime("%Y-%m", time.localtime(start_time)) min_time = time.strftime("%Y-%m", time.localtime(start_time))
max_time = time.strftime("%Y-%m", time.localtime(end_time)) max_time = time.strftime("%Y-%m", time.localtime(end_time))
img_path = os.path.join(FileStorage_path, "MsgAttach", username_md5, "Image") img_path = os.path.join(FileStorage_path, "MsgAttach", username_md5, "Image") if FileStorage_path else ""
if not os.path.exists(img_path): if not os.path.exists(img_path):
return {} return {}
# print(min_time, max_time, img_path) # print(min_time, max_time, img_path)

View File

@ -5,5 +5,7 @@
# Author: xaoyaoo # Author: xaoyaoo
# Date: 2023/08/21 # Date: 2023/08/21
# ------------------------------------------------------------------------------- # -------------------------------------------------------------------------------
from .get_wx_info import read_info from .get_wx_info import read_info, get_wechat_db
from .get_wx_db import get_wechat_db from .get_bias_addr import BiasAddr
from .decryption import batch_decrypt, encrypt, decrypt
from .merge_db import merge_msg_db, merge_copy_db, merge_media_msg_db

View File

@ -12,7 +12,6 @@
# 为了保证数据部分长度是16字节即AES块大小的整倍数每一页的末尾将填充一段空字节使得保留字段的长度为48字节。 # 为了保证数据部分长度是16字节即AES块大小的整倍数每一页的末尾将填充一段空字节使得保留字段的长度为48字节。
# 综上加密文件结构为第一页4KB数据前16字节为盐值紧接着4032字节数据再加上16字节IV和20字节HMAC以及12字节空字节而后的页均是4048字节长度的加密数据段和48字节的保留段。 # 综上加密文件结构为第一页4KB数据前16字节为盐值紧接着4032字节数据再加上16字节IV和20字节HMAC以及12字节空字节而后的页均是4048字节长度的加密数据段和48字节的保留段。
# ------------------------------------------------------------------------------- # -------------------------------------------------------------------------------
import argparse import argparse
import hmac import hmac
import hashlib import hashlib
@ -206,23 +205,3 @@ def encrypt(key: str, db_path, out_path):
enFile.write(encrypted) enFile.write(encrypted)
return True, [db_path, out_path, key] return True, [db_path, out_path, key]
if __name__ == '__main__':
# 创建命令行参数解析器
parser = argparse.ArgumentParser()
parser.add_argument("-k", "--key", type=str, help="密钥", required=True)
parser.add_argument("-i", "--db_path", type=str, help="数据库路径(目录or文件)", required=True)
parser.add_argument("-o", "--out_path", type=str,
help="输出路径(必须是目录),输出文件为 out_path/de_{original_name}", required=True)
# 解析命令行参数
args = parser.parse_args()
# 从命令行参数获取值
key = args.key
db_path = args.db_path
out_path = args.out_path
# 调用 decrypt 函数,并传入参数
result = batch_decrypt(key, db_path, out_path, is_logging=True)

View File

@ -99,7 +99,7 @@ class BiasAddr:
self.mobile = mobile.encode("utf-8") self.mobile = mobile.encode("utf-8")
self.name = name.encode("utf-8") self.name = name.encode("utf-8")
self.key = bytes.fromhex(key) if key else b"" self.key = bytes.fromhex(key) if key else b""
self.db_path = db_path if os.path.exists(db_path) else "" self.db_path = db_path if db_path and os.path.exists(db_path) else ""
self.process_name = "WeChat.exe" self.process_name = "WeChat.exe"
self.module_name = "WeChatWin.dll" self.module_name = "WeChatWin.dll"
@ -251,6 +251,23 @@ class BiasAddr:
key, bais = verify_key(maybe_key, wx_db_path) key, bais = verify_key(maybe_key, wx_db_path)
return bais return bais
def test(self):
phone_type1 = "iphone\x00"
phone_type2 = "android\x00"
Regex = re.compile(r"^[a-zA-Z0-9_]+$")
# 内存搜索
module = pymem.process.module_from_name(self.pm.process_handle, self.module_name)
print(hex(module.lpBaseOfDll))
phone_type1_bias = self.pm.pattern_scan_module(phone_type1.encode(), self.module_name, return_multiple=True)
phone_type2_bias = self.pm.pattern_scan_module(phone_type2.encode(), self.module_name, return_multiple=True)
phone_type_bias = phone_type1_bias + phone_type2_bias
print(len(phone_type1_bias))
for i in phone_type_bias[::-1]:
for j in range(i, i - 1000, -16):
a = get_info_without_key(self.process_handle, j, 32)
if Regex.match(a) and len(a) >= 6:
print(a)
def run(self, logging_path=False, version_list_path=None): def run(self, logging_path=False, version_list_path=None):
if not self.get_process_handle()[0]: if not self.get_process_handle()[0]:
return None return None
@ -263,6 +280,8 @@ class BiasAddr:
key_bias = self.get_key_bias2(self.db_path, account_bias) if key_bias <= 0 and self.db_path else key_bias key_bias = self.get_key_bias2(self.db_path, account_bias) if key_bias <= 0 and self.db_path else key_bias
rdata = {self.version: [name_bias, account_bias, mobile_bias, 0, key_bias]} rdata = {self.version: [name_bias, account_bias, mobile_bias, 0, key_bias]}
# print(rdata)
# self.test()
if version_list_path and os.path.exists(version_list_path): if version_list_path and os.path.exists(version_list_path):
with open(version_list_path, "r", encoding="utf-8") as f: with open(version_list_path, "r", encoding="utf-8") as f:
data = json.load(f) data = json.load(f)
@ -279,235 +298,15 @@ class BiasAddr:
return rdata return rdata
# class BiasAddr: def get_info_without_key(h_process, address, n_size=64):
# def __init__(self, account, mobile, name, key, db_path): array = ctypes.create_string_buffer(n_size)
# self.account = account.encode("utf-8") if ReadProcessMemory(h_process, void_p(address), array, n_size, 0) == 0: return "None"
# self.mobile = mobile.encode("utf-8") array = bytes(array).split(b"\x00")[0] if b"\x00" in array else bytes(array)
# self.name = name.encode("utf-8") text = array.decode('utf-8', errors='ignore')
# self.key = bytes.fromhex(key) if key else b"" return text.strip() if text.strip() != "" else "None"
# self.db_path = db_path if db_path else ""
#
# self.process_name = "WeChat.exe"
# self.module_name = "WeChatWin.dll"
#
# self.pm = Pymem("WeChat.exe")
#
# self.bits = self.get_osbits()
# self.version = self.get_file_version(self.process_name)
# self.address_len = self.get_addr_len()
#
# self.islogin = True
#
# def get_addr_len(self):
# version_nums = list(map(int, self.version.split("."))) # 将版本号拆分为数字列表
# if version_nums[0] <= 3 and version_nums[1] <= 9 and version_nums[2] <= 2:
# return 4
# else:
# return 8
#
# def find_all(self, c: bytes, string: bytes, base_addr=0):
# """
# 查找字符串中所有子串的位置
# :param c: 子串 b'123'
# :param string: 字符串 b'123456789123'
# :return:
# """
# return [base_addr + m.start() for m in re.finditer(re.escape(c), string)]
#
# def get_file_version(self, process_name):
# for process in psutil.process_iter(['pid', 'name', 'exe']):
# if process.name() == process_name:
# file_version = Dispatch("Scripting.FileSystemObject").GetFileVersion(process.exe())
# return file_version
# self.islogin = False
#
# def get_osbits(self):
# return int(platform.architecture()[0][:-3])
#
# def search_memory_value(self, value: bytes, module_name="WeChatWin.dll"):
# # 创建 Pymem 对象
# pm = self.pm
# module = pymem.process.module_from_name(pm.process_handle, module_name)
#
# # result = pymem.pattern.pattern_scan_module(pm.process_handle, module, value, return_multiple=True)
# # result = result[-1]-module.lpBaseOfDll if len(result) > 0 else 0
# mem_data = pm.read_bytes(module.lpBaseOfDll, module.SizeOfImage)
# result = self.find_all(value, mem_data)
# result = result[-1] if len(result) > 0 else 0
# return result
#
# def search_key(self, key: bytes):
# byteLen = self.address_len # if self.bits == 32 else 8 # 4字节或8字节
# key = re.escape(key) # 转义特殊字符
# key_addr = self.pm.pattern_scan_all(key, return_multiple=True)[-1] if len(key) > 0 else 0
# key = key_addr.to_bytes(byteLen, byteorder='little', signed=True)
# result = self.search_memory_value(key, self.module_name)
# return result
#
# def get_key_bias_test(self):
# byteLen = self.address_len # 4 if self.bits == 32 else 8 # 4字节或8字节
# keyLenOffset = 0x8c if self.bits == 32 else 0xd0
# keyWindllOffset = 0x90 if self.bits == 32 else 0xd8
#
# pm = self.pm
#
# module = pymem.process.module_from_name(pm.process_handle, "WeChatWin.dll")
# keyBytes = b'-----BEGIN PUBLIC KEY-----\n...'
# publicKeyList = pymem.pattern.pattern_scan_all(self.pm.process_handle, keyBytes, return_multiple=True)
#
# keyaddrs = []
# for addr in publicKeyList:
# keyBytes = addr.to_bytes(byteLen, byteorder="little", signed=True) # 低位在前
# addrs = pymem.pattern.pattern_scan_module(pm.process_handle, module, keyBytes, return_multiple=True)
# if addrs != 0:
# keyaddrs += addrs
#
# keyWinAddr = 0
# for addr in keyaddrs:
# keyLen = pm.read_uchar(addr - keyLenOffset)
# if keyLen != 32:
# continue
# keyWinAddr = addr - keyWindllOffset
# # keyaddr = int.from_bytes(pm.read_bytes(keyWinAddr, byteLen), byteorder='little')
# # key = pm.read_bytes(keyaddr, 32)
# # print("key", key.hex())
#
# return keyWinAddr - module.lpBaseOfDll
#
# def get_key_bias(self, wx_db_path, account_bias=0):
# wx_db_path = os.path.join(wx_db_path, "Msg", "MicroMsg.db")
# if not os.path.exists(wx_db_path):
# return 0
#
# def get_maybe_key(mem_data):
# maybe_key = []
# for i in range(0, len(mem_data), self.address_len):
# addr = mem_data[i:i + self.address_len]
# addr = int.from_bytes(addr, byteorder='little')
# # 去掉不可能的地址
# if min_addr < addr < max_addr:
# key = read_key(addr)
# if key == b"":
# continue
# maybe_key.append([key, i])
# return maybe_key
#
# def read_key(addr):
# key = ctypes.create_string_buffer(35)
# if ReadProcessMemory(pm.process_handle, void_p(addr - 1), key, 35, 0) == 0:
# return b""
#
# if b"\x00\x00" in key.raw[1:33]:
# return b""
#
# if b"\x00\x00" == key.raw[33:35] and b"\x90" == key.raw[0:1]:
# return key.raw[1:33]
# return b""
#
# def verify_key(keys, wx_db_path):
# with open(wx_db_path, "rb") as file:
# blist = file.read(5000)
# salt = blist[:16]
# first = blist[16:DEFAULT_PAGESIZE]
# mac_salt = bytes([(salt[i] ^ 58) for i in range(16)])
#
# with multiprocessing.Pool(processes=8) as pool:
# results = [pool.apply_async(validate_key, args=(key, salt, first, mac_salt)) for key, i in keys[-1::-1]]
# results = [p.get() for p in results]
# for i, result in enumerate(results[-1::-1]):
# if result:
# return keys[i]
# return b"", 0
#
# module_name = "WeChatWin.dll"
# pm = self.pm
# module = pymem.process.module_from_name(pm.process_handle, module_name)
# start_addr = module.lpBaseOfDll
# size = module.SizeOfImage
#
# if account_bias > 1:
# maybe_key = []
# for i in [0x24, 0x40]:
# addr = start_addr + account_bias - i
# mem_data = pm.read_bytes(addr, self.address_len)
# key = read_key(int.from_bytes(mem_data, byteorder='little'))
# if key != b"":
# maybe_key.append([key, addr - start_addr])
# key, bais = verify_key(maybe_key, wx_db_path)
# if bais != 0:
# return bais
#
# min_addr = 0xffffffffffffffffffffffff
# max_addr = 0
# for module1 in pm.list_modules():
# if module1.lpBaseOfDll < min_addr:
# min_addr = module1.lpBaseOfDll
# if module1.lpBaseOfDll > max_addr:
# max_addr = module1.lpBaseOfDll + module1.SizeOfImage
#
# mem_data = pm.read_bytes(start_addr, size)
# maybe_key = get_maybe_key(mem_data)
# key, bais = verify_key(maybe_key, wx_db_path)
# return bais
#
# def run(self, is_logging=False, version_list_path=None):
# self.version = self.get_file_version(self.process_name)
# if not self.islogin:
# error = "[-] WeChat No Run"
# if is_logging: print(error)
# return error
# mobile_bias = self.search_memory_value(self.mobile)
# name_bias = self.search_memory_value(self.name)
# account_bias = self.search_memory_value(self.account)
# # version_bias = self.search_memory_value(self.version.encode("utf-8"))
#
# try:
# key_bias = self.get_key_bias_test()
# except:
# key_bias = 0
#
# if key_bias <= 0:
# if self.key:
# key_bias = self.search_key(self.key)
# elif self.db_path:
# key_bias = self.get_key_bias(self.db_path, account_bias)
# else:
# key_bias = 0
# rdata = {self.version: [name_bias, account_bias, mobile_bias, 0, key_bias]}
# if version_list_path and os.path.exists(version_list_path):
# with open(version_list_path, "r", encoding="utf-8") as f:
# data = json.load(f)
# data.update(rdata)
# with open(version_list_path, "w", encoding="utf-8") as f:
# json.dump(data, f, ensure_ascii=False, indent=4)
# if is_logging:
# print("{版本号:昵称,账号,手机号,邮箱,KEY}")
# print(rdata)
# return rdata
if __name__ == '__main__': if __name__ == '__main__':
# 创建命令行参数解析器 account, mobile, name, key, db_path = "test", "test", "test", "0000", "test"
parser = argparse.ArgumentParser() bias_addr = BiasAddr(account, mobile, name, key, db_path)
parser.add_argument("--mobile", type=str, help="手机号", required=True) bias_addr.run()
parser.add_argument("--name", type=str, help="微信昵称", required=True)
parser.add_argument("--account", type=str, help="微信账号", required=True)
parser.add_argument("--key", type=str, help="(可选)密钥")
parser.add_argument("--db_path", type=str, help="(可选)已登录账号的微信文件夹路径")
# 解析命令行参数
args = parser.parse_args()
# 检查是否缺少必要参数,并抛出错误
if not args.mobile or not args.name or not args.account:
raise ValueError("缺少必要的命令行参数!请提供手机号、微信昵称、微信账号。")
# 从命令行参数获取值
mobile = args.mobile
name = args.name
account = args.account
key = args.key
db_path = args.db_path
# 调用 run 函数,并传入参数
rdata = BiasAddr(account, mobile, name, key, db_path).run(True, "../version_list.json")

View File

@ -1,101 +0,0 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: get_wx_db.py
# Description:
# Author: xaoyaoo
# Date: 2023/10/14
# -------------------------------------------------------------------------------
import os
import re
import winreg
from typing import List, Union
def get_wechat_db(require_list: Union[List[str], str] = "all", msg_dir: str = None, wxid: Union[List[str], str] = None,
is_logging: bool = False):
if not msg_dir:
try:
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Software\Tencent\WeChat", 0, winreg.KEY_READ)
value, _ = winreg.QueryValueEx(key, "FileSavePath")
winreg.CloseKey(key)
w_dir = value
except Exception as e:
# 获取文档实际目录
try:
# 打开注册表路径
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Explorer\User Shell Folders")
documents_path = winreg.QueryValueEx(key, "Personal")[0] # 读取文档实际目录路径
winreg.CloseKey(key) # 关闭注册表
documents_paths = os.path.split(documents_path)
if "%" in documents_paths[0]:
w_dir = os.environ.get(documents_paths[0].replace("%", ""))
w_dir = os.path.join(w_dir, os.path.join(*documents_paths[1:]))
else:
w_dir = documents_path
except Exception as e:
profile = os.path.expanduser("~")
w_dir = os.path.join(profile, "Documents")
msg_dir = os.path.join(w_dir, "WeChat Files")
if not os.path.exists(msg_dir):
error = f"[-] 目录不存在: {msg_dir}"
if is_logging: print(error)
return error
user_dirs = {} # wx用户目录
files = os.listdir(msg_dir)
if wxid: # 如果指定wxid
if isinstance(wxid, str):
wxid = wxid.split(";")
for file_name in files:
if file_name in wxid:
user_dirs[os.path.join(msg_dir, file_name)] = os.path.join(msg_dir, file_name)
else: # 如果未指定wxid
for file_name in files:
if file_name == "All Users" or file_name == "Applet" or file_name == "WMPF":
continue
user_dirs[os.path.join(msg_dir, file_name)] = os.path.join(msg_dir, file_name)
if isinstance(require_list, str):
require_list = require_list.split(";")
# generate pattern
if "all" in require_list:
pattern = {"all": re.compile(r".*\.db$")}
elif isinstance(require_list, list):
pattern = {}
for require in require_list:
pattern[require] = re.compile(r"%s.*\.db$" % require)
else:
error = f"[-] 参数错误: {require_list}"
if is_logging: print(error)
return error
# 获取数据库路径
for user, user_dir in user_dirs.items(): # 遍历用户目录
user_dirs[user] = {n: [] for n in pattern.keys()}
for root, dirs, files in os.walk(user_dir):
for file_name in files:
for n, p in pattern.items():
if p.match(file_name):
src_path = os.path.join(root, file_name)
user_dirs[user][n].append(src_path)
if is_logging:
for user, user_dir in user_dirs.items():
print(f"[+] user_path: {user}")
for n, paths in user_dir.items():
print(f" {n}:")
for path in paths:
print(f" {path.replace(user, '')}")
print("-" * 32)
print(f"[+] 共 {len(user_dirs)} 个微信账号")
return user_dirs
if __name__ == '__main__':
require_list = ["MediaMSG", "MicroMsg", "FTSMSG", "MSG", "Sns", "Emotion"]
# require_list = "all"
user_dirs = get_wechat_db(require_list, is_logging=True)

View File

@ -8,11 +8,13 @@
import json import json
import ctypes import ctypes
import os import os
import re
import winreg import winreg
import pymem import pymem
from win32com.client import Dispatch from win32com.client import Dispatch
import psutil import psutil
import sys import sys
from typing import List, Union
ReadProcessMemory = ctypes.windll.kernel32.ReadProcessMemory ReadProcessMemory = ctypes.windll.kernel32.ReadProcessMemory
void_p = ctypes.c_void_p void_p = ctypes.c_void_p
@ -89,24 +91,32 @@ def get_info_filePath(wxid="all"):
value, _ = winreg.QueryValueEx(key, "FileSavePath") value, _ = winreg.QueryValueEx(key, "FileSavePath")
winreg.CloseKey(key) winreg.CloseKey(key)
w_dir = value w_dir = value
print(0, w_dir)
except Exception as e: except Exception as e:
# 获取文档实际目录 # 获取文档实际目录
try: try:
# 打开注册表路径 # 打开注册表路径
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER,r"Software\Microsoft\Windows\CurrentVersion\Explorer\User Shell Folders") key = winreg.OpenKey(winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Explorer\User Shell Folders")
documents_path = winreg.QueryValueEx(key, "Personal")[0] # 读取文档实际目录路径 documents_path = winreg.QueryValueEx(key, "Personal")[0] # 读取文档实际目录路径
winreg.CloseKey(key) # 关闭注册表 winreg.CloseKey(key) # 关闭注册表
documents_paths = os.path.split(documents_path) documents_paths = os.path.split(documents_path)
if "%" in documents_paths[0]: if "%" in documents_paths[0]:
w_dir = os.environ.get(documents_paths[0].replace("%", "")) w_dir = os.environ.get(documents_paths[0].replace("%", ""))
w_dir = os.path.join(w_dir, os.path.join(*documents_paths[1:])) w_dir = os.path.join(w_dir, os.path.join(*documents_paths[1:]))
print(1, w_dir)
else: else:
w_dir = documents_path w_dir = documents_path
print(2, w_dir)
except Exception as e: except Exception as e:
profile = os.path.expanduser("~") profile = os.environ.get("USERPROFILE") # 获取用户目录
w_dir = os.path.join(profile, "Documents")
print(3, w_dir)
if w_dir == "MyDocument:":
profile = os.environ.get("USERPROFILE")
w_dir = os.path.join(profile, "Documents") w_dir = os.path.join(profile, "Documents")
msg_dir = os.path.join(w_dir, "WeChat Files") msg_dir = os.path.join(w_dir, "WeChat Files")
print(msg_dir)
if wxid == "all" and os.path.exists(msg_dir): if wxid == "all" and os.path.exists(msg_dir):
return msg_dir return msg_dir
@ -194,27 +204,69 @@ def read_info(version_list, is_logging=False):
return result return result
if __name__ == "__main__": def get_wechat_db(require_list: Union[List[str], str] = "all", msg_dir: str = None, wxid: Union[List[str], str] = None,
import argparse is_logging: bool = False):
if not msg_dir:
msg_dir = get_info_filePath(wxid="all")
parser = argparse.ArgumentParser() if not os.path.exists(msg_dir):
parser.add_argument("--vlfile", type=str, help="手机号", required=False) error = f"[-] 目录不存在: {msg_dir}"
parser.add_argument("--vldict", type=str, help="微信昵称", required=False) if is_logging: print(error)
return error
args = parser.parse_args() user_dirs = {} # wx用户目录
files = os.listdir(msg_dir)
if wxid: # 如果指定wxid
if isinstance(wxid, str):
wxid = wxid.split(";")
for file_name in files:
if file_name in wxid:
user_dirs[os.path.join(msg_dir, file_name)] = os.path.join(msg_dir, file_name)
else: # 如果未指定wxid
for file_name in files:
if file_name == "All Users" or file_name == "Applet" or file_name == "WMPF":
continue
user_dirs[os.path.join(msg_dir, file_name)] = os.path.join(msg_dir, file_name)
# 读取微信各版本偏移 if isinstance(require_list, str):
if args.vlfile: require_list = require_list.split(";")
VERSION_LIST_PATH = args.vlfile
with open(VERSION_LIST_PATH, "r", encoding="utf-8") as f:
VERSION_LIST = json.load(f)
if args.vldict:
VERSION_LIST = json.loads(args.vldict)
if not args.vlfile and not args.vldict: # generate pattern
VERSION_LIST_PATH = "../version_list.json" if "all" in require_list:
pattern = {"all": re.compile(r".*\.db$")}
elif isinstance(require_list, list):
pattern = {}
for require in require_list:
pattern[require] = re.compile(r"%s.*\.db$" % require)
else:
error = f"[-] 参数错误: {require_list}"
if is_logging: print(error)
return error
with open(VERSION_LIST_PATH, "r", encoding="utf-8") as f: # 获取数据库路径
VERSION_LIST = json.load(f) for user, user_dir in user_dirs.items(): # 遍历用户目录
user_dirs[user] = {n: [] for n in pattern.keys()}
for root, dirs, files in os.walk(user_dir):
for file_name in files:
for n, p in pattern.items():
if p.match(file_name):
src_path = os.path.join(root, file_name)
user_dirs[user][n].append(src_path)
result = read_info(VERSION_LIST, True) # 读取微信信息 if is_logging:
for user, user_dir in user_dirs.items():
print(f"[+] user_path: {user}")
for n, paths in user_dir.items():
print(f" {n}:")
for path in paths:
print(f" {path.replace(user, '')}")
print("-" * 32)
print(f"[+] 共 {len(user_dirs)} 个微信账号")
return user_dirs
if __name__ == '__main__':
with open("version_list.json", "r", encoding="utf-8") as f:
version_list = json.load(f)
read_info(version_list, is_logging=True)

View File

@ -1,110 +1,16 @@
# -*- coding: utf-8 -*-# # -*- coding: utf-8 -*-#
# ------------------------------------------------------------------------------- # -------------------------------------------------------------------------------
# Name: get_wx_decrypted_db.py # Name: merge_db.py
# Description: # Description:
# Author: xaoyaoo # Author: xaoyaoo
# Date: 2023/08/25 # Date: 2023/12/03
# ------------------------------------------------------------------------------- # -------------------------------------------------------------------------------
import argparse
import os import os
import re
import shutil import shutil
import sqlite3 import sqlite3
# import sys
import winreg
# sys.path.append(os.path.dirname(os.path.abspath(__file__)))
try:
from decrypted.decrypt import decrypt
except ImportError:
from .decrypt import decrypt
def merge_copy_db(db_path, save_path):
# 开始获取微信数据库
def get_wechat_db():
try:
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Software\Tencent\WeChat", 0, winreg.KEY_READ)
value, _ = winreg.QueryValueEx(key, "FileSavePath")
winreg.CloseKey(key)
w_dir = value
except Exception as e:
try:
w_dir = "MyDocument:"
except Exception as e:
print("读取注册表错误:", str(e))
return str(e)
if w_dir == "MyDocument:":
profile = os.path.expanduser("~")
msg_dir = os.path.join(profile, "Documents", "WeChat Files")
else:
msg_dir = os.path.join(w_dir, "WeChat Files")
if not os.path.exists(msg_dir):
return FileNotFoundError("目录不存在")
user_dirs = {} # wx用户目录
files = os.listdir(msg_dir)
for file_name in files:
if file_name == "All Users" or file_name == "Applet" or file_name == "WMPF":
continue
user_dirs[file_name] = os.path.join(msg_dir, file_name)
# 获取数据库路径
for user, user_dir in user_dirs.items():
Media_p = []
Micro_p = []
FTS_p = []
Sns_p = []
Msg = []
Emotion_p = []
for root, dirs, files in os.walk(user_dir):
for file_name in files:
if re.match(r".*MediaMSG.*\.db$", file_name):
src_path = os.path.join(root, file_name)
Media_p.append(src_path)
elif re.match(r".*MicroMsg.*\.db$", file_name):
src_path = os.path.join(root, file_name)
Micro_p.append(src_path)
elif re.match(r".*FTSMSG.*\.db$", file_name):
src_path = os.path.join(root, file_name)
FTS_p.append(src_path)
elif re.match(r".*MSG.*\.db$", file_name):
src_path = os.path.join(root, file_name)
Msg.append(src_path)
elif re.match(r".*Sns.*\.db$", file_name):
src_path = os.path.join(root, file_name)
Sns_p.append(src_path)
elif re.match(r".*Emotion.*\.db$", file_name):
src_path = os.path.join(root, file_name)
Emotion_p.append(src_path)
Media_p.sort()
Msg.sort()
Micro_p.sort()
# FTS_p.sort()
user_dirs[user] = {"MicroMsg": Micro_p, "Msg": Msg, "MediaMSG": Media_p, "Sns": Sns_p, "Emotion": Emotion_p}
return user_dirs
# 解密所有数据库 paths文件 到 decrypted_path目录
def all_decrypt(keys, paths, decrypted_path):
decrypted_paths = []
for key in keys:
for path in paths:
name = os.path.basename(path) # 文件名
dtp = os.path.join(decrypted_path, name) # 解密后的路径
if not decrypt(key, path, dtp):
break
decrypted_paths.append(dtp)
else: # for循环正常结束没有break
break # 跳出while循环
else:
return False # while循环正常结束没有break 解密失败
return decrypted_paths
def merge_copy_msg_db(db_path, save_path):
if isinstance(db_path, list) and len(db_path) == 1: if isinstance(db_path, list) and len(db_path) == 1:
db_path = db_path[0] db_path = db_path[0]
if not os.path.exists(db_path): if not os.path.exists(db_path):
@ -112,7 +18,7 @@ def merge_copy_msg_db(db_path, save_path):
shutil.move(db_path, save_path) shutil.move(db_path, save_path)
# 合并相同名称的数据库 # 合并相同名称的数据库 MSG0-MSG9.db
def merge_msg_db(db_path: list, save_path: str, CreateTime: int = 0): # CreateTime: 从这个时间开始的消息 10位时间戳 def merge_msg_db(db_path: list, save_path: str, CreateTime: int = 0): # CreateTime: 从这个时间开始的消息 10位时间戳
merged_conn = sqlite3.connect(save_path) merged_conn = sqlite3.connect(save_path)
@ -252,64 +158,3 @@ def merge_media_msg_db(db_path: list, save_path: str):
merged_conn.close() merged_conn.close()
return save_path return save_path
if __name__ == '__main__':
# 创建命令行参数解析器
parser = argparse.ArgumentParser()
parser.add_argument("-k", "--key", help="解密密钥", nargs="+", required=True)
# 解析命令行参数
args = parser.parse_args()
# 检查是否缺少必要参数,并抛出错误
if not args.key:
raise ValueError("缺少必要的命令行参数!请提供密钥。")
# 从命令行参数获取值
keys = args.key
decrypted_ROOT = os.path.join(os.getcwd(), "decrypted")
if keys is None:
print("keys is None")
exit(0)
if isinstance(keys, str):
keys = [keys]
user_dirs = get_wechat_db()
for user, db_path in user_dirs.items(): # 遍历用户
MicroMsgPaths = db_path["MicroMsg"]
MsgPaths = db_path["Msg"]
MediaMSGPaths = db_path["MediaMSG"]
# FTSMSGPaths = db_path["FTSMSG"]
SnsPaths = db_path["Sns"]
EmotionPaths = db_path["Emotion"]
decrypted_path_tmp = os.path.join(decrypted_ROOT, user, "tmp") # 解密后的目录
if not os.path.exists(decrypted_path_tmp):
os.makedirs(decrypted_path_tmp)
MicroMsgDecryptPaths = all_decrypt(keys, MicroMsgPaths, decrypted_path_tmp)
MsgDecryptPaths = all_decrypt(keys, MsgPaths, decrypted_path_tmp)
MediaMSGDecryptPaths = all_decrypt(keys, MediaMSGPaths, decrypted_path_tmp)
SnsDecryptPaths = all_decrypt(keys, SnsPaths, decrypted_path_tmp)
EmotionDecryptPaths = all_decrypt(keys, EmotionPaths, decrypted_path_tmp)
# 合并数据库
decrypted_path = os.path.join(decrypted_ROOT, user) # 解密后的目录
MicroMsgDbPath = os.path.join(decrypted_path, "MicroMsg.db")
MsgDbPath = os.path.join(decrypted_path, "MSG_all.db")
MediaMSGDbPath = os.path.join(decrypted_path, "MediaMSG_all.db")
SnsDbPath = os.path.join(decrypted_path, "Sns_all.db")
EmmotionDbPath = os.path.join(decrypted_path, "Emotion_all.db")
merge_copy_msg_db(MicroMsgDecryptPaths, MicroMsgDbPath)
merge_msg_db(MsgDecryptPaths, MsgDbPath, 0)
merge_media_msg_db(MediaMSGDecryptPaths, MediaMSGDbPath)
merge_copy_msg_db(SnsDecryptPaths, SnsDbPath)
merge_copy_msg_db(EmotionDecryptPaths, EmmotionDbPath)
shutil.rmtree(decrypted_path_tmp) # 删除临时文件
print(f"解密完成:{user}, {decrypted_path}")

View File

@ -3,7 +3,7 @@ from setuptools import setup, find_packages
with open("README.md", "r", encoding="utf-8") as fh: with open("README.md", "r", encoding="utf-8") as fh:
long_description = fh.read() long_description = fh.read()
version = "2.2.18" version = "2.3.0"
install_requires = [ install_requires = [
"psutil", "psutil",
@ -29,18 +29,15 @@ setup(
url="https://github.com/xaoyaoo/PyWxDump", url="https://github.com/xaoyaoo/PyWxDump",
license='MIT', license='MIT',
packages=['pywxdump', 'pywxdump.bias_addr', 'pywxdump.wx_info', 'pywxdump.decrypted', 'pywxdump.analyse', packages=['pywxdump', 'pywxdump.ui', 'pywxdump.wx_info', 'pywxdump.analyzer'],
'pywxdump.show_chat'],
package_dir={'pywxdump': 'pywxdump', package_dir={'pywxdump': 'pywxdump',
'pywxdump.bias_addr': 'pywxdump/bias_addr',
'pywxdump.wx_info': 'pywxdump/wx_info', 'pywxdump.wx_info': 'pywxdump/wx_info',
'pywxdump.decrypted': 'pywxdump/decrypted', 'pywxdump.analyzer': 'pywxdump/analyzer',
'pywxdump.analyse': 'pywxdump/analyse', 'pywxdump.ui': 'pywxdump/ui',
'pywxdump.show_chat': 'pywxdump/show_chat'
}, },
package_data={ package_data={
'pywxdump': ['version_list.json', 'show_chat/templates/*'] 'pywxdump': ['version_list.json', 'ui/templates/*']
}, },
classifiers=[ classifiers=[
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",
@ -50,7 +47,7 @@ setup(
install_requires=install_requires, install_requires=install_requires,
entry_points={ entry_points={
'console_scripts': [ 'console_scripts': [
'wxdump = pywxdump.command:console_run', 'wxdump = pywxdump.cli:console_run',
], ],
}, },
setup_requires=['wheel'] setup_requires=['wheel']

File diff suppressed because one or more lines are too long