我正在使用两个 C# 流 API,其中之一 is a data source和另一个is a data sink .
这两个 API 实际上都没有公开流对象;两者都希望您将流传递给它们,并且它们处理流中的写入/读取。
有没有办法将这些 API 链接在一起,以便将源的输出流式传输到接收器中,而不必在 MemoryStream 中缓冲整个源?这是一个对 RAM 非常敏感的应用程序。
这是一个使用我试图避免的 MemoryStream 方法的示例,因为它在将整个流写入 S3 之前将其缓冲在 RAM 中:
using (var buffer = new MemoryStream())
using (var transferUtil = new TransferUtility(s3client))
{
// This destructor finishes the file and transferUtil closes
// the stream, so we need this weird using nesting to keep everyone happy.
using (var parquetWriter = new ParquetWriter(schema, buffer))
using (var rowGroupWriter = parquetWriter.CreateRowGroup())
{
rowGroupWriter.WriteColumn(...);
...
}
transferUtil.Upload(buffer, _bucketName, _key.Replace(".gz", "") + ".parquet");
}
最佳答案
您正在寻找一个可以同时传递给数据源和接收器并且可以在两者之间异步“传输”数据的流。有许多可能的解决方案,我可能考虑过围绕 BlockingCollection 的生产者-消费者模式。
最近,System.IO.Pipelines、Span 和 Memory 类型的添加真正专注于高性能 IO,我认为它非常适合这里。 Pipe 类及其关联的 Reader 和 Writer 可以自动处理它们之间的流量控制、背压和 IO,同时利用所有新的 Span 和 Memory 相关类型。
我已经在 PipeStream 上传了一个 Gist这将为您提供一个带有内部 Pipe 实现的自定义流,您可以将其传递给两个 API 类。写入 WriteAsync(或 Write)方法的任何内容都将可供 ReadAsync(或 Read)方法使用,而无需任何进一步的 byte[] 或 MemoryStream 分配
在您的情况下,您只需将 MemoryStream 替换为这个新类,它应该开箱即用。我没有进行完整的 S3 测试,但直接从 Parquet 流读取并将其转储到控制台窗口表明它是异步工作的。
// Create some very badly 'mocked' data
var idColumn = new DataColumn(
new DataField<int>("id"),
Enumerable.Range(0, 10000).Select(i => i).ToArray());
var cityColumn = new DataColumn(
new DataField<string>("city"),
Enumerable.Range(0, 10000).Select(i => i % 2 == 0 ? "London" : "Grimsby").ToArray());
var schema = new Schema(idColumn.Field, cityColumn.Field);
using (var pipeStream = new PipeStream())
{
var buffer = new byte[4096];
int read = 0;
var readTask = Task.Run(async () =>
{
//transferUtil.Upload(readStream, "bucketName", "key"); // Execute this in a Task / Thread
while ((read = await pipeStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
{
var incoming = Encoding.ASCII.GetString(buffer, 0, read);
Console.WriteLine(incoming);
// await Task.Delay(5000); uncomment this to simulate very slow consumer
}
});
using (var parquetWriter = new ParquetWriter(schema, pipeStream)) // This destructor finishes the file and transferUtil closes the stream, so we need this weird using nesting to keep everyone happy.
using (var rowGroupWriter = parquetWriter.CreateRowGroup())
{
rowGroupWriter.WriteColumn(idColumn); // Step through both these statements to see data read before the parquetWriter completes
rowGroupWriter.WriteColumn(cityColumn);
}
}
实现尚未完全完成,但我认为它展示了一种不错的方法。在控制台“readTask”中,您可以取消注释 Task.Delay 以模拟慢速读取 (transferUtil),您应该会看到管道自动限制写入任务。
对于其中一种 Span 扩展方法,您需要使用 C# 7.2 或更高版本(VS 2017 -> 项目属性 -> 构建 -> 高级 -> 语言版本),但它应该与任何 .Net 框架兼容。您可能需要 Nuget Package
流是可读和可写的(很明显!)但不可搜索,这在这种情况下应该适合您,但无法从需要可搜索流的 Parquet SDK 读取。
希望对你有帮助
关于c# - 如何链接两个希望您提供流的 C# API?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52744006/