scala - Troubleshooting a kafka + flink example using scala sbt?

Tags: scala sbt apache-kafka apache-flink

I'm new to the kafka/flink/scala/sbt combination and am trying to set up the following:

  • A multi-topic Kafka queue
  • A Flink stream-processing job packaged as a Scala jar
  • A Scala jar that reads data from one topic, processes it, and pushes the result to another topic

So far I have:

  • Set up Kafka and Flink correctly.
  • Read from a Kafka queue using the Kafka.jar example that ships with the Flink binaries.

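I was able to create a wordcount jar (thanks to ipoteka).
Now I'm trying to create a streaming wordcount jar, as a step before the actual Kafka/Spark streaming example, but I'm running into sbt issues.
Any idea what I'm overlooking?
Please also let me know if I have any unnecessary declarations.
I'd also appreciate it if someone could share a simple program that reads from and writes to a Kafka queue.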
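For the read/write part, one option is a single Flink job that uses the Kafka connector as both source and sink. The sketch below assumes Flink 1.0.0 with the `flink-connector-kafka-0.8` artifact on the classpath, a broker and ZooKeeper at the addresses already used in the question, and two hypothetical topics, `input-topic` and `output-topic`. The object name `KafkaPassThrough` is also hypothetical, and upper-casing stands in for real processing.

```scala
import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer08, FlinkKafkaProducer08}
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

// Sketch: read lines from one Kafka topic, transform them, write them to another topic.
// Topic names, addresses and the object name are placeholders.
object KafkaPassThrough {

  // Per-record processing step, kept as a plain function so it can be checked without Kafka.
  def process(line: String): String = line.trim.toUpperCase

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // The 0.8 consumer needs zookeeper.connect and group.id in addition to the broker list
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("zookeeper.connect", "localhost:2181")
    props.setProperty("group.id", "test")

    env
      .addSource(new FlinkKafkaConsumer08[String]("input-topic", new SimpleStringSchema(), props))
      .map(line => process(line))
      .filter(line => line.nonEmpty) // drop lines that contained only whitespace
      .addSink(new FlinkKafkaProducer08[String]("localhost:9092", "output-topic", new SimpleStringSchema()))

    env.execute("Kafka read/process/write")
  }
}
```

Packaged as a fat jar, a job like this would be submitted with `bin/flink run -c KafkaPassThrough <path-to-fat-jar>`. Keeping `process` separate from the Flink wiring means the transformation can be unit-tested without a broker.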

Project setup:

|- project/plugins.sbt
|- build.sbt
|- src/main/scala/WordCount.scala
|- src/main/scala/StreamWordCount.scala

build.sbt

name := "Kafka-Flink Project"

version := "1.0"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0"

// Updated: correction pointed out by ipoteka
libraryDependencies += "org.apache.kafka" % "kafka_2.10" % "0.10.0.0"

libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.0.0"

libraryDependencies += "org.apache.flink" %% "flink-clients" % "1.0.0"

libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.0.0"

// for jar building: fully qualified name of the object that defines main
mainClass in (Compile, packageBin) := Some("prog.Main")

plugins.sbt

// *** creating fat jar
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")
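`sbt package` (the command used in the error log) only puts the project's own classes into the jar, so a job that uses the Kafka connector would likely be missing it at runtime, because connectors are not part of the Flink distribution. The sbt-assembly plugin declared here builds one jar that includes the dependencies, via `sbt assembly`. A sketch of the matching build.sbt settings, assuming sbt-assembly 0.14.x and the `prog.Main` object from StreamWordCount.scala:

```scala
// build.sbt (sketch): settings for `sbt assembly` with sbt-assembly 0.14.x

// Entry point recorded in the fat jar's manifest (fully qualified object name)
mainClass in assembly := Some("prog.Main")

// Flink's core, streaming and client libraries already ship with the Flink distribution,
// so "provided" keeps them out of the fat jar. These lines would replace the three
// plain Flink declarations; they are not meant to sit next to them.
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"           % "1.0.0" % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % "1.0.0" % "provided",
  "org.apache.flink" %% "flink-clients"         % "1.0.0" % "provided"
)

// Connectors are not in the distribution, so they stay in the default scope and get bundled:
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.8" % "1.0.0"
```

The resulting jar (under `target/scala-<version>/`) is what gets passed to `bin/flink run`. One trade-off of the "provided" scope: `sbt run` no longer sees the Flink classes, so running the job from inside sbt needs extra configuration.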

WordCount.scala

package prog

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.windowing.time.Time

object WordCount {

  type WordCount = (String, Int)

  def main(lines: DataStream[String], stopWords: Set[String], window: Time): DataStream[WordCount] = {
    lines
      .flatMap(line => line.split(" "))
      .filter(word => !word.isEmpty)
      .map(word => word.toLowerCase)
      .filter(word => !stopWords.contains(word))
      .map(word => (word, 1))
      .keyBy(0)
      .timeWindow(window)
      .sum(1)
  }

}
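To check the transformation steps in WordCount.scala without a running Kafka or Flink cluster, the same chain can be applied to an ordinary Scala collection. This is a sketch for local testing only: `keyBy(0)` plus `sum(1)` become `groupBy` plus a count, and there is no time window, so the whole input is treated as a single window. The names `WordCountLocal` and `countWords` are hypothetical.

```scala
object WordCountLocal {

  // Same steps as the DataStream pipeline, applied to an in-memory Seq instead of a stream.
  def countWords(lines: Seq[String], stopWords: Set[String]): Map[String, Int] =
    lines
      .flatMap(line => line.split(" ").toSeq)   // split on single spaces, like the original
      .filter(word => word.nonEmpty)            // consecutive spaces yield empty tokens
      .map(word => word.toLowerCase)
      .filter(word => !stopWords.contains(word))
      .groupBy(identity)                        // stands in for keyBy(0)
      .map { case (word, occurrences) => word -> occurrences.size } // stands in for sum(1)

  def main(args: Array[String]): Unit =
    println(countWords(Seq("The quick fox", "the lazy  Fox"), Set("the")))
}
```

If this version gives the expected counts, any remaining difference in the Flink job comes from the windowing or the Kafka source rather than from the word-splitting logic.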

StreamWordCount.scala

package prog

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.windowing.time.Time



object Main {
  def main(args: Array[String]) {

    type WordCount = (String, Int)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")
    val stream = env
      .addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties))
      .flatMap(line => line.split(" "))
      .filter(word => !word.isEmpty)
      .map(word => word.toLowerCase)
      .filter(word => !stopWords.contains(word))
      .map(word => (word, 1))
      .keyBy(0)
      .timeWindow(window)
      .sum(1)
      .print

    env.execute("Flink Kafka Example")
  }
}

Error when building the jar (updated)

[vagrant@streaming ex]$ /opt/sbt/bin/sbt  package
[error] /home/vagrant/ex/src/main/scala/StreamWordCount.scala:4: object connectors is not a member of package org.apache.flink.streaming
[error] import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
[error]                                   ^
[error] /home/vagrant/ex/src/main/scala/StreamWordCount.scala:18: not found: type Properties
[error]     val properties = new Properties()
[error]                          ^
[error] /home/vagrant/ex/src/main/scala/StreamWordCount.scala:23: not found: type FlinkKafkaConsumer082
[error]       .addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties))
[error]                      ^
[error] three errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 31 s, completed Jul 3, 2016 9:02:18 PM
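The three errors come from two causes. `object connectors is not a member of package org.apache.flink.streaming` (and the follow-up `not found: type FlinkKafkaConsumer082`) means the Kafka connector is not on the classpath: it is a separate artifact that build.sbt never declares. `not found: type Properties` is a missing `import java.util.Properties`. Once those compile, `stopWords` and `window` would most likely be reported next, since `Main` never defines them. Importing both `org.apache.flink.api.scala._` and `org.apache.flink.streaming.api.scala._` can also make the implicit `TypeInformation` ambiguous. A sketch of a corrected StreamWordCount.scala, assuming Flink 1.0.0 with `flink-connector-kafka-0.8` added to build.sbt. In that connector the consumer class is `FlinkKafkaConsumer08` (older Flink releases used `FlinkKafkaConsumer082`), and the stop-word set and 10-second window are placeholder values:

```scala
package prog

import java.util.Properties // fixes "not found: type Properties"

// Only the streaming Scala API import; adding org.apache.flink.api.scala._ as well
// can make the implicit TypeInformation ambiguous.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
// Resolves only once flink-connector-kafka-0.8 is declared in build.sbt
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object Main {
  // Placeholder values: the original referenced stopWords and window without defining them
  val stopWords: Set[String] = Set("a", "an", "the")
  val window: Time = Time.seconds(10)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")

    env
      .addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
      .flatMap(line => line.split(" "))
      .filter(word => word.nonEmpty)
      .map(word => word.toLowerCase)
      .filter(word => !stopWords.contains(word))
      .map(word => (word, 1))
      .keyBy(0)            // key by the word (tuple field 0)
      .timeWindow(window)  // tumbling window of `window` length
      .sum(1)              // add up the counts (tuple field 1)
      .print()

    env.execute("Flink Kafka Example")
  }
}
```

With this file the job's entry point is `prog.Main`, which is the name `mainClass` has to point at.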

Best answer

Where did you get these versions? I don't see a Kafka version 1.0.0. Check Maven (click the sbt tab):

libraryDependencies += "org.apache.kafka" % "kafka_2.10" % "0.10.0.0"

I'd also recommend double-checking all the other versions. For example, at the time, the current Spark version was 1.6.2.
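Beyond the Kafka version, the first compile error in the question comes from a dependency that is missing entirely: `flink-scala` and `flink-streaming-scala` do not contain `org.apache.flink.streaming.connectors.kafka`. In Flink 1.0 the Kafka connector is published as separate artifacts per Kafka version (`flink-connector-kafka-0.8`, `flink-connector-kafka-0.9`). A sketch of a build.sbt with one consistent set of versions, assuming Scala 2.10 and the 0.8 connector (which matches the `zookeeper.connect` setting in the question's code). `spark-core` is left out because neither source file uses Spark, and `kafka_2.10` is left out because the connector brings in its own Kafka client; keeping `kafka_2.10` 0.10.0.0 next to it could put two different Kafka versions on the classpath.

```scala
// build.sbt (sketch): one consistent set of versions, Flink 1.0.0 throughout
name := "Kafka-Flink Project"

version := "1.0"

// Flink 1.0.0 artifacts are published for Scala 2.10 and 2.11; %% appends the suffix from this setting
scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"               % "1.0.0",
  "org.apache.flink" %% "flink-streaming-scala"     % "1.0.0",
  "org.apache.flink" %% "flink-clients"             % "1.0.0",
  // Separate artifact providing org.apache.flink.streaming.connectors.kafka
  // (FlinkKafkaConsumer08 / FlinkKafkaProducer08) plus a matching Kafka client
  "org.apache.flink" %% "flink-connector-kafka-0.8" % "1.0.0"
)

mainClass in (Compile, packageBin) := Some("prog.Main")
```

For a Kafka 0.9 or newer broker, `flink-connector-kafka-0.9` with `FlinkKafkaConsumer09` is the other connector Flink 1.0.0 provides; that consumer does not need `zookeeper.connect`.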

For "scala - Troubleshooting a kafka + flink example using scala sbt?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/38173645/

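Related articles:

java - Sending exactly one ProducerRecord with KafkaProducer

scala - How to read a string into an enum without throwing an exception?

Scala debugging

scala - Play 2.0/SBT: Exclude certain transitive dependencies from some/all modules in Build.scala

java - Apache Kafka system error handling

java - Kafka producer throws "Received unknown topic or partition error" when sending to a topic created via the AdminClient createTopics method

scala - Case class extending a trait doesn't work with copy

forms - Pre-populating form data with liftweb

scala - Ivy/sbt error with a Maven dependency that contains variables, is there a fix?

scala - How to create a common SBT root project with different subprojects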