apache-spark - 如何从属性文件设置 Kafka 参数?

标签 apache-spark apache-kafka spark-streaming

我目前正在做的事情如下:

val topic = "mytopic"
val zkhosts = "localhost"
val zkports = "2181"

在我的代码中设置它然后将它发送到 kafkastream 函数可以工作,但我想从 .properties 读取它文件。有没有可能的解决方案?

最佳答案

鉴于此属性文件位于 /tmp/sample.properties

kafka.topic = "mytopic"
kafka.zkhost = "localhost"
kafka.zkports = 2191

我们可以使用普通的 java Property用于加载属性的 API:
import java.io.FileReader
val configFile = new java.io.File("/tmp/sample.properties")
val reader = new FileReader(configFile)
val props = new Properties()
props.load(reader)
reader.close()

您还可以使用您最喜欢的配置库来加载属性文件,就像在任何其他程序上一样。

例如,您可以使用流行的 typesafe config lib . Scala 有很多包装器,但在其原始形式中,您可以执行以下操作:
import com.typesafe.config.ConfigFactory
val configFile = new java.io.File("/tmp/sample.properties")
val kafkaConfig = ConfigFactory.parseFile(configFile)

import java.util.Properties
val kafkaProperties = new Properties()
kafkaProperties.put("zookeeper.hosts", kafkaConfig.getString("kafka.zkhost"))
kafkaProperties.put("zookeeper.port", kafkaConfig.getInt("kafka.zkports"):java.lang.Integer)
kafkaProperties.put("kafka.topic", kafkaConfig.getString("kafka.topic"))

(有很多方法可以使之美观和紧凑。这里我使用的是最常见的形式)

关于apache-spark - 如何从属性文件设置 Kafka 参数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44411067/

相关文章:

apache-kafka - 重新启动集群时连接消费者作业被删除

java - 运行 Apache Spark Kafka Stream 时获取 Hadoop OutputFormat RunTimeException

scala - 使用 Flink 从 kafka 主题的开头进行消费

apache-spark - Yarn Heap 使用量随时间增长

hadoop - 来自文件夹(不是 HDFS)的 Apache Spark Streaming

apache-spark - 获取用于调优的节点数、代码数和可用 RAM

python - 如何使用 Python 连接 HBase 和 Spark?

java - 为什么 Spark Standalone 集群不使用所有可用核心?

apache-spark - java.lang.UnsupportedOperationException : 'Writing to a non-empty Cassandra Table is not allowed

heroku - 在 Heroku 上部署 Apache Spark