python - GAE : unit testing taskqueue with testbed

标签 python unit-testing google-app-engine task-queue

我正在使用测试平台对我的谷歌应用引擎应用进行单元测试,而我的应用使用任务队列。

当我在单元测试期间向任务队列提交任务时,似乎该任务在队列中,但该任务没有执行。

如何让任务在单元测试期间执行?

最佳答案

使用撒克逊人的出色答案,我能够使用 testbed 而不是 gaetestbed 来做同样的事情。这就是我所做的。

将此添加到我的 setUp():

    self.taskqueue_stub = apiproxy_stub_map.apiproxy.GetStub('taskqueue')

然后,在我的测试中,我使用了以下内容:

    # Execute the task in the taskqueue
    tasks = self.taskqueue_stub.GetTasks("default")
    self.assertEqual(len(tasks), 1)
    task = tasks[0]
    params = base64.b64decode(task["body"])
    response = self.app.post(task["url"], params)

在某个地方,POST 参数被 base64 编码,因此必须撤消它才能使其工作。

我比 Saxon 的回答更喜欢这个,因为我可以使用官方的 testbed 包,而且我可以在自己的测试代码中完成所有操作。

编辑:后来我想对使用延迟库提交的任务做同样的事情,但我费了一番脑筋才弄清楚,所以我在这里分享以减轻其他人的痛苦。

如果您的任务队列仅包含延迟提交的任务,那么这将运行所有任务以及由这些任务排队的所有任务:

def submit_deferred(taskq):
    tasks = taskq.GetTasks("default")
    taskq.FlushQueue("default")
    while tasks:
        for task in tasks:
            (func, args, opts) = pickle.loads(base64.b64decode(task["body"]))
            func(*args)
        tasks = taskq.GetTasks("default")
        taskq.FlushQueue("default")

关于python - GAE : unit testing taskqueue with testbed,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6632809/

相关文章:

python - 这两种测试方法的区别

python - 使用 BS4 或 Selenium 从 finishline.com 抓取网页

python - Pandas 绘制字符串出现的总和

actionscript-3 - 使用 ActionScript 3 访问 Google AppEngine Cloud Endpoints?

python - 优化 App Engine 上的 RSS 解析以避免高 CPU 警告

python - 如何在apache2中运行Django应用程序

ios - 使用示例核心数据进行单元测试

unit-testing - Cakephp 3 - 单元测试验证默认

c# - 发布 Moq'ing HttpResponseMessage

java - Google App Engine 中的分片列表