azure - 从 Azure 流分析中的 EventHub 删除重复项

标签 azure azure-cosmosdb azure-stream-analytics stream-analytics

我创建了一个 Azure 流分析作业,它将从 EventHub 获取输入数据并写入 cosmosDB 和 Blob。

我有时会看到来自 eventHub 的数据是重复的,因此重复的数据将被写入 cosmosDB 和 Blob 存储。

下面显示了从 EventHub 到流分析的示例输入数据。

[
{
               "idnum":"00011XXX01",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               [
                              {
                                             "sig3":"04XXX",
                                             "id":1
                              },
                              {
                                             "sig3":"000000",
                                             "id":61
                              }
               ],
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               [
                              {
                                             "sig3":"03XXX",
                                             "id":1
                              },
                              {
                                             "sig3":"04XXX",
                                             "id":1
                              }
               ],
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               [
                              {
                                             "sig3":"03XXX",
                                             "id":1
                              },
                              {
                                             "sig3":"04XXX",
                                             "id":1
                              }
               ],
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               [
                              {
                                             "sig3":"03XXX",
                                             "id":1
                              },
                              {
                                             "sig3":"04XXX",
                                             "id":1
                              }
               ],
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               [
                              {
                                             "sig3":"03XXX",
                                             "id":1
                              },
                              {
                                             "sig3":"04XXX",
                                             "id":1
                              }
               ],
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00026XXX03",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               [
                              {
                                             "sig3":"03XXX",
                                             "id":1
                              },
                              {
                                             "sig3":"000000",
                                             "id":61
                              }
               ],
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
}
]

在上面的示例中,idnum: 00086XXX02 的事件重复了 3 次。

我正在进行以下分析并获得重复的输出。

temp AS (
    SELECT
        input.idnum AS IDNUM,
        input.basetime AS BASETIME,
        input.time AS TIME,
        ROUND(input.sig1,5) AS SIG1,
        flatArrayElement as SIG2,
        udf.sgnlArrayMap(input.signals, input.basetime) AS SGNL //UDF to process the signals in input
    FROM [input01] as input
    CROSS APPLY GetArrayElements(input.sig2) AS flatArrayElement
    WHERE GetArrayLength(input.sig2) >=1
 ),
SIGNALS AS (
  SELECT * FROM temp T JOIN master M ON T.SIG2.ArrayValue.sig3 = M.sig3 
)

--Insert SIG2 to COSMOS Container
SELECT 
    t.IDNUM,
    t.BASETIME,
    t.TIME,
    t.SIG1,
    t.SIG2.ArrayValue.id AS ID,
    t.SIG2.ArrayValue.sig3 AS SIG3,
    t.SGNL
INTO [CosmosTbl]
FROM SIGNALS PARTITION BY PartitionId

输出如下,其中 "idnum":"00086XXX02"存在重复事件

[
{
               "idnum":"00011XXX01",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"04XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00011XXX01",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"000000",
               "id":61
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},                           
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"03XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"04XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"03XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"04XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"03XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"04XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"03XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"04XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
}
]

预期输出将是没有重复的事件(对于提供的示例,“idnum”不应该有重复的事件:“00086XXX02”)

在将数据写入存储之前,我想删除重复的事件。可以通过流分析来实现吗?

使用唯一ID创建cosmos DB集合是Cosmos端的一个解决方案,但是这里的表已经存在,我们可以从流分析端做任何事情吗?

最佳答案

我将你的测试sql简化如下:

with t AS (
    SELECT
        flatArrayElement as SIG2
    FROM fromblob as input
    CROSS APPLY GetArrayElements(input.sig2) AS flatArrayElement
    WHERE GetArrayLength(input.sig2) >=1
 )
SELECT 
    t.SIG2.ArrayValue.id AS ID,
    t.SIG2.ArrayValue.sig3 AS SIG3
FROM t PARTITION BY PartitionId

由于 GetArrayElements() 方法,它会产生重复的数据,我认为这是正常的。数组被拆分,结果肯定应该重复。

根据我的经验和我的 findings (加上这个feedback),ASA中没有明确的方法。我认为原因是ASA处理的是实时流数据,而不是像SQL表这样的静态数据。它无法判断每个时间单位内的数据是否重复。

结合最后一个cosmos db案例(How to find Duplicate documents in Cosmos DB),我认为解决方案的关键点是找到根本原因:为什么事件中心处理重复的源数据。当然,你可以设置一个cosmos db触发器来防止重复的数据流入db。但我认为这不是一个有效的方法。

关于azure - 从 Azure 流分析中的 EventHub 删除重复项,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59408124/

相关文章:

Python azure 模块: how to create a new deployment

visual-studio - 适用于 Visual Studio 的 Azure 流分析工具 : Error when executing aggregated queries - "Object reference not set to an instance of an object"

javascript - CosmosDB 插入具有特定 ID 的项目

node.js - Azure Cosmos DB : TypeError: Cannot read property 'electionId' of undefined

azure - 使用 Azure 流分析保留最后的值

Azure ARM 模板 - 流分析 Identity.principalId 作为输出

azure - Visual Studio 2019 中的 Azure 流分析中没有 SystemPropertyColumns(v : 16. 3.9)

c# - Azure Blob TIFF 到 PNG 转换,无需下载

azure - 如何在 Azure 中实现 Ruby On Rails 延迟作业

c# - ConnectionStrings 从哪里来? ASP.net