java - Spring Integration 不转发自定义 header ?

标签 java spring spring-integration

我在 my-transformer 中添加一些 header :

public Message<?> transform(final Message<?> message) {        
    List<Item> items = doStuff(message);

    final MessageBuilder<?> messageBuilder = MessageBuilder
            .withPayload(message.getPayload())
            .copyHeadersIfAbsent(message.getHeaders());


    for (final Item item : items) {
        messageBuilder.setHeader(item.getHeaderName(), item.getValue());
    }

    return messageBuilder.build();
}

我编写了一个集成测试来确认我的 header 存在于输出 channel 上:

 public static class HeaderTest extends TransformerTest {
    @Test
    public void test() throws Exception {
        channels.input().send(new GenericMessage<>(TransformerTest.EXAMPLE_PAYLOAD));
        final Message<?> out = this.collector.forChannel(this.channels.output()).poll(10, TimeUnit.SECONDS);

        assertThat(out, HeaderMatcher.hasHeader("header-test", notNullValue()));
    }
}

但是,当我创建一个像这样的流时:

http --port=1234 | my-transformer | log --expression=toString()

并发送了相同的 EXAMPLE_PAYLOAD 我在 log 日志中收到以下消息:GenericMessage [payload=..., headers={kafka_offset=0 , id=f0a0727c-9351-274c-58b3-edee9ccbf6ce, kafka_receivedPartitionId=0, contentType=text/plain;charset=UTF-8, kafka_receivedTopic=myTopic.my-transformer, timestamp=1485171448947}]

为什么我的 header-test 不在邮件 header 中?

-- 编辑--

所以,如果我理解正确的话,我应该做类似的事情:

public class MyTransformer implements Transformer {

    private final EmbeddedHeadersMessageConverter converter = new EmbeddedHeadersMessageConverter();

    @Override
    public Message<?> transform(final Message<?> message) {
        List<Item> items = doStuff(message);

        final MessageBuilder<byte[]> messageBuilder = MessageBuilder
                .withPayload(((String) message.getPayload()).getBytes())
                .copyHeadersIfAbsent(message.getHeaders());

        final int itemsSize = items.size();
        final String[] headerNames = new String[itemsSize];
        for (int i = 0; i < itemsSize; i++) {
            final Item item = items.get(i);
            messageBuilder.setHeader(item.getHeaderName(), item.getValue());
            headerNames[i] = item.getHeaderName();
        }

        final Message<byte[]> msg = messageBuilder.build();

        final byte[] rawMessageWithEmbeddedHeaders;
        try {
            rawMessageWithEmbeddedHeaders = converter.embedHeaders(new MessageValues(msg), headerNames);
        } catch (final Exception e) {
            throw new HeaderEmbeddingException(String.format("Cannot embed headers from '%s' into message: %s", items, msg), e);
        }

        return new GenericMessage<>(rawMessageWithEmbeddedHeaders);
    }
}

application.properties中设置spring.cloud.stream.bindings.output. Producer.headerMode=raw,然后在接收端转换消息负载?或者我可以以某种方式让接收方自动转换消息负载吗?

最佳答案

您没有说明您使用的是 Spring XD 还是 Spring Cloud DataFlow,但每种情况下的解决方案都是相似的。

由于 kafka 没有对 header 的 native 支持,因此我们必须将它们嵌入到消息负载中。由于我们不想传输不必要的 header ,因此您必须通过在 servers.yml 中设置 header 名称来选择要传输的 header 。适用于 Spring XD 或 application.yml (或 .properties )用于 Spring Cloud Stream 应用程序。

编辑

不幸的是,不支持模式。一种选择是使用 EmbeddedHeadersMessageConverter自己设置kafka mode到原始(在变压器的输出目的地)。原始模式意味着 Binder 不会嵌入 header 。

这样,下一个应用程序(没有模式 raw )应该能够解码 header ,就像它们已由变压器中的绑定(bind)器编码一样。 Javadocs here .

您的 header 数量限制为 255 个。

关于java - Spring Integration 不转发自定义 header ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41805743/

相关文章:

error-handling - Spring 集成 : How to use an "error gateway" with Splitter Aggregator when Exception thrown

java - IntelliJ : Could not target platform: 'Java SE 12' using tool chain: 'JDK 11 (11)'

java - 奥运会java编码

java - com.marlonjones.nimbus.WeatherWatchFaceService.WeatherWatchFaceEngine 中没有可用的默认构造函数

java - 使用 Swing 组件在 Java Swing 中制作一个简单的 GUI 编辑器

java - 如何通过向其传递运行时参数来使用命令模式

java - 从 Wildfly 11 升级到 Wildfly 15 时出现日期序列化问题

spring - 如何从 Spring 授权服务器示例中获取刷新 token

rest - 如何将 MVC Rest 服务与 Spring Integration 集成?

java - Spring Integration 使用 randomUUID 丰富 Header