python - 使用 pika 在 python 中使用 SparkStreaming、RabbitMQ 和 MQTT

标签 python apache-spark rabbitmq mqtt pika

为了让事情变得棘手,我想使用来自 rabbitMQ 队列的消息。现在我知道在 rabbit ( https://www.rabbitmq.com/mqtt.html ) 上有一个 MQTT 插件。

但是,我似乎无法制作一个 Spark 使用从 pika 生成的消息的示例。

例如,我在这里使用简单的 wordcount.py 程序 (https://spark.apache.org/docs/1.2.0/streaming-programming-guide.html) 来查看我是否可以通过以下方式看到消息 producer:

import sys
import pika
import json
import future
import pprofile

def sendJson(json):

  connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  channel = connection.channel()

  channel.queue_declare(queue='analytics', durable=True)
  channel.queue_bind(exchange='analytics_exchange',
                       queue='analytics')

  channel.basic_publish(exchange='analytics_exchange', routing_key='analytics',body=json)
  connection.close()

if __name__ == "__main__":
  with open(sys.argv[1],'r') as json_file:
    sendJson(json_file.read())

sparkstreaming 消费者如下:

import sys
import operator

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils

sc = SparkContext(appName="SS")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")
#ssc.setLogLevel("ERROR")


#RabbitMQ

"""EXCHANGE = 'analytics_exchange'
EXCHANGE_TYPE = 'direct'
QUEUE = 'analytics'
ROUTING_KEY = 'analytics'
RESPONSE_ROUTING_KEY = 'analytics-response'
"""


brokerUrl = "localhost:5672" # "tcp://iot.eclipse.org:1883"
topic = "analytics"

mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)
#dummy functions - nothing interesting...
words = mqttStream.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

wordCounts.pprint()
ssc.start()
ssc.awaitTermination()

然而,与简单的 wordcount 示例不同,我无法让它工作并出现以下错误:

16/06/16 17:41:35 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 8)
java.lang.NullPointerException
    at org.eclipse.paho.client.mqttv3.MqttConnectOptions.validateURI(MqttConnectOptions.java:457)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.<init>(MqttAsyncClient.java:273)

所以我的问题是,根据 MQTTUtils.createStream(ssc, brokerUrl, topic) 应该如何设置来监听队列,是否有更完整的示例以及这些映射如何到 rabbitMQ 上。

我正在运行我的消费者代码:./bin/spark-submit ../../bb/code/skunkworks/sparkMQTTRabbit.py

我已经按照一条评论的建议使用 TCP 参数更新了生产者代码,如下所示:

url_location = 'tcp://localhost'
url = os.environ.get('', url_location)
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)

Spark 流式传输为:

brokerUrl = "tcp://127.0.0.1:5672"
topic = "#" #all messages

mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)
records = mqttStream.flatMap(lambda line: json.loads(line))
count = records.map(lambda rec: len(rec))
total = count.reduce(lambda a, b: a + b)
total.pprint()

最佳答案

您似乎使用了错误的端口号。假设:

  • 您有一个使用默认设置运行的 RabbitMQ 本地实例,并且您启用了 MQTT 插件(rabbitmq-plugins enable rabbitmq_mqtt)并重新启动了 RabbitMQ 服务器
  • 在执行 spark-submit/pyspark 时包含 spark-streaming-mqtt(使用 packagesjars/驱动类路径)

您可以通过 tcp://localhost:1883 使用 TCP 进行连接。您还必须记住 MQTT 使用的是 amq.topic

快速入门:

  • 使用以下内容创建 Dockerfile:

    FROM rabbitmq:3-management
    
    RUN rabbitmq-plugins enable rabbitmq_mqtt
    
  • 构建 Docker 镜像:

    docker build -t rabbit_mqtt .
    
  • 启动图像并等待服务器准备就绪:

    docker run -p 15672:15672 -p 5672:5672 -p 1883:1883 rabbit_mqtt 
    
  • 使用以下内容创建 producer.py:

    import pika
    import time 
    
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
    channel = connection.channel()
    channel.exchange_declare(exchange='amq.topic',
                     type='topic', durable=True)
    
    for i in range(1000):
        channel.basic_publish(
            exchange='amq.topic',  # amq.topic as exchange
            routing_key='hello',   # Routing key used by producer
            body='Hello World {0}'.format(i)
        )
        time.sleep(3)
    
    connection.close()
    
  • 开始制作人

    python producer.py
    

    并访问管理控制台http://127.0.0.1:15672/#/exchanges/%2F/amq.topic

    查看是否收到消息。

  • 使用以下内容创建 consumer.py:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.mqtt import MQTTUtils
    
    sc = SparkContext()
    ssc = StreamingContext(sc, 10)
    
    mqttStream = MQTTUtils.createStream(
        ssc, 
        "tcp://localhost:1883",  # Note both port number and protocol
        "hello"                  # The same routing key as used by producer
    )
    mqttStream.count().pprint()
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
    
  • 下载依赖项(将Scala版本调整为用于构建Spark的版本和Spark版本):

    mvn dependency:get -Dartifact=org.apache.spark:spark-streaming-mqtt_2.11:1.6.1
    
  • 确保 SPARK_HOMEPYTHONPATH 指向正确的目录。

  • 提交 consumer.py(像以前一样调整版本):

    spark-submit --packages org.apache.spark:spark-streaming-mqtt_2.11:1.6.1 consumer.py
    

如果您按照所有步骤操作,您应该会在 Spark 日志中看到 Hello world 消息。

关于python - 使用 pika 在 python 中使用 SparkStreaming、RabbitMQ 和 MQTT,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37863801/

相关文章:

go - 在单独的方法中使用golang的defer

docker - 错误:在为rabbitmq运行docker容器时出现function_clause

python - 在 Discord.py 中获取公会的所有成员

python - 一个段落中的 Reportlab 粗体和普通文本

python - PyQt:应用程序意外离开主循环

apache-spark - 无法转换 Parquet 列 : Expected decimal, 找到二进制

apache-spark - 为什么 Spark 在 joinWith 期间对预先分区的数据帧执行不必要的随机播放?

RabbitMQ - 无法联系统计数据库。消息速率和队列长度将不会显示

python mysqldb 查询 with where

scala - 从 Spark udf 记录到驱动程序