我正在尝试调用子进程内的子进程,以便使用 ZMQ 将信息发送到 Unity 应用程序。当我调用 socket.recv()
或 time.sleep
时,它会卡住父进程(这是主进程的子进程)
import json
import zmq
from multiprocessing import Process
import multiprocessing as mp
from absl import app, flags, logging
from absl.flags import FLAGS
def send_unity_data(arg):
context = zmq.Context()
socket = context.socket(zmq.ROUTER)
socket.bind("tcp://*:8080")
while True:
if(arg.poll()):
message=arg.recv()
x = { "x":str(message[0]), "y":str(message[1])}
app_json = json.dumps(x)
socket.send_string(app_json)
message = socket.recv()
print("Received request: %s" % message)
def streaming(detection,args):
try:
vid = cv2.VideoCapture(int(FLAGS.video))
except:
vid = cv2.VideoCapture(FLAGS.video)
receiver1 , sender1 = mp.Pipe()
b_proc3 = Process(target=send_unity_data, args=[receiver1])
b_proc3.start()
while(True):
...
def Main(_argv):
receiver , sender = mp.Pipe()
b_proc = Process(target=streaming, args=[receiver,FLAGS])
b_proc.start()
while(True):
...
我想将位置坐标发送到Unity应用程序,该坐标是通过流处理计算的,如果有人有更好的方法,我也可以更改我的代码。
最佳答案
避免视频流中出现任何不确定的长时间阻塞状态
如果不进行更深入的分析,您的代码将使用阻塞模式操作,只要 Context()
-s 实例接收队列中还没有消息并且代码提交对socket.recv()
-方法,如上面的 message = socket.recv()
SLOC 中。
设计多层/多进程协调是为了避免每一个潜在的阻塞 - ZeroMQ 有 .poll()
-非阻塞或确定性方法(最大延迟预算综合 MUX) -ed ) 优先级轮询(类似主循环)-“ Controller ”-策略。
请随时阅读有关如何 best use ZeroMQ Hierarchy 的更多详细信息适合您的项目。
<小时/>代码块在哪里?让我们回顾一下现状:
多处理
模块具有其他默认值,并表现出与基于 ZeroMQ 消息/信令基础设施的工具不同的行为。最好在两侧使用 ZeroMQ - 无需依赖第二层 multiprocessing.Pipe 工具将内容传送到 ZeroMQ 操作领域。 ZeroMQ 无堆栈传输类,如 inproc://
和 ipc://
甚至集群范围tipc://
提供更好的性能,因为它们可以使用零复制来最终消除处理延迟。
无论如何,避免调用所有阻塞形式的方法并设计代码,使其不依赖于(还不是,更不依赖于)传递的消息:
def send_unity_data( arg ): ### an ad-hoc called
context = zmq.Context() ### 1st: spends time to instantiate a .Context()
socket = context.socket( zmq.ROUTER ) ### 2nd: spends time to instantiate a .socket()
socket.bind("tcp://*:8080") ### 3rd: spends time to ask/acquire O/S port(s)
while True: ### INFINITE-LOOP----------------------------------------
if( arg.poll() ### ?-?-?-?-?-?-? MAY BLOCK depending on arg's .poll()-method
): ### IF .poll()-ed:
message = arg.recv() ### ?-?-?-?-?-?-? MAY BLOCK till a call to arg.recv()-finished
x = { "x": str( message[0] ), ### try to assemble a dict{}
"y": str( message[1] ) ### based on both message structure
} ### and content
app_json = json.dumps( x ) ### try to JSON-ify the dict{}
socket.send_string( app_json ) ### x-x-x-x-x-x-x WILL BLOCK till a socket.send_string() finished
message = socket.recv() ### x-x-x-x-x-x-x WILL BLOCK till a socket.recv()is ever finished
print( "Received request: %s" ### try to print
% message ### a representation of a <message>
) ### on CLI
########################################### This is
# SPIN/LOOP OTHEWRISE ### hasty to O/S resources, wasting GIL-lock latency masking
########################################################################################################
关于Python 子进程被子进程卡住,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56797257/