java - 如何根据数据将一个数据流输出到不同的输出端?

标签 java apache-flink flink-streaming

在 Apache Flink 中,我有一个元组流。让我们假设一个非常简单的 Tuple1<String> .元组可以在其值字段中具有任意值(例如“P1”、“P2”等)。可能值的集合是有限的,但我事先不知道完整的集合(所以可能有一个“P362”)。我想根据元组内部的值将该元组写入某个输出位置。所以例如我想要以下文件结构:

  • /output/P1
  • /output/P2

在文档中,我只发现了写入我事先知道的位置的可能性(例如 stream.writeCsv("/output/somewhere") ),但没有办法让数据的内容决定数据实际结束的位置。

我在文档中阅读了有关输出拆分的信息,但这似乎没有提供一种方法来按照我希望的方式将输出重定向到不同的目的地(或者我只是不明白这是如何工作的)。

这可以用 Flink API 来完成吗?如果可以,怎么做?如果没有,是否有第三方库可以做到这一点,或者我是否必须自己构建这样的东西?

最佳答案

您可以实现自定义接收器。从两者之一继承:

  • org.apache.flink.streaming.api.functions.sink.SinkFunction
  • org.apache.flink.streaming.api.functions.sink.RichSinkFunction

在你的程序中使用:

stream.addSink(SinkFunction<T> sinkFunction);

代替 stream.writeCsv("/output/somewhere")

关于java - 如何根据数据将一个数据流输出到不同的输出端?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33414141/

相关文章:

maven - 如何通过 bazel 使用/导入 Flink 的 TestHarness 类?

java - 如何在 Apache Flink 中注册 java.util.List 类型

java - Spring 3.0 JAX-WS 和/或与 Apache CXF 对比

java - 在 Java : loop variable vs enhanced for statement 中迭代数组的最快方法

java - ls 松散耦合可以通过任何其他方式来实现,而不是使用父类引用变量,通常不是专门在我的代码中?

apache-flink - Flink 自动缩放和最大并行度

apache-flink - Flink作业失败,由: java. io.IOException引起:rpc调用大小超过最大akka帧大小

java - Flink Streaming 在计算求和时抛出异常

java - JPA 查询 MONTH/YEAR 函数

apache-flink - 如何按事件时间加入实时流和延迟流