java - Spring 集成通过拆分丰富

标签 java spring spring-integration spring-integration-dsl

假设我有一个产品,我需要使用下载的图像 ID 来丰富该产品。

作为输入消息,我有 java pojo。为了简单起见,将其呈现为 json :

{
    "id" : "productId",
    "price" : 10000,
    "productPhotos" : ["http://url1", "http://url2", ...],
    "marketPhotos" : ["http://url1", "http://url2", ...]
}

我还有可轮询的chanel,可以下载图像并将其放在存储中的某个位置并返回下载的照片ID

@Bean
    public IntegrationFlow imageDownloadFlow() {
        return IntegrationFlows.from(inputChannel())
                .transform(Message.class, messageTransformer::transformToImageMassage, e -> e.poller(queuePoller()))
                .transform(imageDownloader::download)
                .transform(imageS3Uploader::upload)
                .channel(outputChannel())
                .get();
    }

因此,我的任务是并行运行“productPhotos”和“marketPhotos”,并使用下载的 ID 丰富产品消息。 例如

{
    "id" : "productId",
    "price" : 10000,
    "productPhotos" : ["id1", "id2", ...],
    "marketPhotos" : ["id3", "id4", ...]
}

是否可以丰富IntegrationFlows?

最佳答案

是的,使用 ContentEnricherrequestChannel 中使用 PublishSubscribeChannel(带有任务执行器)以及图像下载器下游的聚合器。

使用enrich() DSL 方法。

编辑

这是一个例子:

@SpringBootApplication
public class So57357544Application {

    public static void main(String[] args) {
        SpringApplication.run(So57357544Application.class, args);
    }

    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows.from(() -> new Pojo("one", 42, Collections.singletonList("https://localhost/foo"),
                    Collections.singletonList("https://localhost/bar")),
                    e -> e.poller(Pollers.fixedRate(50000)))
                .enrich(enr -> enr.requestChannel("enricherFlow.input")
                        .<List<Pojo>>propertyFunction("productPhotos", msg -> {
                            List<String> photos = msg.getPayload().get(0).getProductPhotos();
                            photos.addAll(msg.getPayload().get(1).getProductPhotos());
                            return photos;
                        })
                        .<List<Pojo>>propertyFunction("marketPhotos", msg -> {
                            List<String> photos = msg.getPayload().get(0).getMarketPhotos();
                            photos.addAll(msg.getPayload().get(1).getMarketPhotos());
                            return photos;
                        }))
                .log()
                .get();
    }

    @Bean
    public IntegrationFlow enricherFlow() {
        return f -> f
                .<Pojo, Pojo> transform(pojo -> new Pojo(pojo.getId(), pojo.getPrice(),
                        pojo.getProductPhotos(),
                        pojo.getMarketPhotos()))
                    .publishSubscribeChannel(exec(), ps -> ps
                            .applySequence(true)
                            .subscribe(f1 -> f1.handle("urlToId", "product").channel("aggregator.input"))
                            .subscribe(f1 -> f1.handle("urlToId", "market").channel("aggregator.input")));
    }

    @Bean
    public IntegrationFlow aggregator() {
        return f -> f.aggregate();
    }

    @Bean
    public Executor exec() {
        ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
        exec.setCorePoolSize(2);
        return exec;
    }

}

@Component
class UrlToId {

    public Pojo product(Pojo pojo) {
        List<String> productPhotos = pojo.getProductPhotos().stream()
                .map(phot -> phot.substring(phot.lastIndexOf('/')))
                .collect(Collectors.toList());
        return new Pojo(pojo.getId(), pojo.getPrice(), productPhotos, Collections.emptyList());
    }

    public Pojo market(Pojo pojo) {
        List<String> marketPhotos = pojo.getMarketPhotos().stream()
                .map(phot -> phot.substring(phot.lastIndexOf('/')))
                .collect(Collectors.toList());
        return new Pojo(pojo.getId(), pojo.getPrice(), Collections.emptyList(), marketPhotos);
    }

}

class Pojo {

    private final String id;

    private final int price;

    private final List<String> productPhotos = new ArrayList<>();

    private final List<String> marketPhotos = new ArrayList<>();

    public Pojo(String id, int price, List<String> productPhotes, List<String> marketPhotos) {
        this.id = id;
        this.price = price;
        setProductPhotos(productPhotes);
        setMarketPhotos(marketPhotos);
    }

    public String getId() {
        return this.id;
    }

    public int getPrice() {
        return this.price;
    }

    public List<String> getProductPhotos() {
        return new ArrayList<>(this.productPhotos);
    }

    public List<String> getMarketPhotos() {
        return new ArrayList<>(this.marketPhotos);
    }

    public final void setProductPhotos(List<String> photos) {
        if (photos.size() > 0) {
            this.productPhotos.clear();
            this.productPhotos.addAll(photos);
        }
    }

    public final void setMarketPhotos(List<String> photos) {
        if (photos.size() > 0) {
            this.marketPhotos.clear();
            this.marketPhotos.addAll(photos);
        }
    }

    @Override
    public String toString() {
        return "Pojo [id=" + this.id + ", price=" + this.price
                + ", productPhotos=" + this.productPhotos
                + ", marketPhotos=" + this.marketPhotos + "]";
    }

}

关于java - Spring 集成通过拆分丰富,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57357544/

相关文章:

java - 了解同步的使用

java - 在 Actionscript 3 中从 Java Servlet 接收 ByteArray

java - Spring用单线程池并发处理多个队列

java - Spring Integration 使用 DSL 从 Unix 位置读取文件

java - Spring 与 JMS + ActiveMQ 集成 : Messages remain in JDBC Message Store after reconnect

java - 使用 PropertiesFactoryBean 进行属性扩展

java - 是否值得学习谷歌数据存储

mysql - 在 MySQL 数据库中正确创建和持久化

java - 微服务:我应该按实体还是按 MVC 模式结构来安排 Spring 项目结构?

java - 在http请求连接器中传递动态查询参数和路径参数