Ruby Kafka 生产者 BufferUnderflow

标签 ruby apache-kafka

我想使用 ruby​​ kafka 客户端库来生成事件,但遇到了一个我不确定如何解决的问题。任何帮助,将不胜感激。

我尝试过使用 kafka-rb(acrosa、mheffner 和 bpot 分支)。问题是无论我通过图书馆发送给它什么,例如

require 'kafka'
host = 'localhost'
port = 9092
producer = Kafka::Producer.new(

        :topic => 'login',
        :host => host,
        :port => port
)
producer.send([Kafka::Message.new("aaaaa")])

我得到一个:

java.nio.BufferUnderflowException
    at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:127)
    at java.nio.ByteBuffer.get(ByteBuffer.java:675)
    at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:22)
    at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:34)
    at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:34)
    at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:34)
    at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:48)
    at kafka.network.Processor.read(SocketServer.scala:321)
    at kafka.network.Processor.run(SocketServer.scala:231)
    at java.lang.Thread.run(Thread.java:680)

在服务器上。在同一台服务器上,我可以毫无问题地通过提供的控制台生产者发送文本。

如果您以前看过此内容,我将不胜感激。 由于我对 Scala 不是很熟悉,所以我不确定问题出在哪里,但在我看来,抛出此异常的那一行与从套接字读取 clientId 有关,而且在我看来,没有从 ruby​​ 客户端发送这样的东西。

当我查看在 tcpdump 上生成的来自 kafka-rb 和提供的生产者的消息时。 ruby 的看起来更短。 此外,无论我使用 kafka-0.7 还是 0.8,我都会得到完全相同的行为。

最佳答案

我可以用 Kafka 0.8 重现你的错误,但是当我尝试这个实现时:

require 'rubygems'
require 'kafka'
producer = Kafka::Producer.new({ :host => "localhost", :port => 9092, :topic => "test" , :compression => 0 })
message = Kafka::Message.new("some random message content")
producer.send(message)

它适用于 Kafka 0.7:

consumer example

根据 Kafka 文档,您提到的客户端仅支持 0.7.x ( https://cwiki.apache.org/KAFKA/clients.html#Clients-Ruby )。 0.8 部分说:

The 0.8 release changes the protocol fairly substantially. This version hasn't been released yet. The new protocol is documented here. A number of clients are in progress, and we will update when they are complete.

所以我认为https://github.com/acrosa/kafka-rb在这种情况下将不起作用:-(。

最佳

关于Ruby Kafka 生产者 BufferUnderflow,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14756557/

相关文章:

ruby - 如何过滤 HashMap 数组

ruby-on-rails - "uninitialized constant Authlogic"

windows - 由于未加载主题,Kafka 启动失败

apache-spark - 如何在 pyspark 流应用程序中使用具有不同主题的两个不同流将数据从 Kafka 存储到 Redis?

java - Spring Cloud Dataflow Local Server需要连接zookeeper吗?

ruby-on-rails - 事件记录选择查询到哪里并加入查询

ruby-on-rails - 安装和使用 Redcarpet (rails)

ruby - 如何使用名称在参数中给出的模块扩展 ruby​​ 中的对象?

java - 热衷于在 Java 中使用 Kafka Streams 创建差异流?

elasticsearch - Kafka连接 Elasticsearch ID创建多个字段不起作用