apache-spark - 来自 IBM MQ 的 Spark 流数据

标签 apache-spark apache-kafka spark-streaming ibm-mq

我想从 IBM MQ 流式传输数据。我试过了this code我在 Github 上找到了。

我能够从队列中流式传输数据,但每次流式传输时,它都会从中获取所有数据。我只想获取当前插入队列的数据。我查阅了很多网站,但没有找到正确的解决方案。

在 Kafka 中,我们有类似 KafkaStreamUtils 的东西来流式传输近乎实时的数据。在 IBM MQ 中是否有类似的东西,以便它只流式传输最新数据?

最佳答案

您提供的链接中的示例显示它调用以下方法从 IBM MQ 接收:

CustomMQReciever(String host , int port, String qm, String channel, String qn)

如果您查看 CustomMQReciever here你可以看到它只是浏览队列中的消息。这意味着该消息仍将在队列中,下次连接时您将收到相同的消息:

MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue);

如果您想从队列中删除消息,您需要调用一个方法来从队列中使用它们,而不是从队列中浏览它们。以下是对 CustomMQReciever.java 进行更改的示例,应该可以完成您想要的:


initConnection() 下,将上面的代码更改为以下代码,使其从队列中删除消息:

MQMessageConsumer consumer = (MQMessageConsumer) qSession.createConsumer(queue);

摆脱:

enumeration= browser.getEnumeration();

receive() 下更改以下内容:

while (!isStopped() && enumeration.hasMoreElements() )
    {

    receivedMessage= (JMSMessage) enumeration.nextElement();
    String userInput = convertStreamToString(receivedMessage);
    //System.out.println("Received data :'" + userInput + "'");
    store(userInput);
    }

像这样:

while (!isStopped() && (receivedMessage = consumer.receiveNoWait()) != null))
    {
    String userInput = convertStreamToString(receivedMessage);
    //System.out.println("Received data :'" + userInput + "'");
    store(userInput);
    }

关于apache-spark - 来自 IBM MQ 的 Spark 流数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51324918/

相关文章:

apache-kafka - 卡夫卡服务器: Config file not found error

hadoop - 在 yarn 中启动/停止 Spark 流作业的正确方法是什么?

apache-spark - 如果在写入增量表之后立即执行 z 排序,那么在写入增量表之前重新分区是否毫无意义?

apache-kafka - 如何更改Kafka Connect Source Connector生成的主题名称

java - Kafka 消费者分配返回空集

apache-spark - Spark Streaming 应用程序中不同持续时间的多个窗口

apache-spark - Apache Spark : Reference pointer to the parent RDD

apache-spark - 为什么当我重新分区数据时,文件分割的大小没有减少?

apache-spark - 从 Databricks 中的 UDF 内部查询 Delta Lake

python - PySpark:在窗口上加盐并倾斜的 CumSum