java - Spring Integration Kafka 消费者

标签 java spring spring-integration apache-kafka

我是 kafka 的新手,如果我遗漏了什么,我深表歉意。

我正在尝试使用来自现有主题的消息。

我从这个(this)链接获得了 Spring Integration Basic Kafka Example 的代码。

我的代码目前看起来像这样:

/**
 * Spring Boot entry point for a consumer-only variant of the Spring Integration
 * Kafka sample.
 *
 * <p>The beans declared here wire a Kafka listener container to a
 * message-driven channel adapter whose output channel is the {@code received}
 * bean; {@link #main(String[])} then prints whatever arrives on that channel.
 * The producer-side and topic-creation parts of the sample are left in place
 * but commented out.
 */
@SpringBootApplication
public class Application {

    /** Topic name, injected from the {@code kafka.topic} property. */
    @Value("${kafka.topic}")
    private String topic;

    /** Injected from {@code kafka.messageKey}; referenced only by the commented-out producer handler. */
    @Value("${kafka.messageKey}")
    private String messageKey;

    /** Injected from {@code kafka.broker.address}; used as the consumer's bootstrap servers value. */
    @Value("${kafka.broker.address}")
    private String brokerAddress;

    /** Injected from {@code kafka.zookeeper.connect}; referenced only by the commented-out topic creator. */
    @Value("${kafka.zookeeper.connect}")
    private String zookeeperConnect;

    /**
     * Starts the application context (built with {@code web(false)}), prints
     * every message received from the {@code received} channel until
     * {@code receive()} returns {@code null}, then closes the context and exits.
     *
     * @param args command-line arguments passed through to Spring Boot
     * @throws Exception declared by the original sample signature
     */
    public static void main(String[] args) throws Exception {
        SpringApplicationBuilder builder = new SpringApplicationBuilder(Application.class).web(false);
        ConfigurableApplicationContext context = builder.run(args);

        // Producer side of the sample, disabled.
        /*MessageChannel toKafka = context.getBean("toKafka", MessageChannel.class);
        for (int i = 0; i < 1; i++) {
            toKafka.send(new GenericMessage<String>("foo" + i));
        }*/

        PollableChannel inbound = context.getBean("received", PollableChannel.class);
        for (Message<?> message = inbound.receive(); message != null; message = inbound.receive()) {
            System.out.println(message);
        }

        context.close();
        System.exit(0);
    }

    // Producer handler of the sample, disabled.
    /*@ServiceActivator(inputChannel = "toKafka")
    @Bean
    public MessageHandler handler() throws Exception {
        KafkaProducerMessageHandler<String, String> handler =
                new KafkaProducerMessageHandler<>(kafkaTemplate());
        handler.setTopicExpression(new LiteralExpression(this.topic));
        handler.setMessageKeyExpression(new LiteralExpression(this.messageKey));
        return handler;
    }*/

    /*@Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }*/

    /*@Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        //props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        //props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        //props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }*/

    /**
     * Creates the listener container for partition 0 of the configured topic.
     *
     * @return a container built from {@link #consumerFactory()} and that single partition
     * @throws Exception declared by the original sample signature
     */
    @Bean
    public KafkaMessageListenerContainer<String, String> container() throws Exception {
        ConsumerFactory<String, String> factory = consumerFactory();
        TopicPartition partitionZero = new TopicPartition(this.topic, 0);
        return new KafkaMessageListenerContainer<>(factory, partitionZero);
    }

    /**
     * Builds the consumer factory with the broker address and String
     * deserializers for both key and value.
     *
     * @return a {@code DefaultKafkaConsumerFactory} holding those properties
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
        // Settings from the sample that are currently disabled:
        //props.put(ConsumerConfig.GROUP_ID_CONFIG, "siTestGroup");
        //props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        //props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        //props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(config);
    }

    /**
     * Creates the message-driven adapter around the given container and points
     * its output at the {@link #received()} channel.
     *
     * @param container the listener container the adapter wraps
     * @return the configured adapter
     */
    @Bean
    public KafkaMessageDrivenChannelAdapter<String, String>
                adapter(KafkaMessageListenerContainer<String, String> container) {
        KafkaMessageDrivenChannelAdapter<String, String> inboundAdapter =
                new KafkaMessageDrivenChannelAdapter<>(container);
        inboundAdapter.setOutputChannel(received());
        return inboundAdapter;
    }

    /**
     * Channel that {@link #main(String[])} polls for incoming messages.
     *
     * @return a new {@code QueueChannel}
     */
    @Bean
    public PollableChannel received() {
        PollableChannel queue = new QueueChannel();
        return queue;
    }

    // Topic creation via ZooKeeper from the sample, disabled.
    /*@Bean
    public TopicCreator topicCreator() {
        return new TopicCreator(this.topic, this.zookeeperConnect);
    }*/

    /*public static class TopicCreator implements SmartLifecycle {

        private final String topic;

        private final String zkConnect;

        private volatile boolean running;

        public TopicCreator(String topic, String zkConnect) {
            this.topic = topic;
            this.zkConnect = zkConnect;
        }

        @Override
        public void start() {
            ZkUtils zkUtils = new ZkUtils(new ZkClient(this.zkConnect, 6000, 6000,
                ZKStringSerializer$.MODULE$), null, false);
            try {
                if (!AdminUtils.topicExists(zkUtils, topic))
                    AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties());
            }
            catch (TopicExistsException e) {
                // no-op
            }
            this.running = true;
        }

        @Override
        public void stop() {
        }

        @Override
        public boolean isRunning() {
            return this.running;
        }

        @Override
        public int getPhase() {
            return Integer.MIN_VALUE;
        }

        @Override
        public boolean isAutoStartup() {
            return true;
        }

        @Override
        public void stop(Runnable callback) {
            callback.run();
        }

    }*/

}

我收到以下错误:

16:22:59.725 [container-kafka-1] ERROR o.s.k.listener.LoggingErrorHandler - Error while processing: null
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading array of size 1921605, only 49 bytes available
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:281)
    at java.lang.Thread.run(Unknown Source)
16:23:00.229 [container-kafka-1] ERROR o.s.k.listener.LoggingErrorHandler - Error while processing: null
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading array of size 1921605, only 49 bytes available
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:281)
    at java.lang.Thread.run(Unknown Source)
16:23:00.732 [container-kafka-1] ERROR o.s.k.listener.LoggingErrorHandler - Error while processing: null
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading array of size 1921605, only 49 bytes available
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:281)
    at java.lang.Thread.run(Unknown Source)
16:23:01.235 [container-kafka-1] ERROR o.s.k.listener.LoggingErrorHandler - Error while processing: null
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading array of size 1921605, only 49 bytes available
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:281)
    at java.lang.Thread.run(Unknown Source)
16:23:01.739 [container-kafka-1] ERROR o.s.k.listener.LoggingErrorHandler - Error while processing: null
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading array of size 1921605, only 49 bytes available
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:281)
    at java.lang.Thread.run(Unknown Source)
16:23:02.243 [container-kafka-1] ERROR o.s.k.listener.LoggingErrorHandler - Error while processing: null
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading array of size 1921605, only 49 bytes available
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:281)
    at java.lang.Thread.run(Unknown Source)

我无法从这些日志中获得任何有用的信息。

感谢您的帮助:)

编辑:

添加pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-dependencies</artifactId>
    <version>1.4.0.BUILD-SNAPSHOT</version>
  </parent>
  <groupId>org.springframework.integration.samples</groupId>
  <artifactId>kafka</artifactId>
  <version>4.3.0.BUILD-SNAPSHOT</version>
  <name>Apache Kafka Sample</name>
  <description>Apache Kafka Sample</description>
  <url>http://projects.spring.io/spring-integration</url>
  <organization>
    <name>SpringIO</name>
    <url>https://spring.io</url>
  </organization>
  <licenses>
    <license>
      <name>The Apache Software License, Version 2.0</name>
      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
      <distribution>repo</distribution>
    </license>
  </licenses>
  <developers>
    <developer>
      <id>garyrussell</id>
      <name>Gary Russell</name>
      <email>grussell@pivotal.io</email>
      <roles>
        <role>project lead</role>
      </roles>
    </developer>
    <developer>
      <id>markfisher</id>
      <name>Mark Fisher</name>
      <email>mfisher@pivotal.io</email>
      <roles>
        <role>project founder and lead emeritus</role>
      </roles>
    </developer>
    <developer>
      <id>ghillert</id>
      <name>Gunnar Hillert</name>
      <email>ghillert@pivotal.io</email>
    </developer>
    <developer>
      <id>abilan</id>
      <name>Artem Bilan</name>
      <email>abilan@pivotal.io</email>
    </developer>
  </developers>
  <scm>
    <connection>scm:git:scm:git:git://github.com/spring-projects/spring-integration-samples.git</connection>
    <developerConnection>scm:git:scm:git:ssh://git@github.com:spring-projects/spring-integration-samples.git</developerConnection>
    <url>https://github.com/spring-projects/spring-integration-samples</url>
  </scm>
  <dependencies>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
      <version>4.3.0.M1</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.hamcrest</groupId>
      <artifactId>hamcrest-all</artifactId>
      <version>1.3</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-test</artifactId>
      <version>4.2.5.RELEASE</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-integration</artifactId>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-kafka</artifactId>
      <version>2.0.0.M1</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka-test</artifactId>
      <version>1.0.0.M2</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-core</artifactId>
      <version>1.9.5</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <repositories>
    <repository>
      <id>repo.spring.io.milestone</id>
      <name>Spring Framework Maven Milestone Repository</name>
      <url>https://repo.spring.io/libs-milestone</url>
    </repository>
  </repositories>
  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>

最佳答案

该版本的示例应用程序需要 0.9 版本的 broker,请参阅 this question 及其中指向 Kafka docs 的链接。

该示例应用的早期版本适用于 0.8 版本的 broker。

您需要 this commit 之前的版本。我认为我们在 GitHub 上没有为它打标签,但是 this is the previous version,它适用于 0.8。

编辑

使用 0.8 版本的客户端时,需要修改这段代码...

// Configuration built from a single broker address.
BrokerAddressListConfiguration configuration = new BrokerAddressListConfiguration(
            BrokerAddress.fromAddress(this.brokerAddress));

...改为...

// Configuration built from two broker addresses, each passed as its own BrokerAddress.
BrokerAddressListConfiguration configuration = new BrokerAddressListConfiguration(
            BrokerAddress.fromAddress(this.firstBrokerAddress), 
            BrokerAddress.fromAddress(this.secondBrokerAddress));

即提供一组 BrokerAddress 对象。

对于 0.9 版本的客户端,可以直接使用一个以逗号分隔的 host:port 列表:

// brokerAddresses is expected to hold comma-separated host:port pairs, per the surrounding text.
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);

关于java - Spring 集成卡夫卡消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36851538/

相关文章:

Java 空指针异常 - 堆栈跟踪 Null

java - logback FileAppender -> AsyncAppender 刷新信号

java - Spring Boot : Autowired fields are none

java - 使用 AOP 在选择性方法之前调用公共(public)操作

java - 具有两个来源的 Spring 集成流程

数组方法参数的 Spring 表达式语言 (SpEL)

java - 使用 Maven 为 OSGi 获取 "wrap"jar 的好方法

java - 创建一个不可变的类

java - 在服务层获取当前用户

spring-integration - 如何在没有 'input-channel' 的情况下在 XML 中声明可重用转换器?