java - Handling an out-of-memory error when the output of processElement is fat

Tags: java google-cloud-dataflow


When using Dataflow, I need to create string rows in which a list of user IDs is joined by commas, and then write the result to GCS. Unfortunately, during the DoFn's processElement, some rows contain so many users that a java.lang.OutOfMemoryError is thrown.
Is there a way to avoid the OutOfMemory exception and successfully write these fat rows to a text file in GCS?
My source code is below.

// Combine all user_ids for each group_id into one comma-separated string.
PCollection<KV<String, String>> rows = someData
    .apply(Combine.<String, String>perKey(new CombineUserIds()));

public static class CombineUserIds implements SerializableFunction<Iterable<String>, String> {
  private static final long serialVersionUID = 0;

  @Override
  public String apply(Iterable<String> userIdList) {
    // Guava's Joiner materializes the entire joined row as a single String.
    return Joiner.on(",").join(userIdList);
  }
}

Here, someData is of type PCollection<KV<String, String>>, where the key is a group_id and the value is a user_id.
Here is the full error message:
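As an aside on the failure mode: Joiner.join must build the entire joined row as one contiguous String before anything is written. The sketch below is plain Java, not the Dataflow API, and is only meant to illustrate that the same output can instead be streamed to a writer so that no single giant String is ever materialized:

```java
import java.io.IOException;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.List;

public class StreamingJoin {
    // Appends "a,b,c"-style output piece by piece, so the full row
    // never has to exist as one String in memory.
    static void writeJoined(Iterable<String> userIds, Appendable out) throws IOException {
        boolean first = true;
        for (String id : userIds) {
            if (!first) {
                out.append(',');
            }
            out.append(id);
            first = false;
        }
    }

    public static void main(String[] args) throws IOException {
        List<String> ids = Arrays.asList("u1", "u2", "u3");
        StringWriter row = new StringWriter(); // stand-in for a buffered file/GCS writer
        writeJoined(ids, row);
        System.out.println(row); // u1,u2,u3
    }
}
```

Whether this can be wired into a Dataflow sink depends on the job's structure; it only shows the memory trade-off, not a drop-in fix.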
(b997767fac436e5c): java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3332)
    at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
    at java.lang.StringBuilder.append(StringBuilder.java:136)
    at java.lang.StringBuilder.append(StringBuilder.java:76)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:457)
    at java.lang.StringBuilder.append(StringBuilder.java:166)
    at java.lang.StringBuilder.append(StringBuilder.java:76)
    at com.google.common.base.Joiner.appendTo(Joiner.java:111)
    at com.google.common.base.Joiner.appendTo(Joiner.java:152)
    at com.google.common.base.Joiner.join(Joiner.java:193)
    at com.google.common.base.Joiner.join(Joiner.java:183)
    at com.moloco.dataflow.ml.adhoc.GenerateMLUserProfileSet$CombineUserIds.apply(GenerateMLUserProfileSet.java:189)
    at com.moloco.dataflow.ml.adhoc.GenerateMLUserProfileSet$CombineUserIds.apply(GenerateMLUserProfileSet.java:184)
    at com.google.cloud.dataflow.sdk.transforms.Combine$IterableCombineFn.mergeToSingleton(Combine.java:1613)
    at com.google.cloud.dataflow.sdk.transforms.Combine$IterableCombineFn.mergeAccumulators(Combine.java:1591)
    at com.google.cloud.dataflow.sdk.transforms.Combine$IterableCombineFn.mergeAccumulators(Combine.java:1536)
    at com.google.cloud.dataflow.sdk.transforms.Combine$CombineFn$2.mergeAccumulators(Combine.java:489)
    at com.google.cloud.dataflow.sdk.runners.worker.GroupAlsoByWindowsParDoFnFactory$MergingKeyedCombineFn.extractOutput(GroupAlsoByWindowsParDoFnFactory.java:249)
    at com.google.cloud.dataflow.sdk.runners.worker.GroupAlsoByWindowsParDoFnFactory$MergingKeyedCombineFn.extractOutput(GroupAlsoByWindowsParDoFnFactory.java:216)
    at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsAndCombineDoFn$KeyedCombineFnRunner.extractOutput(GroupAlsoByWindowsAndCombineDoFn.java:243)
    at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsAndCombineDoFn.closeWindow(GroupAlsoByWindowsAndCombineDoFn.java:206)
    at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsAndCombineDoFn.processElement(GroupAlsoByWindowsAndCombineDoFn.java:192)
    at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138)
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:190)
    at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
    at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:226)

Best answer

From the Oracle docs,

Exception in thread thread_name: java.lang.OutOfMemoryError: Java heap space

Cause: The detail message Java heap space indicates that an object could not be allocated in the Java heap. This error does not necessarily imply a memory leak. The problem can be as simple as a configuration issue, where the specified heap size (or the default size, if it is not specified) is insufficient for the application.

Solution 1: Increase the JVM heap size, as follows.

You can specify how much heap space each project needs.

The following is for Eclipse:

Right-click the project, then open

Run As - Run Configurations - Arguments - VM arguments,

and add one of the following:

-Xmx1024m

-Xmx2048m

(Note the trailing m for megabytes; -Xmx1024 without a unit would mean 1024 bytes, which is too small for the JVM to start.)
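Once the flag is in place, you can check the effective limit from inside the application; Runtime.maxMemory() reports the maximum amount of memory the JVM will attempt to use, which reflects -Xmx approximately:

```java
public class HeapCheck {
    public static void main(String[] args) {
        // maxMemory() returns the JVM's heap ceiling in bytes (roughly the -Xmx value).
        long maxBytes = Runtime.getRuntime().maxMemory();
        System.out.println("Max heap: " + (maxBytes / (1024 * 1024)) + " MiB");
    }
}
```

Run it with, e.g., java -Xmx2048m HeapCheck to confirm the setting took effect.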

Solution 2 (only after trying Solution 1):

Again from the Oracle docs,

3.4.3 Monitor the Objects Pending Finalization

When the OutOfMemoryError exception is thrown with the "Java heap space" detail message, the cause can be excessive use of finalizers. To diagnose this, you have several options for monitoring the number of objects that are pending finalization:

The JConsole management tool can be used to monitor the number of objects that are pending finalization. This tool reports the pending finalization count in the memory statistics on the Summary tab pane. The count is approximate, but it can be used to characterize an application and understand if it relies a lot on finalization.

On Oracle Solaris and Linux operating systems, the jmap utility can be used with the -finalizerinfo option to print information about objects awaiting finalization.

An application can report the approximate number of objects pending finalization using the getObjectPendingFinalizationCount method of the java.lang.management.MemoryMXBean class. Links to the API documentation and example code can be found in Custom Diagnostic Tools. The example code can easily be extended to include the reporting of the pending finalization count.
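The MemoryMXBean option from the quote above takes only a few lines; this is a minimal standalone sketch, not tied to the Dataflow job:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;

public class FinalizationMonitor {
    public static void main(String[] args) {
        MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
        // Approximate count of objects whose finalize() has not yet been run.
        int pending = memoryBean.getObjectPendingFinalizationCount();
        System.out.println("Objects pending finalization: " + pending);
    }
}
```

A steadily growing count under load would point at finalization as a contributor; in this question's stack trace, though, the allocation failure happens inside string joining, so heap sizing is the more likely culprit.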

Regarding "java - Handling an out-of-memory error when the output of processElement is fat", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/38537936/
