package org.fanli.elastic; import static org.junit.Assert.assertNull; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.http.HttpHost; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.MultiGetItemResponse; import org.elasticsearch.action.get.MultiGetRequest; import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.RethrottleRequest; import org.elasticsearch.client.core.MultiTermVectorsRequest; import org.elasticsearch.client.core.MultiTermVectorsResponse; import org.elasticsearch.client.core.TermVectorsRequest; import org.elasticsearch.client.core.TermVectorsResponse; import org.elasticsearch.common.Strings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.ScrollableHitSource; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.tasks.TaskId; public class QueryDOUtils { private static String clusterName = "my-application"; private static String host = "192.168.1.200"; private static Integer port = 9200; // 相当于数据库名称 public static String indexName = "shose"; // 初始化api客户端 public static RestHighLevelClient client = new RestHighLevelClient( RestClient.builder(new HttpHost(host, port, "http"))); public static void MultiGetRequest() { MultiGetRequest request = new MultiGetRequest(); // Index、Document id request.add(new MultiGetRequest.Item("index", "example_id")); request.add(new MultiGetRequest.Item("index", "another_id")); /* 可选参数 */ // 禁用源检索,默认情况下启用 request.add(new MultiGetRequest.Item("index", "example_id") .fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE)); // 1、为特定字段配置源包含 String[] includes = new String[] { "foo", "*r" }; String[] excludes = Strings.EMPTY_ARRAY; FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes); request.add(new MultiGetRequest.Item("index", "example_id").fetchSourceContext(fetchSourceContext)); // 2、为特定字段配置源排除 String[] includes2 = Strings.EMPTY_ARRAY; String[] excludes2 = new String[] { "foo", "*r" }; FetchSourceContext fetchSourceContext2 = new FetchSourceContext(true, includes2, excludes2); request.add(new MultiGetRequest.Item("index", "example_id").fetchSourceContext(fetchSourceContext2)); // 配置特定存储字段的检索(要求将字段分别存储在映射中) 检索foo存储的字段(要求将字段单独存储在映射中) request.add(new MultiGetRequest.Item("index", "example_id").storedFields("foo")); try { MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT); MultiGetItemResponse item = response.getResponses()[0]; String value = item.getResponse().getField("foo").getValue(); } catch (IOException e) { e.printStackTrace(); } request.add(new MultiGetRequest.Item("index", "with_routing").routing("some_routing")); request.add( new MultiGetRequest.Item("index", "with_version").versionType(VersionType.EXTERNAL).version(10123L)); // 偏好值 request.preference("some_preference"); // 将实时标志设置为false(true默认情况下) request.realtime(false); // 检索文档之前执行刷新(false默认情况下) request.refresh(true); try { MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT); MultiGetItemResponse firstItem = response.getResponses()[0]; // getFailure 返回null,因为没有失败。 assertNull(firstItem.getFailure()); GetResponse firstGet = firstItem.getResponse(); String index = firstItem.getIndex(); String id = firstItem.getId(); if (firstGet.isExists()) { long version = firstGet.getVersion(); // 将该文档检索为 String String sourceAsString = firstGet.getSourceAsString(); // 将该文档检索为 Map Map sourceAsMap = firstGet.getSourceAsMap(); // 将该文档检索为 byte[] byte[] sourceAsBytes = firstGet.getSourceAsBytes(); } else { // 处理找不到文档的情况。请注意,尽管返回的响应具有404状态码,但返回的是有效值GetResponse,而不是引发异常。 // 这样的响应不包含任何源文档,并且其isExists方法返回false。 } } catch (IOException e) { e.printStackTrace(); } } public static void queryUpdate() { // 一个ReindexRequest可以用来从一个或多个索引文件复制到目标指数。 // 它要求在请求之前可能存在或可能不存在的现有源索引和目标索引。Reindex不会尝试设置目标索引。 // 它不会复制源索引的设置。您应该在运行_reindex操作之前设置目标索引,包括设置映射,分片计数,副本等。 ReindexRequest request = new ReindexRequest(); request.setSourceIndices("source1", "source2"); // 添加要复制的来源列表 request.setDestIndex("dest"); // 添加目标索引 // 。设置versionType为external将导致Elasticsearch从源保留版本,创建丢失的所有文档,并更新目标索引中比源索引中具有旧版本的任何文档。 request.setDestVersionType(VersionType.EXTERNAL); // 设置opType为create将会导致_reindex仅在目标索引中创建丢失的文档。所有现有文档都将导致版本冲突。默认opType值为index。 request.setDestOpType("create"); // 默认情况下,版本冲突会中止该_reindex过程,但是您可以使用以下方法来计算它们: request.setConflicts("proceed"); // 添加查询来限制文档。 仅复制字段user设置为kimchy request.setSourceQuery(new TermQueryBuilder("user", "kimchy")); // 限制已处理文档的数量maxDocs。 request.setMaxDocs(10); // 默认情况下_reindex使用的批次为1000。您可以使用更改批次大小sourceBatchSize。 request.setSourceBatchSize(100); // Reindex也可以通过指定来使用摄取功能pipeline。 request.setDestPipeline("my_pipeline"); // 如果要从源索引中获取一组特定的文档,则需要使用sort。如果可能,请选择更具选择性的查询,而不是maxDocs和排序。 request.addSortField("field1", SortOrder.DESC); request.addSortField("field2", SortOrder.ASC); // 支持script修改文档。它还允许您更改文档的元数据。 // setScriptlikes使用user 增大所有文档上的字段kimchy。 request.setScript(new Script(ScriptType.INLINE, "painless", "if (ctx._source.user == 'kimchy') {ctx._source.likes++;}", Collections.emptyMap())); // ReindexRequest支持从远程Elasticsearch集群重新索引。使用远程集群时,应在RemoteInfo对象内部指定查询,而不要使用setSourceQuery。 // 如果同时设置了远程信息和源查询,则会在请求期间导致验证错误。这样做的原因是远程Elasticsearch可能无法理解现代查询构建器构建的查询。 // 远程集群支持一直运行到Elasticsearch 0.90,此后查询语言已更改。达到旧版本时,以JSON手动编写查询会更安全。 // request.setRemoteInfo(new RemoteInfo("http", remoteHost, remotePort, null, // new BytesArray(new MatchAllQueryBuilder().toString()), user, password, Collections.emptyMap(), // new TimeValue(100, TimeUnit.MILLISECONDS), new TimeValue(100, TimeUnit.SECONDS))); try { BulkByScrollResponse bulkResponse = client.reindex(request, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } } public static void queryUpdate2() { UpdateByQueryRequest request = new UpdateByQueryRequest("source1", "source2"); request.setConflicts("proceed"); request.setQuery(new TermQueryBuilder("user", "kimchy")); request.setMaxDocs(10); request.setBatchSize(100); request.setPipeline("my_pipeline"); request.setScript(new Script(ScriptType.INLINE, "painless", "if (ctx._source.user == 'kimchy') {ctx._source.likes++;}", Collections.emptyMap())); request.setSlices(2); request.setScroll(TimeValue.timeValueMinutes(10)); request.setRouting("=cat"); request.setTimeout(TimeValue.timeValueMinutes(2)); request.setRefresh(true); request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); try { BulkByScrollResponse bulkResponse = client.updateByQuery(request, RequestOptions.DEFAULT); // 获取总时间 TimeValue timeTaken = bulkResponse.getTook(); // 检查请求是否超时 boolean timedOut = bulkResponse.isTimedOut(); // 获取处理的文档总数 long totalDocs = bulkResponse.getTotal(); // 已更新的文档数 long updatedDocs = bulkResponse.getUpdated(); // 被删除的文档数 long deletedDocs = bulkResponse.getDeleted(); // 已执行的批次数 long batches = bulkResponse.getBatches(); // 跳过的文档数 long noops = bulkResponse.getNoops(); // 版本冲突数 long versionConflicts = bulkResponse.getVersionConflicts(); // 请求必须重试批量索引操作的次数 long bulkRetries = bulkResponse.getBulkRetries(); // 请求必须重试搜索操作的次数 long searchRetries = bulkResponse.getSearchRetries(); // 如果该请求当前处于睡眠状态,则该请求已进行自身限制的总时间不包括当前的限制时间 TimeValue throttledMillis = bulkResponse.getStatus().getThrottled(); // 当前节气门睡眠的剩余延迟;如果未睡眠,则为0 TimeValue throttledUntilMillis = bulkResponse.getStatus().getThrottledUntil(); // 搜索阶段失败 List searchFailures = bulkResponse.getSearchFailures(); // 大容量索引操作期间发生故障 List bulkFailures = bulkResponse.getBulkFailures(); } catch (IOException e) { e.printStackTrace(); } } // 按查询删除 public static void queryDelete() { DeleteByQueryRequest request = new DeleteByQueryRequest("source1", "source2"); request.setConflicts("proceed"); request.setQuery(new TermQueryBuilder("user", "kimchy")); request.setMaxDocs(10); request.setBatchSize(100); request.setSlices(2); // 设置要使用的切片数 request.setScroll(TimeValue.timeValueMinutes(10)); // 使用scroll参数来控制它使“搜索上下文”保持活动的时间。 request.setRouting("=cat"); request.setTimeout(TimeValue.timeValueMinutes(2)); // 等待通过查询请求执行删除的超时 TimeValue request.setRefresh(true); // 通过查询调用删除后刷新索引 request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); // 设置索引选项 try { BulkByScrollResponse bulkResponse = client.deleteByQuery(request, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } } public static void RethrottleRequest(TaskId taskId) { // 可用于更改正在运行的重新索引,按查询更新或按查询删除任务的当前限制,或完全禁用该任务的限制。它需要更改任务的任务ID。 RethrottleRequest request = new RethrottleRequest(taskId); RethrottleRequest request2 = new RethrottleRequest(taskId, 100.0f); try { // 执行重新索引重新调节请求 client.reindexRethrottle(request, RequestOptions.DEFAULT); // 通过查询更新相同 client.updateByQueryRethrottle(request, RequestOptions.DEFAULT); // 通过查询删除相同 client.deleteByQueryRethrottle(request, RequestOptions.DEFAULT); } catch (Exception e) { } } public static void main(String[] args) { System.out.println("------------ 测试结束 -------------------------"); } }