package org.fanli.elastic; import java.io.IOException; import java.util.Date; import java.util.HashMap; import java.util.Map; import org.apache.http.HttpHost; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.Strings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; public class UpdateUtils { private static String clusterName = "my-application"; private static String host = "192.168.1.200"; private static Integer port = 9200; // 相当于数据库名称 public static String indexName = "shose"; // 初始化api客户端 public static RestHighLevelClient client = new RestHighLevelClient( RestClient.builder(new HttpHost(host, port, "http"))); public static void update1() { // Index 、 Document id UpdateRequest request = new UpdateRequest("posts", "1"); // 1、Script 更新 Map parameters = new HashMap(); parameters.put("count", 4); Script inline = new Script(ScriptType.INLINE, "painless", "ctx._source.field += params.count", parameters); request.script(inline); // 2、Script 更新 Script stored = new Script(ScriptType.STORED, null, "increment-field", parameters); request.script(stored); // 1、部分文档源String以JSON格式提供 String jsonString = "{" + "\"updated\":\"2017-01-01\"," + "\"reason\":\"daily update\"" + "}"; request.doc(jsonString, XContentType.JSON); // Map Map jsonMap = new HashMap<>(); jsonMap.put("updated", new Date()); jsonMap.put("reason", "daily update"); UpdateRequest request2 = new UpdateRequest("posts", "1").doc(jsonMap); // try { XContentBuilder builder = XContentFactory.jsonBuilder(); builder.startObject(); { builder.timeField("updated", new Date()); builder.field("reason", "daily update"); } builder.endObject(); UpdateRequest request3 = new UpdateRequest("posts", "1").doc(builder); } catch (Exception e) { e.printStackTrace(); } UpdateRequest request4 = new UpdateRequest("posts", "1").doc("updated", new Date(), "reason", "daily update"); String jsonString2 = "{\"created\":\"2017-01-01\"}"; request.upsert(jsonString2, XContentType.JSON); // 为特定字段配置源包含 String[] includes = new String[] { "updated", "r*" }; String[] excludes = Strings.EMPTY_ARRAY; request.fetchSource(new FetchSourceContext(true, includes, excludes)); // 为特定字段配置源排除 String[] includes2 = Strings.EMPTY_ARRAY; String[] excludes2 = new String[] { "updated" }; request.fetchSource(new FetchSourceContext(true, includes2, excludes2)); request.setIfSeqNo(2L); request.setIfPrimaryTerm(1L); // 禁用noop检测 request.detectNoop(false); // 指示脚本必须运行,而不管文档是否存在,即,脚本将负责创建尚不存在的文档。 request.scriptedUpsert(true); // 指示部分文档(如果尚不存在)必须用作upsert文档。 request.docAsUpsert(true); // 设置在继续更新操作之前必须处于活动状态的分片副本数 request.waitForActiveShards(2); // 作为提供碎片拷贝数ActiveShardCount:可以是ActiveShardCount.ALL, // ActiveShardCount.ONE或ActiveShardCount.DEFAULT(默认) request.waitForActiveShards(ActiveShardCount.ALL); } public static void update2() { // 添加列 UpdateRequest request = new UpdateRequest("posts", "1"); String jsonString = "{" + "\"updated\":\"2017-01-01\"," + "\"reason\":\"daily update\"" + "}"; request.doc(jsonString, XContentType.JSON); try { UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT); // // // 返回的值UpdateResponse允许检索有关已执行操作的信息 // String index = updateResponse.getIndex(); // String id = updateResponse.getId(); // long version = updateResponse.getVersion(); // if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) { // // 处理首次创建文档的情况(upsert) // } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) { // // 处理文档更新的情况 // } else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) { // // 处理文件被删除的情况 // } else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) { // // 处理文档不受更新影响的情况,即未对文档执行任何操作(空转) // } // // // UpdateRequest 通过fetchSource方法启用了源检索后,响应将包含更新文档的源: // GetResult result = updateResponse.getGetResult(); // 检索更新的文档为 GetResult // if (result.isExists()) { // // 检索更新文档的来源为 String // String sourceAsString = result.sourceAsString(); // // 检索更新文档的来源为 Map // Map sourceAsMap = result.sourceAsMap(); // // 检索更新文档的来源为 byte[] // byte[] sourceAsBytes = result.source(); // } else { // // 处理响应中不存在文档源的情况(默认情况下就是这种情况) // } // // // 可以检查分片故障: // ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo(); // if (shardInfo.getTotal() != shardInfo.getSuccessful()) { // // 处理成功分片数量少于总分片数量的情况 // } // if (shardInfo.getFailed() > 0) { // for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) { // String reason = failure.reason(); // } // // 处理潜在的故障 // } // // // 当UpdateRequest抵靠不存在一个文件执行时,响应具有404状态码,ElasticsearchException获取引发这就需要如下进行处理: // UpdateRequest request2 = new UpdateRequest("posts", "does_not_exist").doc("field", "value"); // try { // UpdateResponse updateResponse2 = client.update(request2, RequestOptions.DEFAULT); // } catch (ElasticsearchException e) { // if (e.status() == RestStatus.NOT_FOUND) { // // 处理由于文档不存在而引发的异常 // } // } // // // 如果存在版本冲突,ElasticsearchException将引发: // UpdateRequest request3 = new UpdateRequest("posts", "1").doc("field", "value").setIfSeqNo(101L) // .setIfPrimaryTerm(200L); // try { // UpdateResponse updateResponse3 = client.update(request3, RequestOptions.DEFAULT); // } catch (ElasticsearchException e) { // if (e.status() == RestStatus.CONFLICT) { // // 引发的异常表示返回了版本冲突错误。 // } // } } catch (Exception e) { e.printStackTrace(); } } public static void update3() { UpdateRequest request = new UpdateRequest("posts", "1"); String jsonString = "{" + "\"updated\":\"2019-09-09\"," + "\"reason\":\"daily update9\"" + "}"; request.doc(jsonString, XContentType.JSON); try { UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT); } catch (Exception e) { e.printStackTrace(); } } public static void Multupdate1() { // 批量API仅支持以JSON或SMILE编码的文档。提供任何其他格式的文档将导致错误。 BulkRequest request = new BulkRequest(); request.add(new IndexRequest("posts").id("1").source(XContentType.JSON, "field", "foo")); request.add(new IndexRequest("posts").id("2").source(XContentType.JSON, "field", "bar")); request.add(new IndexRequest("posts").id("3").source(XContentType.JSON, "field", "baz")); BulkRequest request2 = new BulkRequest(); request2.add(new DeleteRequest("posts", "3")); request2.add(new UpdateRequest("posts", "2").doc(XContentType.JSON, "other", "test")); request2.add(new IndexRequest("posts").id("4").source(XContentType.JSON, "field", "baz")); // 可选参数 request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); request.setRefreshPolicy("wait_for"); request.pipeline("pipelineId"); request.routing("routingId"); BulkRequest defaulted = new BulkRequest("posts"); // 开始执行 try { BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT); for (BulkItemResponse bulkItemResponse : bulkResponse) { DocWriteResponse itemResponse = bulkItemResponse.getResponse(); switch (bulkItemResponse.getOpType()) { case INDEX: // 检索操作的响应(成功与否),可以是 IndexResponse,UpdateResponse或DeleteResponse可全部被视为 // DocWriteResponse实例 case CREATE: // 处理更新操作的响应 IndexResponse indexResponse = (IndexResponse) itemResponse; break; case UPDATE: UpdateResponse updateResponse = (UpdateResponse) itemResponse; // 处理更新操作的响应 break; case DELETE: // 处理删除操作的响应 DeleteResponse deleteResponse = (DeleteResponse) itemResponse; } } if (bulkResponse.hasFailures()) { // 批量响应提供了一种方法来快速检查一个或多个操作是否失败: } // 这种情况下,有必要遍历所有操作结果,以检查操作是否失败,如果失败,则检索相应的失败: for (BulkItemResponse bulkItemResponse : bulkResponse) { if (bulkItemResponse.isFailed()) { BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); } } BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { } }; } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { update3(); System.out.println("------------ 测试结束 -------------------------"); } }