(注意:这个问题的背景非常冗长,但底部有一个 SSCCE 可以跳过)
背景
我正在尝试开发一个基于 Python 的 CLI 来与 Web 服务进行交互。在我的代码库中,我有一个 CommunicationService
类,用于处理与 Web 服务的所有直接通信。它公开了一个 received_response
属性,该属性返回一个 Observable
(来自 RxPY),其他对象可以订阅该代码,以便在从 Web 服务收到响应时收到通知。
我的 CLI 逻辑基于 click
库,其中一个子命令实现如下:
async def enabled(self, request: str, response_handler: Callable[[str], Tuple[bool, str]]) -> None:
self._generate_request(request)
if response_handler is None:
return None
while True:
response = await self.on_response
success, value = response_handler(response)
print(success, value)
if success:
return value
这里发生的事情(在
response_handler
不是 None
的情况下)是该子命令表现为一个协程,等待来自 Web 服务( self.on_response == CommunicationService.received_response
)的响应并从它可以处理的第一个响应中返回一些处理过的值。我试图通过创建完全模拟
CommunicationService
的测试用例来测试我的 CLI 的行为;创建了一个假的 Subject
(它可以充当 Observable
)并且 CommunicationService.received_response
被模拟以返回它。作为测试的一部分,主题的 on_next
方法被调用以将模拟 Web 服务响应传递回生产代码:@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
context.mock_received_response_subject.on_next(context.text)
我使用单击“结果回调”函数,该函数在 CLI 调用结束时调用并阻塞,直到协程(子命令)完成:
@cli.resultcallback()
def _handle_command_task(task: Coroutine, **_) -> None:
if task:
loop = asyncio.get_event_loop()
result = loop.run_until_complete(task)
loop.close()
print('RESULT:', result)
问题
在测试开始时,我运行
CliRunner.invoke
来触发整个 shebang。问题是这是一个阻塞调用,它将阻塞线程,直到 CLI 完成并返回结果,如果我需要我的测试线程继续运行,它就可以与它同时生成模拟 Web 服务响应,这没有帮助。我想我需要做的是使用
CliRunner.invoke
在新线程上运行 ThreadPoolExecutor
。这允许测试逻辑在原始线程上继续并执行上面发布的 @when
步骤。但是,使用 mock_received_response_subject.on_next
发布的通知似乎不会触发在子命令中继续执行。我相信解决方案将涉及使用 RxPY 的
AsyncIOScheduler
,但我发现有关此的文档有点稀疏且无用。南昌
下面的片段捕获了我希望是问题的本质。如果可以对其进行修改以使其正常工作,我应该能够将相同的解决方案应用于我的实际代码以使其按照我想要的方式运行。
import asyncio
import logging
import sys
import time
import click
from click.testing import CliRunner
from rx.subjects import Subject
web_response_subject = Subject()
web_response_observable = web_response_subject.as_observable()
thread_loop = asyncio.new_event_loop()
@click.group()
def cli():
asyncio.set_event_loop(thread_loop)
@cli.resultcallback()
def result_handler(task, **_):
loop = asyncio.get_event_loop()
result = loop.run_until_complete(task) # Should block until subject publishes value
loop.close()
print(result)
@cli.command()
async def get_web_response():
return await web_response_observable
def test():
runner = CliRunner()
future = thread_loop.run_in_executor(None, runner.invoke, cli, ['get_web_response'])
time.sleep(1)
web_response_subject.on_next('foo') # Simulate reception of web response.
time.sleep(1)
result = future.result()
print(result.output)
logging.basicConfig(
level=logging.DEBUG,
format='%(threadName)10s %(name)18s: %(message)s',
stream=sys.stderr,
)
test()
当前行为
程序在运行时挂起,在
result = loop.run_until_complete(task)
处阻塞。验收标准
程序终止并在
foo
上打印 stdout
。更新 1
基于 Vincent 的帮助,我对代码进行了一些更改。
Relay.enabled
(等待来自 Web 服务的响应以处理它们的子命令)现在实现如下:async def enabled(self, request: str, response_handler: Callable[[str], Tuple[bool, str]]) -> None:
self._generate_request(request)
if response_handler is None:
return None
return await self.on_response \
.select(response_handler) \
.where(lambda result, i: result[0]) \
.select(lambda result, index: result[1]) \
.first()
我不太确定
await
将如何处理 RxPY
observables - 它们会在每个生成的元素上将执行返回给调用者,还是仅当 observable 完成(或出错?)。我现在知道是后者,老实说,这感觉是更自然的选择,并且让我使这个函数的实现感觉更加优雅和 react 性。我还修改了生成模拟 Web 服务响应的测试步骤:
@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
loop = asyncio.get_event_loop()
loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, context.text)
不幸的是,这将无法正常工作,因为 CLI 是在它自己的线程中调用的......
@when('the CLI is run with "{arguments}"')
def step_impl(context, arguments):
loop = asyncio.get_event_loop()
if 'async.cli' in context.tags:
context.async_result = loop.run_in_executor(None, context.cli_runner.invoke, testcube.cli, arguments.split())
else:
...
并且 CLI 在调用时创建自己的线程私有(private)事件循环......
def cli(context, hostname, port):
_initialize_logging(context.meta['click_log.core.logger']['level'])
# Create a new event loop for processing commands asynchronously on.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
...
我认为我需要的是一种方法,允许我的测试步骤在新线程上调用 CLI,然后获取它正在使用的事件循环:
@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
loop = _get_cli_event_loop() # Needs to be implemented.
loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, context.text)
更新 2
似乎没有一种简单的方法可以获取特定线程创建并为自己使用的事件循环,因此我接受了 Victor 的建议并模拟了
asyncio.new_event_loop
以返回我的测试代码创建和存储的事件循环:def _apply_mock_event_loop_patch(context):
# Close any already-existing exit stacks.
if hasattr(context, 'mock_event_loop_exit_stack'):
context.mock_event_loop_exit_stack.close()
context.test_loop = asyncio.new_event_loop()
print(context.test_loop)
context.mock_event_loop_exit_stack = ExitStack()
context.mock_event_loop_exit_stack.enter_context(
patch.object(asyncio, 'new_event_loop', spec=True, return_value=context.test_loop))
我更改了“收到的模拟 Web 响应”测试步骤以执行以下操作:
@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
loop = context.test_loop
loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, context.text)
好消息是,当这一步执行时,我实际上让
Relay.enabled
协程触发!现在唯一的问题是最后的测试步骤,我等待在自己的线程中执行 CLI 得到的 future ,并验证 CLI 是否在
stdout
上发送:@then('the CLI should print "{output}"')
def step_impl(context, output):
if 'async.cli' in context.tags:
loop = asyncio.get_event_loop() # main loop, not test loop
result = loop.run_until_complete(context.async_result)
else:
result = context.result
assert_that(result.output, equal_to(output))
我试过玩这个,但我似乎无法让
context.async_result
(从 loop.run_in_executor
存储 future )很好地转换到 done
并返回结果。使用当前的实现,我得到第一个测试 (1.1
) 的错误和第二个 (1.2
) 的无限期挂起: @mock.comms @async.cli @wip
Scenario Outline: Querying relay enable state -- @1.1 # testcube/tests/features/relay.feature:45
When the user queries the enable state of relay 0 # testcube/tests/features/steps/relay.py:17 0.003s
Then the CLI should query the web service about the enable state of relay 0 # testcube/tests/features/steps/relay.py:48 0.000s
When the communications service receives a response from TestCube Web Service # testcube/tests/features/steps/core.py:58 0.000s
"""
{'module':'relays','path':'relays[0].enabled','data':[True]}'
"""
Then the CLI should print "True" # testcube/tests/features/steps/core.py:94 0.003s
Traceback (most recent call last):
File "/Users/davidfallah/testcube_env/lib/python3.5/site-packages/behave/model.py", line 1456, in run
match.run(runner.context)
File "/Users/davidfallah/testcube_env/lib/python3.5/site-packages/behave/model.py", line 1903, in run
self.func(context, *args, **kwargs)
File "testcube/tests/features/steps/core.py", line 99, in step_impl
result = loop.run_until_complete(context.async_result)
File "/usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
return future.result()
File "/usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/concurrent/futures/thread.py", line 55, in run
result = self.fn(*self.args, **self.kwargs)
File "/Users/davidfallah/testcube_env/lib/python3.5/site-packages/click/testing.py", line 299, in invoke
output = out.getvalue()
ValueError: I/O operation on closed file.
Captured stdout:
RECEIVED WEB RESPONSE: {'module':'relays','path':'relays[0].enabled','data':[True]}'
<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py:431]>
@mock.comms @async.cli @wip
Scenario Outline: Querying relay enable state -- @1.2 # testcube/tests/features/relay.feature:46
When the user queries the enable state of relay 1 # testcube/tests/features/steps/relay.py:17 0.005s
Then the CLI should query the web service about the enable state of relay 1 # testcube/tests/features/steps/relay.py:48 0.001s
When the communications service receives a response from TestCube Web Service # testcube/tests/features/steps/core.py:58 0.000s
"""
{'module':'relays','path':'relays[1].enabled','data':[False]}'
"""
RECEIVED WEB RESPONSE: {'module':'relays','path':'relays[1].enabled','data':[False]}'
Then the CLI should print "False" # testcube/tests/features/steps/core.py:94
第 3 章:结局
搞砸所有这些异步多线程的东西,我太笨了。
首先,而不是像这样描述场景......
When the user queries the enable state of relay <relay_id>
Then the CLI should query the web service about the enable state of relay <relay_id>
When the communications service receives a response from TestCube Web Service:
"""
{"module":"relays","path":"relays[<relay_id>].enabled","data":[<relay_enabled>]}
"""
Then the CLI should print "<relay_enabled>"
我们是这样描述的:
Given the communications service will respond to requests:
"""
{"module":"relays","path":"relays[<relay_id>].enabled","data":[<relay_enabled>]}
"""
When the user queries the enable state of relay <relay_id>
Then the CLI should query the web service about the enable state of relay <relay_id>
And the CLI should print "<relay_enabled>"
实现新的给定步骤:
@given('the communications service will respond to requests')
def step_impl(context):
response = context.text
def publish_mock_response(_):
loop = context.test_loop
loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, response)
# Configure the mock comms service to publish a mock response when a request is made.
instance = context.mock_comms.return_value
instance.send_request.on_next.side_effect = publish_mock_response
繁荣
2 features passed, 0 failed, 0 skipped
22 scenarios passed, 0 failed, 0 skipped
58 steps passed, 0 failed, 0 skipped, 0 undefined
Took 0m0.111s
最佳答案
我可以看到您的代码有两个问题:
RxPy
不使用 Observable.to_future 中的任何内容,所以你必须访问 RxPy
运行 asyncio 事件循环的同一线程中的对象。 RxPy
当 on_completed
时设置 future 的结果被调用,以便等待 observable 返回最后发出的对象。这意味着您必须同时拨打 on_next
和 on_completed
获取 await
返回。 这是一个工作示例:
import click
import asyncio
from rx.subjects import Subject
from click.testing import CliRunner
web_response_subject = Subject()
web_response_observable = web_response_subject.as_observable()
main_loop = asyncio.get_event_loop()
@click.group()
def cli():
pass
@cli.resultcallback()
def result_handler(task, **_):
future = asyncio.run_coroutine_threadsafe(task, main_loop)
print(future.result())
@cli.command()
async def get_web_response():
return await web_response_observable
def test():
runner = CliRunner()
future = main_loop.run_in_executor(
None, runner.invoke, cli, ['get_web_response'])
main_loop.call_later(1, web_response_subject.on_next, 'foo')
main_loop.call_later(2, web_response_subject.on_completed)
result = main_loop.run_until_complete(future)
print(result.output, end='')
if __name__ == '__main__':
test()
关于python - 如何使用 asyncio 在单独的线程上通知 RxPY 观察者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39318723/