go - Go-Stomp读取超时如何解决

标签 go stomp apollo

尝试使用 Go-Stomp 订阅 ActiveMQ(Apollo),但出现读取超时错误。我的应用程序应该每天 24 小时运行以处理传入的消息。

问题:

  1. 有没有办法在队列中没有更多消息的情况下保持订阅?尝试放置 ConnOpt.HeartBeat 似乎也不起作用
  2. 为什么读超时后,我好像又接受了一条消息?

以下是我的步骤:

  • 我将 1000 条消息放入输入队列进行测试
  • 运行一个订阅者,下面提供代码
  • 订阅者在 2-3 秒后阅读完 1000 条消息,看到错误“2016/10/07 17:12:44 订阅 1:/queue/hflc-in:错误消息:读取超时”。
  • 再添加 1000 条消息,但似乎订阅已经关闭,因此没有消息未被处理

我的代码:

  var(
   serverAddr   = flag.String("server", "10.92.10.10:61613", "STOMP server    endpoint")
   messageCount = flag.Int("count", 10, "Number of messages to send/receive")
   inputQ       = flag.String("inputq", "/queue/hflc-in", "Input queue")
)

var options []func(*stomp.Conn) error = []func(*stomp.Conn) error{
   stomp.ConnOpt.Login("userid", "userpassword"),
   stomp.ConnOpt.Host("mybroker"),
   stomp.ConnOpt.HeartBeat(360*time.Second, 360*time.Second), // I put this but seems no impact
}

func main() {
  flag.Parse()
  jobschan := make(chan bean.Request, 10)
  //my init setup
  go getInput(1, jobschan)
}

func getInput(id int, jobschan chan bean.Request) {
   conn, err := stomp.Dial("tcp", *serverAddr, options...)

   if err != nil {
      println("cannot connect to server", err.Error())
      return
   }
   fmt.Printf("Connected %v \n", id)

   sub, err := conn.Subscribe(*inputQ, stomp.AckClient)
   if err != nil {
     println("cannot subscribe to", *inputQ, err.Error())
     return
   }

   fmt.Printf("Subscribed %v \n", id)
   var messageCount int
   for {
    msg := <-sub.C
    //expectedText := fmt.Sprintf("Message #%d", i)
    if msg != nil {

        actualText := string(msg.Body)
        
        var req bean.Request
        if actualText != "SHUTDOWN" {
            messageCount = messageCount + 1
            var err2 = easyjson.Unmarshal([]byte(actualText), &req)
            if err2 != nil {
                log.Error("Unable unmarshall", zap.Error(err))
                println("message body %v", msg.Body) // what is [0/0]0x0 ?
            } else {
                fmt.Printf("Subscriber %v received message, count %v \n  ", id, messageCount)
                jobschan <- req
            }
        } else {
            logchan <- "got some issue"
        }
    }
   }
  }

错误:

2016/10/07 17:12:44 Subscription 1: /queue/hflc-in: ERROR message:read timeout
[E] 2016-10-07T09:12:44Z Unable unmarshall
message body %v [0/0]0x0

最佳答案

通过添加这些行解决:

在 Apollo 中,注意到队列在几秒后为空后被删除,所以在 apollo.xml 中将 auto_delete_after 设置为几个小时,例如:

<queue id="hflc-in" dlq="dlq-in" nak_limit="3" auto_delete_after="7200"/>
<queue id="hflc-log" dlq="dlq-log" nak_limit="3" auto_delete_after="7200"/>
<queue id="hflc-out" dlq="dlq-out" nak_limit="3" auto_delete_after="7200"/>

在 Go 中,注意到 go-stomp 在队列中找不到任何消息后会立即放弃,因此在 conn 选项中,添加 HeartBeat Error

var options []func(*stomp.Conn) error = []func(*stomp.Conn) error{
   //.... original configuration
   stomp.ConnOpt.HeartBeatError(360 * time.Second),
}

但是对第2题还是一头雾水。

关于go - Go-Stomp读取超时如何解决,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39914161/

相关文章:

io - 读取()函数

Angular2 与 Stomp.js

javascript - 如何将两个依赖的 GraphQL 查询与 'compose' 结合起来?

graphql - 不变违规 : Expecting a parsed GraphQL document

go - 为什么 goroutines 用缓冲 channel 解决死锁问题?

go - 为什么goroutine中的未缓冲 channel 获得了此顺序

c - 尝试扩展 golang Pigpio 包装函数 gpioWaveAddGeneric

javascript - Stilts.js 无法使用安全的 true 连接到 websocket 服务器

spring - 如何在 Spring 中从 RabbitMQ 正确获取所有队列消息?

javascript - 如何在 Nuxt.js 中捕获服务器错误,以免页面渲染崩溃? (Vue)