rust - 如何才能让 Future Producer 至少达到 rust rdkafka 中 Threaded Producer 的性能?

标签 rust rust-tokio

我只是在玩弄这些示例,我尝试将 FutureProducer 与 Tokio::spawn 一起使用,每个产品大约需要 11 毫秒。 11000 毫秒(11 秒)内发送 1000 条消息。

虽然 ThreadedProducer 在大约 4.5 秒(开发)和 2.6 秒(在 --release 上)内生成了 1000000 条(100 万条消息)!!!,这是两者之间的疯狂差异,也许我错过了一些东西,或者我没做好某事。 如果存在如此大的速度差异,为什么还要使用 FutureProducer? 也许有人可以阐明让我理解和了解 FutureProducer。

Kafka主题名称是“my-topic”,它有3个分区。

也许我的代码没有以合适的方式编写(对于 future 的生产者),我需要使用 FutureProducer 生成 1000000 条消息/不到 10 秒。

我的尝试写在以下要点中(我更新了这个问题以添加新的要点)


注意: 在我写完问题后,我尝试通过添加不同的想法来解决我的问题,直到第七次尝试成功

1- 生成阻塞: https://gist.github.com/arkanmgerges/cf1e43ce0b819ebdd1b383d6b51bb049

2- 线程生产者 https://gist.github.com/arkanmgerges/15011348ef3f169226f9a47db78c48bd

3- future 的制作人 https://gist.github.com/arkanmgerges/181623f380d05d07086398385609e82e

4- 带有基础生产者的操作系统线程 https://gist.github.com/arkanmgerges/1e953207d5a46d15754d58f17f573914

5- 与 future 制作人的操作系统线程 https://gist.github.com/arkanmgerges/2f0bb4ac67d91af0d8519e262caed52d

6- 操作系统线程,为 future 的生产者生成 tokio 任务 https://gist.github.com/arkanmgerges/7c696fef6b397b9235564f1266443726

7- tokio 多线程使用 #[tokio::main] 和 FutureProducer https://gist.github.com/arkanmgerges/24e1a1831d62f9c5e079ee06e96a6329

最佳答案

在我的第五个示例中,我需要使用操作系统线程(感谢与 @BlackBeans 的讨论),并且在操作系统线程内,我使用了 tokio 运行时,它使用 4 个工作线程,并且它将在操作系统线程中阻塞。 该示例使用了 100 个操作系统线程,每个线程都有 tokio 运行时和 4 个工作线程。

每个操作系统线程将产生 10000 条消息。 代码没有优化,我在构建开发中运行了它。


我在第七次尝试中完成的一个新示例,我使用 #[tokio::main] 默认情况下将使用 block_on,当我生成新任务时,可以将其放入新的操作系统线程中(我已经在主调度程序(在 block_on 内部)下使用 #[tokio::main] 进行了单独的测试来检查它。并且可以在 2.93 秒(开发版本)和 2.29 秒(发布版本)内生成 100 万条消息

关于rust - 如何才能让 Future Producer 至少达到 rust rdkafka 中 Threaded Producer 的性能?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71892619/

相关文章:

rust - 如何用u64::max_value()填充Vec

rust - 如何为 Cargo 设置默认并行作业数 [-j 4]?

rust - 如何使用 conservative_impl_trait 返回对迭代器的引用?

generics - 如果我为 B 实现 From<A>,是否也会为 Vec<B> 实现 From<Vec<A>>?

rust - 在删除 Rust Future 时 panic 运行异步代码

rust - 我可以通过在两个异步接收器上调用 select 来错过一个值吗?

memory-management - 在运行时在堆上分配缓冲区

rust - 为什么 `tokio::main` 报告错误 "cycle detected when processing"?

multithreading - 如何将 Tokio 线程池限制为一定数量的 native 线程?

rust - 如何解决 "the trait bound ` [closure]: tokio::prelude::Future` is not satisfied"when calling tokio::spawn?