python - 在 Python 3 中使用 asyncio 和 websockets 的长时间延迟

标签 python python-3.x websocket python-asyncio

在处理从 websocket 服务器推送到我的 client.py 的数据时,我遇到了长时间(3 小时)的延迟(编辑:延迟一开始很短,然后在一天中变得更长)。我知道它不是服务器延迟的。

例如,我每隔 5 秒就会看到 keep_alive 日志事件及其相应的时间戳。这样就可以顺利进行了。 但是当我看到日志中处理的数据帧实际上是在服务器发送数据帧 后 3 小时。 我是否正在做一些事情来延迟这个过程?

我是否正确地调用了协程“keep_alive”? keep_alive 只是向服务器发送一条消息,以保持连接处于事件状态。服务器回显消息。我也登录太多了吗?这会不会延迟处理(我不这么认为,因为我看到日志记录事件立即发生)。

async def keep_alive(websocket):
                """
                 This only needs to happen every 30 minutes. I currently have it set to every 5 seconds.
                """
                await websocket.send('Hello')   
                await asyncio.sleep(5)

async def open_connection_test():
    """
    Establishes web socket (WSS). Receives data and then stores in csv.
    """
    async with websockets.connect( 
            'wss://{}:{}@localhost.urlname.com/ws'.format(user,pswd), ssl=True, ) as websocket:
        while True:    
            """
            Handle message from server.
            """
            message = await websocket.recv()
            if message.isdigit():
                # now = datetime.datetime.now()
                rotating_logger.info ('Keep alive message: {}'.format(str(message)))
            else:
                jasonified_message = json.loads(message)
                for key in jasonified_message:
                    rotating_logger.info ('{}: \n\t{}\n'.format(key,jasonified_message[key]))    
                """
                Store in a csv file.
                """
                try:            
                    convert_and_store(jasonified_message)
                except PermissionError:
                    convert_and_store(jasonified_message, divert = True)                        
            """
            Keep connection alive.
            """            
            await keep_alive(websocket)

"""
Logs any exceptions in logs file.
"""
try:
    asyncio.get_event_loop().run_until_complete(open_connection())
except Exception as e:
    rotating_logger.info (e)

编辑: 来自documentation - 我认为这可能与它有关 - 但我没有将这些点联系起来。

The max_queue parameter sets the maximum length of the queue that holds incoming messages. The default value is 32. 0 disables the limit. Messages are added to an in-memory queue when they’re received; then recv() pops from that queue. In order to prevent excessive memory consumption when messages are received faster than they can be processed, the queue must be bounded. If the queue fills up, the protocol stops processing incoming data until recv() is called. In this situation, various receive buffers (at least in asyncio and in the OS) will fill up, then the TCP receive window will shrink, slowing down transmission to avoid packet loss.

编辑 2018 年 9 月 28 日:我在没有保持事件消息的情况下对其进行测试,这似乎不是问题所在。它可能与 convert_and_store() 函数有关吗?这是否需要异步定义然后等待?

def convert_and_store(data, divert = False, test = False):
    if test:
        data = b
    fields = data.keys()
    file_name =  parse_call_type(data, divert = divert)
    json_to_csv(data, file_name, fields)

编辑 10/1/2018:似乎 keep-alive 消息和 convert_and_store 都存在问题;如果我将保持事件消息延长到 60 秒 - 那么 convert_and_store 将每 60 秒仅运行一次。所以 convert_and_store 正在等待 keep_alive()...

最佳答案

Could it be related to the convert_and_store() function?

是的,它可能是。不应直接调用阻塞代码。如果一个函数执行 1 秒的 CPU 密集型计算,则所有 asyncio 任务和 IO 操作将延迟 1 秒。

执行器可用于在不同的线程/进程中运行阻塞代码:

import asyncio
import concurrent.futures
import time

def long_runned_job(x):
    time.sleep(2)
    print("Done ", x)

async def test():
    loop = asyncio.get_event_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        for i in range(5):
            loop.run_in_executor(pool, long_runned_job, i)
            print(i, " is runned")
            await asyncio.sleep(0.5)
loop = asyncio.get_event_loop()
loop.run_until_complete(test())

在您的情况下,它应该看起来像这样:

import concurrent.futures

async def open_connection_test():
    loop = asyncio.get_event_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        async with websockets.connect(...) as websocket:
            while True:    
                ...
                loop.run_in_executor(pool, convert_and_store, args)

已编辑:

It seems that both the keep-alive message and convert_and_store are both at issue

您可以在后台运行keep_alive:

async def keep_alive(ws):
    while ws.open:
        await ws.ping(...)   
        await asyncio.sleep(...)

async with websockets.connect(...) as websocket:
    asyncio.ensure_future(keep_alive(websocket))
    while True:    
        ...

关于python - 在 Python 3 中使用 asyncio 和 websockets 的长时间延迟,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52484087/

相关文章:

python - 尝试将字典转换为数据帧并使用 for 循环附加到同一数据帧

python - 如何将鼠兔连接到rabbitMQ远程服务器? ( python 、鼠兔)

regex - 如何使用 Pandas 拆分数据框?

Sha1 的 C++ Base64 - WebSocket 握手

http - websockets 如何比简单的 HTTP 请求更快?

python - 如何在PyUsb中绑定(bind)/取消绑定(bind)USB设备?

php - 如何使用shell命令在php中执行python脚本?

python-3.x - 如何从 ubuntu 控制台运行几个 python 程序?

Python - 如何设置法语语言环境?

java - 泰鲁斯 wss ://websocket not passing through squid proxy