java - Scala - 如何过滤 KStream(Kafka Streams)

标签 java scala filter apache-kafka-streams

我是 Scala 新手,我正在尝试根据第二个组件字段过滤 KStream[String, JsonNode]。

作为示例,工作的 Java 代码如下:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
...
import com.fasterxml.jackson.databind.JsonNode;
...
...
final KStream<String, JsonNode> source = streamsBuilder.stream(inputTopic,
                    Consumed.with(Serdes.String(), jsonSerde));


// filter and producer preprocessed
source.filter((k, v) -> v.get("total_cost").asDouble() > 0 && v.get("num_items").asInt() > 0)
    .to(outputTopic, Produced.with(Serdes.String(), jsonSerde));

我已经尝试过这个:

import org.apache.kafka.streams.kstream.{Produced,Consumed,KStream};
import org.apache.kafka.streams.StreamsBuilder;
...
import com.fasterxml.jackson.databind.JsonNode;
...
var source:KStream[String, JsonNode] = streamsBuilder.stream(inputTopic, Consumed.`with`(Serdes.String(), jsonSerde));

source.filter({
  case (k:String ,v:JsonNode) => 
    (v.get("total_cost").asDouble() > 0 && v.get("num_items").asInt() > 0)
  })
  .to(outputTopic, Produced.`with`(Serdes.String(), jsonSerde));

在上面的尝试中我得到:

Description Resource Path Location Type missing parameter type for expanded function The argument types of an anonymous function must be fully known. (SLS 8.5) Expected type was: org.apache.kafka.streams.kstream.Predicate[? >: String, ? >: com.fasterxml.jackson.databind.JsonNode]

我也尝试过这个:

source.filter((_._2.get("total_cost").asDouble() > 0 && _._2.get("num_items").asInt() > 0))
  .to(outputTopic, Produced.`with`(Serdes.String(), jsonSerde));

我如何在 Scala 中过滤这个对象? 提前致谢。

最佳答案

org.apache.kafka.streams.kstream.Predicate 来自 JavaAPI,在 Scala 2.11 (我假设你使用它)你必须显式实现接口(interface),因此:

source.filter(new Predicate[String, JsonNode]() {
  override def test(k: String, v: JsonNode): Boolean = {
    v.get("total_cost").asDouble() > 0 && v.get("num_items").asInt() > 0
  }
}).to(outputTopic, Produced.`with`(Serdes.String(), jsonSerde));

应该可以。

有关SAM(单一抽象方法)的更多信息,请参见 here

请注意,您不必使用 Java API - 有一流的 Scala API .

关于java - Scala - 如何过滤 KStream(Kafka Streams),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60882874/

相关文章:

java - 如何从java中的父类(super class)方法停止/退出子类方法的执行

Java 接口(interface)特征

java - 从 Executor 创建 ExecutorService

javascript - ExtJS 网格过滤器没有出现

javascript - 为什么使用数组过滤器的 Promise.all 会产生空数组?

javascript - 使用用户输入的维度根据每个元素中的字符数过滤数组

java - 使用 GSON 解析 Json 但列表始终为空

java - Java 中的异常处理

java - 如何在 Scala 中使用 Java lambda

scala - Scala 中的特征和抽象方法重写