java - 如何访问 Spark Streaming 自定义接收器存储的元数据?

标签 java apache-spark spark-streaming

Spark Streaming 提供了创建自定义接收器的功能,详细信息 here 。要将接收器接收到的数据存储到 Spark 中,需要使用 store(data) 方法。

我存储到 Spark 的数据具有与其关联的某些属性。 Spark Receiver class由自定义接收器扩展,提供了几种 store(data,metadata) 形式的存储方法,这意味着元数据/属性可以与数据一起存储。下面的代码摘录显示了我如何使用此方法来存储数据及其元数据/属性。

public class CustomReceiver extends Receiver<String> {

    public CustomReceiver() {
        super(StorageLevel.MEMORY_AND_DISK_2());
    }

    @Override
    public void onStart() {
        new Thread() {
            @Override
            public void run() {
                try {
                    receive();
                } catch (IOException e) {
                    restart("Error connecting: ", e);
                }
            }
        }.start();
    }

    @Override
    public void onStop() {
        // Not needed as receive() method closes resources when stopped
    }

    private void receive() throws IOException {
        String str = getData();
        Map<String, String> metadata = getMetadata();
        Iterator<String> it = Arrays.asList(str.split("\n\r")).iterator();

        store(it, metadata);

        if (isStopped()) {
            closeConnections();
        }
    }
}

从另一个类访问此存储的数据,如以下代码摘录所示:

private void testCustomReceiver() {
    JavaDStream<String> custom = ssc.receiverStream(new CustomReceiver());

    JavaDStream<String> processedInput = custom.flatMap(row -> {
        return Arrays.asList(row.split("\\r?\\n"));
    });

    processedInput.print();
}

现在引出了我的问题:如何从上面显示的 testCustomReceiver() 方法访问与自定义接收器中的数据一起存储的元数据/属性?

我尝试搜索文档并探索调试器中的 JavaDStream 对象以搜索元数据,但无济于事。任何有关此事的帮助或建议将不胜感激,谢谢。

最佳答案

我一直在研究 Spark 代码,并且相信您永远无法再次访问它。事实上,我不相信它曾经被使用过。

您的接收器的主管获取metadataOption并将其放入ReceivedBlockInfo(这是org.apache.spark.streaming私有(private)的)中。从那里开始,它就……无处可去。我在流代码库中找不到对 ReceivedBlockInfo.metadataOption 的引用。假设 ReceivedBlockInfo 可能被序列化然后反序列化为不同的类,或者一些时髦的反射检索元数据,但这两种都是反模式,我不会指望它发生。

为什么会在那里?我相信其目的是使其成为 Metadata Checkpointing 的一部分系统,但它要么从未连接,要么接收器元数据和流检查点之间的连接被切断。

无论哪种方式,一旦 block 被放入流中, block 元数据就会消失。

关于java - 如何访问 Spark Streaming 自定义接收器存储的元数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35271270/

相关文章:

hadoop - 如何使用spark for map-reduce flow来选择文件夹下所有csv文件的N列,前M行?

scala - 在Spark Streaming中与ElasticSearch连接时出错

javascript - 调用ajax到servlet调用总是报错函数

apache-spark - 如何将数据从 Cassandra 导出到 BigQuery

scala - 在完成 map 操作之前,Spark,mapPartitions,网络连接已关闭

amazon-web-services - EMR主节点和从节点的自定义端口列表

apache-spark - Spark Structured Streaming 和 DStreams 有什么区别?

java - Java布局的对齐方式困惑

java - 如何解释这个线程转储?

java - 我可以在 JavaEE 8 中注入(inject) 2 个同名的接口(interface)吗?