apache-spark - 从 RDD 中提取特定的主题值

标签 apache-spark apache-kafka

我正在尝试从以下 kafka 主题中阅读 spark :

Map<TopicAndPartition, Long> map = new HashMap<>();
        map.put(new TopicAndPartition("A", 0), 1L);
        map.put(new TopicAndPartition("B", 0), 1L);

        JavaInputDStream<Map.Entry> topicMessages = KafkaUtils.createDirectStream(
                                                                            jssc,
                                                                            String.class,
                                                                            String.class,
                                                                            StringDecoder.class,
                                                                            StringDecoder.class,
                                                                            Map.Entry.class,
                                                                            kafkaParams,
                                                                            map,
                                                                            messageAndMetadata -> 
                                                                                new AbstractMap.SimpleEntry<>(messageAndMetadata.topic(),
                                                                                                              messageAndMetadata.message())
                                                                          );

现在 topicMessage 具有如下所示的键和值格式的所有值:

A="04/15/2015","18:44:28"
A="04/15/2015","18:44:28"
A="04/15/2015","18:44:28"
B="04/15/2016","18:44:28"
B="04/15/2014","18:44:28"  

如何提取特定主题的值(value)。
对于名为 B

的主题,如下所示
"04/15/2016","18:44:28"
"04/15/2014","18:44:28" 

最佳答案

如果您想要给定主题的行,您只需要做:

JavaPairDStream<String> rowsFromTopicB = topicMessages.filter( entry -> entry.getKey().toString().equals("B")).map(entry -> entry.getValue().toString())

关于apache-spark - 从 RDD 中提取特定的主题值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37562867/

相关文章:

apache-spark - 如何在 Spark Kafka 直接流中手动提交偏移量?

scala - Spark 数据帧 : Pivot and Group based on columns

java - 在spark-jobserver上运行基于Java的Spark作业

apache-spark - Spark 节点是否可以使用 Spark-Cassandra Connector 连接到托管在不同服务器中的 Cassandra 节点

node.js - 比较 kafka-node 和 node-rdkafka

apache-kafka - Apache Kafka 中的分区领导者是什么?

apache-kafka - Kafka - 保留期参数

tcp - 在 logstash 中以事务方式发送事件

apache-spark - docker 停止 spark 容器退出

apache-spark - Spark Streaming 崩溃到 Kafka Ran out of messages before reaching ending offset exception