scala - 来自 Spark Streaming 的 Rest API 服务调用

标签 scala rest apache-spark spark-streaming

我有一个用例,在从 Kafka 读取消息以执行一些计算并将结果保存回 HDFS 和第三方应用程序后,我需要从 Spark 流中调用 RESTAPI。

我在这里有几个疑问:

  • 我们如何直接从 Spark 流中调用 RESTAPI。
  • 如何使用流式批处理时间管理 RESTAPI 超时。
  • 最佳答案

    此代码不会按原样编译。但这是给定用例的方法。

    val conf = new SparkConf().setAppName("App name").setMaster("yarn")
    val ssc = new StreamingContext(conf, Seconds(1))
    
    val dstream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    
    dstream.foreachRDD { rdd =>
    
      //Write the rdd to HDFS directly
      rdd.saveAsTextFile("hdfs/location/to/save")
    
      //loop through each parttion in rdd
      rdd.foreachPartition { partitionOfRecords =>
    
        //1. Create HttpClient object here
        //2.a POST data to API
    
        //Use it if you want record level control in rdd or partion
        partitionOfRecords.foreach { record =>
          //2.b Post the the date to API
          record.toString
        }
      }
      //Use 2.a or 2.b to POST data as per your req
    }
    
    ssc.start()
    ssc.awaitTermination()
    

    大多数 HttpClients(用于 REST 调用)都支持请求超时。

    使用 Apache HttpClient 超时的 Http POST 调用示例
    val CONNECTION_TIMEOUT_MS = 20000; // Timeout in millis (20 sec).
    
    val requestConfig = RequestConfig.custom()
      .setConnectionRequestTimeout(CONNECTION_TIMEOUT_MS)
      .setConnectTimeout(CONNECTION_TIMEOUT_MS)
      .setSocketTimeout(CONNECTION_TIMEOUT_MS)
      .build();
    
    HttpClientBuilder.create().build();
    
    val client: CloseableHttpClient = HttpClientBuilder.create().build();
    
    val url = "https://selfsolve.apple.com/wcResults.do"
    val post = new HttpPost(url);
    
    //Set config to post
    post.setConfig(requestConfig)
    
    post.setEntity(EntityBuilder.create.setText("some text to post to API").build())
    
    val response: HttpResponse = client.execute(post)
    

    关于scala - 来自 Spark Streaming 的 Rest API 服务调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41799578/

    相关文章:

    scala - assemblyMergeStrategy 编译时导致 scala.MatchError

    scala - Spark : java. lang.UnsupportedOperationException : No Encoder found for java. time.LocalDate

    scala - Scala 中中缀运算符的实际优先级

    java - 即使断言失败,TestNG 也不会导致测试用例失败

    java - Rest crud 操作的 Junit 测试

    python - 如何创建可作为python集合而不是文件使用的大数据的rdd

    java - Spark-提交找不到类(ClassNotFoundException)

    Scala groupby 元组

    java - 如何在 Spring 中使用 RESTTemplate 使用 API

    scala - 错误实用程序 : Uncaught exception in thread SparkListenerBus