sockets - ZMQ sockets缺乏线程安全如何处理?

标签 sockets go zeromq

我已经在一些 Python 应用程序中使用 ZMQ 一段时间了,但直到最近我才决定在 Go 中重新实现其中一个,并且我意识到 ZMQ 套接字不是线程安全的。

原始的 Python 实现使用如下所示的事件循环:

while running:
    socks = dict(poller.poll(TIMEOUT))
    if socks.get(router) == zmq.POLLIN:
        client_id = router.recv()
        _ = router.recv()
        data = router.recv()
        requests.append((client_id, data))

    for req in requests:
        rep = handle_request(req)
        if rep:
            replies.append(rep)
            requests.remove(req)

    for client_id, data in replies:
        router.send(client_id, zmq.SNDMORE)
        router.send(b'', zmq.SNDMORE)
        router.send(data)
        del replies[:]

问题是回复可能在第一次通过时还没有准备好,所以每当我有待处理的请求时,我必须以非常短的超时时间进行轮询,否则客户端等待的时间会超过他们应该等待的时间,应用程序就会结束使用大量 CPU 进行轮询。

当我决定用 Go 重新实现它时,我认为它会这么简单,通过在轮询上使用无限超时来避免这个问题:

for {
    sockets, _ := poller.Poll(-1) 
    for _, socket := range sockets {
        switch s := socket.Socket; s {
        case router:
            msg, _ := s.RecvMessage(0)
            client_id := msg[0]
            data := msg[2]
            go handleRequest(router, client_id, data)                
        }
    }
}

但只有当我连接了一个客户端或轻负载时,这种理想的实现才有效。在重负载下,我在 libzmq 中出现随机断言错误。我尝试了以下方法:

  1. zmq4 docs 之后我尝试在所有套接字操作上添加 sync.Mutex 和锁定/解锁。它失败。我认为这是因为 ZMQ 使用自己的线程进行刷新。

  2. 创建一个用于轮询/接收的 goroutine 和一个用于发送的 goroutine,并以与我在 Python 版本中使用 req/rep 队列相同的方式使用 channel 。它失败了,因为我仍在共享套接字。

  3. 与 2 相同,但设置 GOMAXPROCS=1。它失败了,并且吞吐量非常有限,因为回复被阻止,直到 Poll() 调用返回。

  4. 使用 2 中的 req/rep channel ,但使用 runtime.LockOSThread 将所有套接字操作保持在与套接字相同的线程中。有和上面一样的问题。它没有失败,但吞吐量非常有限。

  5. 与 4 相同,但使用 Python 版本的轮询超时策略。它可以工作,但存在与 Python 版本相同的问题。

  6. 共享上下文而不是套接字,并在单独的 goroutine 中创建一个用于发送的套接字和一个用于接收的套接字,与 channel 通信。它有效,但我必须重写客户端库以使用两个套接字而不是一个。

  7. 摆脱 zmq 并使用线程安全的原始 TCP 套接字。它工作得很好,但我还必须重写客户端库。

所以,看起来 6 是 ZMQ 真正打算使用的方式,因为这是我让它与 goroutines 无缝工作的唯一方法,但我想知道是否还有其他我没有尝试过的方法。有什么想法吗?


更新

有了这里的答案,我意识到我可以向轮询器添加一个 inproc PULL 套接字,然后让一个 goroutine 连接并推送一个字节来打破无限等待。它不像这里建议的解决方案那么通用,但它可以工作,我什至可以将它移植到 Python 版本。

最佳答案

opened an issue a 1.5年前介绍一个端口https://github.com/vaughan0/go-zmq/blob/master/channels.go到 pebbe/zmq4。最终作者决定反对它,但我们已经在生产中(在非常繁重的工作负载下)使用它很长时间了。

这是一个 gist必须添加到 pebbe/zmq4 包的文件的一部分(因为它向套接字添加了方法)。这可以重写为 Socket 接收器上的方法将 Socket 作为参数,但由于我们无论如何都要提供代码,所以这是一种简单的前进方式。

基本用法是像平常一样创建您的Socket(例如将其命名为s)然后您可以:

channels := s.Channels()
outBound := channels.Out()
inBound := channels.In()

现在你有两个 [][]byte 类型的 channel ,你可以在 goroutine 之间使用,但是一个 goroutine - 在 channel 抽象中管理,负责管理 Poller 并与套接字通信。

关于sockets - ZMQ sockets缺乏线程安全如何处理?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36437799/

相关文章:

java - 客户端未连接到服务器

java - Log4j2 中的 Socket Appender 出现错误

casting - 是否支持显式原始类型转换?

http - 使用 ioutil : returning array of (ASCII? ) 来自 http 调用的数字进行输出

c# - 为什么 Mono 上的 NetMQ DealerSocket 不向 Debian Wheezy 上的服务器发送消息,但在 Windows 上却发送消息?

python - 安装 ZeroMQ 时出错

java - 如何使用套接字与socketio交互

c++ - 利用 poll() 函数(用户空间)的其他方法?

linux - 为 OS X 编译的 Golang 可执行扩展

python - 多核 ZeroMQ?