python - I/O 密集型串行端口应用程序 : Porting from Threading, 基于队列的异步设计(ala Twisted)

标签 python multithreading serial-port twisted

因此,我一直在为通过串行 (RS-232)“Master”与无线设备通信的客户端开发一个应用程序。我目前已经使用线程编写了应用程序的核心(如下)。我一直在 #python 上注意到,共识似乎是使用线程和使用 Twisted 的异步通信能力。

我还没有找到任何使用 twisted 进行串行端口异步 I/O 通信的好例子。但是,我找到了Dave Peticolas' 'Twisted Introduction' (感谢 nosklo)我目前正在研究,但是,它使用套接字而不是串行通信(但是异步概念肯定得到了很好的解释)。

我将如何使用线程、队列将此应用程序移植到 Twisted?有什么优点/缺点吗(我注意到,有时,如果一个线程挂起,它会导致系统 BSOD)?

代码 (msg_poller.py)

from livedatafeed import LiveDataFeed
from msg_build import build_message_to_send
from utils import get_item_from_queue
from protocol_wrapper import ProtocolWrapper, ProtocolStatus
from crc16 import *
import time
import Queue
import threading
import serial
import gc

gc.enable()
PROTOCOL_HEADER = '\x01'
PROTOCOL_FOOTER = '\x0D\x0A'
PROTOCOL_DLE = '\x90'

INITIAL_MODBUS = 0xFFFF


class Poller:
    """
    Connects to the serial port and polls nodes for data.
    Reads response from node(s) and loads that data into queue.
    Parses qdata and writes that data to database.
    """

    def __init__(self,
            port,
            baudrate,
            parity,
            rtscts,
            xonxoff,
            echo=False):
        try:
            self.serial = serial.serial_for_url(port,
                    baudrate,
                    parity=parity,
                    rtscts=rtscts,
                    xonxoff=xonxoff,
                    timeout=.01)
        except AttributeError:
            self.serial = serial.Serial(port,
                    baudrate,
                    parity=parity,
                    rtscts=rtscts,
                    xonxoff=xonxoff,
                    timeout=.01)
            self.com_data_q = None
        self.com_error_q = None
        self.livefeed = LiveDataFeed()
        self.timer = time.time()
        self.dtr_state = True
        self.rts_state = True
        self.break_state = False

    def start(self):
        self.data_q = Queue.Queue()
        self.error_q = Queue.Queue()
        com_error = get_item_from_queue(self.error_q)
        if com_error is not None:
            print 'Error %s' % (com_error)
        self.timer = time.time()
        self.alive = True

        # start monitor thread
        #
        self.mon_thread = threading.Thread(target=self.reader)
        self.mon_thread.setDaemon(1)
        self.mon_thread.start()

        # start sending thread
        #
        self.trans_thread = threading.Thread(target=self.writer)
        self.trans_thread.setDaemon(1)
        self.trans_thread.start()

    def stop(self):
        try:
            self.alive = False
            self.serial.close()
        except (KeyboardInterrupt, SystemExit):
            self.alive = False

    def reader(self):
        """
        Reads data from the serial port using self.mon_thread.
        Displays that data on the screen.
        """
        from rmsg_format import message_crc, message_format
        while self.alive:
            try:
                while self.serial.inWaiting() != 0:

                # Read node data from the serial port. Data should be 96B.

                    data = self.serial.read(96)
                    data += self.serial.read(self.serial.inWaiting())

                    if len(data) > 0:

                        # Put data in to the data_q object
                        self.data_q.put(data)
                        if len(data) == 96:
                            msg = self.data_q.get()

                            pw = ProtocolWrapper(
                                        header=PROTOCOL_HEADER,
                                        footer=PROTOCOL_FOOTER,
                                        dle=PROTOCOL_DLE)
                            status = map(pw.input, msg)

                            if status[-1] == ProtocolStatus.IN_MSG:
                                # Feed all the bytes of 'msg' sequentially into pw.input

                                # Parse the received CRC into a 16-bit integer
                                rec_crc = message_crc.parse(msg[-4:]).crc

                                # Compute the CRC on the message
                                calc_crc = calcString(msg[:-4], INITIAL_MODBUS)
                                from datetime import datetime
                                ts = datetime.now().strftime('%Y/%m/%d %H:%M:%S')
                                if rec_crc != calc_crc:
                                    print ts
                                    print 'ERROR: CRC Mismatch'
                                    print msg.encode('hex')
                                else:
                                    #msg = message_format.parse(msg[1:])
                                    #print msg.encode('hex') + "\r\n"
                                    msg = message_format.parse(msg[1:])
                                    print msg
                                    #return msg
                                    gc.collect()
                    time.sleep(.2)
            except (KeyboardInterrupt, SystemExit, Exception, TypeError):
                self.alive = False
                self.serial.close()
                raise

    def writer(self):
        """
        Builds the packet to poll each node for data.
        Writes that data to the serial port using self.trans_thread
        """
        import time
        try:
            while self.alive:
                try:
                    dest_module_code = ['DRILLRIG',
                            'POWERPLANT',
                            'GENSET',
                            'MUDPUMP']
                    dest_ser_no = lambda x: x + 1
                    for code in dest_module_code:
                        if code != 'POWERPLANT':
                            msg = build_message_to_send(
                                    data_len=0x10,
                                    dest_module_code='%s' % (code),
                                    dest_ser_no=dest_ser_no(0),
                                    dest_customer_code='*****',
                                    ret_ser_no=0x01,
                                    ret_module_code='DOGHOUSE',
                                    ret_customer_code='*****',
                                    command='POLL_NODE',
                                    data=[])
                            self.serial.write(msg)
                            time.sleep(.2)
                            gc.collect()
                        elif code == 'POWERPLANT':
                            msg = build_message_to_send(
                                    data_len=0x10,
                                    dest_module_code='POWERPLANT',
                                    dest_ser_no=dest_ser_no(0),
                                    dest_customer_code='*****',
                                    ret_ser_no=0x01,
                                    ret_module_code='DOGHOUSE',
                                    ret_customer_code='*****',
                                    command='POLL_NODE',
                                    data=[])
                            self.serial.write(msg)
                            time.sleep(.2)
                            gc.collect()
                            msg = build_message_to_send(
                                    data_len=0x10,
                                    dest_module_code='POWERPLANT',
                                    dest_ser_no=dest_ser_no(1),
                                    dest_customer_code='*****',
                                    ret_ser_no=0x01,
                                    ret_module_code='DOGHOUSE',
                                    ret_customer_code='*****',
                                    command='POLL_NODE',
                                    data=[])
                            self.serial.write(msg)
                            time.sleep(.2)
                            gc.collect()
                except (KeyboardInterrupt, SystemExit):
                    self.alive = False
                    self.serial.close()
                    raise
        except (KeyboardInterrupt, SystemExit):
            self.alive = False
            self.serial.close()
            raise


def main():
    poller = Poller(
            port='COM4',
            baudrate=115200,
            parity=serial.PARITY_NONE,
            rtscts=0,
            xonxoff=0,
            )
    poller.start()
    poller.reader()
    poller.writer()
    poller.stop()
if __name__ == '__main__':
    main()                                                                      

最佳答案

在线程/队列方法和使用扭曲的方法之间编写一个直接的一对一映射程序是非常困难的(如果不是不可能的话)。

我建议,掌握扭曲及其 react 器方式,它使用协议(protocol)和协议(protocol)特定方法。把它想象成当你使用延迟使用扭曲时,你已经使用线程和队列显式编码的所有异步事物都免费提供给你。

twisted 似乎确实在使用 SerialPort 传输类的 react 器上支持 SerialPort,基本结构似乎有点像这样。

from twisted.internet import reactor
from twisted.internet.serialport import SerialPort

SerialPort(YourProtocolClass(), Port, reactor, baudrate=baudrate))
reactor.run() 

在 YourProtocolClass() 中,您将处理特定于您的串行端口通信要求的各种事件。 doc/core/examples 目录包含 gpsfix.py 和 mouse.py 等示例。

关于python - I/O 密集型串行端口应用程序 : Porting from Threading, 基于队列的异步设计(ala Twisted),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4726391/

相关文章:

python - 事件未定义?

python - 如何使用使用 Estimator 和 Dataset API 训练的已保存模型进行预测?

c# - 正确锁定线程安全的自生成列表 (C#)

python:处理变量锁定的优雅方法?

ios - 核心数据 3 托管对象上下文

java - JSSC 串行接口(interface)上​​的 UnsatsifiedLinkError(处理)

c# - System.IO.Ports.dll 错误中的 System.TimeoutException

python - 在 Django 模型中访问外部字段

python - 如何使用 Python 旋转 3D 数组而不进行舍入?

c - 用串口编程C读取多个传感器