我有客户端配置:
<beans:bean id="itemSerializerDeserializer"
class="org.mbracero.integration.ItemSerializerDeserializer" />
<beans:bean id="resultSerializerDeserializer"
class="org.mbracero.integration.ResultSerializerDeserializer" />
<int-ip:tcp-connection-factory id="clientRequestData"
type="client" host="${clientRequestData.host}" port="${clientRequestData.port}" single-use="true"
serializer="itemSerializerDeserializer" deserializer="resultSerializerDeserializer" />
<int-ip:tcp-outbound-gateway id="requestDataOutGateway"
request-channel="requestData" connection-factory="clientRequestData"
request-timeout="10000" reply-timeout="10000" remote-timeout="10000" />
简单网关:
public interface SimpleGateway {
@Gateway(requestChannel="requestData")
Result sendData(Item item);
}
以及服务器配置:
<int:channel id="channelServerRequestData" />
<int:channel id="channelServerResponseData" />
<beans:bean id="requestService" class="org.mbracero.integration.RequestService" />
<beans:bean id="itemSerializerDeserializer"
class="org.mbracero.integration.ItemSerializerDeserializer" />
<beans:bean id="resultSerializerDeserializer"
class="org.mbracero.integration.ResultSerializerDeserializer" />
<int-ip:tcp-connection-factory id="requestDataServer"
type="server" port="${requestDataServer.port}" single-use="true" deserializer="itemSerializerDeserializer"
serializer="resultSerializerDeserializer" />
<int-ip:tcp-inbound-gateway id="TCPInboundGateway"
connection-factory="requestDataServer" request-channel="channelServerRequestData"
reply-channel="channelServerResponseData" error-channel="errorChannel" />
请求服务:
@Service
public class RequestService {
@ServiceActivator(inputChannel="channelServerRequestData", outputChannel="channelServerResponseData")
public Result requestData(Item input) {
System.out.println("Input :::: " + input);
Result ret = new Result("AAA", "DDDD");
System.out.println("Ret :::: " + ret);
return ret;
}
}
ItemSerializer反序列化器:
public class ItemSerializerDeserializer implements Serializer<Item>, Deserializer<Item> {
(...)
}
结果序列化器反序列化器:
public class ResultSerializerDeserializer implements Serializer<Result>, Deserializer<Result> {
(...)
}
现在我必须添加一些 header (发送和检索),但我需要像上面编写的自定义序列化器/反序列化器一样进行操作。
例如,我需要发送下一个 header :
- 发行者:4 字节。初始位置 1 - 最终位置 4
- 客户端:4 字节。初始位置 5 - 最终位置 8
- 产品:2 字节。初始位置 9 - 最终位置 10
- 类型:3 字节。初始位置 11 - 最终位置 13
- (...)
我已按字节和位置发送和检索这些 header (不像 Map 那样按名称/值)。
通过我的自定义序列化器/反序列化器,我可以对有效负载进行此操作,但我不知道如何对 header 进行操作。
我已经阅读了一些有关属性映射器的内容,但我不知道我的方法是否正确。
在客户端配置中,类似于:
(...)
<int-ip:tcp-connection-factory id="clientRequestData"
type="client" host="${clientRequestData.host}" port="${clientRequestData.port}" single-use="true"
serializer="itemSerializerDeserializer" deserializer="resultSerializerDeserializer"
mapper="mapper" />
<beans:bean id="mapper"
class="org.springframework.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
<beans:constructor-arg name="messageConverter">
<beans:bean class="??????????"/>
</beans:constructor-arg>
</beans:bean>
(...)
有什么帮助吗?
提前致谢。
最佳答案
将MessageConvertingTcpMessageMapper
与MapMessageConverter
结合使用。
总体思路是,对于出站,转换器将 Message
转换为包含所有 headers
和 payload
的 Map
,并且序列化器将映射序列化为 byte[]
。您可以告诉 MapMessageConverter
您希望在 map 中包含哪些 header ,以便您可以在序列化程序中访问它们。
在入站端,解串器从 byte[]
创建一个 Map
,然后 MessageConverter.toMessage()
将映射转换为 Message
。
请参阅MapJsonSerializer举个例子。
另请参阅this test case .
编辑
这是一个简单的实现(不对输出进行错误检查或长度检查)...
private volatile Deserializer<byte[]> packetDeserializer = new ByteArrayLfSerializer();
private volatile Serializer<byte[]> packetSerializer = new ByteArrayLfSerializer();
@Override
public Map<?, ?> deserialize(InputStream inputStream) throws IOException {
byte[] bytes = readToEndOfMessage(inputStream);
String asString = new String(bytes, "UTF-8");
Map<String, String> headers = new HashMap<String, String>();
headers.put("issuer", asString.substring(0, 5));
headers.put("client", asString.substring(4, 9));
headers.put("product", asString.substring(9, 11));
headers.put("type", asString.substring(11, 14));
Map<String, Object> map = new HashMap<String, Object>();
map.put("headers", headers);
map.put("payload", createPayloadFromRemainingBytes(bytes));
return map;
}
@Override
public void serialize(Map<?, ?> object, OutputStream outputStream) throws IOException {
Map<String, String> headers = (Map<String, String>) object.get("headers");
outputStream.write(headers.get("issuer").getBytes("UTF-8"));
outputStream.write(headers.get("client").getBytes("UTF-8"));
outputStream.write(headers.get("product").getBytes("UTF-8"));
outputStream.write(headers.get("type").getBytes("UTF-8"));
outputStream.write(convertPayloadToBytes(object.get("payload")));
}
与 MapMessageConverter 一起使用,Spring Integration header 将与数据一起传送到另一端,他将解码数据并使用这些 header 创建入站消息。
关于java - TCP 按字节发送/检索 header ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31242695/