java - Avro-formatted data produced by Python and sent via Apache Kafka does not yield the same output when deserialized in an Apache Camel/Java processor

Tags: java python apache-kafka avro binary-data

I am running a Kafka broker to which I push messages from a Python program. For efficient data exchange I use the Apache Avro format. On the broker side, the messages are picked up by a Camel route with a processor. In this processor I want to deserialize the message and finally push the data to an InfluxDB.
The mechanics of the pipeline work, but in the Camel route I do not get out the data that I put in. On the Python side I create a dictionary:

import time

testDict = dict()
testDict['name'] = 'avroTest'
testDict['double_one'] = 1.2345
testDict['double_two'] = 1.23
testDict['double_three'] = 2.345
testDict['time_stamp'] = int(time.time() * 1000000000)  # nanoseconds; Python 3 int (there is no long())
The corresponding Avro schema on the Python side looks like this:
{
  "namespace": "my.namespace",
  "name": "myRecord",
  "type": "record",
  "fields": [
    {"name": "name",         "type": "string"},
    {"name": "double_one",   "type": "double"},
    {"name": "double_two",   "type": "double"},
    {"name": "double_three", "type": "double"},
    {"name": "time_stamp",   "type": "long"}
  ]
}
The Python code that sends the Avro-formatted message to Kafka looks like this:
import io

import avro.io
import avro.schema
from kafka import KafkaProducer
from kafka.producer.future import FutureRecordMetadata


def sendAvroFormattedMessage(self, dataDict: dict, topic_id: str, schemaDefinition: str) \
        -> FutureRecordMetadata:
    """
    Method for sending a message to the Kafka broker in the Avro binary format
    :param dataDict: data dictionary containing the message data
    :param topic_id: the Kafka topic to send the message to
    :param schemaDefinition: JSON schema definition
    :return: FutureRecordMetadata
    """
    schema = avro.schema.parse(schemaDefinition)
    writer = avro.io.DatumWriter(schema)
    bytes_stream = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_stream)
    writer.write(dataDict, encoder)
    raw_bytes = bytes_stream.getvalue()

    messageBrokerWriterConnection = KafkaProducer(bootstrap_servers=<connectionUrl>, client_id='testLogger')

    result = messageBrokerWriterConnection.send(topic=topic_id, value=raw_bytes, key='AVRO_FORMAT'.encode('UTF-8'))
    return result
The message arrives at the broker as expected, is picked up by Camel and is processed by the following Java code:
from(kafkaEndpoint) //
                .process(exchange -> {
                    Long kafkaInboundTime = Long
                            .parseLong(exchange.getIn().getHeader("kafka.TIMESTAMP").toString());
                    if (exchange.getIn().getHeader("kafka.KEY") != null) {

                        // Decode the message body as raw Avro binary (no schema header or framing)
                        BinaryDecoder decoder = DecoderFactory.get()
                                .binaryDecoder(exchange.getIn().getBody(InputStream.class), null);

                        SpecificDatumReader<Record> datumReader = new SpecificDatumReader<>(avroSchema);

                        System.out.println(datumReader.read(null, decoder).toString());
                    }
                }) //
                .to(influxdbEndpoint);
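A side note: since no Avro-generated classes are involved here, a GenericDatumReader would be the more conventional choice (to my understanding, SpecificDatumReader falls back to generic records anyway when it is given only a Schema); a minimal sketch:

import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

// Reads raw Avro binary into GenericRecord instances using only the schema
GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroSchema);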
avroSchema is currently hard-coded in the constructor of my class as follows:
avroSchema = SchemaBuilder.record("myRecord") //
                .namespace("my.namespace") //
                .fields() //
                .requiredString("name") //
                .requiredDouble("double_one") //
                .requiredDouble("double_two") //
                .requiredDouble("double_three") //
                .requiredLong("time_stamp") //  
                .endRecord();
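As an aside, the same schema could instead be parsed from the JSON definition shared with the Python side, which avoids keeping two copies in sync. A minimal sketch, assuming the JSON document above is available in a string constant named schemaJson (a name introduced here for illustration):

import org.apache.avro.Schema;

// Parse the identical JSON schema the Python producer uses, so both sides
// agree on field order and types by construction.
Schema avroSchema = new Schema.Parser().parse(schemaJson);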
The output of System.out.println is:
{"name": "avroTest", "double_one": 6.803527358993313E-220, "double_two": -0.9919128115125185, "double_three": -0.9775074719163893, "time_stamp": 20}
Obviously something goes wrong, but I cannot tell what. Any help is appreciated.
Update 1
Since the Python code runs on an Intel/Windows machine while Kafka (in a VM) and the Java code run on a Linux machine of unknown architecture, could this effect be caused by differing endianness of the systems?
Update 1.1: Endianness can be ruled out. I checked on both sides; both are little-endian.
Update 2
As a check, I changed the schema definition so that all fields are of type string. With this definition, values and keys are transferred correctly; the Python input and the Java/Camel output are identical.
Update 3
The Camel route's Kafka consumer endpoint has no special options such as deserializers:
"kafka:myTopicName?brokers=host:9092&clientId=myClientID&autoOffsetReset=earliest"

Best answer

I found the solution to my problem. The following Python code produces the desired output into Kafka:

import io

import avro.schema
from avro.io import BinaryEncoder, DatumWriter
from kafka import KafkaProducer
from kafka.producer.future import FutureRecordMetadata


def sendAvroFormattedMessage(self, dataDict: dict, topic_id: MessageBrokerQueue, schemaDefinition: str) \
        -> FutureRecordMetadata:
    """
    Method for sending a message to the Kafka broker in the Avro binary format
    :param dataDict: data dictionary containing the message data
    :param topic_id: the Kafka topic to send the message to (MessageBrokerQueue is my own topic type)
    :param schemaDefinition: JSON schema definition
    :return: FutureRecordMetadata of the send, or None if it failed
    """
    schema = avro.schema.parse(schemaDefinition)

    # Serialize the dictionary to raw Avro binary (no schema header, no framing)
    bytes_writer = io.BytesIO()
    encoder = BinaryEncoder(bytes_writer)
    writer = DatumWriter(schema)
    writer.write(dataDict, encoder)
    raw_bytes = bytes_writer.getvalue()

    self._messageBrokerWriterConnection = KafkaProducer(bootstrap_servers=self._connectionUrl)

    result = None
    try:
        # NOTE: I use the 'AVRO' key to separate Avro-formatted messages from others
        result = self._messageBrokerWriterConnection.send(topic=topic_id, value=raw_bytes, key='AVRO'.encode('UTF-8'))
    except Exception as err:
        print(err)
    self._messageBrokerWriterConnection.flush()
    return result
The key to the solution was adding valueDeserializer=... to the endpoint definition on the Apache Camel side:
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

 ...

TEST_QUEUE("kafka:topic_id?brokers=host:port&clientId=whatever&valueDeserializer=" + ByteArrayDeserializer.class.getName());
The Apache Camel route code, including the conversion to an InfluxDB point, can then be written like this:
@Component
public class Route_TEST_QUEUE extends RouteBuilder {

    Schema avroSchema = null;

    private Route_TEST_QUEUE() {
        avroSchema = SchemaBuilder //
                .record("ElectronCoolerCryoMessage") //
                .namespace("de.gsi.fcc.applications.data.loggers.avro.messages") //
                .fields() //
                .requiredString("name") //
                .requiredDouble("double_one") //
                .requiredDouble("double_two") //
                .requiredDouble("double_three") //
                .requiredLong("time_stamp") //
                .endRecord();
    }

    private String fromEndpoint = TEST_QUEUE.definitionString();

    @Override
    public void configure() throws Exception {

        from(fromEndpoint) //
                .process(messagePayload -> {        
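                    // With ByteArrayDeserializer configured, the body is the untouched Avro payload from the producer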
                    byte[] data = messagePayload.getIn().getBody(byte[].class);
                    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
                    SpecificDatumReader<GenericRecord> datumReader = new SpecificDatumReader<>(avroSchema);
                    GenericRecord record = datumReader.read(null, decoder);

                    try {
                        Point.Builder influxPoint = Point
                            .measurement(record.get("name").toString());
                        long acqStamp = 0L;
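                        // Prefer the producer-supplied time_stamp; otherwise fall back to the Kafka record timestamp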
                        if (record.hasField("time_stamp") && (long) record.get("time_stamp") > 0L) {
                            acqStamp = (long) record.get("time_stamp");
                        } else {
                            acqStamp = Long.parseLong(messagePayload.getIn().getHeader("kafka.TIMESTAMP").toString());
                        }

                        // time_stamp is produced in nanoseconds on the Python side (time.time() * 1e9)
                        influxPoint.time(acqStamp, TimeUnit.NANOSECONDS);

                        Map<String, Object> fieldMap = new HashMap<>();

                        avroSchema.getFields().stream() //
                                .filter(field -> !field.name().equals("keyFieldname")) //
                                .forEach(field -> {
                                    Object value = record.get(field.name());
                                    fieldMap.put(field.name(), value);
                                });

                        influxPoint.fields(fieldMap);

                    } catch (Exception e) {
                        MessageLogger.logError(e);
                    }
                }) //
                .to(...InfluxEndpoint...) //
                .onException(Exception.class) //
                .useOriginalMessage() //
                .handled(true) //
                .to("stream:out");
        }
    }
}
This works for my purposes: no Confluent stack, just plain Kafka.

Regarding "java - Avro-formatted data produced by Python and sent via Apache Kafka does not yield the same output when deserialized in an Apache Camel/Java processor", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/66047812/
