java - 通过 http 的 Akka 对象流

标签 java stream akka akka-stream

我有一段代码(见下文),它生成一个服务器,该服务器回显从端口 6001 接收到的每个 ByteString 流。该示例还定义了一个连接到服务器并发送包含列表的 ByteString 流的客户端从字母“a”到“z”的字符。

此时我的问题是,akka 是否提供了一种通过 http 发送和接收对象流而不是 ByStreams 的方法?例如,Client 类的对象。

如果是这样,我如何发送和接收这样的对象流?您能给我提供一个演示如何执行此操作的代码片段吗?

Akka 文档对于非玩具示例来说不是用户友好的...

感谢您的帮助

公共(public)类 TcpEcho {

/**
 * Use without parameters to start both client and server.
 *
 * Use parameters `server 0.0.0.0 6001` to start server listening on port
 * 6001.
 *
 * Use parameters `client 127.0.0.1 6001` to start client connecting to
 * server on 127.0.0.1:6001.
 *
 */
public static void main(String[] args) throws IOException {
    if (args.length == 0) {
        ActorSystem system = ActorSystem.create("ClientAndServer");
        InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1", 6000);
        server(system, serverAddress);
        client(system, serverAddress);
    } else {
        InetSocketAddress serverAddress;
        if (args.length == 3) {
            serverAddress = new InetSocketAddress(args[1], Integer.valueOf(args[2]));
        } else {
            serverAddress = new InetSocketAddress("127.0.0.1", 6000);
        }
        if (args[0].equals("server")) {
            ActorSystem system = ActorSystem.create("Server");
            server(system, serverAddress);
        } else if (args[0].equals("client")) {
            ActorSystem system = ActorSystem.create("Client");
            client(system, serverAddress);
        }
    }
}

public static void server(ActorSystem system, InetSocketAddress serverAddress) {
    final ActorMaterializer materializer = ActorMaterializer.create(system);

    final Sink<IncomingConnection, CompletionStage<Done>> handler = Sink.foreach(conn -> {
        System.out.println("Client connected from: " + conn.remoteAddress());
        conn.handleWith(Flow.<ByteString> create(), materializer);
    });

    final CompletionStage<ServerBinding> bindingFuture = Tcp.get(system)
            .bind(serverAddress.getHostString(), serverAddress.getPort()).to(handler).run(materializer);

    bindingFuture.whenComplete((binding, throwable) -> {
        System.out.println("Server started, listening on: " + binding.localAddress());
    });

    bindingFuture.exceptionally(e -> {
        System.err.println("Server could not bind to " + serverAddress + " : " + e.getMessage());
        system.terminate();
        return null;
    });

}

public static void client(ActorSystem system, InetSocketAddress serverAddress) {
    final ActorMaterializer materializer = ActorMaterializer.create(system);

    final List<ByteString> testInput = new ArrayList<>();
    for (char c = 'a'; c <= 'z'; c++) {
        testInput.add(ByteString.fromString(String.valueOf(c)));
    }

    Source<ByteString, NotUsed> responseStream = Source.from(testInput)
            .via(Tcp.get(system).outgoingConnection(serverAddress.getHostString(), serverAddress.getPort()));

    CompletionStage<ByteString> result = responseStream.runFold(ByteString.empty(), (acc, in) -> acc.concat(in),
            materializer);

    result.whenComplete((success, failure) -> {

        if (failure != null) {
            System.err.println("Failure: " + failure.getMessage());
        } else {
            System.out.println("Result: " + success.utf8String());
        }
        System.out.println("Shutting down client");
        system.terminate();

    });
}

}

最佳答案

akka.stream.{javadsl,scaladsl}.Framing 包含可帮助您构建一致消息的实用程序。例如,您可以通过 Framing.simpleFramingProtocolEncoder(maxLength) 发送消息,以自动为其添加长度信息。另一方面,Framing.simpleFramingProtocolDecoder(maxLength) 将根据消息包含的长度信息对消息进行解码。

如果你想操作普通对象,你只需在通过编码器发送它们之前将它们序列化为ByteString,并在收到它们后从ByteString反序列化它们来自解码器的表示。

关于java - 通过 http 的 Akka 对象流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36387359/

相关文章:

java - 如何处理作为参数发送到 Servlet 的空白日期

java - JUnit+Mockito 或 RestAssured

scala - 案例类和案例对象之间的区别?

java - Akka Actors - 锁定单个资源请求

akka - 使用 AbstractBehavior 和 AbstractActor 来定义 Akka Actor 有什么区别?

java - 为什么在 Hibernate 中需要在 session.delete() 之后调用 session.flush()?

java - 如何在不同的类中使用 Spring Bean

http - 使用 ffserver 和 ffmpeg 将来自 USB 网络摄像头的视频嵌入到网页中

java - 关闭 Java FileInputStream

c++ - 从 C/C++ 编码的应用程序读取输出流