java - 在 Flink 中广播 HashMap

标签 java hashmap apache-flink

我正在使用Flink v.1.4.0

我正在使用 DataSet API,我想尝试的事情之一与 Apache Spark 中使用广播变量的方式非常相似。

实际上,我想在 DataSet 上应用映射函数,遍历 DataSet 中的每个元素并在 HashMap< 中搜索它;如果搜索元素存在于 map 中,则检索相应的值。

HashMap非常大,我不知道(因为我什至还没有构建我的解决方案)它是否需要Serialized才能传输和使用所有 worker 同时进行。

一般来说,我想到的解决方案如下所示:

Map<String, T> hashMap = new ... ;

DataSet<Point> points = env.readCsv(...);

points
  .map(point -> hashMap.getOrDefault(point.getId, 0))
  ...

但我不知道这是否有效或者是否有效。经过一番搜索后,我发现了一个更好的例子 here根据这一点,我们可以使用 Flink 中的 Broadcast 变量来广播 List,如下所示:

DataSet<Point> points = env.readCsv(...);

DataSet<Centroid> centroids = ... ; // some computation

points.map(new RichMapFunction<Point, Integer>() {

    private List<Centroid> centroids;

    @Override
    public void open(Configuration parameters) {
        this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
    }

    @Override
    public Integer map(Point p) {
        return selectCentroid(centroids, p);
    }

}).withBroadcastSet("centroids", centroids);

但是,.getBroadcastVariable() 似乎仅适用于List

  • 有人可以通过 HashMap 提供替代解决方案吗?
  • 该解决方案如何运作?
  • 解决这个问题最有效的方法是什么?
  • 可以使用 Flink 托管状态来执行类似于使用广播变量的操作吗?如何?
  • 最后,我可以尝试在管道中使用多个广播变量进行多个映射吗?

最佳答案

hashMap的值在哪里?来自?另外两种可能的解决方案:

  1. 重新初始化/重新创建/重新生成 hashMapopen 中单独的过滤/映射运算符的每个实例中方法。每条记录的效率可能更高,但会重复初始化逻辑。
  2. 创建两个 DataSet ,一个代表hashMap值,第二个为 pointsjoin那两个DataSet使用desired join strategy 。打个比方,你想要做的事情可以通过 SQL 查询 SELECT * FROM points p, hashMap h WHERE h.key = p.id 来表达。 。

关于java - 在 Flink 中广播 HashMap,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49039896/

相关文章:

java - IDocumentFilter 使用 HashMap 作为字典 Menu 来验证 JTextField

java - 全局窗口自定义触发器上的 allowedLateness

apache-flink - 从 IDE 运行 Flink 时如何启动 Flink 作业管理器 Web 界面

python - 将 PCollection 分配回全局窗口

java - Android自动SQL插入

java - 在 Java 中,如何保持原始窗口的焦点,尽管单击 JFrame

Java Item[] itemList;

java - Swing JPanel 不会重绘

java - 如何将 Hashmap 值存储在 ListView 中 LongclickListener 的变量中?

java - HashMap 的问题