java - 如何在每个窗口上刷新/重新加载侧面输入

标签 java join apache-beam beam-sql

我正在使用 Apache Beam 来连接多个流以及一些查找。我有两种情况,如果查找大小很大,我希望侧面输入为每个记录处理重新加载/刷新(即我将使用 where 子句查询数据库),如果查找大小较小,则重新加载/刷新一次一天。

我想知道正确的方法是什么。我不希望巨大的数据侧输入耗尽所有工作人员的内存。

我使用下面的代码每天刷新一次侧面输入。

PCollectionView<Map<String, String>> lkp =
        p.apply(GenerateSequence.from(0)).withRate(1, Duration.standardDays(1))
            .apply(
                Window.<Long>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                    .discardingFiredPanes())
            .apply(
                ParDo.of(
                    new DoFn<Long, Map<String, String>>() {

                      private static final long serialVersionUID = 1L;

                      @ProcessElement
                      public void process(
                          @Element Long input, OutputReceiver<Map<String, String>> o) {
                        Map<String, String> map = HiveConnection.getHiveConnection("select * from table");
                        o.output(map);
                      }
                    }))
            .apply(View.<Map<String, String>>asSingleton());

请指导我完成这些类型用例的最佳实践,并为我提供一些示例代码以便更好地理解。

谢谢, 戈萨姆

最佳答案

您正在使用正确的推荐模式进行小型日常查找。

在大型情况下,通常建议使用来自 DoFn 的标注,而不是使用 SideInput。这个旧博客包含“调用外部服务进行数据丰富”模式的示例。

Guide to common Cloud Dataflow use-case patterns, Part 1

我会尝试找时间将此图案添加到光束图案页面:

Beam Patterns

关于java - 如何在每个窗口上刷新/重新加载侧面输入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57033262/

相关文章:

rabbitmq - 为什么我的 RabbitMQ 消息无法使用 Apache Beam 序列化?

java - Apache Beam 流式处理 json 数据

java - Glassfish 5 不适用于 Intellij 2017 2.4

java - 解析所有 ISO 8601 DateTime 格式 Java 1.8

php - Codeigniter - 连接派生表

php - MySQL - 复杂查询问题,多个表

r - data.table 条件不等式连接

apache-beam - 每秒调用最大请求数的管道设计

java - 以相反的顺序遍历 LinkedHashMap

java 使用listIterator插入元素