c# - 如何为 IoT 中心的每个新生产者添加新的 Kafka 主题?

标签 c# azure apache-kafka apache-kafka-connect azure-iot-hub

我正在开发 Azure 云解决方案。我使用连接到 Kafka 的 IoT 中心来处理来自各种 IoT 设备的数据。 我面临的是来自多个设备的所有数据都存储在同一个主题中。但是,我想将连接到IoT Hub的每个设备的数据处理到Kafka中的特定主题(每个设备都有自己的Kafka主题)

Toketi“适用于 Azure IoT 中心的 Kafka Connect 源连接器”提供以下配置文件(边缘节点)

connector.class=com.microsoft.azure.iot.kafka.connect.source.IotHubSourceConnector
name=AzureIotHubConnector
tasks.max=1
Kafka.Topic=IotTopic
IotHub.EventHubCompatibleName=iothub-toketi
IotHub.EventHubCompatibleEndpoint=sb://iothub-001.servicebus.windows.net/
IotHub.AccessKeyName=service
IotHub.AccessKeyValue=4KsdfiB9J899a+N3iwerjKwzeqbZUj1K//KKj1ye9i3=
IotHub.ConsumerGroup=$Default
IotHub.Partitions=4
IotHub.StartTime=2016-11-28T00:00:00Z
IotHub.Offsets=
BatchSize=100
ReceiveTimeout=60

它适用于一个主题来存储来自多个设备的所有数据,但我希望在来自设备的数据之间进行隔离

任何解决方案或想法!!

谢谢

最佳答案

解决方案之一是使用 SMT(单消息转换)。

源连接器流程包含几个步骤:

  • 从外部源轮询数据 List<SourceRecord>
  • 使用定义的 SMT 转换每条消息 ( SourceRecord )(如果未定义转换,则可以跳过
  • 转换 SourceRecord 的键和值到字节数组。
  • 通过 KafkaProducer 发送消息卡夫卡

Kafka Connect 根据 SourceRecord::topic 确定向哪个主题发送消息 field 。使用SMT您可以设置适当的主题值。

Pure Apache Kafka Connect 没有这样的转换。 如果您使用 Confluence Platform,则可以使用一些额外的转换。 要提取主题名称,您可以使用 ExtractTopic 。它有一个属性,称为 field

有关 SMT 整体概念的更多信息,请访问 Apache Kafka web pageConfluent web page

关于c# - 如何为 IoT 中心的每个新生产者添加新的 Kafka 主题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55705689/

相关文章:

c# - 动态命令行参数和预加载任务,同时保持调试

c# - 使用 Entity Framework 从 ASP.NET MVC (C#) 中的 DropDown 获取 SelectedValue

azure - 获取 TFS Webhook 订阅状态

java - 卡夫卡输出流

尝试使用 Kafka 数据存储运行 Geomesa 快速入门时出现 Java 错误

postgresql - Postgres Debezium CDC 未发布对 Kafka 的更改

c# - List<T> 的扩展方法不接受 List<int>

c# - 限制在 WPF 中扩展 TreeNode 时 TreeView 的深度

azure - 有一些如何备份 Azure AD B2C 的方法吗?

c# - 使用 C# Web api 进行 Microsoft Azure 身份验证