scala - 我是否使用了正确的框架?

标签 scala apache-spark elasticsearch apache-flink

我是 scala/flink/spark 的新手,有几个问题。
现在正在使用带有 flink 的 scala。

数据流的总体思路是这样的:
csv 文件 -> flink -> 弹性 -> flink(流程数据) -> MongoDB -> Tableau

有大量以分号分隔的日志文件。
我想将这些文件写入 elasticsearch 作为我的数据库。 (这已经有效)
现在需要进行各种分析(例如一致性报告/生产力报告)。
对于这些报告,需要不同类型的列。

想法是用flink从elasticsearch导入基础数据,编辑数据并保存到mongodb中,这样就可以用tableau进行数据可视化了。

编辑将包括添加额外的列,如工作日和不同状态的开始/结束时间

// +-------+-----+-----+  
// | status|date |time |  
// +-------+-----+-----+  
// | start | 1.1 |7:00 |  
// | run_a | 1.1 |7:20 |  
// | run_b | 1.1 |7:50 |  
// +-------+-----+-----+  


// +-------+-------+-------+----+  
// | status|s_time |e_time |day |  
// +-------+-------+-------+----|  
// | start | 7:00  |7:20   | MON|  
// | run_a | 7:20  |7:50   | MON|  
// | run_b | 7:50  |nextVal| MON|  
// +-------+-------+-------+----+  

经过一番研究,我发现 flink 并没有提供使用弹性作为数据源的可能性。
有一个github项目https://github.com/mnubo/flink-elasticsearch-source-connector但它已经一年多没有更新了。这似乎无法正常工作,因为它给我的点击次数更少,然后我会使用相同的查询进入 kibana。
有没有其他选择?为什么默认情况下不支持此功能?

这些表转换是否可以使用 flink 进行?用 flink 做这些有意义吗? (因为我很难实现它们)

我是否为这个项目使用了正确的框架?我应该切换到 spark,因为它提供了更多功能/社区项目吗?

最佳答案

首先,如果您的目标只是处理日志(强大的搜索、可视化、存储),您不能重新发明轮子并使用 ELK stack
您将获得下一个能力 -

  • 数据收集和日志解析引擎与 Logstash
  • 分析和可视化 Kibana
  • Elasticsearch 喜欢搜索引擎
  • 与云无缝集成(AWS 或 elastic cloud)

  • 但是这个软件是 shareware - 你将无法使用免费版本的全部功能,我可以根据我的个人经验说 - 试用版适合在生产中使用 - 它确实让生活更轻松。

    如果您想创建自己的自定义管道来存储、转换和处理日志或其他文件,Apache Spark 是用于此目的的绝佳解决方案 - 您可以使用 SparkETL 解决方案来处理您想要的一切 - 构建数据管道非常容易(read from elasticsearch --> process it --> save to mongo ; take from mongo --> send to visualisation 等) - 您可以使用 Spark 2.0 achieve speedup(与早期版本的 Spark 相比)。

    此外,已经准备好 solutionSpark - Mongo - ES 的集成,或者您可以通过使用 ESMongo 的连接器来制作自己的。关于 Flink - 你可以使用它来代替 SparkSpark 是更成熟的技术并且拥有更广泛的社区。与替代方案一样,您可以使用 ETL 解决方案在系统之间进行快速开发/原型(prototype)设计数据流(用鼠标拖动必要的组件),例如 StreamsetsNiFi

    关于scala - 我是否使用了正确的框架?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45563408/

    相关文章:

    Scala Lift - filterNot 带有多个参数

    scala - 如何使用 Spark 计算累积和

    apache-spark - 为什么 spark 的 global_temp 数据库不可见?

    php - 数组中的ElasticSearch匹配组合

    scala - 构造具有相同结构的多个案例类的单个函数

    scala - sbt 在动态任务中设置 java 选项

    java - Scala 2.12 和 Java 8 SAM 互操作不编译

    apache-spark - Spark - 如何通过 'SparkLauncher' 识别失败的作业

    ruby-on-rails - Tire找不到部分单词(在2个字段中搜索)

    elasticsearch - ElasticSearch仅检索与单个文档中的术语匹配的列表元素