Azure 流分析当日聚合

标签 azure azure-stream-analytics

我对 Azure 流分析还很陌生,但每次新事件到达 Azure 流分析作业时,我都需要从当天开始推送到 Power BI(实时仪表板)滚动总计。我已经创建了下一个 SQL 查询来计算这个

SELECT
    Factory_Id,
    COUNT(0) as events_count, 
    MAX(event_create_time) as last_event_time,
    SUM(event_value) as event_value_total
INTO
        [powerbi]
FROM
    [eventhub] TIMESTAMP BY event_create_time
WHERE DAY(event_create_time) = DAY(System.Timestamp) and MONTH(event_create_time) = MONTH(System.Timestamp) and YEAR(event_create_time) = YEAR(System.Timestamp)
    GROUP BY Factory_Id, SlidingWindow(day,1)

但这并没有给我想要的结果 - 我得到过去 24 小时的总计(不仅是当天),有时使用较大的 last_event_time 记录的 events_count 较小,然后使用较小的 last_event_time 记录。问题是 - 我做错了什么以及如何才能达到预期的结果?

最佳答案

编辑以下评论: 这会计算过去 24 小时的结果,但需要的是当天的运行总和/计数(从 00:00 到现在)。请参阅下面更新的答案。

我想知道是否 analytics这里的方法比聚合效果更好。

您不使用时间窗口,而是为输入中的每个事件计算并发出记录:

SELECT
    Factory_Id,
    COUNT(*) OVER (PARTITION BY Factory_Id LIMIT DURATION (hour, 24)) AS events_count,
    system.timestamp() as last_event_time,
    SUM(event_value) OVER (PARTITION BY Factory_Id LIMIT DURATION (hour, 24)) as event_value_total
INTO PowerBI
FROM [eventhub] TIMESTAMP BY event_create_time

唯一的问题是事件在同一时间戳上发生:

{"Factory_Id" : 1, "event_create_time" : "2021-12-10T10:00:00", "event_value" : 0.1}
{"Factory_Id" : 1, "event_create_time" : "2021-12-10T10:01:00", "event_value" : 2}
{"Factory_Id" : 1, "event_create_time" : "2021-12-10T10:01:00", "event_value" : 10}
{"Factory_Id" : 1, "event_create_time" : "2021-12-10T10:02:00", "event_value" : 0.2}

您不会在该时间戳上获得任何记录:

<表类=“s-表”> <标题> Factory_Id events_count last_event_time event_value_total <正文> 1 1 2021-12-10T10:00:00.0000000Z 0.1 1 2 2021-12-10T10:01:00.0000000Z 2.1 1 3 2021-12-10T10:01:00.0000000Z 12.1 1 4 2021-12-10T10:02:00.0000000Z 12.2

如果您的仪表板出现问题,我们可能希望向查询添加一个步骤来处理它。让我知道!

编辑以下评论

这个新版本将在每日滚动窗口中发出渐进的结果。为此,每次获得新记录时,我们 collect最后 24 小时。然后我们删除前一天的行,并重新计算新的聚合。为了正确收集,我们首先需要确保每个时间戳只有 1 条记录。

-- First we make sure we get only 1 record per timestamp, to avoid duplication in the analytics function below
WITH Collapsed AS (
    SELECT
        Factory_Id,
        system.timestamp() as last_event_time,
        COUNT(*) AS C,
        SUM(event_value) AS S
    FROM [input1] TIMESTAMP BY event_create_time
    GROUP BY Factory_Id, system.timestamp()
),

-- Then we build an array at each timestamp, containing all records from the last 24h
Collected as (
    SELECT
        Factory_Id,
        system.timestamp() as last_event_time,
        COLLECT() OVER (PARTITION BY Factory_Id LIMIT DURATION (hour, 24)) AS all_events
    FROM Collapsed
)

-- Finally we expand the array, removing the rows on the previous day, and aggregate
SELECT
    C.Factory_Id,
    system.timestamp() as last_event_time,
    SUM(U.ArrayValue.C) AS events_count,
    SUM(U.ArrayValue.S) AS event_value_total
FROM Collected AS C
CROSS APPLY GETARRAYELEMENTS(C.all_events) AS U
WHERE DAY(U.ArrayValue.last_event_time) = DAY(system.Timestamp())
GROUP BY C.Factory_Id, C.last_event_time, system.timestamp()

让我知道进展如何。

关于Azure 流分析当日聚合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72221113/

相关文章:

azure - 使用 TIMESTAMP 进行基本查询,不产生输出

azure - 一项流分析作业与多项作业

sql - 在流分析中将列逆透视为两个新列

Azure 流分析作业 - 如何在边缘远程部署/发送引用数据文件

c# - Azure KeyVault 范围有助于解决访问被拒绝错误

基于可变时间的Azure流分析窗口

azure - 使用 Azure 流分析获取翻滚窗口中的第一条记录

用于创建服务主体的 Azure 函数

c# - 使用不同的数据库,具有不同的DBcontext,并根据不同的URL请求连接到它

Azure:存储交易成本