我正在使用测试平台对我的谷歌应用引擎应用进行单元测试,而我的应用使用任务队列。
当我在单元测试期间向任务队列提交任务时,似乎该任务在队列中,但该任务没有执行。
如何让任务在单元测试期间执行?
最佳答案
使用撒克逊人的出色答案,我能够使用 testbed 而不是 gaetestbed 来做同样的事情。这就是我所做的。
将此添加到我的 setUp()
:
self.taskqueue_stub = apiproxy_stub_map.apiproxy.GetStub('taskqueue')
然后,在我的测试中,我使用了以下内容:
# Execute the task in the taskqueue
tasks = self.taskqueue_stub.GetTasks("default")
self.assertEqual(len(tasks), 1)
task = tasks[0]
params = base64.b64decode(task["body"])
response = self.app.post(task["url"], params)
在某个地方,POST 参数被 base64 编码,因此必须撤消它才能使其工作。
我比 Saxon 的回答更喜欢这个,因为我可以使用官方的 testbed 包,而且我可以在自己的测试代码中完成所有操作。
编辑:后来我想对使用延迟库提交的任务做同样的事情,但我费了一番脑筋才弄清楚,所以我在这里分享以减轻其他人的痛苦。
如果您的任务队列仅包含延迟提交的任务,那么这将运行所有任务以及由这些任务排队的所有任务:
def submit_deferred(taskq):
tasks = taskq.GetTasks("default")
taskq.FlushQueue("default")
while tasks:
for task in tasks:
(func, args, opts) = pickle.loads(base64.b64decode(task["body"]))
func(*args)
tasks = taskq.GetTasks("default")
taskq.FlushQueue("default")
关于python - GAE : unit testing taskqueue with testbed,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6632809/