python - 如何在函数内部(以非阻塞方式)使用 zmq 在客户端请求时获取函数的状态?

标签 python c++ zeromq pyzmq

当客户端请求状态时,如何以非阻塞方式使用 ZMQ 来“服务”长时间运行的作业的状态?

下面的代码说明了如何暂时“中断”长时间运行的任务以发送当前状态。

任务运行时间长是因为有很多 url 需要处理,而不是因为每个 url 都需要很长时间来处理。这意味着服务器几乎可以立即向客户端响应当前状态。

我无法以非阻塞方式实现此逻辑,因为使用标志 zmq.NOBLOCK 导致 Again: Resource temporary unavailable,并且不使用标志表示服务器阻塞并等待接收消息。

如何实现这样的逻辑/行为?我愿意使用 C++ 或 Python。

服务器代码:

import zmq

# Socket details
port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect("tcp://localhost:%s" % port)

# List of many urls
urls = ['http://google.com','http://yahoo.com']

def process(url):
    """Sample function"""
    pass

processed_urls = []
for url in urls:

    # If a message has been received by a client respond to the message
    # The response should be the current status.
    if socket.recv(zmq.NOBLOCK):
        msg = b"Processed the following urls %s" % str(processed_urls).encode()
        socket.send(msg, zmq.NOBLOCK)

    # Continue processing the urls
    process(url)
    processed_urls.append(url)

最佳答案

首先 - NON-BLOCKING 是一把双刃剑。有两个世界,每个世界都可以,有时确实会阻塞。

  1. GIL-side 和/或process-side "blocking" 可能会出现(numpy 下面的示例,但对于任何无法轻松实现非阻塞解决方法的同步阻塞调用有效),而某些外部进程或全局应用程序架构可能仍需要(至少)一些响应& 握手行为甚至来自这种故意“阻止”的 Python 代码区域。

  2. 第二个世界是您的 ZeroMQ(可能)阻塞调用。设置 zmq.CONFLATE 还可以帮助您从长时间运行的客户端到服务器进行类似 PUSH 的 URL 报告。在报告套接字的客户端和服务器端设置 CONFLATE

在我力所能及的每个地方,我都提倡严格的非阻塞设计。即使是 ZeroMQ 代码的教科书示例也应该是真实且公平的,不会被屏蔽。我们生活在第三个千年,阻塞代码是一种性能和资源使用的毁灭性状态,主要是在专业级分布式系统设计的控制范围之外。


主要脚手架:

####################################################################
### NEED TO VIEW aHealthSTATUS FROM AN EXTERNAL_UNIVERSE:
### ( A LIGHTWEIGHT EXCULPATED MONITOR TO OBSERVE THE HEALTH OF THE EXECUTION ENVIRONMENT FROM OUTSIDE OF THE VM-JAIL, FROM AN OUTER HYPERVISOR SPACE )
### ( + using signal.signal() )

import signal, os
#-------------------------------------------------------------------
# .SET  ZeroMQ INFRASTRUCTURE:

#-------------------------------------------------------------------
# .DEF  SIG_handler(s)

def SIG_handler_based_HealthREPORTER( SIGnum, aFrame ):
    print( 'SIG_handler called to report state with signal', SIGnum )
    #---------------------------------------------------------------
    # ZeroMQ .send( .SIG/.MSG )
    
    pass;   # yes, all the needed magic comes right here
    
    #-------------------------------------------------------------------
    # FINALLY:
    
    raise OSError( "Had to send a HealthREPORT" )                   # ??? do we indeed need this circus to be always played around, except in a DEMO-mode?

#-------------------------------------------------------------------
# .ASSOC SIG_handler:

signal.signal( signal.SIGALRM, SIG_handler_based_HealthREPORTER )   # .SET { SIGALRM: <aHandler> }-assoc

#-------------------------------------------------------------------
# .SET 1[sec]-delay + 1[sec]-interval

signal.setitimer( signal.ITIMER_REAL, 1, 1 )                        # .SET REAL-TIME Interval-based WatchDog -- Decrements interval timer in real time, and delivers SIGALRM upon expiration.


# ------------------------------------------------------------------
# FINALLY:


#-------------------------------------------------------------------
# .SET / .DEACTIVATE 
signal.setitimer( signal.ITIMER_REAL, 0 )                           # .SET / DEACTIVATE

#-------------------------------------------------------------------
# .TERM GRACEFULLY ZeroMQ INFRASTRUCTURE


#-------------------------------------------------------------------
# CLEAN EXIT(0)
_exit(0)

让我分享一种用于某种aHealthMONITOR 的方法,用于一个确实很长的 principal-BLOCKING 计算案例。

让我们举一个 GIL 的“阻塞”类型计算的例子:

#######
# SETUP
signal.signal(    signal.SIGALRM, SIG_ALRM_handler_A )          # .ASSOC { SIGALRM: thisHandler }
signal.setitimer( signal.ITIMER_REAL, 10, 5 )                   # .SET   @5 [sec] interval, after first run, starting after 10[sec] initial-delay
SIG_ALRM_last_ctx_switch_VOLUNTARY = -1                         # .RESET .INIT()

SIGALRM + ITIMER_REAL 的机制提供了一个可爱的自动化,至少可以通过一些响应让外部世界满意(在本例中频率为 ~ 0.2 [Hz],但主要{up-|down-}-可扩展到任何合理且系统范围内稳定的时间量-- 测试 0.5 [GHz ] 1.0 [GHz] VM 系统上的处理程序留给最终的黑客考虑 -- 否则适用合理的规模因素和非阻塞/低延迟设计的常识)

DEMO 读数显示,involuntary= 上下文切换如何演示阻塞无关机制(读取数字,随着它们的增长,而自愿性在整个 GIL 中保持不变-blocking part of the process ), 所以类似的def-ed SIG_ALRM_handler_XYZ() 可以提供为您的过程状态独立点播记者提供的解决方案。

SIG_ALRM_handler_A(): activated             Wed Oct 19 14:13:14 2016 ------------------------------ pctxsw(voluntary=53151, involuntary=1169)

>>> SIG_ALRM_last_ctx_switch_VOLUNTARY                              53243
>>> SIG_ALRM_last_ctx_switch_FORCED                                  1169

>>> [ np.math.factorial( 2**f ) for f in range(20) ][:5]            # too fast to notice @5[sec]
[1, 2, 24, 40320, 20922789888000]

#########
# COMPUTE
# len(str([np.math.factorial(2**f) for f in range(20)][-1]))    # .RUN   A "FAT"-BLOCKING CHUNK OF A regex/numpy/C/FORTRAN-calculus

>>> len( str( [ np.math.factorial( 2**f ) for f in range(20) ][-1] ) )
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:15:39 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1234)  INSPECT processes ... ev. add a Stateful-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:15:44 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1257)  INSPECT processes ... ev. add a Stateful-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:15:49 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1282)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:15:54 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1305)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:15:59 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1330)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:04 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1352)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:09 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1377)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:14 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1400)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:19 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1425)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:24 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1448)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:29 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1473)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:34 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1496)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:39 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1521)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:44 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1543)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:49 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1568)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:54 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1591)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:59 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1616)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:04 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1639)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:09 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1664)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:14 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1687)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:19 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1713)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:24 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1740)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:29 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1767)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:34 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1790)  INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:39 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1812)  INSPECT processes ... ev. add a StateFul-self-Introspection
2771010

在此流程上下文中,使用了此处理程序:

########################################################################
### SIGALRM_handler_          
###

import psutil, resource, os, time
        
SIG_ALRM_last_ctx_switch_VOLUNTARY = -1
SIG_ALRM_last_ctx_switch_FORCED    = -1

def SIG_ALRM_handler_A( aSigNUM, aFrame ):                              # SIG_ALRM fired evenly even during [ np.math.factorial( 2**f ) for f in range( 20 ) ] C-based processing =======================================
    #
    # onEntry_ROTATE_SigHandlers() -- MAY set another sub-sampled SIG_ALRM_handler_B() ... { last: 0, 0: handler_A, 1: handler_B, 2: handler_C }
    #
    # onEntry_SEQ of calls of regular, hierarchically timed MONITORS ( just the SNAPSHOT-DATA ACQUISITION Code-SPRINTs, handle later due to possible TimeDOMAIN overlaps )
    # 
    aProcess         =   psutil.Process( os.getpid() )
    aProcessCpuPCT   =         aProcess.cpu_percent( interval = 0 )     # EVENLY-TIME-STEPPED
    aCtxSwitchNUMs   =         aProcess.num_ctx_switches()              # THIS PROCESS ( may inspect other per-incident later ... on anomaly )
    
    aVolCtxSwitchCNT = aCtxSwitchNUMs.voluntary
    aForcedSwitchCNT = aCtxSwitchNUMs.involuntary
    
    global SIG_ALRM_last_ctx_switch_VOLUNTARY
    global SIG_ALRM_last_ctx_switch_FORCED
    
    if (     SIG_ALRM_last_ctx_switch_VOLUNTARY != -1 ):                # .INIT VALUE STILL UNCHANGED
        #----------
        # .ON_TICK: must process delta(s)
        if ( SIG_ALRM_last_ctx_switch_VOLUNTARY == aVolCtxSwitchCNT ):
            #
            # AN INDIRECT INDICATION OF A LONG-RUNNING WORKLOAD OUTSIDE GIL-STEPPING ( regex / C-lib / FORTRAN / numpy-block et al )
            #                                                                                 |||||              vvv
            # SIG_:  Wed Oct 19 12:24:32 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=315)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:24:37 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=323)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:24:42 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=331)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:24:47 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=338)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:24:52 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=346)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:24:57 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=353)  ~~~  0.0
            # ...                                                                             |||||              ^^^
            # 00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000]
            # >>>                                                                             |||||              |||
            #                                                                                 vvvvv              |||
            # SIG_:  Wed Oct 19 12:26:17 2016 ------------------------------ pctxsw(voluntary=49983, involuntary=502)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:26:22 2016 ------------------------------ pctxsw(voluntary=49984, involuntary=502)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:26:27 2016 ------------------------------ pctxsw(voluntary=49985, involuntary=502)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:26:32 2016 ------------------------------ pctxsw(voluntary=49986, involuntary=502)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:26:37 2016 ------------------------------ pctxsw(voluntary=49987, involuntary=502)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:26:42 2016 ------------------------------ pctxsw(voluntary=49988, involuntary=502)  ~~~  0.0                
            print(   "SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: ", time.ctime(), 10 * "-",  aProcess.num_ctx_switches(), "{0:_>60s}".format( str( aProcess.threads() ) ),          " INSPECT processes ... ev. add a StateFul-self-Introspection" )
    else:
        #----------
        # .ON_INIT: may report .INIT()
        print(   "SIG_ALRM_handler_A(): activated            ", time.ctime(), 30 * "-",  aProcess.num_ctx_switches() )
    
    ##########
    # FINALLY:
    
    SIG_ALRM_last_ctx_switch_VOLUNTARY = aVolCtxSwitchCNT               # .STO ACTUALs
    SIG_ALRM_last_ctx_switch_FORCED    = aForcedSwitchCNT               # .STO ACTUALs

关于python - 如何在函数内部(以非阻塞方式)使用 zmq 在客户端请求时获取函数的状态?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45114658/

相关文章:

python - Django 迁移找不到 GDALRaster

python - 在 Python 中为 select.select 操作文件描述符

python - 我可以将 Python windows 包安装到 virtualenvs 中吗?

python - 如何将 site_id 或主机添加到 Django 调试日志?

c++ - fftw:为什么我的 2D DFT 输出与每行的 1D DFT 输出不同?

c++ - 如何读取 .obj 文件?

c++ - Linux 中存在段错误的 poll() 系统调用核心转储

build - 如何使用cygwin在windows上构建zeromq?

你能得到 ZeroMQ 中发布者和订阅者的身份/地址吗?

c++ - 以 "almost always auto"样式初始化 ZeroMQ 2.2 消息要使用私有(private)构造函数