InfluxDB 笔记

news2024/9/20 22:39:45

概念

在这里插入图片描述

Measurement

类似于表名。

A measurement acts as a container for tags, fields, and timestamps.

Tag

补充描述数据的信息,如示例中的locationscientist描述了该数据的采集地和采集人。这两个称为Tag Key,具体的值则称为Tag Value,文本类型。

Field

用于表示实际的数据,包括Field KeyField Value,前者描述数据的含义,后者记录具体的值,通常为数字。

Series

Series Key是共享同一 Measurement、Tag set 和 Field Key 的点的集合。例如:
在这里插入图片描述
Series包括给定 Series Key 的时间戳和 Field Value。从示例数据中,这里有一个 Series Key 和相应的 Series:

# series key
census,location=klamath,scientist=anderson bees

# series
2019-08-18T00:00:00Z 23
2019-08-18T00:06:00Z 28 

Point

一个数据点包括 Series Key、Field Value 和时间戳。例如,样本数据中的单个点如下所示:

2019-08-18T00:00:00Z census ants 30 portland mullen

Write Path

我们从数据的写入路径来一探 InfluxDB 的结构。

写入请求

首先我们看看数据写入请求是如何封装的:

// WritePointsRequest represents a request to write point data to the cluster.
type WritePointsRequest struct {
	Database        string
	RetentionPolicy string
	Points          []models.Point
}

Database 指明了相应的数据库,RetentionPolicy 指明了保留策略(可以理解为数据的有效期,详情可以看官方文档的解释,这里略),Points 则表示要请求写入的数据点数组,我们来看一下它的结构:

// point is the default implementation of Point.
type point struct {
	time time.Time

	// text encoding of measurement and tags
	// key must always be stored sorted by tags, if the original line was not sorted,
	// we need to resort it
	key []byte

	// text encoding of field data
	fields []byte

	// text encoding of timestamp
	ts []byte

	// cached version of parsed fields from data
	cachedFields map[string]interface{}

	// cached version of parsed name from key
	cachedName string

	// cached version of parsed tags
	cachedTags Tags

	it fieldIterator
}

里面出现了很多第一节中介绍过的概念。唯一有所出入的是 fields,这里并没有向第一节里说的那样一个点一个 field tag,而是都集中在一个 point 对象里。

那么这个 request 由谁来处理呢?系统会构造一个 PointsWriter 对象来负责写入,这个类型本身的结构并不重要,重点是它处理如何写数据:

// MapShards maps the points contained in wp to a ShardMapping.  If a point
// maps to a shard group or shard that does not currently exist, it will be
// created before returning the mapping.
func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) {
	. . .
	for _, p := range wp.Points {
		// Either the point is outside the scope of the RP, or we already have
		// a suitable shard group for the point.
		if p.Time().Before(min) || list.Covers(p.Time()) {
			continue
		}

		// No shard groups overlap with the point's time, so we will create
		// a new shard group for this point.
		sg, err := w.MetaClient.CreateShardGroup(wp.Database, wp.RetentionPolicy, p.Time())
		if err != nil {
			return nil, err
		}

		if sg == nil {
			return nil, errors.New("nil shard group")
		}
		list.Add(*sg)
	}

	mapping := NewShardMapping(len(wp.Points))
	for _, p := range wp.Points {
		sg := list.ShardGroupAt(p.Time())
		if sg == nil {
			// We didn't create a shard group because the point was outside the
			// scope of the RP.
			mapping.Dropped = append(mapping.Dropped, p)
			continue
		}

		sh := sg.ShardFor(p)      // 获取该点在该Group中的Shard
		mapping.MapPoint(&sh, p)  // 点映射到对应的Shard
	}

	return mapping, nil
}

这里省略了一部分不重要的代码。这个函数将每个 Point 映射到相应的 Shard 中,遍历每一个点。如果当前点的时间戳小于保留策略范围的最小值(即当前点已过期)或已存在覆盖该时间范围的 Shard Group,那么就 continue,否则创建新的 ShardGroup,保存到 list 中。

创建完需要的 ShardGroup 后,重新遍历所有点,获取每个点对应的 ShardGroup 和具体的 Shard,将该点映射到该 Shard。

我们再来看看MapShards是在什么地方使用的:

// WritePoints writes the data to the underlying storage. consistencyLevel and user are only used for clustered scenarios
func (w *PointsWriter) WritePoints(
	ctx context.Context,
	database, retentionPolicy string,
	consistencyLevel models.ConsistencyLevel,
	user meta.User,
	points []models.Point,
) error {
	return w.WritePointsPrivileged(ctx, database, retentionPolicy, consistencyLevel, points)
}

// WritePointsPrivileged writes the data to the underlying storage, consistencyLevel is only used for clustered scenarios
func (w *PointsWriter) WritePointsPrivileged(
	ctx context.Context,
	database, retentionPolicy string,
	consistencyLevel models.ConsistencyLevel,
	points []models.Point,
) error {
	. . .
	shardMappings, err := w.MapShards(&WritePointsRequest{Database: database, RetentionPolicy: retentionPolicy, Points: points})
	if err != nil {
		return err
	}

	// Write each shard in it's own goroutine and return as soon as one fails.
	ch := make(chan error, len(shardMappings.Points))
	for shardID, points := range shardMappings.Points {
		go func(shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) {
			err := w.writeToShard(ctx, shard, database, retentionPolicy, points)
			if err == nil {
				w.stats.pointsWriteOk.Observe(float64(len(points)))
			} else {
				w.stats.pointsWriteErr.Observe(float64(len(points)))
			}
			if err == tsdb.ErrShardDeletion {
				err = tsdb.PartialWriteError{Reason: fmt.Sprintf("shard %d is pending deletion", shard.ID), Dropped: len(points)}
			}
			ch <- err
		}(shardMappings.Shards[shardID], database, retentionPolicy, points)
	}
	. . .
	return err
}

上面两个函数用于写数据,第一个函数实际上是对第二个函数的包装。可以看到,该函数先获取所有点的 ShardMapping,然后为每个 Shard 开启一个协程,Shard 间并行写入。

再来看看中间的writeToShard

// writeToShards writes points to a shard.
func (w *PointsWriter) writeToShard(ctx context.Context, shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) error {
	err := w.TSDBStore.WriteToShard(ctx, shard.ID, points)
	if err == nil {
		return nil
	}

	// Except tsdb.ErrShardNotFound no error can be handled here
	if err != tsdb.ErrShardNotFound {
		return err
	}

	// If we've written to shard that should exist on the current node, but the store has
	// not actually created this shard, tell it to create it and retry the write
	if err = w.TSDBStore.CreateShard(ctx, database, retentionPolicy, shard.ID, true); err != nil {
		w.Logger.Warn("Write failed creating shard", zap.Uint64("shard", shard.ID), zap.Error(err))
		return err
	}

	if err = w.TSDBStore.WriteToShard(ctx, shard.ID, points); err != nil {
		w.Logger.Info("Write failed", zap.Uint64("shard", shard.ID), zap.Error(err))
		return err
	}

	return nil
}

这个函数简单来说,就是尝试往 Shard 中写入,如果当前节点相应的 Shard 还未分配,那么就先分配,再重新插入。继续前进:

// WriteToShard writes a list of points to a shard identified by its ID.
func (s *Store) WriteToShard(ctx context.Context, shardID uint64, points []models.Point) error {
	. . .
	sh := s.shards[shardID]
	. . .
	return sh.WritePoints(ctx, points)
}

这里的大部分代码都和系统健壮性相关,因此省略。在继续看WritePoints前,我们先看看Shard的结构:

// Shard represents a self-contained time series database. An inverted index of
// the measurement and tag data is kept along with the raw time series data.
// Data can be split across many shards. The query engine in TSDB is responsible
// for combining the output of many shards into a single query result.
type Shard struct {
	path    string
	walPath string
	id      uint64

	database        string
	retentionPolicy string

	sfile   *SeriesFile
	options EngineOptions

	mu      sync.RWMutex
	_engine Engine
	index   Index
	enabled bool

	stats *ShardMetrics

	baseLogger *zap.Logger
	logger     *zap.Logger

	metricUpdater *ticker

	EnableOnOpen bool

	// CompactionDisabled specifies the shard should not schedule compactions.
	// This option is intended for offline tooling.
	CompactionDisabled bool
}

这里比较重要的是engineindex,前者是管理数据的核心,后者是索引。
继续看在 Shard 上写入数据:

// WritePoints will write the raw data points and any new metadata to the index in the shard.
func (s *Shard) WritePoints(ctx context.Context, points []models.Point) (rErr error) {
	. . .
	engine, err := s.engineNoLock()
	. . .
	points, fieldsToCreate, err := s.validateSeriesAndFields(points)
	. . .
	s.stats.fieldsCreated.Add(float64(len(fieldsToCreate)))

	// add any new fields and keep track of what needs to be saved
	if err := s.createFieldsAndMeasurements(fieldsToCreate); err != nil {
		return err
	}

	// Write to the engine.
	if err := engine.WritePoints(ctx, points); err != nil {
		return fmt.Errorf("engine: %s", err)
	}

	return writeError
}

中间的validateSeriesAndFields较长,这里就不贴源码了,简单讲一下该函数的功能。它首先遍历所有点,检查它们的 Tag 是否包含 “time”字段、Measurement 和所有 Tag 的字符是否为合理的 unicode,保留所有合法的点,并创建丢失的 Field等,总体上是在做完整性检查的工作,不展开讲,我们直接看engine.writePoints

// WritePoints writes metadata and point data into the engine.
// It returns an error if new points are added to an existing key.
func (e *Engine) WritePoints(ctx context.Context, points []models.Point) error {
	values := make(map[string][]Value, len(points))
	var (
		keyBuf    []byte
		baseLen   int
		seriesErr error
	)

	for _, p := range points {
		keyBuf = append(keyBuf[:0], p.Key()...)
		keyBuf = append(keyBuf, keyFieldSeparator...)  //分隔符:#~#
		baseLen = len(keyBuf)
		iter := p.FieldIterator()
		t := p.Time().UnixNano()
		for iter.Next() {
			// Skip fields name "time", they are illegal
			if bytes.Equal(iter.FieldKey(), timeBytes) {
				continue
			}
			// 构造我们第一节讲过的Series Key,注意一个Field Key对应一个Series Key
			keyBuf = append(keyBuf[:baseLen], iter.FieldKey()...)
			// 检查当前插入数据的类型是否与同一Series的不一致
			if e.seriesTypeMap != nil {
				// Fast-path check to see if the field for the series already exists.
				if v, ok := e.seriesTypeMap.Get(keyBuf); !ok {
					if typ, err := e.Type(keyBuf); err != nil {
						// Field type is unknown, we can try to add it.
					} else if typ != iter.Type() {
						// Existing type is different from what was passed in, we need to drop
						// this write and refresh the series type map.
						seriesErr = tsdb.ErrFieldTypeConflict
						e.seriesTypeMap.Insert(keyBuf, int(typ))
						continue
					}

					// Doesn't exist, so try to insert
					vv, ok := e.seriesTypeMap.Insert(keyBuf, int(iter.Type()))

					// We didn't insert and the type that exists isn't what we tried to insert, so
					// we have a conflict and must drop this field/series.
					if !ok || vv != int(iter.Type()) {
						seriesErr = tsdb.ErrFieldTypeConflict
						continue
					}
				} else if v != int(iter.Type()) {
					// The series already exists, but with a different type.  This is also a type conflict
					// and we need to drop this field/series.
					seriesErr = tsdb.ErrFieldTypeConflict
					continue
				}
			}
			var v Value
			// 此处省略的代码为根据具体类型构造v
			// 将v追加到对应的Series中
			values[string(keyBuf)] = append(values[string(keyBuf)], v)
		}
	}

	e.mu.RLock()
	defer e.mu.RUnlock()

	// first try to write to the cache
	if err := e.Cache.WriteMulti(values); err != nil {
	L	return err
	}

	if e.WALEnabled {
		if _, err := e.WAL.WriteMulti(ctx, values); err != nil {
			return err
		}
	}
	return seriesErr
}

简单概括下,就是验证插入数据的类型是否和它所属Series一致,然后再插入。最后将所有Series写入Cache和WAL中。注意这里先写Cache,其他说先写WAL的博客,可以肯定是抄的,没有自己看过源码。

写入过程到这里就差不多结束了,接下来详细介绍一下存储引擎中的重要数据结构。

数据结构

我们知道,InfluxDB 采用了类似 LSM-Tree 的结构,在内存和磁盘上都有相应的组件。

Cache

这个相当于 memtable,是 InfluxDB 用于内存中的数据结构。Cache 以 TSM 文件的形式定期写入磁盘。

type Cache struct {
	// Due to a bug in atomic  size needs to be the first word in the struct, as
	// that's the only place where you're guaranteed to be 64-bit aligned on a
	// 32 bit system. See: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
	size         uint64
	snapshotSize uint64

	mu      sync.RWMutex
	store   storer  // 实际存放数据的结构
	maxSize uint64

	// snapshots are the cache objects that are currently being written to tsm files
	// they're kept in memory while flushing so they can be queried along with the cache.
	// they are read only and should never be modified
	snapshot     *Cache
	snapshotting bool

	// This number is the number of pending or failed WriteSnaphot attempts since the last successful one.
	snapshotAttempts int

	stats         *cacheMetrics
	lastWriteTime time.Time

	// A one time synchronization used to initial the cache with a store.  Since the store can allocate a
	// large amount memory across shards, we lazily create it.
	initialize       atomic.Value
	initializedCount uint32
}

初始化一个 Cache:

// init initializes the cache and allocates the underlying store.  Once initialized,
// the store re-used until Freed.
func (c *Cache) init() {
	if !atomic.CompareAndSwapUint32(&c.initializedCount, 0, 1) {
		return
	}

	c.mu.Lock()
	c.store, _ = newring(ringShards)
	c.mu.Unlock()
}

newring 返回一个 ring 对象,该对象的结构为哈希环,具体细节先不展开,总之写入到 Cache 中的数据实际上就保存到了这里。

Compactor

执行LSM-Tree中的合并操作。

// Compactor merges multiple TSM files into new files or
// writes a Cache into 1 or more TSM files.
type Compactor struct {
	Dir  string
	Size int

	FileStore interface {
		NextGeneration() int
		TSMReader(path string) *TSMReader
	}

	// RateLimit is the limit for disk writes for all concurrent compactions.
	RateLimit limiter.Rate

	formatFileName FormatFileNameFunc
	parseFileName  ParseFileNameFunc

	mu                 sync.RWMutex
	snapshotsEnabled   bool
	compactionsEnabled bool

	// lastSnapshotDuration is the amount of time the last snapshot took to complete.
	lastSnapshotDuration time.Duration

	snapshotLatencies *latencies

	// The channel to signal that any in progress snapshots should be aborted.
	snapshotsInterrupt chan struct{}
	// The channel to signal that any in progress level compactions should be aborted.
	compactionsInterrupt chan struct{}

	files map[string]struct{}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/172430.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

小程序基础篇-视图与逻辑

本次学习目标&#xff1a;实现页面间的导航跳转实现下拉刷新实现上拉加载更多知道小程序常用的生命周期函数1.页面导航页面导航指的是页面之前相互的跳转浏览器之间的页面导航有两种&#xff1a;<a>;location.href小程序之间的页面导航有两种&#xff1a;声明式导航&…

告警与恢复告警原理及实现

一、 背景自“双碳”政策提出以来&#xff0c;KaiwuDB 聚焦“数字能源”领域&#xff0c;为用户打造数字能源管理平台&#xff0c;旨在提升综合能源和碳资产管理能力。数字能源管理平台是以 KaiwuDB 为核心建设的云-边-端一体化数据服务平台&#xff0c;致力于为 IoT、工业互联…

前言技术--swagger

目录一、前后端分离的特点二、在没有swagger之前三、swagger的作用四、swagger的优点五、集成swagger5.1 新建springboot项目5.2 集成swagger5.3 开发一个controller用于测试5.4 启动服务&#xff0c;验证集成效果六、swagger常用注解七、swagger使用综合案例一、前后端分离的特…

Python的PyQt框架的使用-常用控件篇

Python的PyQt框架的使用-常用控件篇一、前言二 、QLineEdit 文本框三 、QPushButton按钮控件四、QRadioButton 单选按钮一、前言 个人主页: ζ小菜鸡大家好我是ζ小菜鸡&#xff0c;小伙伴们&#xff0c;让我们一起来学习Python的PyQt框架的常用控件。如果文章对你有帮助、欢迎…

第一个程序——构建一个ServerUI

简介 本次程序设计均使用python实现&#xff0c;使用sql server对聊天室用户的数据进行存储。通过python socket套接字编程&#xff0c;实现了在线聊天室的功能&#xff0c;并使用python tkinter进行UI界面的设计。 思路 由计算机网络的基础知识易知&#xff0c;两个主机之间…

基于LSTMGRU的微博突发事件分析与谣言检测(附完整的代码+报告)

问题描述及方法基础 本章主要对课题研究所涉及的机器学习、自然语言处理的原理和方法进行介绍,主要分为四部分,第一部分是将本课谣言检测任务的符号化描述;第二部分是微博数据的预处理,包括语言模型、文本分词等技术;第三部分与第四部分分别是本文搭建的微博谣言检测模型所…

py字符串的格式化笔记

print():和cjava差不多&#xff0c;只是逗号变了&#xff0c;其中 %s 就是模板中的占位符&#xff0c;表示这个位置先占着&#xff0c;待会儿要有数据填入到这里。然后再提供一个元组&#xff0c;里面依次存放需要填入到 %s 位置 的数据。这里是使用变量 (salary,tax,aftertax)…

CSS.前端基础.html

什么是 CSS? CSS 指层叠样式表 (Cascading Style Sheets)样式定义如何显示 HTML 元素样式通常存储在样式表中把样式添加到 HTML 4.0 中&#xff0c;是为了解决内容与表现分离的问题外部样式表可以极大提高工作效率外部样式表通常存储在 CSS 文件中多个样式定义可层叠为一个示…

丝绸之路也可以是科技传播之路

唐宋元海外贸易 618年-1368年 王孝通 生卒年代不详 孙思邈541年—682年 一行 公元683年-公元727年 李淳风 602年&#xff0d;670年 沈括 1031年&#xff0d;1095年 郭守敬 1231年&#xff0d;1316年 扎马鲁丁生卒年代不详 阿拉伯帝国 632年-1258年 阿尔花拉子模 780年&#xf…

【深度强化学习】【论文阅读】【双臂模仿】Deep Imitation Learning for BimanualRobotic Manipulation

title: Deep Imitation Learning for BimanualRobotic Manipulation date: 2023-01-15T20:54:56Z lastmod: 2023-01-19T18:31:57Z Deep Imitation Learning for BimanualRobotic Manipulation 1 Introduction 文中使用的模型是一个深度的、分层的、模块化的架构。与 baselin…

微服务负载均衡器Ribbon学习笔记

目录 1.什么是Ribbon 1.1 客户端的负载均衡 1.2 服务端的负载均衡 1.3 常见负载均衡算法 2. Nacos使用Ribbon 3. Ribbon负载均衡策略 4.修改默认负载均衡策略 方式1&#xff1a;通过自定义配置类来实现 方式2&#xff1a;通过修改配置文件实现&#xff08;推荐&#xf…

9、jQuery

jQuery库&#xff1a;里面存在大量的JavaScript函数 官网&#xff1a;https://jquery.com/ 9.1 获取jQuery jQuery引入 cdn 引入 <script src"https://cdn.bootcss.com/jquery/3.4.1/core.js"></script>本地引入 <script src"lib/jquery-3.6.3.…

C语言练习——3

C语言练习——3一、 操作符练习1.1交换两个变量&#xff08;不创建临时变量&#xff09;1.2 打印整数二进制的奇数位和偶数位1.3[二进制中1的个数](https://www.nowcoder.com/questionTerminal/8ee967e43c2c4ec193b040ea7fbb10b8)1.4[两个整数二进制位不同个数](https://www.no…

【 java 反射下篇】java反射机制不难吧?来看看这篇

&#x1f4cb; 个人简介 &#x1f496; 作者简介&#xff1a;大家好&#xff0c;我是阿牛&#xff0c;全栈领域优质创作者。&#x1f61c;&#x1f4dd; 个人主页&#xff1a;馆主阿牛&#x1f525;&#x1f389; 支持我&#xff1a;点赞&#x1f44d;收藏⭐️留言&#x1f4d…

Element Plus 跟踪表格数据总数,包括查询、筛选等操作

前言 Element Plus的表格组件提供了筛选功能 前端项目中&#xff0c;如果表格使用的是后端分页&#xff0c;使用表格插件及分页器插件就可以了。这种情况下&#xff0c;前端的表格筛选被后端的分页条件查询取代了 另一种情况&#xff1a;不分页&#xff0c;直接查询所有数据…

Win7快速部署weblogic 12c

0x00 前言 需要一个漏洞中等数量的版本。 0x01 安装环境 版本操作系统Windows 7 64bitJDKjdk-8u101-windows-x64weblogic12.2.1.3 0x02 下载地址 JDK 下载地址 https://www.oracle.com/java/technologies/javase/javase8-archive-downloads.html jdk 1.8版本&#xff0c;文…

华为机试题:HJ16 购物单(python)

文章目录知识点详解1、input()&#xff1a;获取控制台&#xff08;任意形式&#xff09;的输入。输出均为字符串类型。2、print() &#xff1a;打印输出。3、strip()&#xff1a;删除字符串&#xff08;开头 / 结尾&#xff09;指定字符&#xff08;默认空格&#xff09;或字符…

两小时上手ActiveMQ

一、消息中间件概述 1.1 消息中间件产生的背景 在客户端与服务器进行通讯时.客户端调用后&#xff0c;必须等待服务对象完成处理返回结果才能继续执行。 客户与服务器对象的生命周期紧密耦合,客户进程和服务对象进程都都必须正常运行;如果由于服务对象崩溃或者网络故障导致用…

ceres学习笔记(四)

前言&#xff1a; 学习了pose_graph_2d部分&#xff0c;因为先学习了3维的pose_graph_3d部分&#xff0c;所以这个就比较容易。简单来说就是se2和se3的区别。整个的运行逻辑和3维部分的pose_graph_3d部分是一样的&#xff0c;概括为&#xff1a; 1.设置好两个type&#xff0c…

7、CenOS6安装Nginx

Nginx的安装与启动 什么是Nginx Nginx 是一款高性能的 http 服务器/反向代理服务器及电子邮件&#xff08;IMAP/POP3&#xff09;代理服务器。由俄罗斯的程序设计师伊戈尔西索夫&#xff08;Igor Sysoev&#xff09;所开发&#xff0c;官方测试 nginx 能够支支撑 5 万并发链接…