我想将数据从一个 actor 中的流发送到另一个 actor 中的流。最后,这应该适用于远程参与者。对于 Akka.NET Streams,这应该是一项简单的任务,但也许我误解了它。
这是 SenderActor 的一部分:
var stream = new FileStream(FILENAME, FileMode.Open);
var materializer = Context.Materializer();
var source = StreamConverters.FromInputStream(() => stream, CHUNK_SIZE);
var result = source.To(Sink.ActorRef<ByteString>(this.receiver, new Messages.StreamComplete()))
.Run(materializer);
注意:this.receiver
是一个 IActorRef
,数据应该发送到这里。
ReceiverActor 现在在流结束后获取所有 ByteString
消息和 Messages.StreamCompleted
。
如何将其轻松地放在 ReceiverActor 中?在最好的情况下再次作为 Stream
。
在 ReceiverActor 中,我尝试将所有 ByteString
消息发送到一个 Source
,它应该填充一个 MemoryStream
:
class ReceiverActor : ReceiveActor
{
private readonly IActorRef streamReceiver;
private readonly MemoryStream stream;
public ReceiverActor()
{
this.stream = new MemoryStream();
this.streamReceiver = Source.ActorRef<ByteString>(128, Akka.Streams.OverflowStrategy.Fail)
.To(StreamConverters.FromOutputStream(() => this.stream, true))
.Run(Context.Materializer());
Context.Watch(this.streamReceiver);
Receive<ByteString>((message) => ReceivedStreamChunk(message));
Receive<Messages.StreamComplete>((message) => ReceivedStreamComplete(message));
Receive<Terminated>((message) => ReceivedTerminated(message));
ReceiveAny((message) => ReceivedAnyMessage(message));
}
private void ReceivedTerminated(Terminated message)
{
Console.WriteLine($"[receiver] {message.ActorRef.Path.ToStringWithoutAddress()} terminated, local stream length {(this.stream.CanRead ? this.stream.Length : -1)}");
}
private void ReceivedStreamComplete(Messages.StreamComplete message)
{
Console.WriteLine($"[receiver] got signaled that the stream completed, local stream length {this.stream.Length}");
}
private void ReceivedStreamChunk(object message)
{
Console.WriteLine($"[receiver] got chunk, previous stream length {this.stream.Length}");
this.streamReceiver.Forward(message);
}
private void ReceivedAnyMessage(object message)
{
Console.WriteLine($"[receiver] got message {message.GetType().FullName}");
}
}
但是 MemoryStream
是异步填充的,当 streamReceiver
终止时它会关闭流,所以我无法获取数据。
如何正确检索流?
更新 我在本地工作了:
感谢 Horusiath 在 Akka.NET 的 gitter channel 中的输入,我能够直接接收 ByteStrings
,而不必在那里使用 Akka.Stream。
Receive<ByteString>((message) => ReceivedStreamChunk(message));
和:
private void ReceivedStreamChunk(ByteString message)
{
var bytes = message.ToArray();
targetStream.Write(bytes, 0, bytes.Length);
Console.WriteLine($"[receiver] got chunk, now stream length {this.stream.Length}");
}
请注意,Akka.NET 1.3 将向 ByteString
添加方法 WriteTo(Stream)
和 WriteToAsync(Stream, CancellationToken)
。
这仍然不适用于远程 actor,因为接收 actor 系统会收到此错误(Serializer 是 Hyperion):
Error [No parameterless constructor defined for this object.]
实际上 ByteString
有一个无参数的构造函数,但它是 protected
。
我认为 ByteString
不是可序列化的?
最佳答案
我在转换 ByteString
时与远程参与者一起工作至 byte[]
通过在源和汇之间插入转换流。
例子:
FileIO.FromFile(...).Via(Flow.Create<ByteString>().Select(x => x.ToArray())).To(...)
关于c# - 从 Actor 到 Actor 发送流(例如文件),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45359672/