java - 卡夫卡消费者滞后

标签 java hadoop apache-kafka bigdata kafka-consumer-api

我们一直在尝试创建一个 kafka 消费者,它尝试在来自其他 kafka 集群的 60 个分区中以大约 2.7tb/小时的速度使用数据。
到目前为止,我们已经设法消耗了大约 2tb 的数据/小时并且无法 catch 目标(2.7)。
我们正在消费的集群具有存储问题的数据保留/删除率,因此我们需要在 3 分钟内使用该数据量。
细节,
我们在 6 台机器上使用 60 个分区的数据。

import java.io.*;
import java.net.InetSocketAddress;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.*;
import javax.json.*;
import java.sql.Timestamp;

import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.protobuf.util.JsonFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.hadoop.security.UserGroupInformation;

public class NotificationConsumerThread implements Runnable {

    private final KafkaConsumer<byte[], byte[]> consumer;
    private final String topic;

    public NotificationConsumerThread(String brokers, String groupId, String topic) {
        Properties prop = createConsumerConfig(brokers, groupId);
        this.consumer = new KafkaConsumer<>(prop);
        this.topic = topic;
        this.consumer.subscribe(Arrays.asList(this.topic));
    }

    private static Properties createConsumerConfig(String brokers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "120000");
        props.put("request.timeout.ms", "120001");
        props.put("max.poll.records", "280000");
        props.put("fetch.min.bytes", "1");
        props.put("max.partition.fetch.bytes", "10000000");
        props.put("auto.offset.reset", "latest");
        props.put("receive.buffer.bytes", "15000000");
        props.put("send.buffer.bytes", "1500000");
        props.put("heartbeat.interval.ms", "40000");
      //  props.put("max.poll.interval.ms", "420000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        return props;
    }



    @Override
    public void run() {
        try {
            Configuration confHadoop = new Configuration();
            
            confHadoop.addResource(new Path("redacted"));
            confHadoop.addResource(new Path("redacted"));
            confHadoop.setBoolean("dfs.support.append" ,true);


           
            confHadoop.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
            confHadoop.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
            confHadoop.set("hadoop.security.authentication","kerberos");
            confHadoop.set("dfs.namenode.kerberos.principal.pattern", "redacted");
            UserGroupInformation.setConfiguration(confHadoop); UserGroupInformation.loginUserFromKeytab("redacted", "redacted");

            FileSystem fileHadoop1 = FileSystem.get(confHadoop);
            StringBuffer jsonFormat3 = new StringBuffer();

            while (true) {
                String jsonFormat;
                String jsonFormat1;
                String jsonFormat2;


                DateFormat dateFormat = new SimpleDateFormat("yyyyMMddHH");
                dateFormat.toString();

                Date date = new Date();


                ConsumerRecords<byte[], byte[]> records = consumer.poll(3000);


                for (ConsumerRecord<byte[], byte[]> record : records) {


                    FlowOuterClass.Flow data = FlowOuterClass.Flow.parseFrom(record.value());
                    jsonFormat = JsonFormat.printer().print(data);
                    jsonFormat1 = jsonFormat.replaceAll("\\n", "");


                    JsonObject jsonObject1 = Json.createReader(new StringReader(jsonFormat1)).readObject();
                    Timestamp ts = new Timestamp(Long.parseLong(jsonObject1.getString("xxxx")));


                    date = new Date(ts.getTime());
                    jsonFormat2 = jsonFormat1.substring(0, jsonFormat1.length() - 1) + ", " + "\"xxxxx\"" + ": " + "\"" + dateFormat.format(date) + "\"" + "}\n";
                    jsonFormat3.append(jsonFormat2);

                }

                    String jsonFormat4 = jsonFormat3.toString();

                    if(jsonFormat4.length()>100000000) {
                        FSDataOutputStream stream = fileHadoop1.create(new Path("redacted-xxxxx" + dateFormat.format(date) + "/" + UUID.randomUUID().toString() + ".json"));
                        stream.write(jsonFormat4.getBytes());
                        stream.close();


                        
                        jsonFormat3.delete(0, jsonFormat3.length());
                    }
                }



        } catch (Exception e) {
            System.out.println(e);
        }
        consumer.close();
    }
}
这是滞后状态:
enter image description here
我们在互联网上找不到任何解决方案,因此我们很高兴知道如何与 kafka 消费者一起使用这些大量数据的最佳实践。
谢谢!

最佳答案

您可以尝试做一些事情,看看您是否能够以最小的延迟 catch 生成消息的原始速率。

  • 您应该增加消费者组中的消费者数量。根据您发布的图片,我可以看到有 10 个消费者在 6 台机器上运行。如果您的机器是 能够运行更多的消费者 ,那么您可能应该考虑增加消费者的数量。请注意,如果您可以将消费者数量增加到 12、15、20、30 中的任何一个,则效果会更好。这是因为我们想要 所有消费者获得相等数量的分区从话题。所以这个想法是消费者的数量应该是 60 倍。(你正在消费的主题中的分区数量)
  • 您试图通过更改 max.poll.records 来增加轮询时的记录数。至280000 .请注意,此配置 只会工作当您以类似的方式调整其他两个配置时。您需要更改 max.partition.fetch.bytesfetch.max.bytes以成比例的速度。我看到你试图改变max.partition.fetch.bytes10000000 (10MB) 大约您还应该考虑调整此值 fetch.max.bytes .因此,简而言之,您需要以正确的比例调整所有这些值。请仔细阅读,您可能会发现这很有用。 increase number of message in the poll
  • 如果上述两种方法不起作用,这是您可以考虑的最后一步。因为我们知道 Kafka 中的分区决定并行度 你可以实现的。您可以考虑增加您正在消费的主题中的分区数(将其更改为 120 或从当前 60 个分区中更大的数字)

  • 我希望这有帮助。

    关于java - 卡夫卡消费者滞后,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62796034/

    相关文章:

    java - 运行 jdbc 程序时线程 "main"java.sql.SQLException 中出现异常?

    java - 将应用程序提交到Spark集群:错误本地类不兼容

    java - 查询 KafkaAvroDeserializer 使用的默认 SchemaRegistryClient

    maven - 建立大象鸟 pig 时发生错误

    hadoop - kafka分区和生产者关系

    apache-kafka - 删除未使用的 kafka 消费者组

    java - 如何模拟主要

    java - 创建并写入文件

    java - 应该使用哪个 - ConcurrentHashMap putIfAbsent 或锁定映射

    java - 在 Java 程序中从 Unix 路径读取一个 hadoop 配置文件