使用protobuf序列化消息与zeromq进行通信

标签 c sockets protocol-buffers zeromq

我无法使用 C 通过 ZeroMQ 套接字接收 protobuf 中序列化的消息。 我已序列化客户端输入的消息,并使用 zhelpers.h 中定义的 s_send() 函数将此缓冲区发送到服务器。服务器代码与作为示例的 Zeromq 包捆绑的测试代码相同。

这是我的客户端:

#include "amessage.pb-c.h"
#include "zhelpers.h"

int main (void)
{

    AMessage msg = AMESSAGE__INIT; // AMessage
    void *buf;                     // Buffer to store serialized data
    unsigned len;

    printf ("Connecting to server...\n");
    void *context = zmq_ctx_new ();
    void *requester = zmq_socket (context, ZMQ_REQ);

    char buffer[256] = "";

    printf("[client] :");
    scanf("%s", buffer );

    msg.csmsg = buffer;
    len = amessage__get_packed_size(&msg);        
    buf = malloc(len);

    printf("[client]: pack msg len : %d\n ", len);
    printf("Sent msg : %d\n", buf);

    amessage__pack(&msg,buf);

    s_send(requester, buf);

    zmq_close (requester);
    zmq_ctx_destroy (context);
    return 0;
}

服务器端:

#include "zhelpers.h"
#include <pthread.h>
#include <stdlib.h>
#include "amessage.pb-c.h"

#define MAX_MSG_SIZE 256

static size_t read_buffer (unsigned max_length, unsigned char *out)
{
   size_t cur_len = 0, nread;
   uint8_t c;
   while ((nread=fread(out + cur_len, 1, max_length - cur_len, stdin)) != 0)
   {
       cur_len += nread;
       if (cur_len == max_length)
       {
           fprintf(stderr, "[server]: max message length exceeded\n");
           exit(1);
       }
   }
   return cur_len;
 }


 static void * worker_routine (void *context) 
 {

    AMessage *msg;

    uint8_t buf[MAX_MSG_SIZE];
   char buffer[256];

   //  Socket to talk to dispatcher
   void *receiver = zmq_socket (context, ZMQ_REP);
   zmq_connect (receiver, "inproc://workers");

   while (1) {

          uint8_t *string = s_recv (receiver);

       if(string == 0)
            printf("[server]: Error: In receiving msg.\n");
       else
       {

        size_t msg_len = read_buffer (MAX_MSG_SIZE, string);
        printf("[server]: client msg len is: %d.\n", msg_len);
        msg = amessage__unpack(NULL, msg_len, string);   
        if (msg == NULL)
        {
            fprintf(stderr, "[server]: error unpacking incoming message\n");
            exit(1);
        }
        printf ("[client]: %s \n", msg->csmsg);

    }
    amessage__free_unpacked(msg, NULL);
    free (string);
    //  Do some 'work'
    sleep (1);

   }
   zmq_close (receiver);
   return NULL;
}

  int main (void)
  {
   void *context = zmq_ctx_new ();
   void *clients = zmq_socket (context, ZMQ_ROUTER);
   zmq_bind (clients, "tcp://*:5555");

   void *workers = zmq_socket (context, ZMQ_DEALER);
   zmq_bind (workers, "inproc://workers");

   //  Launch pool of worker threads
   int thread_nbr;
   for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) {
        pthread_t worker;
        pthread_create (&worker, NULL, worker_routine, context);
   }
    //  Connect work threads to client threads via a queue proxy
    zmq_proxy (clients, workers, NULL);

    zmq_close (clients);
    zmq_close (workers);

    zmq_ctx_destroy (context);
    return 0;
  }

知道我做错了什么吗?

最佳答案

您正在使用 s_send(),它需要 C 字符串作为参数,并调用 strlen() 来确定其大小。但是, Protocol Buffer 数据是二进制数据,并且可能在消息中的任何位置包含空字节。

而是使用 zmq_send() 并将消息的长度提供给 zmq_msg_init_size() 函数。

关于使用protobuf序列化消息与zeromq进行通信,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15336726/

相关文章:

Java - DocumentBuilder 关闭 Socket 连接

c++ - C/C++ Linux下的Packet Sniffer

google-cloud-platform - 如何导入 gRPC empty 和 Google api annotations proto

c++ - 带有 C++ 插件的 Protobuf

无法处理连续的 SIGSEGV 信号

c - 我如何使用 Win32 API 启动带参数的进程?

c - 使文件在 finder 中只读,但在 c 程序中可写

c - 程序中出现段错误(核心转储)错误

c - Unix 域套接字在 Solaris 10 上比在 Linux 上慢 100 倍?

go - 什么时候使用嵌入?