优化代码

This commit is contained in:
xaoyo 2023-10-09 01:34:30 +08:00
parent f43be30408
commit 48139c2456
3 changed files with 276 additions and 220 deletions

View File

@ -9,249 +9,299 @@ import argparse
import ctypes import ctypes
import json import json
import re import re
import time
import psutil import psutil
import win32api import win32api
from pymem import Pymem
import pymem
ReadProcessMemory = ctypes.windll.kernel32.ReadProcessMemory
void_p = ctypes.c_void_p
def hex2dec(hex): # def get_pid(keyword):
return int(hex, 16) # """
# 获取进程id
# :param keyword: 关键字
def dec2hex(dec): # :return:
return hex(dec) # """
# pids = {}
# for proc in psutil.process_iter():
def hex_add(hex1, hex2, base1=16, base2=16): # if keyword in proc.name():
""" # pids[proc.pid] = proc
两个任意进制数相加 # return pids
:param hex1: #
:param hex2: #
:return: # class BaseAddr:
""" # def __init__(self, pid, proc_module_name="WeChatWin.dll"):
return hex(int(hex1, base1) + int(hex2, base2)) # self.pid = pid
# self.module_name = proc_module_name
# self.proc = psutil.Process(self.pid)
def hex_sub(hex1, hex2, base1=16, base2=16): # self.version = self.get_app_version(self.proc.exe())
""" # self.base_address = 0
两个任意进制数相减 # self.end_address = 0
:param hex1: # self.batch = 0
:param hex2: #
:return: # self.key_start_addr = 0
""" # self.key_end_addr = 0
return hex(int(hex1, base1) - int(hex2, base2)) #
# self.mobile_addr = []
# self.name_addr = []
def get_pid(keyword): # self.account_addr = []
""" # # self.key_addr = []
获取进程id #
:param keyword: 关键字 # self.get_base_addr()
:return: #
""" # def get_app_version(self, executable_path):
pids = {} # info = win32api.GetFileVersionInfo(executable_path, "\\")
for proc in psutil.process_iter(): # version = info['FileVersionMS'] >> 16, info['FileVersionMS'] & 0xFFFF, \
if keyword in proc.name(): # info['FileVersionLS'] >> 16, info['FileVersionLS'] & 0xFFFF
pids[proc.pid] = proc # version_str = ".".join(map(str, version))
return pids #
# return version_str
#
# def get_base_addr(self):
# """
# 获取模块基址
# :param pid: 进程id
# :param module_name: 模块名
# :return:
# """
# base_address = 0
# end_address = 0
# batch = 0
# n = 0
# for module in self.proc.memory_maps(grouped=False):
# if self.module_name in module.path:
# if n == 0:
# base_address = int(module.addr, 16)
# batch = module.rss
# n += 1
# end_address = int(module.addr, 16) + module.rss
#
# self.base_address = base_address
# self.end_address = end_address
# self.batch = batch
# # self.batch = end_address - base_address
#
# def find_all(self, c, string):
# """
# 查找字符串中所有子串的位置
# :param c: 子串 b'123'
# :param string: 字符串 b'123456789123'
# :return:
# """
# return [m.start() for m in re.finditer(re.escape(c), string)]
#
# # 搜索内存地址范围内的值
# def search_memory_value(self, mobile, name, account):
# mobile = mobile.encode("utf-8")
# name = name.encode("utf-8")
# account = account.encode("utf-8")
#
# Handle = ctypes.windll.kernel32.OpenProcess(0x1F0FFF, False, self.pid)
#
# mobile_addr = []
# name_addr = []
# account_addr = []
#
# array = ctypes.create_string_buffer(self.batch)
# for i in range(self.base_address, self.end_address, self.batch):
# if ReadProcessMemory(Handle, void_p(i), array, self.batch, None) == 0:
# continue
#
# hex_string = array.raw # 读取到的内存数据
#
# if mobile in hex_string:
# mobile_addr = mobile_addr + [m.start() + i for m in re.finditer(re.escape(mobile), hex_string)]
# if name in hex_string:
# name_addr = name_addr + [m.start() + i for m in re.finditer(re.escape(name), hex_string)]
# if account in hex_string:
# account_addr = account_addr + [m.start() + i for m in re.finditer(re.escape(account), hex_string)]
#
# self.mobile_addr = mobile_addr
# self.name_addr = name_addr
# self.account_addr = account_addr
# return mobile_addr, name_addr, account_addr
#
# def get_key_addr(self, key):
# """
# 获取key的地址
# :param key:
# :return:
# """
# key = bytes.fromhex(key)
#
# module_start_addr = 34199871460642
# module_end_addr = 0
# for module in self.proc.memory_maps(grouped=False):
# if "WeChat" in module.path:
# start_addr = int(module.addr, 16)
# end_addr = start_addr + module.rss
#
# if module_start_addr > start_addr:
# module_start_addr = start_addr
# if module_end_addr < end_addr:
# module_end_addr = end_addr
#
# Handle = ctypes.windll.kernel32.OpenProcess(0x1F0FFF, False, self.pid)
# array = ctypes.create_string_buffer(self.batch)
# for i in range(module_start_addr, module_end_addr, self.batch):
# if ReadProcessMemory(Handle, void_p(i), array, self.batch, None) == 0:
# continue
#
# hex_string = array.raw # 读取到的内存数据
# if key in hex_string:
# self.key_addr_tmp = i + hex_string.find(key)
# break
# if ((i - module_start_addr) / self.batch) > 300000:
# self.key_addr = 0
# return -1
#
# array_key = []
# for i in range(8):
# byte_value = (self.key_addr_tmp >> (i * 8)) & 0xFF
# hex_string = format(byte_value, '02x')
# byte_obj = bytes.fromhex(hex_string)
# array_key.append(byte_obj)
# # 合并数组
# array_key = b''.join(array_key)
#
# array = ctypes.create_string_buffer(self.batch)
# for i in range(self.base_address, self.end_address, self.batch):
# if ReadProcessMemory(Handle, void_p(i), array, self.batch, None) == 0:
# continue
#
# hex_string = array.raw # 读取到的内存数据
# if array_key in hex_string:
# self.key_addr = i + hex_string.find(array_key)
# return self.key_addr
#
# def calculate_offset(self, addr):
# """
# 计算偏移量
# :param addr:
# :return:
# """
# if addr == 0:
# return 0
# offset = addr - self.base_address
# return offset
#
# def get_offset(self):
# """
# 计算偏移量
# :param addr:
# :return:
# """
# mobile_offset = 0
# name_offset = 0
# account_offset = 0
# key_offset = 0
# if len(self.mobile_addr) >= 1:
# mobile_offset = self.calculate_offset(self.mobile_addr[0])
# if len(self.name_addr) >= 1:
# name_offset = self.calculate_offset(self.name_addr[0])
# if len(self.account_addr) >= 1:
# if len(self.account_addr) >= 2:
# account_offset = self.calculate_offset(self.account_addr[1])
# else:
# account_offset = self.calculate_offset(self.account_addr[0])
#
# key_offset = self.calculate_offset(self.key_addr)
#
# self.key_offset = key_offset
# self.mobile_offset = mobile_offset
# self.name_offset = name_offset
# self.account_offset = account_offset
# return name_offset, account_offset, mobile_offset, 0, key_offset
#
#
# def run(mobile, name, account, key):
# proc_name = "WeChat.exe"
# proc_module_name = "WeChatWin.dll"
#
# pids = get_pid(proc_name)
# for pid, proc in pids.items():
# ba = BaseAddr(pid, proc_module_name)
# ba.search_memory_value(mobile, name, account)
# ba.get_key_addr(key)
# name_offset, account_offset, mobile_offset, _, key_offset = ba.get_offset()
# rdata = {ba.version: [name_offset, account_offset, mobile_offset, 0, key_offset]}
# return rdata
class BaseAddr: class BaseAddr:
def __init__(self, pid, proc_module_name="WeChatWin.dll"): def __init__(self, account, mobile, name, key):
self.pid = pid self.account = account.encode("utf-8")
self.module_name = proc_module_name self.mobile = mobile.encode("utf-8")
self.proc = psutil.Process(self.pid) self.name = name.encode("utf-8")
self.version = self.get_app_version(self.proc.exe()) self.key = bytes.fromhex(key)
self.base_address = 0
self.end_address = 0
self.batch = 0
self.key_start_addr = 0 self.process_name = "WeChat.exe"
self.key_end_addr = 0 self.module_name = "WeChatWin.dll"
self.mobile_addr = [] self.pm = Pymem("WeChat.exe")
self.name_addr = []
self.account_addr = []
# self.key_addr = []
self.get_base_addr() def find_all(self, c: bytes, string: bytes, base_addr=0):
def get_app_version(self, executable_path):
info = win32api.GetFileVersionInfo(executable_path, "\\")
version = info['FileVersionMS'] >> 16, info['FileVersionMS'] & 0xFFFF, \
info['FileVersionLS'] >> 16, info['FileVersionLS'] & 0xFFFF
version_str = ".".join(map(str, version))
return version_str
def get_base_addr(self):
"""
获取模块基址
:param pid: 进程id
:param module_name: 模块名
:return:
"""
base_address = 0
end_address = 0
batch = 0
n = 0
for module in self.proc.memory_maps(grouped=False):
if self.module_name in module.path:
if n == 0:
base_address = int(module.addr, 16)
batch = module.rss
n += 1
end_address = int(module.addr, 16) + module.rss
self.base_address = base_address
self.end_address = end_address
self.batch = batch
# self.batch = end_address - base_address
def find_all(self, c, string):
""" """
查找字符串中所有子串的位置 查找字符串中所有子串的位置
:param c: 子串 b'123' :param c: 子串 b'123'
:param string: 字符串 b'123456789123' :param string: 字符串 b'123456789123'
:return: :return:
""" """
return [m.start() for m in re.finditer(re.escape(c), string)] return [base_addr + m.start() for m in re.finditer(re.escape(c), string)]
# 搜索内存地址范围内的值 def get_file_version(self, process_name):
def search_memory_value(self, mobile, name, account): for process in psutil.process_iter(['name', 'exe', 'pid', 'cmdline']):
mobile = mobile.encode("utf-8") if process.name() == process_name:
name = name.encode("utf-8") file_path = process.exe()
account = account.encode("utf-8") info = win32api.GetFileVersionInfo(file_path, "\\")
ms, ls = info['FileVersionMS'], info['FileVersionLS']
file_version = f"{win32api.HIWORD(ms)}.{win32api.LOWORD(ms)}.{win32api.HIWORD(ls)}.{win32api.LOWORD(ls)}"
return file_version
Handle = ctypes.windll.kernel32.OpenProcess(0x1F0FFF, False, self.pid) def search_memory_value(self, value: bytes, module_name="WeChatWin.dll"):
# 创建 Pymem 对象
pm = self.pm
module = pymem.process.module_from_name(pm.process_handle, module_name)
mem_data = pm.read_bytes(module.lpBaseOfDll, module.SizeOfImage)
result = self.find_all(value, mem_data)
result = result[-1] if len(result) > 0 else 0
return result
mobile_addr = [] def search_key(self, key: bytes):
name_addr = [] pm = self.pm
account_addr = [] pid = pm.process_id
module = pymem.process.module_from_name(pm.process_handle, "WeChatResource.dll")
start_addr, mem_size = module.lpBaseOfDll, 30918448
array = ctypes.create_string_buffer(self.batch) batch = 4096
for i in range(self.base_address, self.end_address, self.batch):
if ctypes.windll.kernel32.ReadProcessMemory(Handle, ctypes.c_void_p(i), array, self.batch, None) == 0: Handle = ctypes.windll.kernel32.OpenProcess(0x1F0FFF, False, pid)
array = ctypes.create_string_buffer(batch)
key_addr = 0
for i in range(start_addr, start_addr + mem_size, batch):
if ReadProcessMemory(Handle, void_p(i), array, batch, None) == 0:
continue continue
hex_string = array.raw # 读取到的内存数据 hex_string = array.raw # 读取到的内存数据
key_addr = self.find_all(key, hex_string, i)
if mobile in hex_string: if len(key_addr) > 0:
mobile_addr = mobile_addr + [m.start() + i for m in re.finditer(re.escape(mobile), hex_string)] key_addr = key_addr[0]
if name in hex_string:
name_addr = name_addr + [m.start() + i for m in re.finditer(re.escape(name), hex_string)]
if account in hex_string:
account_addr = account_addr + [m.start() + i for m in re.finditer(re.escape(account), hex_string)]
self.mobile_addr = mobile_addr
self.name_addr = name_addr
self.account_addr = account_addr
return mobile_addr, name_addr, account_addr
def get_key_addr(self, key):
"""
获取key的地址
:param key:
:return:
"""
key = bytes.fromhex(key)
module_start_addr = 34199871460642
module_end_addr = 0
for module in self.proc.memory_maps(grouped=False):
if "WeChat" in module.path:
start_addr = int(module.addr, 16)
end_addr = start_addr + module.rss
if module_start_addr > start_addr:
module_start_addr = start_addr
if module_end_addr < end_addr:
module_end_addr = end_addr
Handle = ctypes.windll.kernel32.OpenProcess(0x1F0FFF, False, self.pid)
array = ctypes.create_string_buffer(self.batch)
for i in range(module_start_addr, module_end_addr, self.batch):
if ctypes.windll.kernel32.ReadProcessMemory(Handle, ctypes.c_void_p(i), array, self.batch, None) == 0:
continue
hex_string = array.raw # 读取到的内存数据
if key in hex_string:
self.key_addr_tmp = i + hex_string.find(key)
break break
if ((i - module_start_addr) / self.batch) > 300000:
self.key_addr = 0
return -1
array_key = [] key = key_addr.to_bytes(8, byteorder='little')
for i in range(8): result = self.search_memory_value(key, self.module_name)
byte_value = (self.key_addr_tmp >> (i * 8)) & 0xFF return result
hex_string = format(byte_value, '02x')
byte_obj = bytes.fromhex(hex_string)
array_key.append(byte_obj)
# 合并数组
array_key = b''.join(array_key)
array = ctypes.create_string_buffer(self.batch) def run(self):
for i in range(self.base_address, self.end_address, self.batch): self.version = self.get_file_version(self.process_name)
if ctypes.windll.kernel32.ReadProcessMemory(Handle, ctypes.c_void_p(i), array, self.batch, None) == 0: key_bias = self.search_key(self.key)
continue mobile_bias = self.search_memory_value(self.mobile)
name_bias = self.search_memory_value(self.name)
hex_string = array.raw # 读取到的内存数据 account_bias = self.search_memory_value(self.account)
if array_key in hex_string: return {self.version: [name_bias, account_bias, mobile_bias, 0, key_bias]}
self.key_addr = i + hex_string.find(array_key)
return self.key_addr
def calculate_offset(self, addr):
"""
计算偏移量
:param addr:
:return:
"""
if addr == 0:
return 0
offset = addr - self.base_address
return offset
def get_offset(self):
"""
计算偏移量
:param addr:
:return:
"""
mobile_offset = 0
name_offset = 0
account_offset = 0
key_offset = 0
if len(self.mobile_addr) >= 1:
mobile_offset = self.calculate_offset(self.mobile_addr[0])
if len(self.name_addr) >= 1:
name_offset = self.calculate_offset(self.name_addr[0])
if len(self.account_addr) >= 1:
if len(self.account_addr) >= 2:
account_offset = self.calculate_offset(self.account_addr[1])
else:
account_offset = self.calculate_offset(self.account_addr[0])
key_offset = self.calculate_offset(self.key_addr)
self.key_offset = key_offset
self.mobile_offset = mobile_offset
self.name_offset = name_offset
self.account_offset = account_offset
return name_offset, account_offset, mobile_offset, 0, key_offset
def run(mobile, name, account, key):
proc_name = "WeChat.exe"
proc_module_name = "WeChatWin.dll"
pids = get_pid(proc_name)
for pid, proc in pids.items():
ba = BaseAddr(pid, proc_module_name)
ba.search_memory_value(mobile, name, account)
ba.get_key_addr(key)
name_offset, account_offset, mobile_offset, _, key_offset = ba.get_offset()
rdata = {ba.version: [name_offset, account_offset, mobile_offset, 0, key_offset]}
return rdata
if __name__ == '__main__': if __name__ == '__main__':
@ -276,7 +326,10 @@ if __name__ == '__main__':
key = args.key key = args.key
# 调用 run 函数,并传入参数 # 调用 run 函数,并传入参数
rdata = run(mobile, name, account, key) # rdata = run(mobile, name, account, key)
# print(rdata)
rdata = BaseAddr(account, mobile, name, key).run()
print(rdata) print(rdata)
# 添加到version_list.json # 添加到version_list.json

View File

@ -1,8 +1,9 @@
# <center>PyWxDump</center> # <center>PyWxDump</center>
* 更新日志(如果有[version_list.json](./Program/version_list.json)缺少的版本,请帮忙添加。) * 更新日志(如果有[version_list.json](./Program/version_list.json)缺少的版本,请帮忙添加。)
* 2021.10.07 修改获取基址内存搜索方式,防止进入死循环 * 2023.10.09 优化代码,删减没必要代码,重新修改获取基址代码,加快运行速度(需要安装新的库 pymem
* 2021.10.07 增加了3.9.7.29版本的偏移地址 * 2023.10.07 修改获取基址内存搜索方式,防止进入死循环
* 2023.10.07 增加了3.9.7.29版本的偏移地址
* 2023.10.06 增加命令行解密数据库 * 2023.10.06 增加命令行解密数据库
* 2023.09.28 增加了数据库部分解析 * 2023.09.28 增加了数据库部分解析
* 2023.09.15 增加了3.9.7.25版本的偏移地址 * 2023.09.15 增加了3.9.7.25版本的偏移地址
@ -14,6 +15,7 @@
该分支是[SharpWxDump](https://github.com/AdminTest0/SharpWxDump)的经过重构python语言版本同时添加了一些新的功能。 该分支是[SharpWxDump](https://github.com/AdminTest0/SharpWxDump)的经过重构python语言版本同时添加了一些新的功能。
* *
*如果觉得好用的话的话,帮忙点个[![Star](https://img.shields.io/github/stars/xaoyaoo/PyWxDump.svg?style=social&label=Star)](https://github.com/xaoyaoo/PyWxDump/) *如果觉得好用的话的话,帮忙点个[![Star](https://img.shields.io/github/stars/xaoyaoo/PyWxDump.svg?style=social&label=Star)](https://github.com/xaoyaoo/PyWxDump/)
呗** 呗**

View File

@ -1,3 +1,4 @@
psutil psutil
pycryptodomex pycryptodomex
pywin32 pywin32
pymem