SpringBoot整合InfluxDB(实战)

news2024/12/29 9:00:50

一、简单介绍InfluxDB是什么?

InfluxDB是一个由InfluxData开发的开源时序型数据。它由Go写成,着力于高性能地查询与存储时序型数据。InfluxDB被广泛应用于存储系统的监控数据,IoT行业的实时数据等场景。

1、主要特点

时间序列数据存储
专门设计用于高效处理按时间顺序产生的数据,如传感器数据、日志数据、指标数据等。时间戳是 InfluxDB 中数据的关键组成部分,确保数据的时间顺序性。
可以存储大量的时间序列数据,并提供高效的查询和存储机制,以满足对实时数据和历史数据的处理需求。

高性能
针对时间序列数据的特点进行了优化,能够快速写入和查询大规模的数据。它采用了高效的数据存储结构和索引机制,使得数据的读写操作非常迅速。
支持高并发的写入和查询,可以满足大规模数据采集和实时监控系统的需求。

灵活的数据模型
InfluxDB 使用一种灵活的数据模型,包括测量(measurement)、标签(tag)和字段(field)。
测量类似于传统数据库中的表,用于存储具有相同数据结构的时间序列数据。标签用于对数据进行分类和索引,方便快速查询。字段则存储实际的测量值,可以是数值、字符串或布尔值等。

强大的查询语言
InfluxDB 提供了一种功能强大的查询语言 InfluxQL,用于查询和分析时间序列数据。
InfluxQL 支持各种聚合函数、时间范围查询、过滤条件等,可以方便地进行数据分析和可视化。它还支持连续查询(Continuous Queries)和存储策略(Retention Policies),可以自动对数据进行聚合和清理,以提高查询性能和节省存储空间。

2、应用场景

物联网(IoT)
在物联网应用中,大量的传感器设备会不断产生时间序列数据,如温度、湿度、压力等。InfluxDB 可以高效地存储和查询这些数据,为物联网数据分析和监控提供支持。
可以实时监测设备状态、分析设备性能、预测设备故障等。

系统监控
用于监控服务器、网络设备、应用程序等的性能指标。例如,可以收集 CPU 使用率、内存使用率、网络流量等数据,并使用 InfluxDB 进行存储和分析。
通过实时监控和历史数据分析,可以及时发现系统性能问题,进行故障排除和优化。

金融交易数据分析
在金融领域,时间序列数据非常重要,如股票价格、汇率、交易量等。InfluxDB 可以用于存储和分析这些金融数据,为交易决策和风险评估提供支持。
可以进行实时行情分析、历史数据回溯、交易策略评估等。

日志分析
可以将日志数据以时间序列的形式存储在 InfluxDB 中,方便进行日志分析和故障排查。
通过查询特定时间范围内的日志数据,可以快速定位问题发生的时间和原因。

总之,InfluxDB 是一个功能强大的时间序列数据库,适用于各种需要处理时间序列数据的场景。它的高性能、灵活的数据模型和强大的查询语言使得它成为了许多企业和开发者的首选数据库之一。

想要更深入了解,请:点击这里

二、使用步骤

1、集成原生的InfluxDB

依赖:

<!-- InfluxDB 原生依赖 -->
<dependency>
   <groupId>org.influxdb</groupId>
   <artifactId>influxdb-java</artifactId>
   <version>2.22</version>
</dependency>

配置:

#---------
# Influxdb
#---------
influxdb:
    url: http://127.0.0.1:8086
    username: admin
    password: admin
    database: test
    retention: autogen  //数据保留策略

InfluxDB数据库操作类:

package com.geesun.influxdb;

import cn.hutool.core.collection.CollUtil;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDB.ConsistencyLevel;
import org.influxdb.dto.*;
import org.influxdb.dto.Point.Builder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import plus.ojbk.influxdb.autoconfigure.properties.InfluxdbProperties;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * InfluxDB数据库操作类
 */
@Service
public class InfluxDbCommand {

    @Resource
    private InfluxDB influxDB;
    @Resource
    private InfluxdbProperties config;
    @Value("${influxdb.retention}")
    private String retentionPolicy;

    /**
     * 测试连接是否正常
     *
     * @return true 正常
     */
    public boolean ping() {
        boolean isConnected = false;
        Pong pong;
        try {
            pong = influxDB.ping();
            if (pong != null) {
                isConnected = true;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return isConnected;
    }

    /**
     * 切换数据库
     */
    public void setDB(String dbName) {
        influxDB.setDatabase(dbName);
    }

    /**
     * 关闭数据库
     */
    public void close() {
        influxDB.close();
    }

    /**
     * 创建自定义保留策略
     *
     * @param policyName  策略名
     * @param days        保存天数
     * @param replication 保存副本数量
     * @param isDefault   是否设为默认保留策略
     */
    public void createRetentionPolicy(String policyName, int days, int replication, Boolean isDefault) {
        String sql = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %sd REPLICATION %s ", policyName,
                config.getDatabase(), days, replication);
        if (Boolean.TRUE.equals(isDefault)) {
            sql = sql + " DEFAULT";
        }
        query(sql);
    }

    /**
     * 切换策略
     *
     * @param policyName 策略名
     */
    public void updRetentionPolicy(String policyName) {
        String sql = "ALTER RETENTION POLICY \"" + policyName + "\" ON \"" + config.getDatabase() + "\" DEFAULT";
        query(sql);
        this.retentionPolicy = policyName;
    }

    /**
     * 创建默认的保留策略
     * <p>
     * 策略名:hour,保存天数:30天,保存副本数量:1,设为默认保留策略
     */
    public void createDefaultRetentionPolicy() {
        String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT"
                , "hour", config.getDatabase(), "30d", 1);
        this.query(command);
    }

/*********************************增删查**************************************************/
    /**
     * 查询
     *
     * @param command 查询语句
     * @return
     */
    public QueryResult query(String command) {
        return influxDB.query(new Query(command, config.getDatabase()));
    }

    /**
     * 插入
     *
     * @param measurement 表
     * @param tags        标签
     * @param fields      字段
     */
    public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields, long time,
                       TimeUnit timeUnit) {
        Builder builder = Point.measurement(measurement);
        builder.tag(tags);
        builder.fields(fields);
        if (0 != time) {
            builder.time(time, timeUnit);
        }
        influxDB.write(config.getDatabase(), retentionPolicy, builder.build());
    }

    /**
     * 插入
     *
     * @param measurement 表
     * @param tags        标签
     * @param fields      字段
     */
    public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields) {
        insert(measurement, tags, fields, System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * 删除
     *
     * @param command 删除语句
     * @return 返回错误信息
     */
    public String deleteMeasurementData(String command) {
        QueryResult result = influxDB.query(new Query(command, config.getDatabase()));
        return result.getError();
    }

    /**
     * 构建Point
     *
     * @param measurement 表
     * @param time        时间
     * @param timeUnit    时间单位
     * @param tags        tags
     * @param fields
     * @return
     */
    public Point pointBuilder(String measurement, long time, TimeUnit timeUnit, Map<String, String> tags
            , Map<String, Object> fields) {
        return Point.measurement(measurement).time(time, timeUnit).tag(tags).fields(fields).build();
    }

    /**
     * 批量写入测点
     *
     * @param batchPoints
     */
    public void batchInsert(BatchPoints batchPoints) {
        influxDB.write(batchPoints);
    }

    /**
     * 批量写入数据
     *
     * @param database        数据库
     * @param retentionPolicy 保存策略
     * @param consistency     一致性
     * @param records         要保存的数据(调用BatchPoints.lineProtocol()可得到一条record)
     */
    public void batchInsert(final String database, final String retentionPolicy, final ConsistencyLevel consistency
            , TimeUnit timeUnit, final List<String> records) {
        influxDB.write(database, retentionPolicy, consistency, timeUnit, records);
    }

    /**
     * 查询-把查询出的结果集转换成对应的实体对象,聚合成list
     * @param command : sql语句
     */
    public List<Map<String, Object>> queryWrapper(String command) {
        List<Map<String, Object>> list = new ArrayList<>();
        QueryResult queryResult = influxDB.query(new Query(command));
        List<QueryResult.Result> resultList = queryResult.getResults();
        for (QueryResult.Result result : resultList) {
            List<QueryResult.Series> seriesList = result.getSeries();
            if (CollUtil.isEmpty(seriesList)) {
                return list;
            }
            for (QueryResult.Series series : seriesList) {
                List<String> columns = series.getColumns();
                List<List<Object>> values = series.getValues();
                if (CollUtil.isEmpty(values)) {
                    continue;
                }
                values.forEach(value -> {
                    Map<String, Object> map = new HashMap<>();
                    for (int i = 0; i < columns.size(); i++) {
                        map.put(columns.get(i), value.get(i));
                    }
                    list.add(map);
                });
            }
        }
        return list;
    }

}

2、集成封装的InfluxDBTemplate

依赖:

<dependency>
    <groupId>plus.ojbk</groupId>
    <artifactId>influxdb-spring-boot-starter</artifactId>
    <version>1.0.2</version>
</dependency>

配置:

#---------
# Influxdb
#---------
influxdb:
    url: http://127.0.0.1:8086
    username: admin
    password: admin
    database: test
    retention: autogen  //数据保留策略

实体,对标influxDB的表:

package io.springboot.influxdb.entity;
 
import lombok.Data;
import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;
import plus.ojbk.influxdb.annotation.Count;
 
import java.math.BigDecimal;
import java.time.LocalDateTime;
 
/**
 * @version 1.0
 * @since 2021/6/17 18:26
 */
@Data
@Measurement(name = "device")
public class Device {
    /**
     * 设备编号
     */
    @Column(name="device_no", tag = true)  //tag 可以理解为influxdb的索引
    private String deviceNo;
    /**
     * 数据值
     */
    @Count("value")
    @Column(name="value")
    private BigDecimal value;
    /**
     * 电压
     */
    @Column(name="voltage")
    private Float voltage;
    /**
     * 状态
     */
    @Column(name="state")
    private Boolean state;
    /**
     * 上报时间
     */
    @Column(name="time")
    private LocalDateTime time;
 
}

测试:

package io.springboot.influxdb;
 
import com.alibaba.fastjson.JSON;
import io.springboot.influxdb.entity.Device;
import org.influxdb.dto.QueryResult;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import plus.ojbk.influxdb.core.Delete;
import plus.ojbk.influxdb.core.InfluxdbTemplate;
import plus.ojbk.influxdb.core.Op;
import plus.ojbk.influxdb.core.Order;
import plus.ojbk.influxdb.core.Query;
import plus.ojbk.influxdb.core.model.DeleteModel;
import plus.ojbk.influxdb.core.model.QueryModel;
 
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
 
@SpringBootTest
class InfluxdbDemoApplicationTests {
 
    @Autowired
    private InfluxdbTemplate influxdbTemplate;
 
    private String measurement = "device";
    
    @Test
    void getCount() {
        QueryModel countModel = new QueryModel();
        ///countModel.setMeasurement(measurement);
        countModel.setMeasurement(InfluxdbUtils.getMeasurement(Device.class));
        countModel.setStart(LocalDateTime.now().plusHours(-2L));
        countModel.setEnd(LocalDateTime.now());
        //countModel.setSelect(Query.count("voltage"));  //只能count field字段
        countModel.setSelect(Query.count(InfluxdbUtils.getCountField(Device.class)));
        countModel.setWhere(Op.where(countModel));
        //获得总条数
        long count = influxdbTemplate.count(Query.build(countModel));
        System.err.println(count);
    }
 
    @Test
    void getData() {
        QueryModel model = new QueryModel();
        model.setCurrent(1L); //当前页
        model.setSize(10L); //每页大小
        //model.setMeasurement(measurement);
        model.setMeasurement(InfluxdbUtils.getMeasurement(Device.class));
        model.setStart(LocalDateTime.now().plusHours(-2L)); //开始时间
        model.setEnd(LocalDateTime.now()); //结束时间
        model.setUseTimeZone(true);  //时区
        model.setOrder(Order.DESC);  //排序
        //where 条件中额外参数可放入model.setMap();
        model.setWhere(Op.where(model)); //理解为where条件
        //分页数据
        List<Device> deviceList = influxdbTemplate.selectList(Query.build(model), Device.class);
        System.err.println(JSON.toJSONString(deviceList));
    }
    
    @Test
    void insert() {
        List<Device> deviceList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Device device = new Device();
            device.setDeviceNo("device-" + i);
            device.setValue(new BigDecimal(12.548));
            device.setState(true);
            device.setVoltage(3.5F);
            deviceList.add(device);
        }
        influxdbTemplate.insert(deviceList);
    }
    
    @Test
    void delete() {
        Map<String, Object> map = new TreeMap<>();
        map.put("device_no", "device-1");
        DeleteModel model = new DeleteModel();
        model.setMap(map);
        //model.setStart(LocalDateTime.now().plusHours(-10L));
        //model.setEnd(LocalDateTime.now());
        model.setMeasurement(measurement);
        model.setWhere(Op.where(model));
        influxdbTemplate.delete(Delete.build(model));
    }
 
    void other(){
        influxdbTemplate.execute("自己写sql");
    }
}

相较于原版,它封装了自有的Util以及Template等,对于原版Point的time列类型问题,它对number和long 型转换成了LocalDateTime类型,并且封装了更多的方法(具体自行拓展)。

注:原生的influxDB和spring自带的可一起使用。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2160384.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

网页跨域异常100%解决(谷歌浏览器)

目的&#xff1a; 1.开发过程中&#xff0c;经常出现浏览器提示跨域 2.原因新版本浏览器拦截跨域请求 3.错误关键消息如下&#xff1a; Access-Control-Allow-Origin cess to XMLHttpRequest at http://192.168.1.104:3080/api/Login/Store from origin http://yingyongliere…

sheng的学习笔记-AI-K-摇臂赌博机(K-armed bandit)

AI目录&#xff1a;sheng的学习笔记-AI目录-CSDN博客 强化学习 sheng的学习笔记-AI-强化学习&#xff08;Reinforcement Learning, RL&#xff09;-CSDN博客 基础知识 单步强化学习任务 先考虑比较简单的情形&#xff1a;最大化单步奖赏&#xff0c;即仅考虑一步操作。需注意…

【LVIO-SLAM】 激光slam理论与实践

【LVIO-SLAM】 激光slam理论与实践 1. 激光slam理论与实践1.1 2D激光SLAM1.2 3D激光SLAM 2. 激光雷达运动畸变去除2.1 Lidar数学模型与点云去畸变2.2 运动畸变 3. 激光slam的前端配准3.1 帧间匹配3.2 3.2 ICP (Iterative Closest Point)3.3 PI-ICP (Point-to-Line Iterative Cl…

虚幻引擎游戏保存/加载存档功能

函数名功能Does Save Game Exist检查存档是否存在Load Game from Slot加载存档Save Game to Slot保存存档Delete Game in Slot删除存档 Slot Name 是插槽名字 存档都是通过插槽名字来 读取/加载/检查/删除的 先创建一个SaveGame类 , 这个类里可以存放要保存的数据 , 比如 玩家…

Unity Debug时出现请选择unity实例

Unity Debug时出现请选择unity实例 问题描述 出现请选择unity实例&#xff0c;并且选择框里为空。 出现原因 你打开了两个Unity工程&#xff0c;在附加时&#xff0c;不知道加在哪个Unity工程上。 解决方法 在调试窗口中点击“附加Unity调试程序”&#xff0c;然后在弹出…

Linux文件IO(三)-Linux系统如何管理文件

1.静态文件与 inode 文件在没有被打开的情况下一般都是存放在磁盘中的&#xff0c;譬如电脑硬盘、移动硬盘、U 盘等外部存储设备&#xff0c;文件存放在磁盘文件系统中&#xff0c;并且以一种固定的形式进行存放&#xff0c;我们把他们称为静态文件。 文件储存在硬盘上&#…

[000-002-01].第29节:MySQL执行流程

1、MySQL的查询流程&#xff1a; 客户端请求进入到数据库服务器后&#xff0c;先进行查询缓存&#xff0c;如果命中&#xff0c;那么就返回结果&#xff1b;如果没命中&#xff0c;进入到解析器&#xff0c;进行词法解析和语法解析&#xff0c;生成解析树&#xff1b;然后进入到…

Python在AI中的应用--使用决策树进行文本分类

Python在AI中的应用--使用决策树进行文本分类 文本分类决策树什么是决策树 scikit算法 使用scikit的决策树进行文章分类一个文本分类的Python代码使用的scikit APIs说明装入数据集决策树算法类类构造器&#xff1a; 构造决策树分类器产生输出评估输出结果分类准确度分类文字评估…

langchain介绍以及简单实用

1,介绍 LangChain是一个用于开发由大语言模型支持的应用程序的框架。它提供了大量组件来帮助我们构建LLM支持的应用程序。 其主要是有六大功能组成。 LLMs(大语言模型&#xff08;生成式语言模型&#xff09;)&#xff0c;Prompts(提示词)&#xff0c;Memory(记忆力)&#xff…

电器行业文件加密怎么做?防泄密哪种方法实用?

管控需求 1.电子文档&#xff08;源代码、设计图纸、设计方案等&#xff09;数据不同应用场景下如何有效保护&#xff1b; 2.发给第三方或外部单位的成果数据没有任何限制&#xff0c;对方可拷贝、篡改、截屏、盗用&#xff0c;严重损害单位的利益&#xff1b; 3.对员工出差…

基于单片机巡迹避障智能小车系统

文章目录 前言资料获取设计介绍设计程序具体实现截图设计获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师&#xff0c;一名热衷于单片机技术探索与分享的博主、专注于 精通51/STM32/MSP430/AVR等单片机设计 主要对象是咱们…

[Python学习日记-28] 开发基础练习1——股票查询程序

[Python学习日记-28] 开发基础练习1——股票查询程序 简介 题目及效果参考 源码与解析 简介 该练习使用了列表、字典、字符串等之前学到的数据类型&#xff0c;用于巩固实践之前学习的内容&#xff0c;题目当中使用到的数据均摘录与东方财富网&#xff0c;最好在学习完前面的…

【论文阅读】Grounding Language with Visual Affordances over Unstructured Data

Abstract 最近的研究表明&#xff0c;大型语言模型&#xff08;llms&#xff09;可以应用于将自然语言应用于各种各样的机器人技能。然而&#xff0c;在实践中&#xff0c;学习多任务、语言条件机器人技能通常需要大规模的数据收集和频繁的人为干预来重置环境或帮助纠正当前的…

USB总线同步数据采集卡6路高速模拟量采集带DIO功能USB2884/2885/2886

USB2884/2885/2886 数据采集卡 概述&#xff1a; 系统框图&#xff1a; 规格参数&#xff1a; 板卡外形图&#xff1a; 尺寸图及元器件功能说明&#xff1a;

图像识别OCR(Tess4J)

&#x1f353; 简介&#xff1a;java系列技术分享(&#x1f449;持续更新中…&#x1f525;) &#x1f353; 初衷:一起学习、一起进步、坚持不懈 &#x1f353; 如果文章内容有误与您的想法不一致,欢迎大家在评论区指正&#x1f64f; &#x1f353; 希望这篇文章对你有所帮助,欢…

ZBrush入门使用介绍——16、ArrayMesh

大家好&#xff0c;我是阿赵。   继续介绍ZBrush的用法。这次看看ArrayMesh功能。   简单来说&#xff0c;ArrayMesh功能是可以复制很多个模型&#xff0c;然后根据路径排列。 一、 从阵列预设生成ArrayMesh 先把模型转换成多边形网格体 这时候&#xff0c;ArrayMesh的选…

jupyter安装与使用——Ubuntu服务器

jupyter安装与使用——Ubuntu服务器 一、安装miniconda3/anaconda31. 下载miniconda32. 安装miniconda33. 切换到bin文件夹4. 输入pwd获取路径5. 打开用户环境编辑页面6. 重新加载用户环境变量7. 初始化conda8.验证是否安装成功9.conda配置 二、安装jupyter2.1 conda安装2.2 配…

Java调用数据库 笔记05(查询篇)

一. 数据库&#xff08;通过各种驱动来实现调用&#xff09;&#xff1a; &#xff08;应用程序通过接口控制的各种数据库驱动来调用数据库-->jdbc方法&#xff09; 1.创建Java的普通class类 2.加载驱动 Class.forName("com.mysql.jdbc.Driver"); 3.驱动管理类…

C++_23_STL容器

文章目录 STL容器概念常用容器A string作用构造函数基本赋值操作获取字符串长度存取字符操作拼接操作查找和替换注意:查找是不存在返回-1比较操作截取操作插入与删除string与char * 转换 B vector概述与数组区别迭代器构造函数赋值操作插入与删除取值操作大小相关存储自定义类型…

linux 安装 tomcat9、java环境

一、安装 Java环境 1. 下载文件 https://repo.huaweicloud.com/java/jdk/ 或者网盘&#xff1a;通过网盘分享的文件&#xff1a;jdk-8u192-linux-x64.tar.gz 链接: https://pan.baidu.com/s/1V3pQWzgSLJxdrUdmmKueRA 提取码: qspw 2. 查看Linux系统是否有自带的jdk&#xf…