|
|
|
@ -1,8 +1,11 @@
|
|
|
|
|
package com.wayn.data.elastic.manager;
|
|
|
|
|
|
|
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
|
|
import com.wayn.data.elastic.config.ElasticConfig;
|
|
|
|
|
import lombok.AllArgsConstructor;
|
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
|
|
|
|
|
import org.elasticsearch.action.bulk.BulkItemResponse;
|
|
|
|
|
import org.elasticsearch.action.bulk.BulkRequest;
|
|
|
|
|
import org.elasticsearch.action.bulk.BulkResponse;
|
|
|
|
|
import org.elasticsearch.action.delete.DeleteRequest;
|
|
|
|
@ -23,48 +26,47 @@ import org.elasticsearch.client.indices.GetIndexRequest;
|
|
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
|
|
import org.elasticsearch.common.xcontent.XContentType;
|
|
|
|
|
import org.elasticsearch.index.query.QueryBuilder;
|
|
|
|
|
import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
|
|
|
|
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
|
|
|
|
|
import org.elasticsearch.rest.RestStatus;
|
|
|
|
|
import org.elasticsearch.search.SearchHit;
|
|
|
|
|
import org.elasticsearch.search.SearchHits;
|
|
|
|
|
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
|
import org.springframework.beans.factory.DisposableBean;
|
|
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
|
|
|
|
import java.io.IOException;
|
|
|
|
|
import java.util.ArrayList;
|
|
|
|
|
import java.util.Arrays;
|
|
|
|
|
import java.util.Collection;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
|
|
@Slf4j
|
|
|
|
|
@Component
|
|
|
|
|
public class ElasticDocument {
|
|
|
|
|
@AllArgsConstructor
|
|
|
|
|
public class ElasticDocument implements DisposableBean {
|
|
|
|
|
|
|
|
|
|
@Autowired
|
|
|
|
|
RestHighLevelClient restHighLevelClient;
|
|
|
|
|
private ElasticConfig elasticConfig;
|
|
|
|
|
private RestHighLevelClient restHighLevelClient;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 创建索引
|
|
|
|
|
*
|
|
|
|
|
* @param idxName 缩影名称
|
|
|
|
|
* @param idxSQL 缩影定义
|
|
|
|
|
* @param idxSQL 索引创建语句
|
|
|
|
|
*/
|
|
|
|
|
public boolean createIndex(String idxName, String idxSQL) {
|
|
|
|
|
try {
|
|
|
|
|
if (this.indexExist(idxName)) {
|
|
|
|
|
log.error(" idxName={} 已经存在,idxSql={}", idxName, idxSQL);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
CreateIndexRequest request = new CreateIndexRequest(idxName);
|
|
|
|
|
buildSetting(request);
|
|
|
|
|
request.mapping(idxSQL, XContentType.JSON);
|
|
|
|
|
// request.settings() 手工指定Setting
|
|
|
|
|
CreateIndexResponse res = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
|
|
|
|
|
if (!res.isAcknowledged()) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
log.error(e.getMessage(), e);;
|
|
|
|
|
public boolean createIndex(String idxName, String idxSQL) throws IOException {
|
|
|
|
|
if (indexExist(idxName)) {
|
|
|
|
|
log.info("idxName={} 已经存在,idxSql={}", idxName, idxSQL);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
return true;
|
|
|
|
|
CreateIndexRequest request = new CreateIndexRequest(idxName);
|
|
|
|
|
buildSetting(request);
|
|
|
|
|
request.mapping(idxSQL, XContentType.JSON);
|
|
|
|
|
// request.settings() 手工指定Setting
|
|
|
|
|
CreateIndexResponse res = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
|
|
|
|
|
return res.isAcknowledged();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
@ -72,9 +74,9 @@ public class ElasticDocument {
|
|
|
|
|
*
|
|
|
|
|
* @param idxName 索引名称
|
|
|
|
|
* @return boolean
|
|
|
|
|
* @throws Exception 异常
|
|
|
|
|
* @throws IOException 异常
|
|
|
|
|
*/
|
|
|
|
|
public boolean indexExist(String idxName) throws Exception {
|
|
|
|
|
public boolean indexExist(String idxName) throws IOException {
|
|
|
|
|
GetIndexRequest request = new GetIndexRequest(idxName);
|
|
|
|
|
request.local(false);
|
|
|
|
|
request.humanReadable(true);
|
|
|
|
@ -84,49 +86,32 @@ public class ElasticDocument {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 断某个index是否存在
|
|
|
|
|
*
|
|
|
|
|
* @param idxName index名
|
|
|
|
|
* @return boolean
|
|
|
|
|
*/
|
|
|
|
|
public boolean isExistsIndex(String idxName) throws Exception {
|
|
|
|
|
return restHighLevelClient.indices().exists(new GetIndexRequest(idxName), RequestOptions.DEFAULT);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 设置分片
|
|
|
|
|
* 设置分片和副本
|
|
|
|
|
*
|
|
|
|
|
* @param request
|
|
|
|
|
* @param request 创建索引请求
|
|
|
|
|
*/
|
|
|
|
|
public void buildSetting(CreateIndexRequest request) {
|
|
|
|
|
request.settings(Settings.builder().put("index.number_of_shards", 3)
|
|
|
|
|
.put("index.number_of_replicas", 2));
|
|
|
|
|
request.settings(Settings.builder().put("index.number_of_shards", elasticConfig.getShards())
|
|
|
|
|
.put("index.number_of_replicas", elasticConfig.getReplicas()));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param idxName index
|
|
|
|
|
* @param entity 对象
|
|
|
|
|
*/
|
|
|
|
|
public boolean insertOrUpdateOne(String idxName, ElasticEntity entity) {
|
|
|
|
|
public boolean insertOrUpdateOne(String idxName, ElasticEntity entity) throws IOException {
|
|
|
|
|
IndexRequest request = new IndexRequest(idxName);
|
|
|
|
|
log.info("Data : id={},entity={}", entity.getId(), JSON.toJSONString(entity.getData()));
|
|
|
|
|
request.id(entity.getId());
|
|
|
|
|
request.source(entity.getData(), XContentType.JSON);
|
|
|
|
|
// request.source(JSON.toJSONString(entity.getData()), XContentType.JSON);
|
|
|
|
|
try {
|
|
|
|
|
IndexResponse indexResponse = restHighLevelClient.index(request, RequestOptions.DEFAULT);
|
|
|
|
|
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
|
|
|
|
|
if (shardInfo.getFailed() > 0) {
|
|
|
|
|
for (ReplicationResponse.ShardInfo.Failure failure :
|
|
|
|
|
shardInfo.getFailures()) {
|
|
|
|
|
log.error(failure.reason());
|
|
|
|
|
}
|
|
|
|
|
return false;
|
|
|
|
|
IndexResponse indexResponse = restHighLevelClient.index(request, RequestOptions.DEFAULT);
|
|
|
|
|
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
|
|
|
|
|
if (shardInfo.getFailed() > 0) {
|
|
|
|
|
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
|
|
|
|
|
log.error(failure.reason(), failure.getCause());
|
|
|
|
|
}
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
throw new RuntimeException(e);
|
|
|
|
|
}
|
|
|
|
|
return true;
|
|
|
|
|
return indexResponse.status().equals(RestStatus.OK);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -136,19 +121,30 @@ public class ElasticDocument {
|
|
|
|
|
* @param idxName index
|
|
|
|
|
* @param list 带插入列表
|
|
|
|
|
*/
|
|
|
|
|
public boolean insertBatch(String idxName, List<ElasticEntity> list) {
|
|
|
|
|
public boolean insertBatch(String idxName, List<ElasticEntity> list) throws IOException {
|
|
|
|
|
BulkRequest request = new BulkRequest();
|
|
|
|
|
list.forEach(item -> request.add(new IndexRequest(idxName).id(item.getId())
|
|
|
|
|
.source(item.getData(), XContentType.JSON)));
|
|
|
|
|
try {
|
|
|
|
|
BulkResponse bulkResponse = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
|
|
|
|
|
if (bulkResponse.hasFailures()) {
|
|
|
|
|
return false;
|
|
|
|
|
BulkResponse bulkResponse = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
|
|
|
|
|
return bulkResponseHandler(bulkResponse);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* bulkResponse 处理
|
|
|
|
|
*
|
|
|
|
|
* @param bulkResponse 批量请求响应
|
|
|
|
|
* @return boolean
|
|
|
|
|
*/
|
|
|
|
|
private boolean bulkResponseHandler(BulkResponse bulkResponse) {
|
|
|
|
|
boolean flag = true;
|
|
|
|
|
for (BulkItemResponse response : bulkResponse) {
|
|
|
|
|
if (response.isFailed()) {
|
|
|
|
|
flag = false;
|
|
|
|
|
BulkItemResponse.Failure failure = response.getFailure();
|
|
|
|
|
log.error(failure.getMessage(), failure.getCause());
|
|
|
|
|
}
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
throw new RuntimeException(e);
|
|
|
|
|
}
|
|
|
|
|
return true;
|
|
|
|
|
return flag;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
@ -157,14 +153,11 @@ public class ElasticDocument {
|
|
|
|
|
* @param idxName index
|
|
|
|
|
* @param idList 待删除列表
|
|
|
|
|
*/
|
|
|
|
|
public <T> void deleteBatch(String idxName, Collection<T> idList) {
|
|
|
|
|
public <T> boolean deleteBatch(String idxName, Collection<T> idList) throws IOException {
|
|
|
|
|
BulkRequest request = new BulkRequest();
|
|
|
|
|
idList.forEach(item -> request.add(new DeleteRequest(idxName, item.toString())));
|
|
|
|
|
try {
|
|
|
|
|
restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
throw new RuntimeException(e);
|
|
|
|
|
}
|
|
|
|
|
BulkResponse bulk = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
|
|
|
|
|
return bulkResponseHandler(bulk);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
@ -174,87 +167,71 @@ public class ElasticDocument {
|
|
|
|
|
* @param id 文档ID
|
|
|
|
|
* @return boolean
|
|
|
|
|
*/
|
|
|
|
|
public boolean delete(String idxName, String id) {
|
|
|
|
|
DeleteRequest request = new DeleteRequest(
|
|
|
|
|
idxName, id);
|
|
|
|
|
try {
|
|
|
|
|
DeleteResponse deleteResponse = restHighLevelClient.delete(
|
|
|
|
|
request, RequestOptions.DEFAULT);
|
|
|
|
|
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
|
|
|
|
|
if (shardInfo.getFailed() > 0) {
|
|
|
|
|
for (ReplicationResponse.ShardInfo.Failure failure :
|
|
|
|
|
shardInfo.getFailures()) {
|
|
|
|
|
log.error(failure.reason());
|
|
|
|
|
}
|
|
|
|
|
return false;
|
|
|
|
|
public boolean delete(String idxName, String id) throws IOException {
|
|
|
|
|
DeleteRequest request = new DeleteRequest(idxName, id);
|
|
|
|
|
DeleteResponse deleteResponse = restHighLevelClient.delete(request, RequestOptions.DEFAULT);
|
|
|
|
|
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
|
|
|
|
|
if (shardInfo.getFailed() > 0) {
|
|
|
|
|
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
|
|
|
|
|
log.error(failure.reason());
|
|
|
|
|
}
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
throw new RuntimeException(e);
|
|
|
|
|
}
|
|
|
|
|
return true;
|
|
|
|
|
return deleteResponse.status().equals(RestStatus.OK);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 搜索文档
|
|
|
|
|
*
|
|
|
|
|
* @param idxName index
|
|
|
|
|
* @param builder 查询参数
|
|
|
|
|
* @param c 结果类对象
|
|
|
|
|
* @return java.util.List<T>
|
|
|
|
|
*/
|
|
|
|
|
public <T> List<T> search(String idxName, SearchSourceBuilder builder, Class<T> c) {
|
|
|
|
|
public <T> List<T> search(String idxName, SearchSourceBuilder builder, Class<T> c) throws IOException {
|
|
|
|
|
SearchRequest request = new SearchRequest(idxName);
|
|
|
|
|
request.source(builder);
|
|
|
|
|
try {
|
|
|
|
|
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
|
|
|
|
|
SearchHit[] hits = response.getHits().getHits();
|
|
|
|
|
List<T> res = new ArrayList<>(hits.length);
|
|
|
|
|
for (SearchHit hit : hits) {
|
|
|
|
|
res.add(JSON.parseObject(hit.getSourceAsString(), c));
|
|
|
|
|
}
|
|
|
|
|
return res;
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
throw new RuntimeException(e);
|
|
|
|
|
}
|
|
|
|
|
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
|
|
|
|
|
SearchHit[] hits = response.getHits().getHits();
|
|
|
|
|
return Arrays.stream(hits).map(hit -> JSON.parseObject(hit.getSourceAsString(), c)).collect(Collectors.toList());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public <T> List<T> search(MultiSearchRequest request, Class<T> c) {
|
|
|
|
|
try {
|
|
|
|
|
MultiSearchResponse response = restHighLevelClient.msearch(request, RequestOptions.DEFAULT);
|
|
|
|
|
MultiSearchResponse.Item[] responseResponses = response.getResponses();
|
|
|
|
|
List<T> all = new ArrayList<>();
|
|
|
|
|
for (MultiSearchResponse.Item item : responseResponses) {
|
|
|
|
|
SearchHits hits = item.getResponse().getHits();
|
|
|
|
|
List<T> res = new ArrayList<>();
|
|
|
|
|
for (SearchHit hit : hits) {
|
|
|
|
|
res.add(JSON.parseObject(hit.getSourceAsString(), c));
|
|
|
|
|
}
|
|
|
|
|
all.addAll(res);
|
|
|
|
|
/**
|
|
|
|
|
* 在单个http请求中并行执行多个搜索请求
|
|
|
|
|
*
|
|
|
|
|
* @param request 多个搜索请求
|
|
|
|
|
* @param c 结果类对象
|
|
|
|
|
* @return java.util.List<T>
|
|
|
|
|
*/
|
|
|
|
|
public <T> List<T> msearch(MultiSearchRequest request, Class<T> c) throws IOException {
|
|
|
|
|
MultiSearchResponse response = restHighLevelClient.msearch(request, RequestOptions.DEFAULT);
|
|
|
|
|
MultiSearchResponse.Item[] responseResponses = response.getResponses();
|
|
|
|
|
List<T> all = new ArrayList<>();
|
|
|
|
|
for (MultiSearchResponse.Item item : responseResponses) {
|
|
|
|
|
SearchHits hits = item.getResponse().getHits();
|
|
|
|
|
List<T> res = new ArrayList<>(hits.getHits().length);
|
|
|
|
|
for (SearchHit hit : hits) {
|
|
|
|
|
res.add(JSON.parseObject(hit.getSourceAsString(), c));
|
|
|
|
|
}
|
|
|
|
|
return all;
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
throw new RuntimeException(e);
|
|
|
|
|
all.addAll(res);
|
|
|
|
|
}
|
|
|
|
|
return all;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 删除index
|
|
|
|
|
*
|
|
|
|
|
* @param idxName 索引名称
|
|
|
|
|
* @return boolean
|
|
|
|
|
*/
|
|
|
|
|
public boolean deleteIndex(String idxName) {
|
|
|
|
|
try {
|
|
|
|
|
if (!this.indexExist(idxName)) {
|
|
|
|
|
log.error(" idxName={} 不存在", idxName);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
AcknowledgedResponse acknowledgedResponse = restHighLevelClient.indices().delete(new DeleteIndexRequest(idxName), RequestOptions.DEFAULT);
|
|
|
|
|
if (!acknowledgedResponse.isAcknowledged()) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
throw new RuntimeException(e);
|
|
|
|
|
public boolean deleteIndex(String idxName) throws IOException {
|
|
|
|
|
if (!this.indexExist(idxName)) {
|
|
|
|
|
log.error(" idxName={} 不存在", idxName);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
return true;
|
|
|
|
|
AcknowledgedResponse acknowledgedResponse = restHighLevelClient.indices()
|
|
|
|
|
.delete(new DeleteIndexRequest(idxName), RequestOptions.DEFAULT);
|
|
|
|
|
return acknowledgedResponse.isAcknowledged();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -263,19 +240,36 @@ public class ElasticDocument {
|
|
|
|
|
*
|
|
|
|
|
* @param idxName 缩影名称
|
|
|
|
|
* @param builder 查询条件
|
|
|
|
|
* @return boolean
|
|
|
|
|
*/
|
|
|
|
|
public void deleteByQuery(String idxName, QueryBuilder builder) {
|
|
|
|
|
|
|
|
|
|
public boolean deleteByQuery(String idxName, QueryBuilder builder) throws IOException {
|
|
|
|
|
DeleteByQueryRequest request = new DeleteByQueryRequest(idxName);
|
|
|
|
|
request.setQuery(builder);
|
|
|
|
|
//设置批量操作数量,最大为10000
|
|
|
|
|
request.setBatchSize(10000);
|
|
|
|
|
// 设置批量操作数量,最大为10000
|
|
|
|
|
request.setBatchSize(100);
|
|
|
|
|
// 版本冲突时继续执行
|
|
|
|
|
request.setConflicts("proceed");
|
|
|
|
|
try {
|
|
|
|
|
restHighLevelClient.deleteByQuery(request, RequestOptions.DEFAULT);
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
throw new RuntimeException(e);
|
|
|
|
|
BulkByScrollResponse bulkByScrollResponse = restHighLevelClient.deleteByQuery(request, RequestOptions.DEFAULT);
|
|
|
|
|
List<BulkItemResponse.Failure> bulkFailures = bulkByScrollResponse.getBulkFailures();
|
|
|
|
|
boolean flag = true;
|
|
|
|
|
for (BulkItemResponse.Failure bulkFailure : bulkFailures) {
|
|
|
|
|
log.error(bulkFailure.getMessage(), bulkFailure.getCause());
|
|
|
|
|
flag = false;
|
|
|
|
|
}
|
|
|
|
|
return flag;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* bean对象生命周期结束时,关闭连接
|
|
|
|
|
*/
|
|
|
|
|
@Override
|
|
|
|
|
public void destroy() {
|
|
|
|
|
try {
|
|
|
|
|
if (restHighLevelClient != null) {
|
|
|
|
|
restHighLevelClient.close();
|
|
|
|
|
}
|
|
|
|
|
} catch (Exception exception) {
|
|
|
|
|
log.error(exception.getMessage(), exception);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|