python - Tornado 与 ThreadPoolExecutor

标签 python multithreading io tornado

我有使用 Tornado 作为 http 服务器和自定义 http 框架的设置。想法是拥有单个 Tornado 处理程序,每个到达的请求都应该提交给 ThreadPoolExecutor 并让 Tornado 监听新请求。一旦线程完成处理请求,就会调用回调,在执行 IO 循环的同一线程中向客户端发送响应。

精简后的代码看起来像这样。基础 http 服务器类:

class HttpServer():
    def __init__(self, router, port, max_workers):
        self.router = router
        self.port = port
        self.max_workers = max_workers

    def run(self):
        raise NotImplementedError()

Tornado 支持的 HttpServer 实现:

class TornadoServer(HttpServer):
    def run(self):
        executor = futures.ThreadPoolExecutor(max_workers=self.max_workers)

        def submit(callback, **kwargs):
            future = executor.submit(Request(**kwargs))
            future.add_done_callback(callback)
            return future

        application = web.Application([
            (r'(.*)', MainHandler, {
                'submit': submit,
                'router': self.router   
            })
        ])

        application.listen(self.port)

        ioloop.IOLoop.instance().start()

处理所有 tornado 请求的主处理程序(只实现了 GET,但其他都是一样的):

class MainHandler():
    def initialize(self, submit, router):
        self.submit = submit
        self.router = router

    def worker(self, request):
        responder, kwargs = self.router.resolve(request)
        response = responder(**kwargs)
        return res

    def on_response(self, response):
        # when this is called response should already have result
        if isinstance(response, Future):
            response = response.result()
        # response is my own class, just write returned content to client
        self.write(response.data)
        self.flush()
        self.finish()

    def _on_response_ready(self, response):
        # schedule response processing in ioloop, to be on ioloop thread
        ioloop.IOLoop.current().add_callback(
            partial(self.on_response, response)
        )

    @web.asynchronous
    def get(self, url):
        self.submit(
            self._on_response_ready, # callback
            url=url, method='post', original_request=self.request
        )

服务器以这样的方式启动:

router = Router()
server = TornadoServer(router, 1111, max_workers=50)
server.run()

因此,如您所见,主处理程序只是将每个请求提交到线程池,当处理完成时,回调被调用(_on_response_ready),它只是安排请求完成在 IO 循环上执行(以确保它是在执行 IO 循环的同一线程上完成的)。

这行得通。至少看起来是这样。

我的问题是关于 ThreadPoolExecutor 中最大工作线程的性能。

所有处理程序都是 IO 绑定(bind)的,没有计算在进行(它们主要是在等待数据库或外部服务),所以对于 50 个工作人员,我预计 50 个并发请求的完成速度大约比只有一个的 50 个并发请求快 50 倍 worker 。

但事实并非如此。当我在线程池中有 50 个工作人员和 1 个工作人员时,我看到的是每秒几乎相同的请求。

为了测量,我使用了 Apache-Bench 和类似的东西:

ab -n 100 -c 10 http://localhost:1111/some_url

有人知道我做错了什么吗?我是否误解了 Tornado 或 ThreadPool 的工作原理?还是组合?

最佳答案

按照 kwarunek 的建议,用于 postgres 的 momoko 包装器解决了这个问题。如果您想征求外部合作者的进一步调试建议,将在每次访问数据库之前执行 sleep(10) 的测试任务发布带时间戳的调试日志会有所帮助。

关于python - Tornado 与 ThreadPoolExecutor,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32211102/

相关文章:

python - 在 Windows 上的本地主机上运行 Ansible-Playbook

python - 如何在 Google Colab 中进行文本到语音的转换?

c - 如何在 C 中为多线程使用互斥量?

java - 单例模式的正确使用

c++ - 无需检查即可将标准输入快速读入缓冲区

c++ - 减少 fgets() 中的填充次数

python - 如何 reshape 图像的尺寸以包含图像数量(即 1)?

python - 实例方法的装饰器

在 Linux 上使用任务集的多核系统上的 Python 全局解释器锁定 (GIL) 解决方法?

java - `InputStream`和 `Reader`本质上是相同的, `OutputStream`和 `Writer`本质上是相同的吗?