From dd9a3dbb0d013676fdb77994a6f3ae2fd3b93cc4 Mon Sep 17 00:00:00 2001 From: Changhua Date: Wed, 14 Feb 2024 14:39:46 +0800 Subject: [PATCH] Fix Address in use --- WeChatFerry/spy/rpc_server.cpp | 64 ++++++++++++++++++---------------- 1 file changed, 33 insertions(+), 31 deletions(-) diff --git a/WeChatFerry/spy/rpc_server.cpp b/WeChatFerry/spy/rpc_server.cpp index de0dc33..4e336dd 100644 --- a/WeChatFerry/spy/rpc_server.cpp +++ b/WeChatFerry/spy/rpc_server.cpp @@ -48,7 +48,7 @@ queue gMsgQueue; static int lport = 0; static DWORD lThreadId = 0; static bool lIsRunning = false; -static nng_socket sock; +static nng_socket cmdSock, msgSock; // TODO: 断开检测 static uint8_t gBuffer[G_BUF_SIZE] = { 0 }; bool func_is_login(uint8_t *out, size_t *len) @@ -422,7 +422,6 @@ bool func_forward_msg(uint64_t id, char *receiver, uint8_t *out, size_t *len) static void PushMessage() { - static nng_socket msg_sock; static uint8_t buffer[G_BUF_SIZE] = { 0 }; int rv; @@ -434,18 +433,18 @@ static void PushMessage() char url[URL_SIZE + 1] = { 0 }; sprintf_s(url, URL_SIZE, "%s:%d", BASE_URL, lport + 1); - if ((rv = nng_pair1_open(&msg_sock)) != 0) { + if ((rv = nng_pair1_open(&msgSock)) != 0) { LOG_ERROR("nng_pair0_open error {}", nng_strerror(rv)); return; } - if ((rv = nng_listen(msg_sock, url, NULL, 0)) != 0) { + if ((rv = nng_listen(msgSock, url, NULL, 0)) != 0) { LOG_ERROR("nng_listen error {}", nng_strerror(rv)); return; } LOG_INFO("MSG Server listening on {}", url); - if ((rv = nng_setopt_ms(msg_sock, NNG_OPT_SENDTIMEO, 2000)) != 0) { + if ((rv = nng_setopt_ms(msgSock, NNG_OPT_SENDTIMEO, 2000)) != 0) { LOG_ERROR("nng_setopt_ms: {}", nng_strerror(rv)); return; } @@ -468,22 +467,22 @@ static void PushMessage() rsp.msg.wxmsg.extra = (char *)wxmsg.extra.c_str(); rsp.msg.wxmsg.xml = (char *)wxmsg.xml.c_str(); gMsgQueue.pop(); - LOG_DEBUG("Recv msg: {}", wxmsg.content); + LOG_DEBUG("Push msg: {}", wxmsg.content); pb_ostream_t stream = pb_ostream_from_buffer(buffer, G_BUF_SIZE); if (!pb_encode(&stream, Response_fields, &rsp)) { LOG_ERROR("Encoding failed: {}", PB_GET_ERROR(&stream)); continue; } - rv = nng_send(msg_sock, buffer, stream.bytes_written, 0); + rv = nng_send(msgSock, buffer, stream.bytes_written, 0); if (rv != 0) { - LOG_ERROR("nng_send: {}", nng_strerror(rv)); + LOG_ERROR("msgSock-nng_send: {}", nng_strerror(rv)); } LOG_DEBUG("Send data length {}", stream.bytes_written); } } } - nng_close(msg_sock); + nng_close(msgSock); } bool func_enable_recv_txt(bool pyq, uint8_t *out, size_t *len) @@ -491,16 +490,20 @@ bool func_enable_recv_txt(bool pyq, uint8_t *out, size_t *len) Response rsp = Response_init_default; rsp.func = Functions_FUNC_ENABLE_RECV_TXT; rsp.which_msg = Response_status_tag; - rsp.msg.status = -1; + rsp.msg.status = 0; - ListenMessage(); - if (pyq) { - ListenPyq(); - } - HANDLE msgThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)PushMessage, NULL, NULL, NULL); - if (msgThread != 0) { - CloseHandle(msgThread); - rsp.msg.status = 0; + if (!gIsListening) { + ListenMessage(); + if (pyq) { + ListenPyq(); + } + HANDLE msgThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)PushMessage, NULL, NULL, NULL); + if (msgThread == NULL) { + rsp.msg.status = GetLastError(); + LOG_ERROR("func_enable_recv_txt failed: {}", rsp.msg.status); + } else { + CloseHandle(msgThread); + } } pb_ostream_t stream = pb_ostream_from_buffer(out, *len); @@ -957,18 +960,18 @@ static int RunServer() int rv = 0; char url[URL_SIZE + 1] = { 0 }; sprintf_s(url, URL_SIZE, "%s:%d", BASE_URL, lport); - if ((rv = nng_pair1_open(&sock)) != 0) { + if ((rv = nng_pair1_open(&cmdSock)) != 0) { LOG_ERROR("nng_pair0_open error {}", nng_strerror(rv)); return rv; } - if ((rv = nng_listen(sock, (char *)url, NULL, 0)) != 0) { + if ((rv = nng_listen(cmdSock, (char *)url, NULL, 0)) != 0) { LOG_ERROR("nng_listen error {}", nng_strerror(rv)); return rv; } LOG_INFO("CMD Server listening on {}", (char *)url); - if ((rv = nng_setopt_ms(sock, NNG_OPT_SENDTIMEO, 1000)) != 0) { + if ((rv = nng_setopt_ms(cmdSock, NNG_OPT_SENDTIMEO, 1000)) != 0) { LOG_ERROR("nng_setopt_ms error: {}", nng_strerror(rv)); return rv; } @@ -977,27 +980,25 @@ static int RunServer() while (lIsRunning) { uint8_t *in = NULL; size_t in_len, out_len = G_BUF_SIZE; - if ((rv = nng_recv(sock, &in, &in_len, NNG_FLAG_ALLOC)) != 0) { - LOG_ERROR("nng_recv error: {}", nng_strerror(rv)); + if ((rv = nng_recv(cmdSock, &in, &in_len, NNG_FLAG_ALLOC)) != 0) { + LOG_ERROR("cmdSock-nng_recv error: {}", nng_strerror(rv)); break; } try { - // LOG_BUFFER(in, in_len); if (dispatcher(in, in_len, gBuffer, &out_len)) { LOG_DEBUG("Send data length {}", out_len); // LOG_BUFFER(gBuffer, out_len); - rv = nng_send(sock, gBuffer, out_len, 0); + rv = nng_send(cmdSock, gBuffer, out_len, 0); if (rv != 0) { - LOG_ERROR("nng_send: {}", nng_strerror(rv)); + LOG_ERROR("cmdSock-nng_send: {}", nng_strerror(rv)); } - } else { - // Error + } else { // Error LOG_ERROR("Dispatcher failed..."); - rv = nng_send(sock, gBuffer, 0, 0); + rv = nng_send(cmdSock, gBuffer, 0, 0); if (rv != 0) { - LOG_ERROR("nng_send: {}", nng_strerror(rv)); + LOG_ERROR("cmdSock-nng_send: {}", nng_strerror(rv)); } // break; } @@ -1031,7 +1032,8 @@ int RpcStartServer(int port) int RpcStopServer() { if (lIsRunning) { - nng_close(sock); + nng_close(cmdSock); + nng_close(msgSock); UnListenMessage(); lIsRunning = false; Sleep(1000);