python - 制作计时器 : timeout inaccuracy of threading. Event.wait - Python 3.6

标签 python multithreading timer sleep periodic-task

首先,我是 Python 新手,不熟悉它的功能。我一直主要使用MATLAB。

PC 简要规范:Windows 10、Intel i7

我正在尝试制作一个计时器类,用于定期执行 MATLAB 等函数,这显然是从 Java 计时器借来的。 MATLAB 计时器的分辨率约为 1 毫秒,我从未见过它在任何情况下都超过 2 毫秒。事实上,它对我的​​项目来说已经足够准确了。

最近,由于MATLAB的并行计算和网络访问功能较差,我打算转向Python。然而,不幸的是,与我必须制作自己的计时器类的 MATLAB 相比,Python 的标准包提供了某种程度较低的计时器 (threading.Timer)。首先,我提到了 QnA Executing periodic actions in Python [duplicate] . Michael Anderson 提出的解决方案给出了漂移校正的简单概念。他使用 time.sleep() 来保持时间。该方法非常准确,有时比 MATLAB 计时器显示出更好的准确性。大约0.5 毫秒分辨率。但是,定时器在time.sleep() 中被捕获期间不能被中断(暂停或恢复)。但有时我必须立即停止,无论它是否处于 sleep() 状态。

我发现的问题的解决方案是利用线程包中的 Event 类。引用 Python threading.timer - repeat function every 'n' seconds .使用 Event.wait() 的超时功能,我可以在执行之间设置时间间隔,并用于保持时间段。也就是说,事件通常会被清除,因此 wait(timeout) 可以像 time.sleep(interval) 一样,我可以在需要时通过设置 event 立即退出 wait()。

一切似乎都很好,但 Event.wait() 中存在一个严重问题。时间延迟在 1 ~ 15 ms 之间变化太大。我认为它来自 Event.wait() 的开销。

我制作了一个示例代码,显示了 time.sleep() 和 Event.wait() 之间的准确性比较。这总计 1000 次迭代 1 ms sleep() 和 wait() 以查看累积时间错误。预期结果约为 1.000。

import time
from threading import Event

time.sleep(3)  # to relax

# time.sleep()
tspan = 1
N = 1000
t1 = time.perf_counter()
for _ in range(N):
    time.sleep(tspan/N)
t2 = time.perf_counter()

print(t2-t1)

time.sleep(3)  # to relax

# Event.wait()    
tspan = 1
event = Event()
t1 = time.perf_counter()
for _ in range(N):
    event.wait(tspan/N)
t2 = time.perf_counter()

print(t2-t1)

结果:
1.1379848184879964
15.614547161211096

结果表明 time.sleep() 在准确性上要好得多。但是我不能完全依赖前面提到的 time.sleep()。

总之,
  • time.sleep():准确但不可中断
  • threading.Event.wait():不准确但可中断

  • 我目前正在考虑一个折衷方案:就像在示例中一样,创建一个微小的 time.sleep() 循环(间隔为 0.5 毫秒)并使用 if 语句退出循环并在需要时中断。据我所知,该方法在 Python 2.x Python time.sleep() vs event.wait() 中使用.

    这是一个冗长的介绍,但我的问题可以总结如下。
  • 我可以通过外部信号或事件强制线程进程从 time.sleep() 中断吗? (这似乎是最有效的。???)
  • 使 Event.wait() 更准确或减少开销时间。
  • 除了 sleep() 和 Event.wait() 方法之外,是否还有更好的方法来提高计时精度。

  • 非常感谢。

    最佳答案

    我遇到了与 Event.wait() 相同的计时问题.我想出的解决方案是创建一个模仿 threading.Event 的类。 .在内部,它使用了 time.sleep() 的组合。循环和繁忙循环可大大提高精度。 sleep 循环在单独的线程中运行,以便阻塞 wait()主线程中的调用仍然可以立即中断。当set()方法被调用, sleep 线程应该在不久之后终止。此外,为了最大限度地减少 CPU 使用率,我确保繁忙循环的运行时间不会超过 3 毫秒。
    这是我的自定义 Event类以及最后的计时演示(演示的打印执行时间将以纳秒为单位):

    import time
    import _thread
    import datetime
    
    
    class Event:
        __slots__ = (
            "_flag", "_lock", "_nl",
            "_pc", "_waiters"
        )
    
        _lock_type = _thread.LockType
        _timedelta = datetime.timedelta
        _perf_counter = time.perf_counter
        _new_lock = _thread.allocate_lock
    
        class _switch:
            __slots__ = ("_on",)
    
            def __call__(self, on: bool = None):
                if on is None:
                    return self._on
    
                self._on = on
    
            def __bool__(self):
                return self._on
    
            def __init__(self):
                self._on = False
    
        def clear(self):
            with self._lock:
                self._flag(False)
    
        def is_set(self) -> bool:
            return self._flag()
    
        def set(self):
            with self._lock:
                self._flag(True)
                waiters = self._waiters
    
                for waiter in waiters:
                    waiter.release()
    
                waiters.clear()
    
        def wait(
            self,
            timeout: float = None
        ) -> bool:
            with self._lock:
                return self._wait(self._pc(), timeout)
    
        def _new_waiter(self) -> _lock_type:
            waiter = self._nl()
            waiter.acquire()
            self._waiters.append(waiter)
            return waiter
    
        def _wait(
            self,
            start: float,
            timeout: float,
            td=_timedelta,
            pc=_perf_counter,
            end: _timedelta = None,
            waiter: _lock_type = None,
            new_thread=_thread.start_new_thread,
            thread_delay=_timedelta(milliseconds=3)
        ) -> bool:
            flag = self._flag
    
            if flag:
                return True
            elif timeout is None:
                waiter = self._new_waiter()
            elif timeout <= 0:
                return False
            else:
                delay = td(seconds=timeout)
                end = td(seconds=start) + delay
    
                if delay > thread_delay:
                    mark = end - thread_delay
                    waiter = self._new_waiter()
                    new_thread(
                        self._wait_thread,
                        (flag, mark, waiter)
                    )
    
            lock = self._lock
            lock.release()
    
            try:
                if waiter:
                    waiter.acquire()
    
                if end:
                    while (
                        not flag and
                        td(seconds=pc()) < end
                    ):
                        pass
    
            finally:
                lock.acquire()
    
                if waiter and not flag:
                    self._waiters.remove(waiter)
    
            return flag()
    
        @staticmethod
        def _wait_thread(
            flag: _switch,
            mark: _timedelta,
            waiter: _lock_type,
            td=_timedelta,
            pc=_perf_counter,
            sleep=time.sleep
        ):
            while not flag and td(seconds=pc()) < mark:
                sleep(0.001)
    
            if waiter.locked():
                waiter.release()
    
        def __new__(cls):
            _new_lock = cls._new_lock
            _self = object.__new__(cls)
            _self._waiters = []
            _self._nl = _new_lock
            _self._lock = _new_lock()
            _self._flag = cls._switch()
            _self._pc = cls._perf_counter
            return _self
    
    
    if __name__ == "__main__":
        def test_wait_time():
            wait_time = datetime.timedelta(microseconds=1)
            wait_time = wait_time.total_seconds()
    
            def test(
                event=Event(),
                delay=wait_time,
                pc=time.perf_counter
            ):
                pc1 = pc()
                event.wait(delay)
                pc2 = pc()
                pc1, pc2 = [
                    int(nbr * 1000000000)
                    for nbr in (pc1, pc2)
                ]
                return pc2 - pc1
    
            lst = [
                f"{i}.\t\t{test()}"
                for i in range(1, 11)
            ]
            print("\n".join(lst))
    
        test_wait_time()
        del test_wait_time
    

    关于python - 制作计时器 : timeout inaccuracy of threading. Event.wait - Python 3.6,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48984512/

    相关文章:

    python - python中将字符串与numpy数组中的格式化数字转换的最快方法是什么

    python - 用python通过麦克风播放mp3文件

    c# - 有没有办法在 c# 中获取所有线程的堆栈跟踪,比如 java.lang.Thread.getAllStackTraces()?

    javascript - 如何实现跨页定时器?

    c - 如何使 C 计时器在 Linux 中的特定系统时间到期

    python - 子类中变量的值去了哪里?

    python - 尝试用漂亮的汤隔离 1 列

    来自另一个线程的 C# UnhandledException 不断循环

    ruby-on-rails - 如果花费的时间超过 X 秒,则退出线程

    javascript - 清除 setTimeout 或 reset 函数