mongodb - Spring + MongoDB 同时使用非 react 性和 react 性存储库

标签 mongodb spring-boot event-listener changestream

我有一个 SpringBoot + Mongo 应用程序,使用非 react 性存储库(扩展 MongoRepository)实现,一切正常。 现在我需要实现一个审核系统,我想使用 Mongo 的 ChangeStreams 和 Spring 的 Reactive Change Streams ( reference ) 来开发它。我尝试创建 3 个类来实现此目的:

MongoMessageListener

@Component
@Slf4j
public class MongoMessageListener implements MessageListener<ChangeStreamDocument<Document>, MyDocument> {

    @Override
    public void onMessage(Message<ChangeStreamDocument<Document>, MyDocument> message) {

        OperationType operationType = message.getRaw().getOperationType();

        log.info("Operation type is : {}", operationType);

        log.info("Received Message in collection: {},message raw: {}, message body:{}",
                message.getProperties().getCollectionName(), message.getRaw(), message.getBody());
    }
}

MongoListenerConfig

@Configuration
@Slf4j
public class MongoStreamListenerConfig extends AbstractReactiveMongoConfiguration {

    @Bean
    MessageListenerContainer changeStreamListenerContainer(
            MongoTemplate template,
            MongoMessageListener consentAuditListener,
            ErrorHandler errorHandler) {

        MessageListenerContainer messageListenerContainer =
                new MongoStreamListenerContainer(template, errorHandler);

        ChangeStreamRequest<ParentContentDocument> request =
                ChangeStreamRequest.builder(consentAuditListener)
                        .collection("my_document_collection")
                        .filter(newAggregation(match(where("operationType").is("update"))))
                        .fullDocumentLookup(FullDocument.UPDATE_LOOKUP)
                        .build();

        messageListenerContainer.register(request, MyDocument.class, errorHandler);
        log.info("> Mongo Stream Listener is registered");
        return messageListenerContainer;
    }

    @Override
    protected String getDatabaseName() {
        return "myDatabase";
    }

    @Bean
    ErrorHandler getLoggingErrorHandler() {
        return new ErrorHandler() {
            @Override
            public void handleError(Throwable throwable) {
                log.error("Error in creating audit records {}", throwable.getMessage());
            }
        };
    }
}

MongoStreamListenerContainer

public class MongoStreamListenerContainer extends DefaultMessageListenerContainer {

    public MongoStreamListenerContainer(MongoTemplate template, ErrorHandler errorHandler) {
        super(template, Executors.newFixedThreadPool(15), errorHandler);
    }

    @Override
    public boolean isAutoStartup() {
        return true;
    }
}

我还添加了一个扩展 ReactiveMongoRepository<MyDocument, String> 的存储库

当我尝试运行我的应用程序时,它会引发多个 ClassNotFoundException ([...] 'reactiveStreamsMongoClient' threw exception; nested exception is java.lang.NoClassDefFoundError: com/mongodb/internal/connection/InternalConnectionPoolSettings ) 错误,或 @Autowired 将存储库导入到不满足的服务中 (Autowired(required=true)})

在我的 Main.java 类中,我尝试设置 @EnableMongoRepositories(basePackages = "com.my.path.to.repository")@EnableReactiveMongoRepositories("com.my.path.to.reactive.repository") ,但似乎没有任何作用。 我怀疑我无法将非 react 性和 react 性 repo 挤在一起,但我发现 this SO question还有更多,所以我想你可以。 我试图关注this Spring project一步一步,但我总是收到 ClassNotFound 错误。

在我的 pom 中我有

<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-data-mongodb</artifactId>
 <version>2.5.3</version>
</dependency>
<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
 <version>2.5.3</version>
</dependency>
<dependency>
 <groupId>org.mongodb</groupId>
 <artifactId>mongodb-driver-reactivestreams</artifactId>
 <version>4.6.1</version>
</dependency>
<dependency>
 <groupId>io.projectreactor</groupId>
 <artifactId>reactor-core</artifactId>
 <version>3.4.19</version>
</dependency> 

我真的看不出我错过了什么:如果它只是一些配置,或者如果我不能混合非 react 性和 react 性 repo (即使我发现用户说你可以)。任何帮助将非常感激,这让我有点疯狂!非常感谢!

最佳答案

正如您在我的旧书 Question 中所问的那样,我将分享我在我的 webflux 项目中所做的事情。我在我的项目中使用了一个用 Web MVC 编写的旧库,其中使用了非响应式(Reactive) mongo。 我正在使用以下依赖项;

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>

我的响应式(Reactive) mongo 配置如下所示。这里我使用 mongo URI 连接字符串来连接,你也可以使用 user/pass 来连接

@Configuration
public class ReactiveMongoConfiguration extends AbstractReactiveMongoConfiguration {

  private final NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();


  @Override
  @Bean
  @Primary
  public MongoClient reactiveMongoClient() {
    ConnectionString connectionString = new ConnectionString(MY_DB_URI_KEY);
    return MongoClients.create(MongoClientSettings.builder().applyConnectionString(connectionString)
        .streamFactoryFactory(NettyStreamFactoryFactory.builder().eventLoopGroup(eventLoopGroup).build()).build());
  }

  @Bean
  @Primary
  public ReactiveMongoTemplate reactiveMongoTemplate() {
    return new ReactiveMongoTemplate(reactiveMongoClient(), getDatabaseName());
  }

  @Override
  protected String getDatabaseName() {
    return new ConnectionString(MY_DB_URI_KEY).getDatabase();
  }

  @PreDestroy
  public void shutDownEventLoopGroup() {
    eventLoopGroup.shutdownGracefully();
  }
}}

我的旧 Web MVC 库包含非响应式(Reactive) mongo bean,如下所示,

@Configuration
public class MongoConfigurations {

    @Bean
    public MongoTemplate mongoTemplate() {
            ConnectionString connectionString = new ConnectionString(MY_DB_URI);
            MongoClient mongoClient = MongoClients.create(connectionString);
            mongoTemplate = new MongoTemplate(mongoClient, connectionString.getDatabase());
            return mongoTemplate;
    }
}

在我的主应用程序中,我排除了以下自动配置;

exclude = { MongoAutoConfiguration.class, MongoDataAutoConfiguration.class }

现在您可以通过注入(inject)来使用响应式(Reactive)和非响应式(Reactive)单声道模板。 :)

关于mongodb - Spring + MongoDB 同时使用非 react 性和 react 性存储库,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72731742/

相关文章:

javascript - 如何创建自定义事件来处理所有 Transitionend 事件?

mongodb - Mongoose:删除项目 - 第一个总是被删除

node.js - Mongoose 中 document.save() 后的人口数量错误

java - Spring-data 无法更新 id 为 0 的对象

java - Spring 数据在生产中休息

javascript - 如何将相同的事件监听器添加到多个标记,然后在 Google Maps API v3 中区分监听器中的标记?

mysql - NoSQL 模式中的 SQL 关系

node.js - Mongoose .save() 无法在没有回调的情况下工作

java - 请求的资源上不存在 'Access-Control-Allow-Origin' header ,并且 JWT token 不以 Bearer String 开头

带有 eventListener 的 Javascript 代码不适用于视频按钮控制系统