apache - 在 Apache Flume 中获取 JMS header

标签 apache hadoop ibm-mq flume flume-ng

我正在尝试使用 Apache Flume 使用 JMS 消息 (IBM Websphere MQ) 并将数据存储到 HDFS。在阅读消息时,我只能看到消息的正文,而看不到消息的标题内容。

是否可以使用 Apache Flume 读取带有 header 属性的 jms 消息?

我的配置:

# Source definition
u.sources.s1.type=jms
u.sources.s1.initialContextFactory=ABC
u.sources.s1.connectionFactory=<my connection factory>
u.sources.s1.providerURL=ABC
u.sources.s1.destinationName=r1
u.sources.s1.destinationType=QUEUE
# Channel definition
u.channels.c1.type=file
u.channels.c1.capacity=10000000
u.channels.c1.checkpointDir=/checkpointdir
u.channels.c1.transactionCapacity=10000
u.channels.c1.dataDirs=/datadir
# Sink definition
u.sinks.r1.type=hdfs
u.sinks.r1.channel=c1
u.sinks.r1.hdfs.path=/message/%Y%m%d
u.sinks.r1.hdfs.filePrefix=event_
u.sinks.r1.hdfs.fileSuffix=.xml
u.sinks.r1.hdfs.fileType = DataStream
u.sinks.r1.hdfs.writeFormat=Text
u.sinks.r1.hdfs.useLocalTimeStamp=TRUE

最佳答案

JMS 消息的类型很多,如“表 30–2 JMS 消息类型”here .

Flume DefaultJMSMessageConverter 使用 TextMessage,如 here并在下面给出供您引用:

...
else if(message instanceof TextMessage) {
      TextMessage textMessage = (TextMessage)message;
      event.setBody(textMessage.getText().getBytes(charset));
    } 
...

TextMessage仅提供消息正文。

恕我直言,您有两个选择:

  1. 如果可能的话,在正文本身中发送消息- header 、 header -值对,并按原样使用“DefaultJMSMessageConverter”。
  2. 通过编写自定义 JMSMessageConverter 并将“消息”类型转换为 javax.jms.Message 来构建您自己的“flume-jms-source.jar” ,获取 JMS header ,在 SimpleEvent 中设置它们。

希望这能提供一些方向。

关于apache - 在 Apache Flume 中获取 JMS header ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48466410/

相关文章:

scala - EMR Spark 无法将 Dataframe 保存到 S3

sql - Spark SQL 会完全取代 Apache Impala 或 Apache Hive 吗?

apache - htaccess 重定向 http ://and http://www to https://that plays nice with subdomains

php - 我为什么要购买这些 Silex 404?

Apache 在 httpd.conf 中的 documentroot 更改后意外关闭

apache - 将域名指向Tomcat

hadoop - Apache Hadoop 配置选项

java - IBM WebSphere MQ 2042 错误

java - Websphere MQ 客户端部署到 Jboss Fuse

java - 我们可以将平面文件发送到 IBM MQ 还是必须创建平面文件的字符串表示形式,以便可以将其作为消息发送