kotlin - Kotlin-基于大小和时间的 block 序列

标签 kotlin collections kotlin-coroutines

我有一个永无止境的流作为序列。
我的目标是根据时间和大小从序列中提取一批。

我的意思是,如果我的序列现在有2250条消息,我想发送3批(1000、1000、250)。

另外,如果到接下来的5分钟为止,我仍然没有累积1000条消息,那么无论如何我都将以迄今为止累积的任何内容发送它。

        sequence
        .chunked(1000)
        .map { chunk ->
            // do something with chunk
        }

我期望有一个类似.chunked(1000,300)的东西,当我想每5分钟发送一次时,秒数就是300。

提前致谢

最佳答案

Kotlin Sequence是一个同步概念,不应以任何时间限制的方式使用。如果您询问下一个元素的顺序,则它将阻塞调用程序线程,直到产生下一个元素,并且无法取消它。

但是,kotlinx.coroutines库引入了Channel的概念,它是异步世界序列的粗略模拟,其中操作可能需要一些时间才能完成,并且这样做时不会阻塞线程。您可以在this guide中阅读更多内容。

它没有提供现成的chunked运算符,但是使编写一个运算符变得简单。您可以使用以下代码:

import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.selects.*

fun <T> ReceiveChannel<T>.chunked(size: Int, time: Long) =
    produce<List<T>>(onCompletion = consumes()) {
        while (true) { // this loop goes over each chunk
            val chunk = mutableListOf<T>() // current chunk
            val ticker = ticker(time) // time-limit for this chunk
            try {
                whileSelect {
                    ticker.onReceive {
                        false  // done with chunk when timer ticks, takes priority over received elements
                    }
                    this@chunked.onReceive {
                        chunk += it
                        chunk.size < size // continue whileSelect if chunk is not full
                    }
                }
            } catch (e: ClosedReceiveChannelException) {
                return@produce // that is normal exception when the source channel is over -- just stop
            } finally {
                ticker.cancel() // release ticker (we don't need it anymore as we wait for the first tick only)
                if (chunk.isNotEmpty()) send(chunk) // send non-empty chunk on exit from whileSelect
            }
        }
    }

从该代码中可以看到,它嵌入了一些关于在极端情况下该做什么的重要决定。如果计时器到期但当前块仍为空,该怎么办?此代码开始新的时间间隔,并且不发送先前的(空)块。我们是在最后一个元素之后的超时中完成当前块,还是从第一个元素开始测量时间,还是从块的开头开始测量时间?这段代码在后面做。

该代码是完全顺序的-易于逐步遵循(其内部没有并发性)逻辑。可以根据项目的特定要求进行调整。

关于kotlin - Kotlin-基于大小和时间的 block 序列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51022533/

相关文章:

android - 是否可以进行多个并行调用并接受使用 Kotlin Flow 返回的第一个调用?

android - 意外超越 Kotlin

android - 从 Room DB 获取单个项目。从 View 模型调用函数

closures - 从 forEachLine 返回

java - 当compareto返回0时理解TreeSet

vba - 如何对集合进行排序?

java - Kotlin 标准库中的 String 类是如何实现的?

java - 当原始列表被修改时,Collections.in modulated list() 返回的 Arraylist 会发生什么?

kotlin - Kotlin将顺序IO调用包装为Sequence

Android - 用于处理 IllegalStateException : Cannot access database on the main thread 的 Kotlin 协程