asynchronous - 如何异步对 Kotlin 中的多个项目执行网络请求?

标签 asynchronous kotlin coroutine

我有一个需要从 Web 服务检索的项目列表。似乎最好的方法是通过 channel ,但我无法让它们同时运行。为简单起见,我将问题简化为一个带有模拟网络调用循环的小应用程序。


import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.text.SimpleDateFormat
import java.util.*
import kotlin.random.Random

private val charPool: List<Char> = ('a'..'z') + ('A'..'Z') + ('0'..'9')
private val sdf =SimpleDateFormat("YYYY-MM-dd kk:mm:ss.SSSS")
const val NUMBER_OF_LOOPS = 5

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    log("First thing")
    repeat(NUMBER_OF_LOOPS) {
        launch {
            channel.send(getValue(it))
        }
    }
    log("Second Thing")
    launch {
        repeat(NUMBER_OF_LOOPS) {
            val value = channel.receive()
            log(value)
        }
    }
    log("All Done.")
}

fun getValue(i: Int): String {
    log(String.format("Starting value for %d", i))
    var rand: String = ""
    repeat(100000) {
        rand = (1..256)
            .map { Random.nextInt(0, charPool.size) }
            .map(charPool::get)
            .joinToString("")
    }
    log(String.format("Done processing for %d", i))
    return rand
}

fun log(value: String){
    println(String.format("%s %s", sdf.format(Date()), value))
}

当我运行它时,我得到以下输出。
2019-11-20 10:57:00.0473 First thing
2019-11-20 10:57:00.0484 Second Thing
2019-11-20 10:57:00.0485 All Done.
2019-11-20 10:57:00.0495 Starting value for 0
2019-11-20 10:57:02.0489 Done processing for 0
2019-11-20 10:57:02.0495 Starting value for 1
2019-11-20 10:57:03.0898 Done processing for 1
2019-11-20 10:57:03.0898 Starting value for 2
2019-11-20 10:57:05.0270 Done processing for 2
2019-11-20 10:57:05.0270 Starting value for 3
2019-11-20 10:57:06.0607 Done processing for 3
2019-11-20 10:57:06.0607 Starting value for 4
2019-11-20 10:57:07.0930 Done processing for 4
2019-11-20 10:57:07.0930 YxjtbzPR29etkoCg6yR64XwepRPDHGaS8MAp4M16FLyEIaVUcqtx6RSaBaWmB5MyDh60Xo7a6aOSfwmLOvdt2Ppyj3K8OpWjhQ5BuC6aHVrPmQwau5PLROIWSIU0wrLl7mrYhDJSSfFPRUATXxFz0viDvzx4q88q8Nm9b3LPu264bR0kYrQk2sGQATQLRxR7DcsgKAvCvFKy8Ba0VYcKreJKAWOeUVYkvFTW9rPJ2X8FkwhzqeuNlWeK27v60CSB
2019-11-20 10:57:07.0930 n9uOCwuIlKX4PcWcFUhKfr96Topoht67h0wt1JHTAzIGxaRnHd92wW3wdghWaPxL9WuCE2ICfwHY5NCq1PYAekaCKEPvb46Ouz4TzFgGy0j3xZdwgyVqS4pmQJvl7hvhjldvouhxFfkt1DLeNAyai7J2CMFvWhDsFsNy0S0jsJfMfI9TV9N7U02u74rnwR5hZb2RfLJgfYf77GtS8jkf0QXufnhJmt8MYsx7BGLi9vrhgH0qcaLdA2bDspia767s
2019-11-20 10:57:07.0930 ljfUduDQunB2YsHI0ZO7HRXEbqSbozIKivbge0DIFuNgI7cAvPIofQhBiVU2sTfhswr9zpRd30aKZzE3zprU5oPg70RuTSAAbDaVr06nd4IARi8WbfthpPVP3FIk4ScezMH4I47FmoNuwMmTxyLrtskWWqInJn8WA0U7Riq4pNG7y8CKiXgiytWH09QXJvNMG7Wzd3ApIUyx45ljCOZUHDiORsGWRcQnf9ngv56d9qGBYJOqfHKALiJnca9CoJSN
2019-11-20 10:57:07.0931 LkoTs8Oa8cxd3NUEKK1snRZPguYFTtDQJcJ4kYDaDYpjBOScnziEbr2OuLsCHfXgBNFc3ZzejKCLwN3oCT5l24iILkpm7122ktoOGPXtMwNBoEuB4oK83xqm6P7YKX7YU3LK1njio5zzdbPhfc0Qyrym1qyBVQnr3c4dJds1zMYDTAfZJ0o8Wyi5R8XJ7bH8aiCXziozaEELa80CWM3mDymjSWMnihDjfJlasO3cFHP4q2vpsLzqnxqTfS9WKGNv
2019-11-20 10:57:07.0931 6wDtTynt8htbfm28ipVHOZ961TdoLZomAk2cv7GzvptFqlKnU71Y5U22n75i9AQloZpFAbGdEXHduYKWoAHvi8mmPCmK7X1y8wUjqHWYEC4GFgGbrzM4lIPAKp1WagqnL3gndpGMPwY1WpiXtpjqtgGCNb26TEsF178uXgMHk5i8V46bBBNGPFPTtACCn5Ga5myv7GAcwWpGcl4yHkBpxSVuqA1bhssJ4KhIvwexPH159ePeeaixeIBZQTfwDl86


我希望所有“起始值...”日志都显示在任何“完成处理”日志之前,但正如您所见,它们都在同步运行。

有一个更好的方法吗?

最佳答案

您的代码的主要问题是启动子协程的协程范围由 runBlocking 提供。 ,实际上是 runs everything in the current thread默认情况下,在这种情况下是您的 main线。此上下文由 launch 创建的子协程继承。 .如果您添加 Thread.currentThread().name 的输出,您可以很容易地看到这一点。给您的log功能。

将调度程序指定为 runBlocking 的参数例如。:

runBlocking(Dispatchers.Default) { ... }

话虽如此,我觉得您正在使 channel 复杂化。您可以使用 async使这些请求并发。

例如,如果您更改了 main方法如下:
// Note the explicit specification of the dispatcher here...
fun main() = runBlocking<Unit>(Dispatchers.Default) {
  log("First thing")

  val results = (1..NUMBER_OF_LOOPS).map {
    async { getValue(it) }
  }.awaitAll()

  log("Results: $results")

  log("All Done.")
}

它应该输出类似于以下内容的内容,我认为这是您正在寻找的内容。注意我将线程名称添加到您的 log获得更多洞察力的方法:
DefaultDispatcher-worker-4 2019-11-20 14:51:23.0884 Starting value for 3
DefaultDispatcher-worker-1 2019-11-20 14:51:23.0884 Starting value for 1
DefaultDispatcher-worker-5 2019-11-20 14:51:23.0884 Starting value for 4
DefaultDispatcher-worker-6 2019-11-20 14:51:23.0884 Starting value for 5
DefaultDispatcher-worker-3 2019-11-20 14:51:23.0884 Starting value for 2
DefaultDispatcher-worker-4 2019-11-20 14:51:25.0547 Done processing for 3
DefaultDispatcher-worker-6 2019-11-20 14:51:25.0599 Done processing for 5
DefaultDispatcher-worker-1 2019-11-20 14:51:25.0601 Done processing for 1
DefaultDispatcher-worker-5 2019-11-20 14:51:25.0775 Done processing for 4
DefaultDispatcher-worker-3 2019-11-20 14:51:25.0779 Done processing for 2
DefaultDispatcher-worker-3 2019-11-20 14:51:25.0780 Results: [U5VZ..., J4u0..., HWqg..., 1VrO..., ecS1...]
DefaultDispatcher-worker-3 2019-11-20 14:51:25.0780 All Done.


如果您不调用awaitAll()您将获得 Deferred<T> 的列表,你可以组成例如
val first = (1..x).map {
  async { something(it) }
}

val second = (1..y).map {
  async { somethingElse(it) }
}

(first + second).awaitAll()

我推荐阅读Composing Suspending Functions ,尤其是关于结构化并发的部分。使用异步函数正确处理故障可能很棘手,在将此类代码投入生产之前需要额外注意。

关于asynchronous - 如何异步对 Kotlin 中的多个项目执行网络请求?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58959736/

相关文章:

ios - CoreData 完成后 Swift iOS 9 UITableView 重新加载数据

c# - 将传统异步处理程序包装到 TPL Task<T> 时,回调和状态会发生什么?

android - 如果调用它的 Activity 已完成,如何检查(Android)异步任务?

java - Kotlin 无法获取正确的枚举实例

unit-testing - 是否可以测试这个 Kotlin 类?

python-3.x - 使用 next(gen) 和 gen.send(None) 启动 Python 3 生成器有区别吗?

swift - PromiseKit 如何返回 "when(resolved)"作为 promise ?

android - Kotlin DialogFragment editText 可编辑始终为空

python - Gevent可以和CPython中的真实线程结合使用吗?

python - 如何中断 Tornado 协程