python - 定期检查事件/触发器的更优雅的方式?

标签 python multithreading event-handling timeout raspberry-pi

我有一个被动红外传感器,我想根据运动关闭和打开我的显示器。例如。如果 5 分钟内没有任何 Action ,则显示屏应关闭以节省电量。但是,如果有运动,请勿关闭显示屏,或重新打开它。 (不要问为什么屏幕保护程序不适合这个。我正在制作的设备没有任何键盘或鼠标。它只是一个独立的显示器。)

我的想法是创建两个线程,一个生产者和一个消费者。生产者线程(PIR 传感器)将消息放入队列中,消费者(控制显示)读取该消息。这样我就可以将信号从一个发送到另一个。

我在下面有一个功能齐全的代码(带有一些解释),它完成了前面描述的内容。我的问题是有没有办法以更优雅的方式实现这一点?你觉得我的方法怎么样,可以吗?是不是很黑客?

#!/usr/bin/env python

import Queue
from threading import Thread
import RPi.GPIO as gpio
import time
import os
import sys

class PIRSensor:

    # PIR sensor's states
    current_state = 0
    previous_state = 0

    def __init__(self, pir_pin, timeout):
        # PIR GPIO pin
        self.pir_pin = pir_pin
        # Timeout between motion detections
        self.timeout = timeout

    def setup(self):
        gpio.setmode(gpio.BCM)
        gpio.setup(self.pir_pin, gpio.IN)
        # Wait for the PIR sensor to settle
        # (loop until PIR output is 0)
        while gpio.input(self.pir_pin) == 1:
            self.current_state = 0

    def report_motion(self, queue):
        try:
            self.setup()

            while True:
                self.current_state = gpio.input(self.pir_pin)

                if self.current_state == 1 and self.previous_state == 0:
                    # PIR sensor is triggered
                    queue.put(True)
                    # Record previous state
                    self.previous_state = 1
                elif self.current_state == 1 and self.previous_state == 1:
                    # Feed the queue since there is still motion
                    queue.put(True)
                elif self.current_state == 0 and self.previous_state == 1:
                    # PIR sensor has returned to ready state
                    self.previous_state = 0

                time.sleep(self.timeout)
        except KeyboardInterrupt:
            raise

class DisplayControl:

    # Display's status
    display_on = True

    def __init__(self, timeout):
        self.timeout = timeout

    def turn_off(self):
        # Turn off the display
        if self.display_on:
            os.system("/opt/vc/bin/tvservice -o > /dev/null 2>&1")
            self.display_on = False

    def turn_on(self):
        # Turn on the display
        if not self.display_on:
            os.system("{ /opt/vc/bin/tvservice -p && chvt 9 && chvt 7 ; } > /dev/null 2>&1")
            self.display_on = True

    def check_motion(self, queue):
        try:
            while True:
                try:
                    motion = queue.get(True, self.timeout)

                    if motion:
                        self.turn_on()
                except Queue.Empty:
                        self.turn_off()
        except KeyboardInterrupt:
            raise

if __name__ == "__main__":
    try:
        pir_sensor = PIRSensor(7, 0.25)
        display_control = DisplayControl(300)
        queue = Queue.Queue()

        producer = Thread(target=pir_sensor.report_motion, args=(queue,))
        consumer = Thread(target=display_control.check_motion, args=(queue,))

        producer.daemon = True
        consumer.daemon = True

        producer.start()
        consumer.start()

        while True:
            time.sleep(0.1)
    except KeyboardInterrupt:
        display_control.turn_on()
        # Reset GPIO settings
        gpio.cleanup()
        sys.exit(0)

生产者线程运行 PIRSensor 类实例的函数 (report_motion)。 PIRSensor 类每秒读取无源红外传感器的状态四次,每当它感测到运动时就会将消息放入队列中。

消费者线程运行 DisplayControl 类实例的 (check_motion) 函数。它以给定的超时时间以阻塞模式读取前面提到的队列。可能会发生以下情况:

  • 如果显示屏打开并且队列中没有给定的消息 时间,又称为超时到期,消费者线程将关闭电源 显示。
  • 如果显示屏关闭并且有消息出现,线程将 打开显示器电源。

最佳答案

我觉得这个主意不错。我对您的实现的唯一问题是为什么消费者和生产者都在子线程中?您可以将消费者保留在主线程中,然后就不需要在主线程中出现这种无意义的循环。

while True:
    time.sleep(0.1)

这只是浪费 CPU 周期。相反,您可以直接调用 display_motion.check_motion(queue)

关于python - 定期检查事件/触发器的更优雅的方式?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23971955/

相关文章:

java - 泛型映射 : read and write

python - Airflow xcom_pull 不提供相同上游任务实例运行的数据,而是提供最新数据

python - 上采样数据时均等分割值

java - Java 的 Thread.sleep 在这里有什么用处?

java - 关于 wait 和 notifyAll

ios - UIButton 不响应触摸事件

java - 为什么它没有显示我的标签?

python - 更快地处理 Pandas 中的 Dataframe

python - shutil.move(scr, dst) 得到 IOError : [Errno 13] Permission denied and 3 more errors

c++ - 使用 std::mutex、std::condition_variable 和 std::unique_lock