go - 在golang中缓存网络流?

标签 go network-programming streaming nonblocking

我正在尝试在 golang 中为视频流编写缓存代理。

我的问题是,如何在多个连接之间分发大块数据的流式副本?

或者如何存储(缓存)和安全(快速)访问来自多个 goroutine 的数据?

我尝试了几个选项,包括互斥量和 channel ,但它们没有用。 以下是一些存在错误的示例。

这是简化版:

...
var clients []*client
func new_client(conn net.Conn) {
    client := &client{
        conn: conn,
    }
    clients = append(clients, client)
}
...
func stream(source io.Reader) {
    buf := make([]byte, 32*1024)
    for {
        n, _ := source.Read(buf)
        for _, client := range clients {
            wn, e := client.conn.Write(buf[0:n])
            // blocks here for all clients if one of clients stops reading
        }
    }
}

此版本的问题是当一个客户端停止读取但未关闭连接时,对 Write() 的调用开始阻塞。在 goroutine 中包装对 Write() 的调用(在客户端上使用互斥锁)没有帮助 - 存在与 channel 相同的延迟(下一个示例),此外 go 不保证 goroutine 的执行顺序。

我试过这样修复它:

        for _, client := range clients {
            client.conn.SetWriteDeadline(time.Now().Add(1 * time.Millisecond))
            wn, e := client.conn.Write(buf[0:n])
        }

它有助于阻止,但缓慢的客户端无法及时读取,增加超时 - 返回延迟。

我也试过这样的:

...
var clients []*client
func new_client(conn net.Conn) {
    client := &client{
        buf_chan: make(chan []byte, 100),
    }
    clients = append(clients, client)
    for {
        buf <- client.buf_chan
        n, e := client.conn.Write(buf)
    }
}
...
func stream(source io.Reader) {
    buf := make([]byte, 32*1024)
    for {
        n, _ := source.Read(buf)
        for _, client := range clients {
            client.buf_chan <- buf[0:n]
        }
    }
}

但在这个版本中 - 在发送到 channel 和另一端接收之间存在一些延迟,因此播放器中的视频流开始出现延迟和滞后。

也许对 go 中的一些包有建议,或者为这类任务设计模式?

感谢您的帮助!

最佳答案

在 channel 版本中,慢客户端也可能会增加延迟。因为一个慢客户端可以使它的 buf_chan 满,然后写入它的 buf_chan 会阻塞。 Wrapper select 可以避免:

select {
case client.buf_chan <- buf[0:n]:
default:
//handle slow client ...    
}

关于go - 在golang中缓存网络流?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27575515/

相关文章:

mongodb - 如何通过MongoDB同步在两个不同服务器上运行的两个应用

azure - 为什么这个简单的 Go 服务器不在 Azure 应用服务中运行?

go - 将 go-pg 查询转换为纯 sql

ios - 将可达性与 ReactiveCocoa 集成?

ios - 使用多点连接框架并保存发现的附近设备

go - int 和 C.int 之间有什么区别?

c - 在 header 结构之后访问数据

python - 为什么 PIPE 文件的 readline() 如此慢?

video - 来自静态图像和音频的 ffmpeg 直播

使用 H.264(带音频)的 FFmpeg 流式传输 - Red5 媒体服务器(Ubuntu 操作系统)