scala - Spark Streaming 中的批处理大小

标签 scala twitter apache-spark twitter4j spark-streaming

我是 Spark 和 Spark Streaming 的新手。我正在处理 Twitter 流数据。我的任务涉及独立处理每条推文,例如计算每条推文中的字数。据我所知, Spark Streaming 中 RDD 上的每个输入批处理形式 .因此,如果我给出 2 秒的批处理间隔,那么新的 RDD 包含两秒内的所有推文,并且应用的任何转换都将应用于整个两秒数据,并且无法在那两秒内处理单个推文。我的理解正确吗?或者每条推文形成一个新的RDD?我有点困惑...

最佳答案

在一批中,您有一个 RDD,其中包含以 2 秒间隔出现的所有状态。然后您可以单独处理这些状态。这是一个简短的例子:

 JavaDStream<Status> inputDStream = TwitterUtils.createStream(ctx, new OAuthAuthorization(builder.build()), filters);

      inputDStream.foreach(new Function2<JavaRDD<Status>,Time,Void>(){
            @Override
            public Void call(JavaRDD<Status> status, Time time) throws Exception {
                List<Status> statuses=status.collect();
                for(Status st:statuses){
                     System.out.println("STATUS:"+st.getText()+" user:"+st.getUser().getId());                      
                //Process and store status somewhere
                }
                return null;
            }});         
     ctx.start();
        ctx.awaitTermination();      
}

我希望我没有误解你的问题。

卓然

关于scala - Spark Streaming 中的批处理大小,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31095316/

相关文章:

scala - 在宏中找不到…的代理

android - 为什么 sbt 报告 "No java installations was detected"并设置了 $JAVA_HOME?

java - 根据特殊的非空格空白字符进行拆分

javascript - 防止 Android 上的 native Twitter 应用程序打开推文 URL

hadoop - Spark 1.3.0 : Running Pi example on YARN fails

java - 如何在 Gradle 中支持多种语言(Java 和 Scala)的多个项目?

javascript - 来自 Actionscript 3 应用程序的 Twitter 意图弹出窗口

c# - 如何使用 DotNetOpenAuth 库发送推文?

apache-spark - Spark : can you include partition columns in output files?

scala - SparkSQL 中使用 SQL 和不使用 SQL 的查询之间的差异