Scala 线程池 - 同时调用 API

标签 scala apache-spark functional-programming databricks blockingqueue

我在数据块中有一个用例,其中必须对 URL 数据集进行 API 调用。该数据集有大约 10 万条记录。
允许的最大并发数为 3。
我在 Scala 中完成了实现并在 databricks notebook 中运行。除了队列中待处理的一个元素之外,我觉得这里缺少一些东西。
阻塞队列和线程池是解决这个问题的正确方法吗?

在下面的代码中,我进行了修改,而不是从数据集中读取,而是在 Seq 上进行采样。
任何帮助/想法将不胜感激。

import java.time.LocalDateTime import java.util.concurrent.{ArrayBlockingQueue,BlockingQueue} import java.util.concurrent.Executors import java.util.concurrent.TimeUnit; var inpQueue:BlockingQueue[(Int, String)] = new ArrayBlockingQueue[(Int, String)](1) val inpDS = Seq((1,"https://google.com/2X6barD"), (2,"https://google.com/3d9vCgW"), (3,"https://google.com/2M02Xz0"), (4,"https://google.com/2XOu2uL"), (5,"https://google.com/2AfBWF0"), (6,"https://google.com/36AEKsw"), (7,"https://google.com/3enBxz7"), (8,"https://google.com/36ABq0x"), (9,"https://google.com/2XBjmiF"), (10,"https://google.com/36Emlen")) val pool = Executors.newFixedThreadPool(3) var i = 0 inpDS.foreach{ ix => { inpQueue.put(ix) val t = new ConsumerAPIThread() t.setName("MyThread-"+i+" ") pool.execute(t) } i = i+1 } println("Final Queue Size = " +inpQueue.size+"\n") class ConsumerAPIThread() extends Thread { var name ="" override def run() { val urlDetail = inpQueue.take() print(this.getName()+" "+ Thread.currentThread().getName() + " popped "+urlDetail+" Queue Size "+inpQueue.size+" \n") triggerAPI((urlDetail._1, urlDetail._2)) } def triggerAPI(params:(Int,String)){ try{ val result = scala.io.Source.fromURL(params._2) println("" +result) }catch{ case ex:Exception => { println("Exception caught") } } } def ConsumerAPIThread(s:String) { name = s; } }

最佳答案

所以,你有两个要求:功能性的一个是你想要异步处理列表中的项目,非功能性的一个是你不想一次处理三个以上的项目。
关于后者,好消息是,正如您在问题中已经表明的那样,Java native 公开了一个很好打包的 Executor在具有固定大小的线程池上运行任务,如果您使用线程,优雅地允许您限制并发级别。
转到功能需求,Scala 提供了帮助,因为它在其标准 API 中提供了一些恰好可以做到这一点的东西。特别是它使用 scala.concurrent.Future ,所以为了使用它,我们必须重构 triggerAPIFuture 方面.函数的内容不是特别相关,所以我们现在主要关注它的(修改后的)签名:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext

def triggerAPI(params: (Int, String))(implicit ec: ExecutionContext): Future[Unit] =
  Future {
    // some code that takes some time to run...
  }
请注意,现在 triggerAPI返回 Future .一个 Future可以被认为是最终要计算的东西的读取句柄。特别是,这是一个 Future[Unit] ,其中 Unit代表“我们并不特别关心这个函数的输出,但主要关心它的副作用”。
此外,请注意该方法现在采用一个隐式参数,即 ExecutionContext . ExecutionContext用于提供Future s 具有某种形式的计算发生的环境。 Scala 有一个 API 来创建 ExecutionContext来自 java.util.concurrent.ExecutorService ,所以这将派上用场在固定线程池上运行我们的计算,在任何给定时间运行不超过三个回调。
在继续之前,如果您对 Future 有任何疑问s, ExecutionContext s 和隐式参数,Scala 文档是您最好的知识来源(这里有几个指针: 12 )。
现在我们有了新的 triggerAPI方法,我们可以使用 Future.traverse ( here is the documentation for Scala 2.12 -- 撰写本文时的最新版本是 2.13,但据我所知,Spark 用户暂时停留在 2.12 上)。Future.traverse 的 tl;dr是它采用某种形式的容器和一个函数,该函数获取该容器中的项目并返回 Future别的东西。该函数将应用于容器中的每个项目,结果将是 Future结果的容器。在您的情况下:容器是 List , 商品为 (Int, String)你返回的其他东西是 Unit .
这意味着您可以简单地调用它:
Future.traverse(inpDS)(triggerAPI)
triggerAPI将应用于 inpDS 中的每个项目.
通过确保线程池支持的执行上下文在调用 Future.traverse 时在隐式范围内,项目将使用所需的线程池进行处理。
调用的结果是Future[List[Unit]] ,这不是很有趣,可以简单地丢弃(因为您只对副作用感兴趣)。
说了很多,如果你想玩我描述的代码,你可以这样做 here on Scastie .
作为引用,这是整个实现:
import java.util.concurrent.{ExecutorService, Executors}

import scala.concurrent.duration.DurationLong
import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

val datasets = List(
  (1, "https://google.com/2X6barD"),
  (2, "https://google.com/3d9vCgW"),
  (3, "https://google.com/2M02Xz0"),
  (4, "https://google.com/2XOu2uL"),
  (5, "https://google.com/2AfBWF0"),
  (6, "https://google.com/36AEKsw"),
  (7, "https://google.com/3enBxz7"),
  (8, "https://google.com/36ABq0x"),
  (9, "https://google.com/2XBjmiF")
)

val executor: ExecutorService = Executors.newFixedThreadPool(3)
implicit val executionContext: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(executor)

def triggerAPI(params: (Int, String))(implicit ec: ExecutionContext): Future[Unit] =
  Future {
    val (index, _) = params
    println(s"+ started processing $index")
    val start = System.nanoTime() / 1000000
    Iterator.from(0).map(_ + 1).drop(100000000).take(1).toList.head // a noticeably slow operation
    val end = System.nanoTime() / 1000000
    val duration = (end - start).millis
    println(s"- finished processing $index after $duration")
  }

Future.traverse(datasets)(triggerAPI).onComplete {
  case result =>
    println("* processing is over, shutting down the executor")
    executionContext.shutdown()
}

关于Scala 线程池 - 同时调用 API,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62290668/

相关文章:

xml - NodeSeq 匹配失败,但等效的 Elem 匹配成功——为什么?怎么修?

Java 8 - 将 Kairosdb 中的多个对象列表保存到 csv 文件中

apache-spark - YARN 如何决定启动哪种类型的 Application master?

scala - 没有初始化/无时,是否有最佳实践为 Scala 选项赋值?

java-8 - 在 Vavr 中执行副作用

scala - 随机作为 scalaz.Monad 的实例

list - Scala:为什么 foldLeft 不能用于两个列表的连接?

java - 检测 Scala 程序中函数变化的最佳实践?

apache-spark - Spark 中的混合推荐器

c++ - 需要 C++ 中非常通用的 argmax 函数