python - 基于协程的状态机

标签 python event-handling tornado coroutine fsm

我有一个棘手而有趣的问题要问你。

在处理 I/O 任务(例如通过 Twisted、Tornado 中的某个传输层实现协议(protocol))时,我发现了类似的场景或模式。该模式与其说是抽象的,不如说是通用的。例如,当您使用 MODEM 时-就像设备一样,你向他发送命令并接收结果。

但是,有时您需要使用新命令对调制解调器对上一个命令的响应使用react。例如,假设调制解调器为M,->为通信运营商,接受一个参数,消息 key ,服务器为S。

    1. s ->(a) M
       1.1 M ->(b) S # modem reacts on `a` as `b`; so next we should send him command B
       1.2 M ->(c) S # modem responses on `a` as `c`; so next we should send him C
    2. s ->(b) M
       2.1 M ->(g) S
       2.2 M -> (f) S
       ...
       2.N M -> (x) S
    ...

所以,它看起来像 FSM 行为。最好在 tornado 中实现这个场景,同时使用非阻塞 I/O(通过流对象)。通过简单地提供跟踪场景作为输入并覆盖输入中描述的状态(事件)的处理程序,我们可以获得良好的有限状态机行为。

输入可能有以下符号:

{
  a: (b, c, d),
  b: (c, 'exit|silence'),
  c: (a, 'exit|silence'),
  d: (b)
}

所有这些字母数字符号都是州名。 每个键值对都是状态名称和可能的状态转换集。

在 tornado 协程和 futures 中引入的 FSM 的可能实现是什么? 请分享您的想法和代码。

最佳答案

我认为 Twisted 更适合协议(protocol)实现。无论如何,在 Python 中,函数和方法是一等对象,这意味着您可以将它们存储在字典中。您还可以使用 functools.partial 将带有参数的函数绑定(bind)到字典键。您可以使用它来实现转换。每个状态应该是一个包含字典的函数,其中键是可能的输入状态,值是输出状态。然后你可以很容易地从一种状态跳到另一种状态。要使用 Tornado 循环下一个状态,而不是直接调用,应该使用 ioloop.IOLoop.instance().add_callback 注册为回调。

自动机接受语言 a*b*c 的示例实现:

import errno
import functools
import socket
from tornado import ioloop, iostream

class Communicator(object):
    def connection_ready(self, sock, fd, events):
        while True:
            try:
                connection, address = sock.accept()
            except socket.error, e:
                if e[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
                    raise
                return
            connection.setblocking(0)
            self.stream = iostream.IOStream(connection)
            self.stream.read_until(delimiter='\n', callback=self.initial_state) 

    def initial_state(self, msg):
        msg = msg.rstrip()
        print "entering initial state with message: %s" % msg
        transitions = {
            'a' : functools.partial(ioloop.IOLoop.instance().add_callback, self.state_a, msg),
            'b' : functools.partial(ioloop.IOLoop.instance().add_callback, self.state_b, msg),
            'c' : functools.partial(ioloop.IOLoop.instance().add_callback, self.final_state, msg)
        }
        try:
            transitions[msg[0]]()
        except:
            self.stream.write("Aborted (wrong input)\n", self.stream.close)

    def state_a(self, msg):
        print "entering state a with message: %s" % msg
        transitions = {
            'a' : functools.partial(ioloop.IOLoop.instance().add_callback, self.stream.write, "got a\n", functools.partial(self.state_a, msg[1:])),
            'b' : functools.partial(ioloop.IOLoop.instance().add_callback, self.state_b, msg),
            'c' : functools.partial(ioloop.IOLoop.instance().add_callback, self.final_state, msg[1:])
        }
        try:
            transitions[msg[0]]()
        except:
            self.stream.write("Aborted (wrong input)\n", self.stream.close)

    def state_b(self, msg):
        print "entering state b with message: %s" % msg
        transitions = {
            'a' : functools.partial(ioloop.IOLoop.instance().add_callback, self.state_a, msg),
            'b' : functools.partial(ioloop.IOLoop.instance().add_callback, self.stream.write, "got b\n", functools.partial(self.state_a, msg[1:])),
            'c' : functools.partial(ioloop.IOLoop.instance().add_callback, self.final_state, msg[1:])}
        try:
            transitions[msg[0]]()
        except:
            self.stream.write("Aborted (wrong input)\n" , self.stream.close)

    def final_state(self, msg):
        print "entering final state with message: %s" % msg
        self.stream.write("Finished properly with message %s\n" % msg, self.stream.close)

if __name__ == '__main__':
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.setblocking(0)
    sock.bind(("", 8000))
    sock.listen(5000)

    communicator = Communicator()
    io_loop = ioloop.IOLoop.instance()
    callback = functools.partial(communicator.connection_ready, sock)
    io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
    try:
        io_loop.start()
    except KeyboardInterrupt:
        io_loop.stop()
        print "exited cleanly"

使用 Netcat 的 session :

$ nc localhost 8000
aaaaa
got a
got a
got a
got a
got a
Aborted (wrong input)
$ nc localhost 8000
abababab
got a
got b
got a
got b
got a
got b
got a
got b
Aborted (wrong input)
$ nc localhost 8000
aaabbbc
got a
got a
got a
got b
got b
got b
Finished properly with message 
$ nc localhost 8000
abcabc
got a
got b
Finished properly with message abc

关于python - 基于协程的状态机,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19025218/

相关文章:

python - Tornado eventloop 在产生多个 gen.Task 时引发 "NoneType object is not iterable"

python - 格式化包含 ${} 的 python 字符串

java - 幕后事件和事件处理

python - Tornado 请求.body

c# - WPF:防止用户离开 TextBox?

javascript - jQuery复选框更改和单击事件

apache - 如何使用apache重定向到docker容器

python - 用sympy缓慢替换符号矩阵

python - Flask应用未在本地主机上正确显示

python - 使用(或不使用)conda 与atom 结合的推理