apache-kafka - 是否建议在 schema.registry.url 中使用多个 URL?

标签 apache-kafka confluent-schema-registry

此处的文档 https://docs.confluent.io/current/schema-registry/index.html提到了使用单个 URL 的可能性,这意味着也可以使用多个以逗号分隔的 URL,但是,问题是推荐什么以及为什么使用 F5 之类的东西,或者只是使用逗号分隔的 URL?

我使用 kafka-avro-console- Producer 和 kafka-avro-console-consumer 测试了逗号分隔的 URL,虽然后者始终按预期运行,但前者有时会返回(打印到控制台) >> ERROR Failed当其中一个 URL 错误时,将 HTTP 请求发送到端点<<,尽管它会向 Kafka 生成消息,并且不会崩溃。我宁愿它不这样做,但在实际的应用程序代码中总是可以忽略这样的异常。它实际上打印整个错误堆栈,而不仅仅是这几个单词。我也看过这个: https://github.com/confluentinc/confluent-kafka-dotnet/issues/711

我们将使用 3 个 URL 作为配置参数值,您对此有何看法?

我们使用Java客户端,代码如下:

final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://kafka1:8081,http://kafka2:8081,http://kafka3:8081");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); 

因此,假设我们有 3 个 kafka 代理服务器节点:kafka1、kafka2 和 kafka3,并且我们在每个节点上启动了一个架构注册表实例,我们是否应该使用与在还是我们应该使用外部负载均衡器(例如 F5)或循环 DNS,并让它提供解析为 kafka1、kafka2 和 kafka3 的单个 ipaddr/别名(如下面示例中的 schema_registry_loadbalanced)?这将在应用程序代码中使用,如下所示:

props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema_registry_loadbalanced:8081");

对于 kafka 连接,我们认为外部负载均衡器不会提供太多好处,但对于架构注册表我们不确定。

最佳答案

这取决于您的客户端,但至少 Java 属性被设置为将配置类型作为 URL 列表。

在我工作的地方,注册表是一个负载平衡器,所以无论如何它都是一个 URL。

关于apache-kafka - 是否建议在 schema.registry.url 中使用多个 URL?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55965935/

相关文章:

javascript - Koa 与 Kafka - 无法产生 kafka.connect()

c# - 可以将 C# 模型序列化为 AVRO JSON 模式吗?

jdbc - 汇合: ERROR Failed to run query for table TimestampIncrementingTableQuerier mysql-jdbc

java - 在 Spring Cloud Stream 中使用 PollableMessageSource 输入时如何使用 avro native 解码器?

apache-kafka - 如何在消费者组kafka中动态添加消费者

python - 如果Kafka消费者关闭,如何读取最后一条消费消息后的消息?

hadoop - [HDFS connector + Kafka]单机模式下如何写多个主题?

kubernetes - 我应该传递什么参数以使架构注册表在从属模式下运行?

apache-kafka - 汇合模式注册表 `UnknownTopicOrPartitionException: This server does not host this topic-partition.`

java - Kafka消费者唤醒异常处理