Fix Address in use

This commit is contained in:
Changhua 2024-02-14 14:39:46 +08:00
parent 2ef366beb6
commit dd9a3dbb0d

View File

@ -48,7 +48,7 @@ queue<WxMsg_t> gMsgQueue;
static int lport = 0;
static DWORD lThreadId = 0;
static bool lIsRunning = false;
static nng_socket sock;
static nng_socket cmdSock, msgSock; // TODO: 断开检测
static uint8_t gBuffer[G_BUF_SIZE] = { 0 };
bool func_is_login(uint8_t *out, size_t *len)
@ -422,7 +422,6 @@ bool func_forward_msg(uint64_t id, char *receiver, uint8_t *out, size_t *len)
static void PushMessage()
{
static nng_socket msg_sock;
static uint8_t buffer[G_BUF_SIZE] = { 0 };
int rv;
@ -434,18 +433,18 @@ static void PushMessage()
char url[URL_SIZE + 1] = { 0 };
sprintf_s(url, URL_SIZE, "%s:%d", BASE_URL, lport + 1);
if ((rv = nng_pair1_open(&msg_sock)) != 0) {
if ((rv = nng_pair1_open(&msgSock)) != 0) {
LOG_ERROR("nng_pair0_open error {}", nng_strerror(rv));
return;
}
if ((rv = nng_listen(msg_sock, url, NULL, 0)) != 0) {
if ((rv = nng_listen(msgSock, url, NULL, 0)) != 0) {
LOG_ERROR("nng_listen error {}", nng_strerror(rv));
return;
}
LOG_INFO("MSG Server listening on {}", url);
if ((rv = nng_setopt_ms(msg_sock, NNG_OPT_SENDTIMEO, 2000)) != 0) {
if ((rv = nng_setopt_ms(msgSock, NNG_OPT_SENDTIMEO, 2000)) != 0) {
LOG_ERROR("nng_setopt_ms: {}", nng_strerror(rv));
return;
}
@ -468,22 +467,22 @@ static void PushMessage()
rsp.msg.wxmsg.extra = (char *)wxmsg.extra.c_str();
rsp.msg.wxmsg.xml = (char *)wxmsg.xml.c_str();
gMsgQueue.pop();
LOG_DEBUG("Recv msg: {}", wxmsg.content);
LOG_DEBUG("Push msg: {}", wxmsg.content);
pb_ostream_t stream = pb_ostream_from_buffer(buffer, G_BUF_SIZE);
if (!pb_encode(&stream, Response_fields, &rsp)) {
LOG_ERROR("Encoding failed: {}", PB_GET_ERROR(&stream));
continue;
}
rv = nng_send(msg_sock, buffer, stream.bytes_written, 0);
rv = nng_send(msgSock, buffer, stream.bytes_written, 0);
if (rv != 0) {
LOG_ERROR("nng_send: {}", nng_strerror(rv));
LOG_ERROR("msgSock-nng_send: {}", nng_strerror(rv));
}
LOG_DEBUG("Send data length {}", stream.bytes_written);
}
}
}
nng_close(msg_sock);
nng_close(msgSock);
}
bool func_enable_recv_txt(bool pyq, uint8_t *out, size_t *len)
@ -491,16 +490,20 @@ bool func_enable_recv_txt(bool pyq, uint8_t *out, size_t *len)
Response rsp = Response_init_default;
rsp.func = Functions_FUNC_ENABLE_RECV_TXT;
rsp.which_msg = Response_status_tag;
rsp.msg.status = -1;
rsp.msg.status = 0;
ListenMessage();
if (pyq) {
ListenPyq();
}
HANDLE msgThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)PushMessage, NULL, NULL, NULL);
if (msgThread != 0) {
CloseHandle(msgThread);
rsp.msg.status = 0;
if (!gIsListening) {
ListenMessage();
if (pyq) {
ListenPyq();
}
HANDLE msgThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)PushMessage, NULL, NULL, NULL);
if (msgThread == NULL) {
rsp.msg.status = GetLastError();
LOG_ERROR("func_enable_recv_txt failed: {}", rsp.msg.status);
} else {
CloseHandle(msgThread);
}
}
pb_ostream_t stream = pb_ostream_from_buffer(out, *len);
@ -957,18 +960,18 @@ static int RunServer()
int rv = 0;
char url[URL_SIZE + 1] = { 0 };
sprintf_s(url, URL_SIZE, "%s:%d", BASE_URL, lport);
if ((rv = nng_pair1_open(&sock)) != 0) {
if ((rv = nng_pair1_open(&cmdSock)) != 0) {
LOG_ERROR("nng_pair0_open error {}", nng_strerror(rv));
return rv;
}
if ((rv = nng_listen(sock, (char *)url, NULL, 0)) != 0) {
if ((rv = nng_listen(cmdSock, (char *)url, NULL, 0)) != 0) {
LOG_ERROR("nng_listen error {}", nng_strerror(rv));
return rv;
}
LOG_INFO("CMD Server listening on {}", (char *)url);
if ((rv = nng_setopt_ms(sock, NNG_OPT_SENDTIMEO, 1000)) != 0) {
if ((rv = nng_setopt_ms(cmdSock, NNG_OPT_SENDTIMEO, 1000)) != 0) {
LOG_ERROR("nng_setopt_ms error: {}", nng_strerror(rv));
return rv;
}
@ -977,27 +980,25 @@ static int RunServer()
while (lIsRunning) {
uint8_t *in = NULL;
size_t in_len, out_len = G_BUF_SIZE;
if ((rv = nng_recv(sock, &in, &in_len, NNG_FLAG_ALLOC)) != 0) {
LOG_ERROR("nng_recv error: {}", nng_strerror(rv));
if ((rv = nng_recv(cmdSock, &in, &in_len, NNG_FLAG_ALLOC)) != 0) {
LOG_ERROR("cmdSock-nng_recv error: {}", nng_strerror(rv));
break;
}
try {
// LOG_BUFFER(in, in_len);
if (dispatcher(in, in_len, gBuffer, &out_len)) {
LOG_DEBUG("Send data length {}", out_len);
// LOG_BUFFER(gBuffer, out_len);
rv = nng_send(sock, gBuffer, out_len, 0);
rv = nng_send(cmdSock, gBuffer, out_len, 0);
if (rv != 0) {
LOG_ERROR("nng_send: {}", nng_strerror(rv));
LOG_ERROR("cmdSock-nng_send: {}", nng_strerror(rv));
}
} else {
// Error
} else { // Error
LOG_ERROR("Dispatcher failed...");
rv = nng_send(sock, gBuffer, 0, 0);
rv = nng_send(cmdSock, gBuffer, 0, 0);
if (rv != 0) {
LOG_ERROR("nng_send: {}", nng_strerror(rv));
LOG_ERROR("cmdSock-nng_send: {}", nng_strerror(rv));
}
// break;
}
@ -1031,7 +1032,8 @@ int RpcStartServer(int port)
int RpcStopServer()
{
if (lIsRunning) {
nng_close(sock);
nng_close(cmdSock);
nng_close(msgSock);
UnListenMessage();
lIsRunning = false;
Sleep(1000);