apache-kafka - Quarkus 应用程序中 Kafka 反/序列化器中的 CDI 上下文

标签 apache-kafka cdi quarkus smallrye-reactive-messaging

我有一个 Quarkus 项目,其中包含基于 Kafka 的 Smallrye 响应式(Reactive)消息传递。因为我想使用“复杂的 pojo”,所以我需要一个自定义的反/序列化器。

我想将这两个类制作为 CDI bean,以便我可以注入(inject)并使用我的自定义记录器,它是一个 CDI bean。有没有办法实现这个目标?


现在我注入(inject)的记录器对象只是空:

import org.apache.kafka.common.serialization.Serializer;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

@ApplicationScoped
public class MySerializer implements Serializer<MyDto>
{
    @Inject MyLogger logger;

    @Override public byte[] serialize(String topicName, MyDto myDto)
    {
        // this causes a java.lang.NullPointerException
        logger.info("serializing");

        ...
    }
}

最佳答案

据我所知,你只能向kafka注册一个类名,它会在内部创建该类,即。不使用 CDI。

可能的解决方法:使注册对象成为 CDI-bean 的薄包装器,并将工作委托(delegate)给该 bean:

public class MySerializer implements Serializer<MyDto> {
    private MySerializerCdi delegate;

    public MySerializer() {
        delegate = CDI.current().select(MySerializerCdi.class).get();
    }

    @Override public byte[] serialize(String topicName, MyDto myDto) {
        return delegate.serialize(topicName, myDto);
    }
    ...
}

...并相应地重命名您的原始 CDI 类。

关于apache-kafka - Quarkus 应用程序中 Kafka 反/序列化器中的 CDI 上下文,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62577724/

相关文章:

mongodb - Wildfly 8 中用于 CDI 的自定义 jndi 对象工厂

java - 如何强制 CDI/Weld 使用 new 关键字?

java - 如何在Quarkus中获取静态值的配置值

c - Zookeeper librdkafka 示例

ssl - kafka : Inbound closed before receiving peer's close_notify 中的 SSL 错误

cdi - 动态 CDI 注入(inject)@Named

java - Quarkus 中的 JAX-RS 子资源问题

java - 如何避免连续出现 "Resetting offset"和 "Seeking to LATEST offset"?

twitter - Kafka Twitter 流 TwitterException 错误

quarkus - 哪些配置属性在 Quarkus 的部署时/运行时不可更改?