java - 主题之间的过滤

标签 java apache-kafka apache-kafka-streams

我的主题中有 1,000 条记录。我正在尝试根据薪资过滤从输入主题到输出主题的记录。

例如:我想要工资高于3万的人的记录。
我正在尝试使用 Java 来使用 KSTREAMS 来实现此目的。

记录为文本格式(逗号分隔),例如:

first_name, last_name, email, gender, ip_address, country, salary
Redacted,Tranfield,user@example.com,Female,45.25.XXX.XXX,Russia,$12345.01
Redacted,Merck,user@example.com,Male,236.224.XXX.XXX,Belarus,$54321.96
Redacted,Kopisch,user@example.com,Male,61.36.XXX.XXX,Morocco,$12345.05
Redacted,Edds,user@example.com,Male,6.87.XXX.XXX,Poland,$54321.72
Redacted,Alston,user@example.com,Female,56.146.XXX.XXX,Indonesia,$12345.16
...

这是我的代码:

public class StreamsStartApp {
public static void main(String[] args) {
System.out.println();
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-starter-app");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,   Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();

// Stream from Kafka topic
KStream<Long, Long> newInput = builder.stream("word-count-input");
Stream<Long, Long> usersAndColours = newInput
// step 1 - we ensure that a comma is here as we will split on it
.filter(value -> value.contains(",")
// step 2 - we select a key that will be the user id
.selectKey((key, value) -> value.split(",")[6])

// step 3 - got stuck here. 
// .filter(key -> key.value[6] > 30000 
// .selectKey((new1, value1) -> value1.split)(",")[3])
//  .filter((key, value) -> key.greater(10));
//    .filter((key, value) -> key > 10);
// .filter(key -> key.getkey().intValue() > 10);
usersAndColours.to("new-output");
Runtime.getRuntime().addShutdownHook(new Thread(streams::close))  

在上面靠近步骤 1 的代码中,我使用“,”分隔了示例数据。
在第 2 步中,我选择了一个字段,即:工资字段作为键。
现在,在第 3 步中,我尝试使用薪资字段来过滤数据。
我尝试了一些评论的方法,但没有任何效果。
任何想法都会有帮助。

最佳答案

首先,你的键和值都是 String serdes,而不是 Long,所以 KStream<Long, Long>是不正确的。

value.split(",")[6]只是一个字符串,而不是一个 double 。 (或长整型,因为有十进制值)

您需要删除 $从您的列中并将字符串解析为 Double,然后您可以对其进行过滤。也不是key.value[6]因为你的键不是一个带有值字段的对象。

而且您可能应该将电子邮件作为 key ,而不是工资,如果您甚至需要 key ,那就是

实际上,您可以在一行中完成此操作(为了便于阅读,这里制作了两行)

newInput.filter(value -> value.contains(",")  && 
    Double.parseDouble(value.split(",")[6].replace("$", "")) > 30000);

关于java - 主题之间的过滤,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49160458/

相关文章:

java - 如果我在kafka流中设置commit.interval.ms =的值,它是否能够提交偏移量?

apache-kafka - 跨多个用户扩展 Kafka 流应用程序

java - Spark GC 时间非常长导致任务执行缓慢

java - 复制数据库可以在模拟器上运行,但不能在设备上运行

c# - 更好地使用构造函数或方法工厂模式?

node.js - Nodejs kafka消费者无限循环

docker - 在 docker swarm 上搭建多节点 Kafka 集群

apache-kafka - 消费群体如何在kafka中运作?

docker - 如何限制 kafka-streams 中的rocksdb内存使用

java - JFrame 不添加来自单独类的图形