// Source metadata recovered from extraction: author "admin", dated 2020-05-06,
// revision 24a8d17e007545f7426c48352109aa1a9c6587ee. The original viewer's
// line-number gutter (1-293) was extraction noise and has been removed so the
// file compiles (nothing but comments may precede the package declaration).
package org.fanli.elastic;
 
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
 
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
 
public class UpdateUtils {
 
    private static String clusterName = "my-application";
    private static String host = "192.168.1.200";
    private static Integer port = 9200;
 
    // 相当于数据库名称
    public static String indexName = "shose";
 
    // 初始化api客户端
    public static RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost(host, port, "http")));
 
    public static void update1() {
        // Index 、 Document id
        UpdateRequest request = new UpdateRequest("posts", "1");
 
        // 1、Script 更新
        Map<String, Object> parameters = new HashMap<String, Object>();
        parameters.put("count", 4);
        Script inline = new Script(ScriptType.INLINE, "painless", "ctx._source.field += params.count", parameters);
        request.script(inline);
 
        // 2、Script 更新
        Script stored = new Script(ScriptType.STORED, null, "increment-field", parameters);
        request.script(stored);
 
        // 1、部分文档源String以JSON格式提供
        String jsonString = "{" + "\"updated\":\"2017-01-01\"," + "\"reason\":\"daily update\"" + "}";
        request.doc(jsonString, XContentType.JSON);
 
        // Map
        Map<String, Object> jsonMap = new HashMap<>();
        jsonMap.put("updated", new Date());
        jsonMap.put("reason", "daily update");
        UpdateRequest request2 = new UpdateRequest("posts", "1").doc(jsonMap);
 
        //
        try {
            XContentBuilder builder = XContentFactory.jsonBuilder();
            builder.startObject();
            {
                builder.timeField("updated", new Date());
                builder.field("reason", "daily update");
            }
            builder.endObject();
            UpdateRequest request3 = new UpdateRequest("posts", "1").doc(builder);
        } catch (Exception e) {
            e.printStackTrace();
        }
 
        UpdateRequest request4 = new UpdateRequest("posts", "1").doc("updated", new Date(), "reason", "daily update");
 
        String jsonString2 = "{\"created\":\"2017-01-01\"}";
        request.upsert(jsonString2, XContentType.JSON);
 
        // 为特定字段配置源包含
        String[] includes = new String[] { "updated", "r*" };
        String[] excludes = Strings.EMPTY_ARRAY;
        request.fetchSource(new FetchSourceContext(true, includes, excludes));
 
        // 为特定字段配置源排除
        String[] includes2 = Strings.EMPTY_ARRAY;
        String[] excludes2 = new String[] { "updated" };
        request.fetchSource(new FetchSourceContext(true, includes2, excludes2));
 
        request.setIfSeqNo(2L);
        request.setIfPrimaryTerm(1L);
 
        // 禁用noop检测
        request.detectNoop(false);
 
        // 指示脚本必须运行,而不管文档是否存在,即,脚本将负责创建尚不存在的文档。
        request.scriptedUpsert(true);
 
        // 指示部分文档(如果尚不存在)必须用作upsert文档。
        request.docAsUpsert(true);
 
        // 设置在继续更新操作之前必须处于活动状态的分片副本数
        request.waitForActiveShards(2);
        // 作为提供碎片拷贝数ActiveShardCount:可以是ActiveShardCount.ALL,
        // ActiveShardCount.ONE或ActiveShardCount.DEFAULT(默认)
        request.waitForActiveShards(ActiveShardCount.ALL);
    }
 
    public static void update2() {
 
        // 添加列
        UpdateRequest request = new UpdateRequest("posts", "1");
        String jsonString = "{" + "\"updated\":\"2017-01-01\"," + "\"reason\":\"daily update\"" + "}";
        request.doc(jsonString, XContentType.JSON);
        try {
            UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
 
//            
//            // 返回的值UpdateResponse允许检索有关已执行操作的信息
//            String index = updateResponse.getIndex();
//            String id = updateResponse.getId();
//            long version = updateResponse.getVersion();
//            if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
//                // 处理首次创建文档的情况(upsert)
//            } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
//                // 处理文档更新的情况
//            } else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
//                // 处理文件被删除的情况
//            } else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
//                // 处理文档不受更新影响的情况,即未对文档执行任何操作(空转)
//            }
//
//            // UpdateRequest 通过fetchSource方法启用了源检索后,响应将包含更新文档的源:
//            GetResult result = updateResponse.getGetResult(); // 检索更新的文档为 GetResult
//            if (result.isExists()) {
//                // 检索更新文档的来源为 String
//                String sourceAsString = result.sourceAsString();
//                // 检索更新文档的来源为 Map<String, Object>
//                Map<String, Object> sourceAsMap = result.sourceAsMap();
//                // 检索更新文档的来源为 byte[]
//                byte[] sourceAsBytes = result.source();
//            } else {
//                // 处理响应中不存在文档源的情况(默认情况下就是这种情况)
//            }
//
//            // 可以检查分片故障:
//            ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
//            if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
//                // 处理成功分片数量少于总分片数量的情况
//            }
//            if (shardInfo.getFailed() > 0) {
//                for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
//                    String reason = failure.reason();
//                }
//                // 处理潜在的故障
//            }
//
//            // 当UpdateRequest抵靠不存在一个文件执行时,响应具有404状态码,ElasticsearchException获取引发这就需要如下进行处理:
//            UpdateRequest request2 = new UpdateRequest("posts", "does_not_exist").doc("field", "value");
//            try {
//                UpdateResponse updateResponse2 = client.update(request2, RequestOptions.DEFAULT);
//            } catch (ElasticsearchException e) {
//                if (e.status() == RestStatus.NOT_FOUND) {
//                    // 处理由于文档不存在而引发的异常
//                }
//            }
//
//            // 如果存在版本冲突,ElasticsearchException将引发:
//            UpdateRequest request3 = new UpdateRequest("posts", "1").doc("field", "value").setIfSeqNo(101L)
//                    .setIfPrimaryTerm(200L);
//            try {
//                UpdateResponse updateResponse3 = client.update(request3, RequestOptions.DEFAULT);
//            } catch (ElasticsearchException e) {
//                if (e.status() == RestStatus.CONFLICT) {
//                    //     引发的异常表示返回了版本冲突错误。
//                }
//            }
 
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
    public static void update3() {
        UpdateRequest request = new UpdateRequest("posts", "1");
        String jsonString = "{" + "\"updated\":\"2019-09-09\"," + "\"reason\":\"daily update9\"" + "}";
        request.doc(jsonString, XContentType.JSON);
        try {
            UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    
    public static void Multupdate1() {
        // 批量API仅支持以JSON或SMILE编码的文档。提供任何其他格式的文档将导致错误。
        BulkRequest request = new BulkRequest();
        request.add(new IndexRequest("posts").id("1").source(XContentType.JSON, "field", "foo"));
        request.add(new IndexRequest("posts").id("2").source(XContentType.JSON, "field", "bar"));
        request.add(new IndexRequest("posts").id("3").source(XContentType.JSON, "field", "baz"));
 
        BulkRequest request2 = new BulkRequest();
        request2.add(new DeleteRequest("posts", "3"));
        request2.add(new UpdateRequest("posts", "2").doc(XContentType.JSON, "other", "test"));
        request2.add(new IndexRequest("posts").id("4").source(XContentType.JSON, "field", "baz"));
 
        // 可选参数
        request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        request.setRefreshPolicy("wait_for");
 
        request.pipeline("pipelineId");
        request.routing("routingId");
        BulkRequest defaulted = new BulkRequest("posts");
 
        // 开始执行
        try {
            BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
 
            for (BulkItemResponse bulkItemResponse : bulkResponse) {
                DocWriteResponse itemResponse = bulkItemResponse.getResponse();
 
                switch (bulkItemResponse.getOpType()) {
                case INDEX:
                    // 检索操作的响应(成功与否),可以是 IndexResponse,UpdateResponse或DeleteResponse可全部被视为
                    // DocWriteResponse实例
                case CREATE:
                    // 处理更新操作的响应
                    IndexResponse indexResponse = (IndexResponse) itemResponse;
                    break;
                case UPDATE:
                    UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                    // 处理更新操作的响应
                    break;
                case DELETE:
                    // 处理删除操作的响应
                    DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                }
            }
 
            if (bulkResponse.hasFailures()) {
                // 批量响应提供了一种方法来快速检查一个或多个操作是否失败:
            }
 
            // 这种情况下,有必要遍历所有操作结果,以检查操作是否失败,如果失败,则检索相应的失败:
            for (BulkItemResponse bulkItemResponse : bulkResponse) {
                if (bulkItemResponse.isFailed()) {
                    BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                }
            }
 
            BulkProcessor.Listener listener = new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
 
                }
 
                @Override
                public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
 
                }
 
                @Override
                public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
 
                }
            };
 
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 
 
    public static void main(String[] args) {
        update3();
        System.out.println("------------ 测试结束 -------------------------");
    }
 
}