Azure流分析如何在从不同设备获取数据并且所有设备都有差异字段/属性时编写窗口函数

标签 azure azure-stream-analytics

我有来自 16 台设备的遥测数据传输到 IOT 中心,并且所有设备都属于不同类型。我想应用窗口函数来获取最后 10 分钟数据的平均值。由于每个设备都有非常不同的参数,我无法定义查询。设备参数为

Voltage L1
Voltage L2
Voltage L3
Current L1
Current L2
Current L3
Power
Consumer Energy
Main_Tank_Oil_Level
Calibration_Level
Temperature
Actual Pressure
Actual_Flow
Main_Tank_Oil_Level
Calibration_Level
Temperature
Actual Pressure
Oil_Flow

并且每个设备都有多个参数。

在这种情况下我们如何应用窗口函数

最佳答案

从评论中,我了解到您实际上在单个物理流内有多个逻辑管道。您有多种设备类型,每种设备类型都需要不同的 Azure 流分析 (ASA) 查询。它们都共享相同的输入事件/物联网中心。

这里有 3 种方法。

1 - 多个 ASA 作业

您可以创建多个 ASA 作业,所有作业均从同一源事件/物联网中心读取。每个查询都会有一个针对特定设备类型的不同查询。每个查询的第一步是过滤流以仅查看其范围内的设备类型(WHERE deviceType = 1WHEREVoltageL1 IS NOT NULL) .

它们将具有不同的输出模式。因此,您可以针对它们的单个输出,但它需要是“无模式”服务,例如事件中心、cosmos db、存储帐户...或者,如果您的目标是强烈的目标,则可以输出到不同的目的地。类型化存储组件(SQL 数据库中的不同表)。

这里的关键是定义和使用不同的 consumer groups对于事件中心中的每个 ASA 作业。

2 - 单一作业,独立查询

为了降低成本,并且如果数据量允许,您可以将所有查询放入单个 ASA 作业中。它看起来像这样:

--- This step concentrates reads on the input source to reduce consumer group exhaustion
WITH SingleSource AS (
   SELECT * FROM MyInput
)

SELECT
   DeviceId, 
   DeviceType,
   System.Timestamp() AS WindowEndtime,
   AVG(VoltageL1) as AvgVoltageL1,
   AVG(VoltageL2) as AvgVoltageL2
INTO MyOutput
FROM SingleSource
WHERE DeviceType = 1 -- (or something like VoltageL1 IS NOT NULL)
GROUP BY DeviceId, DeviceType, TUMBLINGWINDOW(minute,10)

SELECT
   DeviceId, 
   DeviceType,
   System.Timestamp() AS WindowEndtime,
   AVG(Main_Tank_Oil_Level) as AvgMain_Tank_Oil_Level,
   AVG(Calibration_Level_Temperature) as AvgCalibration_Level_Temperature
INTO MyOutput
FROM SingleSource
WHERE DeviceType = 2 -- (or something like Main_Tank_Oil_Level IS NOT NULL)
GROUP BY DeviceId, DeviceType, TUMBLINGWINDOW(minute,10)

...

如果您想将所有内容输出到同一目的地(此处为MyOutput)。与多个作业相同,您始终可以创建多个输出并改变 INTO 子句。

3 - 单一作业,符合查询

如果您还想处理数据,以便所有内容都适合单个架构,您可以添加最后一个步骤来执行此操作:

--- This step concentrates reads on the input source to reduce consumer group exhaustion
WITH SingleSource AS (
   SELECT * FROM MyInput
),

Type1 AS (
   SELECT
      DeviceId, 
      DeviceType,
      System.Timestamp() AS WindowEndtime,
      AVG(VoltageL1) as AvgVoltageL1,
      AVG(VoltageL2) as AvgVoltageL2
   INTO MyOutput
   FROM SingleSource
   WHERE DeviceType = 1 -- (or something like VoltageL1 IS NOT NULL)
   GROUP BY DeviceId, DeviceType, TUMBLINGWINDOW(minute,10)
),

Type2 AS (
   SELECT
      DeviceId, 
      DeviceType,
      System.Timestamp() AS WindowEndtime,
      AVG(Main_Tank_Oil_Level) as AvgMain_Tank_Oil_Level,
      AVG(Calibration_Level_Temperature) as AvgCalibration_Level_Temperature
   INTO MyOutput
   FROM SingleSource
   WHERE DeviceType = 2 -- (or something like Main_Tank_Oil_Level IS NOT NULL)
   GROUP BY DeviceId, DeviceType, TUMBLINGWINDOW(minute,10)
),
...
AllRecords AS (
   SELECT DeviceId, DeviceType,WindowEndtime, AvgVoltageL1, AvgVoltageL2, NULL AS AvgMain_Tank_Oil_Level, NULL AS AvgCalibration_Level_Temperature FROM Type1
   UNION ALL
   SELECT DeviceId, DeviceType,WindowEndtime, NULL AS AvgVoltageL1, NULL AS AvgVoltageL2, AvgMain_Tank_Oil_Level, AvgCalibration_Level_Temperature FROM Type2
...
)

SELECT * INTO MyOutput FROM AllRecords

如果您共享一些通用指标并需要将其放入单个 SQL 表中,那么这是最佳选择。

关于Azure流分析如何在从不同设备获取数据并且所有设备都有差异字段/属性时编写窗口函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70677371/

相关文章:

azure - 如何从 Synapse 生成的输出文件中删除或替换 "\"(使用 copy stge)

Azure 流分析查询移动平均值

javascript - 在流分析中将 Javascript 的 Date.now() 转换为时间戳

azure - 尝试将 Azure 函数作为输出接收器添加到流分析作业时连接测试失败

json - 如何保存从azure认知服务转换后的文本数据?

sql-server - 使用 AD DS 切换到 Azure 存储 Gen2 后,SSMS 备份和还原 SQL 数据库到 Blob 停止工作

azure - 无法获取 Azure 成本消耗 api 的 Azure 范围

azure - CORS:Azure WebApp Docker 容器内的 NestJs,CORS 被拒绝

python - 错误: "Your deployment does not have an associated swagger.json" - ACI deployment on Stream Analytics Job

azure - 未找到指定时间范围内的事件