java - Spring 集成聚合器

标签 java spring spring-integration

我想使用聚合器从两条消息中创建一条消息,但我不知道如何执行此操作。

目前,我正在从目录中读取两个文件,并希望将这些消息聚合为一个。

我的整个项目如下所示:

读入.zip -> 传递给自定义消息处理程序,将其解压缩到目录中 -> 从此目录中读取文件 -> 尝试聚合它们

如果我可以在解压缩文件后发送带有两个有效负载的消息,那就太好了,但在阅读后聚合就足够了。

我的解压程序如下所示:

public class ZipHandler extends AbstractMessageHandler {

File dat;
File json;

@Override
protected void handleMessageInternal(Message<?> message) throws Exception {
    byte[] buffer = new byte[1024];
    try {
        File file = (File) message.getPayload();
        ZipFile zip = new ZipFile(file);

        for (Enumeration<? extends ZipEntry> entries = zip.entries(); entries
                .hasMoreElements();) {
            ZipEntry ze = entries.nextElement();
            String name = ze.getName();

            if (name.endsWith(".dat") || name.endsWith(".DAT")) {
                InputStream input = zip.getInputStream(ze);
                File datFile = new File("D:/lrtrans/zipOut"
                        + File.separator + name);
                FileOutputStream fos = new FileOutputStream(datFile);
                int len;
                while ((len = input.read(buffer)) > 0) {
                    fos.write(buffer, 0, len);
                }
                this.dat = datFile;
                fos.close();
            } else if (name.endsWith(".json") || name.endsWith(".JSON")) {
                InputStream input = zip.getInputStream(ze);
                File jsonFile = new File("D:/lrtrans/zipOut"
                        + File.separator + name);
                FileOutputStream fos = new FileOutputStream(jsonFile);
                int len;
                while ((len = input.read(buffer)) > 0) {
                    fos.write(buffer, 0, len);
                }
                this.json = jsonFile;
                fos.close();
            }
        }
        zip.close();
    } catch (Exception e) {
        e.printStackTrace();
    }

}
}

它将这些文件放入两个目录中,我使用 FileReadingMessageSource 再次从中读取它们。 我还想仅使用基于注释的表示法而不是 xml 来解决此问题。

编辑:

我只想将 defaultAggregatingMessagegroupProcssor 与基于名为“zip”的 header 的关联策略和基于消息的releaseStrategy 结合使用,因为在这种情况下,两个文件应合并为一个。

@Aggregator(inputChannel = "toAggregatorChannel", outputChannel = "toRouterChannel", discardChannel = "nullChannel")
public DefaultAggregatingMessageGroupProcessor aggregate(){
    DefaultAggregatingMessageGroupProcessor aggregator = new DefaultAggregatingMessageGroupProcessor(); 
    return aggregator;
}
@CorrelationStrategy 
public String correlateBy(@Header("zipFile") String zip){
    return zip;
}
@ReleaseStrategy
public boolean isReadytoRelease(List<Message<?>> messages) {
    return messages.size() == 2;
}

最佳答案

我想说你走的是对的路。由于您的 zip 文件中包含多个文件,因此正确的要求是将其解压缩并将这些文件收集为一条消息并发送以进行进一步处理。

所以,是的,<aggregator>是给你的。只需要确定如何将它们关联和分组。

不知道如何解压缩它们,但您确实可以使用 zip 文件名作为 correlationKey并使用多个文件作为组大小来确定释放组的信号。

欢迎提出更多问题。但首先我需要看看你的“ zipper ”。

更新

首先,基于注释的聚合器配置有点受限,最好使用 @ServiceActivator关于AggregatingMessageHandler @Bean更好地控制其选项。

但是,即使您做出选择,也可以实现您的要求。但是@Aggregator配置应遵循 POJO 方法调用原则:

@Aggregator(inputChannel = "toAggregatorChannel", outputChannel = "toRouterChannel", discardChannel = "nullChannel")
public List<File> aggregate(List<File> files){
    return files;
}

类似的事情。

关于java - Spring 集成聚合器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29390141/

相关文章:

java - 从针对特定主题标签的查询返回的 StatusJSONImpl 获取用户的用户名及其推文

java - Spring REST Controller 未映射

java - Spring Cloud 流: Republish to other amqp connection if current connection throws exception

java - 没有函数指针,除了接口(interface)之外,在 Java 中实现回调的推荐方法是什么?

java - 用真值表过滤

java - 从@ElementCollection 中搜索一个对象

javascript - 使用 displaytag 在表(网格)中添加新列

spring - 在 Spring Integration 中设置消息的生存时间

java - java.lang.IllegalArgumentException:找到了多个参数类型候选:[java.lang.String]和[java.lang.Long]

java - 在 C++ 中创建一个 byte[][] 并使用 JNI 将其返回给 Java