java - 与 kafka 消费者记录并行流

标签 java apache-kafka kafka-consumer-api

我有卡夫卡记录:

ConsumerRecords<String, Events> records = kafkaConsumer.poll(POLL_TIMEOUT);

我想使用并行流而不是多线程运行以下代码。

                records.forEach((record) -> {
                Event event = record.value();

                       HTTPSend.send(event);

            });

我尝试过多线程,但我想尝试并行流:

for (ConsumerRecord<String, Event> record : records) {
                        executor.execute(new Runnable() {
                            @Override
                            public void run() {

                                        HTTPSend.send(Event);

                            }
                        });

                    }

实际上,我面临着多线程 HTTP.send 的问题(即使线程池为 1 个线程)。我得到了

“由以下原因引起:sun.security.validator.ValidatorException:PKIX 路径构建失败:sun.security.provider.certpath.SunCertPathBuilderException:无法找到请求目标的有效证书路径”。 这是通过 https 的请求。此错误仅在第一次发出请求时出现。之后,异常消失。噗!

对于我正在使用的多线程:

int threadCOunt=1;
                BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(threadCOunt, true);
                RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
                ExecutorService executor = new ThreadPoolExecutor(threadCOunt, threadCOunt, 0L, TimeUnit.MILLISECONDS, queue, handler);

HTTPSend.send() 是:

long sizeSend = 0;
    SSLContext sc = null;

    try {
        sc = SSLContext.getInstance("TLS");
        sc.init(null, TRUST_ALL_CERTS, new SecureRandom());
    } catch (NoSuchAlgorithmException | KeyManagementException e) {
        LOGGER.error("Failed to create SSL context", e);
    }

    // Ignore differences between given hostname and certificate hostname
    HostnameVerifier hv = (hostname, session) -> true;

    // Create the REST client and configure it to connect meta
    Client client = ClientBuilder.newBuilder()
            .hostnameVerifier(hv)
            .sslContext(sc).build();

    WebTarget baseTarget = client.target(getURL()).path(HTTP_PATH);
    Response jsonResponse = null;

    try {
        StringBuilder eventsBatchString = new StringBuilder();
        eventsBatchString.append(this.getEvent(event));
        Entity<String> entity = Entity.entity(eventsBatchString.toString(), MediaType.APPLICATION_JSON_TYPE);
        builder = baseTarget.request();
        LOGGER.debug("about to send the event {} and URL {}", entity, getURL());
        jsonResponse = builder.header(HTTP_ACK_CHANNEL, guid.toString())
                .header("Content-type", MediaType.APPLICATION_JSON)
                .header("Authorization", String.format("Meta %s", eventsModuleConfig.getSecretKey()))
                .post(entity);

最佳答案

我明白你想做什么,但我不确定这是最好的主意(我也不确定这是否不是)。

Kafka 的 poll/commit 模型允许简单的背压,并在崩溃时保留最后处理的项目。通过“立即”返回轮询循环,您告诉 Kafka“我已准备好接受更多”,并提交偏移量(手动或自动)告诉 Kafka 您已成功读取到该点。

您似乎想要做的是尽快读取 Kafka,提交偏移量,然后将 Kafka 记录放入执行器队列中,然后从中平衡每秒的请求等。

我不能 100% 确定这是一个好主意:如果您的应用程序崩溃了会发生什么?您可能已经提交了一些实际上并未到达上游的 Kafka 消息。如果您确实想这样做,我建议在完成 Runnable 后手动提交偏移量(通过 commitSync),而不是让高级使用者为您执行此操作。

为什么你可能想要使用线程执行器:我认为这些也可以通过 Kafka 来完成。

您可能想同时向网络服务器发布多条消息。分区良好的 Kafka 主题将允许多个消费者/消费者组消费多个分区,因此 - 假设一个完美扩展的 HTTP 服务器 - 将允许您并行地将消息发布到您的服务器。基于进程的并发性太棒了!

也许 Web 服务器不是完全可扩展的,或者该请求速度很慢(假设每个请求需要 1 秒):您需要限制 Web 服务器每秒处理的请求数,如果您有一个队列,则可能有几个线程在不备份 Kafka 的情况下发布。

在这种情况下,您可以设置 max.poll.records为您的 Web 服务器所需的可扩展值。可能还有更好的方法来做到这一点,尽管目前我还没有想到。

如果您的网络服务器需要很长时间才能响应,您可能会收到与心跳失败相关的错误。在这种情况下,我会引导您访问 this SO answer on the timeout / heartbeat topic .

我不会使用线程执行器,从而使同步 HTTP 请求看起来是异步的,而是使用像 Netty 这样的事件 HTTP 客户端。 ,从而无需基于线程的并发即可实现并行性。

关于java - 与 kafka 消费者记录并行流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47250781/

相关文章:

Java:我是否应该使用构造函数来做更多的事情而不仅仅是初始化变量

java - 来自 Raspberry OS 的 BacNet/IP 广播不工作

java - 同步Android表面 View

spring - 启动 Spring Boot 应用程序而不检查 Kafka Server

apache-kafka - 消费群体如何在kafka中运作?

java - OpenGL 中的立方体渲染怪癖

apache-kafka - Kafka如何从__consumer_offsets主题阅读

java - 如何通过 Spark 提交 Spark Streaming 应用程序

java - 在 Java 中手动提交 Kafka 偏移量

apache-kafka - kafka中的消费群体有什么需求?