apache-spark - 如何在Spark流媒体应用程序中处理DynamoDB Stream

原文 标签 apache-spark amazon-dynamodb amazon-kinesis

我想从Spark Streaming应用程序使用DynamoDB流。

Spark流使用KCL读取Kinesis。有一个使KCL能够从DynamoDB流读取的库:dynamodb-streams-kinesis-adapter。

但是有可能将此lib插入spark吗?有人这样做吗?

我正在使用Spark 2.1.0。

我的备份计划是让另一个应用程序从DynamoDB流读取到Kinesis流中。

谢谢

最佳答案

实现它的方法是实现KinesisInputDStream以使用dynamodb-streams-kinesis-adapter提供的工作程序
official guidelines建议如下所示:

final Worker worker = StreamsWorkerFactory .createDynamoDbStreamsWorker( recordProcessorFactory, workerConfig, adapterClient, amazonDynamoDB, amazonCloudWatchClient);

从Spark的角度来看,它是在KinesisInputDStream.scala中的kinesis-asl模块下实现的

我已经为Spark 2.4.0尝试过这个。这是我的仓库。它几乎不需要精炼,但可以完成工作

https://github.com/ravi72munde/spark-dynamo-stream-asl

修改KinesisInputDStream之后,我们可以如下所示使用它。
val stream = KinesisInputDStream.builder .streamingContext(ssc) .streamName("sample-tablename-2") .regionName("us-east-1") .initialPosition(new Latest()) .checkpointAppName("sample-app") .checkpointInterval(Milliseconds(100)) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .build()

相关文章:

java - 无法理解Spark中的UDF,尤其是Java中的UDF

java - 并行读取S3中的多个文件(Spark,Java)

json - DynamoDB UnmarshalListOfMaps在Go中创建空值

go - AWS Lambda GoLang处理程序API

java - Apache Spark:在Java中有效地使用mapPartitions

python - Pyspark-如何在“ 4小时”内分组汇总窗口

node.js - 使用NODEjs进行DynamoDB更新:语法错误;令牌:“更新”,附近:“设置更新=”

java - DynamoDBMapper仅在对象不存在时保存

amazon-kinesis - 扫雪机Scala收集器:Kinesis流pockinesisfirehose不存在

algorithm - 基于时间跨度的AWS Kinesis流聚合