package org.fanli.elastic;
|
|
import java.io.IOException;
|
import java.util.Date;
|
import java.util.HashMap;
|
import java.util.Map;
|
|
import org.apache.http.HttpHost;
|
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.action.DocWriteResponse;
|
import org.elasticsearch.action.bulk.BulkItemResponse;
|
import org.elasticsearch.action.bulk.BulkProcessor;
|
import org.elasticsearch.action.bulk.BulkRequest;
|
import org.elasticsearch.action.bulk.BulkResponse;
|
import org.elasticsearch.action.delete.DeleteRequest;
|
import org.elasticsearch.action.delete.DeleteResponse;
|
import org.elasticsearch.action.index.IndexRequest;
|
import org.elasticsearch.action.index.IndexResponse;
|
import org.elasticsearch.action.support.ActiveShardCount;
|
import org.elasticsearch.action.support.WriteRequest;
|
import org.elasticsearch.action.support.replication.ReplicationResponse;
|
import org.elasticsearch.action.update.UpdateRequest;
|
import org.elasticsearch.action.update.UpdateResponse;
|
import org.elasticsearch.client.RequestOptions;
|
import org.elasticsearch.client.RestClient;
|
import org.elasticsearch.client.RestHighLevelClient;
|
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentFactory;
|
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.index.get.GetResult;
|
import org.elasticsearch.rest.RestStatus;
|
import org.elasticsearch.script.Script;
|
import org.elasticsearch.script.ScriptType;
|
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
|
|
public class UpdateUtils {
|
|
private static String clusterName = "my-application";
|
private static String host = "192.168.1.200";
|
private static Integer port = 9200;
|
|
// 相当于数据库名称
|
public static String indexName = "shose";
|
|
// 初始化api客户端
|
public static RestHighLevelClient client = new RestHighLevelClient(
|
RestClient.builder(new HttpHost(host, port, "http")));
|
|
public static void update1() {
|
// Index 、 Document id
|
UpdateRequest request = new UpdateRequest("posts", "1");
|
|
// 1、Script 更新
|
Map<String, Object> parameters = new HashMap<String, Object>();
|
parameters.put("count", 4);
|
Script inline = new Script(ScriptType.INLINE, "painless", "ctx._source.field += params.count", parameters);
|
request.script(inline);
|
|
// 2、Script 更新
|
Script stored = new Script(ScriptType.STORED, null, "increment-field", parameters);
|
request.script(stored);
|
|
// 1、部分文档源String以JSON格式提供
|
String jsonString = "{" + "\"updated\":\"2017-01-01\"," + "\"reason\":\"daily update\"" + "}";
|
request.doc(jsonString, XContentType.JSON);
|
|
// Map
|
Map<String, Object> jsonMap = new HashMap<>();
|
jsonMap.put("updated", new Date());
|
jsonMap.put("reason", "daily update");
|
UpdateRequest request2 = new UpdateRequest("posts", "1").doc(jsonMap);
|
|
//
|
try {
|
XContentBuilder builder = XContentFactory.jsonBuilder();
|
builder.startObject();
|
{
|
builder.timeField("updated", new Date());
|
builder.field("reason", "daily update");
|
}
|
builder.endObject();
|
UpdateRequest request3 = new UpdateRequest("posts", "1").doc(builder);
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
|
UpdateRequest request4 = new UpdateRequest("posts", "1").doc("updated", new Date(), "reason", "daily update");
|
|
String jsonString2 = "{\"created\":\"2017-01-01\"}";
|
request.upsert(jsonString2, XContentType.JSON);
|
|
// 为特定字段配置源包含
|
String[] includes = new String[] { "updated", "r*" };
|
String[] excludes = Strings.EMPTY_ARRAY;
|
request.fetchSource(new FetchSourceContext(true, includes, excludes));
|
|
// 为特定字段配置源排除
|
String[] includes2 = Strings.EMPTY_ARRAY;
|
String[] excludes2 = new String[] { "updated" };
|
request.fetchSource(new FetchSourceContext(true, includes2, excludes2));
|
|
request.setIfSeqNo(2L);
|
request.setIfPrimaryTerm(1L);
|
|
// 禁用noop检测
|
request.detectNoop(false);
|
|
// 指示脚本必须运行,而不管文档是否存在,即,脚本将负责创建尚不存在的文档。
|
request.scriptedUpsert(true);
|
|
// 指示部分文档(如果尚不存在)必须用作upsert文档。
|
request.docAsUpsert(true);
|
|
// 设置在继续更新操作之前必须处于活动状态的分片副本数
|
request.waitForActiveShards(2);
|
// 作为提供碎片拷贝数ActiveShardCount:可以是ActiveShardCount.ALL,
|
// ActiveShardCount.ONE或ActiveShardCount.DEFAULT(默认)
|
request.waitForActiveShards(ActiveShardCount.ALL);
|
}
|
|
public static void update2() {
|
|
// 添加列
|
UpdateRequest request = new UpdateRequest("posts", "1");
|
String jsonString = "{" + "\"updated\":\"2017-01-01\"," + "\"reason\":\"daily update\"" + "}";
|
request.doc(jsonString, XContentType.JSON);
|
try {
|
UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
|
|
//
|
// // 返回的值UpdateResponse允许检索有关已执行操作的信息
|
// String index = updateResponse.getIndex();
|
// String id = updateResponse.getId();
|
// long version = updateResponse.getVersion();
|
// if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
|
// // 处理首次创建文档的情况(upsert)
|
// } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
|
// // 处理文档更新的情况
|
// } else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
|
// // 处理文件被删除的情况
|
// } else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
|
// // 处理文档不受更新影响的情况,即未对文档执行任何操作(空转)
|
// }
|
//
|
// // UpdateRequest 通过fetchSource方法启用了源检索后,响应将包含更新文档的源:
|
// GetResult result = updateResponse.getGetResult(); // 检索更新的文档为 GetResult
|
// if (result.isExists()) {
|
// // 检索更新文档的来源为 String
|
// String sourceAsString = result.sourceAsString();
|
// // 检索更新文档的来源为 Map<String, Object>
|
// Map<String, Object> sourceAsMap = result.sourceAsMap();
|
// // 检索更新文档的来源为 byte[]
|
// byte[] sourceAsBytes = result.source();
|
// } else {
|
// // 处理响应中不存在文档源的情况(默认情况下就是这种情况)
|
// }
|
//
|
// // 可以检查分片故障:
|
// ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
|
// if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
|
// // 处理成功分片数量少于总分片数量的情况
|
// }
|
// if (shardInfo.getFailed() > 0) {
|
// for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
|
// String reason = failure.reason();
|
// }
|
// // 处理潜在的故障
|
// }
|
//
|
// // 当UpdateRequest抵靠不存在一个文件执行时,响应具有404状态码,ElasticsearchException获取引发这就需要如下进行处理:
|
// UpdateRequest request2 = new UpdateRequest("posts", "does_not_exist").doc("field", "value");
|
// try {
|
// UpdateResponse updateResponse2 = client.update(request2, RequestOptions.DEFAULT);
|
// } catch (ElasticsearchException e) {
|
// if (e.status() == RestStatus.NOT_FOUND) {
|
// // 处理由于文档不存在而引发的异常
|
// }
|
// }
|
//
|
// // 如果存在版本冲突,ElasticsearchException将引发:
|
// UpdateRequest request3 = new UpdateRequest("posts", "1").doc("field", "value").setIfSeqNo(101L)
|
// .setIfPrimaryTerm(200L);
|
// try {
|
// UpdateResponse updateResponse3 = client.update(request3, RequestOptions.DEFAULT);
|
// } catch (ElasticsearchException e) {
|
// if (e.status() == RestStatus.CONFLICT) {
|
// // 引发的异常表示返回了版本冲突错误。
|
// }
|
// }
|
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
}
|
|
public static void update3() {
|
UpdateRequest request = new UpdateRequest("posts", "1");
|
String jsonString = "{" + "\"updated\":\"2019-09-09\"," + "\"reason\":\"daily update9\"" + "}";
|
request.doc(jsonString, XContentType.JSON);
|
try {
|
UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
}
|
|
|
public static void Multupdate1() {
|
// 批量API仅支持以JSON或SMILE编码的文档。提供任何其他格式的文档将导致错误。
|
BulkRequest request = new BulkRequest();
|
request.add(new IndexRequest("posts").id("1").source(XContentType.JSON, "field", "foo"));
|
request.add(new IndexRequest("posts").id("2").source(XContentType.JSON, "field", "bar"));
|
request.add(new IndexRequest("posts").id("3").source(XContentType.JSON, "field", "baz"));
|
|
BulkRequest request2 = new BulkRequest();
|
request2.add(new DeleteRequest("posts", "3"));
|
request2.add(new UpdateRequest("posts", "2").doc(XContentType.JSON, "other", "test"));
|
request2.add(new IndexRequest("posts").id("4").source(XContentType.JSON, "field", "baz"));
|
|
// 可选参数
|
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
|
request.setRefreshPolicy("wait_for");
|
|
request.pipeline("pipelineId");
|
request.routing("routingId");
|
BulkRequest defaulted = new BulkRequest("posts");
|
|
// 开始执行
|
try {
|
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
|
|
for (BulkItemResponse bulkItemResponse : bulkResponse) {
|
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
|
|
switch (bulkItemResponse.getOpType()) {
|
case INDEX:
|
// 检索操作的响应(成功与否),可以是 IndexResponse,UpdateResponse或DeleteResponse可全部被视为
|
// DocWriteResponse实例
|
case CREATE:
|
// 处理更新操作的响应
|
IndexResponse indexResponse = (IndexResponse) itemResponse;
|
break;
|
case UPDATE:
|
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
|
// 处理更新操作的响应
|
break;
|
case DELETE:
|
// 处理删除操作的响应
|
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
|
}
|
}
|
|
if (bulkResponse.hasFailures()) {
|
// 批量响应提供了一种方法来快速检查一个或多个操作是否失败:
|
}
|
|
// 这种情况下,有必要遍历所有操作结果,以检查操作是否失败,如果失败,则检索相应的失败:
|
for (BulkItemResponse bulkItemResponse : bulkResponse) {
|
if (bulkItemResponse.isFailed()) {
|
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
|
}
|
}
|
|
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
|
@Override
|
public void beforeBulk(long executionId, BulkRequest request) {
|
|
}
|
|
@Override
|
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
|
|
}
|
|
@Override
|
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
|
|
}
|
};
|
|
} catch (IOException e) {
|
e.printStackTrace();
|
}
|
}
|
|
|
public static void main(String[] args) {
|
update3();
|
System.out.println("------------ 测试结束 -------------------------");
|
}
|
|
}
|