python-3.x - 如何在多线程应用程序中使用 aiopg 池?

标签 python-3.x python-multithreading python-asyncio

我有一个 python 3.4.3、postgreSQL 9.4、aiopg-0.7.0。多线程应用程序的示例取自该站点。如何使用游泳池?选择操作时线程挂起。

import time
import asyncio
import aiopg
import functools
from threading import Thread, current_thread, Event
from concurrent.futures import Future

class B(Thread):
   def __init__(self, start_event):
       Thread.__init__(self)
       self.loop = None
       self.tid = None
       self.event = start_event

   def run(self):
       self.loop = asyncio.new_event_loop()
       asyncio.set_event_loop(self.loop)
       self.tid = current_thread()
       self.loop.call_soon(self.event.set)
       self.loop.run_forever()

   def stop(self):
       self.loop.call_soon_threadsafe(self.loop.stop)

   def add_task(self, coro):
       """this method should return a task object, that I
         can cancel, not a handle"""
      def _async_add(func, fut):
          try:
              ret = func()
              fut.set_result(ret)
          except Exception as e:
              fut.set_exception(e)

       f = functools.partial(asyncio.async, coro, loop=self.loop)
       if current_thread() == self.tid:
           return f() # We can call directly if we're not going between threads.
       else:
           # We're in a non-event loop thread so we use a Future
           # to get the task from the event loop thread once
           # it's ready.
           fut = Future()
           self.loop.call_soon_threadsafe(_async_add, f, fut)
           return fut.result()

   def cancel_task(self, task):
       self.loop.call_soon_threadsafe(task.cancel)


@asyncio.coroutine
def test(pool, name_task):
    while True:
        print(name_task, 'running')
        with (yield from pool.cursor()) as cur:
            print(name_task, " select. ")
            yield from cur.execute("SELECT count(*) FROM test")
            count = yield from cur.fetchone()
            print(name_task, ' Result: ', count)
        yield from asyncio.sleep(3)

@asyncio.coroutine
def connect_db():
    dsn = 'dbname=%s user=%s password=%s host=%s' % ('testdb', 'user', 'passw', '127.0.0.1')
    pool = yield from aiopg.create_pool(dsn)
    print('create pool type =', type(pool))
    # future.set_result(pool)
    return (pool)

event = Event()
b = B(event)
b.start()
event.wait() # Let the loop's thread signal us, rather than sleeping
loop_db = asyncio.get_event_loop()
pool = loop_db.run_until_complete(connect_db())
time.sleep(2)
t = b.add_task(test(pool, 'Task1'))  # This is a real task
t = b.add_task(test(pool, 'Task2'))

while True:
    time.sleep(10)

b.stop()

'yield from cur.execute("SELECT count(*) FROM test")' 不返回结果

最佳答案

长话短说:您不能从不同的事件循环共享 aiopg 池对象。

aiopg.Pool耦合到事件循环。如果您不指定 loop参数明确取自 asyncio.get_event_loop()称呼。

所以在你的例子中,你有一个池耦合到主线程的事件循环。

当您从单独的线程执行 db 查询时,您试图通过执行线程的循环而不是主要的循环来完成它。它不起作用。

关于python-3.x - 如何在多线程应用程序中使用 aiopg 池?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32050921/

相关文章:

python-3.x - 如何从h2标签获取HREF? Python/ Selenium

python - 如何正确替换 QScintilla 小部件上的某些匹配项?

python - 执行 list.pop() 时出现意外结果?

python - 更新传递给线程python的变量

Python create_task 在运行事件循环中不起作用

python - 字符串的列表理解

python - MQTT 订阅在多线程中无法正常工作

python - FastAPI 和 Python 线程

python - 动态添加到 Python asyncio 的事件循环应该执行的列表

python - Python 中并发协程的非阻塞启动