twitter - Kafka consumer deprecation errors with Twitter streaming

Tags: twitter apache-kafka

I have been working on streaming Twitter feed data into Kafka.

I am following the example at this link: http://www.hahaskills.com/tutorials/kafka/Twitter_doc.html

The producer code works fine: I am able to fetch the Twitter feed and send it to the Kafka producer.

I cannot use the consumer code, because many of the APIs it uses fail with deprecation errors.

Here is the consumer code:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
//import kafka.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
//import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;


//import org.apache.kafka.clients.producer.KafkaProducer;

public class KafkaConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public KafkaConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");

        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        this.topic = topic;
    }

    public void testConsumer() {

     System.out.println("Test Con called");

        Map<String, Integer> topicCount = new HashMap<>();

        topicCount.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);

        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

        System.out.println("For");

        for (final KafkaStream stream : streams) {

            ConsumerIterator<byte[], byte[]> it = stream.iterator();

            System.out.println("Size"+it.length());

            while (it.hasNext()) {
                System.out.println("Stream");
                System.out.println("Message from Single Topic: " + new String(it.next().message()));
            }
        }

        if (consumer != null) {
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {

     System.out.println("Started");
     String topic="twittertopic";
     KafkaConsumer simpleTWConsumer = new KafkaConsumer("localhost:XXXX", "testgroup", topic);
     simpleTWConsumer.testConsumer();
     System.out.println("End");
    }    
}

It throws the errors: ConsumerConnector, ConsumerIterator and KafkaStream are deprecated.

ConsumerConfig is not visible.

Is there a fixed version of this sample code (a Kafka consumer for Twitter)?

Best Answer

The tutorial you are following is very old and uses the old deprecated Scala Kafka clients, see http://kafka.apache.org/documentation/#legacyapis

The deprecated classes are:

  • kafka.consumer.* and kafka.javaapi.consumer: use the newer Java consumer under org.apache.kafka.clients.consumer.* instead

  • kafka.producer.* and kafka.javaapi.producer: use the newer Java producer under org.apache.kafka.clients.producer.* instead

Apart from using deprecated classes, your code was mostly correct; I only had to fix a few imports. See the fixed version below. Using it, I was able to consume messages that I had produced to a topic called twittertopic.

package example;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MyConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public MyConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void testConsumer() {

        Map<String, Integer> topicCount = new HashMap<>();
        topicCount.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

        for (final KafkaStream<byte[], byte[]> stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                System.out.println("Message from Single Topic: " + new String(it.next().message()));
            }
        }

        if (consumer != null) {
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Started");
        String topic = "twittertopic";
        MyConsumer simpleTWConsumer = new MyConsumer("localhost:2181", "testgroup", topic);
        simpleTWConsumer.testConsumer();
        System.out.println("End");
    }
}

Although the code above works, the currently deprecated classes are likely to be removed in the next major Kafka release, so you should not write new logic using them.

Instead, you should start using the Java clients; you can use the examples provided on GitHub: https://github.com/apache/kafka/tree/trunk/examples/src/main/java/kafka/examples
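Before rewriting, it can help to confirm the broker and topic are reachable with the console tools that ship with Kafka. A quick check, run from the Kafka installation directory (the ports are the defaults assumed elsewhere in this answer, and the --zookeeper flag matches the pre-2.2 tooling of that era):

```shell
# list topics to confirm twittertopic exists (ZooKeeper on its default port)
bin/kafka-topics.sh --list --zookeeper localhost:2181

# read a few messages from the beginning of the topic to verify the producer side
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic twittertopic --from-beginning --max-messages 5
```

If the console consumer prints your tweets, any remaining problem is in the consumer code rather than the cluster.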

Using the new Java consumer, your logic would look like this:

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyConsumer {

    static final String TOPIC = "twittertopic";
    static final String GROUP = "testgroup";

    public static void main(String[] args) {
        System.out.println("Started");

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", GROUP);
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList(TOPIC));

            for (int i = 0; i < 1000; i++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1L));
                System.out.println("Size: " + records.count());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received a message: " + record.key() + " " + record.value());
                }
            }
        }
        System.out.println("End");
    }

}
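For completeness: the producer side of the tutorial relies on the same deprecated packages, and migrating it follows the same pattern. A minimal sketch using the new Java producer (the broker address, key, and payload below are placeholders, not taken from the tutorial; it requires kafka-clients on the classpath and a running broker):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Unlike the old Scala clients, the new producer connects to brokers
        // directly and never talks to ZooKeeper
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources flushes and closes the producer on exit
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("twittertopic", "someKey", "a tweet payload"));
        }
    }
}
```

The send() call is asynchronous; closing the producer waits for in-flight records, which is why the try-with-resources block is enough for this sketch.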

A similar question about these Kafka consumer deprecation errors with Twitter streaming can be found on Stack Overflow: https://stackoverflow.com/questions/49073518/
