zeromq和python多处理,打开文件过多

标签 zeromq python-multiprocessing

我有一个基于代理的模型,其中几个代理由一个中央进程启动,并通过另一个中央进程进行通信。每个代理和通信过程都通过zmq进行通信。但是,当我启动100个以上的代理程序时,standard_out发送:

Invalid argument (src/stream_engine.cpp:143) Too many open files (src/ipc_listener.cpp:292)



Mac OS提示问题报告:

Python quit unexpectedly while using the libzmq.5.dylib plug-in.



在我看来,问题在于打开了太多上下文。但是如何通过多处理来避免这种情况?

我附上下面的部分代码:

class Agent(Database, Logger, Trade, Messaging, multiprocessing.Process):
    def __init__(self, idn, group, _addresses, trade_logging):
        multiprocessing.Process.__init__(self)
        ....

    def run(self):
        self.context = zmq.Context()
        self.commands = self.context.socket(zmq.SUB)
        self.commands.connect(self._addresses['command_addresse'])
        self.commands.setsockopt(zmq.SUBSCRIBE, "all")
        self.commands.setsockopt(zmq.SUBSCRIBE, self.name)
        self.commands.setsockopt(zmq.SUBSCRIBE, group_address(self.group))

        self.out = self.context.socket(zmq.PUSH)
        self.out.connect(self._addresses['frontend'])
        time.sleep(0.1)
        self.database_connection = self.context.socket(zmq.PUSH)
        self.database_connection.connect(self._addresses['database'])
        time.sleep(0.1)
        self.logger_connection = self.context.socket(zmq.PUSH)
        self.logger_connection.connect(self._addresses['logger'])

        self.messages_in = self.context.socket(zmq.DEALER)
        self.messages_in.setsockopt(zmq.IDENTITY, self.name)
        self.messages_in.connect(self._addresses['backend'])

        self.shout = self.context.socket(zmq.SUB)
        self.shout.connect(self._addresses['group_backend'])
        self.shout.setsockopt(zmq.SUBSCRIBE, "all")
        self.shout.setsockopt(zmq.SUBSCRIBE, self.name)
        self.shout.setsockopt(zmq.SUBSCRIBE, group_address(self.group))

        self.out.send_multipart(['!', '!', 'register_agent', self.name])

        while True:
            try:
                self.commands.recv()  # catches the group adress.
            except KeyboardInterrupt:
                print('KeyboardInterrupt: %s,self.commands.recv() to catch own adress ~1888' % (self.name))
                break
            command = self.commands.recv()
            if command == "!":
                subcommand = self.commands.recv()
                if subcommand == 'die':
                    self.__signal_finished()
                    break
            try:
                self._methods[command]()
            except KeyError:
                if command not in self._methods:
                    raise SystemExit('The method - ' + command + ' - called in the agent_list is not declared (' + self.name)
                else:
                    raise
            except KeyboardInterrupt:
                print('KeyboardInterrupt: %s, Current command: %s ~1984' % (self.name, command))
                break

            if command[0] != '_':
                self.__reject_polled_but_not_accepted_offers()
                self.__signal_finished()
        #self.context.destroy()

整个代码在http://www.github.com/DavoudTaghawiNejad/abce

最佳答案

奇怪的是,这不是太多的上下文,而是太多的套接字。查看您的存储库,我发现您(正确地)使用IPC作为传输工具。 IPC使用文件描述符作为“地址”,以在不同进程之间来回传递数据。如果我没看错,则每个进程最多要打开7个套接字,这样一来,它们就会很快累加起来。我敢打赌,如果在代码中间进行一些调试,您会发现创建最后一个上下文时,它不会失败,但是当最后一个套接字将打开文件的限制推到边缘时,它不会失败。

我的理解是,开放式FD的典型用户限制约为1000个,因此在大约100个代理程序下,您只为套接字推送700个开放式FD。其余的可能只是典型的。将您的限额提高到10,000,这取决于您的情况,应该没有问题。否则,您将不得不重写以使用每个进程更少的套接字来获得更高的进程限制。

关于zeromq和python多处理,打开文件过多,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31220483/

相关文章:

android - 在 jeromq 中使用轮询

java - 如何在Java中为0MQ(ZeroMQ)设置超时?

visual-studio - 如何将ZeroMQ导入Unity引擎?

go - 使用 ZeroMQ 在 Go 中处理中断的惯用方法

python - 处理两个传入数据流并将它们组合在 python 中?

python - 服务器上多个客户端的扭曲线程

python - 如何在多处理中结合 TimeoutError 和 tq​​dm 进度条?

python - 使用 ZeroMQ 实现多线程服务器

python - matplotlib 中的多进程绘图

python - 为什么 Python Multiprocessing Workers 不会死?