java - Why does Spark Streaming stop working when I send two input streams?

Tags: java sockets apache-spark spark-streaming

I am developing a Spark Streaming application in which I need to consume the input streams from two servers written in Python, each of which sends one JSON message per second to the Spark context.

My problem is that if I perform an operation on only one of the streams, everything works fine. But if I use the two streams from the different servers, Spark gets stuck before printing anything, and it only resumes work once both servers have sent every JSON message they had to send (that is, when it detects that socketTextStream is no longer receiving data).

Here is my code:

    // One receiver-based input DStream per socket server
    JavaReceiverInputDStream<String> streamData1 = ssc.socketTextStream("localhost", 996,
            StorageLevels.MEMORY_AND_DISK_SER);

    JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream("localhost", 9995,
            StorageLevels.MEMORY_AND_DISK_SER);

    // Tag each message with the id of the stream it came from
    JavaPairDStream<Integer, String> dataStream1 = streamData1.mapToPair(new PairFunction<String, Integer, String>() {
        public Tuple2<Integer, String> call(String stream) throws Exception {
            return new Tuple2<Integer, String>(1, stream);
        }
    });

    JavaPairDStream<Integer, String> dataStream2 = streamData2.mapToPair(new PairFunction<String, Integer, String>() {
        public Tuple2<Integer, String> call(String stream) throws Exception {
            return new Tuple2<Integer, String>(2, stream);
        }
    });

    dataStream2.print(); //for example

Note that there is no error message; Spark simply hangs after starting the context, and while I am receiving the JSON messages from the ports it shows nothing.

Thanks a lot.

Best Answer

Take a look at these warnings from the Spark Streaming documentation and see whether they apply to your setup (a minimal context-setup sketch follows the quoted list):

Points to remember

  • When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either of these means that only one thread will be used for running tasks locally. If you are using an input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use “local[n]” as the master URL, where n > number of receivers to run (see Spark Properties for information on how to set the master).
  • Extending the logic to running on a cluster, the number of cores allocated to the Spark Streaming application must be more than the number of receivers. Otherwise the system will receive data, but not be able to process it.
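
In your code the two socketTextStream calls each create a receiver, so when running locally the master URL needs at least three threads: two dedicated to the receivers and at least one left over for processing. Below is a minimal sketch of the context setup, assuming a 1-second batch interval and an illustrative application name (neither is taken from the question):

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    // Two socket receivers each occupy one thread, so "local[3]" (or more)
    // is needed to keep at least one thread free for processing batches.
    // The app name and batch interval are illustrative assumptions.
    SparkConf conf = new SparkConf()
            .setAppName("TwoSocketStreams")
            .setMaster("local[3]");

    JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));

With only one or two local threads, both receivers grab all available threads, the print() on dataStream2 never gets scheduled, and the job appears to hang exactly as you describe.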

Regarding "java - Why does Spark Streaming stop working when I send two input streams?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/37116079/
