支持多 channel 订阅者的Python可观察实现

标签 python

在一个扭曲的应用程序中,我有一系列通过可观察模式进行交互的资源 Controller /管理器类。一般来说,大多数观察者都会订阅特定 channel (例如“foo.bar.entity2”),但在某些情况下我想了解特定 channel 中的所有事件(例如“foo.*”),所以我写了如下内容:

    from collections import defaultdict

    class SimplePubSub(object):

        def __init__(self):
            self.subjects = defaultdict(list)


        def subscribe(self, subject, callbackstr):
            """
                for brevity, callbackstr would be a valid Python function or bound method but here is just a string
            """
            self.subjects[subject].append(callbackstr)

        def fire(self, subject):
            """
                Again for brevity, fire would have *args, **kwargs or some other additional message arguments but not here            
            """

            if subject in self.subjects:
                print "Firing callback %s" % subject
                for callback in self.subjects[subject]:
                    print "callback %s" % callback



    pubSub = SimplePubSub()
    pubSub.subscribe('foo.bar', "foo.bar1")
    pubSub.subscribe('foo.foo', "foo.foo1")

    pubSub.subscribe('foo.ich.tier1', "foo.ich.tier3_1")
    pubSub.subscribe('foo.ich.tier2', "foo.ich.tier2_1")
    pubSub.subscribe('foo.ich.tier3', "foo.ich.tier2_1")

    #Find everything that starts with foo
    #say foo.bar is fired
    firedSubject = "foo.bar"
    pubSub.fire(firedSubject)
    #outputs 
    #>>Firing callback foo.bar
    #>>callback foo.bar1

    #but let's say I want to add a callback for everything undr foo.ich

    class GlobalPubSub(SimplePubSub):

        def __init__(self):
            self.globals = defaultdict(list)
            super(GlobalPubSub, self).__init__()


        def subscribe(self, subject, callback):
            if subject.find("*") > -1:
                #assumes global suscriptions would be like subject.foo.* and we want to catch all subject.foo's 
                self.globals[subject[:-2]].append(callback)
            else:
                super(GlobalPubSub, self).subscribe(subject, callback)

        def fire(self, subject):
            super(GlobalPubSub, self).fire(subject)
            if self.globals:
                for key in self.globals.iterkeys():
                    if subject.startswith(key):
                        for callback in self.globals[key]:
                            print "global callback says", callback

    print "Now with global subscriptions"
    print
    pubSub = GlobalPubSub()
    pubSub.subscribe('foo.bar', "foo.bar1")
    pubSub.subscribe('foo.foo', "foo.foo1")

    pubSub.subscribe('foo.ich.tier1', "foo.ich.tier3_1")
    pubSub.subscribe('foo.ich.tier2', "foo.ich.tier2_1")
    pubSub.subscribe('foo.ich.tier3', "foo.ich.tier2_1")
    pubSub.subscribe("foo.ich.*", "All the ichs, all the time!")

    #Find everything that starts with foo.ich
    firedSubject = "foo.ich.tier2"
    pubSub.fire(firedSubject)
    #outputs 
    #>>Firing callback foo.bar
    #>>callback foo.bar1
    #>>Now with global subscriptions
    #
    #>>Firing callback foo.ich.tier2
    #>>callback foo.ich.tier2_1
    #>>global callback says All the ichs, all the time!

在不诉诸某种外来构造(例如尝试)的情况下,这是否已经达到了最好的程度?我正在寻找一个关于我走在正确轨道上的确认,或者一个关于纯 python 的全局订阅处理程序的更好的替代建议(没有外部库或服务)。

最佳答案

对我来说,您似乎走在正确的道路上。我正在使用 PyPubSub使用 wxPython 应用程序一段时间,然后最终实现了我自己的“更简单”版本,从根本上讲,它看起来与您在这里所做的非常相似,除了您可能会结束的更多花哨功能之外在您填写要求时实现。

给出的答案here也很像你所做的。

这个answer进入一些稍微不同的方法的示例。

除了 PyPubSub 之外,还有许多现有的库在那里,例如 pydispatchblinker ,这可能值得一看以供引用或想法。

关于支持多 channel 订阅者的Python可观察实现,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7638760/

相关文章:

python - Tensorflow 数据集 - 给定生成器输出 1 个标签的 X 个输入,如何构建批处理?

python - 如果 python 中的 block 未使用 True 语句执行

python - 标签上的 Tkinter 标签

python - 为什么 'and' 和 'or' 在 Python 中返回操作数?

python - 如何允许或拒绝通知地理位置麦克风摄像头弹出

c# - 使用 Python 与 C# 创建 PPT 演示文稿

python - 使用 Python WMI 在远程系统上运行本地批处理文件

python - 使用 Python 进行元组和列表操作。缩短元组生成

python - 如果 Python 进程在特定时间内未完成,则终止它?

Python 继承 - 一个调用在子类中重写的方法的构造函数,使用哪个方法?