用于 GPIO Led blink 的 Python 线程类

标签 python asynchronous raspberry-pi python-multithreading gpio

不幸的是,我在各方面尝试解决这个问题后没有任何结果后来到这里。

我的想法:

我需要一个名为 Led 的类,它在构造函数中只接受一个 GPIO 引脚并提供以下方法:

  • 点亮
  • 关灯
  • 闪烁

我的工作:

我以这种方式构建了这个类:

import RPi.GPIO as GPIO
import time
import threading
from threading import Thread


class Led(Thread):

    def __init__(self, led_pin):
        Thread.__init__(self)
        self.pin_stop = threading.Event()
        self.__led_pin = led_pin
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(self.__led_pin, GPIO.OUT)

    def low(self, pin):
        GPIO.setup(pin, GPIO.OUT)
        GPIO.output(pin, GPIO.LOW)

    def blink(self, time_on=0.050, time_off=1):
        pin = threading.Thread(name='ledblink',target=self.__blink_pin, args=(time_on, time_off, self.pin_stop))
        pin.start()

    def __blink_pin(self, time_on, time_off, pin_stop):
        while not pin_stop.is_set():
            GPIO.output(self.__led_pin, GPIO.HIGH)
            time.sleep(time_on)
            GPIO.output(self.__led_pin, GPIO.LOW)
            time.sleep(time_off)

    def __stop(self):
        self.pin_stop.set()


    def reset(self):
        GPIO.cleanup()

    def off(self):
        self.__stop()

    def on(self):
        self.__stop()
        GPIO.output(self.__led_pin, GPIO.LOW)
        GPIO.output(self.__led_pin, GPIO.HIGH)

闪烁方法负责无限期闪烁 LED,直到调用 Off 或 On 方法。

然后运行这个简单的代码:

from classes.leds import Led
import time
from random import randint

Led16 = Led(16)

def main():
    while True:
        if (randint(0, 1) == 1):
            Led16.blink()
        else:
            Led16.off()
        time.sleep(2)


if __name__ == "__main__":
    main()

会发生什么:

每次调用一个方法时,Led 对象似乎都会产生一个新线程,从而使 GPIO 线在多个线程之间共享。

我的愿望是什么:

我想保持异步闪烁 LED(显然)并控制 Led16() 对象状态,也许每次我调用它的方法时都不需要创建新线程,但在达到这一点时我有点困惑。

感谢帮助我了解如何实现这一目标。

最佳答案

您正在创建大量线程,因为您的 blink() 每次调用都会创建一个新线程,而旧线程不会停止。

我想线程有几个选项:

  1. 只创建一次线程 - 例如在 __init__() 中 - 它会在闪烁的时间间隔内连续运行(即大部分时间处于休眠状态)读取实例变量并设置LED相应。要更改 LED 状态,blink()on()off() 通过将此实例变量设置为 on/来控制 LED熄灭/闪烁。

  2. 在闪烁例程中,如果线程已经在运行,则不要创建新线程,或者停止旧线程(并等待它完成)然后启动一个新线程。

你必须处理的事情是你希望行为是这样的:

  • 如果 LED 熄灭,则一旦调用 on()blink() LED 就会亮起
  • 如果 LED 正在闪烁并且 blink() 再次被调用,则闪烁的开/关序列不会受到干扰
  • 如果闪烁并且 off() 被调用,如果它已经开始运行到完成,我会想要 on-cycle,即 LED 不应该立即关闭,因为这可能会很短看起来很奇怪的闪光灯。

创建新线程的关键在于等待旧线程完成,在 __init__() 中只创建一次线程并让它持续运行感觉最简单。当 LED 打开或关闭时,时间段会缩短(值 FAST_CYCLE),以便当 LED 关闭或打开时它会快速 react ,因为 sleep()时间很短。

关于您的代码的其他几点:

  • 我认为您不需要让您的类继承自 Thread - 您正在 pin=... 行中创建一个新线程。
  • 如果您在编写代码时添加注释,通常会使阅读代码时更容易理解发生了什么。
  • 如果您保留对线程的引用(即 self.pin = threading.Thread 而不是 pin = threading.Thread),则在 reset() 你可以使用 join() 来确保它在你继续之前已经退出
  • 由于闪烁时间段可能会改变并且线程必须使用最新值,因此每次都使用 self 读取它们而不是将它们作为参数传递给 __blink_pin() 例程,如果这样做你也可以使用 self 来获取 pin_stop 信号量。

像这样(未经测试):

import RPi.GPIO as GPIO
import time
import threading
from threading import Thread

class Led(object):

    LED_OFF = 0
    LED_ON = 1
    LED_FLASHING = 2

    # the short time sleep to use when the led is on or off to ensure the led responds quickly to changes to blinking
    FAST_CYCLE = 0.05

    def __init__(self, led_pin):
        # create the semaphore used to make thread exit
        self.pin_stop = threading.Event()
        # the pin for the LED
        self.__led_pin = led_pin
        # initialise the pin and turn the led off
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(self.__led_pin, GPIO.OUT)
        # the mode for the led - off/on/flashing
        self.__ledmode = Led.LED_OFF
        # make sure the LED is off (this also initialises the times for the thread)
        self.off()
        # create the thread, keep a reference to it for when we need to exit
        self.__thread = threading.Thread(name='ledblink',target=self.__blink_pin)
        # start the thread
        self.__thread.start()

    def blink(self, time_on=0.050, time_off=1):
        # blinking will start at the next first period
        # because turning the led on now might look funny because we don't know
        # when the next first period will start - the blink routine does all the
        # timing so that will 'just work'
        self.__ledmode = Led.LED_FLASHING
        self.__time_on = time_on
        self.__time_off = time_off

    def off(self):
        self.__ledmode = LED_OFF
        # set the cycle times short so changes to ledmode are picked up quickly
        self.__time_on = Led.FAST_CYCLE
        self.__time_off = Led.FAST_CYCLE
        # could turn the LED off immediately, might make for a short flicker on if was blinking

    def on(self):
        self.__ledmode = LED_ON
        # set the cycle times short so changes to ledmode are picked up quickly
        self.__time_on = Led.FAST_CYCLE
        self.__time_off = Led.FAST_CYCLE
        # could turn the LED on immediately, might make for a short flicker off if was blinking

    def reset(self):
        # set the semaphore so the thread will exit after sleep has completed
        self.pin_stop.set()
        # wait for the thread to exit
        self.__thread.join()
        # now clean up the GPIO
        GPIO.cleanup()

    ############################################################################
    # below here are private methods
    def __turnledon(self, pin):
        GPIO.output(pin, GPIO.LOW)

    def __turnledoff(self, pin):
        GPIO.output(pin, GPIO.HIGH)

    # this does all the work
    # If blinking, there are two sleeps in each loop
    # if on or off, there is only one sleep to ensure quick response to blink()
    def __blink_pin(self):
        while not self.pin_stop.is_set():
            # the first period is when the LED will be on if blinking
            if self.__ledmode == Led.LED_ON or self.__ledmode == Led.LED_FLASHING: 
                self.__turnledon()
            else:
                self.__turnledoff()
            # this is the first sleep - the 'on' time when blinking
            time.sleep(self.__time_on)
            # only if blinking, turn led off and do a second sleep for the off time
            if self.__ledmode == Led.LED_FLASHING:
                self.__turnledoff()
                # do an extra check that the stop semaphore hasn't been set before the off-time sleep
                if not self.pin_stop.is_set():
                    # this is the second sleep - off time when blinking
                    time.sleep(self.__time_off)

关于用于 GPIO Led blink 的 Python 线程类,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46956380/

相关文章:

scala - Play Framework 异步 Controller 阻止对同一 Controller 的后续调用

python - Raspberry Pi 使用 Python 到 mkdir

python - 如何在C/C++程序中加载内核模块

java - 树莓派 "Kiosk"模式

python - 使用 cv2.VideoCapture() 从 IP 摄像机读取流

python - 从堆栈跟踪到树

python - 为什么代码在收到响应后不执行

javascript - 将异步工作流程更改为 Promise (Bluebird)

C# httpClient 不返回内容

python - 如何从马尔可夫链输出创建段落?