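I am new to Azure Stream Analytics, but I need to push a running total since the start of the current day to Power BI (a real-time dashboard) every time a new event arrives at the Azure Stream Analytics job. I have written the following SQL query to compute this: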
SELECT
Factory_Id,
COUNT(0) as events_count,
MAX(event_create_time) as last_event_time,
SUM(event_value) as event_value_total
INTO
[powerbi]
FROM
[eventhub] TIMESTAMP BY event_create_time
WHERE DAY(event_create_time) = DAY(System.Timestamp) and MONTH(event_create_time) = MONTH(System.Timestamp) and YEAR(event_create_time) = YEAR(System.Timestamp)
GROUP BY Factory_Id, SlidingWindow(day,1)
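But this does not give me the result I want: I get the total for the last 24 hours (not just the current day), and sometimes a record with a later last_event_time has a smaller events_count than a record with an earlier last_event_time. What am I doing wrong, and how can I get the expected result?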
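Best answer
Edit following comments: this computes the results for the last 24 hours, but what is needed is the running sum/count for the current day (from 00:00 until now). See the updated answer below.
I wonder whether an analytic approach would work better here than an aggregation.
Instead of using a time window, you compute and emit a record for each event in the input: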
SELECT
Factory_Id,
COUNT(*) OVER (PARTITION BY Factory_Id LIMIT DURATION (hour, 24)) AS events_count,
system.timestamp() as last_event_time,
SUM(event_value) OVER (PARTITION BY Factory_Id LIMIT DURATION (hour, 24)) as event_value_total
INTO PowerBI
FROM [eventhub] TIMESTAMP BY event_create_time
The only hiccup is with events that land on the same timestamp:
{"Factory_Id" : 1, "event_create_time" : "2021-12-10T10:00:00", "event_value" : 0.1}
{"Factory_Id" : 1, "event_create_time" : "2021-12-10T10:01:00", "event_value" : 2}
{"Factory_Id" : 1, "event_create_time" : "2021-12-10T10:01:00", "event_value" : 10}
{"Factory_Id" : 1, "event_create_time" : "2021-12-10T10:02:00", "event_value" : 0.2}
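You will not get a single record for that timestamp. Because the query emits one record per input event, the two events at 10:01:00 each produce their own output row.
If that is a problem for your dashboard, we can add a step to the query to handle it. Let me know!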
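To make the same-timestamp behavior concrete, here is a rough offline emulation in Python of a per-event analytic count. It is only a sketch and does not run in Stream Analytics; the function name `per_event_running_count` is hypothetical, and the rule that the window includes every event sharing the current timestamp is an assumption. The point is the number of output rows, not the exact counts.

```python
from datetime import datetime, timedelta

def per_event_running_count(events):
    """Emit one output row per input event, like an analytic function with OVER (...)."""
    parsed = [(e["Factory_Id"], datetime.fromisoformat(e["event_create_time"]))
              for e in events]
    rows = []
    for fid, ts in parsed:
        # Assumption: the window holds every event of the same factory from the
        # preceding 24 hours, including events that share the current timestamp.
        count = sum(1 for f, t in parsed
                    if f == fid and timedelta(0) <= ts - t <= timedelta(hours=24))
        rows.append((fid, ts.isoformat(), count))
    return rows

# The four sample events from the answer.
events = [
    {"Factory_Id": 1, "event_create_time": "2021-12-10T10:00:00", "event_value": 0.1},
    {"Factory_Id": 1, "event_create_time": "2021-12-10T10:01:00", "event_value": 2},
    {"Factory_Id": 1, "event_create_time": "2021-12-10T10:01:00", "event_value": 10},
    {"Factory_Id": 1, "event_create_time": "2021-12-10T10:02:00", "event_value": 0.2},
]

rows = per_event_running_count(events)
for row in rows:
    print(row)
```

Four input events give four output rows, two of which carry the 10:01:00 timestamp. A dashboard that expects one row per timestamp would need those collapsed first.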
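Edit following comments
This new version emits progressive results within a daily tumbling window. To do that, every time a new record arrives we collect the last 24 hours. Then we remove the rows from the previous day and recompute the aggregates. For the collection to work correctly, we first need to make sure there is only 1 record per timestamp.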
-- First we make sure we get only 1 record per timestamp, to avoid duplication in the analytics function below
WITH Collapsed AS (
SELECT
Factory_Id,
system.timestamp() as last_event_time,
COUNT(*) AS C,
SUM(event_value) AS S
FROM [input1] TIMESTAMP BY event_create_time
GROUP BY Factory_Id, system.timestamp()
),
-- Then we build an array at each timestamp, containing all records from the last 24h
Collected as (
SELECT
Factory_Id,
system.timestamp() as last_event_time,
COLLECT() OVER (PARTITION BY Factory_Id LIMIT DURATION (hour, 24)) AS all_events
FROM Collapsed
)
-- Finally we expand the array, removing the rows on the previous day, and aggregate
SELECT
C.Factory_Id,
system.timestamp() as last_event_time,
SUM(U.ArrayValue.C) AS events_count,
SUM(U.ArrayValue.S) AS event_value_total
FROM Collected AS C
CROSS APPLY GETARRAYELEMENTS(C.all_events) AS U
WHERE DAY(U.ArrayValue.last_event_time) = DAY(system.Timestamp())
GROUP BY C.Factory_Id, C.last_event_time, system.timestamp()
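The three steps of the query above (collapse, collect, filter and aggregate) can be mimicked offline to check the expected numbers. The Python below is a minimal sketch, not the Stream Analytics implementation: the function name `running_daily_totals` is hypothetical, the 23:30 event on the previous day is an invented sample added to show the day filter, and the inclusive 24-hour boundary is an assumption.

```python
from collections import defaultdict
from datetime import datetime, timedelta

def running_daily_totals(events):
    """Emulate Collapsed -> Collected -> final SELECT for a list of event dicts."""
    # Step 1 (Collapsed): one record per (Factory_Id, timestamp) with count C and sum S.
    collapsed = defaultdict(lambda: [0, 0.0])
    for e in events:
        key = (e["Factory_Id"], datetime.fromisoformat(e["event_create_time"]))
        collapsed[key][0] += 1
        collapsed[key][1] += e["event_value"]
    rows = sorted((fid, ts, c, s) for (fid, ts), (c, s) in collapsed.items())

    results = []
    for fid, ts, _, _ in rows:
        # Step 2 (Collected): collapsed records of the same factory from the last 24 hours.
        # The inclusive boundary is an assumption; the day filter below makes it
        # irrelevant, since a record exactly 24 h old falls on the previous day.
        window = [r for r in rows
                  if r[0] == fid and timedelta(0) <= ts - r[1] <= timedelta(hours=24)]
        # Step 3: drop rows from the previous day, then aggregate what is left.
        today = [r for r in window if r[1].date() == ts.date()]
        results.append({
            "Factory_Id": fid,
            "last_event_time": ts.isoformat(),
            "events_count": sum(r[2] for r in today),
            "event_value_total": sum(r[3] for r in today),
        })
    return results

events = [
    # Hypothetical event on the previous day (not in the original sample data).
    {"Factory_Id": 1, "event_create_time": "2021-12-09T23:30:00", "event_value": 5},
    {"Factory_Id": 1, "event_create_time": "2021-12-10T10:00:00", "event_value": 0.1},
    {"Factory_Id": 1, "event_create_time": "2021-12-10T10:01:00", "event_value": 2},
    {"Factory_Id": 1, "event_create_time": "2021-12-10T10:01:00", "event_value": 10},
    {"Factory_Id": 1, "event_create_time": "2021-12-10T10:02:00", "event_value": 0.2},
]

for row in running_daily_totals(events):
    print(row)
```

The sketch compares full calendar dates, whereas the query compares only DAY(...). Within a 24-hour window the two are equivalent, because two records at most 24 hours apart cannot share a day-of-month unless they fall on the same date.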
Let me know how it goes.
Regarding same-day aggregation in Azure Stream Analytics, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/72221113/