c++ - Protobuf导致ParseFromIstream上的段错误

标签 c++ serialization protocol-buffers

我正在尝试扩展编程知识,并且尝试进行一些多进程编程。

我想执行以下操作:在同一主机上,运行多个可执行文件。可执行文件之一负责扫描文件系统,可执行文件之一负责处理数据等。

但是,某些数据必须脱离主机传输。为了限制诸如网络防火墙设置之类的事情,我希望有一个守护程序(多线程)通过IPC接收数据,然后再使用尚未确定的套接字实现将其发送到外部主机。

经过大量搜索和研究之后,最明显的使用模式是消费者/生产者模式,其中包括多进程生产者(守护程序生成消息)和多线程消费者(接收数据,最好通过共享内存接收数据并发送它到一个外部主机)。

我希望我的应用程序能够尽可能地跨平台运行。为此,我正在使用boost::interprocess:message_queue。因为此Boost库仅接受二进制序列化的对象,所以我使用Google Protobuf来处理序列化和反序列化。

我创建了2个可执行文件,当前称为“消费者”和“生产者”。生产者通过消息队列将消息发送给消费者,消费者反序列化它。下面的代码在传递简单的“int”对象时起作用(在我看来,这意味着消息队列通信正在工作),但是在使用来自SerializeToOstream()的数据时则不起作用。

您可能已经注意到,我是IPC和多进程编程的新手,但我相信自己已经完成了功课。

这是我的producer.cpp:

#include <iostream>
#include <chrono>
#include <thread>
#include <fstream>

#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>

#include <boost/thread.hpp>

#include <internal/messages/testmessage.pb.h>

int main(int argc, char** argv) {
    // Construct the object to be passed
    GOOGLE_PROTOBUF_VERIFY_VERSION;

    struct protoremove {
        ~protoremove(){ google::protobuf::ShutdownProtobufLibrary(); }
    } remover;

    ib::protobuf::testMessage myMessage;
    myMessage.set_id(10);
    myMessage.set_version(1);
    std::cout << myMessage.DebugString() << std::endl;

    // Initialize the Boost message queue
    try{
        //Open a message queue.
        boost::interprocess::message_queue mq
                (boost::interprocess::open_or_create
                        ,"message_queue"           //name
                        ,100                       //max message number
                        ,1000               //max message size
                );

        // Send our message
        std::ofstream buftosend;
        myMessage.SerializeToOstream(&buftosend);
        mq.send(&buftosend, sizeof(buftosend), 1);

    }
    catch(boost::interprocess::interprocess_exception &ex){
        std::cout << ex.what() << std::endl;
        return 1;
    }

    return 0;
}

Consumer.cpp:
#include <iostream>
#include <fstream>

#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>

#include <boost/thread.hpp>

#include <internal/messages/testmessage.pb.h>

int main(int argc, char** argv) {
    // Open the message queue
    try {
        //Erase previous message queue
        boost::interprocess::message_queue::remove("message_queue");
        ib::protobuf::testMessage recvdMessage;

        //Create a message_queue.
        boost::interprocess::message_queue mq
                (boost::interprocess::open_or_create
                        ,"message_queue"           //name
                        ,100                       //max message number
                        ,1000               //max message size
                );

        unsigned int priority;
        boost::interprocess::message_queue::size_type recvd_size;

        std::ifstream incomingbuf;
        mq.receive(&incomingbuf, 1000, recvd_size, priority);

        recvdMessage.ParseFromIstream(&incomingbuf);

        recvdMessage.id();
        recvdMessage.DebugString();
    }
    catch(boost::interprocess::interprocess_exception &ex){
        boost::interprocess::message_queue::remove("message_queue");
        std::cout << "IP error " << ex.what() << std::endl;
        return 1;
    }
    boost::interprocess::message_queue::remove("message_queue");
    return 0;

}

和消息定义(.proto):
package ib.protobuf;

message testMessage {
    required int32 version = 1;
    optional int64 id = 2;
    optional string data = 3;
    optional int64 sequencenumber = 4;
}

运行使用者时,它等待数据(mq.receive()调用正在阻塞)。
当生产者开始时,消费者得到一个SIGSEGV。 gdb在其回溯中指示这是在第44行上发生的,这是ParseFromIstream()方法。
生产者在DebugString()中输出正确的值。
(gdb) r
Starting program: /home/roel/bin/consumer 
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/usr/lib/libthread_db.so.1".

Program received signal SIGSEGV, Segmentation fault.
std::istream::sentry::sentry (this=0x7fffffffe117, __in=..., __noskip=true)
    at /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc:50
50  /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc: No such file or directory.
(gdb) bt
#0  std::istream::sentry::sentry (this=0x7fffffffe117, __in=..., __noskip=true)
    at /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc:50
#1  0x00007ffff679f7ab in std::istream::read (this=0x7fffffffe380, 
    __s=0x637d20 "", __n=8192)
    at /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc:653
#2  0x00007ffff6b10030 in google::protobuf::io::IstreamInputStream::CopyingIstreamInputStream::Read(void*, int) () from /usr/lib/libprotobuf.so.9
#3  0x00007ffff6a99fe1 in google::protobuf::io::CopyingInputStreamAdaptor::Next(void const**, int*) () from /usr/lib/libprotobuf.so.9
#4  0x00007ffff6a97950 in google::protobuf::io::CodedInputStream::Refresh() ()
   from /usr/lib/libprotobuf.so.9
#5  0x00007ffff6a94da3 in google::protobuf::MessageLite::ParseFromZeroCopyStream(google::protobuf::io::ZeroCopyInputStream*) () from /usr/lib/libprotobuf.so.9
#6  0x00007ffff6af5ad9 in google::protobuf::Message::ParseFromIstream(std::istream*) () from /usr/lib/libprotobuf.so.9
#7  0x0000000000407e35 in main (argc=1, argv=0x7fffffffe6d8)
    at /home/roel/source/consumer.cpp:44
(gdb) 

这是使用CMake和GCC 6.0.1在Linux上编译的。
我对自己的程序有很多疑问:

Q1. 首先,也是最重要的; 是什么原因导致细分错误?
我究竟做错了什么?我已经看了很长时间的这段代码,但是看不到这个问题。

Q2. 在boost::interprocess::message_queue中
构造函数,我必须定义2个参数;最大数量
消息,以及大小。对于标准类型,此尺寸为
固定。但是,对于消息(通常),消息的大小
是可变的。因此,是确定数量的最佳方法
要为消息保留的内存量?
我应该简单地设置一个最大值
每封邮件的大小并创建多部分邮件参数?

Q3. 是否有更好的方法来实现我的目标? 序列化数据,
放入队列中看起来是如此..复杂,尤其是
看到这可能是一个非常普遍的问题。必须有更多
人们试图创建跨平台IPC。像ZeroMQ这样的库
仅支持UNIX域套接字。使用TCP套接字进行环回
界面似乎很难看。难道没有一个图书馆能让我
将任意对象(大小和布局)作为消息共享
内存段,然后消费者可以使用pop()?我的意思是
单线程,可以通过堆栈上的push()pop()进行修复。
进行所有这些额外的步骤似乎很麻烦。

预先感谢您的任何答复。

编辑

如The Dark所述,上面的代码使用的是std::string实例,而不是实际的字符串(std::string.data())
producer.cpp的答案如下:
std::string str = myMessage.SerializeAsString();
mq.send(str.data(), str.size(), 1); 

但是,这对于consumer.cpp不能按原样工作,因为字符串初始化时的大小为0。

这是我用于consumer.cpp的代码:
unsigned int priority;
boost::interprocess::message_queue::size_type recvd_size;

//Reserve 1000 bytes of memory for our message
char incomingBuffer[1000];
mq.receive(&incomingBuffer, 1000, recvd_size, priority);

ib::protobuf::testMessage recvdMessage;

//Only if string object is really required
std::basic_string<char> str = incomingBuffer;
std::cout << "Message: " << str.data() << ". Size is " << recvd_size << std::endl;

//ParseFromString() can also directly parse "incomingBuffer", avoiding the cast above
recvdMessage.ParseFromString(str.data());

std::cout << "Message ID " << recvdMessage.id() << std::endl;
std::cout << recvdMessage.DebugString();

最佳答案

生产者的这一部分似乎是错误的。

    // Send our message
    std::ofstream buftosend;
    myMessage.SerializeToOstream(&buftosend);
    mq.send(&buftosend, sizeof(buftosend), 1);
ofstream尚未打开,因此没有文件可存储任何内容,因此第一个调用将失败(不会崩溃)。 send调用跨线发送原始ofstream类结构。这将不是可传输格式。

我认为您想要的是序列化为ostringstream,然后传输ostringstream的内容(而不是整个对象)。

就像是:
    // Send our message
    std::ostringstream buftosend;
    myMessage.SerializeToOstream(&buftosend);
    std::string str = buftosend.str();
    mq.send(str.data(), str.size(), 1); 

或者更好:
    // Send our message
    std::string str = myMessage.SerializeAsString();
    mq.send(str.data(), str.size(), 1); 

您还可以添加一条调试行以显示str的内容,尽管请注意它将是二进制的,因此不清晰。

您的使用者可能遇到类似的问题(如果需要在文件上打开流)。

关于c++ - Protobuf导致ParseFromIstream上的段错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37780120/

相关文章:

protocol-buffers - 将 protobuf 转换为 avro

java - 编辑从数据库中读取的数百个 Protocol Buffer 序列化消息

c++ - 如何防止不同插件中出现重复的资源 ID?

c++ - 分段后如何创建掩码来选择分段类?

java - 序列化和反序列化 JSON 中的对象

java - Spark Kryo 异常 - 类未注册 : com. google.common.base.Present

Java JSON 库支持在没有模式的情况下获取和设置深层值吗?

c++ - C++11 是否要求分配器是默认可构造的,libstdc++ 和 libc++ 不同意?

c++ - UDP 服务器套接字缓冲区溢出

c++ - C++ 中的 Protobuf ParseDelimitedFrom 实现