集成ES分组查询统计求平均值

news2025/1/22 11:54:17

前言

       之前其实写过ES查询数据,进行分组聚合统计:
复杂聚合分组统计实现


一、目标场景

  1. 机房机柜的物联网设备上传环境数据,会存储到ES
  2. 存到ES的温湿度数据需要查询,进行分组后,再聚合统计求平均值

二、使用步骤

1.引入库

       我这里因为ES服务已经升级到8.0.0了,然后ES数据查询分组,我这里需要对时间进行格式化,再聚合avg,所以客户端相关版本用的7.17.4

<dependency>
	<groupId>org.elasticsearch.client</groupId>
	<artifactId>elasticsearch-rest-client</artifactId>
	<version>7.17.4</version>
	<exclusions>
		<exclusion>
			<groupId>org.elasticsearch</groupId>
			<artifactId>elasticsearch</artifactId>
		</exclusion>
	</exclusions>
</dependency>
<dependency>
	<groupId>org.elasticsearch.client</groupId>
	<artifactId>elasticsearch-rest-high-level-client</artifactId>
	<version>7.17.4</version>
	<exclusions>
		<exclusion>
			<groupId>org.elasticsearch</groupId>
			<artifactId>elasticsearch</artifactId>
		</exclusion>
	</exclusions>
</dependency>

<dependency>
	<groupId>org.elasticsearch</groupId>
	<artifactId>elasticsearch</artifactId>
	<version>7.17.4</version>
</dependency>

2.配置类

       目前我们就是单服务的,这个配置类够用了。其实我配置类就是要把RestHighLevelClient注入,并交给spring管理。

/**
 * ES配置类
 * @author zwmac
 */
@Configuration
@Data
public class ElasticSearchConfig {

    @Value("${es.host}")
    private String host;
    @Value("${es.port}")
    private int port;
    @Value("${es.username}")
    private String loginName;
    @Value("${es.password}")
    private String password;
    private RestHighLevelClient client;

    @Bean
    public RestHighLevelClient client() {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(loginName, password));
        HttpHost[] httpHostArray = new HttpHost[1];
        httpHostArray[0] = new HttpHost(host, port);
        RestClientBuilder restClientBuilder = RestClient.builder(httpHostArray)
                .setHttpClientConfigCallback(httpClientBuilder -> {
                    httpClientBuilder.disableAuthCaching();
                    return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                });
        restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
                .setConnectTimeout(60000)
                .setSocketTimeout(150000));
        client = new RestHighLevelClient(
                restClientBuilder
        );
        return client;
    }
}

3.使用


    @Resource
    private RestHighLevelClient restHighLevelClient;

/**
     * 查询温湿度24小时平均值
     * @param deviceCode 设备编码
     * @param startTime 开始时间
     * @param endTime 结束时间
     * @param humName 湿度字段名
     * @param tempName 温度字段名
     * @return 温湿度24小时平均值
     */
    private TreeMap<String, Map<String, Double>> queryTempHumDayAvg(String deviceCode, Date startTime, Date endTime, String humName, String tempName) {
        TreeMap<String, Map<String, Double>> treeMap = new TreeMap<>();
        //ES查询
        String index = EsCalendar.getDeviceFlowIndex(startTime, endTime);
        SearchRequest searchRequest = new SearchRequest(index);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        //忽略不可用索引,允许索引不不存在,通配符表达式将扩展为打开的索引
        searchRequest.indicesOptions(IndicesOptions.fromOptions(true, true, true, false));

        String timeFmt = "yyyy-MM-dd";

        // 组装ES请求数据
        String startTimeStr = DateUtil.format(startTime, DatePattern.NORM_DATETIME_PATTERN);
        String endTimeStr = DateUtil.format(endTime, DatePattern.NORM_DATETIME_PATTERN);
        QueryBuilder rangeQuery = QueryBuilders.rangeQuery("createTime").lte(endTimeStr).gte(startTimeStr);

        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        // 必须为deviceCode
        boolQueryBuilder.must(QueryBuilders.termQuery("deviceCode", deviceCode));
        rangeQuery = QueryBuilders.boolQuery().must(rangeQuery).must(boolQueryBuilder);
        QueryBuilder boolQuery = QueryBuilders.boolQuery().must(rangeQuery);

        searchSourceBuilder.query(boolQuery).size(0);


        //平均值 温度
        //String tempName = "temp_avg";
        String tempAvgName = tempName + "_avg";
        String tempFactorName = "data." + tempName;
        AvgAggregationBuilder tempAvgAggregationBuilder = AggregationBuilders.avg(tempAvgName).field(tempFactorName);

        //平均值 湿度
        //String humName = "hygrometer_avg";
        String humAvgName = humName + "_avg";
        String humFactorName = "data." + humName;
        AvgAggregationBuilder humAvgAggregationBuilder = AggregationBuilders.avg(humAvgName).field(humFactorName);


        String createTimeGroup = "createTimeGroup";
        DateHistogramAggregationBuilder aggregation = AggregationBuilders.dateHistogram(createTimeGroup)
                .field("createTime").fixedInterval(DateHistogramInterval.DAY)
                .format(timeFmt)
                //过滤掉count为0的数据
                .minDocCount(1).subAggregation(tempAvgAggregationBuilder).subAggregation(humAvgAggregationBuilder);

        //分组条件
        searchSourceBuilder.aggregation(aggregation);
        searchRequest.source(searchSourceBuilder);


        // 按照因子列表查询
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = null;
        Map<String, Map<String, Double>> mp = new HashMap<>();
        try {
            log.info("方法getCabinetTempHum24HourAvg查询ES请求数据:" + searchRequest);
            searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            log.info("方法getCabinetTempHum24HourAvg查询ES响应数据:" + searchResponse.toString());
            Aggregations aggregations = searchResponse.getAggregations();
            if (aggregations != null) {
                //组织出参数
                aggregations.forEach(agg -> {
                    ParsedDateHistogram parsedDateHistogram = (ParsedDateHistogram) agg;
                    List buckets = parsedDateHistogram.getBuckets();
                    if (CollectionUtil.isNotEmpty(buckets)) {
                        buckets.forEach(bucket -> {
                            ParsedDateHistogram.ParsedBucket timeGroupTerm = (ParsedDateHistogram.ParsedBucket) bucket;
                            String timeStr = timeGroupTerm.getKeyAsString();

                            Aggregations subAggregations = timeGroupTerm.getAggregations();
                            if (subAggregations != null) {
                                Map<String, Double> tempHumMap = new HashMap<>();
                                Map<String, Aggregation> subAggMap = subAggregations.asMap();
                                if (subAggMap != null) {
                                    Aggregation tempAgg = subAggMap.get(tempAvgName);
                                    if (tempAgg != null) {
                                        ParsedAvg tempAggPdh = (ParsedAvg) tempAgg;
                                        tempHumMap.put(tempName, tempAggPdh.getValue());
                                    }
                                    Aggregation humAgg = subAggMap.get(humAvgName);
                                    if (humAgg != null) {
                                        ParsedAvg humAggPdh = (ParsedAvg) humAgg;
                                        tempHumMap.put(humName, humAggPdh.getValue());
                                    }

                                }
                                mp.put(timeStr, tempHumMap);
                            }

                        });
                    }

                });

            }
            //数据补全
            List<DateTime> dateTimeList = DateUtil.rangeToList(startTime, DateUtil.offsetHour(endTime, -1), DateField.HOUR_OF_DAY);
            if (CollectionUtil.isNotEmpty(dateTimeList)) {
                String finTempName = "temp_avg";
                String finHumName = "hum_avg";
                dateTimeList.forEach(dateTime -> {
                    String timeStr = DateUtil.format(dateTime, timeFmt);
                    Map<String, Double> finTempHumMap = new HashMap<>();
                    Map<String, Double> tempHumMap = mp.get(timeStr);
                    if (tempHumMap == null) {
                        finTempHumMap.put(finTempName, 0.0);
                        finTempHumMap.put(finHumName, 0.0);
                    } else {
                        Double tempAvg = tempHumMap.get(tempName);
                        Double humAvg = tempHumMap.get(humName);
                        finTempHumMap.put(finTempName, tempAvg);
                        finTempHumMap.put(finHumName, humAvg);
                    }
                    treeMap.put(timeStr, finTempHumMap);
                });
            }

        } catch (Exception e) {
            log.error("方法countByEs查询ES异常", e);
        }

        return treeMap;
    }

关键点注意:

  1. QueryBuilders.rangeQuery传入的时间精度,需要yyyy-MM-dd HH:mm:ss,否则会报错在这里插入图片描述

  2. 这里对时间格式化分组,使用的是DateHistogramAggregationBuilder
    这个在EsApi7+就废弃了calendarInterval,替换新的fixedInterval

  3. 分组再聚合,注意嵌套关系,各位自己理解下subAggregation

  4. 最后数据查询出来后,迭代解析,注意理解ParsedDateHistogram取值、parsedDateHistogram.getBuckets()、迭代解析

总结

  • gs一直用老版本的ES6,这次终于被逼的更新了吧,真好。(之前一直建议、希望,都。。。。)
  • 本来很想引入EasyEs用用,但是总有同事不认可,算了
  • 之前也建议给ES装上sql-package插件,让DBeaver可以连接,试过一阵子,新版本又没装,算了
  • 其他就没啥好说的了,唯一就是restHighLevelClient现在在7+也被标记为过时了,下次有机会,这个再改改。
  • 希望能帮到大家,uping!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1547540.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

移动端Web笔记day03

移动 Web 第三题 01-移动 Web 基础 谷歌模拟器 模拟移动设备&#xff0c;方便查看页面效果&#xff0c;移动端的效果是当手机屏幕发生了变化&#xff0c;页面和页面中的元素也要跟着等比例变化。 屏幕分辨率 分类&#xff1a; 硬件分辨路 -> 物理分辨率&#xff1a;硬件…

《机器学习:引领数字化时代的技术革命》

随着科技的不断发展&#xff0c;机器学习作为人工智能的重要支柱之一&#xff0c;正迅速崛起并引领着数字化时代的技术革命。本文将从机器学习的技术进展、技术原理、行业应用案例、面临的挑战与机遇以及未来趋势预测和学习路线等方面展开探讨&#xff0c;为您揭示机器学习的神…

c++的学习之路:3、入门(2)

一、引用 1、引用的概念 引用不是新定义一个变量&#xff0c;而是给已存在变量取了一个别名&#xff0c;编译器不会为引用变量开辟内存空 间&#xff0c;它和它引用的变量共用同一块内存空间。 怎么说呢&#xff0c;简单点理解就是你的小名&#xff0c;家里人叫你小名&#…

配置DNS后,SSH登录变慢

问题描述 最近使用ssh时出现登录非常缓慢的状态&#xff0c;登录一般需要花费20秒以上才能正常登陆&#xff0c; Connecting to *****:22... Connection established. To escape to local shell, press CtrlAlt].等待十秒钟后&#xff0c;提示登录成功 Last login: Mon Jun …

k8s系列之十七 Istio中的服务治理

删除前面配置的目的地规则 [rootk8s-master ~]# kubectl delete destinationrule details destinationrule.networking.istio.io "details" deleted [rootk8s-master ~]# kubectl delete destinationrule productpage destinationrule.networking.istio.io "pr…

00000基础搭建vue+flask前后端分离项目

我完全是参考的这个vue3flask前后端分离环境速建_flask vue3-CSDN博客 安装了node_js&#xff08;添加了环境变量&#xff09; 环境变量 把原来的镜像源换成了淘宝镜像源 npm config set registry https://registry.npmmirror.com/ 查看版本证明安装成功 npm - v 安装npm i…

caffe | 使用caffe SSD制作VOC07112 lmdb数据集

git clone -b ssd https://github.com/weiliu89/caffe.git caffe_ssdcd caffe_ssdcp caffe/Makefile.config caffe_ssd/# 把 cuda 和 cudnn 关了&#xff0c;用 cpu 版本的就好了 make -j32 make pycaffemake test -j8 make runtest -j8 vim ~/.bashrc# 加入 export LD_LIBRAR…

Day49:WEB攻防-文件上传存储安全OSS对象分站解析安全解码还原目录执行

目录 文件-解析方案-目录执行权限&解码还原 目录执行权限 解码还原 文件-存储方案-分站存储&OSS对象 分站存储 OSS对象存储 知识点&#xff1a; 1、文件上传-安全解析方案-目录权限&解码还原 2、文件上传-安全存储方案-分站存储&OSS对象 文件-解析方案-目…

数据分析之Power Pivot多表数据建模

Power Pivot 介绍&#xff1a; 可以融合多个数据表可夺标关联搭建复杂数据模型一次建模&#xff0c;一键刷新DAX函数编写公式计算可将数据模型轻松移植到PBI和SQL中 1.将数据导入power pivot(power pivot------添加到数据模型) 2.导入其他表格&#xff0c;并有一定的关联 导入…

Cesium for UE-03-添加数据集(倾斜摄影)

继续上一章节&#xff0c;在创建了项目和关卡的基础上添加倾斜摄影 重新打开上次的项目和关卡 如果你已经关掉了上次的项目和关卡&#xff0c;可以重新打开ue&#xff0c;然后选择 选择 文件-打开关卡&#xff0c;在弹出的窗口中&#xff0c;选择 上次的关卡&#xff0c;并点击…

web学习笔记(四十五)Node.js

目录 1. Node.js 1.1 什么是Node.js 1.2 为什么要学node.js 1.3 node.js的使用场景 1.4 Node.js 环境的安装 1.5 如何查看自己安装的node.js的版本 1.6 常用终端命令 2. fs 文件系统模块 2.1引入fs核心模块 2.2 读取指定文件的内容 2.3 向文件写入指定内容 2.4 创…

【双指针】Leetcode 有效三角形的个数

题目解析 611. 有效三角形的个数 算法讲解 回顾知识&#xff1a;任意两数之和大于第三数就可以构成三角形 算法 1&#xff1a;暴力枚举 int triangleNumber(vector<int>& nums) {// 1. 排序sort(nums.begin(), nums.end());int n nums.size(), ret 0;// 2. 从…

基于ACO蚁群优化的UAV最优巡检路线规划算法matlab仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.本算法原理 4.1 蚂蚁移动和信息素更新 4.2 整体优化过程 5.完整程序 1.程序功能描述 基于ACO蚁群优化法的UAV最优巡检路线规划。蚁群优化算法源于对自然界蚂蚁寻找食物路径行为的模拟。在无人机巡检路…

Redis入门三(主从复制、Redis哨兵、Redis集群、缓存更新策略、缓存穿透、缓存击穿、缓存雪崩)

文章目录 一、主从复制1.单例redis存在的问题2.主从复制是什么&#xff1f;3.主从复制的原理4.主从搭建1&#xff09;准备工作2&#xff09;方式一3&#xff09;方式二 5.python中操作1&#xff09;原生操作2&#xff09;Django的缓存操作 二、Redis哨兵&#xff08;Redis-Sent…

SQL109 纠错4(组合查询,order by..)

SELECT cust_name, cust_contact, cust_email FROM Customers WHERE cust_state MI UNION SELECT cust_name, cust_contact, cust_email FROM Customers WHERE cust_state IL ORDER BY cust_name;order by子句&#xff0c;必须位于最后一条select语句之后

【C语言】C语言运算符优先级详解

文章目录 &#x1f4dd;前言&#x1f309;运算符优先级简述 &#x1f320;逻辑与和逻辑或&#x1f309;赋值和逗号运算符 &#x1f320;位运算&#x1f309;条件表达式&#x1f309;位运算与算术运算结合&#x1f309;混合使用条件表达式和赋值运算符&#x1f309; 逗号运算符的…

图像处理与视觉感知---期末复习重点(4)

文章目录 一、图像复原与图像增强1.1 概述1.2 异同点 二、图像复原/退化模型2.1 模型图简介2.2 线性复原法 三、彩色基础四、彩色模型五、彩色图像处理 一、图像复原与图像增强 1.1 概述 1. 图像增强技术一般要利用人的视觉系统特性&#xff0c;目的是取得较好的视觉效果&…

DMA知识

提示&#xff1a;文章 文章目录 前言一、背景二、 2.1 2.2 总结 前言 前期疑问&#xff1a; 本文目标&#xff1a; 一、背景 2024年3月26日23:32:43 今天看了DMA存储器到存储器的DMA传输和存储器到外设的DMA实验&#xff0c;在keil仿真可以看到效果。还没有在protues和开发…

雷卯推荐多种系列汽车级TVS供您选择

1. 车规级TVS的应用 2.车规级TVS系列表格如下 3.方案推荐 12V汽车电源浪涌保护方案 方案优点&#xff1a;用于满足前装汽车的ISO7637-2 5A5BA测试&#xff0c;可采用单独大功率的TVS或PTCTVS的组合方案&#xff0c;满足ISO10605-2&#xff0c; 等级4&#xff0c;接触放电15K…

使用ai智能写作场景之gpt整理资料,如何ai智能写作整理资料

Ai智能写作助手&#xff1a;Ai智能整理资料小助手 Ai智能整理资料小助手可试用3天&#xff01; 通俗的解释一下怎么用ChatGPT来进行资料整理&#xff1a; 搜寻并获取指定数量的特定领域文章&#xff1a; 想像你在和我说话一样&#xff0c;告诉我你想要多少篇关于某个话题的文…