cassandra - Apache Nifi/Cassandra - 如何将 CSV 加载到 Cassandra 表中

标签 cassandra cql data-integration apache-nifi

我每天都会收到多次传入的各种 CSV 文件,存储来自传感器的时间序列数据,这些传感器是传感器站的一部分。每个 CSV 均以其来源的传感器站和传感器 ID 命名,例如“station1_sensor2.csv”。目前,数据存储如下:

> cat station1_sensor2.csv
2016-05-04 03:02:01.001000+0000;0;
2016-05-04 03:02:01.002000+0000;0.1234;
2016-05-04 03:02:01.003000+0000;0.2345;

我创建了一个 Cassandra 表来存储它们并能够查询它们以查找各种已识别的任务。 Cassandra 表如下所示:

cqlsh > CREATE KEYSPACE data with replication = {'class' : 'SimpleStrategy', 'replication_factor' : 3};

        CREATE TABLE sensor_data (
        station_id text, // id of the station
        sensor_id text,  // id of the sensor
        tps timestamp,   // timestamp of the measure
        val float,       // measured value
        PRIMARY KEY ((station_id, sensor_id), tps)
        );

我想使用 Apache Nifi 自动将 CSV 中的数据存储到此 Cassandra 表中,但我找不到正确执行此操作的示例或方案。我尝试使用“PutCassandraQL”处理器,但我在没有任何明确示例的情况下苦苦挣扎。因此,如果您能获得有关如何使用 Apache Nifi 执行 Cassandra put 查询以将数据插入表中的任何帮助,我们将不胜感激!

最佳答案

TL;DR 我有一个 NiFi 1.0 模板来完成此任务 Gist并在 NiFi Wiki .

NiFi 鼓励高度模块化的设计,因此让我们将其分解为更小的任务,我将描述可能的流程并根据您的用例解释每个处理器的用途:

enter image description here

  1. 读入 CSV 文件。这可以通过 GetFile 来完成,或者最好使用 ListFile -> FetchFile。在我的示例中,我使用脚本处理器来创建内嵌流文件,其中包含上面的示例数据。这使得我的模板可供其他人使用。

  2. 解析文件名以获取站点和传感器字段。这使用 NiFi Expression Language获取文件名中下划线之前(代表站)和下划线之后(减去 CSV 扩展名)之后的部分(代表传感器)。

  3. 将单个 CSV 流文件拆分为每行一个流文件。这样做是为了稍后我们可以创建单独的 CQL INSERT 语句。

  4. 从每行中提取列值。我为此使用了 ExtractText 和正则表达式,如果您有非常复杂的逻辑,您可能需要检查脚本处理器,例如 ExecuteScript .

  5. 更改时间戳。 IIRC,CQL 不接受时间戳文字上的微秒。您可以尝试解析微秒(最好在 ExecuteScript 处理器中完成)或只是重新格式化时间戳。请注意,由于无法解析微秒,因此“重新格式化”会导致我的示例中所有小数秒被截断。

  6. 构建 CQL INSERT 语句。此时,数据(无论如何在我的模板中)都在流文件属性中,原始内容可以用 CQL INSERT 语句替换(这是 PutCassandraQL 所期望的方式)。您可以将数据保留在属性中(使用 UpdateAttribute 正确命名它们,请参阅 PutCassandraQL 文档)并使用准备好的语句,但恕我直言,编写显式 CQL 语句更简单。在撰写本文时,PutCassandraQL 并未缓存PreparedStatements,因此目前以这种方式执行操作的性能实际上较低。

  7. 使用 PutCassandraQL 执行 CQL 语句。

我没有详细介绍属性名称等,但当流程到达 ReplaceText 时,我具有以下属性:

  • station.name:包含从文件名解析出的电台名称
  • sensor.name:包含从文件名解析出的传感器名称
  • tps:包含更新的时间戳值
  • columns.2:(大概)包含传感器读数的值

ReplaceText 将内容设置为以下内容(使用表达式语言填充值):

insert into sensor_data (station_id, sensor_id, tps, val) values ('${station.name}', '${sensor.name}', '${tps}', ${column.2})

希望对您有所帮助,如果您有任何疑问或问题,请告诉我。干杯!

关于cassandra - Apache Nifi/Cassandra - 如何将 CSV 加载到 Cassandra 表中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39201597/

相关文章:

cassandra - 事件溯源微服务: How to manage timestamp

apache-spark - EMR LinkageError 上的 Spark + Cassandra

Cassandra - 新 ColumnFamily 创建时出现 NullPointerException

Cassandra cql select 查询总是抛出读取超时异常

javascript - JStestDriver 可以用来测试 JSP 文件中的 js 代码吗?

java - Cassandra IllegalArgumentException 创建键空间

docker - 如何使用 docker-compose 自动创建 Cassandra 键空间?

message-queue - 在 Kafka 中设计生产者和消费者的组件

java - 如何等待 Apache Camel JDBC 作业完成

cassandra - 在 Cassandra 中计算表的大小