go - 在 Go 中通过多个 channel 广播一个 channel

标签 go channel

我想将从一个 channel 接收到的数据广播到一个 channel 列表。 channel 列表是动态的,可以在运行阶段修改。

作为 Go 的新开发人员,我编写了这段代码。我发现它对我想要的东西来说很重。有更好的方法吗?

package utils

import "sync"

// StringChannelBroadcaster broadcasts string data from a channel to multiple channels
type StringChannelBroadcaster struct {
    Source      chan string
    Subscribers map[string]*StringChannelSubscriber
    stopChannel chan bool
    mutex       sync.Mutex
    capacity    uint64
}

// NewStringChannelBroadcaster creates a StringChannelBroadcaster
func NewStringChannelBroadcaster(capacity uint64) (b *StringChannelBroadcaster) {
    return &StringChannelBroadcaster{
        Source:      make(chan string, capacity),
        Subscribers: make(map[string]*StringChannelSubscriber),
        capacity:    capacity,
    }
}

// Dispatch starts dispatching message
func (b *StringChannelBroadcaster) Dispatch() {
    b.stopChannel = make(chan bool)
    for {
        select {
        case val, ok := <-b.Source:
            if ok {
                b.mutex.Lock()
                for _, value := range b.Subscribers {
                    value.Channel <- val
                }
                b.mutex.Unlock()
            }
        case <-b.stopChannel:
            return
        }
    }
}

// Stop stops the Broadcaster
func (b *StringChannelBroadcaster) Stop() {
    close(b.stopChannel)
}

// StringChannelSubscriber defines a subscriber to a StringChannelBroadcaster
type StringChannelSubscriber struct {
    Key     string
    Channel chan string
}

// NewSubscriber returns a new subsriber to the StringChannelBroadcaster
func (b *StringChannelBroadcaster) NewSubscriber() *StringChannelSubscriber {
    key := RandString(20)
    newSubscriber := StringChannelSubscriber{
        Key:     key,
        Channel: make(chan string, b.capacity),
    }
    b.mutex.Lock()
    b.Subscribers[key] = &newSubscriber
    b.mutex.Unlock()

    return &newSubscriber
}

// RemoveSubscriber removes a subscrber from the StringChannelBroadcaster
func (b *StringChannelBroadcaster) RemoveSubscriber(subscriber *StringChannelSubscriber) {
    b.mutex.Lock()
    delete(b.Subscribers, subscriber.Key)
    b.mutex.Unlock()
}

谢谢,

朱利安

最佳答案

我认为您可以稍微简化一下:去掉 stopChannelStop 方法。您可以只关闭 Source 而不是调用 Stop,并在 Dispatch 中检测到它(ok 将为 false)以退出(实际上您可以在源 channel 上进行范围)。

你可以去掉Dispatch,只用for循环在NewStringChannelBroadcaster中启动一个goroutine,这样外部代码就不必单独启动dispatch循环了。

您可以使用 channel 类型作为 map 键,这样您的 map 就可以变成 map[chan string]struct{}(空结构,因为您不需要 map 值)。所以你的 NewSubscriber 可以接受一个 channel 类型参数(或创建一个新 channel 并返回它),并将其插入到 map 中,你不需要随机字符串或 StringChannelSubscriber类型。

我还做了一些改进,比如关闭订阅者 channel :

package main

import "sync"

import (
    "fmt"
    "time"
)

// StringChannelBroadcaster broadcasts string data from a channel to multiple channels
type StringChannelBroadcaster struct {
    Source      chan string
    Subscribers map[chan string]struct{}
    mutex       sync.Mutex
    capacity    uint64
}

// NewStringChannelBroadcaster creates a StringChannelBroadcaster
func NewStringChannelBroadcaster(capacity uint64) *StringChannelBroadcaster {
    b := &StringChannelBroadcaster{
        Source:      make(chan string, capacity),
        Subscribers: make(map[chan string]struct{}),
        capacity:    capacity,
    }
    go b.dispatch()
    return b
}

// Dispatch starts dispatching message
func (b *StringChannelBroadcaster) dispatch() {
    // for iterates until the channel is closed
    for val := range b.Source {
        b.mutex.Lock()
        for ch := range b.Subscribers {
            ch <- val
        }
        b.mutex.Unlock()
    }
    b.mutex.Lock()
    for ch := range b.Subscribers {
        close(ch)
        // you shouldn't be calling RemoveSubscriber after closing b.Source
        // but it's better to be safe than sorry
        delete(b.Subscribers, ch)
    }
    b.Subscribers = nil
    b.mutex.Unlock()
}

func (b *StringChannelBroadcaster) NewSubscriber() chan string {
    ch := make(chan string, b.capacity)
    b.mutex.Lock()
    if b.Subscribers == nil {
        panic(fmt.Errorf("NewSubscriber called on closed broadcaster"))
    }
    b.Subscribers[ch] = struct{}{}
    b.mutex.Unlock()

    return ch
}

// RemoveSubscriber removes a subscrber from the StringChannelBroadcaster
func (b *StringChannelBroadcaster) RemoveSubscriber(ch chan string) {
    b.mutex.Lock()
    if _, ok := b.Subscribers[ch]; ok {
        close(ch)                 // this line does have to be inside the if to prevent close of closed channel, in case RemoveSubscriber is called twice on the same channel
        delete(b.Subscribers, ch) // this line doesn't need to be inside the if
    }
    b.mutex.Unlock()
}

func main() {
    b := NewStringChannelBroadcaster(0)

    var toberemoved chan string

    for i := 0; i < 3; i++ {
        i := i

        ch := b.NewSubscriber()
        if i == 1 {
            toberemoved = ch
        }
        go func() {
            for v := range ch {
                fmt.Printf("receive %v: %v\n", i, v)
            }
            fmt.Printf("Exit %v\n", i)
        }()
    }

    b.Source <- "Test 1"
    b.Source <- "Test 2"
    // This is a race condition: the second reader may or may not receive the first two messages.
    b.RemoveSubscriber(toberemoved)
    b.Source <- "Test 3"

    // let the reader goroutines receive the last message
    time.Sleep(2 * time.Second)

    close(b.Source)

    // let the reader goroutines write close message
    time.Sleep(1 * time.Second)
}

https://play.golang.org/p/X-NcikvbDM

编辑:我添加了您的编辑以解决在关闭 Source 后调用 RemoveSubscriber 时出现的 panic ,但您不应该这样做,您应该让结构 channel 关闭后,其中的所有内容都会被垃圾收集。 如果在关闭 Source 后调用它,我还向 NewSubscriber 添加了一个 panic。以前你可以这样做,它会泄漏创建的 channel ,并且可能会泄漏将永远阻塞在该 channel 上的 goroutine。

如果您可以在已经关闭的广播公司上调用 NewSubscriber(或 RemoveSubscriber),这可能意味着您的代码某处有错误,因为您坚持一个你不应该成为的广播员。

关于go - 在 Go 中通过多个 channel 广播一个 channel ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38401421/

相关文章:

go - 检查 X509 证书是否与 CertificateRequest (CSR) 匹配

go - 在 Go 中管理多个 channel

asynchronous - perl6 "An operation first awaited"

go - 从 Go 中的 channel 读取多个元素

golang - os.stdout 和 multiwriter 之间的区别

go - 如何修复这个简单程序中的 'declared but not used' 编译器错误?

file - 查找目录中的重复文件

go - 带有 html 模板的路由和没有 Golang 的路由之间每秒请求数的巨大差异

go - 如何关闭具有多个发送者的 channel ?

java - 如何在 java/xuggler 中混合多个音频 channel ?