java - 如何向flink CEP数据流添加新事件?

标签 java apache-flink flink-streaming complex-event-processing flink-cep

我正在使用 flink 1.5.2 来解决 CEP 问题。

我的数据来自列表,其他一些进程会在系统运行时将新的事件对象添加到该列表中。它不是套接字或网络消息。我一直在阅读官方网站的示例。以下是我认为应该执行的步骤。

  1. 使用 env.fromCollection(list) 创建数据流;
  2. 定义 Pattern 模式
  3. 使用 CEP.pattern(data_stream, pattern) 获取 PatternStream
  4. 使用pattern_stream.select( ...implement select interface ...) 将复杂事件结果作为DataStream获取

但是我的输入流应该是无界的。我在 DataStream<> 对象中没有找到任何 add() 方法。我该如何实现这个目标?另外,我是否需要告诉 DataStream<> 何时清理过时的事件?

最佳答案

只有在使用预先固定的有界输入集时(例如编写测试或只是进行实验时),集合才适合作为 Flink 的输入源。如果您想要无限制的流,则需要选择不同的源,例如套接字或消息队列系统(如 Kafka)。

套接字很容易用于实验。在 Linux 和 MacOS 系统上您可以使用

nc -lk 9999

创建一个 Flink 可以绑定(bind)到端口 9999 的套接字,并且您提供给 nc (netcat) 作为输入的任何内容都将一次一行地流式传输到您的 Flink 作业中。 Netcat 也适用于 Windows,但未预安装。

但是,您不应该计划在生产中使用套接字,因为它们无法回滚(这对于在故障恢复期间使用 Flink 获得准确的结果至关重要)。

关于java - 如何向flink CEP数据流添加新事件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51782767/

相关文章:

java - ArrayList<String> 对象赋值

apache-flink - Flink Streaming 有真正的全局参数吗?

apache-flink - Apache Flink : Enrich stream with data from external/blocking call

java - Java 中字符串操作的正确名称

java - SpringMVC与静态资源

apache-flink - 集成测试 flink 作业

hadoop - Flink 在 HDFS 上写入产生空文件

apache-flink - Apache Flink 中的周期性水印和标点水印有什么区别?

java - Apache Flink CEP如何根据事件值传入时间窗口?

java - Salesforce:在 Salesforce 中为某些配置文件创建用户时收到 "PORTAL_NO_ACCESS"错误消息