java - Apache Spark Streaming custom receiver (text file) using Java

Tags: java apache-spark spark-streaming

I am new to Apache Spark.

I need to read log files from a local/mounted directory. An external source writes files into that directory. For example, the external source writes records to combined_file.txt, and once the file has been completely written it creates a new marker file prefixed with 0_, e.g. 0_combined_file.txt. At that point I need to read the combined_file.txt log file and process it. So I am trying to write a custom receiver that checks whether a log file has finished being written to the local/mounted directory and then reads the completed file.

Here is my code:

    @Override
    public void onStart() {
        Runnable th = () -> {
            while (true) {
                try {
                    Thread.sleep(1000L);
                    File dir = new File("/home/PK01/Desktop/arcflash/");
                    File[] completedFiles = dir.listFiles((dirName, fileName) -> {
                        return fileName.toLowerCase().startsWith("0_");
                    });
                    // metaDataFile --> 0_test.txt
                    // dataFile     --> test.txt
                    for (File metaDataFile : completedFiles) {
                        String compFileName = metaDataFile.getName();
                        compFileName = compFileName.substring(2, compFileName.length());
                        File dataFile = new File("/home/PK01/Desktop/arcflash/" + compFileName);
                        if (dataFile.exists()) {
                            // read the whole completed file into memory and push it to Spark as one record
                            FileInputStream fis = new FileInputStream(dataFile);
                            byte[] data = new byte[(int) dataFile.length()];
                            fis.read(data);
                            fis.close();
                            store(new String(data));
                            dataFile.delete();
                            metaDataFile.delete();
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        new Thread(th).start();
    }
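For reference, here is a minimal sketch of how the polling loop could push each completed file to Spark line by line instead of buffering the whole file into a single String, which keeps each stored record small. The directory path is taken from the question; the per-line store() call, the isStopped() check, and the thread name are assumptions for illustration. It assumes the method lives in a Receiver<String> subclass with java.io.BufferedReader, java.io.File, and java.io.FileReader imported:

    @Override
    public void onStart() {
        new Thread(() -> {
            // poll until Spark asks the receiver to stop
            while (!isStopped()) {
                try {
                    Thread.sleep(1000L);
                    File dir = new File("/home/PK01/Desktop/arcflash/");
                    File[] markers = dir.listFiles((d, name) -> name.toLowerCase().startsWith("0_"));
                    if (markers == null) {
                        continue;
                    }
                    for (File marker : markers) {
                        File dataFile = new File(dir, marker.getName().substring(2));
                        if (!dataFile.exists()) {
                            continue;
                        }
                        try (BufferedReader reader = new BufferedReader(new FileReader(dataFile))) {
                            String line;
                            while ((line = reader.readLine()) != null) {
                                store(line); // one record per line instead of one record per file
                            }
                        }
                        dataFile.delete();
                        marker.delete();
                    }
                } catch (Exception e) {
                    reportError("Failed to read completed log file", e);
                }
            }
        }, "file-poll-receiver").start();
    }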

I am trying to process the data as follows:

JavaReceiverInputDStream<String> data = jssc.receiverStream(receiver);
data.foreachRDD(fileStreamRdd -> {
    processOnSingleFile(fileStreamRdd.flatMap(streamBatchData -> {
        return Arrays.asList(streamBatchData.split("\\n")).iterator();
    }));
});

But I am getting the exception below:

18/01/19 12:08:39 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/01/19 12:08:39 WARN BlockManager: Block input-0-1516343919400 replicated to only 0 peer(s) instead of 1 peers
18/01/19 12:08:40 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.OutOfMemoryError: Java heap space
    at com.esotericsoftware.kryo.io.Output.<init>(Output.java:60)
    at org.apache.spark.serializer.KryoSerializer.newKryoOutput(KryoSerializer.scala:91)
    at org.apache.spark.serializer.KryoSerializerInstance.output$lzycompute(KryoSerializer.scala:308)
    at org.apache.spark.serializer.KryoSerializerInstance.output(KryoSerializer.scala:308)
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:312)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:383)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
18/01/19 12:08:40 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 1,5,main]
java.lang.OutOfMemoryError: Java heap space
    at com.esotericsoftware.kryo.io.Output.<init>(Output.java:60)
    at org.apache.spark.serializer.KryoSerializer.newKryoOutput(KryoSerializer.scala:91)
    at org.apache.spark.serializer.KryoSerializerInstance.output$lzycompute(KryoSerializer.scala:308)
    at org.apache.spark.serializer.KryoSerializerInstance.output(KryoSerializer.scala:308)
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:312)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:383)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
18/01/19 12:08:40 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
    at com.esotericsoftware.kryo.io.Output.<init>(Output.java:60)
    at org.apache.spark.serializer.KryoSerializer.newKryoOutput(KryoSerializer.scala:91)
    at org.apache.spark.serializer.KryoSerializerInstance.output$lzycompute(KryoSerializer.scala:308)
    at org.apache.spark.serializer.KryoSerializerInstance.output(KryoSerializer.scala:308)
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:312)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:383)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Can anyone help me resolve the error here?

Any help would be appreciated.

Best answer

18/01/19 12:08:40 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 1,5,main] java.lang.OutOfMemoryError: Java heap space

The above shows you are hitting an out-of-memory error. Explicitly increase the memory when submitting the Spark job.
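A minimal sketch of what that could look like, assuming the job is launched through spark-submit; the application name and the 4g values are placeholders, not recommendations (uses org.apache.spark.SparkConf, org.apache.spark.streaming.Durations, and org.apache.spark.streaming.api.java.JavaStreamingContext):

    // Driver memory must be set before the driver JVM starts, so it is normally
    // passed on the command line, for example:
    //   spark-submit --driver-memory 4g --executor-memory 4g --class your.MainClass yourApp.jar
    // Executor memory can also be raised programmatically when building the context:
    SparkConf conf = new SparkConf()
            .setAppName("LogFileReceiverJob")    // hypothetical application name
            .set("spark.executor.memory", "4g"); // placeholder value
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));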

For "java - Apache Spark Streaming custom receiver (text file) using Java", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/48335606/
