C#并行写入Azure Data Lake File

标签 c# azure azure-functions azure-data-lake

在我们的 Azure 数据湖中,我们每天都有记录事件和这些事件的坐标的文件。我们需要获取这些坐标并查找这些坐标属于哪个州、县、乡镇和部门。我尝试过此代码的几个版本。

  • 我尝试在 U-SQL 中执行此操作。我什至上传了一个实现 Microsoft.SqlServer.Types.SqlGeography 方法的自定义程序集,却发现 ADLA 未设置为执行地理编码等逐行操作。
  • 我将所有行拉入 SQL Server,将坐标转换为 SQLGeography,并构建了执行州、县等查找的 T-SQL 代码。经过多次优化后,我将此过程降低到每行约 700 毫秒。 (积压中有 1.33 亿行,每天都会添加约 16k 行,我们需要近 3 年的时间才能 catch 。所以我并行化了 T-SQL,情况变得更好,但还不够。
  • 我采用了 T-SQL 代码,并将该流程构建为控制台应用程序,因为 SqlGeography 库实际上是一个 .Net 库,而不是 native SQL Server 产品。我能够将单线程处理时间缩短 t0 ~ 500ms。添加 .Net 的并行性 (parallel.ForEach) 并将我的机器的 10/20 核心投入其中,效果很好,但仍然不够。
  • 我尝试将此代码重写为 Azure 函数,并逐个文件处理数据湖中的文件。大多数文件都超时,因为处理时间超过 10 分钟。因此,我更新了代码以读取文件,并将行共享到 Azure 队列存储中。然后我有第二个 Azure 函数,它为队列中的每一行触发。我们的想法是,Azure Functions 的扩展能力远远大于任何单台机器。

这就是我陷入困境的地方。我无法可靠地将行写入 ADLS 中的文件。这是我现在拥有的代码。

public static void WriteGeocodedOutput(string Contents, String outputFileName, ILogger log) {

        AdlsClient client = AdlsClient.CreateClient(ADlSAccountName, adlCreds);
        //if the file doesn't exist write the header first
        try {
            if (!client.CheckExists(outputFileName)) {
                using (var stream = client.CreateFile(outputFileName, IfExists.Fail)) {
                    byte[] headerByteArray = Encoding.UTF8.GetBytes("EventDate, Longitude, Latitude, RadarSiteID, CellID, RangeNauticalMiles, Azimuth, SevereProbability, Probability, MaxSizeinInchesInUS, StateCode, CountyCode, TownshipCode, RangeCode\r\n");
                    //stream.Write(headerByteArray, 0, headerByteArray.Length);
                    client.ConcurrentAppend(outputFileName, true, headerByteArray, 0, headerByteArray.Length);
                }
            }
        } catch (Exception e) {
            log.LogInformation("multiple attempts to create the file. Ignoring this error, since the file was created.");
        }

        //the write the data
        byte[] textByteArray = Encoding.UTF8.GetBytes(Contents);
        for (int attempt = 0; attempt < 5; attempt++) {
            try {
                log.LogInformation("prior to write, the outputfile size is: " + client.GetDirectoryEntry(outputFileName).Length);
                var offset = client.GetDirectoryEntry(outputFileName).Length;
                client.ConcurrentAppend(outputFileName, false, textByteArray, 0, textByteArray.Length);
                log.LogInformation("AFTER write, the outputfile size is: " + client.GetDirectoryEntry(outputFileName).Length);
                //if successful, stop trying to write this row
                attempt = 6;                    
            }
            catch (Exception e){
                log.LogInformation($"exception on adls write: {e}");
            }
            Random rnd = new Random();
            Thread.Sleep(rnd.Next(attempt * 60));
        }
    }

该文件将在需要时创建,但我确实在日志中收到几条消息,表明有多个线程尝试创建它。我并不总是能写出标题行。

我也不再只获取任何数据行:

"BadRequest ( IllegalArgumentException  concurrentappend failed with error 0xffffffff83090a6f 
(Bad request. The target file does not support this particular type of append operation. 
If the concurrent append operation has been used with this file in the past, you need to append to this file using the concurrent append operation.
If the append operation with offset has been used in the past, you need to append to this file using the append operation with offset. 
On the same file, it is not possible to use both of these operations.). []

我觉得我在这里缺少一些基本的设计理念。该代码应该尝试将一行写入文件中。如果该文件尚不存在,请创建它并放入标题行。然后放入该行。

完成这种写入场景的最佳实践方法是什么?

关于如何在 ADLS 中处理这种并行写入工作负载,还有其他建议吗?

最佳答案

我对此有点晚了,但我想问题之一可能是由于在同一文件流上使用“Create”和“ConcurrentAppend”造成的? ADLS 文档提到它们不能在同一文件上使用。也许,尝试将“Create”命令更改为“ConcurrentAppend”,因为后者可用于创建不存在的文件。

此外,如果您找到更好的方法,请在此处发布您的解决方案。

关于C#并行写入Azure Data Lake File,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51971326/

相关文章:

azure - 在 ARM 模板中获取功能键 - 间歇性失败

python - Azure Functions Python blobTrigger 如何修复 "Microsoft.Azure.WebJobs.Extensions.Storage: Object reference not set to an instance of an object."?

c# - Azure 表 : Unable to load one or more of the requested types. 检索 LoaderExceptions 属性以获取更多信息

c# - 该接口(interface)是否已存在于标准 .NET 库中?

c# - 从方法表达式获取具体类型

azure - 是 Console.ReadKey();适合 azure webjob

Azure RBAC 自定义角色

c# - 使用防伪 token 模拟 Windows 用户

c# - 如何使用 C# 在 XNA 中创建俄罗斯方 block block ?

azure - 如何通过Powershell启用Azure应用程序网关上的所有防火墙规则?