修复32版本无法获取到key的偏移
This commit is contained in:
parent
e48aa5eb91
commit
929ec885b7
@ -328,6 +328,13 @@
|
||||
0,
|
||||
63488256
|
||||
],
|
||||
"3.9.8.12": [
|
||||
53479320,
|
||||
53480288,
|
||||
53479176,
|
||||
0,
|
||||
53480252
|
||||
],
|
||||
"3.9.8.15": [
|
||||
64996632,
|
||||
64997968,
|
||||
|
@ -175,98 +175,66 @@ class BiasAddr:
|
||||
result = self.search_memory_value(key, self.module_name)
|
||||
return result
|
||||
|
||||
def get_key_bias2(self, wx_db_path, account_bias=0):
|
||||
wx_db_path = os.path.join(wx_db_path, "Msg", "MicroMsg.db")
|
||||
if not os.path.exists(wx_db_path):
|
||||
return 0
|
||||
def get_key_bias2(self, wx_db_path):
|
||||
|
||||
def get_maybe_key(mem_data):
|
||||
min_addr = 0xffffffffffffffffffffffff
|
||||
max_addr = 0
|
||||
for module1 in pm.list_modules():
|
||||
if module1.lpBaseOfDll < min_addr:
|
||||
min_addr = module1.lpBaseOfDll
|
||||
if module1.lpBaseOfDll > max_addr:
|
||||
max_addr = module1.lpBaseOfDll + module1.SizeOfImage
|
||||
addr_len = get_exe_bit(self.exe_path) // 8
|
||||
db_path = wx_db_path
|
||||
|
||||
maybe_key = []
|
||||
for i in range(0, len(mem_data), self.address_len):
|
||||
addr = mem_data[i:i + self.address_len]
|
||||
addr = int.from_bytes(addr, byteorder='little')
|
||||
# 去掉不可能的地址
|
||||
if min_addr < addr < max_addr:
|
||||
key = read_key(addr)
|
||||
if key == b"":
|
||||
continue
|
||||
maybe_key.append([key, i])
|
||||
return maybe_key
|
||||
def read_key_bytes(h_process, address, address_len=8):
|
||||
array = ctypes.create_string_buffer(address_len)
|
||||
if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return "None"
|
||||
address = int.from_bytes(array, byteorder='little') # 逆序转换为int地址(key地址)
|
||||
key = ctypes.create_string_buffer(32)
|
||||
if ReadProcessMemory(h_process, void_p(address), key, 32, 0) == 0: return "None"
|
||||
key_bytes = bytes(key)
|
||||
return key_bytes
|
||||
|
||||
def read_key(addr):
|
||||
key = ctypes.create_string_buffer(35)
|
||||
if ReadProcessMemory(pm.process_handle, void_p(addr - 1), key, 35, 0) == 0:
|
||||
return b""
|
||||
|
||||
if b"\x00\x00" in key.raw[1:33]:
|
||||
return b""
|
||||
|
||||
if b"\x00\x00" == key.raw[33:35] and b"\x90" == key.raw[0:1]:
|
||||
return key.raw[1:33]
|
||||
return b""
|
||||
|
||||
def verify_key(keys, wx_db_path):
|
||||
def verify_key(key, wx_db_path):
|
||||
KEY_SIZE = 32
|
||||
DEFAULT_PAGESIZE = 4096
|
||||
DEFAULT_ITER = 64000
|
||||
with open(wx_db_path, "rb") as file:
|
||||
blist = file.read(5000)
|
||||
salt = blist[:16]
|
||||
byteKey = hashlib.pbkdf2_hmac("sha1", key, salt, DEFAULT_ITER, KEY_SIZE)
|
||||
first = blist[16:DEFAULT_PAGESIZE]
|
||||
|
||||
mac_salt = bytes([(salt[i] ^ 58) for i in range(16)])
|
||||
mac_key = hashlib.pbkdf2_hmac("sha1", byteKey, mac_salt, 2, KEY_SIZE)
|
||||
hash_mac = hmac.new(mac_key, first[:-32], hashlib.sha1)
|
||||
hash_mac.update(b'\x01\x00\x00\x00')
|
||||
|
||||
with multiprocessing.Pool(processes=8) as pool:
|
||||
results = [pool.apply_async(validate_key, args=(key, salt, first, mac_salt)) for key, i in keys[-1::-1]]
|
||||
results = [p.get() for p in results]
|
||||
for i, result in enumerate(results[-1::-1]):
|
||||
if result:
|
||||
return keys[i]
|
||||
return b"", 0
|
||||
if hash_mac.digest() != first[-32:-12]:
|
||||
return False
|
||||
return True
|
||||
|
||||
module_name = "WeChatWin.dll"
|
||||
pm = self.pm
|
||||
module = pymem.process.module_from_name(pm.process_handle, module_name)
|
||||
start_addr = module.lpBaseOfDll
|
||||
size = module.SizeOfImage
|
||||
|
||||
if account_bias > 1:
|
||||
maybe_key = []
|
||||
for i in [0x24, 0x40]:
|
||||
addr = start_addr + account_bias - i
|
||||
mem_data = pm.read_bytes(addr, self.address_len)
|
||||
key = read_key(int.from_bytes(mem_data, byteorder='little'))
|
||||
if key != b"":
|
||||
maybe_key.append([key, addr - start_addr])
|
||||
key, bais = verify_key(maybe_key, wx_db_path)
|
||||
if bais != 0:
|
||||
return bais
|
||||
|
||||
mem_data = pm.read_bytes(start_addr, size)
|
||||
maybe_key = get_maybe_key(mem_data)
|
||||
key, bais = verify_key(maybe_key, wx_db_path)
|
||||
return bais
|
||||
|
||||
def test(self):
|
||||
phone_type1 = "iphone\x00"
|
||||
phone_type2 = "android\x00"
|
||||
Regex = re.compile(r"^[a-zA-Z0-9_]+$")
|
||||
# 内存搜索
|
||||
module = pymem.process.module_from_name(self.pm.process_handle, self.module_name)
|
||||
print(hex(module.lpBaseOfDll))
|
||||
phone_type1_bias = self.pm.pattern_scan_module(phone_type1.encode(), self.module_name, return_multiple=True)
|
||||
phone_type2_bias = self.pm.pattern_scan_module(phone_type2.encode(), self.module_name, return_multiple=True)
|
||||
phone_type_bias = phone_type1_bias + phone_type2_bias
|
||||
print(len(phone_type1_bias))
|
||||
for i in phone_type_bias[::-1]:
|
||||
for j in range(i, i - 1000, -16):
|
||||
a = get_info_without_key(self.process_handle, j, 32)
|
||||
if Regex.match(a) and len(a) >= 6:
|
||||
print(a)
|
||||
phone_type3 = "ipad\x00"
|
||||
|
||||
pm = pymem.Pymem("WeChat.exe")
|
||||
module_name = "WeChatWin.dll"
|
||||
|
||||
MicroMsg_path = os.path.join(db_path, "MSG", "MicroMsg.db")
|
||||
|
||||
module = pymem.process.module_from_name(pm.process_handle, module_name)
|
||||
|
||||
type1_addrs = pm.pattern_scan_module(phone_type1.encode(), module, return_multiple=True)
|
||||
type2_addrs = pm.pattern_scan_module(phone_type2.encode(), module, return_multiple=True)
|
||||
type3_addrs = pm.pattern_scan_module(phone_type3.encode(), module, return_multiple=True)
|
||||
print(len(type1_addrs), len(type2_addrs), len(type3_addrs))
|
||||
type_addrs = type1_addrs if len(type1_addrs) >= 2 else type2_addrs if len(
|
||||
type2_addrs) >= 2 else type3_addrs if len(type3_addrs) >= 2 else "None"
|
||||
if type_addrs == "None":
|
||||
return 0
|
||||
for i in type_addrs[::-1]:
|
||||
for j in range(i, i - 2000, -addr_len):
|
||||
key_bytes = read_key_bytes(pm.process_handle, j, addr_len)
|
||||
if key_bytes == "None":
|
||||
continue
|
||||
if verify_key(key_bytes, MicroMsg_path):
|
||||
return j - module.lpBaseOfDll
|
||||
return 0
|
||||
|
||||
def run(self, logging_path=False, version_list_path=None):
|
||||
if not self.get_process_handle()[0]:
|
||||
@ -277,11 +245,10 @@ class BiasAddr:
|
||||
key_bias = 0
|
||||
key_bias = self.get_key_bias1()
|
||||
key_bias = self.search_key(self.key) if key_bias <= 0 and self.key else key_bias
|
||||
key_bias = self.get_key_bias2(self.db_path, account_bias) if key_bias <= 0 and self.db_path else key_bias
|
||||
key_bias = self.get_key_bias2(self.db_path) if key_bias <= 0 and self.db_path else key_bias
|
||||
|
||||
rdata = {self.version: [name_bias, account_bias, mobile_bias, 0, key_bias]}
|
||||
# print(rdata)
|
||||
# self.test()
|
||||
|
||||
if version_list_path and os.path.exists(version_list_path):
|
||||
with open(version_list_path, "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
@ -307,6 +274,6 @@ def get_info_without_key(h_process, address, n_size=64):
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
account, mobile, name, key, db_path = "test", "test", "test", "0000", "test"
|
||||
account, mobile, name, key, db_path = "test", "test", "test",None, r"test"
|
||||
bias_addr = BiasAddr(account, mobile, name, key, db_path)
|
||||
bias_addr.run()
|
||||
bias_addr.run(logging_path=True)
|
||||
|
@ -5,6 +5,8 @@
|
||||
# Author: xaoyaoo
|
||||
# Date: 2023/08/21
|
||||
# -------------------------------------------------------------------------------
|
||||
import hmac
|
||||
import hashlib
|
||||
import json
|
||||
import ctypes
|
||||
import os
|
||||
@ -20,6 +22,41 @@ ReadProcessMemory = ctypes.windll.kernel32.ReadProcessMemory
|
||||
void_p = ctypes.c_void_p
|
||||
|
||||
|
||||
# 获取exe文件的位数
|
||||
def get_exe_bit(file_path):
|
||||
"""
|
||||
获取 PE 文件的位数: 32 位或 64 位
|
||||
:param file_path: PE 文件路径(可执行文件)
|
||||
:return: 如果遇到错误则返回 64
|
||||
"""
|
||||
try:
|
||||
with open(file_path, 'rb') as f:
|
||||
dos_header = f.read(2)
|
||||
if dos_header != b'MZ':
|
||||
print('get exe bit error: Invalid PE file')
|
||||
return 64
|
||||
# Seek to the offset of the PE signature
|
||||
f.seek(60)
|
||||
pe_offset_bytes = f.read(4)
|
||||
pe_offset = int.from_bytes(pe_offset_bytes, byteorder='little')
|
||||
|
||||
# Seek to the Machine field in the PE header
|
||||
f.seek(pe_offset + 4)
|
||||
machine_bytes = f.read(2)
|
||||
machine = int.from_bytes(machine_bytes, byteorder='little')
|
||||
|
||||
if machine == 0x14c:
|
||||
return 32
|
||||
elif machine == 0x8664:
|
||||
return 64
|
||||
else:
|
||||
print('get exe bit error: Unknown architecture: %s' % hex(machine))
|
||||
return 64
|
||||
except IOError:
|
||||
print('get exe bit error: File not found or cannot be opened')
|
||||
return 64
|
||||
|
||||
|
||||
# 读取内存中的字符串(非key部分)
|
||||
def get_info_without_key(h_process, address, n_size=64):
|
||||
array = ctypes.create_string_buffer(n_size)
|
||||
@ -54,21 +91,6 @@ def pattern_scan_all(handle, pattern, *, return_multiple=False, find_num=100):
|
||||
|
||||
|
||||
def get_info_wxid(h_process):
|
||||
# find_num = 1000
|
||||
# addrs = pattern_scan_all(h_process, br'\\FileStorage', return_multiple=True, find_num=find_num)
|
||||
# wxids = []
|
||||
# for addr in addrs:
|
||||
# array = ctypes.create_string_buffer(33)
|
||||
# if ReadProcessMemory(h_process, void_p(addr - 21), array, 33, 0) == 0: return "None"
|
||||
# array = bytes(array) # .decode('utf-8', errors='ignore')
|
||||
# array = array.split(br'\FileStorage')[0]
|
||||
# for part in [b'}', b'\x7f', b'\\']:
|
||||
# if part in array:
|
||||
# array = array.split(part)[1]
|
||||
# wxids.append(array.decode('utf-8', errors='ignore'))
|
||||
# break
|
||||
# wxid = max(wxids, key=wxids.count) if wxids else "None"
|
||||
|
||||
find_num = 100
|
||||
addrs = pattern_scan_all(h_process, br'\\Msg\\FTSContact', return_multiple=True, find_num=find_num)
|
||||
wxids = []
|
||||
@ -91,7 +113,6 @@ def get_info_filePath(wxid="all"):
|
||||
value, _ = winreg.QueryValueEx(key, "FileSavePath")
|
||||
winreg.CloseKey(key)
|
||||
w_dir = value
|
||||
print(0, w_dir)
|
||||
except Exception as e:
|
||||
# 获取文档实际目录
|
||||
try:
|
||||
@ -107,16 +128,13 @@ def get_info_filePath(wxid="all"):
|
||||
print(1, w_dir)
|
||||
else:
|
||||
w_dir = documents_path
|
||||
print(2, w_dir)
|
||||
except Exception as e:
|
||||
profile = os.environ.get("USERPROFILE") # 获取用户目录
|
||||
w_dir = os.path.join(profile, "Documents")
|
||||
print(3, w_dir)
|
||||
if w_dir == "MyDocument:":
|
||||
profile = os.environ.get("USERPROFILE")
|
||||
w_dir = os.path.join(profile, "Documents")
|
||||
msg_dir = os.path.join(w_dir, "WeChat Files")
|
||||
print(msg_dir)
|
||||
if wxid == "all" and os.path.exists(msg_dir):
|
||||
return msg_dir
|
||||
|
||||
@ -124,15 +142,60 @@ def get_info_filePath(wxid="all"):
|
||||
return filePath if os.path.exists(filePath) else "None"
|
||||
|
||||
|
||||
# 读取内存中的key
|
||||
def get_key(h_process, address, address_len=8):
|
||||
array = ctypes.create_string_buffer(address_len)
|
||||
if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return "None"
|
||||
address = int.from_bytes(array, byteorder='little') # 逆序转换为int地址(key地址)
|
||||
key = ctypes.create_string_buffer(32)
|
||||
if ReadProcessMemory(h_process, void_p(address), key, 32, 0) == 0: return "None"
|
||||
key_string = bytes(key).hex()
|
||||
return key_string
|
||||
def get_key(db_path, addr_len):
|
||||
def read_key_bytes(h_process, address, address_len=8):
|
||||
array = ctypes.create_string_buffer(address_len)
|
||||
if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return "None"
|
||||
address = int.from_bytes(array, byteorder='little') # 逆序转换为int地址(key地址)
|
||||
key = ctypes.create_string_buffer(32)
|
||||
if ReadProcessMemory(h_process, void_p(address), key, 32, 0) == 0: return "None"
|
||||
key_bytes = bytes(key)
|
||||
return key_bytes
|
||||
|
||||
def verify_key(key, wx_db_path):
|
||||
KEY_SIZE = 32
|
||||
DEFAULT_PAGESIZE = 4096
|
||||
DEFAULT_ITER = 64000
|
||||
with open(wx_db_path, "rb") as file:
|
||||
blist = file.read(5000)
|
||||
salt = blist[:16]
|
||||
byteKey = hashlib.pbkdf2_hmac("sha1", key, salt, DEFAULT_ITER, KEY_SIZE)
|
||||
first = blist[16:DEFAULT_PAGESIZE]
|
||||
|
||||
mac_salt = bytes([(salt[i] ^ 58) for i in range(16)])
|
||||
mac_key = hashlib.pbkdf2_hmac("sha1", byteKey, mac_salt, 2, KEY_SIZE)
|
||||
hash_mac = hmac.new(mac_key, first[:-32], hashlib.sha1)
|
||||
hash_mac.update(b'\x01\x00\x00\x00')
|
||||
|
||||
if hash_mac.digest() != first[-32:-12]:
|
||||
return False
|
||||
return True
|
||||
|
||||
phone_type1 = "iphone\x00"
|
||||
phone_type2 = "android\x00"
|
||||
phone_type3 = "ipad\x00"
|
||||
|
||||
pm = pymem.Pymem("WeChat.exe")
|
||||
module_name = "WeChatWin.dll"
|
||||
|
||||
MicroMsg_path = os.path.join(db_path, "MSG", "MicroMsg.db")
|
||||
|
||||
type1_addrs = pm.pattern_scan_module(phone_type1.encode(), module_name, return_multiple=True)
|
||||
type2_addrs = pm.pattern_scan_module(phone_type2.encode(), module_name, return_multiple=True)
|
||||
type3_addrs = pm.pattern_scan_module(phone_type3.encode(), module_name, return_multiple=True)
|
||||
type_addrs = type1_addrs if len(type1_addrs) == 2 else type2_addrs if len(type2_addrs) == 2 else type3_addrs if len(
|
||||
type3_addrs) == 2 else "None"
|
||||
if type_addrs == "None":
|
||||
return "None"
|
||||
for i in type_addrs[::-1]:
|
||||
for j in range(i, i - 2000, -addr_len):
|
||||
key_bytes = read_key_bytes(pm.process_handle, j, addr_len)
|
||||
if key_bytes == "None":
|
||||
continue
|
||||
if verify_key(key_bytes, MicroMsg_path):
|
||||
return key_bytes.hex()
|
||||
|
||||
return "None"
|
||||
|
||||
|
||||
# 读取微信信息(account,mobile,name,mail,wxid,key)
|
||||
@ -177,17 +240,18 @@ def read_info(version_list, is_logging=False):
|
||||
account__baseaddr = wechat_base_address + bias_list[1]
|
||||
mobile_baseaddr = wechat_base_address + bias_list[2]
|
||||
mail_baseaddr = wechat_base_address + bias_list[3]
|
||||
key_baseaddr = wechat_base_address + bias_list[4]
|
||||
# key_baseaddr = wechat_base_address + bias_list[4]
|
||||
|
||||
addrLen = 4 if tmp_rd['version'] in ["3.9.2.23", "3.9.2.26"] else 8
|
||||
addrLen = get_exe_bit(process.exe()) // 8
|
||||
|
||||
tmp_rd['account'] = get_info_without_key(Handle, account__baseaddr, 32) if bias_list[1] != 0 else "None"
|
||||
tmp_rd['mobile'] = get_info_without_key(Handle, mobile_baseaddr, 64) if bias_list[2] != 0 else "None"
|
||||
tmp_rd['name'] = get_info_without_key(Handle, name_baseaddr, 64) if bias_list[0] != 0 else "None"
|
||||
tmp_rd['mail'] = get_info_without_key(Handle, mail_baseaddr, 64) if bias_list[3] != 0 else "None"
|
||||
|
||||
tmp_rd['wxid'] = get_info_wxid(Handle)
|
||||
tmp_rd['filePath'] = get_info_filePath(tmp_rd['wxid'])
|
||||
tmp_rd['key'] = get_key(Handle, key_baseaddr, addrLen) if bias_list[4] != 0 else "None"
|
||||
tmp_rd['filePath'] = get_info_filePath(tmp_rd['wxid']) if tmp_rd['wxid'] != "None" else "None"
|
||||
tmp_rd['key'] = get_key(tmp_rd['filePath'], addrLen) if tmp_rd['filePath'] != "None" else "None"
|
||||
result.append(tmp_rd)
|
||||
|
||||
if is_logging:
|
||||
@ -267,6 +331,6 @@ def get_wechat_db(require_list: Union[List[str], str] = "all", msg_dir: str = No
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
with open("version_list.json", "r", encoding="utf-8") as f:
|
||||
with open("../version_list.json", "r", encoding="utf-8") as f:
|
||||
version_list = json.load(f)
|
||||
read_info(version_list, is_logging=True)
|
||||
|
Loading…
Reference in New Issue
Block a user