go - 从 rabbitmq 获取已发布消息的响应。戈朗

标签 go rabbitmq rpc amqp

我已经实现了使用消息的 worker ,用它们做某种类型的工作并在队列上发布结果。
我有 api,它将接收 http 请求,向工作人员发布消息并等待工作人员发送响应。
我有两个队列,一个是 worker ,另一个是网关。
Api 将 -> 在工作队列上发布事件。使用网关队列中的事件(等待响应)
工作人员将 -> 使用工作人员队列中的事件。在网关队列上发布事件。
worker 工作得很好,我从来没有遇到过问题。所以我将专注于api,因为那里有错误。
问题:
每次我的api发布消息时。它首先会生成一些 uuid,我们会在 map 中记住这些 uuid,以便在我们使用响应时。我们可以匹配响应是哪个请求。
当我们消费消息时,第一次一切正常,我们可以将响应与请求相匹配。但是第二次,我们在 map 中找不到该响应。尽管我已经添加了它。
下面是一些代码示例。
存储,将与 map 一起使用的接口(interface)

package main

import "fmt"

type ChannelStorage interface {

    Add(uid string, message chan ResponseMessage)
    Delete(uid string)
    Get(uid string) chan ResponseMessage
}

func NewChannelStorage() ChannelStorage {
    return ChannelMapStorage{
        channelMap: make(map[string]chan ResponseMessage),
    }
}

type ChannelMapStorage struct {
    channelMap map[string]chan ResponseMessage
}

func (storage ChannelMapStorage) Add(uid string, message chan ResponseMessage) {
    fmt.Println(fmt.Sprintf("Adding Message: %s ", uid))
    storage.channelMap[uid] = message
}

func (storage ChannelMapStorage) Delete(uid string) {
    fmt.Println(fmt.Sprintf("Deleting Message: %s ", uid))
    delete(storage.channelMap, uid)
}

func (storage ChannelMapStorage) Get(uid string) chan ResponseMessage {
    fmt.Println(fmt.Sprintf("Getting Message: %s ", uid))
    return storage.channelMap[uid]
}
这是我的发布者,它将向工作队列发送事件。
package main

import (
    "fmt"
    "github.com/streadway/amqp"
)

var channel *amqp.Channel

func init()  {

    // Connect to the rabbit.
    conn, err := amqp.Dial(rabbitConfig.uri)
    if err != nil {
        panic(err)
    }

    // create channel
    channel, err = conn.Channel()
    if err != nil {
        panic(err)
    }
}

func publish(queueName string , data []byte, id string) error {

    // publish message
    return channel.Publish(
        "",            // exchange
        queueName, // routing key
        false,         // mandatory
        false,         // immediate
        amqp.Publishing{
            CorrelationId: id,
            ContentType: "text/plain",
            Body:        data,
        },
    )

}
这是发件人,这是代码的一部分,所有操作都发生在这里。
它的工作是连接到rabbitmq,消费事件。将事件与请求匹配并通知我们得到响应。
package main

import (
    "encoding/json"
    "errors"
    "fmt"
    "github.com/streadway/amqp"
    "log"
    "time"
)

func init()  {

    conn, err := amqp.Dial(rabbitConfig.uri)

    failOnError(err, "Publisher failed to connect to the rabbitmq")

    // Create channel
    channel, err := conn.Channel()

    failOnError(err, "Publisher failed to create channel")

    // Create queue
    queue, err := channel.QueueDeclare(
        RECIEVE_QUEUE_NAME, // channelname
        true,      // durable
        false,     // delete when unused
        false,     // exclusive
        false,     // no-wait
        nil,       // arguments
    )

    failOnError(err, "Failed to create queue for consumer")

    // channel
    messages, err := channel.Consume(
        queue.Name, // queue
        "",         // consumer
        true,      // auto-ack
        false,      // exclusive
        false,      // no-local
        false,      // no-wait
        nil,        // args
    )

    failOnError(err, "Failed on consuming event")

    go func() {
        for message:= range messages {
            go handleMessage(message)
        }
    }()
}

func sendMessage(name string, id string ,requestType string) ([]byte, int, error) {

    // Create new task message
    message := TaskMessage{
        Uid: uid(),
        ReplyTo: RECIEVE_QUEUE_NAME,
        Type: requestType,
        Name: name,
        Id: id,
    }

    data ,err := json.Marshal(message)

    if err != nil {
        fmt.Println(fmt.Sprintf("Cant unmarshall message %s", err.Error()))
        return nil, 0, err
    }

    err = publish(SEND_QUEUE_NAME, data, message.Uid)

    if err != nil {
        fmt.Println(fmt.Sprintf("Cant publish message %s", err.Error()))
        return nil, 0, err
    }

    // whenever we send message, we need to add it to the waiting response channel
    rchannel := make(chan ResponseMessage)
    channelStorage.Add(message.Uid, rchannel)

    // Wait for the response
    select {

    case response := <- rchannel:

        fmt.Println(fmt.Sprintf("Sending response: %s ", message.Uid))

        data := response.Response
        code := response.StatusCode

        channelStorage.Delete(message.Uid)

        return data, code, nil

    case <-time.After(3 * time.Second):

        // remove channel from rchans
        channelStorage.Delete(message.Uid)

        // Return timeout error.
        return nil, 0, errors.New("Response timed out on rabbit.")
    }
}

func handleMessage(msg amqp.Delivery)  {

    // Parse message.
    response := &ResponseMessage{}

    // Parse response.
    err := json.Unmarshal(msg.Body, response)

    if err != nil {
        log.Printf("ERROR: fail unmarshl: %s", msg.Body)
        return
    }

    // find waiting channel(with uid) and forward the reply to it
    if channel := channelStorage.Get(response.Uid); channel != nil {
        channel <- *response
    }
}

方法 sendMessage() 将发布事件,等待消费响应。将其映射到我们的请求并返回结果。
在这里我将创建新任务
// Create new task message
    message := TaskMessage{
        Uid: uid(),
        ReplyTo: RECIEVE_QUEUE_NAME,
        Type: requestType,
        Name: name,
        Id: id,
    }

    data ,err := json.Marshal(message)

    if err != nil {
        fmt.Println(fmt.Sprintf("Cant unmarshall message %s", err.Error()))
        return nil, 0, err
    }
使用来自 publisher.go 的方法发布任务
err = publish(SEND_QUEUE_NAME, data, message.Uid)

    if err != nil {
        fmt.Println(fmt.Sprintf("Cant publish message %s", err.Error()))
        return nil, 0, err
    }
使用我们的存储接口(interface)来映射具有我们生成的唯一 ID 的响应。
rchannel := make(chan ResponseMessage)
channelStorage.Add(message.Uid, rchannel)
等待将通知的 channel 是当我们收到对已发布事件的响应时。如果我们在 3 秒内没有得到响应,我们的超时。
case response := <- rchannel:

        fmt.Println(fmt.Sprintf("Sending response: %s ", message.Uid))

        data := response.Response
        code := response.StatusCode

        channelStorage.Delete(message.Uid)

        return data, code, nil

    case <-time.After(3 * time.Second):

        // remove channel from rchans
        channelStorage.Delete(message.Uid)

        // Return timeout error.
        return nil, 0, errors.New("Response timed out on rabbit.")
    }
当我们使用消息但尝试将 id 与响应映射时,就会出现问题。它说它为我们 map 中的 id 返回 nil。
func handleMessage(msg amqp.Delivery)  {

    // Parse message.
    response := &ResponseMessage{}

    // Parse response.
    err := json.Unmarshal(msg.Body, response)

    if err != nil {
        log.Printf("ERROR: fail unmarshl: %s", msg.Body)
        return
    }

    // find waiting channel(with uid) and forward the reply to it
    if channel := channelStorage.Get(response.Uid); channel != nil {
        channel <- *response
    }
}
我的 main.go 看起来像这样,它将创建全局存储并运行 http 服务器:
package main

import (
    "fmt"
    "os"
)

var channelStorage ChannelStorage

func main()  {

    channelStorage = NewChannelStorage()

    err := SetupRouter(NewApi()).Run(":8080")

    if err != nil {
        fmt.Println(err)
        os.Exit(1)
    }
}
因为我对golang没有太多经验。问题可能会发生,因为我创建了全局存储或类似的东西。

最佳答案

首先:您的存储现在不是线程安全的,您应该使用 sync.Mutex 例如。

type ChannelMapStorage struct {
    m sync.Nutex
    channelMap map[string]chan ResponseMessage
}

func (storage *ChannelMapStorage) Add(uid string, message chan ResponseMessage) {
    storage.m.Lock()
    fmt.Println(fmt.Sprintf("Adding Message: %s ", uid))
    storage.channelMap[uid] = message
    storage.m.Unlock()
}
所有其他方法也是如此。
第二:您应该在发布之前将消息添加到存储中,如果发布失败则将其删除。
data ,err := json.Marshal(message)

if err != nil {
    fmt.Println(fmt.Sprintf("Cant unmarshall message %s", err.Error()))
    return nil, 0, err
}

rchannel := make(chan ResponseMessage)
channelStorage.Add(message.Uid, rchannel)

err = publish(SEND_QUEUE_NAME, data, message.Uid)
if err != nil {
    channelStorage.Del(message.Uid)
    fmt.Println(fmt.Sprintf("Cant publish message %s", err.Error()))
    return nil, 0, err
}

关于go - 从 rabbitmq 获取已发布消息的响应。戈朗,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63189459/

相关文章:

c - 我正在将一个简单的 RNG 函数从 C 移植到 Go,结果不正确

go - 如何在 Go 中的结构内初始化嵌套映射?

go - 逃逸分析显示 channel 为泄漏参数

python - RabbitMQ:防止作业在两个不同的 worker 上同时运行

rabbitmq - 如何在 Rabbitmq 中设置消息的基本消息属性?

go - 在不下载图像的情况下确定aws s3中文件的图像大小

spring-boot - 微服务异步响应

c# - 多语言 RPC 库

c - Protocol Buffer 未知类型名称错误?

performance - 本地 IPC 的平均性能测量