Golang amqp 重新连接

标签 go connection amqp

我想测试与rabbitmq 服务器的重启连接。 就写了个小脚本来测试。 http://play.golang.org/p/l3ZWzG0Qqb 但它不起作用。

在第 10 步中,我关闭了 channel 和连接。并再次打开它们。并重新创建 chan amqp.Confirmation ( :75) 。并继续循环。 但在那之后,chan 确认没有任何返回。

UPD:代码在这里。

package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "log"
    "os"
    "time"
)

const SERVER = "amqp://user:pass@localhost:5672/"
const EXCHANGE_NAME = "publisher.test.1"
const EXCHANGE_TYPE = "direct"
const ROUTING_KEY = "publisher.test"

var Connection *amqp.Connection
var Channel *amqp.Channel

func setup(url string) (*amqp.Connection, *amqp.Channel, error) {
    conn, err := amqp.Dial(url)
    if err != nil {
        return nil, nil, err
    }

    ch, err := conn.Channel()
    if err != nil {
        return nil, nil, err
    }

    return conn, ch, nil
}

func main() {
    url := SERVER

    Connection, Channel, err := setup(url)
    if err != nil {
        fmt.Println("err publisher setup:", err)
        return
    }

    confirms := Channel.NotifyPublish(make(chan amqp.Confirmation, 1))
    if err := Channel.Confirm(false); err != nil {
        log.Fatalf("confirm.select destination: %s", err)
    }

    for i := 1; i <= 3000000; i++ {
        log.Println(i)

        if err != nil {
            fmt.Println("err consume:", err)
            return
        }

        if err := Channel.Publish(EXCHANGE_NAME, ROUTING_KEY, false, false, amqp.Publishing{
            Body: []byte(fmt.Sprintf("%d", i)),
        }); err != nil {
            fmt.Println("err publish:", err)
            log.Printf("%+v", err)
            os.Exit(1)
            return
        }

        // only ack the source delivery when the destination acks the publishing
        confirmed := <-confirms
        if confirmed.Ack {
            log.Printf("confirmed delivery with delivery tag: %d", confirmed.DeliveryTag)
        } else {
            log.Printf("failed delivery of delivery tag: %d", confirmed.DeliveryTag)
            // TODO. Reconnect will be here
        }

        if i == 10 {
            Channel.Close()
            Connection.Close()
            while := true
            for while {
                log.Println("while")
                time.Sleep(time.Second * 1)
                Connection, Channel, err = setup(url)
                if err == nil {
                    while = false
                    confirms = Channel.NotifyPublish(make(chan amqp.Confirmation, 1))
                    log.Printf("%+v", confirms)
                }
            }
        }
        time.Sleep(time.Millisecond * 300)
    }

    os.Exit(1)
}

最佳答案

您应该将 channel 置于确认模式。通过调用 channel.Confirm()方法。 关闭连接后,甚至在同一连接上获得新 channel 后,您应该再次调用 Confirm() 方法,因为该 channel 与旧 channel 不同,所有新 channel 默认不发送确认。

关于Golang amqp 重新连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36740066/

相关文章:

go - 生产中的 log.SetFlags(log.LstdFlags | log.Lshortfile)

go - 创建/获取自定义 kubernetes 资源

linux - 树莓派2 : routing table has no the specified gateway

go - 是否可以向 protobuf 服务添加自定义方法类型?

Golang 单元测试矩阵在 SonarQube 仪表板上不可见

pointers - 我如何比较 Go 中的指针?

java - 从 Java 应用程序创建到 2 个数据库的连接

C++客户端socket同时连接

node.js - 如何向除发布者之外的所有订阅者发送消息,发布者也是同一 rabbitMQ 队列上的监听器

spring - RabbitTemplate接收消息并重新排队