http - 净/http : concurrency and message passing between coroutines

标签 http go websocket goroutine

我在 REST API 服务器上工作,该服务器的功能之一是能够在创建新资源或修改现有资源时通过 websocket 通知任意数量的客户端。

我有一个自定义操作路由器,用于将 URL 绑定(bind)到函数和 gorillas 的 websocket 库实现。对于 IPC,我决定依赖 channel ,因为它似乎是协程之间通信的惯用方式。它的行为也像一个管道,这是我熟悉的概念。

Create 函数的原型(prototype)如下所示:

func Create (res http.ResponseWriter, req *http.Request, userdata interface {}) (int, string, interface {})

作为 userdata 结构 PipeSet 的实例被传递。它是一个在所有协程之间共享的映射,其中键是 Pipe 的地址(指向)并且值相同。这里的基本原理是在删除时加快查找过程。

type Pipe chan string                                                           

type PipeSet struct {                                                           
    sync.Mutex                                                                  
    Pipes map [*Pipe] *Pipe                                                     
}                                                                               

func NewPipe () Pipe {                                                          
    return make (Pipe)                                                          
}                                                                               

func NewPipeSet () PipeSet {                                                    
    var newSet PipeSet                                                      
    newSet.Pipes = make (map[*Pipe] *Pipe)                                  
    return newSet                                                           
}                                                                               

func (o *PipeSet) AddPipe (pipe *Pipe) {                                        
    o.Lock ()                                                                   
    o.Pipes[pipe] = pipe                                                        
    o.Unlock ()                                                                 
}                                                                               

func (o *PipeSet) ForeachPipe (f func (pipe Pipe)) {                            
    o.Lock ()                                                                   
    for k := range (o.Pipes) {                                                  
        f (*o.Pipes[k])                                                         
    }                                                                           
    o.Unlock ()                                                                 
}                                                                               

func (o *PipeSet) DeletePipe (pipe *Pipe) {                                     
    o.Lock ()                                                                   
    delete (o.Pipes, pipe)                                                      
    o.Unlock ()                                                                 
}

当客户端通过 websocket 连接时,将创建一个新 channel (Pipe)并将其添加到共享 PipeSet。然后,如果创建了一个新资源,协程将通过整个 PipeSet 向每个 Pipe 发送消息。然后将消息转发到另一端连接的客户端。

问题区域

我无法检测客户端的 websocket 连接是否仍然存在。我需要知道这一点以确定是否应该从 PipeSet 中删除一个 Pipe。在这种情况下,我依赖于 CloseNotifier。它永远不会触发。

代码如下所示(摘录):

var upgrader = websocket.Upgrader {
    CheckOrigin: func (r *http.Request) bool { return true },
}

conn, err := upgrader.Upgrade (res, req, nil)

if err != nil {
    marker.MarkError (err)
    return http.StatusBadRequest, "", nil
}

defer conn.Close ()

exitStatus = http.StatusOK
pipe := genstore.NewPipe ()
quit := res.(http.CloseNotifier).CloseNotify ()

genStore.WSChannels.AddPipe (&pipe)

for {
    log.Printf ("waiting for a message")

    select {
        case wsMsg = <-pipe:
            log.Printf ("got a message: %s (num pipes %d)", wsMsg, len (genStore.WSChannels.Pipes))

            if err = conn.WriteMessage (websocket.TextMessage, []byte (wsMsg)); err != nil {
                marker.MarkError (err)
                goto egress
            }

        case <-quit:
            log.Printf ("quit...")
            goto egress
    }
}

egress:
genStore.WSChannels.DeletePipe (&pipe)

最佳答案

当您使用 Gorilla 将 HTTP 连接升级为 WebSocket 连接时,它会劫持该连接并且 net/http 服务器停止为其提供服务。这意味着,您不能依赖那一刻起的 net/http 事件。

检查这个:https://github.com/gorilla/websocket/issues/123

因此,您在这里可以做的是为每个新的 WebSocket 连接启动新的 goroutine,它将从此连接读取数据并在失败时将消息写入 quit channel 。

关于http - 净/http : concurrency and message passing between coroutines,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44340192/

相关文章:

go - 存储 slice 的接口(interface){}上的范围

linux - 为什么这个 "hello world"golang http 示例在 osx 上变慢

javascript - 如何仅呈现更新后的统计数据 - websockets

javascript - 如何在 Node.js 应用程序中排列文件?

http - 有没有办法在回复中请求没有文件大小的远程文件?

c# - ServiceStack - 如何在 POST 方法中强制上传文件

条件代码的 Go 编译器标志

python - 如何在 Django 2 中实现 Django-Private-Chat

http - 如何禁用 http 请求 header 中的范围选项?

实体签名的 HTTP header