Elasticsearch--客户端

news2024/12/24 20:50:27

Es客户端

语言无关

 java最常用的客户端是Java Client、Java Rest Client、Java Transport Client

Java Client

从es7.17开始,官方推出Java Client,并且将Java Rest Client标为Deprecated(过期)

要求jdk至少要jdk8

具体用法再看===》

Java Rest Client

 Java Rest Client分为:

Java Low level Rest Client

Java High level Rest Client

在es7.15的时候过期的

RestClient 是线程安全的,RestClient使用 Elasticsearch 的 HTTP 服务,默认为9200端口,这一点和transport client不同。

Java Low level Rest Client

之所以称为低级客户端,是因为它几乎没有帮助 Java 用户构建请求或解析响应。它处理请求的路径和查询字符串构造,但它将 JSON 请求和响应主体视为必须由用户处理的不透明字节数组。

特点
  • 与任何 Elasticsearch 版本兼容

    • ES 5.0.0只是发布第一个Java Low-level REST client时的ES版本(2016年),不代表其向前只兼容到5.0,Java Low-level REST client基于Apache HTTP 客户端,它允许使用 HTTP 与任何版本的 Elasticsearch 集群进行通信。

  • 最小化依赖

  • 跨所有可用节点的负载平衡

  • 在节点故障和特定响应代码的情况下进行故障转移

  • 连接失败惩罚(是否重试失败的节点取决于它连续失败的次数;失败的尝试越多,客户端在再次尝试同一节点之前等待的时间就越长)

  • 持久连接

  • 请求和响应的跟踪记录

  • 可选的集群节点自动发现(也称为嗅探)

Java High Level REST Client

Java 高级 REST 客户端在 Java 低级 REST 客户端之上运行。

它的主要目标是公开 API 特定的方法,接受请求对象作为参数并返回响应对象,以便请求编组和响应解组由客户端本身处理。

要求Elasticsearch版本为2.0或者更高。

maven

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.12.0</version>
</dependency>

 初始化

// 初始化
RestClient restClient = RestClient.builder(
    new HttpHost("localhost1", 9200, "http"),
    new HttpHost("localhost2", 9200, "http")).build();

// 资源释放
restClient.close();

简单用法 

    @Test
    @SneakyThrows
    public void createIndex() {

        //region 创建客户端对象
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http")
                )
        );
        //endregion

        //region Request对象
        CreateIndexRequest request = new CreateIndexRequest("product2");
        //endregion

        //region 组装数据
        //region setting
        request.settings(Settings.builder()
                .put("index.number_of_shards", 3)
                .put("index.number_of_replicas", 0)
        );
        //endregion

        //region mapping
//        request.mapping(
//                "{\n" +
//                        "  \"properties\": {\n" +
//                        "    \"message\": {\n" +
//                        "      \"type\": \"text\"\n" +
//                        "    }\n" +
//                        "  }\n" +
//                        "}",
//                XContentType.JSON);

        //region 还可以使用Map构建
//        Map<String, Object> message = new HashMap<>();
//        message.put("type", "text");
//        Map<String, Object> properties = new HashMap<>();
//        properties.put("message", message);
//        Map<String, Object> mapping = new HashMap<>();
//        mapping.put("properties", properties);
//        request.mapping(mapping);
        //endregion

        //region 使用XContentBuilder构建
//        XContentBuilder builder = XContentFactory.jsonBuilder();
//        builder.startObject();
//        {
//            builder.startObject("properties");
//            {
//                builder.startObject("message");
//                {
//                    builder.field("type", "text");
//                }
//                builder.endObject();
//            }
//            builder.endObject();
//        }
//        builder.endObject();
//        request.mapping(builder);
        //endregion

        //endregion


        //region 别名
        request.alias(new Alias("product_alias").filter(QueryBuilders.termQuery("name", "xiaomi")));
        //endregion
        request.timeout(TimeValue.timeValueMillis(2));
        //endregion


        // 同步
        CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
        // 异步
        client.indices().createAsync(request, RequestOptions.DEFAULT, new ActionListener<CreateIndexResponse>() {
            @Override
            public void onResponse(CreateIndexResponse createIndexResponse) {

            }

            @Override
            public void onFailure(Exception e) {

            }
        });

        // 是否所有节点都已确认请求
        createIndexResponse.isAcknowledged();
        // 在超时之前是否为索引中的每个碎片启动所需数量的碎片副本
        createIndexResponse.isShardsAcknowledged();
        client.close();
    }

    @Test
    @SneakyThrows
    public void getIndex() {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http")
                )
        );

        GetIndexRequest request = new GetIndexRequest("product*");
        GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
        String[] indices = response.getIndices();
        for (String indexName : indices) {
            System.out.println("index name:" + indexName);
        }

        client.close();
    }

    @Test
    @SneakyThrows
    public void delIndex() {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http")
                )
        );
        DeleteIndexRequest request = new DeleteIndexRequest("product2");
        AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT);
        if (response.isAcknowledged()) {
            System.out.println("删除index成功!");
        } else {
            System.out.println("删除index失败!");
        }
        client.close();
    }

    @Test
    @SneakyThrows
    public void insertData() {
        //region 创建连接
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http")
                )
        );
        //endregion

        //region 准备数据
        List<Product> list = service.list();
        //endregion

        //region 创建Request对象
        //插入数据,index不存在则自动根据匹配到的template创建。index没必要每天创建一个,如果是为了灵活管理,最低建议每月一个 yyyyMM。
        IndexRequest request = new IndexRequest("test_index");
        //endregion

        //region 组装数据
        Product product = list.get(0);
        Gson gson = new Gson();
        //最好不要自定义id 会影响插入速度。
        request.id(product.getId().toString());
        request.source(gson.toJson(product)
                , XContentType.JSON);
        //endregion

        //region 执行Index操作
        IndexResponse response = client.index(request, RequestOptions.DEFAULT);
        //endregion

        System.out.println(response);
        client.close();
    }

    @Test
    @SneakyThrows
    public void batchInsertData() {
        //region 创建连接
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http")
                )
        );
        //endregion

        //region 创建Request对象
        //批量插入数据,更新和删除同理
        BulkRequest request = new BulkRequest("test_index");
        //endregion

        //region 组装数据
        Gson gson = new Gson();
        Product product = new Product();
        product.setPrice(3999.00);
        product.setDesc("xioami");
        for (int i = 0; i < 10; i++) {
            product.setName("name" + i);
            request.add(new IndexRequest()
                    .id(Integer.toString(i))
                    .source(gson.toJson(product)
                    , XContentType.JSON)
            );
        }
        //endregion

        BulkResponse response = client.bulk(request, RequestOptions.DEFAULT);

        System.out.println("数量:" + response.getItems().length);
        client.close();
    }

    @Test
    @SneakyThrows
    public void getById() {
        //region 创建连接
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http")));
        //endregion

        //region 创建Request对象
        //注意 这里查询使用的是别名。
        GetRequest request = new GetRequest("test_index", "6");
        //endregion

        //region 组装数据
        String[] includes = {"name", "price"};
        String[] excludes = {"desc"};
        FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
        //只查询特定字段。如果需要查询所有字段则不设置该项。
        request.fetchSourceContext(fetchSourceContext);
        //endregion

        //region 响应数据
        GetResponse response = client.get(request, RequestOptions.DEFAULT);
        //endregion

        System.out.println(response);
        client.close();

    }

    @Test
    public void delById() throws IOException {
        //region Description
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http")
                )
        );
        //endregion

        DeleteRequest request = new DeleteRequest("test_index", "1");

        DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);

        System.out.println(response);
        client.close();
    }

    @Test
    public void multiGetById() throws IOException {
        //region Description
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http")));
        //endregion

        //region Description
        //根据多个id查询
        MultiGetRequest request = new MultiGetRequest();
        //endregion

        //region Description
        request.add("test_index", "6");
        //两种写法
        request.add(new MultiGetRequest.Item(
                "test_index",
                "7"));
        //endregion

        //region Description
        MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
        //endregion
        for (MultiGetItemResponse itemResponse : response) {
            System.out.println(itemResponse.getResponse().getSourceAsString());
        }
        client.close();
    }

    @Test
    public void updateByQuery() throws IOException {

        //region 连接
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http")
                )
        );
        //endregion

        //region 请求对象
        UpdateByQueryRequest request = new UpdateByQueryRequest("test_index");
        //endregion

        //region 组装数据
        //默认情况下,版本冲突会中止 UpdateByQueryRequest 进程,但是你可以用以下命令来代替
        //设置版本冲突继续
//        request.setConflicts("proceed");
        //设置更新条件
        request.setQuery(QueryBuilders.termQuery("name", "name2"));
//        //限制更新条数
//        request.setMaxDocs(10);
        request.setScript(
                new Script(ScriptType.INLINE, "painless", "ctx._source.desc+='#';", Collections.emptyMap()));
        //endregion

        BulkByScrollResponse response = client.updateByQuery(request, RequestOptions.DEFAULT);

        System.out.println(response);
        client.close();
    }

优缺点

优点

        安全:REST API使用单一的集群入口点,可以通过 HTTPS 保障数据安全性,传输层只用于内部节点到节点的通信。
        易用:客户端只通过 REST 层而不是通过传输层调用服务,可以大大简化代码编写

 缺点

        性能略逊于Java API,但是差距不大

 Low level Client

 优点:
        轻依赖:Apache HTTP 异步客户端及其传递依赖项(Apache HTTP 客户端、Apache HTTP Core、Apache HTTP Core NIO、Apache Commons Codec 和 Apache Commons Logging)
        兼容性强:兼容所有ES版本
  缺点:
        功能少:显而易见,轻量化带来的必然后果

High level Client

 优点:
        功能强大:支持所有ES的API调用。
        松耦合:客户端和ES核心服务完全独立,无共同依赖。
         接口稳定:REST API 比与 Elasticsearch 版本完全匹配的`Transport Client`接口稳定得多。
缺点:
        兼容性中等:基于Low Level Client,只向后兼容ES的大版本,比如6.0的客户端兼容6.x(即6.0之后的版本),但是6.1的客户端未必支持所有6.0ES的API,但是这并不是什么大问题,咱们使用相同版本的客户端和服务端即可,而且不会带来其他问题。

Java Transport Client

使用的客户端名称叫TransportClient

从7.0.0开始,官方已经不建议使用TransportClient作为ES的Java客户端了,并且从8.0会被彻底删除

TransportClient 使用transport模块(9300端口)远程连接到 Elasticsearch 集群,客户端并不加入集群,而是通过获取单个或者多个transport地址来以轮询的方式与他们通信。


TransportClient使用transport协议与Elasticsearch节点通信,如果客户端的版本和与其通信的ES实例的版本不同,就会出现兼容性问题。而low-level REST使用的是HTTP协议,可以与任意版本ES集群通信。high-level REST是基于low-level REST的。

es整合java时,es的版本和java中的版本要保证大版本一致,比如,7.x

es的版本和springboot版本兼容性关系

依赖

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>7.12.1</version>
</dependency>

连接

// 创建客户端连接
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
        .addTransportAddress(new TransportAddress(InetAddress.getByName("host1"), 9300))
        .addTransportAddress(new TransportAddress(InetAddress.getByName("host2"), 9300));

// 关闭客户端
client.close();

简单使用

    @SneakyThrows
    private void create(TransportClient client) {
        List<Product> list = service.list();
        for (Product item : list) {
            System.out.println(item.getDate().toLocalDateTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
            IndexResponse response = client.prepareIndex("product", "_doc", item.getId().toString())
                    .setSource(XContentFactory.jsonBuilder()
                            .startObject()
                            .field("name", item.getName())
                            .field("desc", item.getDesc())
                            .field("price", item.getPrice())
                            .field("date", item.getDate().toLocalDateTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")))
                            .field("tags", item.getTags().replace("\"", "").split(","))
                            .endObject())
                    .get();
            System.out.println(response.getResult());
        }
    }

    @SneakyThrows
    private void get(TransportClient client) {
        GetResponse response = client.prepareGet("product", "_doc", "1").get();
        String index = response.getIndex();//获取索引名称
        String type = response.getType();//获取索引类型
        String id = response.getId();//获取索引id
        System.out.println("index:" + index);
        System.out.println("type:" + type);
        System.out.println("id:" + id);
        System.out.println(response.getSourceAsString());
    }

    private void getAll(TransportClient client) {
        SearchResponse response = client.prepareSearch("product")
                .get();
        SearchHits searchHits = response.getHits();
        SearchHit[] hits = searchHits.getHits();
        for (SearchHit hit : hits) {
            String res = hit.getSourceAsString();
            System.out.println("res" + res);
        }
    }

    @SneakyThrows
    private void update(TransportClient client) {
        UpdateResponse response = client.prepareUpdate("product", "_doc", "2")
                .setDoc(XContentFactory.jsonBuilder()
                        .startObject()
                        .field("name", "update name")
                        .endObject())
                .get();
        System.out.println(response.getResult());
    }

    @SneakyThrows
    private void delete(TransportClient client) {
        DeleteResponse response = client.prepareDelete("product", "_doc", "2").get();
        System.out.println(response.getResult());
    }

kibana中操作的是Rest api

dsl转成代码

  void aggSearch() {
        //region 1->创建客户端连接
        TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
        //endregion

        //region 2->计算并返回聚合分析response对象
        SearchResponse response = client.prepareSearch("product")
                .setSize(0)
                .setQuery(QueryBuilders.matchAllQuery())
                .addAggregation(AggregationBuilders.dateHistogram("group_by_month")
                        .field("date")
                        .calendarInterval(DateHistogramInterval.MONTH)
                        .minDocCount(1)
                        .subAggregation(AggregationBuilders.terms("by_tag")
                                .field("tags.keyword")
                                .subAggregation(AggregationBuilders.avg("avg_price")
                                        .field("price"))
                        )
                ).execute().actionGet();

        //endregion

        //region 3->输出结果信息
        SearchHit[] hits = response.getHits().getHits();
        Map<String, Aggregation> map = response.getAggregations().asMap();
        Aggregation group_by_month = map.get("group_by_month");
        Histogram dates = (Histogram) group_by_month;
        Iterator<Histogram.Bucket> buckets = (Iterator<Histogram.Bucket>) dates.getBuckets().iterator();
        while (buckets.hasNext()) {
            Histogram.Bucket dateBucket = buckets.next();
            System.out.println("\n月份:" + dateBucket.getKeyAsString() + "\n计数:" + dateBucket.getDocCount());
            Aggregation by_tag = dateBucket.getAggregations().asMap().get("by_tag");
            StringTerms terms = (StringTerms) by_tag;
            Iterator<StringTerms.Bucket> tags = terms.getBuckets().iterator();
            while (tags.hasNext()) {
                StringTerms.Bucket tag = tags.next();
                System.out.println("\t标签名称:" + tag.getKey() + "\n\t数量:" + tag.getDocCount());
                Aggregation avg_price = tag.getAggregations().get("avg_price");
                Avg avg = (Avg) avg_price;
                System.out.println("\t平均价格:" + avg.getValue());
            }
        }
        //endregion

        client.close();


    }


 

嗅探器sniffer

允许从正在运行的 Elasticsearch 集群中自动发现节点并将它们设置为现有 RestClient 实例的最小库(在集群中,根据一个节点找到其他节点)

依赖

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client-sniffer</artifactId>
    <version>7.12.1</version>
</dependency>
// 默认每五分钟发现一次
RestClient restClient = RestClient.builder(
    new HttpHost("localhost", 9200, "http"))
    .build();
Sniffer sniffer = Sniffer.builder(restClient).build();



// 设置嗅探间隔
RestClient restClient = RestClient.builder(
    new HttpHost("localhost", 9200, "http"))
    .build();
// 设置嗅探间隔为60000毫秒
Sniffer sniffer = Sniffer.builder(restClient)
    .setSniffIntervalMillis(60000).build();



// 失败时重启嗅探
SniffOnFailureListener sniffOnFailureListener =
    new SniffOnFailureListener();
RestClient restClient = RestClient.builder(
    new HttpHost("localhost", 9200))
    .setFailureListener(sniffOnFailureListener) //将失败侦听器设置为 RestClient 实例 
    .build();
Sniffer sniffer = Sniffer.builder(restClient)
    .setSniffAfterFailureDelayMillis(30000) //在嗅探失败时,不仅节点在每次失败后都会更新,而且还会比平常更早安排额外的嗅探轮次,默认情况下是在失败后一分钟,假设事情会恢复正常并且我们想要检测尽快地。可以在 Sniffer 创建时通过 setSniffAfterFailureDelayMillis 方法自定义所述间隔。请注意,如果如上所述未启用故障嗅探,则最后一个配置参数无效。
    .build();
sniffOnFailureListener.setSniffer(sniffer); //将 Sniffer 实例设置为失败侦听器




// 资源释放  
// Sniffer 对象应该与RestClient 具有相同的生命周期,并在客户端之前关闭。
sniffer.close();
restClient.close();

Spring Data Elasticsearch

Spring Data 的目的是用统一的接口,适配所有不同的存储类型。

Spring Data Elasticsearch是Spring Data的一个子项目,该项目旨在为新数据存储提供熟悉且一致的基于 Spring 的编程模型,同时保留特定于存储的功能和功能。Spring Data Elasticsearch是一个以 POJO 为中心的模型,用于与 Elastichsearch 文档交互并轻松编写 Repository 风格的数据访问层

特点

  • Spring 配置支持使用基于 Java 的@Configuration类或用于 ES 客户端实例的 XML 命名空间。

  • ElasticsearchTemplate提高执行常见 ES 操作的生产力的助手类。包括文档和 POJO 之间的集成对象映射。

  • 功能丰富的对象映射与 Spring 的转换服务集成

  • 基于注释的映射元数据但可扩展以支持其他元数据格式

  • Repository接口的自动实现,包括对自定义查找器方法的支持。

  • 对存储库的 CDI 支持

 依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

注解

@Document:在类级别应用,以指示该类是映射到数据库的候选类。最重要的属性包括:

indexName:用于存储此实体的索引的名称。它可以包含类似于“日志-#{T(java.time.LocalDate).now().toString()}”

type :映射类型。如果未设置,则使用该类的小写简单名称。(自4.0版起已弃用)

createIndex:标记是否在存储库引导时创建索引。默认值为true。请参阅自动创建带有相应映射的索引

versionType:版本管理的配置。默认值为外部 .

@Id:在字段级别应用,以标记用于标识的字段。

@Transient:默认情况下,存储或检索文档时,所有字段都映射到文档,此批注不包括该字段。

@PersistenceConstructor:标记在从数据库实例化对象时要使用的给定构造函数(甚至是包受保护的构造函数)。构造函数参数按名称映射到检索文档中的键值。

@Field:应用于字段级别并定义字段的属性,大多数属性映射到相应的Elasticsearch映射定义(以下列表不完整,请查看注释Javadoc以获取完整的参考):

name:将在Elasticsearch文档中表示的字段的名称,如果未设置,则使用Java字段名称。

type:字段类型,可以是Text,关键字,Long,Integer,Short,Byte,Double,Float,Half_Float,Scaled_Float,日期,日期Nanos,Boolean,Binary,Integer_Range,Float_Range,Long_Range,DoubleˉRange,DateˉRange,Object,Nested,Ip,TokenCount,percollator,flatten,搜索。请参阅Elasticsearch映射类型

format:一个或多个内置日期格式,请参阅下一节格式数据映射 .

pattern:一个或多个自定义日期格式,请参阅下一节格式数据映射 .

store:标志是否应将原始字段值存储在Elasticsearch中,默认值为假 .

analyzer ,搜索分析器 ,normalizer用于指定自定义分析器和规格化器。

@GeoPoint:将字段标记为地理点如果字段是GeoPoint班级

 简单使用

public class EsUtil {
    // 生成批量处理对象
    private static BulkRequest bulkRequest = new BulkRequest();

    /**
     * 添加数据到es
     * @param indexName
     * @param typeName
     * @param indexId
     * @param json
     */

    public static void add(RestHighLevelClient restHighLevelClient, String indexName, String typeName, String indexId, Map<String, Object> json) throws IOException {
        IndexRequest indexRequest = new IndexRequest(indexName, typeName,indexId);
//        Gson gson = new Gson();
        indexRequest.source(new JSONObject(json).toString(), XContentType.JSON);
        try {
            restHighLevelClient.index(indexRequest,RequestOptions.DEFAULT);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    /**
     * 判断索引名是否存在
     * @param indexName
     * @return
     */
    public static boolean existsIndex(RestHighLevelClient restHighLevelClient,String indexName) {
        try{
            GetIndexRequest request = new GetIndexRequest(indexName);
            boolean exists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
            return exists;
        }catch (Exception e){
            System.out.println("Exception");
        }
        return false;
    }
    /**
     * @param : client
     * @description : 判断文档是否存在
     */
    public static boolean isExist(RestHighLevelClient restHighLevelClient, String indexName, String typeName, String indexId) throws IOException{

        GetRequest request = new GetRequest(indexName, typeName, indexId);
        //1.同步判断
        boolean exists = restHighLevelClient.exists(request, RequestOptions.DEFAULT);

        //2.异步判断
        ActionListener<Boolean> listener = new ActionListener<Boolean>() {
            @Override
            public void onResponse(Boolean exists) {
                if (exists){
                    System.out.println("文档存在");
                }else {
                    System.out.println("文档不存在");
                }
            }

            @Override
            public void onFailure(Exception e) {

            }
        };
        //client.existsAsync(request, RequestOptions.DEFAULT, listener);

        return exists;
    }

    /**
     * @param : client
     * @description : 删除文档
     */
    public static void deleteDocument(RestHighLevelClient restHighLevelClient, String indexName, String typeName, String indexId) throws IOException{

        DeleteRequest request = new DeleteRequest(indexName,typeName,indexId);

        //设置请求超时时间:2分钟
        request.timeout(TimeValue.timeValueMinutes(2));
        //request.timeout("2m");

        //同步删除
        DeleteResponse deleteResponse = restHighLevelClient.delete(request, RequestOptions.DEFAULT);

        //异步删除
        ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
            @Override
            public void onResponse(DeleteResponse deleteResponse) {
                System.out.println("删除后操作");
            }

            @Override
            public void onFailure(Exception e) {
                System.out.println("删除失败");
            }
        };

    }

    /**
     * 批量增加数据的方法
     * @param restHighLevelClient
     * @param indexname
     * @param typename
     * @param row_key
     * @param map
     * @throws Exception
     */
    public void bulkadd(RestHighLevelClient restHighLevelClient, String indexname, String typename, String row_key, Map<String,Object> map) throws Exception {

        try {
            // 生成批量处理对象
            //BulkRequest bulkRequest = new BulkRequest();

            // 得到某一行的数据,并封装成索引对象
            IndexRequest indexRequest = new IndexRequest(indexname, typename,row_key);
            indexRequest.source(new JSONObject(map).toString(), XContentType.JSON);


            //判断是否执行加载
            if (bulkRequest.numberOfActions() != 0 && (bulkRequest.numberOfActions() > 100)) {
                try {
                    bulkRequest(restHighLevelClient);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // 装填数据
            bulkRequest.add(indexRequest);

        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            bulkRequest(restHighLevelClient);
        }
    }

    /**
     * 批量具体执行方法
     * execute bulk process
     * @throws Exception
     */
    private void bulkRequest(RestHighLevelClient restHighLevelClient) throws Exception {
        // 加载数据
        BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);

        // 判断加载情况
        if(bulkResponse.hasFailures()){
            System.out.println("失败");
        }else{
            System.out.println("成功");
            // 重新定义
            bulkRequest = new BulkRequest();

        }
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/754635.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大模型技术发展概述 -(四)

文本内容参考论文《A Survey of Large Language Models》 论文标题&#xff1a;A Survey of Large Language Models 论文链接&#xff1a;https://arxiv.org/pdf/2303.18223v10.pdf 大模型技术发展概述 -&#xff08;四&#xff09; 6. 使用方法6.1 上下文学习6.1.1 提示形式6.…

网络套接字编程(三)(HTTP)

gitee仓库&#xff1a;https://gitee.com/WangZihao64/linux/tree/master/CalTcp 一、重谈协议 协议是一种“约定”&#xff0c;这种约定是双方都知道的。有了一致的约定&#xff0c;双方才能够正常地进行通信。协议在网络的第一篇博客中也提到过&#xff0c;协议是双方进行通…

uniapp中H5定位功能实现

1.要实现该功能 必须使用vue-jsonp进行跨域 JSONP是一种跨域数据请求的解决方案&#xff0c;它使用script元素来请求数据&#xff0c;再利用回调函数将数据传回页面。 Vue框架提供了对JSONP的支持&#xff0c;可以方便地在Vue应用中使用JSONP获取跨域数据。下面我们来了解一下…

【Java】StringBuffer和StringBuilder

共同点 他们都是可变的&#xff0c;在每次进行修改操作时&#xff0c;都不会产生新的对象&#xff0c;所以在进行修改的时候&#xff0c;尽量使用这两种类型的字符串 不同点 StringBuffer在单线程中效率高 StringBuilder用于多线程确保安全性 测试代码 public class test …

keepalived安装配置详解

文章目录 高可用介绍keepalived安装、使用vip漂移抓包脑裂脑裂有没有危害&#xff1f;如果有危害对业务有什么影响&#xff1f; keepalived架构双vip架构 Healthcheck实现 notifyVRRP选举格式 高可用 介绍 高可用性&#xff08;High Availability&#xff09;是指系统或服务能…

Linux的locale本地化配置

Linux的locale本地化配置 locale简介localectl常用操作语言环境键盘布局 常见问题:配置语言环境报错Linux系统locale(UTF-8)报错最小化自动安装的Centos7修改完整中文显示 locale简介 参考: http://m.blog.chinaunix.net/uid-20621049-id-3427444.html locale把按照所涉及到的…

Python调用ImageMagick生成PDF文件缩略图

使用Python调用ImageMagick生成PDF文件缩略图 Imagemagick使用Ghostscript作为其依赖项之一&#xff0c;以便能够处理和转换PDF相关的图像。 准备 安装Ghostscript&#xff0c;网站安装ImageMagick&#xff0c;网站 安装完毕后&#xff0c;需要自行配置环境路径 脚本 使用示…

灌区信息化智能测控一体化闸门系统解决方案

一、方案背景 闸门是节水灌溉工程中重要组成部分。在农田灌区中&#xff0c;一方面存在传统手摇闸门&#xff0c;未能实现自动化、数字化&#xff0c;另一方面部分灌区闸站虽然部分实现了自动化控制&#xff0c;但是由于闸站较多&#xff0c;有些位置较为偏僻&#xff0c;部分水…

网络数据安全风险评估实施指引(一)

近日&#xff0c;全国信息安全标准化技术委员会发布了《网络安全标准实践指南 网络数据安全风险评估实施指引》&#xff08;TC260-PG-20231A v1.0-202305&#xff09;&#xff0c;旨在响应《数据安全法》要求&#xff0c;落实重要数据处理过程风险评估&#xff0c;衔接已发布的…

前端开发中的微服务架构设计

前端服务化和小程序容器技术为前端应用带来了更好的组织结构、可维护性和可扩展性。这些技术的应用将促进前端开发的创新和发展&#xff0c;使团队能够更好地应对复杂的前端需求和业务挑战。通过将前端视为一个服务化的架构&#xff0c;我们能够构建出更强大、可靠且可持续的前…

OpenCv (C++) 使用矩形 Rect 覆盖图像中某个区域

文章目录 1. 使用矩形将图像中某个区域置为黑色2. cv::Rect 类介绍 1. 使用矩形将图像中某个区域置为黑色 推荐参考博客&#xff1a;OpenCV实现将任意形状ROI区域置黑&#xff08;多边形区域置黑&#xff09; 比较常用的是使用 Rect 矩形实现该功能&#xff0c;代码如下&…

vmware-ubuntu 出现的奇怪问题

虚拟机突然连不上网 参考博文-CSDN-卍一十二画卍&#xff08;作者&#xff09;-Vmware虚拟机突然连接不上网络【方案集合】 sudo vim /var/lib/NetworkManager/NetworkManager.statesudo service network-manager stop sudo vim /var/lib/NetworkManager/NetworkManager.stat…

华为云子网路由表作用及价值

子网路由表 子网路由表作用云专线、VPN的配置与子网路由表强关联&#xff0c;本质是在相应的子网路由表中添加了一条路由Nat路由表问题地址变更问题snat和dnat 子网路由表作用 子网内部作为一个二层网络&#xff0c;通过mac地址互通&#xff0c;不通过路由互通。跨子网&#x…

Java Vue物联网系统

一个简单易用的物联网平台&#xff0c;可用于搭建物联网平台以及二次开发和学习。适用于智能家居、智慧办公、智慧社区、农业监测、水利监测、工业控制等。 系统后端采用Spring boot&#xff1b;前端采用Vue&#xff1b;消息服务器采用EMQX&#xff1b; 技术栈 服务端相关技术…

Vector - CANoe - DoIP在CANoe应用

目录 背景说明 一、DoIP通信说明 1、连接(Connection) 2、车辆发现(Vehicle Discovery)

第一阶段-第八章 Python的文件操作

目录 一、文件的编码  1.学习目标  2.文件编码  3.查看文件编码  4.本小节的总结 二、文件的读取  1.学习目标  2.什么是文件  3.文件包含的操作&#xff08;打开、关闭、读、写&#xff09;  4.文件的操作步骤&#xff08;打开或创建文件open&#xff08;mode…

【计算机视觉 | 目标检测 | 图像分割】arxiv 计算机视觉关于目标检测和图像分割的学术速递(7 月 14 日论文合集)

文章目录 一、检测相关(6篇)1.1 LVLane: Deep Learning for Lane Detection and Classification in Challenging Conditions1.2 Garbage in, garbage out: Zero-shot detection of crime using Large Language Models1.3 Robotic surface exploration with vision and tactile …

汽车电子 -- 使用CANdb++ Editor创建并制作一个DBC

参看&#xff1a;关于DBC文件的创建&#xff08;DBC文件系列其一&#xff09; 一、什么是DBC DBC文件是DataBase Container文件的缩写。 CAN数据库文件也称为后缀为&#xff08;.dbc&#xff09;的文件。DBC文件是基本的文本文件&#xff0c;其中包括将原始CAN总线数据解码为…

Raft算法之日志复制

Raft算法之日志复制 一、日志复制大致流程 在Leader选举过程中&#xff0c;集群最终会选举出一个Leader节点&#xff0c;而集群中剩余的其他节点将会成为Follower节点。Leader节点除了向Follower节点发送心跳消息&#xff0c;还会处理客户端的请求&#xff0c;并将客户端的更…

音频播放器Web页面代码实例(基于HTML5)

音频播放器Web页面代码实例&#xff08;基于HTML5&#xff09;&#xff1a; <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta http-equiv"X-UA-Compatible" content"IEedge" /><…