我正在尝试创建一个提供 OAuth2 token 的源,并且还负责刷新过期的 token 。 目前我的代码看起来有点像这样
case class Token(expires: Instant = Instant.now().plus(100, ChronoUnit.MILLIS)){
def expired = Instant.now().isAfter(expires)
}
Source
.repeat()
.mapAsync(1){ _ =>
println(" -> token req")
// this fakes an async token request to the token service
Future{
Thread.sleep(500)
println(" <- token resp")
Token()
}
}
.mapAsync(1){ token =>
println(" -> req with token auth")
if(token.expired){
println("!!! Received expired token")
}
// this is the actual call that needs the token
println("making call")
Future{
Thread.sleep(2000)
println(" <- req resp")
"OK"
}
}
.take(2)
.runWith(Sink.ignore)
.recover{case e => ()}
.flatMap{ _ =>
system.terminate()
}
此代码的输出如下所示
root -> token req
root <- token resp
root -> token req
root -> req with token auth
root making call
root <- token resp
root -> token req
root <- token resp
root -> token req
root <- token resp
root -> token req
root <- req resp
root -> req with token auth
root !!! Received expired token
root making call
root <- token resp
root -> token req
root <- token resp
root -> token req
root <- token resp
root <- req resp
root -> req with token auth
root !!! Received expired token
root making call
root ... finished with exit code 0
显然,这个mapAsync(1)在不符合预期的情况下产生了需求(预取?)
有 2 个问题:
- 需求导致上游出现不需要的 token 请求
- token 的预取/缓存存在问题,因为它们仅在特定时间内有效
那么我如何创建一个像这个函数一样的真正的拉流呢?
def tokenSource: () => Future[Token]
最佳答案
如果您有意避免预取和排队,那么我认为 scala.collection.immutable.Stream
,或 Iterator,是比 akka Stream 更好的解决方案。
下面是一个示例实现,它避免了您在问题中列举的陷阱。 (注意:我使用 ActorSystem
通过 dispatcher
创建 ExecutionContext,以防止应用程序在 sleep
调用有时间完成之前退出。我正在采取优点是 ActorSystem 不会仅仅因为主函数到达表达式定义的末尾而关闭。)
import scala.collection.immutable.Stream
import scala.concurrent.Future
object ScalaStreamTest extends App {
case class Token(expires: Long = System.currentTimeMillis() + 100){
def expired = System.currentTimeMillis() > expires
}
val actorSystem = akka.actor.ActorSystem()
import actorSystem.dispatcher
def createToken = Future {
Thread.sleep(500)
println(" <- token resp")
Token()
}
def checkExpiration(token : Future[Token]) = token map { t =>
println(" -> req with token auth")
if(t.expired){println("!!! Received expired token")}
t
}
def makeCall(token : Future[Token]) = token flatMap { t =>
println("making call")
Future {
Thread.sleep(2000)
println(" <- req resp")
"OK"
}
}
val stream = Stream.continually(createToken)
.map(checkExpiration)
.map(makeCall)
.take(2)
.force
}//end object ScalaStreamTest
force
调用是必要的,因为 Stream 是惰性的,因此在强制之前的所有方法调用(即: Continuous、map 和 take)也是惰性的。除非调用 reducer 或通过强制显式告知 Stream,否则惰性 Stream 上不会发生任何计算。
关于scala - 如何使用异步阶段创建 Akka 流真正的拉流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32735162/