azure - Azure 流分析中的跳跃窗口

标签 azure azure-stream-analytics

我尝试了解 azure 流分析中的跳跃窗口。 我将从 Azure 事件中心获取以下数据:

[
  {
    "Id": "1",
    "SensorData": [
      {
        "Timestamp": 1603112431,
        "Type": "LineCrossing",
        "Direction": "forward"
      },
      {
        "Timestamp": 1603112431,
        "Type": "LineCrossing",
        "Direction": "forward"
      }
    ],
    "EventProcessedUtcTime": "2020-10-20T06:35:48.5890814Z",
    "PartitionId": 1,
    "EventEnqueuedUtcTime": "2020-10-20T06:35:48.3540000Z"
  },
  {
    "Id": "1",
    "SensorData": [
      {
        "Timestamp": 1603112430,
        "Type": "LineCrossing",
        "Direction": "backward"
      }
    ],
    "EventProcessedUtcTime": "2020-10-20T06:35:48.5890814Z",
    "PartitionId": 0,
    "EventEnqueuedUtcTime": "2020-10-20T06:35:48.2140000Z"
  }
]

我的查询如下所示:

SELECT s.Id, COUNT(data.ArrayValue.Direction) as Count
FROM [customers] s TIMESTAMP BY EventEnqueuedUtcTime
CROSS APPLY GetArrayElements(s.SensorData) AS data
WHERE data.ArrayValue.Type = 'LineCrossing' 
AND data.ArrayValue.Direction = 'forward'
GROUP BY s.Id, HoppingWindow(second, 3600, 5)

我使用跳跃窗口每 5 秒获取最后一天的所有事件。 我对给定 dto 的期望是:Id1 为一行,计数为 2,但我收到的是:Id1 为 720 行(因此 3600 除以 5),计数为 2。

这些事件不应该由 HoppingWindow 函数聚合吗?

最佳答案

我按如下方式构建了您的查询:

with inputValues as (Select input.*, message.ArrayValue as Data from input CROSS APPLY GetArrayElements(input.SensorData) as message)

select inputValues.Id, count(Data.Direction) as Count
into output
from inputValues 
where Data.Type = 'LineCrossing' and Data.Direction='forward'
GROUP BY inputValues.Id, HoppingWindow(second, 3600, 5)

我已将输入设置为事件中心,并在 Visual Studio 中使用云输入启动查询。

我用了Windows Client application将消息通过管道传输到事件中心(2. 下图所示),并观察到事件每 5 秒发生一次(1. 下图所示,3. 下图所示)。

也许只需更改我共享的查询以反射(reflect)正确的时间戳,但结果应该符合预期 - 对于过去一小时(3600 秒)到达的所有事件,每 5 秒根据定义的条件计入输出HoppingWindow 函数)。

enter image description here

关于azure - Azure 流分析中的跳跃窗口,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64439893/

相关文章:

azure - 将 JSON 数据行转换为列

azure b2c - 发生异常

sockets - 在 Azure 中动态打开端口,负载均衡器是否足够智能,能够路由到正确的端口?

azure - 如何创建在计时器上运行的服务来查询外部 Web 服务?

android - 在 Azure 管道上运行 Android 模拟器存在性能问题

java - azure服务总线方法中的Abandon()向死信队列发送特定消息

azure - 流分析 - 看不到输出

azure - 使用流分析将数据插入 CosmosDB

azure - Azure 流分析是否读取来自所有分区的数据

Azure 流分析 - 仅当事件与前一个事件不同时才输出事件