python - Unable to test reactor.callInThread in my Twisted web resource

Tags: python twisted pytest twisted.web

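I have a Twisted web resource that handles incoming requests and sends a Celery task to a queue for processing. The Celery call is made through reactor.callInThread.

The problem is that I want to unit test this code, but a simple test case like the one below fails because the code passed to reactor.callInThread is never executed: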

from twisted.internet.defer import succeed
from twisted.web import server
from twisted.web.resource import Resource
from twisted.web.test.test_web import DummyRequest
from twisted.internet.defer import inlineCallbacks
from twisted.trial.unittest import TestCase
from twisted.internet import reactor

import pytest

class BadBeConnectedResultType(Exception):
    pass

class SmartDummyRequest(DummyRequest):
    def __init__(self, method, url, args=None, headers=None):
        DummyRequest.__init__(self, url.split('/'))
        self.method = method
        self.headers.update(headers or {})

        # set args
        args = args or {}
        for k, v in args.items():
            self.addArg(k, v)


    def value(self):
        return "".join(self.written)


class DummySite(server.Site):
    def get(self, url, args=None, headers=None):
        return self._request("GET", url, args, headers)


    def post(self, url, args=None, headers=None):
        return self._request("POST", url, args, headers)


    def _request(self, method, url, args, headers):
        request = SmartDummyRequest(method, url, args, headers)
        resource = self.getResourceFor(request)
        result = resource.render(request)
        return self._resolveResult(request, result)


    def _resolveResult(self, request, result):
        if isinstance(result, str):
            request.write(result)
            request.finish()
            return succeed(request)
        elif result is server.NOT_DONE_YET:
            if request.finished:
                return succeed(request)
            else:
                return request.notifyFinish().addCallback(lambda _: request)
        else:
            raise ValueError("Unexpected return value: %r" % (result,))

class TwistedBCTestCase(TestCase):
    def setUp(self):
        self.web = DummySite(RootResource())

    @inlineCallbacks
    def test_bc_resource(self):
        with pytest.raises(BadBeConnectedResultType):
            response = yield self.web.post("CLARO", args={
                'ResultType':2,
                'ResultData':1,
                'idDeliver': 1
            })

class RootResource(Resource):

    def render_GET(self, request):
        return ''

    def getChild(self, operator_id, request):
        if operator_id:
            return BCResource(operator_id)

        return self

class BCResource(Resource):

    def __init__(self, operator_id):
        Resource.__init__(self)
        self.operator_id = operator_id

    def send_to_queue(self, request):
        #This is never executed in my test
        try:
            result_type = cgi.escape(request.args['ResultType'][0])
            if result_type != '1':
                raise BadBeConnectedResultType(result_type)

            raw_result_data = cgi.escape(request.args['ResultData'][0])
            unique_id = cgi.escape(request.args['IdDeliver'][0])


            task = ProcessBCNotificationTask.delay(unique_id, raw_result_data)

        except BadBeConnectedResultType as e:
            logger.exception(e)


    def render_POST(self, request):
        reactor.callInThread(self.send_to_queue, request)
        return ''

For the record, I am testing the web resource using the approach described at http://findingscience.com/python/twisted/2012/02/20/testing-twisted-web-resources.html

Running the test gives:

$ py.test twisted_test.py --tb=short
====================================================================================== test session starts ======================================================================================
platform linux2 -- Python 2.7.3 -- pytest-2.2.4
collected 1 items 

twisted_test.py F

=========================================================================================== FAILURES ============================================================================================
______________________________________________________________________________ TwistedBCTestCase.test_bc_resource _______________________________________________________________________________
env/lib/python2.7/site-packages/twisted/internet/defer.py:1045: in _inlineCallbacks
>                   result = g.send(result)
twisted_test.py:69: in test_bc_resource
>               'idDeliver': 1
E   Failed: DID NOT RAISE
=================================================================================== 1 failed in 0.30 seconds ====================================================================================

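Best answer

The reactor is not running during the test, so there is no running thread pool, and the function handed to reactor.callInThread never gets executed before the test finishes. You can test the code behind reactor.callInThread by first replacing the threaded call with a dummy, for example a dummy threads.deferToThread call.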
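
One way to read this suggestion is to swap the threaded call for a stand-in that runs the work synchronously during tests. The following is only a minimal sketch under stated assumptions: ImmediateReactor and QueueingResource are hypothetical names, the resource receives its reactor as a constructor argument (the question's code uses the global reactor instead), and the Celery call is replaced by a plain list so the example runs without Twisted or Celery installed.

```python
class ImmediateReactor(object):
    """Hypothetical test double: runs callInThread work in the calling thread."""

    def __init__(self):
        self.calls = []

    def callInThread(self, func, *args, **kwargs):
        # Record the call, then run it immediately instead of using a thread pool.
        self.calls.append((func, args, kwargs))
        func(*args, **kwargs)


class QueueingResource(object):
    """Simplified stand-in for BCResource; not a real twisted.web Resource."""

    def __init__(self, reactor):
        self.reactor = reactor   # injected so a test can pass a fake
        self.queued = []         # stands in for the Celery task queue
        self.rejected = []       # stands in for the logged errors

    def send_to_queue(self, args):
        result_type = args['ResultType'][0]
        if result_type != '1':
            # Like the question's code, handle the bad result type here
            # rather than letting an exception escape the worker call.
            self.rejected.append(result_type)
            return
        self.queued.append((args['IdDeliver'][0], args['ResultData'][0]))

    def render_POST(self, args):
        self.reactor.callInThread(self.send_to_queue, args)
        return ''


fake = ImmediateReactor()
resource = QueueingResource(fake)
resource.render_POST({'ResultType': ['2'], 'ResultData': ['1'], 'IdDeliver': ['1']})
resource.render_POST({'ResultType': ['1'], 'ResultData': ['payload'], 'IdDeliver': ['42']})
```

With the fake in place, the work has already run by the time render_POST returns, so a test can assert on resource.queued and resource.rejected directly. Note that send_to_queue in the question catches BadBeConnectedResultType and only logs it, so even with a synchronous stand-in a test would have to assert on a side effect such as these lists rather than rely on pytest.raises.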

Regarding python - Unable to test reactor.callInThread in my Twisted web resource, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/12219087/