首先,我是 Python 新手,不熟悉它的功能。我一直主要使用MATLAB。
PC 简要规范:Windows 10、Intel i7
我正在尝试制作一个计时器类,用于定期执行 MATLAB 等函数,这显然是从 Java 计时器借来的。 MATLAB 计时器的分辨率约为 1 毫秒,我从未见过它在任何情况下都超过 2 毫秒。事实上,它对我的项目来说已经足够准确了。
最近,由于MATLAB的并行计算和网络访问功能较差,我打算转向Python。然而,不幸的是,与我必须制作自己的计时器类的 MATLAB 相比,Python 的标准包提供了某种程度较低的计时器 (threading.Timer)。首先,我提到了 QnA Executing periodic actions in Python [duplicate] . Michael Anderson 提出的解决方案给出了漂移校正的简单概念。他使用 time.sleep() 来保持时间。该方法非常准确,有时比 MATLAB 计时器显示出更好的准确性。大约0.5 毫秒分辨率。但是,定时器在time.sleep() 中被捕获期间不能被中断(暂停或恢复)。但有时我必须立即停止,无论它是否处于 sleep() 状态。
我发现的问题的解决方案是利用线程包中的 Event 类。引用 Python threading.timer - repeat function every 'n' seconds
.使用 Event.wait() 的超时功能,我可以在执行之间设置时间间隔,并用于保持时间段。也就是说,事件通常会被清除,因此 wait(timeout) 可以像 time.sleep(interval) 一样,我可以在需要时通过设置 event 立即退出 wait()。
一切似乎都很好,但 Event.wait() 中存在一个严重问题。时间延迟在 1 ~ 15 ms 之间变化太大。我认为它来自 Event.wait() 的开销。
我制作了一个示例代码,显示了 time.sleep() 和 Event.wait() 之间的准确性比较。这总计 1000 次迭代 1 ms sleep() 和 wait() 以查看累积时间错误。预期结果约为 1.000。
import time
from threading import Event
time.sleep(3) # to relax
# time.sleep()
tspan = 1
N = 1000
t1 = time.perf_counter()
for _ in range(N):
time.sleep(tspan/N)
t2 = time.perf_counter()
print(t2-t1)
time.sleep(3) # to relax
# Event.wait()
tspan = 1
event = Event()
t1 = time.perf_counter()
for _ in range(N):
event.wait(tspan/N)
t2 = time.perf_counter()
print(t2-t1)
结果:
1.1379848184879964
15.614547161211096
结果表明 time.sleep() 在准确性上要好得多。但是我不能完全依赖前面提到的 time.sleep()。
总之,
我目前正在考虑一个折衷方案:就像在示例中一样,创建一个微小的 time.sleep() 循环(间隔为 0.5 毫秒)并使用 if 语句退出循环并在需要时中断。据我所知,该方法在 Python 2.x Python time.sleep() vs event.wait() 中使用.
这是一个冗长的介绍,但我的问题可以总结如下。
非常感谢。
最佳答案
我遇到了与 Event.wait()
相同的计时问题.我想出的解决方案是创建一个模仿 threading.Event
的类。 .在内部,它使用了 time.sleep()
的组合。循环和繁忙循环可大大提高精度。 sleep 循环在单独的线程中运行,以便阻塞 wait()
主线程中的调用仍然可以立即中断。当set()
方法被调用, sleep 线程应该在不久之后终止。此外,为了最大限度地减少 CPU 使用率,我确保繁忙循环的运行时间不会超过 3 毫秒。
这是我的自定义 Event
类以及最后的计时演示(演示的打印执行时间将以纳秒为单位):
import time
import _thread
import datetime
class Event:
__slots__ = (
"_flag", "_lock", "_nl",
"_pc", "_waiters"
)
_lock_type = _thread.LockType
_timedelta = datetime.timedelta
_perf_counter = time.perf_counter
_new_lock = _thread.allocate_lock
class _switch:
__slots__ = ("_on",)
def __call__(self, on: bool = None):
if on is None:
return self._on
self._on = on
def __bool__(self):
return self._on
def __init__(self):
self._on = False
def clear(self):
with self._lock:
self._flag(False)
def is_set(self) -> bool:
return self._flag()
def set(self):
with self._lock:
self._flag(True)
waiters = self._waiters
for waiter in waiters:
waiter.release()
waiters.clear()
def wait(
self,
timeout: float = None
) -> bool:
with self._lock:
return self._wait(self._pc(), timeout)
def _new_waiter(self) -> _lock_type:
waiter = self._nl()
waiter.acquire()
self._waiters.append(waiter)
return waiter
def _wait(
self,
start: float,
timeout: float,
td=_timedelta,
pc=_perf_counter,
end: _timedelta = None,
waiter: _lock_type = None,
new_thread=_thread.start_new_thread,
thread_delay=_timedelta(milliseconds=3)
) -> bool:
flag = self._flag
if flag:
return True
elif timeout is None:
waiter = self._new_waiter()
elif timeout <= 0:
return False
else:
delay = td(seconds=timeout)
end = td(seconds=start) + delay
if delay > thread_delay:
mark = end - thread_delay
waiter = self._new_waiter()
new_thread(
self._wait_thread,
(flag, mark, waiter)
)
lock = self._lock
lock.release()
try:
if waiter:
waiter.acquire()
if end:
while (
not flag and
td(seconds=pc()) < end
):
pass
finally:
lock.acquire()
if waiter and not flag:
self._waiters.remove(waiter)
return flag()
@staticmethod
def _wait_thread(
flag: _switch,
mark: _timedelta,
waiter: _lock_type,
td=_timedelta,
pc=_perf_counter,
sleep=time.sleep
):
while not flag and td(seconds=pc()) < mark:
sleep(0.001)
if waiter.locked():
waiter.release()
def __new__(cls):
_new_lock = cls._new_lock
_self = object.__new__(cls)
_self._waiters = []
_self._nl = _new_lock
_self._lock = _new_lock()
_self._flag = cls._switch()
_self._pc = cls._perf_counter
return _self
if __name__ == "__main__":
def test_wait_time():
wait_time = datetime.timedelta(microseconds=1)
wait_time = wait_time.total_seconds()
def test(
event=Event(),
delay=wait_time,
pc=time.perf_counter
):
pc1 = pc()
event.wait(delay)
pc2 = pc()
pc1, pc2 = [
int(nbr * 1000000000)
for nbr in (pc1, pc2)
]
return pc2 - pc1
lst = [
f"{i}.\t\t{test()}"
for i in range(1, 11)
]
print("\n".join(lst))
test_wait_time()
del test_wait_time
关于python - 制作计时器 : timeout inaccuracy of threading. Event.wait - Python 3.6,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48984512/