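I'm trying to join 2 streaming sources that produce the same data from EventHub. I want to find the stock's highest opening price every 5 minutes and write it to a table. Specifically, I need two things: the time at which the stock reached its maximum within each 5-minute window, and the window time itself. The query below produces no output at all. I think I've messed up the join condition.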
```sql
WITH Source1 AS (
SELECT
System.TimeStamp() as TimeSlot,max([open]) as 'MaxOpenPrice'
FROM
EventHubInputData TIMESTAMP BY TimeSlot
GROUP BY TumblingWindow(minute,5)
),
Source2 AS(
SELECT EventEnqueuedUtcTime,[open]
FROM EventHubInputDataDup TIMESTAMP BY EventEnqueuedUtcTime),
Source3 as (
select Source2.EventEnqueuedUtcTime as datetime,Source1.MaxOpenPrice,System.TimeStamp() as TimeSlot
FROM Source1
JOIN Source2
ON Source2.[Open] = Source1.[MaxOpenPrice] AND DATEDIFF (minute,Source1,Source2) BETWEEN 0 AND 5
)
SELECT datetime,MaxOpenPrice,TimeSlot
INTO EventHubOutPutSQLDB
FROM Source3
```
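Best answer
Your logic here is sound. First, determine the maximum for each 5-minute window, then look up in the original stream when that maximum occurred.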
```sql
WITH MaxOpen5MinTumbling AS (
    SELECT
        --TickerId,
        System.TimeStamp() AS WindowEnd, --always returns the end of the window when windowing
        MAX([open]) AS 'MaxOpenPrice'
    FROM EventHubInputData --no TIMESTAMP BY needed when using ingestion (enqueued) time
    GROUP BY TumblingWindow(minute,5)
)
SELECT
    --M.TickerId,
    M.WindowEnd,
    M.MaxOpenPrice,
    O.EventEnqueuedUtcTime AS MaxOpenPriceTime
FROM MaxOpen5MinTumbling M
LEFT JOIN EventHubInputData O
    ON M.MaxOpenPrice = O.[open]
    AND DATEDIFF(minute,M,O) BETWEEN -5 AND 0 --the new timestamp is at the end of the window, so look back 5 minutes
    --AND M.TickerId = O.TickerId
```
Note that if the maximum price occurs more than once within a window, this query can return multiple rows for that time window.
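To make the two-step logic concrete outside of Azure, here is a minimal offline sketch in Python (not ASA code) that simulates it on an in-memory list of `(enqueued time, open price)` tuples. The function names, the sample data, and the choice of windows aligned to an arbitrary epoch and treated as end-inclusive `(start, end]` are all assumptions made for illustration; check ASA's own documentation for its exact window alignment and boundary handling.

```python
from datetime import datetime, timedelta

# Window size, mirroring TumblingWindow(minute, 5) in the ASA query.
WINDOW = timedelta(minutes=5)


def window_end(ts: datetime, epoch: datetime) -> datetime:
    """Return the end of the 5-minute tumbling window containing ts.

    Assumption: windows are aligned to `epoch` and are (start, end],
    so an event exactly on a boundary belongs to the window ending there.
    """
    whole, rest = divmod(ts - epoch, WINDOW)
    return epoch + (whole + (1 if rest else 0)) * WINDOW


def max_open_per_window(events, epoch):
    """Step 1: MAX([open]) per window, like the MaxOpen5MinTumbling CTE."""
    maxima = {}
    for ts, price in events:
        end = window_end(ts, epoch)
        if end not in maxima or price > maxima[end]:
            maxima[end] = price
    return maxima


def when_max_occurred(events, maxima):
    """Step 2: join each window maximum back to the raw events.

    Approximates the LEFT JOIN on equal price with
    DATEDIFF(minute, M, O) BETWEEN -5 AND 0 (inclusive 5-minute look-back).
    """
    rows = []
    for end, max_price in sorted(maxima.items()):
        times = sorted(ts for ts, price in events
                       if price == max_price and end - WINDOW <= ts <= end)
        if not times:
            rows.append((end, max_price, None))  # LEFT JOIN keeps unmatched windows
        for ts in times:
            rows.append((end, max_price, ts))  # one output row per matching event
    return rows


# Hypothetical sample data: (enqueued time, open price)
epoch = datetime(2021, 9, 22, 10, 0)
events = [
    (epoch + timedelta(minutes=1), 10.0),
    (epoch + timedelta(minutes=3), 12.5),
    (epoch + timedelta(minutes=4), 11.0),
    (epoch + timedelta(minutes=6), 9.0),
    (epoch + timedelta(minutes=7), 9.5),
    (epoch + timedelta(minutes=9), 9.5),
]

rows = when_max_occurred(events, max_open_per_window(events, epoch))
for row in rows:
    print(row)
```

With this sample data the second window's maximum (9.5) occurs twice, so that window yields two rows, which is the duplicate case described in the note above.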
A similar question, "Azure Stream Analytics - joining two streaming sources", can be found on Stack Overflow: https://stackoverflow.com/questions/69287794/