我正在尝试扩展编程知识,并且尝试进行一些多进程编程。
我想执行以下操作:在同一主机上,运行多个可执行文件。可执行文件之一负责扫描文件系统,可执行文件之一负责处理数据等。
但是,某些数据必须脱离主机传输。为了限制诸如网络防火墙设置之类的事情,我希望有一个守护程序(多线程)通过IPC接收数据,然后再使用尚未确定的套接字实现将其发送到外部主机。
经过大量搜索和研究之后,最明显的使用模式是消费者/生产者模式,其中包括多进程生产者(守护程序生成消息)和多线程消费者(接收数据,最好通过共享内存接收数据并发送它到一个外部主机)。
我希望我的应用程序能够尽可能地跨平台运行。为此,我正在使用boost::interprocess:message_queue。因为此Boost库仅接受二进制序列化的对象,所以我使用Google Protobuf来处理序列化和反序列化。
我创建了2个可执行文件,当前称为“消费者”和“生产者”。生产者通过消息队列将消息发送给消费者,消费者反序列化它。下面的代码在传递简单的“int”对象时起作用(在我看来,这意味着消息队列通信正在工作),但是在使用来自SerializeToOstream()的数据时则不起作用。
您可能已经注意到,我是IPC和多进程编程的新手,但我相信自己已经完成了功课。
这是我的producer.cpp:
#include <iostream>
#include <chrono>
#include <thread>
#include <fstream>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/thread.hpp>
#include <internal/messages/testmessage.pb.h>
int main(int argc, char** argv) {
// Construct the object to be passed
GOOGLE_PROTOBUF_VERIFY_VERSION;
struct protoremove {
~protoremove(){ google::protobuf::ShutdownProtobufLibrary(); }
} remover;
ib::protobuf::testMessage myMessage;
myMessage.set_id(10);
myMessage.set_version(1);
std::cout << myMessage.DebugString() << std::endl;
// Initialize the Boost message queue
try{
//Open a message queue.
boost::interprocess::message_queue mq
(boost::interprocess::open_or_create
,"message_queue" //name
,100 //max message number
,1000 //max message size
);
// Send our message
std::ofstream buftosend;
myMessage.SerializeToOstream(&buftosend);
mq.send(&buftosend, sizeof(buftosend), 1);
}
catch(boost::interprocess::interprocess_exception &ex){
std::cout << ex.what() << std::endl;
return 1;
}
return 0;
}
Consumer.cpp:
#include <iostream>
#include <fstream>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/thread.hpp>
#include <internal/messages/testmessage.pb.h>
int main(int argc, char** argv) {
// Open the message queue
try {
//Erase previous message queue
boost::interprocess::message_queue::remove("message_queue");
ib::protobuf::testMessage recvdMessage;
//Create a message_queue.
boost::interprocess::message_queue mq
(boost::interprocess::open_or_create
,"message_queue" //name
,100 //max message number
,1000 //max message size
);
unsigned int priority;
boost::interprocess::message_queue::size_type recvd_size;
std::ifstream incomingbuf;
mq.receive(&incomingbuf, 1000, recvd_size, priority);
recvdMessage.ParseFromIstream(&incomingbuf);
recvdMessage.id();
recvdMessage.DebugString();
}
catch(boost::interprocess::interprocess_exception &ex){
boost::interprocess::message_queue::remove("message_queue");
std::cout << "IP error " << ex.what() << std::endl;
return 1;
}
boost::interprocess::message_queue::remove("message_queue");
return 0;
}
和消息定义(.proto):
package ib.protobuf;
message testMessage {
required int32 version = 1;
optional int64 id = 2;
optional string data = 3;
optional int64 sequencenumber = 4;
}
运行使用者时,它等待数据(mq.receive()调用正在阻塞)。
当生产者开始时,消费者得到一个SIGSEGV。 gdb在其回溯中指示这是在第44行上发生的,这是ParseFromIstream()方法。
生产者在DebugString()中输出正确的值。
(gdb) r
Starting program: /home/roel/bin/consumer
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/usr/lib/libthread_db.so.1".
Program received signal SIGSEGV, Segmentation fault.
std::istream::sentry::sentry (this=0x7fffffffe117, __in=..., __noskip=true)
at /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc:50
50 /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc: No such file or directory.
(gdb) bt
#0 std::istream::sentry::sentry (this=0x7fffffffe117, __in=..., __noskip=true)
at /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc:50
#1 0x00007ffff679f7ab in std::istream::read (this=0x7fffffffe380,
__s=0x637d20 "", __n=8192)
at /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc:653
#2 0x00007ffff6b10030 in google::protobuf::io::IstreamInputStream::CopyingIstreamInputStream::Read(void*, int) () from /usr/lib/libprotobuf.so.9
#3 0x00007ffff6a99fe1 in google::protobuf::io::CopyingInputStreamAdaptor::Next(void const**, int*) () from /usr/lib/libprotobuf.so.9
#4 0x00007ffff6a97950 in google::protobuf::io::CodedInputStream::Refresh() ()
from /usr/lib/libprotobuf.so.9
#5 0x00007ffff6a94da3 in google::protobuf::MessageLite::ParseFromZeroCopyStream(google::protobuf::io::ZeroCopyInputStream*) () from /usr/lib/libprotobuf.so.9
#6 0x00007ffff6af5ad9 in google::protobuf::Message::ParseFromIstream(std::istream*) () from /usr/lib/libprotobuf.so.9
#7 0x0000000000407e35 in main (argc=1, argv=0x7fffffffe6d8)
at /home/roel/source/consumer.cpp:44
(gdb)
这是使用CMake和GCC 6.0.1在Linux上编译的。
我对自己的程序有很多疑问:
Q1.
首先,也是最重要的; 是什么原因导致细分错误? 我究竟做错了什么?我已经看了很长时间的这段代码,但是看不到这个问题。
Q2.
在boost::interprocess::message_queue中构造函数,我必须定义2个参数;最大数量
消息,以及大小。对于标准类型,此尺寸为
固定。但是,对于消息(通常),消息的大小
是可变的。因此,是确定数量的最佳方法
要为消息保留的内存量? 我应该简单地设置一个最大值
每封邮件的大小并创建多部分邮件参数?
Q3.
是否有更好的方法来实现我的目标? 序列化数据,放入队列中看起来是如此..复杂,尤其是
看到这可能是一个非常普遍的问题。必须有更多
人们试图创建跨平台IPC。像ZeroMQ这样的库
仅支持UNIX域套接字。使用TCP套接字进行环回
界面似乎很难看。难道没有一个图书馆能让我
将任意对象(大小和布局)作为消息共享
内存段,然后消费者可以使用
pop()
?我的意思是单线程,可以通过堆栈上的
push()
和pop()
进行修复。进行所有这些额外的步骤似乎很麻烦。
预先感谢您的任何答复。
编辑
如The Dark所述,上面的代码使用的是
std::string
实例,而不是实际的字符串(std::string.data())
producer.cpp的答案如下:
std::string str = myMessage.SerializeAsString();
mq.send(str.data(), str.size(), 1);
但是,这对于consumer.cpp不能按原样工作,因为字符串初始化时的大小为0。
这是我用于consumer.cpp的代码:
unsigned int priority;
boost::interprocess::message_queue::size_type recvd_size;
//Reserve 1000 bytes of memory for our message
char incomingBuffer[1000];
mq.receive(&incomingBuffer, 1000, recvd_size, priority);
ib::protobuf::testMessage recvdMessage;
//Only if string object is really required
std::basic_string<char> str = incomingBuffer;
std::cout << "Message: " << str.data() << ". Size is " << recvd_size << std::endl;
//ParseFromString() can also directly parse "incomingBuffer", avoiding the cast above
recvdMessage.ParseFromString(str.data());
std::cout << "Message ID " << recvdMessage.id() << std::endl;
std::cout << recvdMessage.DebugString();
最佳答案
生产者的这一部分似乎是错误的。
// Send our message
std::ofstream buftosend;
myMessage.SerializeToOstream(&buftosend);
mq.send(&buftosend, sizeof(buftosend), 1);
ofstream
尚未打开,因此没有文件可存储任何内容,因此第一个调用将失败(不会崩溃)。 send调用跨线发送原始ofstream
类结构。这将不是可传输格式。我认为您想要的是序列化为ostringstream,然后传输ostringstream的内容(而不是整个对象)。
就像是:
// Send our message
std::ostringstream buftosend;
myMessage.SerializeToOstream(&buftosend);
std::string str = buftosend.str();
mq.send(str.data(), str.size(), 1);
或者更好:
// Send our message
std::string str = myMessage.SerializeAsString();
mq.send(str.data(), str.size(), 1);
您还可以添加一条调试行以显示
str
的内容,尽管请注意它将是二进制的,因此不清晰。您的使用者可能遇到类似的问题(如果需要在文件上打开流)。
关于c++ - Protobuf导致ParseFromIstream上的段错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37780120/