我想使用 ruby kafka 客户端库来生成事件,但遇到了一个我不确定如何解决的问题。任何帮助,将不胜感激。
我尝试过使用 kafka-rb(acrosa、mheffner 和 bpot 分支)。问题是无论我通过图书馆发送给它什么,例如
require 'kafka'
host = 'localhost'
port = 9092
producer = Kafka::Producer.new(
:topic => 'login',
:host => host,
:port => port
)
producer.send([Kafka::Message.new("aaaaa")])
我得到一个:
java.nio.BufferUnderflowException
at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:127)
at java.nio.ByteBuffer.get(ByteBuffer.java:675)
at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:22)
at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:34)
at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:34)
at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:34)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:48)
at kafka.network.Processor.read(SocketServer.scala:321)
at kafka.network.Processor.run(SocketServer.scala:231)
at java.lang.Thread.run(Thread.java:680)
在服务器上。在同一台服务器上,我可以毫无问题地通过提供的控制台生产者发送文本。
如果您以前看过此内容,我将不胜感激。 由于我对 Scala 不是很熟悉,所以我不确定问题出在哪里,但在我看来,抛出此异常的那一行与从套接字读取 clientId 有关,而且在我看来,没有从 ruby 客户端发送这样的东西。
当我查看在 tcpdump 上生成的来自 kafka-rb 和提供的生产者的消息时。 ruby 的看起来更短。 此外,无论我使用 kafka-0.7 还是 0.8,我都会得到完全相同的行为。
最佳答案
我可以用 Kafka 0.8 重现你的错误,但是当我尝试这个实现时:
require 'rubygems'
require 'kafka'
producer = Kafka::Producer.new({ :host => "localhost", :port => 9092, :topic => "test" , :compression => 0 })
message = Kafka::Message.new("some random message content")
producer.send(message)
它适用于 Kafka 0.7:
根据 Kafka 文档,您提到的客户端仅支持 0.7.x ( https://cwiki.apache.org/KAFKA/clients.html#Clients-Ruby )。 0.8 部分说:
The 0.8 release changes the protocol fairly substantially. This version hasn't been released yet. The new protocol is documented here. A number of clients are in progress, and we will update when they are complete.
所以我认为https://github.com/acrosa/kafka-rb在这种情况下将不起作用:-(。
最佳
前
关于Ruby Kafka 生产者 BufferUnderflow,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14756557/