我正在为 flask 应用程序做一些单元测试。其中一部分包括为每个测试重新启动 flask 应用程序。为此,我在 setUp()
中创建了我的 flask 应用程序。我的 unitest.TestCase
的功能,以便每次运行时我都能使应用程序处于新状态。另外,我在一个单独的线程中启动应用程序,这样测试就可以在没有 flask 应用程序阻塞的情况下运行。
下面的例子:
import requests
import unittest
from threading import Thread
class MyTest(unittest.TestCase):
def setUp(self):
test_port = 8000
self.test_url = f"http://0.0.0.0:{str(test_port)}"
self.app_thread = Thread(target=app.run, kwargs={"host": "0.0.0.0", "port": test_port, "debug": False})
self.app_thread.start()
def test_a_test_that_contacts_the_server(self):
response = requests.post(
f"{self.test_url}/dosomething",
json={"foo": "bar"},
headers=foo_bar
)
is_successful = json.loads(response.text)["isSuccessful"]
self.assertTrue(is_successful, msg=json.loads(response.text)["message"])
def tearDown(self):
# what should I do here???
pass
这会成为问题,因为在初始测试之后运行的测试会遇到端口
8000
的问题。正在使用。这引发了 OSError: [Errno 98] Address already in use
.(现在,我已经构建了一个解决方法,在其中我生成一个高范围端口列表,以及每个测试使用的另一个端口列表,这样我就永远不会选择以前测试使用的端口。这个解决方法有效,但我真的很想知道关闭这个 flask 应用程序的正确方法,最终关闭连接并释放/释放该端口。)
我希望在
tearDown()
中有一个特定的方法来关闭这个 flask 应用程序。功能。我应该如何关闭我的
tearDown()
中的 flask 应用程序方法?
最佳答案
我在写它的时候找到了我自己的问题的解决方案,并且因为 it's encouraged to answer your own question on Stack Overflow ,我仍然想与其他有相同问题的人分享。
这个问题的解决方案是将flask应用程序视为另一个进程而不是线程。这是使用 Process
完成的。来自 multiprocessing
代替 Thread
的模块来自 threading
模块。
看完this Stack Overflow answer我得出了这个结论关于在不使用 CTRL + C 的情况下停止 flask 。阅读该答案然后引导我阅读 multiprocessing
之间的差异和 threading
in this Stack Overflow answer .当然,在那之后,我转到了 multiprocessing
上的官方文档。模块,found here .更具体地说,this link will take you straight to the Process
class.
我无法完全解释为什么 multiprocessing
模块比 threading
更适合此目的,但我确实觉得这对这个应用程序更有意义。毕竟,flask 应用程序充当自己的 API 服务器,与我的测试分开,我的测试正在测试对它的调用/响应它返回。出于这个原因,我认为将我的 flask 应用程序作为自己的进程是最有意义的。
tl;博士
使用multiprocessing.Process
代替threading.Thread
,然后调用Process.terminate()
杀死进程,然后是 Process.join()
阻塞直到进程终止。
例子:
import requests
import unittest
from multiprocessing import Process
class MyTest(unittest.TestCase):
def setUp(self):
test_port = 8000
self.test_url = f"http://0.0.0.0:{str(test_port)}"
self.app_process = Process(target=app.run, kwargs={"host": "0.0.0.0", "port": test_port, "debug": False})
self.app_process.start()
def test_a_test_that_contacts_the_server(self):
response = requests.post(
f"{self.test_url}/dosomething",
json={"foo": "bar"},
headers=foo_bar
)
is_successful = json.loads(response.text)["isSuccessful"]
self.assertTrue(is_successful, msg=json.loads(response.text)["message"])
def tearDown(self):
self.app_process.terminate()
self.app_process.join()
尽早测试,并经常测试!
关于python - 每次测试后 flask 单元测试不关闭端口,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59816374/