c# - 从 Actor 到 Actor 发送流(例如文件)

标签 c# akka.net akka.net-streams

我想将数据从一个 actor 中的流发送到另一个 actor 中的流。最后,这应该适用于远程参与者。对于 Akka.NET Streams,这应该是一项简单的任务,但也许我误解了它。

这是 SenderActor 的一部分:

var stream = new FileStream(FILENAME, FileMode.Open);
var materializer = Context.Materializer();
var source = StreamConverters.FromInputStream(() => stream, CHUNK_SIZE);
var result = source.To(Sink.ActorRef<ByteString>(this.receiver, new Messages.StreamComplete()))
    .Run(materializer);

注意:this.receiver 是一个 IActorRef,数据应该发送到这里。

ReceiverActor 现在在流结束后获取所有 ByteString 消息和 Messages.StreamCompleted

如何将其轻松地放在 ReceiverActor 中?在最好的情况下再次作为 Stream

ReceiverActor 中,我尝试将所有 ByteString 消息发送到一个 Source,它应该填充一个 MemoryStream :

class ReceiverActor : ReceiveActor
{
    private readonly IActorRef streamReceiver;
    private readonly MemoryStream stream;

    public ReceiverActor()
    {
        this.stream = new MemoryStream();
        this.streamReceiver = Source.ActorRef<ByteString>(128, Akka.Streams.OverflowStrategy.Fail)
            .To(StreamConverters.FromOutputStream(() => this.stream, true))
            .Run(Context.Materializer());
        Context.Watch(this.streamReceiver);

        Receive<ByteString>((message) => ReceivedStreamChunk(message));
        Receive<Messages.StreamComplete>((message) => ReceivedStreamComplete(message));
        Receive<Terminated>((message) => ReceivedTerminated(message));
        ReceiveAny((message) => ReceivedAnyMessage(message));
    }
    private void ReceivedTerminated(Terminated message)
    {
        Console.WriteLine($"[receiver] {message.ActorRef.Path.ToStringWithoutAddress()} terminated, local stream length {(this.stream.CanRead ? this.stream.Length : -1)}");
    }
    private void ReceivedStreamComplete(Messages.StreamComplete message)
    {
        Console.WriteLine($"[receiver] got signaled that the stream completed, local stream length {this.stream.Length}");
    }
    private void ReceivedStreamChunk(object message)
    {
        Console.WriteLine($"[receiver] got chunk, previous stream length {this.stream.Length}");
        this.streamReceiver.Forward(message);
    }
    private void ReceivedAnyMessage(object message)
    {
        Console.WriteLine($"[receiver] got message {message.GetType().FullName}");
    }
}

但是 MemoryStream 是异步填充的,当 streamReceiver 终止时它会关闭流,所以我无法获取数据。

如何正确检索流?


更新 我在本地工作了:

感谢 Horusiath 在 Akka.NET 的 gitter channel 中的输入,我能够直接接收 ByteStrings,而不必在那里使用 Akka.Stream。

Receive<ByteString>((message) => ReceivedStreamChunk(message));

和:

private void ReceivedStreamChunk(ByteString message)
{
    var bytes = message.ToArray();
    targetStream.Write(bytes, 0, bytes.Length);
    Console.WriteLine($"[receiver] got chunk, now stream length {this.stream.Length}");
}

请注意,Akka.NET 1.3 将向 ByteString 添加方法 WriteTo(Stream)WriteToAsync(Stream, CancellationToken)

这仍然不适用于远程 actor,因为接收 actor 系统会收到此错误(Serializer 是 Hyperion):

Error [No parameterless constructor defined for this object.]

实际上 ByteString 有一个无参数的构造函数,但它是 protected

我认为 ByteString 不是可序列化的?

最佳答案

我在转换 ByteString 时与远程参与者一起工作至 byte[]通过在源和汇之间插入转换流。

例子:

FileIO.FromFile(...).Via(Flow.Create<ByteString>().Select(x => x.ToArray())).To(...)

关于c# - 从 Actor 到 Actor 发送流(例如文件),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45359672/

相关文章:

c# - 如何检测远程参与者断开连接(分离)?

c# - 如何在 Akka.NET 中远程部署具有动态名称的 actor

C# SerialPort.IsOpen 在物理断开连接时返回 true

c# - 如何在 C# 中创建未知/通用类型列表?

c# - 当我为我的模型使用 Automapper 时。模型的 ID 正在更改

c# - List<T> 的动态类型?

asp.net - akka.net 在 azure asp.net 网站中进行缩放