我想使用 Spark Streaming 应用程序中的 DynamoDB Stream。
Spark 流使用 KCL 从 Kinesis 中读取。有一个库可以让 KCL 能够从 DynamoDB 流中读取数据:dynamodb-streams-kinesis-adapter。
但是可以将这个库插入 Spark 吗?有人做过吗?
我正在使用 Spark 2.1.0。
我的备份计划是让另一个应用程序从 DynamoDB 流中读取到 Kinesis 流中。
谢谢
最佳答案
执行此操作的方法是实现 KinesisInputDStream 以使用 dynamodb-streams-kinesis-adapter
提供的工作器
official guidelines建议这样的事情:final Worker worker = StreamsWorkerFactory
.createDynamoDbStreamsWorker(
recordProcessorFactory,
workerConfig,
adapterClient,
amazonDynamoDB,
amazonCloudWatchClient);
从 Spark 的角度来看,它是在 KinesisInputDStream.scala 中的 kinesis-asl 模块下实现的
我已经为 Spark 2.4.0 尝试过这个。这是我的 repo 。它需要很少的改进,但可以完成工作
https://github.com/ravi72munde/spark-dynamo-stream-asl
修改完 KinesisInputDStream 后,我们就可以使用它了,如下图。val stream = KinesisInputDStream.builder
.streamingContext(ssc)
.streamName("sample-tablename-2")
.regionName("us-east-1")
.initialPosition(new Latest())
.checkpointAppName("sample-app")
.checkpointInterval(Milliseconds(100))
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.build()
关于apache-spark - 如何在 Spark 流应用程序中处理 DynamoDB Stream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43441727/