diff --git a/pywxdump/version_list.json b/pywxdump/version_list.json index cf32069..4a4dd6f 100644 --- a/pywxdump/version_list.json +++ b/pywxdump/version_list.json @@ -328,6 +328,13 @@ 0, 63488256 ], + "3.9.8.12": [ + 53479320, + 53480288, + 53479176, + 0, + 53480252 + ], "3.9.8.15": [ 64996632, 64997968, diff --git a/pywxdump/wx_info/get_bias_addr.py b/pywxdump/wx_info/get_bias_addr.py index ff71798..df6cbdd 100644 --- a/pywxdump/wx_info/get_bias_addr.py +++ b/pywxdump/wx_info/get_bias_addr.py @@ -175,98 +175,66 @@ class BiasAddr: result = self.search_memory_value(key, self.module_name) return result - def get_key_bias2(self, wx_db_path, account_bias=0): - wx_db_path = os.path.join(wx_db_path, "Msg", "MicroMsg.db") - if not os.path.exists(wx_db_path): - return 0 + def get_key_bias2(self, wx_db_path): - def get_maybe_key(mem_data): - min_addr = 0xffffffffffffffffffffffff - max_addr = 0 - for module1 in pm.list_modules(): - if module1.lpBaseOfDll < min_addr: - min_addr = module1.lpBaseOfDll - if module1.lpBaseOfDll > max_addr: - max_addr = module1.lpBaseOfDll + module1.SizeOfImage + addr_len = get_exe_bit(self.exe_path) // 8 + db_path = wx_db_path - maybe_key = [] - for i in range(0, len(mem_data), self.address_len): - addr = mem_data[i:i + self.address_len] - addr = int.from_bytes(addr, byteorder='little') - # 去掉不可能的地址 - if min_addr < addr < max_addr: - key = read_key(addr) - if key == b"": - continue - maybe_key.append([key, i]) - return maybe_key + def read_key_bytes(h_process, address, address_len=8): + array = ctypes.create_string_buffer(address_len) + if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return "None" + address = int.from_bytes(array, byteorder='little') # 逆序转换为int地址(key地址) + key = ctypes.create_string_buffer(32) + if ReadProcessMemory(h_process, void_p(address), key, 32, 0) == 0: return "None" + key_bytes = bytes(key) + return key_bytes - def read_key(addr): - key = ctypes.create_string_buffer(35) - if ReadProcessMemory(pm.process_handle, void_p(addr - 1), key, 35, 0) == 0: - return b"" - - if b"\x00\x00" in key.raw[1:33]: - return b"" - - if b"\x00\x00" == key.raw[33:35] and b"\x90" == key.raw[0:1]: - return key.raw[1:33] - return b"" - - def verify_key(keys, wx_db_path): + def verify_key(key, wx_db_path): + KEY_SIZE = 32 + DEFAULT_PAGESIZE = 4096 + DEFAULT_ITER = 64000 with open(wx_db_path, "rb") as file: blist = file.read(5000) salt = blist[:16] + byteKey = hashlib.pbkdf2_hmac("sha1", key, salt, DEFAULT_ITER, KEY_SIZE) first = blist[16:DEFAULT_PAGESIZE] + mac_salt = bytes([(salt[i] ^ 58) for i in range(16)]) + mac_key = hashlib.pbkdf2_hmac("sha1", byteKey, mac_salt, 2, KEY_SIZE) + hash_mac = hmac.new(mac_key, first[:-32], hashlib.sha1) + hash_mac.update(b'\x01\x00\x00\x00') - with multiprocessing.Pool(processes=8) as pool: - results = [pool.apply_async(validate_key, args=(key, salt, first, mac_salt)) for key, i in keys[-1::-1]] - results = [p.get() for p in results] - for i, result in enumerate(results[-1::-1]): - if result: - return keys[i] - return b"", 0 + if hash_mac.digest() != first[-32:-12]: + return False + return True - module_name = "WeChatWin.dll" - pm = self.pm - module = pymem.process.module_from_name(pm.process_handle, module_name) - start_addr = module.lpBaseOfDll - size = module.SizeOfImage - - if account_bias > 1: - maybe_key = [] - for i in [0x24, 0x40]: - addr = start_addr + account_bias - i - mem_data = pm.read_bytes(addr, self.address_len) - key = read_key(int.from_bytes(mem_data, byteorder='little')) - if key != b"": - maybe_key.append([key, addr - start_addr]) - key, bais = verify_key(maybe_key, wx_db_path) - if bais != 0: - return bais - - mem_data = pm.read_bytes(start_addr, size) - maybe_key = get_maybe_key(mem_data) - key, bais = verify_key(maybe_key, wx_db_path) - return bais - - def test(self): phone_type1 = "iphone\x00" phone_type2 = "android\x00" - Regex = re.compile(r"^[a-zA-Z0-9_]+$") - # 内存搜索 - module = pymem.process.module_from_name(self.pm.process_handle, self.module_name) - print(hex(module.lpBaseOfDll)) - phone_type1_bias = self.pm.pattern_scan_module(phone_type1.encode(), self.module_name, return_multiple=True) - phone_type2_bias = self.pm.pattern_scan_module(phone_type2.encode(), self.module_name, return_multiple=True) - phone_type_bias = phone_type1_bias + phone_type2_bias - print(len(phone_type1_bias)) - for i in phone_type_bias[::-1]: - for j in range(i, i - 1000, -16): - a = get_info_without_key(self.process_handle, j, 32) - if Regex.match(a) and len(a) >= 6: - print(a) + phone_type3 = "ipad\x00" + + pm = pymem.Pymem("WeChat.exe") + module_name = "WeChatWin.dll" + + MicroMsg_path = os.path.join(db_path, "MSG", "MicroMsg.db") + + module = pymem.process.module_from_name(pm.process_handle, module_name) + + type1_addrs = pm.pattern_scan_module(phone_type1.encode(), module, return_multiple=True) + type2_addrs = pm.pattern_scan_module(phone_type2.encode(), module, return_multiple=True) + type3_addrs = pm.pattern_scan_module(phone_type3.encode(), module, return_multiple=True) + print(len(type1_addrs), len(type2_addrs), len(type3_addrs)) + type_addrs = type1_addrs if len(type1_addrs) >= 2 else type2_addrs if len( + type2_addrs) >= 2 else type3_addrs if len(type3_addrs) >= 2 else "None" + if type_addrs == "None": + return 0 + for i in type_addrs[::-1]: + for j in range(i, i - 2000, -addr_len): + key_bytes = read_key_bytes(pm.process_handle, j, addr_len) + if key_bytes == "None": + continue + if verify_key(key_bytes, MicroMsg_path): + return j - module.lpBaseOfDll + return 0 def run(self, logging_path=False, version_list_path=None): if not self.get_process_handle()[0]: @@ -277,11 +245,10 @@ class BiasAddr: key_bias = 0 key_bias = self.get_key_bias1() key_bias = self.search_key(self.key) if key_bias <= 0 and self.key else key_bias - key_bias = self.get_key_bias2(self.db_path, account_bias) if key_bias <= 0 and self.db_path else key_bias + key_bias = self.get_key_bias2(self.db_path) if key_bias <= 0 and self.db_path else key_bias rdata = {self.version: [name_bias, account_bias, mobile_bias, 0, key_bias]} - # print(rdata) - # self.test() + if version_list_path and os.path.exists(version_list_path): with open(version_list_path, "r", encoding="utf-8") as f: data = json.load(f) @@ -307,6 +274,6 @@ def get_info_without_key(h_process, address, n_size=64): if __name__ == '__main__': - account, mobile, name, key, db_path = "test", "test", "test", "0000", "test" + account, mobile, name, key, db_path = "test", "test", "test",None, r"test" bias_addr = BiasAddr(account, mobile, name, key, db_path) - bias_addr.run() + bias_addr.run(logging_path=True) diff --git a/pywxdump/wx_info/get_wx_info.py b/pywxdump/wx_info/get_wx_info.py index 6ec195e..2d68190 100644 --- a/pywxdump/wx_info/get_wx_info.py +++ b/pywxdump/wx_info/get_wx_info.py @@ -5,6 +5,8 @@ # Author: xaoyaoo # Date: 2023/08/21 # ------------------------------------------------------------------------------- +import hmac +import hashlib import json import ctypes import os @@ -20,6 +22,41 @@ ReadProcessMemory = ctypes.windll.kernel32.ReadProcessMemory void_p = ctypes.c_void_p +# 获取exe文件的位数 +def get_exe_bit(file_path): + """ + 获取 PE 文件的位数: 32 位或 64 位 + :param file_path: PE 文件路径(可执行文件) + :return: 如果遇到错误则返回 64 + """ + try: + with open(file_path, 'rb') as f: + dos_header = f.read(2) + if dos_header != b'MZ': + print('get exe bit error: Invalid PE file') + return 64 + # Seek to the offset of the PE signature + f.seek(60) + pe_offset_bytes = f.read(4) + pe_offset = int.from_bytes(pe_offset_bytes, byteorder='little') + + # Seek to the Machine field in the PE header + f.seek(pe_offset + 4) + machine_bytes = f.read(2) + machine = int.from_bytes(machine_bytes, byteorder='little') + + if machine == 0x14c: + return 32 + elif machine == 0x8664: + return 64 + else: + print('get exe bit error: Unknown architecture: %s' % hex(machine)) + return 64 + except IOError: + print('get exe bit error: File not found or cannot be opened') + return 64 + + # 读取内存中的字符串(非key部分) def get_info_without_key(h_process, address, n_size=64): array = ctypes.create_string_buffer(n_size) @@ -54,21 +91,6 @@ def pattern_scan_all(handle, pattern, *, return_multiple=False, find_num=100): def get_info_wxid(h_process): - # find_num = 1000 - # addrs = pattern_scan_all(h_process, br'\\FileStorage', return_multiple=True, find_num=find_num) - # wxids = [] - # for addr in addrs: - # array = ctypes.create_string_buffer(33) - # if ReadProcessMemory(h_process, void_p(addr - 21), array, 33, 0) == 0: return "None" - # array = bytes(array) # .decode('utf-8', errors='ignore') - # array = array.split(br'\FileStorage')[0] - # for part in [b'}', b'\x7f', b'\\']: - # if part in array: - # array = array.split(part)[1] - # wxids.append(array.decode('utf-8', errors='ignore')) - # break - # wxid = max(wxids, key=wxids.count) if wxids else "None" - find_num = 100 addrs = pattern_scan_all(h_process, br'\\Msg\\FTSContact', return_multiple=True, find_num=find_num) wxids = [] @@ -91,7 +113,6 @@ def get_info_filePath(wxid="all"): value, _ = winreg.QueryValueEx(key, "FileSavePath") winreg.CloseKey(key) w_dir = value - print(0, w_dir) except Exception as e: # 获取文档实际目录 try: @@ -107,16 +128,13 @@ def get_info_filePath(wxid="all"): print(1, w_dir) else: w_dir = documents_path - print(2, w_dir) except Exception as e: profile = os.environ.get("USERPROFILE") # 获取用户目录 w_dir = os.path.join(profile, "Documents") - print(3, w_dir) if w_dir == "MyDocument:": profile = os.environ.get("USERPROFILE") w_dir = os.path.join(profile, "Documents") msg_dir = os.path.join(w_dir, "WeChat Files") - print(msg_dir) if wxid == "all" and os.path.exists(msg_dir): return msg_dir @@ -124,15 +142,60 @@ def get_info_filePath(wxid="all"): return filePath if os.path.exists(filePath) else "None" -# 读取内存中的key -def get_key(h_process, address, address_len=8): - array = ctypes.create_string_buffer(address_len) - if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return "None" - address = int.from_bytes(array, byteorder='little') # 逆序转换为int地址(key地址) - key = ctypes.create_string_buffer(32) - if ReadProcessMemory(h_process, void_p(address), key, 32, 0) == 0: return "None" - key_string = bytes(key).hex() - return key_string +def get_key(db_path, addr_len): + def read_key_bytes(h_process, address, address_len=8): + array = ctypes.create_string_buffer(address_len) + if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return "None" + address = int.from_bytes(array, byteorder='little') # 逆序转换为int地址(key地址) + key = ctypes.create_string_buffer(32) + if ReadProcessMemory(h_process, void_p(address), key, 32, 0) == 0: return "None" + key_bytes = bytes(key) + return key_bytes + + def verify_key(key, wx_db_path): + KEY_SIZE = 32 + DEFAULT_PAGESIZE = 4096 + DEFAULT_ITER = 64000 + with open(wx_db_path, "rb") as file: + blist = file.read(5000) + salt = blist[:16] + byteKey = hashlib.pbkdf2_hmac("sha1", key, salt, DEFAULT_ITER, KEY_SIZE) + first = blist[16:DEFAULT_PAGESIZE] + + mac_salt = bytes([(salt[i] ^ 58) for i in range(16)]) + mac_key = hashlib.pbkdf2_hmac("sha1", byteKey, mac_salt, 2, KEY_SIZE) + hash_mac = hmac.new(mac_key, first[:-32], hashlib.sha1) + hash_mac.update(b'\x01\x00\x00\x00') + + if hash_mac.digest() != first[-32:-12]: + return False + return True + + phone_type1 = "iphone\x00" + phone_type2 = "android\x00" + phone_type3 = "ipad\x00" + + pm = pymem.Pymem("WeChat.exe") + module_name = "WeChatWin.dll" + + MicroMsg_path = os.path.join(db_path, "MSG", "MicroMsg.db") + + type1_addrs = pm.pattern_scan_module(phone_type1.encode(), module_name, return_multiple=True) + type2_addrs = pm.pattern_scan_module(phone_type2.encode(), module_name, return_multiple=True) + type3_addrs = pm.pattern_scan_module(phone_type3.encode(), module_name, return_multiple=True) + type_addrs = type1_addrs if len(type1_addrs) == 2 else type2_addrs if len(type2_addrs) == 2 else type3_addrs if len( + type3_addrs) == 2 else "None" + if type_addrs == "None": + return "None" + for i in type_addrs[::-1]: + for j in range(i, i - 2000, -addr_len): + key_bytes = read_key_bytes(pm.process_handle, j, addr_len) + if key_bytes == "None": + continue + if verify_key(key_bytes, MicroMsg_path): + return key_bytes.hex() + + return "None" # 读取微信信息(account,mobile,name,mail,wxid,key) @@ -177,17 +240,18 @@ def read_info(version_list, is_logging=False): account__baseaddr = wechat_base_address + bias_list[1] mobile_baseaddr = wechat_base_address + bias_list[2] mail_baseaddr = wechat_base_address + bias_list[3] - key_baseaddr = wechat_base_address + bias_list[4] + # key_baseaddr = wechat_base_address + bias_list[4] - addrLen = 4 if tmp_rd['version'] in ["3.9.2.23", "3.9.2.26"] else 8 + addrLen = get_exe_bit(process.exe()) // 8 tmp_rd['account'] = get_info_without_key(Handle, account__baseaddr, 32) if bias_list[1] != 0 else "None" tmp_rd['mobile'] = get_info_without_key(Handle, mobile_baseaddr, 64) if bias_list[2] != 0 else "None" tmp_rd['name'] = get_info_without_key(Handle, name_baseaddr, 64) if bias_list[0] != 0 else "None" tmp_rd['mail'] = get_info_without_key(Handle, mail_baseaddr, 64) if bias_list[3] != 0 else "None" + tmp_rd['wxid'] = get_info_wxid(Handle) - tmp_rd['filePath'] = get_info_filePath(tmp_rd['wxid']) - tmp_rd['key'] = get_key(Handle, key_baseaddr, addrLen) if bias_list[4] != 0 else "None" + tmp_rd['filePath'] = get_info_filePath(tmp_rd['wxid']) if tmp_rd['wxid'] != "None" else "None" + tmp_rd['key'] = get_key(tmp_rd['filePath'], addrLen) if tmp_rd['filePath'] != "None" else "None" result.append(tmp_rd) if is_logging: @@ -267,6 +331,6 @@ def get_wechat_db(require_list: Union[List[str], str] = "all", msg_dir: str = No if __name__ == '__main__': - with open("version_list.json", "r", encoding="utf-8") as f: + with open("../version_list.json", "r", encoding="utf-8") as f: version_list = json.load(f) read_info(version_list, is_logging=True)