python - pyzmq: sockets break with "ZMQError: Operation cannot be accomplished in current state"

Tags: python zeromq distributed-computing pyzmq low-latency

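I'm very new to ZeroMQ and am trying to build a very basic messaging system. The code is largely based on the example here, with a few twists.

For some reason, after the last message reaches the frontend sockets (hbm, tx), the code throws an error whose origin I'm not sure of.

Sorry for the long code and output; I think they are important for highlighting the problem and helping with debugging.

Here is my code and its output: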

s_frontend = "ipc://frontend.ipc"
s_backend  = "ipc://backend.ipc"

import multiprocessing
import time

import zmq

def hbm():
    """Worker task, using a REQ socket to do load-balancing."""
    socket = zmq.Context().socket(zmq.REQ)
    socket.identity = u"hbm".encode("ascii")
    socket.connect(s_frontend)
    # Tell broker we're ready for work
    socket.send(b"READY")
    while True:
        msgs = socket.recv_multipart()
        print("hbm got something", msgs)
        if "BE READY" in msgs:
            print("hbm:BE is ready for some work")
        time.sleep(3)

def tx():
    """Worker task, using a REQ socket to do load-balancing."""
    socket = zmq.Context().socket(zmq.REQ)
    socket.identity = u"txtx".encode("ascii")
    socket.connect(s_frontend)
    # Tell broker we're ready for work
    socket.send(b"READY")
    while True:
        msgs = socket.recv_multipart()
        print("tx got something ", msgs)
        if "BE READY" in msgs:
            print("tx:BE is ready for some work")
        time.sleep(3)


def dev():
    """Worker task, using a REQ socket to do load-balancing."""
    socket = zmq.Context().socket(zmq.REQ)
    socket.identity = u"dev".encode("ascii")
    socket.connect(s_backend)
    # Tell broker we're ready for work
    socket.send(b"READY")


def main():
    # Prepare context and sockets
    context = zmq.Context.instance()
    frontend = context.socket(zmq.ROUTER)
    frontend.bind( s_frontend )                 # ( "ipc://frontend.ipc" )
    backend = context.socket(zmq.ROUTER)
    backend.bind( s_backend )                   # ( "ipc://backend.ipc" )

    def start(task, *args):
        process = multiprocessing.Process(target=task, args=args)
        process.daemon = True
        process.start()

    start(hbm)
    start(tx)


    time.sleep(1)
    start(dev)

    clients = []
    poller = zmq.Poller()
    poller.register(backend, zmq.POLLIN)
    #poller.register(frontend, zmq.POLLIN)
    all_is_ready = False
    while True:
        sockets = dict(poller.poll(timeout=1))
        #print(sockets)
        soc = None
        if backend in sockets:
            print("got something from backend")
            msg = backend.recv_multipart()
            print(msg)
            print("adding frontend to poller")
            poller.register(frontend, zmq.POLLIN)
            print("backend is ready, notify frontend")

        elif frontend in sockets:
            print("got something from frontend")
            msg = frontend.recv_multipart()
            print(msg)
            clients.append(bytes(msg[0]))

        elif len(clients) == 2 and all_is_ready is False:
            all_is_ready = True
            for c in clients:
                print("sending response to", c)
                time.sleep(0.1) # just to prevent print overlap
                frontend.send_multipart([c, b"", b"BE READY"])
        else:
            print("so much work, no rest, sleeping for 3")
            time.sleep(3)


    # Clean up
    backend.close()
    frontend.close()
    context.term()


if __name__ == "__main__":
    main()

Running this code produces the following output:

so much work, no rest, sleeping for 3
got something from backend
['dev', '', 'READY']
adding frontend to poller
backend is ready, notify frontend
got something from frontend
['hbm', '', 'READY']
got something from frontend
['txtx', '', 'READY']
sending response to hbm
sending response to txtx
hbm got something ['BE READY']
hbm:BE is ready for some work
tx got something  ['BE READY']
tx:BE is ready for some work
so much work, no rest, sleeping for 3
Process Process-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/home/vm/repo/pyzmq_examples/hal_example/example1.py", line 104, in hbm
    msgs = socket.recv_multipart()
  File "/usr/local/lib/python2.7/dist-packages/zmq/sugar/socket.py", line 475, in recv_multipart
    parts = [self.recv(flags, copy=copy, track=track)]
  File "zmq/backend/cython/socket.pyx", line 791, in zmq.backend.cython.socket.Socket.recv
  File "zmq/backend/cython/socket.pyx", line 827, in zmq.backend.cython.socket.Socket.recv
  File "zmq/backend/cython/socket.pyx", line 191, in zmq.backend.cython.socket._recv_copy
  File "zmq/backend/cython/socket.pyx", line 186, in zmq.backend.cython.socket._recv_copy
  File "zmq/backend/cython/checkrc.pxd", line 25, in zmq.backend.cython.checkrc._check_rc
    raise ZMQError(errno)
ZMQError: Operation cannot be accomplished in current state
so much work, no rest, sleeping for 3
Process Process-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/home/vm/repo/pyzmq_examples/hal_example/example1.py", line 130, in tx
    msgs = socket.recv_multipart()
  File "/usr/local/lib/python2.7/dist-packages/zmq/sugar/socket.py", line 475, in recv_multipart
    parts = [self.recv(flags, copy=copy, track=track)]
  File "zmq/backend/cython/socket.pyx", line 791, in zmq.backend.cython.socket.Socket.recv
  File "zmq/backend/cython/socket.pyx", line 827, in zmq.backend.cython.socket.Socket.recv
  File "zmq/backend/cython/socket.pyx", line 191, in zmq.backend.cython.socket._recv_copy
  File "zmq/backend/cython/socket.pyx", line 186, in zmq.backend.cython.socket._recv_copy
  File "zmq/backend/cython/checkrc.pxd", line 25, in zmq.backend.cython.checkrc._check_rc
    raise ZMQError(errno)
ZMQError: Operation cannot be accomplished in current state
so much work, no rest, sleeping for 3
Traceback (most recent call last):
  File "/home/vm/repo/pyzmq_examples/hal_example/example1.py", line 244, in <module>
    main()
  File "/home/vm/repo/pyzmq_examples/hal_example/example1.py", line 233, in main
    time.sleep(3)
KeyboardInterrupt

Process finished with exit code 1

Best answer

Q : "... throws error ... I'm not sure of it's origin "

ZMQError: Operation cannot be accomplished in current state

The origin is that, in every use of the REQ archetype, the code does not satisfy the conditions of the native API:

...
socket = zmq.Context().socket( zmq.REQ ) #--------------------- REQ socket Archetype
socket.identity = u"hbm".encode( "ascii" )
socket.connect( s_frontend )
socket.send( b"READY" ) #-------------------------------------- REQ.send()-s
while True:             # ..................................... REQ next can .recv()
    #      socket-FSA( of a type of REQ ) can execute a .recv() iff.send() preceded.....?
    msgs = socket.recv_multipart() #- - - - - - - - - - - - - - REQ.recv_multipart()-s
    #.......................................................... REQ next can .send()
    continue                       #-?-?-?-?-?-?: DID IT TRY TO REQ.send()? NO, NEVER...!
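
A minimal sketch of a worker loop that does satisfy that condition is given here. It is not the asker's full broker: it assumes a single REQ worker and a ROUTER frontend living in one process, and the endpoint name "inproc://frontend-demo" and the function name run_worker_rounds are made up for illustration. The only point is that every .recv_multipart() on the REQ side is followed by a .send() before the next receive.

```python
import zmq


def run_worker_rounds(rounds=3):
    """Drive a REQ worker through `rounds` send/recv cycles against a ROUTER."""
    ctx = zmq.Context()
    frontend = ctx.socket(zmq.ROUTER)
    frontend.bind("inproc://frontend-demo")   # hypothetical endpoint

    worker = ctx.socket(zmq.REQ)
    worker.identity = b"hbm"
    worker.connect("inproc://frontend-demo")

    replies = []
    worker.send(b"READY")                      # REQ must send first
    for _ in range(rounds):
        # Broker side: ROUTER sees [identity, empty delimiter, body].
        ident, empty, body = frontend.recv_multipart()
        frontend.send_multipart([ident, b"", b"BE READY"])

        # Worker side: exactly one recv per send ...
        replies.append(worker.recv_multipart())
        # ... and a new send before the next recv.
        worker.send(b"READY")

    # The last READY is never consumed, so drop it on close.
    worker.close(linger=0)
    frontend.close(linger=0)
    ctx.term()
    return replies


if __name__ == "__main__":
    print(run_worker_rounds())
```

In the original hbm() and tx() functions the equivalent change would be to send something (for example another READY or a result) at the end of each loop iteration instead of going straight back to recv_multipart().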

Do you mean that REQ must strictly alternate send and recv and cannot work any other way (e.g. send, recv, recv)? – LordTitiKaka

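The REQ archetype has a hard-wired expectation of a strictly preserved order:
.send() - .recv() - .send() - .recv() - .send() - .recv() - ... ad infinitum

Any call that violates this order is rejected by the socket's internal finite-state automaton (FSA) and results in the said ZMQError: Operation cannot be accomplished in current state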
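
The rule can be observed directly with a small experiment. The sketch below assumes a REQ and a REP socket in the same process over a made-up endpoint, "inproc://fsm-demo", and a hypothetical helper name, demo_req_state_machine. It performs one valid send/recv cycle, then attempts a second recv without a send in between, and records the errno of the resulting ZMQError, which should be zmq.EFSM.

```python
import zmq


def demo_req_state_machine():
    """Show that a second recv() on REQ without a send() raises EFSM."""
    ctx = zmq.Context()
    rep = ctx.socket(zmq.REP)
    rep.bind("inproc://fsm-demo")      # hypothetical endpoint
    req = ctx.socket(zmq.REQ)
    req.connect("inproc://fsm-demo")

    # One valid cycle: send, then recv.
    req.send(b"READY")
    rep.recv()
    rep.send(b"BE READY")
    first = req.recv()

    # Invalid step: recv again without an intervening send.
    try:
        req.recv(zmq.NOBLOCK)
        error = None
    except zmq.ZMQError as exc:
        error = exc.errno               # expected: zmq.EFSM

    # The socket is still usable once the order is respected again.
    req.send(b"READY")
    rep.recv()
    rep.send(b"WORK")
    second = req.recv()

    req.close(linger=0)
    rep.close(linger=0)
    ctx.term()
    return first, error, second


if __name__ == "__main__":
    first, error, second = demo_req_state_machine()
    print(first, error == zmq.EFSM, second)
```

This is the same failure that hbm() and tx() hit on the second pass through their while loop: the first recv_multipart() is legal because a send preceded it, the second one is not.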

Source: a similar question on Stack Overflow: https://stackoverflow.com/questions/61874154/
