apache-spark - How do I pass data from Kafka to Spark Streaming?

Tags: apache-spark apache-kafka spark-streaming kafka-python

I am trying to pass data from Kafka to Spark Streaming.

Here is what I have done so far:

  • Installed both kafka and spark
  • Started zookeeper with the default properties config
  • Started the kafka server with the default properties config
  • Started a kafka producer
  • Started a kafka consumer
  • Sent messages from the producer to the consumer; that works fine.
  • Wrote kafka-spark.py to receive messages from kafka into spark.
  • Tried running ./bin/spark-submit examples/src/main/python/kafka-spark.py
  • Got an error (full log below).

  • kafka-spark.py -
    from __future__ import print_function
    import sys
    from pyspark.streaming import StreamingContext
    from pyspark import SparkContext,SparkConf
    from pyspark.streaming.kafka import KafkaUtils
    
    if __name__ == "__main__":
        #conf = SparkConf().setAppName("Kafka-Spark").setMaster("spark://127.0.0.1:7077")
        conf = SparkConf().setAppName("Kafka-Spark")
        #sc = SparkContext(appName="KafkaSpark")
        sc = SparkContext(conf=conf)
        stream=StreamingContext(sc,1)
        map1={'spark-kafka':1}
        kafkaStream = KafkaUtils.createStream(stream, 'localhost:9092', "name", map1) #tried with localhost:2181 too
    
        print("kafkastream=",kafkaStream)
        sc.stop()
    

    Full log, including the error, from running kafka-spark.py:
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    16/01/18 13:05:33 INFO SparkContext: Running Spark version 1.6.0
    16/01/18 13:05:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    16/01/18 13:05:33 INFO SecurityManager: Changing view acls to: username
    16/01/18 13:05:33 INFO SecurityManager: Changing modify acls to: username
    16/01/18 13:05:33 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(username); users with modify permissions: Set(username)
    16/01/18 13:05:33 INFO Utils: Successfully started service 'sparkDriver' on port 54446.
    16/01/18 13:05:34 INFO Slf4jLogger: Slf4jLogger started
    16/01/18 13:05:34 INFO Remoting: Starting remoting
    16/01/18 13:05:34 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@127.0.0.1:50386]
    16/01/18 13:05:34 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 50386.
    16/01/18 13:05:34 INFO SparkEnv: Registering MapOutputTracker
    16/01/18 13:05:34 INFO SparkEnv: Registering BlockManagerMaster
    16/01/18 13:05:34 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-f5490271-cdb7-467d-a915-4f5ccab57f0e
    16/01/18 13:05:34 INFO MemoryStore: MemoryStore started with capacity 511.1 MB
    16/01/18 13:05:34 INFO SparkEnv: Registering OutputCommitCoordinator
    16/01/18 13:05:34 INFO Server: jetty-8.y.z-SNAPSHOT
    16/01/18 13:05:34 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
    16/01/18 13:05:34 INFO Utils: Successfully started service 'SparkUI' on port 4040.
    16/01/18 13:05:34 INFO SparkUI: Started SparkUI at http://127.0.0.1:4040
    Java HotSpot(TM) Server VM warning: You have loaded library /tmp/libnetty-transport-native-epoll561240765619860252.so which might have disabled stack guard. The VM will try to fix the stack guard now.
    It's highly recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'.
    16/01/18 13:05:34 INFO Utils: Copying ~/Dropbox/Work/ITNow/spark/spark-1.6.0/examples/src/main/python/kafka-spark.py to /tmp/spark-18227081-a1c8-43f2-8ca7-cfc4751f023f/userFiles-e93fc252-0ba1-42b7-b4fa-2e46f3a0601e/kafka-spark.py
    16/01/18 13:05:34 INFO SparkContext: Added file file:~/Dropbox/Work/ITNow/spark/spark-1.6.0/examples/src/main/python/kafka-spark.py at file:~/Dropbox/Work/ITNow/spark/spark-1.6.0/examples/src/main/python/kafka-spark.py with timestamp 1453118734892
    16/01/18 13:05:35 INFO Executor: Starting executor ID driver on host localhost
    16/01/18 13:05:35 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 58970.
    16/01/18 13:05:35 INFO NettyBlockTransferService: Server created on 58970
    16/01/18 13:05:35 INFO BlockManagerMaster: Trying to register BlockManager
    16/01/18 13:05:35 INFO BlockManagerMasterEndpoint: Registering block manager localhost:58970 with 511.1 MB RAM, BlockManagerId(driver, localhost, 58970)
    16/01/18 13:05:35 INFO BlockManagerMaster: Registered BlockManager
    
    ________________________________________________________________________________________________
    
      Spark Streaming's Kafka libraries not found in class path. Try one of the following.
    
      1. Include the Kafka library and its dependencies with in the
         spark-submit command as
    
         $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:1.6.0 ...
    
      2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
         Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = 1.6.0.
         Then, include the jar in the spark-submit command as
    
         $ bin/spark-submit --jars <spark-streaming-kafka-assembly.jar> ...
    
    ________________________________________________________________________________________________
    
    
    Traceback (most recent call last):
      File "~/Dropbox/Work/ITNow/spark/spark-1.6.0/examples/src/main/python/kafka-spark.py", line 33, in <module>
        kafkaStream = KafkaUtils.createStream(stream, 'localhost:9092', "name", map1)
      File "~/Dropbox/Work/ITNow/spark/spark-1.6.0/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 80, in createStream
    py4j.protocol.Py4JJavaError: An error occurred while calling o22.loadClass.
    : java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:209)
        at java.lang.Thread.run(Thread.java:745)
    
    16/01/18 13:05:35 INFO SparkContext: Invoking stop() from shutdown hook
    16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/metrics/json,null}
    16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/kill,null}
    16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/api,null}
    16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/,null}
    16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/static,null}
    16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/threadDump/json,null}
    16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/threadDump,null}
    16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/json,null}
    16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors,null}
    16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment/json,null}
    16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment,null}
    16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd/json,null}
    16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd,null}
    16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/json,null}
    16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage,null}
    16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool/json,null}
    16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool,null}
    16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/json,null}
    16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage,null}
    16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/json,null}
    16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages,null}
    16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs/job/json,null}
    16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs/job,null}
    16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs/json,null}
    16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs,null}
    16/01/18 13:05:35 INFO SparkUI: Stopped Spark web UI at http://127.0.0.1:4040
    16/01/18 13:05:35 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    16/01/18 13:05:35 INFO MemoryStore: MemoryStore cleared
    16/01/18 13:05:35 INFO BlockManager: BlockManager stopped
    16/01/18 13:05:35 INFO BlockManagerMaster: BlockManagerMaster stopped
    16/01/18 13:05:35 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    16/01/18 13:05:35 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
    16/01/18 13:05:35 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
    16/01/18 13:05:35 INFO SparkContext: Successfully stopped SparkContext
    16/01/18 13:05:35 INFO ShutdownHookManager: Shutdown hook called
    16/01/18 13:05:35 INFO ShutdownHookManager: Deleting directory /tmp/spark-18227081-a1c8-43f2-8ca7-cfc4751f023f
    16/01/18 13:05:35 INFO ShutdownHookManager: Deleting directory /tmp/spark-18227081-a1c8-43f2-8ca7-cfc4751f023f/pyspark-fcd47a97-57ef-46c3-bb16-357632580334
    

    EDIT

    Running ./bin/spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.0.jar examples/src/main/python/kafka-spark.py, I get a hexadecimal memory address instead of the actual strings:
    kafkastream= <pyspark.streaming.dstream.TransformedDStream object at 0x7fd6c4dad150>
    

    Any idea what I am doing wrong? I am really new to Kafka and Spark, so I could use some help. Thanks!

    Best Answer

    You need to submit the spark-streaming-kafka-assembly_*.jar along with your job:

    spark-submit --jars spark-streaming-kafka-assembly_2.10-1.5.2.jar ./spark-kafka.py 
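
    A note on the EDIT above: the output kafkastream= <pyspark.streaming.dstream.TransformedDStream object at 0x7fd6c4dad150> is not an error; it is just Python's default repr of the DStream object, printed once while the driver builds the stream. print() on a DStream never shows the streamed records. To see the actual messages you need an output operation such as pprint(), and the StreamingContext must be started. Note also that the receiver-based KafkaUtils.createStream expects the ZooKeeper address (localhost:2181 by default), not the broker address (localhost:9092). Below is a minimal sketch along those lines, assuming the topic is still spark-kafka; the consumer group id spark-group is a made-up name:

    from __future__ import print_function
    from pyspark import SparkConf, SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    if __name__ == "__main__":
        conf = SparkConf().setAppName("Kafka-Spark")
        sc = SparkContext(conf=conf)
        ssc = StreamingContext(sc, 1)  # 1-second batch interval

        # createStream(ssc, zkQuorum, groupId, {topic: partitions});
        # 'spark-group' is a hypothetical consumer group id.
        kafkaStream = KafkaUtils.createStream(ssc, 'localhost:2181', 'spark-group', {'spark-kafka': 1})

        # Each record arrives as a (key, value) pair; print the values of every batch.
        kafkaStream.map(lambda kv: kv[1]).pprint()

        ssc.start()
        ssc.awaitTermination()

    Submit it the same way as in the EDIT, with an assembly jar whose version matches your Spark build (1.6.0 here, rather than the 1.5.2 shown in the command above).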
    

    Regarding apache-spark - How do I pass data from Kafka to Spark Streaming?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/34854518/
