java - TCP 按字节发送/检索 header

标签 java spring-integration

我有客户端配置:

<beans:bean id="itemSerializerDeserializer"
    class="org.mbracero.integration.ItemSerializerDeserializer" />

<beans:bean id="resultSerializerDeserializer"
    class="org.mbracero.integration.ResultSerializerDeserializer" />

<int-ip:tcp-connection-factory id="clientRequestData"
    type="client" host="${clientRequestData.host}" port="${clientRequestData.port}" single-use="true"
    serializer="itemSerializerDeserializer" deserializer="resultSerializerDeserializer" />

<int-ip:tcp-outbound-gateway id="requestDataOutGateway"
    request-channel="requestData" connection-factory="clientRequestData"
    request-timeout="10000" reply-timeout="10000" remote-timeout="10000" />

简单网关:

public interface SimpleGateway {
     @Gateway(requestChannel="requestData")
     Result sendData(Item item);
}

以及服务器配置:

<int:channel id="channelServerRequestData" />
<int:channel id="channelServerResponseData" />

<beans:bean id="requestService" class="org.mbracero.integration.RequestService" />

<beans:bean id="itemSerializerDeserializer"
    class="org.mbracero.integration.ItemSerializerDeserializer" />

<beans:bean id="resultSerializerDeserializer"
    class="org.mbracero.integration.ResultSerializerDeserializer" />

<int-ip:tcp-connection-factory id="requestDataServer"
    type="server" port="${requestDataServer.port}" single-use="true" deserializer="itemSerializerDeserializer"
    serializer="resultSerializerDeserializer" />

<int-ip:tcp-inbound-gateway id="TCPInboundGateway"
    connection-factory="requestDataServer" request-channel="channelServerRequestData"
    reply-channel="channelServerResponseData" error-channel="errorChannel" />

请求服务:

@Service
public class RequestService {
@ServiceActivator(inputChannel="channelServerRequestData", outputChannel="channelServerResponseData")
public Result requestData(Item input) {
    System.out.println("Input :::: " + input);
    Result ret = new Result("AAA", "DDDD");
    System.out.println("Ret :::: " + ret);
    return ret;
  }
}

ItemSerializer反序列化器:

public class ItemSerializerDeserializer implements Serializer<Item>, Deserializer<Item> {
(...)
}

结果序列化器反序列化器:

public class ResultSerializerDeserializer implements Serializer<Result>, Deserializer<Result> {
(...)
}

现在我必须添加一些 header (发送和检索),但我需要像上面编写的自定义序列化器/反序列化器一样进行操作。

例如,我需要发送下一个 header :

  • 发行者:4 字节。初始位置 1 - 最终位置 4
  • 客户端:4 字节。初始位置 5 - 最终位置 8
  • 产品:2 字节。初始位置 9 - 最终位置 10
  • 类型:3 字节。初始位置 11 - 最终位置 13
  • (...)

我已按字节和位置发送和检索这些 header (不像 Map 那样按名称/值)。

通过我的自定义序列化器/反序列化器,我可以对有效负载进行此操作,但我不知道如何对 header 进行操作。

我已经阅读了一些有关属性映射器的内容,但我不知道我的方法是否正确。

在客户端配置中,类似于:

(...)
<int-ip:tcp-connection-factory id="clientRequestData"
    type="client" host="${clientRequestData.host}" port="${clientRequestData.port}" single-use="true"
    serializer="itemSerializerDeserializer" deserializer="resultSerializerDeserializer"
    mapper="mapper" />

<beans:bean id="mapper"
      class="org.springframework.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
    <beans:constructor-arg name="messageConverter">
        <beans:bean class="??????????"/>
    </beans:constructor-arg>
</beans:bean>
(...)

有什么帮助吗?

提前致谢。

最佳答案

MessageConvertingTcpMessageMapperMapMessageConverter结合使用。

总体思路是,对于出站,转换器将 Message 转换为包含所有 headerspayloadMap,并且序列化器将映射序列化为 byte[]。您可以告诉 MapMessageConverter 您希望在 map 中包含哪些 header ,以便您可以在序列化程序中访问它们。

在入站端,解串器从 byte[] 创建一个 Map,然后 MessageConverter.toMessage() 将映射转换为 Message

请参阅MapJsonSerializer举个例子。

另请参阅this test case .

编辑

这是一个简单的实现(不对输出进行错误检查或长度检查)...

private volatile Deserializer<byte[]> packetDeserializer = new ByteArrayLfSerializer();

private volatile Serializer<byte[]> packetSerializer = new ByteArrayLfSerializer();


@Override
public Map<?, ?> deserialize(InputStream inputStream) throws IOException {
    byte[] bytes = readToEndOfMessage(inputStream);
    String asString = new String(bytes, "UTF-8");
    Map<String, String> headers = new HashMap<String, String>();
    headers.put("issuer", asString.substring(0, 5));
    headers.put("client", asString.substring(4, 9));
    headers.put("product", asString.substring(9, 11));
    headers.put("type", asString.substring(11, 14));
    Map<String, Object> map = new HashMap<String, Object>();
    map.put("headers", headers);
    map.put("payload", createPayloadFromRemainingBytes(bytes));
    return map;
}

@Override
public void serialize(Map<?, ?> object, OutputStream outputStream) throws IOException {
    Map<String, String> headers = (Map<String, String>) object.get("headers");
    outputStream.write(headers.get("issuer").getBytes("UTF-8"));
    outputStream.write(headers.get("client").getBytes("UTF-8"));
    outputStream.write(headers.get("product").getBytes("UTF-8"));
    outputStream.write(headers.get("type").getBytes("UTF-8"));
    outputStream.write(convertPayloadToBytes(object.get("payload")));
}

与 MapMessageConverter 一起使用,Spring Integration header 将与数据一起传送到另一端,他将解码数据并使用这些 header 创建入站消息。

关于java - TCP 按字节发送/检索 header ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31242695/

相关文章:

java - AEM 6.3 使用 OSGi R6 注释和 Sling 模型

java - 一种重置 Spring Integration 上下文中所有消息的方法

http - spring 集成入站网关 http 请求 header

java - 将 @Gateway 与 Spring-Integration-Kafka 一起使用

java - 使用 AffineTransform 旋转图像

java - 单击按钮时出现雨伞异常

java - NSLog Java 翻译

spring-integration - 如何使用SPEL引用@GatewayHeader中参数的属性

ibm-mq - Websphere 7 + Websphere MQ 7.X + Spring Integration + JMS - 消息监听器停止从队列读取消息

java - 如何通过单击按钮在应用程序图标上设置通知号码?