c++ - ZMQ : socket_send/recv blocking

标签 c++ python python-2.7 zeromq pyzmq

所以我试图在Python中的ZMQ和C/C++扩展中的ZMQ之间建立一些简单的通信。 Python 设置上下文,绑定(bind) inproc 套接字,并将上下文和套接字名称传递给扩展。该扩展设置自己的套接字、连接并监听消息。然后,Python 将 header 和字典的字符串表示形式发送到扩展。使用 REQ/REP 套接字非常简单。然而,由于某种原因,我似乎找不到,对 socket.send 的调用是阻塞的,并且扩展永远不会通过对 zmq_recv 的调用。我有一个测试环境,其中出现几乎完全相同的场景,但套接字不会阻塞,并且我已经对代码进行了三次检查,它应该以相同的方式工作。

Python:

import zmq
import cppextension
# No lectures about using threading please. I'm restricted to, in essence
# using this function because of the code base I'm working with.
from thread import start_new_thread

socket = self.zmq_context.socket(zmq.REQ)
socket_name = "inproc://agl"
socket.bind(socket_name)
t = start_native_thread(cppextension.actor,
                        (self.zmq_context, socket_name))

test_send = {"foo": 1, "bar": 2}
# BLOCKS ON THIS LINE VVVVV
socket.send("TEST", flags=zmq.SNDMORE)
socket.send(str(test_send))
socket.recv()
socket.send("STOP")

C/C++:

// Originally these used std::basic_string<Py_UNICODE> but I reverted
// back to normal std::string so I can use a JSON parsing library.
typedef string pystring;
typedef char pystring_t;

extern "C" PyObject *
actor(PyObject *self, PyObject *args) {
    PyObject *py_context, *py_connect_to;
    PyThreadState *_save;
    void *context;
    char *connect_to;
    void *socket;
    int rc;

    if(!PyArg_ParseTuple(args, "OO", &py_context, &py_connect_to)) {
        PyErr_SetString(PyExc_TypeError, "Expected two arguments (ZMQ context, name of socket to connect to)");
        return NULL;
    }
    py_context = PyObject_GetAttrString(py_context, "_handle");
    if(py_context == NULL) {
        PyErr_SetString(PyExc_TypeError, "Could not get '_handle' from context");
        return NULL;
    }
    if(!PyInt_Check(py_context)) {
        PyErr_SetString(PyExc_TypeError, "_handle was not an integer");
        return NULL;
    }
    context = (void*)PyInt_AsLong(py_context);
    connect_to = new char[PyString_Size(py_connect_to) + 1];
    strcpy(connect_to, PyString_AsString(py_connect_to));
    _save = PyEval_SaveThread();

    //
    // GIL-less operation BEGIN
    // ** WARNING: Do NOT call any functions that begin with 'Py', or touch any
    //    data structures that begin with 'Py' while in this section. It *WILL*
    //    blow up the Python interpreter.
    //
    socket = zmq_socket(context, ZMQ_REP);
    rc = zmq_connect(socket, connect_to);

    pystring TEST("TEST");
    pystring STOP("STOP");
    pystring SUCCESS("SUCCESS");
    pystring FAILURE("FAILURE");

    if(rc == 0) {
        int going = 1;
        // Should be able to hold a full megabyte of text, which should be enough
        // for any message being passed in.
        // Is there a way to query size of the incoming message...?
        char buffer[1000000];
        while(going) {
            // BLOCKS ON THIS LINE VVVVVV
            int size = zmq_recv(socket, buffer, 1000000, 0);
            if(size == -1) {
                // ERROR
                continue;
            }
            // Assume we don't get larger than 1MB of data. Should put a
            // check around this at some point, but not right now.
            buffer[size] = 0;

            pystring fullmsg(buffer);
            cout << "ZMQ RECIEVED: " << fullmsg << endl;
            if(fullmsg == TEST) {
                size = zmq_recv(socket, &buffer, 1000000, 0);
                if(size != -1) {
                    buffer[size] = 0;
                    pystring json_fullmsg(buffer);
                    cout << "ZMQ JSON: " << json_fullmsg << endl;
                    contacts.add(json_fullmsg);
                    zmq_send(socket, SUCCESS.c_str(), SUCCESS.size() + 1, 0);
                }
                else {
                    zmq_send(socket, FAILURE.c_str(), FAILURE.size() + 1, 0);
                }
            }
            else if(fullmsg == STOP) {
                going = 0;
                zmq_send(socket, SUCCESS.c_str(), SUCCESS.size() + 1, 0);
            }
        }
    }
    else {
        // ERROR
        int err = zmq_errno();
        switch(err) {
        case EINVAL:
            cout << "ZMQ CONNECT ERR: " << "Endpoint supplied is invalid" << endl;
            break;
        default:
            cout << "ZMQ CONNECT ERR: " << err << endl;
            break;
        }
    }
    zmq_close(socket);
    //
    // GIL-less operation END
    //

    PyEval_RestoreThread(_save);
    Py_INCREF(Py_None);
    return Py_None;
}

非常感谢您帮助了解这里发生的情况。

编辑:另请注意,此代码将在 gevent 对标准库进行了 Monkeypatched 的环境中运行。这是我使用 thread.start_new_thread 的部分原因,因为它是在猴子修补发生之前保存的,并且我想要一个真正的线程而不是绿色线程。

最佳答案

有两件事,

因为您在修改版本中使用了 req/rep,所以“send,send,recv,send...”将 不行。发送/接收都必须以“锁步”方式工作(发送、接收、发送、接收。)

ZMQ_NOBLOCK会引发EAGAIN异常,这可能意味着“套接字连接 尚未完成,请稍后再回来。”尝试在绑定(bind)后放置一个计时器/ sleep 发送/接收。这就是导致“资源暂时不可用”消息的原因。

希望这有帮助

先生。奥诺夫

关于c++ - ZMQ : socket_send/recv blocking,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18407799/

相关文章:

python - 使用 tf.metric 模块中的变量时出现 TensorFlow FailedPreconditionError

python - 能够绘制为列表,无法绘制为 Pandas 系列

c++ - 如何在单个概念中实现逻辑 'or' 的需求?

c++ - 如何遍历一组集合 C++

c++ - 用作模板参数的 lambda 正文中的右位移位在 GCC 上无法编译

python - 如何从 AWS 中的 SQS 队列接收多于 1 条消息?

Python ctypes - 类型错误 : int expected instead of float

Python:诅咒和默认的黑色

python - pandas:基于 NaN 的切片数据帧

c++ - 尝试将字符索引传递给 DrawText