java - 如何使用 java(spring) 将 json 对象消息生成到 kafka 主题中?

标签 java json maven apache-kafka kafka-producer-api

我想在 kafka 主题中生成一条消息。该消息应具有以下模式:

   {"targetFileInfo":{"path":"2018-05-07-10/row01-small-01.txt.ready"}}

我知道这是一个 json 模式,那么我如何将 json 转换为字符串?

我用的是maven项目,所以需要哪些依赖才能使用

 String stringData = JSON.stringify({"targetFileInfo":{"path":"2018-05-07-10/row01-small-01.txt.ready"}});

所以我认为最好不要将 Json 转换为字符串并将该消息发送到 kafka 主题中。

我的代码就是这样,它可以发送一个字符串,但我不知道如何修改我的代码以发送上面的消息。也许你可以帮助我。

 Producer<String, String> producer = null;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    producer = new KafkaProducer<>(props);
    String msg = "welcome";
    producer.send(new ProducerRecord<String, String>("event", msg));

    producer.close();

最佳答案

这解决了我的问题:

 Producer<String, String> producer = null;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    producer = new KafkaProducer<>(props);

    try {
        producer = new KafkaProducer<String, String>(props);
    } catch (Exception e) {
        e.printStackTrace();
    }
    blobStorageChecker = new BlobStorageChecker();
    String folder = blobStorageChecker.getCurrentDateUTC();
    String msg = "{\"targetFileInfo\":{\"path\":\"test/"+folder+"row01-small.txt\"},\"sourceFileInfo\":{\"lastModifiedTime\":1525437960000,\"file\":\"/row01-small-01.txt\",\"filename\":\"/data/row01/row01-small.txt\",\"size\":19728,\"remoteUri\":\"ftp://waws-prod-am2-191.ftp.net/data/orsted-real/inbound/row01\",\"contentEncoding\":\"\",\"contentType\":\"\"}}";
    ProducerRecord<String, String> record = new ProducerRecord<String, String>("event-orsted-v1", null, msg);
    if (producer != null) {
        try {
            Future<RecordMetadata> future = producer.send(record);
            RecordMetadata metadata = future.get();
        } catch (Exception e) {
            System.err.println(e.getMessage());
            e.printStackTrace();
        }
    }
    producer.close();

关于java - 如何使用 java(spring) 将 json 对象消息生成到 kafka 主题中?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50198156/

相关文章:

java - 如何遍历LinkedTreeMap<String, Object>的所有值?

java - mvn 依赖项 :analyze 中的错误结果

java - 我还可以在 android 项目中使用 eclipse 吗?

java - 是否可以从 maven 编译 grunt 项目?

多次更改场景的 JavaFX 错误

java - 为什么每次添加新元素时 ArrayList 的 hashCode() 都会发生变化?

javascript - AngularJS:对象、数组、JSON、API 问题

java - 在java中创建数组并在构造函数中初始化

java - 如何在 Java 中创建斐波那契数列

IOS:使用延迟加载图像进行 JSON 解析