scala - Future Recursion Patterns/Future Chaining 任意长度

标签 scala recursion akka future

我很好奇递归构建一个按顺序运行的 Akka future 链的最佳方法,如果 future 的 doWork 调用失败, future 应该重试最多 3 次,如果重试用完,链应该失败尝试。假设所有 doWork 调用都通过返回的 future futChain 应该只完成。

object Main extends App {
  val futChain = recurse(2)

  def recurse(param: Int, retries: Int = 3): Future[String] {
    Future {
      doWorkThatMayFailReturningString(param...)
    } recoverWith {
      case e => 
        if (retries > 0) recurse(param, retries -1)
        else  Future.failed(e)
    } flatMap {
      strRes => recurse(nextParam) //how should the res from the previous fut be passed?
    }
  }

  futChain onComplete {
    case res => println(res) //should print all the strings
  }
}
  • 如何将结果作为集合?即在这个例子中,每个 StringdoWork 函数返回(我需要以某种方式修改 recurse func 以返回一个 Futrue[List[String]]
  • 我应该使用 recover 还是 recoverWith
  • 是否可以调用 flatMap 来链接这些调用
  • 我应该考虑尾递归和堆栈溢出吗?
  • 我会更好地递归构建 future 列表并减少它们吗?
  • 最佳答案

    您可以像这样实现可重试的 Future:

    def retry[T](f: => Future[T])(n: Int)(implicit e: ExecutionContext): Future[T] = {
        n match {
            case i if (i > 1) => f.recoverWith{ case t: Throwable => retry(f)(n - 1)}
            case _ => f
        }       
    }
    

    这不是针对尾递归优化的,但是如果您只打算重试几次,则不会出现堆栈溢出(我想如果前几次失败了,无论如何它都会继续失败)。

    然后我会单独进行链接。如果您将有限数量的函数链接在一起,每个函数都取决于前一个(并且出于某种原因您想要聚合结果),您可以使用 for 推导式(flatMap 的语法糖):
    for {
        firstResult <- retry(Future(doWork(param)))(3)
        secondResult <- retry(Future(doWork(firstResult)))(3)
        thirdResult <- retry(Future(doWork(secondResult)))(3)
    } yield List(firstResult, secondResult, thirdResult)
    

    对于任意长的链,您可以使用 Future.sequence(Akka 库中的 Futures)并行执行它们:
    def doWork(param: String): String = ...
    
    val parameters: List[String] = List(...)
    
    val results: Future[List[String]] = Future.sequence(parameters.map(doWork(_)))
    

    这将解开 List[Future[String]]Future[List[String]] 的内容。

    这是按顺序执行类似操作的一种方法:
    def sequential[A, B](seq: List[A])(f: A => Future[B])(implicit e: ExecutionContext): Future[List[B]] = {
        seq.foldLeft(Future.successful(List[B]())) { case (left, next) =>
            left.flatMap(list => f(next).map(_ :: list))
        }
    }
    
    def doWork(param: String): String = ...
    
    val results: Future[List[String]] = sequential(parameters)(param => Future(doWork(param))) 
    

    这些函数的实现对您的用例非常敏感。如果链中的任何 future 失败,上述两个函数将返回失败的 future 。有时你会想要这个,有时不会。如果您只想收集成功的 future ,并丢弃失败的 future 而不会使整个结果失败,则可以添加额外的步骤来恢复失败。

    此外,recoverrecoverWith 之间的区别在于它接受的 PartialFunction 类型。 recover 用默认值替换失败的 futures,而 recoverWith 使用另一个 Future 这样做。在我的 retry 的情况下, recoverWith 更合适,因为我试图用它自己恢复失败的 Future

    关于scala - Future Recursion Patterns/Future Chaining 任意长度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24308286/

    相关文章:

    java - Spark聚合方法中的并发

    linux - 使用 grep 和 sed 递归查找和替换所有文件中的字符串

    Scala future 应用程序在完成之前终止

    java - 我们如何在java中使用Akka-remote-access进行2路通信

    jQuery Ajax 调用 Twitter Finagle 服务器

    java - sbt 版本从 Maven Central 中的文件夹结构生成

    python - 使用递归将 list 中的列表转换为仅包含整数的列表

    java - 当我的方法递归的唯一条件尚未满足时,为什么它会递归?

    scala - 控制 Akka 中消耗大量内存的 actor 的生成

    scala - 在 ScalaCheck 中从语法生成字符串