admin
2020-05-06 24a8d17e007545f7426c48352109aa1a9c6587ee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
package org.fanli.elastic;
 
import static org.junit.Assert.assertNull;
 
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
 
import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RethrottleRequest;
import org.elasticsearch.client.core.MultiTermVectorsRequest;
import org.elasticsearch.client.core.MultiTermVectorsResponse;
import org.elasticsearch.client.core.TermVectorsRequest;
import org.elasticsearch.client.core.TermVectorsResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.ScrollableHitSource;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.TaskId;
 
public class QueryDOUtils {
 
    private static String clusterName = "my-application";
    private static String host = "192.168.1.200";
    private static Integer port = 9200;
 
    // 相当于数据库名称
    public static String indexName = "shose";
 
    // 初始化api客户端
    public static RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost(host, port, "http")));
 
    public static void MultiGetRequest() {
        MultiGetRequest request = new MultiGetRequest();
        // Index、Document id
        request.add(new MultiGetRequest.Item("index", "example_id"));
        request.add(new MultiGetRequest.Item("index", "another_id"));
 
        /* 可选参数 */
        // 禁用源检索,默认情况下启用
        request.add(new MultiGetRequest.Item("index", "example_id")
                .fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE));
 
        // 1、为特定字段配置源包含
        String[] includes = new String[] { "foo", "*r" };
        String[] excludes = Strings.EMPTY_ARRAY;
        FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
        request.add(new MultiGetRequest.Item("index", "example_id").fetchSourceContext(fetchSourceContext));
 
        // 2、为特定字段配置源排除
        String[] includes2 = Strings.EMPTY_ARRAY;
        String[] excludes2 = new String[] { "foo", "*r" };
        FetchSourceContext fetchSourceContext2 = new FetchSourceContext(true, includes2, excludes2);
        request.add(new MultiGetRequest.Item("index", "example_id").fetchSourceContext(fetchSourceContext2));
 
        // 配置特定存储字段的检索(要求将字段分别存储在映射中) 检索foo存储的字段(要求将字段单独存储在映射中)
        request.add(new MultiGetRequest.Item("index", "example_id").storedFields("foo"));
        try {
            MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
            MultiGetItemResponse item = response.getResponses()[0];
            String value = item.getResponse().getField("foo").getValue();
        } catch (IOException e) {
            e.printStackTrace();
        }
 
        request.add(new MultiGetRequest.Item("index", "with_routing").routing("some_routing"));
        request.add(
                new MultiGetRequest.Item("index", "with_version").versionType(VersionType.EXTERNAL).version(10123L));
 
        // 偏好值
        request.preference("some_preference");
        // 将实时标志设置为false(true默认情况下)
        request.realtime(false);
        // 检索文档之前执行刷新(false默认情况下)
        request.refresh(true);
 
        try {
            MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
 
            MultiGetItemResponse firstItem = response.getResponses()[0];
            // getFailure 返回null,因为没有失败。
            assertNull(firstItem.getFailure());
 
            GetResponse firstGet = firstItem.getResponse();
            String index = firstItem.getIndex();
            String id = firstItem.getId();
            if (firstGet.isExists()) {
                long version = firstGet.getVersion();
                // 将该文档检索为 String
                String sourceAsString = firstGet.getSourceAsString();
                // 将该文档检索为 Map<String, Object>
                Map<String, Object> sourceAsMap = firstGet.getSourceAsMap();
                // 将该文档检索为 byte[]
                byte[] sourceAsBytes = firstGet.getSourceAsBytes();
            } else {
                // 处理找不到文档的情况。请注意,尽管返回的响应具有404状态码,但返回的是有效值GetResponse,而不是引发异常。
                // 这样的响应不包含任何源文档,并且其isExists方法返回false。
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 
    public static void queryUpdate() {
        // 一个ReindexRequest可以用来从一个或多个索引文件复制到目标指数。
        // 它要求在请求之前可能存在或可能不存在的现有源索引和目标索引。Reindex不会尝试设置目标索引。
        // 它不会复制源索引的设置。您应该在运行_reindex操作之前设置目标索引,包括设置映射,分片计数,副本等。
        ReindexRequest request = new ReindexRequest();
        request.setSourceIndices("source1", "source2"); // 添加要复制的来源列表
        request.setDestIndex("dest"); // 添加目标索引
 
        // 。设置versionType为external将导致Elasticsearch从源保留版本,创建丢失的所有文档,并更新目标索引中比源索引中具有旧版本的任何文档。
        request.setDestVersionType(VersionType.EXTERNAL);
        // 设置opType为create将会导致_reindex仅在目标索引中创建丢失的文档。所有现有文档都将导致版本冲突。默认opType值为index。
        request.setDestOpType("create");
        // 默认情况下,版本冲突会中止该_reindex过程,但是您可以使用以下方法来计算它们:
        request.setConflicts("proceed");
 
        // 添加查询来限制文档。 仅复制字段user设置为kimchy
        request.setSourceQuery(new TermQueryBuilder("user", "kimchy"));
        // 限制已处理文档的数量maxDocs。
        request.setMaxDocs(10);
        // 默认情况下_reindex使用的批次为1000。您可以使用更改批次大小sourceBatchSize。
        request.setSourceBatchSize(100);
 
        // Reindex也可以通过指定来使用摄取功能pipeline。
        request.setDestPipeline("my_pipeline");
 
        // 如果要从源索引中获取一组特定的文档,则需要使用sort。如果可能,请选择更具选择性的查询,而不是maxDocs和排序。
        request.addSortField("field1", SortOrder.DESC);
        request.addSortField("field2", SortOrder.ASC);
 
        // 支持script修改文档。它还允许您更改文档的元数据。
        // setScriptlikes使用user 增大所有文档上的字段kimchy。
        request.setScript(new Script(ScriptType.INLINE, "painless",
                "if (ctx._source.user == 'kimchy') {ctx._source.likes++;}", Collections.emptyMap()));
 
        // ReindexRequest支持从远程Elasticsearch集群重新索引。使用远程集群时,应在RemoteInfo对象内部指定查询,而不要使用setSourceQuery。
        // 如果同时设置了远程信息和源查询,则会在请求期间导致验证错误。这样做的原因是远程Elasticsearch可能无法理解现代查询构建器构建的查询。
        // 远程集群支持一直运行到Elasticsearch 0.90,此后查询语言已更改。达到旧版本时,以JSON手动编写查询会更安全。
//        request.setRemoteInfo(new RemoteInfo("http", remoteHost, remotePort, null,
//                new BytesArray(new MatchAllQueryBuilder().toString()), user, password, Collections.emptyMap(),
//                new TimeValue(100, TimeUnit.MILLISECONDS), new TimeValue(100, TimeUnit.SECONDS)));
 
        try {
            BulkByScrollResponse bulkResponse = client.reindex(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 
    public static void queryUpdate2() {
        UpdateByQueryRequest request = new UpdateByQueryRequest("source1", "source2");
 
        request.setConflicts("proceed");
        request.setQuery(new TermQueryBuilder("user", "kimchy"));
        request.setMaxDocs(10);
        request.setBatchSize(100);
        request.setPipeline("my_pipeline");
 
        request.setScript(new Script(ScriptType.INLINE, "painless",
                "if (ctx._source.user == 'kimchy') {ctx._source.likes++;}", Collections.emptyMap()));
 
        request.setSlices(2);
 
        request.setScroll(TimeValue.timeValueMinutes(10));
        request.setRouting("=cat");
 
        request.setTimeout(TimeValue.timeValueMinutes(2));
        request.setRefresh(true);
        request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
        try {
            BulkByScrollResponse bulkResponse = client.updateByQuery(request, RequestOptions.DEFAULT);
 
            // 获取总时间
            TimeValue timeTaken = bulkResponse.getTook();
            // 检查请求是否超时
            boolean timedOut = bulkResponse.isTimedOut();
            // 获取处理的文档总数
            long totalDocs = bulkResponse.getTotal();
            // 已更新的文档数
            long updatedDocs = bulkResponse.getUpdated();
            // 被删除的文档数
            long deletedDocs = bulkResponse.getDeleted();
            // 已执行的批次数
            long batches = bulkResponse.getBatches();
            // 跳过的文档数
            long noops = bulkResponse.getNoops();
            // 版本冲突数
            long versionConflicts = bulkResponse.getVersionConflicts();
            // 请求必须重试批量索引操作的次数
            long bulkRetries = bulkResponse.getBulkRetries();
            // 请求必须重试搜索操作的次数
            long searchRetries = bulkResponse.getSearchRetries();
            // 如果该请求当前处于睡眠状态,则该请求已进行自身限制的总时间不包括当前的限制时间
            TimeValue throttledMillis = bulkResponse.getStatus().getThrottled();
            // 当前节气门睡眠的剩余延迟;如果未睡眠,则为0
            TimeValue throttledUntilMillis = bulkResponse.getStatus().getThrottledUntil();
            // 搜索阶段失败
            List<ScrollableHitSource.SearchFailure> searchFailures = bulkResponse.getSearchFailures();
            // 大容量索引操作期间发生故障
            List<BulkItemResponse.Failure> bulkFailures = bulkResponse.getBulkFailures();
 
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 
    // 按查询删除
    public static void queryDelete() {
        DeleteByQueryRequest request = new DeleteByQueryRequest("source1", "source2");
        request.setConflicts("proceed");
        request.setQuery(new TermQueryBuilder("user", "kimchy"));
        request.setMaxDocs(10);
        request.setBatchSize(100);
        request.setSlices(2); // 设置要使用的切片数
        request.setScroll(TimeValue.timeValueMinutes(10)); // 使用scroll参数来控制它使“搜索上下文”保持活动的时间。
        request.setRouting("=cat");
        request.setTimeout(TimeValue.timeValueMinutes(2)); // 等待通过查询请求执行删除的超时 TimeValue
        request.setRefresh(true); // 通过查询调用删除后刷新索引
        request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); // 设置索引选项
        try {
            BulkByScrollResponse bulkResponse = client.deleteByQuery(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 
    public static void RethrottleRequest(TaskId taskId) {
        // 可用于更改正在运行的重新索引,按查询更新或按查询删除任务的当前限制,或完全禁用该任务的限制。它需要更改任务的任务ID。
        RethrottleRequest request = new RethrottleRequest(taskId);
 
        RethrottleRequest request2 = new RethrottleRequest(taskId, 100.0f);
 
        try {
            // 执行重新索引重新调节请求
            client.reindexRethrottle(request, RequestOptions.DEFAULT);
            // 通过查询更新相同
            client.updateByQueryRethrottle(request, RequestOptions.DEFAULT);
            // 通过查询删除相同
            client.deleteByQueryRethrottle(request, RequestOptions.DEFAULT);
        } catch (Exception e) {
        }
    }
 
    public static void main(String[] args) {
        System.out.println("------------ 测试结束 -------------------------");
    }
 
}