python - 构建微服务事件总线和 REST api (python/flask)

标签 python rest microservices event-bus

背景
我正在使用微服务架构构建我的第一个应用程序。我将主要使用 Flask 在 Python 中工作。
我正在考虑实现一个事件/消息总线来协调服务之间的操作。我打算实现的一些服务是:身份验证、用户、帖子和聊天。该应用程序有两个实体(“用户”和“组”),几乎每个服务都使用它们。我为每个服务都有一个单独的数据库,每个数据库都有自己的 usersgroups用于管理特定于该服务的用户/组数据的表。现在,当我考虑创建新用户这样的事件时,每个服务都需要在 users 中创建一个新条目。表,这就是我考虑使用事件总线的原因。
我读了 this post其中讨论了 CQRS 和使用 HTTP (REST) 进行服务之间的外部通信,同时使用事件总线进行内部通信。服务处理 (HTTP) 请求,并发出有关数据更改的事件(例如,Auth 服务创建新用户)。其他服务消耗可能触发其他进程(和更多事件)的事件。

我挂断的地方是如何实际实现(在 Python 中)一个服务,该服务监听 HTTP 请求和一组订阅 channel 中的新事件。我知道您需要使用像 redis/rabbitMQ 这样的工具,但是是否可以在同一进程中处理这两种类型的请求,或者您是否需要运行两台服务器(一个用于 REST 请求,另一个用于事件处理)?
另外,如果您对上述一般方法/架构有任何意见,我会全力以赴。

最佳答案

因此,在进行更多研究并构建原型(prototype)之后,单个服务器可以同时监听来自消息代理的 HTTP 请求和事件。但是,它需要运行两个单独的进程(一个 Web 服务器进程监听 HTTP,一个事件进程监听消息代理)。
这是我为原型(prototype)开发的架构:
enter image description here
核心模块(由文件夹图标表示)代表服务的核心,这是实际更改数据的所有代码。 HTTP Server 和 Event Worker 都从核心模块调用方法。 HTTP Server 或 Event Worker 都不产生事件,只有核心模块产生事件。
这是一个文件结构:

Project
 |-Foo
 |  |- foo.py
 |  |- web.py
 |  |- worker.py
 |  |- revent.py
 |-Bar
 |  |- bar.py
 |  |- web.py
 |  |- worker.py
 |  |- revent.py
web.py文件是简单的 flask 应用程序:
# bar.py
from flask import Flask, request
from bar import Bar


app = Flask(__name__)

@app.route('/bar')
def bar():
    return Bar.bar_action()

if __name__ == "__main__":
    app.run(port=5001, debug=1)
对于事件 worker 和核心模块,我使用了一个模块 revent.py (redis + event) 我创建的。它由三个类组成:
  • 事件——事件的抽象
  • 生产者——核心模块使用的服务/类来将事件生成到它们的事件流中。
  • Worker —— 一个事件服务器,你可以将事件映射到函数(有点像 Flask 中的路由 HTTP 端点),它还运行事件循环来监听事件。

  • 在幕后,这个模块正在使用 redis streams .我将粘贴 revent.py 的代码以下。
    但首先,这里是 bar.py 的示例示例,它由 http 服务器和 worker 调用以执行工作,并将有关它正在执行的工作的事件发送到 redis 中的“bar”流。
    # Bar/bar.py
    from revent import Producer
    import redis
    
    class Bar():
        ep = Producer("bar", host="localhost", port=6379, db=0)
    
        @ep.event("update")
        def bar_action(self, foo, **kwargs):
            print("BAR ACTION")
            #ep.send_event("update", {"test": str(True)})
            return "BAR ACTION"
    
    if __name__ == '__main__':
        Bar().bar_action("test", test="True")
    
    最后,这是一个示例工作程序,它将监听“bar”流 Foo/worker.py 上的事件.
    # Foo/worker.py
    from revent import Worker
    
    worker = Worker()
    
    @worker.on('bar', "update")
    def test(foo, test=False):
        if bool(test) == False:
            print('test')
        else:
            print('tested')
    
    if __name__ == "__main__":
        worker.listen(host='127.0.0.1', port=6379, db=0)
    
    
    正如所 promise 的,这是 revent.py 的代码我构建的模块。可能值得向 pypl 添加更进一步开发的版本,但我只是使用符号链接(symbolic link)来保持我的两个版本同步。
    # revent.py
    import redis
    from datetime import datetime
    import functools
    
    class Worker:
        # streams = {
        #   "bar": {
        #       "update": Foo.foo_action
        #   },
        # }
    
        def __init__(self):
            self._events = {}
    
    
        def on(self, stream, action, **options):
            """
            Wrapper to register a function to an event
            """
            def decorator(func):
                self.register_event(stream, action, func, **options)
                return func
            return decorator
    
        def register_event(self, stream, action, func, **options):
            """
            Map an event to a function
            """
            if stream in self._events.keys():
                self._events[stream][action] = func
            else:
                self._events[stream] = {action: func}
    
        def listen(self, host, port, db):
            """ 
            Main event loop
            Establish redis connection from passed parameters
            Wait for events from the specified streams
            Dispatch to appropriate event handler
            """
            self._r = redis.Redis(host=host, port=port, db=db)
            streams = " ".join(self._events.keys())
            while True:
                event = self._r.xread({streams: "$"}, None, 0) 
                # Call function that is mapped to this event
                self._dispatch(event)
    
        def _dispatch(self, event):
            """
            Call a function given an event
    
            If the event has been registered, the registered function will be called with the passed params.
            """
            e = Event(event=event)
            if e.action in self._events[e.stream].keys():
                func = self._events[e.stream][e.action]
                print(f"{datetime.now()} - Stream: {e.stream} - {e.event_id}: {e.action} {e.data}")
                return func(**e.data)
    
    
    class Event():
        """
        Abstraction for an event 
        """
        def __init__(self, stream="", action="", data={}, event=None):
            self.stream = stream
            self.action = action
            self.data = data
            self.event_id=None
            if event:
                self.parse_event(event)
    
        def parse_event(self, event):
            # event = [[b'bar', [(b'1594764770578-0', {b'action': b'update', b'test': b'True'})]]]
            self.stream = event[0][0].decode('utf-8')
            self.event_id = event[0][1][0][0].decode('utf-8')
            self.data = event[0][1][0][1]
            self.action = self.data.pop(b'action').decode('utf-8')
            params = {}
            for k, v in self.data.items():
                params[k.decode('utf-8')] = v.decode('utf-8')
            self.data = params
    
        def publish(self, r):
            body = {
                "action": self.action
            }
            for k, v in self.data.items():
                body[k] = v
            r.xadd(self.stream, body)
    
    class Producer:
        """
        Abstraction for a service (module) that publishes events about itself
    
        Manages stream information and can publish events
        """
        # stream = None
        # _r = redis.Redis(host="localhost", port=6379, db=0)
    
        def __init__(self, stream_name, host, port, db):
            self.stream = stream_name
            self._r = redis.Redis(host="localhost", port=6379, db=0)
    
        def send_event(self, action, data):
            e = Event(stream=self.stream, action=action, data=data)
            e.publish(self._r)
    
        def event(self, action, data={}):
            def decorator(func):
                @functools.wraps(func)
                def wrapped(*args, **kwargs):
                    result = func(*args, **kwargs)
                    arg_keys = func.__code__.co_varnames[1:-1]
                    for i in range(1, len(args)):
                        kwargs[arg_keys[i-1]] = args[i]
                    self.send_event(action, kwargs)
                    return result           
                return wrapped
            return decorator
    
    
    
    所以,把它放在一起。 foo.pybar.py模块分别执行 Foo 和 Bar 服务的实际工作。它们的方法由 HTTP 服务器和事件 worker 调用来处理请求/事件。在工作时,这两个模块会发出有关其状态更改的事件,以便其他感兴趣的服务可以相应地采取行动。 HTTP 服务器只是一个普通的网络应用程序,使用例如 flask 。事件 worker 在概念上类似于 Web 服务器,它在 redis 而不是 http 请求中监听事件。这两个进程(Web 服务器和事件 worker )都需要单独运行。因此,如果您在本地进行开发,则需要在不同的终端窗口中运行它们或使用容器/进程编排器。
    那是很多。我希望它对某人有所帮助,如果您有任何疑问,请在评论中告诉我。
    编辑
    我将 revent.py 文件作为包上传到 pypi -- redisevents .我将在本周晚些时候添加更多关于如何使用/扩展它的文档。

    关于python - 构建微服务事件总线和 REST api (python/flask),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62884011/

    相关文章:

    rest - Firefox Quantum REST 客户端?

    google-cloud-platform - f1-micro 中的 GCLOUD Kubernetes 结果(由于内存不足,不支持 f1-micro 机器的节点池)

    microservices - 微服务中的事件溯源、CQRS 和数据库

    python - pandas ExcelFile 是否在初始化时解析所有工作表? (并且可以避免)

    java - GET 响应中包含 URL

    python - 在 Django 1.9 中,使用 JSONField( native postgres jsonb)的约定是什么?

    rest - 在不发出 HTTP 请求的情况下获取 RestSetResponse 的输出

    docker - 微服务内部通信时的 SSL 证书主机名问题

    python - python 电影院座位图

    python - 在seaborn上绘制y轴网格和kdeplot之间的交集