上下文:我想通过 ElasticSearch 和 Spring WebFlux 在一个完整的 react 堆栈化合物中使用 ElasticSearch。
这是我第一次使用 springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient 和 springframework.data.elasticsearch.core.ReactiveElasticsearchOperations。我曾使用 MongoDb 在响应式堆栈中工作,但这是我第一次使用 ElasticSearch。
我已经成功地按照教程使用 ReactiveElasticsearchOperations 和 spring-data-elasticsearch-3.2.6 和 elasticsearch-6.8.7 (Elastic Tutorial)
findAll/findById 与 elastic-6.8.7 和 spring-data-elasticsearch-3.2.6 一起正常工作
我的模型服务:
...
private final ReactiveElasticsearchOperations reactiveElasticsearchOperations;
private final ReactiveElasticsearchClient reactiveElasticsearchClient;
public MyModelServiceImpl(ReactiveElasticsearchOperations reactiveElasticsearchOperations,
ReactiveElasticsearchClient reactiveElasticsearchClient) {
this.reactiveElasticsearchOperations = reactiveElasticsearchOperations;
this.reactiveElasticsearchClient = reactiveElasticsearchClient;
}
@Override
public Mono<MyModel> findMyModelById(String id){
return reactiveElasticsearchOperations.findById(
id,
MyModel.class,
MYMODEL_ES_INDEX,
DEFAULT_ES_DOC_TYPE
).doOnError(throwable -> logger.error(throwable.getMessage(), throwable));
}
@Override
public Flux<MyModel> findAllMyModels(String field, String value){
NativeSearchQueryBuilder query = new NativeSearchQueryBuilder();
if (!StringUtils.isEmpty(field) && !StringUtils.isEmpty(value)) {
query.withQuery(QueryBuilders.matchQuery(field, value));
}
return reactiveElasticsearchOperations.find(
query.build(),
MyModel.class,
MYMODEL_ES_INDEX
).doOnError(throwable -> logger.error(throwable.getMessage(), throwable));
}
我尝试使用更新版本(spring-data-elasticsearch-4 和 elast-7.6.2)遵循相同的想法。因为我可以阅读“已弃用。自 4.0 以来,使用搜索(查询,...)通量发射匹配实体一一包装在 SearchHit 中。”然后我完全卡住了,因为结果被包裹在 SearchHit 中。好吧,四处搜索我不明白为什么这样的包装器既不知道如何将我的模型转换/映射/flatMap/etc 转换为 Flux 以返回通过 Controller 方法。
这是我在这个问题主题中提到的问题的暂定:
服务:
import com.poc.favoritos.model.Sugestao;
import org.elasticsearch.index.query.QueryBuilders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class SugestaoServiceImpl implements SugestaoService{
private static final Logger logger = LoggerFactory.getLogger(SugestaoServiceImpl.class);
private final ReactiveElasticsearchOperations reactiveElasticsearchOperations;
private final ReactiveElasticsearchClient reactiveElasticsearchClient;
public SugestaoServiceImpl(ReactiveElasticsearchOperations reactiveElasticsearchOperations,
ReactiveElasticsearchClient reactiveElasticsearchClient) {
this.reactiveElasticsearchOperations = reactiveElasticsearchOperations;
this.reactiveElasticsearchClient = reactiveElasticsearchClient;
}
@Override
public Mono<Sugestao> findSugestaoById(String id) {
return reactiveElasticsearchOperations.get(id, Sugestao.class)
.doOnError(throwable -> logger.error(throwable.getMessage(), throwable));
}
@Override
public Flux<Sugestao> findAllMySugestoes(String field, String value) {
NativeSearchQueryBuilder query = new NativeSearchQueryBuilder();
if (!StringUtils.isEmpty(field) && !StringUtils.isEmpty(value)) {
query.withQuery(QueryBuilders.matchQuery(field, value));
}
return reactiveElasticsearchOperations.search(query.build(), Sugestao.class);
}
}
ElastiSearchConfig 最初复制自 Same tutorial mentioned above .老实说,我不确定我为什么需要以及这个配置添加到我的项目中。顺便说一句,我也在研究它 operations reference .
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.client.reactive.ReactiveRestClients;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
@Configuration
public class ElasticsearchConfig {
@Bean
public ReactiveElasticsearchClient reactiveElasticsearchClient() {
ClientConfiguration clientConfiguration = ClientConfiguration.builder()
.connectedTo(elassandraHostAndPort)
.withWebClientConfigurer(webClient -> {
ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
.codecs(configurer -> configurer.defaultCodecs()
.maxInMemorySize(-1))
.build();
return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
})
.build();
return ReactiveRestClients.create(clientConfiguration);
}
@Bean
public ElasticsearchConverter elasticsearchConverter() {
return new MappingElasticsearchConverter(elasticsearchMappingContext());
}
@Bean
public SimpleElasticsearchMappingContext elasticsearchMappingContext() {
return new SimpleElasticsearchMappingContext();
}
@Bean
public ReactiveElasticsearchOperations reactiveElasticsearchOperations() {
return new ReactiveElasticsearchTemplate(reactiveElasticsearchClient(), elasticsearchConverter());
}
@Value("${spring.data.elasticsearch.client.reactive.endpoints}")
private String elassandraHostAndPort;
}
最佳答案
至于SearchHit
:此类包含来自搜索结果的信息,这些信息不是实体的一部分,而是搜索结果的一部分,例如分数、排序值、突出显示条目。
如果您不需要这个并且只想单独使用实体的 Flux:
Flux<SearchHit<Entity>> fluxSearchHits = ...
Flux<Entity> fluxEntity = fluxSearchHits.map(searchHit -> searchHit.getContent);
至于配置:
您需要
ReactiveElasticsearchClient
bean 来配置 Spring Data Elasticsearch。其他 3 颗 bean :不知道它们为什么在那里; Spring Data Elasticsearch 4.0 不需要它们2020 年 5 月 16 日编辑:
配置:您应该从
AbstractReactiveElasticsearchConfiguration
派生配置类,那么你就不需要其他bean了,因为基类定义了必要的东西:@Configuration
public class ElasticsearchConfig extends AbstractReactiveElasticsearchConfiguration{
@Value("${spring.data.elasticsearch.client.reactive.endpoints}")
private String elassandraHostAndPort;
@Bean
public ReactiveElasticsearchClient reactiveElasticsearchClient() {
ClientConfiguration clientConfiguration = ClientConfiguration.builder()
.connectedTo(elassandraHostAndPort)
.build();
return ReactiveRestClients.create(clientConfiguration);
}
}
只有当您检索大型结果集并且结果缓冲区的默认内存大小太低时,才需要自定义 WebClientConfiguration。
关于spring-boot - Spring-data-elasticsearch:更新到 7.6.2 后无法从 Flux<SearchHit<Sugestao>> 转换为 Flux<Sugestao>。如何处理 SearchHit?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61827249/