我最近正在使用 ASA,我正在尝试使用引用数据将 ASA 流直接插入到 SQL 表中。我的开发基于这篇 MS 文章:https://msdn.microsoft.com/en-us/azure/stream-analytics/reference/reference-data-join-azure-stream-analytics .
数据流概述 - 遥测:
- 我有许多不同类型的设备(热泵、电池、水泵、空调...)。每个设备的遥测数据都有不同的 JSON 架构。我可以通过消息中的属性来区分 JSON(例如:"DeviceType":"HeatPump"或 "DeviceType":"AirCon"...)
- 所有这些设备都将遥测数据发送到单个事件中心
- 在事件中心后面,有一个流分析组件,我可以根据属性设备类型将流重定向到不同的输出。例如,我使用查询
SELECT * INTO output-sql-table FROM input-event-hub WHERE DeviceType = 'HeatPump'
从 HeatPumps 重定向遥测数据
在将流插入 SQL 表之前,我想使用一些引用数据通过一些 IDKey 来“丰富”ASA 流。
我已经做了什么:
使用 ASA 查询成功将 ASA 流直接插入 SQL 表
SELECT * INTO [sql-table] FROM Input WHERE DeviceType ='HeatPump'
,其中 [sql-table] 与 JSON 消息 + 标准列(EventProcessedUtcTime、PartitionID、EventEnqueueUtcTime)具有相同的架构使用 ASA 查询成功将 ASA 流直接插入 SQL 表
SELECT Column1, Column2, Column3... INTO [sql-table] FROM Input WHERE DeviceType = 'HeatPump'
- 与上面的查询基本相同,只是这次我在select
中使用了命名列声明。- 生成引用数据的 JSON 文件并将其放入 BLOB 存储
- 在 ASA 中创建了新的静态(不使用 {date} 和 {time} 占位符) 引用数据输入,指向 BLOB 存储中的文件。
- 然后,我使用具有命名列的相同语句将引用数据加入到 ASA 查询中的数据流
- 结果 SQL 表中没有输出行
在调试问题时,我使用了 Query ASA 中的测试功能
我从事件中心采样数据 - 流数据。
我从文件 - 引用数据上传示例数据。
从事件中心采样数据完成后,我测试了一个查询 -> 输出生成了一些行 -> 这不是查询中的问题
但是...如果我运行 ASA,则不会将任何输出行插入到 SQL 表中。
我尝试过的一些其他想法:
二手
TRY_CAST
在将字段与流数据中的字段连接之前,将字段从引用数据转换为适当的数据类型的函数二手
TRY_CAST
函数将字段转换为SELECT
在我将它们插入 SQL 表之前
我现在真的不知道该怎么办。有什么建议么?
<小时/>编辑:添加数据流 JSON、引用数据 JSON、ASA 查询、ASA 输入配置、BLOB 存储配置和 ASA 测试输出结果
数据流 JSON - 单条消息
[
{
"Activation": 0,
"AvailablePowerNegative": 6.0,
"AvailablePowerPositive": 1.91,
"DeviceID": 99999,
"DeviceIsAvailable": true,
"DeviceOn": true,
"Entity": "HeatPumpTelemetry",
"HeatPumpMode": 3,
"Power": 1.91,
"PowerCompressor": 1.91,
"PowerElHeater": 0.0,
"Source": "<omitted>",
"StatusToPowerOff": 1,
"StatusToPowerOn": 9,
"Timestamp": "2018-08-29T13:34:26.0Z",
"TimestampDevice": "2018-08-29T13:34:09.0Z"
}
]
引用数据 JSON - 单条消息
[
{
"SourceID": 1,
"Source": "<ommited>",
"DeviceID": 10,
"DeviceSourceCode": 99999,
"DeviceName": "NULL",
"DeviceType": "Heat Pump",
"DeviceTypeID": 1
}
]
ASA查询
WITH HeatPumpTelemetry AS
(
SELECT
*
FROM
[input-eh]
WHERE
source='<omitted>'
AND entity = 'HeatPumpTelemetry'
)
SELECT
e.Activation,
e.AvailablePowerNegative,
e.AvailablePowerPositive,
e.DeviceID,
e.DeviceIsAvailable,
e.DeviceOn,
e.Entity,
e.HeatPumpMode,
e.Power,
e.PowerCompressor,
e.PowerElHeater,
e.Source,
e.StatusToPowerOff,
e.StatusToPowerOn,
e.Timestamp,
e.TimestampDevice,
e.EventProcessedUtcTime,
e.PartitionId,
e.EventEnqueuedUtcTime
INTO
[out-SQL-HeatPumpTelemetry]
FROM
HeatPumpTelemetry e
LEFT JOIN [input-json-devices] d ON
TRY_CAST(d.DeviceSourceCode as BIGINT) = TRY_CAST(e.DeviceID AS BIGINT)
ASA 引用数据输入配置 Reference Data input configuration in Stream Analytics
BLOB存储目录树 Blob storage directory tree
ASA 测试查询输出 ASA test query output
最佳答案
matejp。我没有重现您的问题,您可以引用我的步骤。
blob 存储中的引用数据:
{
"a":"aaa",
"reference":"www.bing.com"
}
Blob 存储中的流数据
[
{
"id":"1",
"name":"DeIdentified 1",
"DeviceType":"aaa"
},
{
"id":"2",
"name":"DeIdentified 2",
"DeviceType":"No"
}
]
查询语句:
SELECT
inputSteam.*,inputRefer.*
into sqloutput
FROM
inputSteam
Join inputRefer on inputSteam.DeviceType = inputRefer.a
输出:
希望对您有帮助。如有任何疑问,请告诉我。
关于sql - 使用引用数据时,流分析不会向 SQL 表生成输出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52073387/