I am running a Kafka broker to which I push messages from a Python program. For efficient data exchange I use the Apache Avro format. At the Kafka broker, the messages are picked up by a Camel route with a processor. In this processor I want to deserialize the message and finally push the data to InfluxDB.
The mechanics of the flow work, but in the Camel route I do not get out the data that I put in. On the Python side I create a dictionary:
testDict = dict()
testDict['name'] = 'avroTest'
testDict['double_one'] = 1.2345
testDict['double_two'] = 1.23
testDict['double_three'] = 2.345
testDict['time_stamp'] = int(time.time() * 1000000000)  # nanoseconds; long() does not exist in Python 3
The corresponding Avro schema on the Python side looks like this:
{
"namespace": "my.namespace",
"name": "myRecord",
"type": "record",
"fields": [
{"name": "name", "type": "string"},
{"name": "double_one", "type": "double"},
{"name": "double_two", "type": "double"},
{"name": "double_three", "type": "double"},
{"name": "time_stamp", "type": "long"}
]
}
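For reference, the Avro specification's binary encoding writes the fields of this record back to back in schema order, with no per-field tags: a string is a length-prefixed UTF-8 byte sequence, a double is 8 little-endian IEEE 754 bytes, and a long is a zigzag-encoded varint. A hand-rolled sketch for illustration only (the actual code below uses the avro library's DatumWriter, which produces the same bytes for this schema):

```python
import struct

def zigzag_varint(n: int) -> bytes:
    """Avro long: ZigZag-encode the value, then emit base-128 varint groups, low bits first."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        b = z & 0x7F
        z >>= 7
        if z:
            out.append(b | 0x80)  # more groups follow
        else:
            out.append(b)
            return bytes(out)

def encode_record(d: dict) -> bytes:
    """Encode the example record: fields in schema order, no header."""
    name = d['name'].encode('utf-8')
    return (zigzag_varint(len(name)) + name          # string: length (as long) + UTF-8 bytes
            + struct.pack('<d', d['double_one'])     # double: 8 bytes, little-endian IEEE 754
            + struct.pack('<d', d['double_two'])
            + struct.pack('<d', d['double_three'])
            + zigzag_varint(d['time_stamp']))        # long: zigzag varint
```

Seeing the raw layout makes it clear that the consumer must receive the payload byte-for-byte intact: a single altered byte shifts or corrupts every field that follows.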
The Python code that sends the Avro-formatted message to Kafka looks like this:
def sendAvroFormattedMessage(self, dataDict: dict, topic_id: str, schemaDefinition: str) \
-> FutureRecordMetadata:
"""
Method for sending message to kafka broker in the avro binary format
:param dataDict: data dictionary containing message data
:param topic_id: the Kafka topic to send message to
:param schemaDefinition: JSON schema definition
:return: FutureRecordMetadata
"""
schema = avro.schema.parse(schemaDefinition)
writer = avro.io.DatumWriter(schema)
bytes_stream = io.BytesIO()
encoder = avro.io.BinaryEncoder(bytes_stream)
writer.write(dataDict, encoder)
raw_bytes = bytes_stream.getvalue()
messageBrokerWriterConnection = KafkaProducer(bootstrap_servers=<connectionUrl>, client_id='testLogger')
result = messageBrokerWriterConnection.send(topic=topic_id, value=raw_bytes, key='AVRO_FORMAT'.encode('UTF-8'))
return result
The message arrives at the broker as expected, is picked up by Camel and processed by the following Java code:
from(kafkaEndpoint) //
.process(exchange -> {
Long kafkaInboundTime = Long
.parseLong(exchange.getIn().getHeader("kafka.TIMESTAMP").toString());
if (exchange.getIn().getHeader("kafka.KEY") != null) {
BinaryDecoder decoder = DecoderFactory.get()
.binaryDecoder(exchange.getIn().getBody(InputStream.class), null);
SpecificDatumReader<Record> datumReader = new SpecificDatumReader<>(avroSchema);
System.out.println(datumReader.read(null, decoder).toString());
}
}) //
.to(influxdbEndpoint);
The avroSchema is currently hard-coded in the constructor of my class as follows:
avroSchema = SchemaBuilder.record("myRecord") //
.namespace("my.namespace") //
.fields() //
.requiredString("name") //
.requiredDouble("double_one") //
.requiredDouble("double_two") //
.requiredDouble("double_three") //
.requiredLong("time_stamp") //
.endRecord();
The output of System.out.println is:
{"name": "avroTest", "double_one": 6.803527358993313E-220, "double_two": -0.9919128115125185, "double_three": -0.9775074719163893, "time_stamp": 20}
Obviously something is going wrong, but I don't know what. Any help is appreciated.
Update 1
Since the Python code runs on an Intel/Windows machine, while Kafka (in a VM) and the Java code run on a Linux machine of unknown architecture, could this effect be caused by different endianness of the systems?
Update 1.1
Endianness can be ruled out. Checked on both sides; both are "little".
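For completeness, that check is a one-liner, and host byte order should not matter here anyway: the Avro spec fixes doubles on the wire as little-endian IEEE 754 regardless of the producing machine. A quick sketch:

```python
import struct
import sys

# host byte order ('little' on both machines in this setup)
print(sys.byteorder)

# Avro writes doubles little-endian on every platform, so the wire bytes
# for 1.0 are identical no matter where they are produced
assert struct.pack('<d', 1.0) == b'\x00\x00\x00\x00\x00\x00\xf0\x3f'
```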
Update 2
As a check, I changed the schema definition to type string for all fields. With this definition, values and key are transferred correctly: the Python input and the Java/Camel output are identical.
Update 3
The Camel route's Kafka endpoint has no special options such as a deserializer:
"kafka:myTopicName?brokers=host:9092&clientId=myClientID&autoOffsetReset=earliest"
Best Answer
I found the solution to my problem. The following Python code produces the desired output to Kafka:
def sendAvroFormattedMessage(self, dataDict: dict, topic_id: MessageBrokerQueue, schemaDefinition: str) \
-> None:
"""
Method for sending message to kafka broker in the avro binary format
:param dataDict: data dictionary containing message data
:param topic_id: the Kafka topic to send message to
:param schemaDefinition: JSON schema definition
:return: None
"""
schema = avro.schema.parse(schemaDefinition)
bytes_writer = io.BytesIO()
encoder = BinaryEncoder(bytes_writer)
writer = DatumWriter(schema)
writer.write(dataDict, encoder)
raw_bytes = bytes_writer.getvalue()
self._messageBrokerWriterConnection = KafkaProducer(bootstrap_servers=self._connectionUrl)
try:
# NOTE: I use the 'AVRO' key to separate avro formatted messages from others
result = self._messageBrokerWriterConnection.send(topic=topic_id, value=raw_bytes, key='AVRO'.encode('UTF-8'))
except Exception as err:
print(err)
self._messageBrokerWriterConnection.flush()
The key to the solution was adding a valueDeserializer=... to the endpoint definition on the Apache Camel side. Without it, the Camel Kafka component defaults to Kafka's StringDeserializer, which mangles the binary Avro payload:
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
...
TEST_QUEUE("kafka:topic_id?brokers=host:port&clientId=whatever&valueDeserializer=" + ByteArrayDeserializer.class.getName());
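Why this fixes it: with the default StringDeserializer, the binary Avro payload is decoded as UTF-8 text and later re-encoded. That round trip is lossless for real text (which is why the all-strings test in Update 2 worked) but lossy for arbitrary bytes, since invalid UTF-8 sequences are replaced with U+FFFD. That is exactly the garbage-double symptom above. A minimal sketch of the effect:

```python
import struct

# the 8 wire bytes of the double 1.2345 (little-endian, as Avro writes it)
raw = struct.pack('<d', 1.2345)

# simulate a String (de)serialization round trip: invalid UTF-8 sequences
# are replaced with U+FFFD, so the original bytes are destroyed
mangled = raw.decode('utf-8', errors='replace').encode('utf-8')

assert mangled != raw  # the payload no longer matches what Python sent
```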
The Apache Camel route code, including the conversion to an InfluxDB point, can then be written like this:
@Component
public class Route_TEST_QUEUE extends RouteBuilder {
Schema avroSchema = null;
private Route_TEST_QUEUE() {
avroSchema = SchemaBuilder //
.record("ElectronCoolerCryoMessage") //
.namespace("de.gsi.fcc.applications.data.loggers.avro.messages") //
.fields() //
.requiredString("name") //
.requiredDouble("double_one") //
.requiredDouble("double_two") //
.requiredDouble("double_three") //
.requiredLong("time_stamp") //
.endRecord();
}
private String fromEndpoint = TEST_QUEUE.definitionString();
@Override
public void configure() throws Exception {
from(fromEndpoint) //
.process(messagePayload -> {
byte[] data = messagePayload.getIn().getBody(byte[].class);
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
SpecificDatumReader<GenericRecord> datumReader = new SpecificDatumReader<>(avroSchema);
GenericRecord record = datumReader.read(null, decoder);
try {
Point.Builder influxPoint = Point
.measurement(record.get("name").toString());
long acqStamp = 0L;
if (record.hasField("time_stamp") && (long) record.get("time_stamp") > 0L) {
acqStamp = (long) record.get("time_stamp");
} else {
acqStamp = Long.parseLong(messagePayload.getIn().getHeader("kafka.TIMESTAMP").toString());
}
influxPoint.time(acqStamp, TimeUnit.NANOSECONDS);
Map<String, Object> fieldMap = new HashMap<>();
avroSchema.getFields().stream() //
.filter(field -> !field.name().equals("keyFieldname")) //
.forEach(field -> {
Object value = record.get(field.name());
fieldMap.put(field.name().toString(), value);
});
influxPoint.fields(fieldMap);
// make the built point available to the InfluxDB endpoint
messagePayload.getIn().setBody(influxPoint.build());
} catch (Exception e) {
MessageLogger.logError(e);
}
}) //
.to(...InfluxEndpoint...) //
.onException(Exception.class) //
.useOriginalMessage() //
.handled(true) //
.to("stream:out");
}
}
This works for my purposes: no Confluent, just plain Kafka.
A similar question, java - Avro-formatted data produced with Python and sent via Apache Kafka does not produce the same output when deserialized in an Apache Camel/Java processor, can be found on Stack Overflow: https://stackoverflow.com/questions/66047812/