我们一直在尝试创建一个 kafka 消费者,它尝试在来自其他 kafka 集群的 60 个分区中以大约 2.7tb/小时的速度使用数据。
到目前为止,我们已经设法消耗了大约 2tb 的数据/小时并且无法 catch 目标(2.7)。
我们正在消费的集群具有存储问题的数据保留/删除率,因此我们需要在 3 分钟内使用该数据量。
细节,
我们在 6 台机器上使用 60 个分区的数据。
import java.io.*;
import java.net.InetSocketAddress;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.*;
import javax.json.*;
import java.sql.Timestamp;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.protobuf.util.JsonFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.hadoop.security.UserGroupInformation;
public class NotificationConsumerThread implements Runnable {
private final KafkaConsumer<byte[], byte[]> consumer;
private final String topic;
public NotificationConsumerThread(String brokers, String groupId, String topic) {
Properties prop = createConsumerConfig(brokers, groupId);
this.consumer = new KafkaConsumer<>(prop);
this.topic = topic;
this.consumer.subscribe(Arrays.asList(this.topic));
}
private static Properties createConsumerConfig(String brokers, String groupId) {
Properties props = new Properties();
props.put("bootstrap.servers", brokers);
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "120000");
props.put("request.timeout.ms", "120001");
props.put("max.poll.records", "280000");
props.put("fetch.min.bytes", "1");
props.put("max.partition.fetch.bytes", "10000000");
props.put("auto.offset.reset", "latest");
props.put("receive.buffer.bytes", "15000000");
props.put("send.buffer.bytes", "1500000");
props.put("heartbeat.interval.ms", "40000");
// props.put("max.poll.interval.ms", "420000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
return props;
}
@Override
public void run() {
try {
Configuration confHadoop = new Configuration();
confHadoop.addResource(new Path("redacted"));
confHadoop.addResource(new Path("redacted"));
confHadoop.setBoolean("dfs.support.append" ,true);
confHadoop.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
confHadoop.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
confHadoop.set("hadoop.security.authentication","kerberos");
confHadoop.set("dfs.namenode.kerberos.principal.pattern", "redacted");
UserGroupInformation.setConfiguration(confHadoop); UserGroupInformation.loginUserFromKeytab("redacted", "redacted");
FileSystem fileHadoop1 = FileSystem.get(confHadoop);
StringBuffer jsonFormat3 = new StringBuffer();
while (true) {
String jsonFormat;
String jsonFormat1;
String jsonFormat2;
DateFormat dateFormat = new SimpleDateFormat("yyyyMMddHH");
dateFormat.toString();
Date date = new Date();
ConsumerRecords<byte[], byte[]> records = consumer.poll(3000);
for (ConsumerRecord<byte[], byte[]> record : records) {
FlowOuterClass.Flow data = FlowOuterClass.Flow.parseFrom(record.value());
jsonFormat = JsonFormat.printer().print(data);
jsonFormat1 = jsonFormat.replaceAll("\\n", "");
JsonObject jsonObject1 = Json.createReader(new StringReader(jsonFormat1)).readObject();
Timestamp ts = new Timestamp(Long.parseLong(jsonObject1.getString("xxxx")));
date = new Date(ts.getTime());
jsonFormat2 = jsonFormat1.substring(0, jsonFormat1.length() - 1) + ", " + "\"xxxxx\"" + ": " + "\"" + dateFormat.format(date) + "\"" + "}\n";
jsonFormat3.append(jsonFormat2);
}
String jsonFormat4 = jsonFormat3.toString();
if(jsonFormat4.length()>100000000) {
FSDataOutputStream stream = fileHadoop1.create(new Path("redacted-xxxxx" + dateFormat.format(date) + "/" + UUID.randomUUID().toString() + ".json"));
stream.write(jsonFormat4.getBytes());
stream.close();
jsonFormat3.delete(0, jsonFormat3.length());
}
}
} catch (Exception e) {
System.out.println(e);
}
consumer.close();
}
}
这是滞后状态:我们在互联网上找不到任何解决方案,因此我们很高兴知道如何与 kafka 消费者一起使用这些大量数据的最佳实践。
谢谢!
最佳答案
您可以尝试做一些事情,看看您是否能够以最小的延迟 catch 生成消息的原始速率。
max.poll.records
来增加轮询时的记录数。至280000
.请注意,此配置 只会工作当您以类似的方式调整其他两个配置时。您需要更改 max.partition.fetch.bytes
和 fetch.max.bytes
以成比例的速度。我看到你试图改变max.partition.fetch.bytes
至10000000
(10MB) 大约您还应该考虑调整此值 fetch.max.bytes
.因此,简而言之,您需要以正确的比例调整所有这些值。请仔细阅读,您可能会发现这很有用。 increase number of message in the poll 我希望这有帮助。
关于java - 卡夫卡消费者滞后,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62796034/