ruby - RabbitMQ/Bunny: subscribe block not called if within a thread

Tags: ruby multithreading rabbitmq bunny

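I cannot get a queue's subscribe block to execute when it runs inside a thread.

The example from rubybunny/exchanges works as expected. However, when the consumer part is adapted to run in a thread, the subscribe block never seems to execute.

I tried a few simple variations, including setting a shared flag variable, with no success.

What am I missing?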

Code
#!/usr/bin/env ruby
require "bunny"

quit = false

consumer = Thread.new do
  puts "consumer start"

  cnx = Bunny.new
  cnx.start
  cn  = cnx.create_channel
  ex = cn.topic("weathr", :auto_delete => true)

  q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
  q.subscribe do |delivery_info, properties, payload|
    puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
  end

  loop {
    sleep 1
    break if quit
  }

  cnx.close
  puts "consumer done"
end

connection = Bunny.new
connection.start
channel  = connection.create_channel
exchange = channel.topic("weathr", :auto_delete => true)
exchange.publish("San Diego update", :routing_key => "americas.north.us.ca.sandiego").
  publish("Berkeley update",         :routing_key => "americas.north.us.ca.berkeley").
  publish("San Francisco update",    :routing_key => "americas.north.us.ca.sanfrancisco").
  publish("New York update",         :routing_key => "americas.north.us.ny.newyork").
  publish("São Paolo update",        :routing_key => "americas.south.brazil.saopaolo").
  publish("Hong Kong update",        :routing_key => "asia.southeast.hk.hongkong").
  publish("Kyoto update",            :routing_key => "asia.southeast.japan.kyoto").
  publish("Shanghai update",         :routing_key => "asia.southeast.prc.shanghai").
  publish("Rome update",             :routing_key => "europe.italy.roma").
  publish("Paris update",            :routing_key => "europe.france.paris")

sleep 5
connection.close

quit = true
consumer.join
Actual output
consumer start
consumer done
Expected output
consumer start
An update for North America: San Diego update, routing key is americas.north.us.ca.sandiego
An update for North America: Berkeley update, routing key is americas.north.us.ca.berkeley
An update for North America: San Francisco update, routing key is americas.north.us.ca.sanfrancisco
An update for North America: New York update, routing key is americas.north.us.ny.newyork
consumer done

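Best answer

The subscribe block in the thread is not executed because the queue never receives any messages. More precisely, in this example the queue ends up being created only after the messages have already been published.

The missing route can be made visible by publishing the messages with :mandatory => true and using Bunny::Exchange#on_return:

Code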
#!/usr/bin/env ruby
require "bunny"

quit = false

connection = Bunny.new
connection.start

consumer = Thread.new do
  puts "consumer start"
  cn  = connection.create_channel
  ex = cn.topic("weathr", :auto_delete => true)

  q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
  q.subscribe do |delivery_info, properties, payload|
    puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
  end

  sleep 1 while !quit

  cn.close
  puts "consumer done"
end

channel = connection.create_channel
exchange = channel.topic("weathr", :auto_delete => true)
exchange.on_return do |basic_return, properties, payload|
  puts "#{payload} was returned! reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}"
end

exchange.publish("San Diego update", :mandatory => true, :routing_key => "americas.north.us.ca.sandiego").
  publish("Berkeley update",         :mandatory => true, :routing_key => "americas.north.us.ca.berkeley").
  publish("San Francisco update",    :mandatory => true, :routing_key => "americas.north.us.ca.sanfrancisco").
  publish("New York update",         :mandatory => true, :routing_key => "americas.north.us.ny.newyork").
  publish("São Paolo update",        :mandatory => true, :routing_key => "americas.south.brazil.saopaolo").
  publish("Hong Kong update",        :mandatory => true, :routing_key => "asia.southeast.hk.hongkong").
  publish("Kyoto update",            :mandatory => true, :routing_key => "asia.southeast.japan.kyoto").
  publish("Shanghai update",         :mandatory => true, :routing_key => "asia.southeast.prc.shanghai").
  publish("Rome update",             :mandatory => true, :routing_key => "europe.italy.roma").
  publish("Paris update",            :mandatory => true, :routing_key => "europe.france.paris")

channel.close
sleep 5

quit = true
consumer.join
connection.close
Output
consumer start
San Diego update was returned! reply_code = 312, reply_text = NO_ROUTE
Berkeley update was returned! reply_code = 312, reply_text = NO_ROUTE
San Francisco update was returned! reply_code = 312, reply_text = NO_ROUTE
New York update was returned! reply_code = 312, reply_text = NO_ROUTE
São Paolo update was returned! reply_code = 312, reply_text = NO_ROUTE
Hong Kong update was returned! reply_code = 312, reply_text = NO_ROUTE
Kyoto update was returned! reply_code = 312, reply_text = NO_ROUTE
Shanghai update was returned! reply_code = 312, reply_text = NO_ROUTE
Rome update was returned! reply_code = 312, reply_text = NO_ROUTE
Paris update was returned! reply_code = 312, reply_text = NO_ROUTE
consumer done

As we can see, every message ends up being returned with NO_ROUTE.
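The ordering problem can be reproduced without a broker. The following is a minimal pure-Ruby sketch, not Bunny's API: `ToyExchange`, its prefix-based `bind`, and the `inbox` array are hypothetical stand-ins used only to show that a message published before any binding exists has nowhere to go, while the same kind of message published after the binding is routed.

```ruby
# Toy in-memory model of an exchange. Illustration only, NOT Bunny's API.
class ToyExchange
  attr_reader :returned

  def initialize
    @bindings = []   # list of [prefix, queue] pairs
    @returned = []   # payloads that matched no binding
  end

  # Bind a queue (a plain Array here) to every key starting with +prefix+.
  # Real topic exchanges match word patterns; a prefix is a simplification.
  def bind(prefix, queue)
    @bindings << [prefix, queue]
  end

  # Route a payload to all matching queues, or record it as returned.
  def publish(payload, routing_key)
    targets = @bindings.select { |prefix, _| routing_key.start_with?(prefix) }
    if targets.empty?
      @returned << payload
    else
      targets.each { |_, queue| queue << payload }
    end
    self
  end
end

exchange = ToyExchange.new
inbox = []

exchange.publish("San Diego update", "americas.north.us.ca.sandiego") # published too early
exchange.bind("americas.north.", inbox)                               # binding appears now
exchange.publish("Berkeley update", "americas.north.us.ca.berkeley")  # routed

puts "returned: #{exchange.returned.inspect}"
puts "inbox:    #{inbox.inspect}"
```

In the threaded script the main thread plays the role of the early `publish` calls, and the consumer thread plays the role of the late `bind`.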

A simple solution that forces the queue (and its route) to exist before the messages are published:

#!/usr/bin/env ruby
require "bunny"

quit = false
consumer_queued = false

connection = Bunny.new
connection.start

consumer = Thread.new do
  puts "consumer start"
  cn  = connection.create_channel
  ex = cn.topic("weathr", :auto_delete => true)

  q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
  consumer_queued = true
  q.subscribe do |delivery_info, properties, payload|
    puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
    $stdout.flush
  end

  sleep 1 while !quit

  cn.close
  puts "consumer done"
end

# ensure queue is ready
sleep 0.125  while !consumer_queued

channel = connection.create_channel
exchange = channel.topic("weathr", :auto_delete => true)
exchange.on_return do |basic_return, properties, payload|
  puts "#{payload} was returned! reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}"
  $stdout.flush
end

exchange.publish("San Diego update", :mandatory => true, :routing_key => "americas.north.us.ca.sandiego").
  publish("Berkeley update",         :mandatory => true, :routing_key => "americas.north.us.ca.berkeley").
  publish("San Francisco update",    :mandatory => true, :routing_key => "americas.north.us.ca.sanfrancisco").
  publish("New York update",         :mandatory => true, :routing_key => "americas.north.us.ny.newyork").
  publish("São Paolo update",        :mandatory => true, :routing_key => "americas.south.brazil.saopaolo").
  publish("Hong Kong update",        :mandatory => true, :routing_key => "asia.southeast.hk.hongkong").
  publish("Kyoto update",            :mandatory => true, :routing_key => "asia.southeast.japan.kyoto").
  publish("Shanghai update",         :mandatory => true, :routing_key => "asia.southeast.prc.shanghai").
  publish("Rome update",             :mandatory => true, :routing_key => "europe.italy.roma").
  publish("Paris update",            :mandatory => true, :routing_key => "europe.france.paris")

channel.close
sleep 5

quit = true
consumer.join
connection.close
Output (with return notifications)
consumer start
An update for North America: San Diego update, routing key is americas.north.us.ca.sandiego
São Paolo update was returned! reply_code = 312, reply_text = NO_ROUTE
An update for North America: Berkeley update, routing key is americas.north.us.ca.berkeley
Hong Kong update was returned! reply_code = 312, reply_text = NO_ROUTE
An update for North America: San Francisco update, routing key is americas.north.us.ca.sanfrancisco
Kyoto update was returned! reply_code = 312, reply_text = NO_ROUTE
An update for North America: New York update, routing key is americas.north.us.ny.newyork
Shanghai update was returned! reply_code = 312, reply_text = NO_ROUTE
Rome update was returned! reply_code = 312, reply_text = NO_ROUTE
Paris update was returned! reply_code = 312, reply_text = NO_ROUTE
consumer done

The expected messages are received and the rest are returned.
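Polling a shared flag with `sleep` works, but the same handshake can also be expressed with Ruby's built-in thread-safe `Queue`, which blocks until the consumer signals that it is ready. The sketch below is pure Ruby and involves no Bunny calls: `ready`, `inbox`, and `received` are hypothetical names, and `inbox` merely stands in for the broker-side queue. It is one possible way to structure the wait, not something taken from the original answer.

```ruby
# Pure-Ruby sketch of the "wait until the consumer is bound" handshake.
# No broker is involved; `inbox` is a stand-in for the RabbitMQ queue.
ready    = Queue.new   # consumer -> main thread: "my queue is bound"
inbox    = Queue.new   # hypothetical stand-in for the broker queue
received = []

consumer = Thread.new do
  # In the real script this is where the channel would be created and the
  # queue declared and bound to the exchange.
  ready << :bound                      # signal once, after binding
  while (msg = inbox.pop) != :quit     # pop blocks until a message arrives
    received << msg
  end
end

ready.pop                              # blocks until the consumer has signalled
["San Diego update", "Berkeley update"].each { |m| inbox << m }
inbox << :quit                         # tell the consumer loop to stop
consumer.join

puts received.inspect
```

In the real script, `ready << :bound` would take the place of `consumer_queued = true`, and `ready.pop` would take the place of the `sleep 0.125 while !consumer_queued` loop.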

Regarding ruby - RabbitMQ/Bunny: subscribe block not called if within a thread, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/48674793/
