apache-spark - 如何在 Spark 流应用程序中处理 DynamoDB Stream

标签 apache-spark amazon-dynamodb amazon-kinesis

我想使用 Spark Streaming 应用程序中的 DynamoDB Stream。

Spark 流使用 KCL 从 Kinesis 中读取。有一个库可以让 KCL 能够从 DynamoDB 流中读取数据:dynamodb-streams-kinesis-adapter。

但是可以将这个库插入 Spark 吗?有人做过吗?

我正在使用 Spark 2.1.0。

我的备份计划是让另一个应用程序从 DynamoDB 流中读取到 Kinesis 流中。

谢谢

最佳答案

执行此操作的方法是实现 KinesisInputDStream 以使用 dynamodb-streams-kinesis-adapter 提供的工作器
official guidelines建议这样的事情:
final Worker worker = StreamsWorkerFactory .createDynamoDbStreamsWorker( recordProcessorFactory, workerConfig, adapterClient, amazonDynamoDB, amazonCloudWatchClient);
从 Spark 的角度来看,它是在 KinesisInputDStream.scala 中的 kinesis-asl 模块下实现的

我已经为 Spark 2.4.0 尝试过这个。这是我的 repo 。它需要很少的改进,但可以完成工作

https://github.com/ravi72munde/spark-dynamo-stream-asl

修改完 KinesisInputDStream 后,我们就可以使用它了,如下图。val stream = KinesisInputDStream.builder .streamingContext(ssc) .streamName("sample-tablename-2") .regionName("us-east-1") .initialPosition(new Latest()) .checkpointAppName("sample-app") .checkpointInterval(Milliseconds(100)) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .build()

关于apache-spark - 如何在 Spark 流应用程序中处理 DynamoDB Stream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43441727/

相关文章:

c++ - C++ (DynamoDB) 中的 HMAC SHA256

amazon-dynamodb - DynamoDB,无法保存加密数据, "not authorized to perform: kms:GenerateDataKey on resource"

php - 检查 DynamoDB 中是否存在表的最佳方法是什么?

r - 为什么 ml_create_dummy_variables 在 Sparklyr 中不显示新的虚拟变量列

hadoop - Sparksql saveAsTable 调用错误的 hdfs 端口

hadoop - 我应该如何将我的事件流保存到冷存储?

amazon-web-services - 使用AWS Kinesis Firehose写入S3存储桶中的特定文件夹

amazon-web-services - Spark Streaming 使用 S3 与 Kinesis

scala - 为什么 spark 应用程序失败并显示 java.lang.NoClassDefFoundError : com/sun/jersey/api/client/config/ClientConfig even though the jar exists?

java - 使用 Spark SQL Row 在 Java 中访问多维 WrappedArray 元素