python - 在多处理或多线程应用程序中保留 cpu 时间

标签 python multithreading multiprocessing python-3.6 python-asyncio

我正在使用 Raspberry Pi 3 进行一个项目,以通过连续循环中的许多简单重复事件来进行一些环境控制。 RP3 胜任这项工作,但它让我可以专注于其他事情。

应用的特点:

  1. 应用程序应从十几个传感器(温度、湿度、pH 值、ORP 等)收集传感器数据(可变间隔 n 秒)。
  2. Controller 根据时间和这些传感器数据计算输出(开关、阀门和 PWM 驱动器)。
  3. 几乎没有事件需要按顺序运行。
  4. 有些事件属于“安全”类别,触发后应立即运行(故障安全传感器、紧急按钮)。
  5. 大多数事件以秒为间隔重复运行(每秒,直到每 30 秒)。
  6. 一些事件会触发一个 Action ,在 1 到 120 秒内激活一个继电器。
  7. 一些事件使用基于时间的值。该值需要每天计算多次,并且相当占用 CPU(使用一些迭代插值公式,因此具有可变的运行时间)。
  8. 显示环境状态(连续循环)

我熟悉(不是专业)VB.NET,但决定在 Python 3.6 中完成这个项目。 在过去的几个月里,我阅读了很多关于设计模式、线程、进程、事件、并行处理等主题的内容。

根据我的阅读,我认为 Asyncio 与 Executor 中的一些任务相结合可以完成这项工作。

大多数任务/事件都不是时间紧迫的。 Controller 输出可以使用“最新的”传感器数据。 另一方面,某些任务会在特定时间段内激活继电器。我想知道如何对这些任务进行编程,而不会有另一个“耗时”任务在一段时间内(例如)CO2 阀打开时阻塞处理器。这对我的环境来说可能是灾难性的。

因此我需要一些建议。

到目前为止,请参阅下面的代码。我不确定我是否正确使用了 Python 中的 Asyncio 函数。 为了可读性,我会将各个任务的内容存储在单独的模块中。

import asyncio
import concurrent.futures
import datetime
import time
import random
import math

# define a task...
async def firstTask():
    while True:
        await asyncio.sleep(1)
        print("First task executed")

# define another task...        
async def secondTask():
    while True:
        await asyncio.sleep(5)
        print("Second Worker Executed")

# define/simulate heavy CPU-bound task
def heavy_load():
    while True:
        print('Heavy_load started')
        i = 0
        for i in range(50000000):
            f = math.sqrt(i)*math.sqrt(i)
        print('Heavy_load finished')
        time.sleep(4)

def main():

    # Create a process pool (for CPU bound tasks).
    processpool = concurrent.futures.ProcessPoolExecutor()

    #  Create a thread pool (for I/O bound tasks).
    threadpool = concurrent.futures.ThreadPoolExecutor()

    loop = asyncio.get_event_loop()

    try:
        # Add all tasks. (Correct use?)
        asyncio.ensure_future(firstTask())
        asyncio.ensure_future(secondTask())
        loop.run_in_executor(processpool, heavy_load)
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        print("Loop will be ended")
        loop.close()    

if __name__ == '__main__':
    main()

最佳答案

Most tasks/events are not time-critical. Controller output can use 'most recent' sensordata. Some tasks, on the other hand, activating a relay for a certain period of time. I would like to know how to programm these tasks without the chance another 'time consuming' task is blocking the processor during the period of time (for example) a CO2 valve is open. This could be disastrous for my environment.

请允许我强调 Python 不是实时语言,asyncio 也不是实时组件。它们既没有用于实时执行的基础设施(Python 是垃圾收集的,通常在分时系统上运行),也没有在这种环境中进行实践测试。因此,我强烈建议不要在任何失误可能对您的环境造成灾难性的情况下使用它们。

除此之外,您的代码还有一个问题:虽然 heavy_load 计算不会阻塞事件循环,但它也永远不会完成,也不会提供有关其进度的信息。 run_in_executor 背后的想法是,您正在运行的计算最终会停止,并且事件循环会希望收到有关它的通知。 run_in_executor 的惯用用法可能如下所示:

def do_heavy_calc(param):
    print('Heavy_load started')
    f = 0
    for i in range(50000000):
        f += math.sqrt(i)*math.sqrt(i)
    return f

def heavy_calc(param):
    loop = asyncio.get_event_loop()
    return loop.run_in_executor(processpool, do_heavy_calc)

表达式 heavy_calc(...) 不仅在不阻塞事件循环的情况下运行,而且可等待。这意味着异步代码可以等待其结果,也不会阻塞其他协程:

async def sum_params(p1, p2):
    s1 = await heavy_calc(p1)
    s2 = await heavy_calc(p2)
    return s1 + s2

上面依次运行了两个计算。也可以并行完成:

async def sum_params_parallel(p1, p2):
    s1, s2 = await asyncio.gather(heavy_calc(p1), heavy_calc(p2))
    return s1 + s2

另一件可以改进的事情是设置代码:

asyncio.ensure_future(firstTask())
asyncio.ensure_future(secondTask())
loop.run_in_executor(processpool, heavy_load)
loop.run_forever()

调用 asyncio.ensure_future 然后从不等待结果有点像 asyncio 反模式。未等待的任务引发的异常会被默默地吞掉,这几乎肯定不是您想要的。有时人们只是忘记写 await,这就是为什么 asyncio 在循环被破坏时提示未等待的挂起任务。

安排某人等待每个任务是一种很好的编码习惯,可以立即使用 awaitgather 将其与其他任务,或稍后。例如,如果任务需要在后台运行,您可以将其存储在某处并 await 或在应用程序生命周期结束时取消它。在你的情况下,我会将 gatherloop.run_until_complete 结合起来:

everything = asyncio.gather(firstTask(), secondTask(),
                           loop.run_in_executor(processpool, heavy_load))
loop.run_until_complete(everything)

关于python - 在多处理或多线程应用程序中保留 cpu 时间,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48622638/

相关文章:

python - 如何在 python 中找到所有可能的正则表达式匹配项?

python - 安全错误 : Permission denied to access property "document" on cross-origin object error clicking on download link in iframe using Selenium Python

python - 关闭 subprocess.Popen 中的标准输入

multiprocessing - 使用 DistributedArrays 时出现 BoundsError

python - 多处理:如何将数据从父进程发送到持续运行的子进程?

python - 在 ttk/python 中更改 Label 小部件的填充颜色

python - 控制线程类python中的循环

python - 如何使 threading.Thread start() 调用 run() 以外的方法

c# - 程序设计问题,filesystemwatcher,多线程C#

python - 多处理中的有序状态打印