java - 无法使用spark sql读取kafka

标签 java maven apache-spark apache-kafka spark-streaming

我正在尝试使用 Spark 阅读 kafka,但我猜面临一些与库相关的问题。

我正在向 kafka 主题推送一些事件,我可以通过 kafka 控制台消费者读取这些事件,但无法通过 Spark 读取。我正在使用spark-sql-kafka库,该项目是用maven编写的。 Scala 版本是 2.11.12,spark 版本是 2.4.3。

            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.3</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>2.4.3</version>
            <scope>provided</scope>
        </dependency>

我的java代码如下:-

SparkSession spark = SparkSession.builder()
                .appName("kafka-tutorials")
                .master("local[*]")
                .getOrCreate();

        Dataset<Row> rows = spark.readStream().
                format("kafka").option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "meetup-trending-topics")
                .option("startingOffsets", "latest")
                .load();

        rows.writeStream()
                .outputMode("append")
                .format("console")
                .start();

        spark.streams().awaitAnyTermination();
        spark.stop();

以下是我收到的错误消息:-

线程“main”org.apache.spark.sql.AnalysisException中出现异常:无法找到数据源:kafka。请按照《Structured Streaming + Kafka 集成指南》部署部分部署应用程序; 在 org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652) 在 org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)

解决方案:- 两者之一 1)create uber jar 或 ii) --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 我之前在 mainclass 之后给出了 --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 选项。

最佳答案

这个:

<scope>provided</scope>

意味着您有责任提供适当的jar。我(和许多其他人)更喜欢避免使用这个范围,而是构建一个 uberjar 来部署。

关于java - 无法使用spark sql读取kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56704113/

相关文章:

java - Spring AOP - 拦截其祖先有注释的类

java - Hazelcast 中所有 map 的编程属性设置?

java - Lambda 表达式抛出异常

scala - 如何在Spark数据帧中执行条件 "withColumn"?

hadoop - Livy 客户端 - Spark_Conf.zip 不存在

python - 即使在正确的位置提到了该文件,输入文件也不存在 - pyspark

java - javafx 电子表格中的撤消/重做功能

java - 如果它更接近我设置的值,有没有办法打印它?

java - JSF Primefaces 主题更改错误

Eclipse 无法识别 Scala 源目录