python - Implementing a time lock with Python's asyncio

Tags: python locking python-asyncio

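I don't know whether this kind of lock is called a time lock, but I need something for the following scenario: I'm making a lot of concurrent requests with aiohttp, and the server may sometimes return 429 Too Many Requests. When that happens, I have to pause all subsequent requests for a while.

I came up with the following solution: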

import asyncio
import time


class TimeLock:

    def __init__(self, *, loop=None):
        self._locked = False
        self._locked_at = None
        self._time_lock = None
        self._unlock_task = None
        self._num_waiters = 0
        if loop is not None:
            self._loop = loop
        else:
            self._loop = asyncio.get_event_loop()

    def __repr__(self):
        state = f'locked at {self.locked_at}' if self._locked else 'unlocked'
        return f'[{state}] {self._num_waiters} waiters'

    @property
    def locked(self):
        return self._locked

    @property
    def locked_at(self):
        return self._locked_at

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # in this time lock there is nothing to do when it's released
        return

    async def acquire(self):
        if not self._locked:
            return True
        try:
            print('waiting for lock to be released')
            self._num_waiters += 1
            await self._time_lock
            self._num_waiters -= 1
            print('done, returning now')
        except asyncio.CancelledError:
            if self._locked:
                raise
        return True

    def lock_for(self, delay, lock_more=False):
        print(f'locking for {delay}')
        if self._locked:
            if not lock_more:
                # if we don't want to increase the lock time, we just exit when
                # the lock is already in a locked state
                print('already locked, nothing to do')
                return
            print('already locked, but canceling old unlock task')
            self._unlock_task.cancel()
        else:
            # create the future only when going from unlocked to locked, so
            # tasks already waiting on it are still woken after an extension
            self._time_lock = self._loop.create_future()
        self._locked = True
        self._locked_at = time.time()
        self._unlock_task = self._loop.create_task(self.unlock_in(delay))
        print('locked')

    async def unlock_in(self, delay):
        print('unlocking started')
        await asyncio.sleep(delay)
        self._locked = False
        self._locked_at = None
        self._unlock_task = None
        self._time_lock.set_result(True)
        print('unlocked')

I'm testing the lock with the following code:

import asyncio

from ares.http import TimeLock


async def run(lock, i):
    async with lock:
        print(lock)
        print(i)
        if i in (3, 6, 9):
            lock.lock_for(2)


if __name__ == '__main__':
    lock = TimeLock()
    tasks = []
    loop = asyncio.get_event_loop()
    for i in range(10):
        tasks.append(run(lock, i))
    loop.run_until_complete(asyncio.gather(*tasks))
    print(lock)

The code produces the following output, which seems consistent with the scenario I described above:

[unlocked] 0 waiters
0
[unlocked] 0 waiters
1
[unlocked] 0 waiters
2
[unlocked] 0 waiters
3
locking for 2
locked
waiting for lock to be released
waiting for lock to be released
waiting for lock to be released
waiting for lock to be released
waiting for lock to be released
waiting for lock to be released
unlocking started
unlocked
done, returning now
[unlocked] 5 waiters
4
done, returning now
[unlocked] 4 waiters
5
done, returning now
[unlocked] 3 waiters
6
locking for 2
locked
done, returning now
[locked at 1559496296.7109463] 2 waiters
7
done, returning now
[locked at 1559496296.7109463] 1 waiters
8
done, returning now
[locked at 1559496296.7109463] 0 waiters
9
locking for 2
already locked, nothing to do
unlocking started
[locked at 1559496296.7109463] 0 waiters

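Is this the right way to implement this synchronization primitive? I'm also not sure about the thread safety of this code. I don't have much experience with threading and asynchronous code.

Best answer

I haven't tested your code, but the idea looks fine. You only need to worry about thread safety if you are going to use the same lock object from different threads. As Jimmy Engelbrecht already pointed out, asyncio runs in a single thread, so you usually don't have to worry about the thread safety of its primitives.

A few more thoughts: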
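To illustrate the thread-safety point: asyncio primitives are not thread-safe, so if another thread ever does need to touch the lock, the call should be handed over to the event loop with `loop.call_soon_threadsafe`. The following is only a minimal sketch, using a plain `asyncio.Event` as a stand-in for the lock; the names `worker_thread` and `main` are made up for illustration and are not part of the question's code.

```python
import asyncio
import threading


def worker_thread(loop, event):
    # Runs in a separate thread: do not call event.set() directly here,
    # schedule it on the thread that runs the event loop instead.
    loop.call_soon_threadsafe(event.set)


async def main():
    loop = asyncio.get_running_loop()
    event = asyncio.Event()
    thread = threading.Thread(target=worker_thread, args=(loop, event))
    thread.start()
    # Resumes once the loop has processed the callback sent by the thread.
    await asyncio.wait_for(event.wait(), timeout=5)
    thread.join()
    return event.is_set()


if __name__ == '__main__':
    print(asyncio.run(main()))
```

As long as every task that uses the lock runs on the same event loop, none of this is needed.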

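  • I'm not sure about the terminology, but it seems this primitive should be called a semaphore
  • You can inherit from, or simply use, the existing primitive(s) instead of implementing it from scratch.
  • You can delegate to the semaphore the job of tracking the event that requires a pause, instead of doing it in client code

This code shows the idea: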

import asyncio


class PausingSemaphore:
    def __init__(self, should_pause, pause_for_seconds):
        self.should_pause = should_pause
        self.pause_for_seconds = pause_for_seconds
        self._is_paused = False
        self._resume = asyncio.Event()

    async def __aenter__(self):
        await self.check_paused()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        if self.should_pause(exc):
            self.pause()

    async def check_paused(self):
        if self._is_paused:
            await self._resume.wait()

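    def pause(self):
        if not self._is_paused:
            self._is_paused = True
            # clear the event, otherwise it stays set after the first
            # unpause and later pauses would not block anyone
            self._resume.clear()
            asyncio.get_running_loop().call_later(
                self.pause_for_seconds,
                self.unpause
            )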

    def unpause(self):
        self._is_paused = False
        self._resume.set()

Let's test it:

import asyncio

import aiohttp


def should_pause(exc):
    # pause only on HTTP 429 (Too Many Requests)
    return (
        isinstance(exc, aiohttp.ClientResponseError)
        and exc.status == 429
    )


pausing_sem = None
regular_sem = None


async def request(url):
    async with regular_sem:
        async with pausing_sem:
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(url, raise_for_status=True) as resp:
                        print('Done!')
            except aiohttp.ClientResponseError:
                print('Too many requests!')
                raise


async def main():
    global pausing_sem
    global regular_sem
    pausing_sem = PausingSemaphore(should_pause, 5)
    regular_sem = asyncio.Semaphore(3)

    await asyncio.gather(
        *[
            request('http://httpbin.org/get'),
            request('http://httpbin.org/get'),
            request('http://httpbin.org/get'),
            request('http://httpbin.org/get'),
            request('http://httpbin.org/get'),
            request('http://httpbin.org/status/429'),
            request('http://httpbin.org/get'),
            request('http://httpbin.org/get'),
            request('http://httpbin.org/get'),
            request('http://httpbin.org/get'),
            request('http://httpbin.org/get'),
        ], 
        return_exceptions=True
    )


if __name__ == '__main__':
    asyncio.run(main())

P.S. I haven't tested this code much!
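Since the test above depends on httpbin.org being reachable, here is a network-free sketch of the same pausing behaviour. It rests on a few assumptions: `TooManyRequests` is a hypothetical stand-in for the 429 error, `fake_request` replaces the aiohttp call, and the pause is shortened to 0.2 seconds. The class is repeated in condensed form so the block runs on its own; note that `pause()` clears the event before scheduling `unpause`, otherwise a second pause would not block.

```python
import asyncio
import time


class TooManyRequests(Exception):
    """Hypothetical stand-in for an HTTP 429 error."""


class PausingSemaphore:
    def __init__(self, should_pause, pause_for_seconds):
        self.should_pause = should_pause
        self.pause_for_seconds = pause_for_seconds
        self._is_paused = False
        self._resume = asyncio.Event()

    async def __aenter__(self):
        if self._is_paused:
            await self._resume.wait()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        if self.should_pause(exc):
            self.pause()

    def pause(self):
        if not self._is_paused:
            self._is_paused = True
            self._resume.clear()  # make sure new entrants really wait
            asyncio.get_running_loop().call_later(
                self.pause_for_seconds, self.unpause)

    def unpause(self):
        self._is_paused = False
        self._resume.set()


async def fake_request(sem, i, started, log):
    async with sem:
        # Record how long after the start this request passed the gate.
        log.append((i, time.monotonic() - started))
        if i == 1:
            raise TooManyRequests()  # simulate the server answering 429


async def main(pause_for=0.2):
    sem = PausingSemaphore(
        lambda exc: isinstance(exc, TooManyRequests), pause_for)
    started = time.monotonic()
    log = []
    await asyncio.gather(
        *(fake_request(sem, i, started, log) for i in range(5)),
        return_exceptions=True,
    )
    return log


if __name__ == '__main__':
    for i, delay in asyncio.run(main()):
        print(f'request {i} ran after {delay:.2f}s')
```

Requests 0 and 1 go through immediately; request 1 triggers the pause, so requests 2 to 4 only proceed once `unpause` has run.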

Regarding python - Implementing a time lock with Python's asyncio, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/56466721/
