我正在使用 Apache Camel JAVA DSL 消费来自 Apache kafka 的消息。
我正在通过将对象转换为 kafka 上的 byte[]
来编写对象。当我使用它时,我会收到一 strip 有 byte[]
的消息。我反序列化它并得到一个对象。
我检查它是否是 MyObject
的对象,然后需要使用 java DSL .to()
将其传递给 bean。我的代码如下:
public class KafkaRouter extends RouteBuilder {
private MessageBean msgBean;
@Override
public void configure() throws Exception {
from("{{kafka.cons.uri}}").process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
Object obj = SerializationUtils.deserialize(exchange.getIn().getBody(byte[].class)); //TODO cast to specific class as returned after deserialization.
if(obj !=null && obj instanceof MessageBean){
msgBean = (MessageBean)obj;
}
else {
throw new PTFException("Invalid Message read in Kafka Consumer");
}
}
}).bean(PTFTransformerService.class,"callTransformerService(msgBean)"); ;
}
现在的问题是我只想在相应的调用方法中使用 MyObject
而不想使用 TypeConvertors
。我不想在方法中获取 Exchange/body,我将在进程中处理我的流,并在读取无效消息时抛出异常,并且不将其转发到 bean。
我在另一端的方法是:
private void callTransformerService(MessageBean msgObj){
// Got my object here ;-)
}
最佳答案
在函数参数MessageBean msgObj
之前添加@Body
:
import org.apache.camel.Body;
private void callTransformerService(@Body MessageBean msgObj){
}
关于java - 是否可以使用 Apache Camel 将 MyObject 传递给 bean 路由?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31831016/