python - What causes "TypeError: unhashable type: 'TopicAndPartition'" when calling KafkaUtils.createDirectStream?

Tags: python apache-spark spark-streaming pykafka

I want to consume Kafka messages starting from an arbitrary offset via KafkaUtils.createDirectStream.

My source code:

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

def functionToCreateContext():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 2)
    kvs = KafkaUtils.createDirectStream(
        ssc,
        ['test123'],
        {"metadata.broker.list": "localhost:9092"},
        {TopicAndPartition("test123", 0): 100, TopicAndPartition("test123", 1): 100}
    )
    #kvs = kvs.checkpoint(10)
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a+b)
    counts.pprint()
    return ssc

if __name__ == "__main__":
    ssc = StreamingContext.getOrCreate("./checkpoint", functionToCreateContext())

    ssc.start()
    ssc.awaitTermination()

But I get the following error:

Traceback (most recent call last):
  File "/usr/local/spark-1.6.0-bin-hadoop2.6/examples/src/main/python/streaming/direct_kafka_wordcount.py", line 56, in <module>
    ssc = StreamingContext.getOrCreate("./checkpoint", functionToCreateContext())
  File "/usr/local/spark-1.6.0-bin-hadoop2.6/examples/src/main/python/streaming/direct_kafka_wordcount.py", line 45, in functionToCreateContext
    {TopicAndPartition("test123", 0): 100, TopicAndPartition("test123", 1): 100}
TypeError: unhashable type: 'TopicAndPartition'

The relevant pyspark source code:

@staticmethod
def createDirectStream(ssc, topics, kafkaParams, fromOffsets=None,
                       keyDecoder=utf8_decoder, valueDecoder=utf8_decoder,
                       messageHandler=None):

class TopicAndPartition(object):
    """
    Represents a specific topic and partition for Kafka.
    """

    def __init__(self, topic, partition):
        """
        Create a Python TopicAndPartition to map to the Java related object
        :param topic: Kafka topic name.
        :param partition: Kafka partition id.
        """
        self._topic = topic
        self._partition = partition

    def _jTopicAndPartition(self, helper):
        return helper.createTopicAndPartition(self._topic, self._partition)
    .........

jfromOffsets = dict([(k._jTopicAndPartition(helper),
                      v) for (k, v) in fromOffsets.items()])

So fromOffsets should be a dictionary whose keys are TopicAndPartition objects, which is what I am passing.

Any ideas?
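The mechanism behind the error can be reproduced without Spark at all. In Python 3, a class that defines __eq__ but not __hash__ has its __hash__ set to None, so its instances cannot be used as dict keys; Python 2 kept the default identity hash. The sketch below uses a local stand-in class that mirrors this aspect of PySpark 1.6's TopicAndPartition (it is not the actual PySpark class):

```python
# Minimal reproduction (local stand-in, not PySpark's actual class):
# defining __eq__ without __hash__ makes instances unhashable in Python 3,
# so using one as a dict key raises the same TypeError as in the question.
class Unhashable(object):
    def __init__(self, topic, partition):
        self._topic = topic
        self._partition = partition

    def __eq__(self, other):
        return (isinstance(other, Unhashable)
                and self._topic == other._topic
                and self._partition == other._partition)

try:
    {Unhashable("test123", 0): 100}  # same shape as the fromOffsets dict
except TypeError as exc:
    print(exc)  # unhashable type: 'Unhashable'
```

Under Python 2 the same class would hash by identity and the dict literal would succeed, which is why the code in the question behaves differently across interpreter versions.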

Best answer

PySpark has a bug under Python 3: the TopicAndPartition class is missing a __hash__ method, so its instances cannot be used as dict keys. Switch from Python 3 to Python 2 and the error goes away.

Then the offsets should be converted from int to long (Python 2's long type; it no longer exists in Python 3):

{TopicAndPartition("test123", 0): long(100), TopicAndPartition("test123", 1): long(100)}
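An alternative to downgrading the interpreter is to supply the missing method yourself. The sketch below shows the pattern on a local stand-in class; in a real job you could subclass pyspark.streaming.kafka.TopicAndPartition the same way (assuming, as the source excerpt above suggests, that it stores _topic and _partition attributes). The subclass name is hypothetical:

```python
# Sketch of an alternative Python 3 workaround: add the missing __hash__
# instead of switching interpreters. The base class here is a local
# stand-in for PySpark 1.6's TopicAndPartition, not the real import.
class TopicAndPartition(object):
    def __init__(self, topic, partition):
        self._topic = topic
        self._partition = partition

    def __eq__(self, other):
        return (isinstance(other, TopicAndPartition)
                and self._topic == other._topic
                and self._partition == other._partition)

class HashableTopicAndPartition(TopicAndPartition):
    # Hash on the same fields __eq__ compares, so equal objects
    # land in the same dict slot.
    def __hash__(self):
        return hash((self._topic, self._partition))

fromOffsets = {
    HashableTopicAndPartition("test123", 0): 100,
    HashableTopicAndPartition("test123", 1): 100,
}
print(len(fromOffsets))  # 2
```

Keeping __hash__ consistent with __eq__ matters: two equal keys must hash the same, otherwise dict lookups on equal-but-distinct instances would silently miss.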

Regarding "python - What causes TypeError: unhashable type: 'TopicAndPartition' when calling KafkaUtils.createDirectStream?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/37454186/
