go - 限制来自 channel 的已处理消息的数量

标签 go channel goroutine

我通过 channel 收到大约 200 000 条消息/秒给我的工作人员,我需要将发送给客户端的消息数量限制为每秒 20 条。
这使它每 50 毫秒 1 条消息

在 LOOP 的帮助下,worker 在整个程序生命周期中仍然活着(而不是为每条消息打开一个 channel )。

我的目标:
- 由于消息的顺序很重要,我想跳过在阻塞的 50 毫秒内收到的所有消息,只保存最新的消息
- 如果最新的消息在阻塞的 50 毫秒内出现,我希望在循环内的阻塞时间结束并且没有新消息到来时处理保存的消息! <--这是我的问题

我的策略
- 继续发送尚未处理的最新消息到同一 channel

但是它的问题是,如果该消息是在(来自应用程序的)新消息之后发送的呢?

下面的代码更像是一个作为工作代码的算法,只是想要一个关于如何做到这一点的提示/方法。

func example (new_message_from_channel <-chan *message) {
    default = message
    time = now_milliseconds
    diff_accepted = 50milli
    for this_message := range new_message_from_channel {
        if now_millisecond -  time >= diff_accepted {
            send_it_to_the_client
            time = now_milliseconds
        } else {
            //save the latest message
            default = this_message

            //My problem is how to process this latest message when the blocked 50ms is over and no new message coming ?!

            //My strategy - keep sending it to the same channel
            theChannel <- default
        }

    }
}

如果你有一个优雅的方法,欢迎你与我分享:)

最佳答案

使用速率限制器,您可以创建 throttle函数将采用:速率和 channel 作为输入;并返回两个 channel - 一个包含所有原始 channel 项目,另一个仅以固定速率中继项目:

func throttle(r time.Duration, in <-chan event) (C, tC <-chan event) {

    // "writeable" channels
    var (
        wC  = make(chan event)
        wtC = make(chan event)
    )

    // read-only channels - returned to caller
    C = wC
    tC = wtC

    go func() {
        defer close(wC)
        defer close(wtC)

        rl := rate.NewLimiter(
            rate.Every(r),
            1,
        )

        // relays input channel's items to two channels:
        // (1) gets all writes from original channel
        // (2) only writes at a fixed frequency
        for ev := range in {
            wC <- ev
            if rl.Allow() {
                wtC <- ev
            }
        }
    }()
    return
}

工作示例:https://play.golang.org/p/upei0TiyzNr

编辑:

为了避免使用速率限制器,而是使用简单的 time.Ticker :
tick := time.NewTicker(r)

for ev := range in {
    select {
    case wC <- ev: // write to main
    case <-tick.C:
        wC <- ev  // write to main ...
        wtC <- ev // ... plus throttle channel
    }
}

工作示例:https://play.golang.org/p/UTRXh72BvRl

关于go - 限制来自 channel 的已处理消息的数量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62239320/

相关文章:

macos - 去构建并执行: fork/exec: permission denied

go - 如何使用多种格式在循环中解析日期?

json - 在 Golang 中读取在线 JSON 并将值保存到 CSV

Go例程和Defer

clojure - 我们可以将 Clojure 的 core.async 描述为 'continuation passing style' 吗?

go - 如何使用从 google api 收到的响应数据?

for-loop - 如何在Go中并行运行for循环内的方法?

select - Go select 语句解决方法中的优先级

go - 在 for 循环中生成 goroutine 时发生死锁

go - 发送到 channel 的消息会丢失吗?