package org.fanli.elastic;
|
|
import static org.junit.Assert.assertNull;
|
|
import java.io.IOException;
|
import java.util.Collections;
|
import java.util.List;
|
import java.util.Map;
|
|
import org.apache.http.HttpHost;
|
import org.elasticsearch.action.bulk.BulkItemResponse;
|
import org.elasticsearch.action.get.GetResponse;
|
import org.elasticsearch.action.get.MultiGetItemResponse;
|
import org.elasticsearch.action.get.MultiGetRequest;
|
import org.elasticsearch.action.get.MultiGetResponse;
|
import org.elasticsearch.action.support.IndicesOptions;
|
import org.elasticsearch.client.RequestOptions;
|
import org.elasticsearch.client.RestClient;
|
import org.elasticsearch.client.RestHighLevelClient;
|
import org.elasticsearch.client.RethrottleRequest;
|
import org.elasticsearch.client.core.MultiTermVectorsRequest;
|
import org.elasticsearch.client.core.MultiTermVectorsResponse;
|
import org.elasticsearch.client.core.TermVectorsRequest;
|
import org.elasticsearch.client.core.TermVectorsResponse;
|
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentFactory;
|
import org.elasticsearch.index.VersionType;
|
import org.elasticsearch.index.query.TermQueryBuilder;
|
import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
|
import org.elasticsearch.index.reindex.ReindexRequest;
|
import org.elasticsearch.index.reindex.ScrollableHitSource;
|
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
|
import org.elasticsearch.script.Script;
|
import org.elasticsearch.script.ScriptType;
|
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
|
import org.elasticsearch.search.sort.SortOrder;
|
import org.elasticsearch.tasks.TaskId;
|
|
public class QueryDOUtils {
|
|
private static String clusterName = "my-application";
|
private static String host = "192.168.1.200";
|
private static Integer port = 9200;
|
|
// 相当于数据库名称
|
public static String indexName = "shose";
|
|
// 初始化api客户端
|
public static RestHighLevelClient client = new RestHighLevelClient(
|
RestClient.builder(new HttpHost(host, port, "http")));
|
|
public static void MultiGetRequest() {
|
MultiGetRequest request = new MultiGetRequest();
|
// Index、Document id
|
request.add(new MultiGetRequest.Item("index", "example_id"));
|
request.add(new MultiGetRequest.Item("index", "another_id"));
|
|
/* 可选参数 */
|
// 禁用源检索,默认情况下启用
|
request.add(new MultiGetRequest.Item("index", "example_id")
|
.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE));
|
|
// 1、为特定字段配置源包含
|
String[] includes = new String[] { "foo", "*r" };
|
String[] excludes = Strings.EMPTY_ARRAY;
|
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
|
request.add(new MultiGetRequest.Item("index", "example_id").fetchSourceContext(fetchSourceContext));
|
|
// 2、为特定字段配置源排除
|
String[] includes2 = Strings.EMPTY_ARRAY;
|
String[] excludes2 = new String[] { "foo", "*r" };
|
FetchSourceContext fetchSourceContext2 = new FetchSourceContext(true, includes2, excludes2);
|
request.add(new MultiGetRequest.Item("index", "example_id").fetchSourceContext(fetchSourceContext2));
|
|
// 配置特定存储字段的检索(要求将字段分别存储在映射中) 检索foo存储的字段(要求将字段单独存储在映射中)
|
request.add(new MultiGetRequest.Item("index", "example_id").storedFields("foo"));
|
try {
|
MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
|
MultiGetItemResponse item = response.getResponses()[0];
|
String value = item.getResponse().getField("foo").getValue();
|
} catch (IOException e) {
|
e.printStackTrace();
|
}
|
|
request.add(new MultiGetRequest.Item("index", "with_routing").routing("some_routing"));
|
request.add(
|
new MultiGetRequest.Item("index", "with_version").versionType(VersionType.EXTERNAL).version(10123L));
|
|
// 偏好值
|
request.preference("some_preference");
|
// 将实时标志设置为false(true默认情况下)
|
request.realtime(false);
|
// 检索文档之前执行刷新(false默认情况下)
|
request.refresh(true);
|
|
try {
|
MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
|
|
MultiGetItemResponse firstItem = response.getResponses()[0];
|
// getFailure 返回null,因为没有失败。
|
assertNull(firstItem.getFailure());
|
|
GetResponse firstGet = firstItem.getResponse();
|
String index = firstItem.getIndex();
|
String id = firstItem.getId();
|
if (firstGet.isExists()) {
|
long version = firstGet.getVersion();
|
// 将该文档检索为 String
|
String sourceAsString = firstGet.getSourceAsString();
|
// 将该文档检索为 Map<String, Object>
|
Map<String, Object> sourceAsMap = firstGet.getSourceAsMap();
|
// 将该文档检索为 byte[]
|
byte[] sourceAsBytes = firstGet.getSourceAsBytes();
|
} else {
|
// 处理找不到文档的情况。请注意,尽管返回的响应具有404状态码,但返回的是有效值GetResponse,而不是引发异常。
|
// 这样的响应不包含任何源文档,并且其isExists方法返回false。
|
}
|
} catch (IOException e) {
|
e.printStackTrace();
|
}
|
}
|
|
public static void queryUpdate() {
|
// 一个ReindexRequest可以用来从一个或多个索引文件复制到目标指数。
|
// 它要求在请求之前可能存在或可能不存在的现有源索引和目标索引。Reindex不会尝试设置目标索引。
|
// 它不会复制源索引的设置。您应该在运行_reindex操作之前设置目标索引,包括设置映射,分片计数,副本等。
|
ReindexRequest request = new ReindexRequest();
|
request.setSourceIndices("source1", "source2"); // 添加要复制的来源列表
|
request.setDestIndex("dest"); // 添加目标索引
|
|
// 。设置versionType为external将导致Elasticsearch从源保留版本,创建丢失的所有文档,并更新目标索引中比源索引中具有旧版本的任何文档。
|
request.setDestVersionType(VersionType.EXTERNAL);
|
// 设置opType为create将会导致_reindex仅在目标索引中创建丢失的文档。所有现有文档都将导致版本冲突。默认opType值为index。
|
request.setDestOpType("create");
|
// 默认情况下,版本冲突会中止该_reindex过程,但是您可以使用以下方法来计算它们:
|
request.setConflicts("proceed");
|
|
// 添加查询来限制文档。 仅复制字段user设置为kimchy
|
request.setSourceQuery(new TermQueryBuilder("user", "kimchy"));
|
// 限制已处理文档的数量maxDocs。
|
request.setMaxDocs(10);
|
// 默认情况下_reindex使用的批次为1000。您可以使用更改批次大小sourceBatchSize。
|
request.setSourceBatchSize(100);
|
|
// Reindex也可以通过指定来使用摄取功能pipeline。
|
request.setDestPipeline("my_pipeline");
|
|
// 如果要从源索引中获取一组特定的文档,则需要使用sort。如果可能,请选择更具选择性的查询,而不是maxDocs和排序。
|
request.addSortField("field1", SortOrder.DESC);
|
request.addSortField("field2", SortOrder.ASC);
|
|
// 支持script修改文档。它还允许您更改文档的元数据。
|
// setScriptlikes使用user 增大所有文档上的字段kimchy。
|
request.setScript(new Script(ScriptType.INLINE, "painless",
|
"if (ctx._source.user == 'kimchy') {ctx._source.likes++;}", Collections.emptyMap()));
|
|
// ReindexRequest支持从远程Elasticsearch集群重新索引。使用远程集群时,应在RemoteInfo对象内部指定查询,而不要使用setSourceQuery。
|
// 如果同时设置了远程信息和源查询,则会在请求期间导致验证错误。这样做的原因是远程Elasticsearch可能无法理解现代查询构建器构建的查询。
|
// 远程集群支持一直运行到Elasticsearch 0.90,此后查询语言已更改。达到旧版本时,以JSON手动编写查询会更安全。
|
// request.setRemoteInfo(new RemoteInfo("http", remoteHost, remotePort, null,
|
// new BytesArray(new MatchAllQueryBuilder().toString()), user, password, Collections.emptyMap(),
|
// new TimeValue(100, TimeUnit.MILLISECONDS), new TimeValue(100, TimeUnit.SECONDS)));
|
|
try {
|
BulkByScrollResponse bulkResponse = client.reindex(request, RequestOptions.DEFAULT);
|
} catch (IOException e) {
|
e.printStackTrace();
|
}
|
}
|
|
public static void queryUpdate2() {
|
UpdateByQueryRequest request = new UpdateByQueryRequest("source1", "source2");
|
|
request.setConflicts("proceed");
|
request.setQuery(new TermQueryBuilder("user", "kimchy"));
|
request.setMaxDocs(10);
|
request.setBatchSize(100);
|
request.setPipeline("my_pipeline");
|
|
request.setScript(new Script(ScriptType.INLINE, "painless",
|
"if (ctx._source.user == 'kimchy') {ctx._source.likes++;}", Collections.emptyMap()));
|
|
request.setSlices(2);
|
|
request.setScroll(TimeValue.timeValueMinutes(10));
|
request.setRouting("=cat");
|
|
request.setTimeout(TimeValue.timeValueMinutes(2));
|
request.setRefresh(true);
|
request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
|
try {
|
BulkByScrollResponse bulkResponse = client.updateByQuery(request, RequestOptions.DEFAULT);
|
|
// 获取总时间
|
TimeValue timeTaken = bulkResponse.getTook();
|
// 检查请求是否超时
|
boolean timedOut = bulkResponse.isTimedOut();
|
// 获取处理的文档总数
|
long totalDocs = bulkResponse.getTotal();
|
// 已更新的文档数
|
long updatedDocs = bulkResponse.getUpdated();
|
// 被删除的文档数
|
long deletedDocs = bulkResponse.getDeleted();
|
// 已执行的批次数
|
long batches = bulkResponse.getBatches();
|
// 跳过的文档数
|
long noops = bulkResponse.getNoops();
|
// 版本冲突数
|
long versionConflicts = bulkResponse.getVersionConflicts();
|
// 请求必须重试批量索引操作的次数
|
long bulkRetries = bulkResponse.getBulkRetries();
|
// 请求必须重试搜索操作的次数
|
long searchRetries = bulkResponse.getSearchRetries();
|
// 如果该请求当前处于睡眠状态,则该请求已进行自身限制的总时间不包括当前的限制时间
|
TimeValue throttledMillis = bulkResponse.getStatus().getThrottled();
|
// 当前节气门睡眠的剩余延迟;如果未睡眠,则为0
|
TimeValue throttledUntilMillis = bulkResponse.getStatus().getThrottledUntil();
|
// 搜索阶段失败
|
List<ScrollableHitSource.SearchFailure> searchFailures = bulkResponse.getSearchFailures();
|
// 大容量索引操作期间发生故障
|
List<BulkItemResponse.Failure> bulkFailures = bulkResponse.getBulkFailures();
|
|
} catch (IOException e) {
|
e.printStackTrace();
|
}
|
}
|
|
// 按查询删除
|
public static void queryDelete() {
|
DeleteByQueryRequest request = new DeleteByQueryRequest("source1", "source2");
|
request.setConflicts("proceed");
|
request.setQuery(new TermQueryBuilder("user", "kimchy"));
|
request.setMaxDocs(10);
|
request.setBatchSize(100);
|
request.setSlices(2); // 设置要使用的切片数
|
request.setScroll(TimeValue.timeValueMinutes(10)); // 使用scroll参数来控制它使“搜索上下文”保持活动的时间。
|
request.setRouting("=cat");
|
request.setTimeout(TimeValue.timeValueMinutes(2)); // 等待通过查询请求执行删除的超时 TimeValue
|
request.setRefresh(true); // 通过查询调用删除后刷新索引
|
request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); // 设置索引选项
|
try {
|
BulkByScrollResponse bulkResponse = client.deleteByQuery(request, RequestOptions.DEFAULT);
|
} catch (IOException e) {
|
e.printStackTrace();
|
}
|
}
|
|
public static void RethrottleRequest(TaskId taskId) {
|
// 可用于更改正在运行的重新索引,按查询更新或按查询删除任务的当前限制,或完全禁用该任务的限制。它需要更改任务的任务ID。
|
RethrottleRequest request = new RethrottleRequest(taskId);
|
|
RethrottleRequest request2 = new RethrottleRequest(taskId, 100.0f);
|
|
try {
|
// 执行重新索引重新调节请求
|
client.reindexRethrottle(request, RequestOptions.DEFAULT);
|
// 通过查询更新相同
|
client.updateByQueryRethrottle(request, RequestOptions.DEFAULT);
|
// 通过查询删除相同
|
client.deleteByQueryRethrottle(request, RequestOptions.DEFAULT);
|
} catch (Exception e) {
|
}
|
}
|
|
public static void main(String[] args) {
|
System.out.println("------------ 测试结束 -------------------------");
|
}
|
|
}
|