python - Can post many events locally with asyncio and aiohttp, but not on a remote machine

Tags: python python-3.x python-asyncio aiohttp

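I wrote a program that posts events using asyncio and aiohttp. It works when I run it locally: I can post 10k events without any problem. However, after I SCPed the whole codebase to a remote machine, I cannot post more than 15 events on that machine without getting this error: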

RuntimeError: Event loop is closed
Exception ignored in: <coroutine object Poster.async_post_event at 0x7f4a53989410>
Traceback (most recent call last):
  File "/home/bli1/qe-trinity/tracer/utils/poster.py", line 63, in async_post_event
  File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/client.py", line 565, in __aenter__
  File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/client.py", line 198, in _request
  File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/connector.py", line 316, in connect
  File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/connector.py", line 349, in _release_waiter
  File "/home/bli1/py/python3.5/lib/python3.5/asyncio/futures.py", line 332, in set_result
  File "/home/bli1/py/python3.5/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks
  File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 447, in call_soon
  File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 456, in _call_soon
  File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 284, in _check_closed
RuntimeError: Event loop is closed
Exception ignored in: <coroutine object Poster.async_post_event at 0x7f4a5397ffc0>
Traceback (most recent call last):
  File "/home/bli1/qe-trinity/tracer/utils/poster.py", line 63, in async_post_event
  File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/client.py", line 565, in __aenter__
  File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/client.py", line 198, in _request
  File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/connector.py", line 316, in connect
  File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/connector.py", line 349, in _release_waiter
  File "/home/bli1/py/python3.5/lib/python3.5/asyncio/futures.py", line 332, in set_result
  File "/home/bli1/py/python3.5/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks
  File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 447, in call_soon
  File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 456, in _call_soon
  File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 284, in _check_closed
RuntimeError: Event loop is closed

How can I debug this, or find the root cause of the problem?

Here is the class I wrote; I run it through its post() method:

import uuid
import os
import asyncio
import time
import random
import json
import aiohttp
from tracer.utils.phase import Phase

class Poster(Phase):
    def __init__(self, log, endpoint, num_post, topic, datafile, timeout, oracles, secure=False, thru_proxy=True):
        Phase.__init__(self, log, "post", oracles, secure, thru_proxy)
        self.log = log
        self.num_post = int(num_post)
        self.datafile = datafile.readlines()
        self.topic = topic
        self.endpoint = self.set_endpoint(endpoint, self.topic)
        self.response = None
        self.timeout = timeout

    def random_line(self):
        """ Returns random line from file and converts it to JSON """
        return json.loads(random.choice(self.datafile))

    @staticmethod
    def change_uuid(event):
        """ Creates new UUID for event_id """
        new_uuid = str(uuid.uuid4())
        event["event_header"]["event_id"] = new_uuid
        return event

    @staticmethod
    def wrapevent(event):
        """ Wrap event with metadata for analysis later on """
        return {
            "tracer": {
                "post": {
                    "statusCode": None,
                    "timestamp": None,
                },
                "awsKafkaTimestamp": None,
                "qdcKakfaTimestamp": None,
                "hdfsTimestamp": None
            },
            "event": event
        }

    def gen_random_event(self):
        random_event = self.random_line()
        event = self.change_uuid(random_event)
        dataspec = self.wrapevent(event)
        return dataspec

    async def async_post_event(self, event, session):
        async with session.post(self.endpoint, data=event, proxy=self.proxy) as resp:
            event["tracer"]["post"]["timestamp"] = time.time() * 1000.0
            event["tracer"]["post"]["statusCode"] = resp.status
            unique_id = event["event"]["event_header"]["event_id"]
            oracle_endpoint = os.path.join(self.oracle, unique_id)
        async with session.put(oracle_endpoint, data=json.dumps(event), proxy=self.proxy) as resp:
            if resp.status != 200:
                self.log.debug("Post to ElasticSearch not 200")
                self.log.debug(event["event"]["event_header"]["event_id"])
                self.log.debug("Status code: " + str(resp.status))
            return event["event"]["event_header"]["event_id"], resp.status

    async def async_post_events(self, events):
        coros = []
        conn = aiohttp.TCPConnector(verify_ssl=self.secure)
        async with aiohttp.ClientSession(connector=conn) as session:
            for event in events:
                coros.append(self.async_post_event(event, session))
            return await asyncio.gather(*coros)

    def post(self):
        event_loop = asyncio.get_event_loop()
        try:
            events = [self.gen_random_event() for i in range(self.num_post)]
            start_time = time.time()
            results = event_loop.run_until_complete(self.async_post_events(events))
            print("Time taken: " + str(time.time() - start_time))
        finally:
            event_loop.close()

Best answer

A loop cannot be reused once it has been closed. From the AbstractEventLoop.close documentation:

This is idempotent and irreversible. No other methods should be called after this one.
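Both properties from that quote can be checked in a few lines. The sketch below uses a throwaway coroutine, `ping()` (a hypothetical stand-in, not part of the question's code): calling `close()` twice is harmless, but any attempt to run the loop afterwards raises the same `RuntimeError` seen in the traceback.

```python
import asyncio


async def ping():
    # Hypothetical stand-in coroutine; any awaitable would do.
    return "pong"


loop = asyncio.new_event_loop()
first_result = loop.run_until_complete(ping())
print(first_result)  # pong

loop.close()
loop.close()  # idempotent: a second close() is a no-op
print(loop.is_closed())  # True

coro = ping()
try:
    loop.run_until_complete(coro)  # irreversible: a closed loop cannot run again
except RuntimeError as exc:
    error_message = str(exc)
    coro.close()  # the coroutine never started; close it to avoid a "never awaited" warning
print(error_message)  # Event loop is closed
```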

Either remove the loop.close() call, or create a new loop for every call to post().
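A minimal sketch of the second option, assuming a trimmed-down, hypothetical `Poster` whose `async_post_event` just yields to the loop instead of making the real aiohttp POST/PUT requests. The only substantive change from the question's `post()` is `asyncio.new_event_loop()` in place of `asyncio.get_event_loop()`, so the `close()` in `finally` only ever closes a loop that will not be used again. In the real class the `ClientSession` is already created inside `async_post_events`, so it would be bound to the fresh loop as well. Written for current Python 3.

```python
import asyncio
import time


class Poster:
    """Hypothetical, trimmed-down stand-in for the question's Poster class."""

    def __init__(self, num_post):
        self.num_post = num_post

    async def async_post_event(self, event):
        # Placeholder for the real aiohttp POST/PUT pair.
        await asyncio.sleep(0)
        return event, 200

    async def async_post_events(self, events):
        return await asyncio.gather(*(self.async_post_event(e) for e in events))

    def post(self):
        # A fresh loop per call: closing it afterwards cannot break the next call.
        event_loop = asyncio.new_event_loop()
        try:
            events = list(range(self.num_post))
            start_time = time.time()
            results = event_loop.run_until_complete(self.async_post_events(events))
            print("Time taken: " + str(time.time() - start_time))
            return results
        finally:
            event_loop.close()


poster = Poster(num_post=3)
first = poster.post()
second = poster.post()  # works: the second call gets its own loop
```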

My advice is to avoid these issues altogether by running everything inside the loop and awaiting async_post_events wherever you need it.
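A sketch of that single-loop structure, again with a hypothetical `async_post_event` that only yields to the loop instead of calling aiohttp. One top-level coroutine, `main()` (also hypothetical), awaits each batch in turn, and `asyncio.run()` creates the loop, runs `main()`, and closes the loop exactly once at the end. Note that `asyncio.run()` needs Python 3.7+; the traceback shows Python 3.5, where the equivalent is to call `asyncio.get_event_loop().run_until_complete(main())` once and close the loop only when the program is finished.

```python
import asyncio


async def async_post_event(event):
    # Hypothetical stand-in for the question's aiohttp POST/PUT coroutine.
    await asyncio.sleep(0)
    return event, 200


async def async_post_events(events):
    return await asyncio.gather(*(async_post_event(e) for e in events))


async def main():
    # Everything runs inside one loop; each batch is simply awaited,
    # so nothing ever touches a loop that has already been closed.
    all_results = []
    for batch in ([1, 2], [3, 4, 5]):
        all_results.extend(await async_post_events(batch))
    return all_results


# asyncio.run() creates the loop, runs main(), then closes the loop once.
results = asyncio.run(main())
```

With this layout a single aiohttp `ClientSession` could also be opened once in `main()` and passed to every batch, rather than one session per batch.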

Source: this question and answer are from Stack Overflow: https://stackoverflow.com/questions/40412425/
