milvus对象存储和消息中间件的工厂设计模式分析

news2024/11/24 17:38:09

milvus对象存储和消息中间件的工厂设计模式分析

需求

根据参数设置创建mq和storage
mq有kafka,pulsar
storage有local,minio,remote

配置文件

根据配置文件选择初始化mq和存储:

mq:
  type: pulsar
  
common:
  storageType: minio

对于这种类型一个是mq,一个是存储,相比工厂方法设计模式,使用抽象工厂设计模式更合理。

代码框架

在这里插入图片描述

工厂接口

代码路径:internal\util\dependency\factory.go

type Factory interface {
	msgstream.Factory
    // Init()给工厂传递参数。
	Init(p *paramtable.ComponentParam)
	NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error)
}

// pkg\mq\msgstream\msgstream.go
// msgstream.Factory的code
type Factory interface {
	NewMsgStream(ctx context.Context) (MsgStream, error)
	NewTtMsgStream(ctx context.Context) (MsgStream, error)
	NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}

dependency.Factory是一个工厂接口,里面包含了mq的工厂接口,和创建持久对象的方法。

这个接口创建消息中间件对象和持久存储对象。

这里为什么不这么写:

type Factory interface {
    Init(p *paramtable.ComponentParam)
	NewMsgStream(ctx context.Context) (MsgStream, error)
	NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error)
}

DefaultFactory

DefaultFactory结构体是dependency.Factory的实现。

// DefaultFactory is a factory that produces instances of storage.ChunkManager and message queue.
// internal\util\dependency\factory.go
type DefaultFactory struct {
	standAlone          bool
	chunkManagerFactory storage.Factory
	msgStreamFactory    msgstream.Factory
}

// storage.Factory
// internal\storage\factory.go
type Factory interface {
	NewPersistentStorageChunkManager(ctx context.Context) (ChunkManager, error)
}

// msgstream.Factory
// pkg\mq\msgstream\msgstream.go
type Factory interface {
	NewMsgStream(ctx context.Context) (MsgStream, error)
	NewTtMsgStream(ctx context.Context) (MsgStream, error)
	NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}

DefaultFactory实现了dependency.Factory接口的Init()函数。

在Init()函数内初始化了chunkManagerFactory、msgStreamFactory。

func (f *DefaultFactory) Init(params *paramtable.ComponentParam) {
	// skip if using default factory
	if f.msgStreamFactory != nil {
		return
	}
    // 初始化chunkManagerFactory
	f.chunkManagerFactory = storage.NewChunkManagerFactoryWithParam(params)

	// initialize mq client or embedded mq.
    // 初始化msgStreamFactory
	if err := f.initMQ(f.standAlone, params); err != nil {
		panic(err)
	}
}

f.chunkManagerFactory:

return &ChunkManagerFactory{
		persistentStorage: persistentStorage,
		config:            c,
	}

f.msgStreamFactory:

func (f *DefaultFactory) initMQ(standalone bool, params *paramtable.ComponentParam) error {
	mqType := mustSelectMQType(standalone, params.MQCfg.Type.GetValue(), mqEnable{params.RocksmqEnable(), params.NatsmqEnable(), params.PulsarEnable(), params.KafkaEnable()})
	log.Info("try to init mq", zap.Bool("standalone", standalone), zap.String("mqType", mqType))

	switch mqType {
	case mqTypeNatsmq:
		f.msgStreamFactory = msgstream.NewNatsmqFactory()
	case mqTypeRocksmq:
		f.msgStreamFactory = smsgstream.NewRocksmqFactory(params.RocksmqCfg.Path.GetValue(), &params.ServiceParam)
	case mqTypePulsar:
		f.msgStreamFactory = msgstream.NewPmsFactory(&params.ServiceParam)
	case mqTypeKafka:
		f.msgStreamFactory = msgstream.NewKmsFactory(&params.ServiceParam)
	}
	if f.msgStreamFactory == nil {
		return errors.New("failed to create MQ: check the milvus log for initialization failures")
	}
	return nil
}

持久存储

storage.Factory是创建持久存储的工厂接口。

storage.ChunkManagerFactory是storage.Factory的实现。

NewPersistentStorageChunkManager()接口的实现:

func (f *DefaultFactory) NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error) {
	return f.chunkManagerFactory.NewPersistentStorageChunkManager(ctx)
}

func (f *ChunkManagerFactory) NewPersistentStorageChunkManager(ctx context.Context) (ChunkManager, error) {
	return f.newChunkManager(ctx, f.persistentStorage)
}

func (f *ChunkManagerFactory) newChunkManager(ctx context.Context, engine string) (ChunkManager, error) {
	switch engine {
	case "local":
		return NewLocalChunkManager(RootPath(f.config.rootPath)), nil
	case "minio":
		return newMinioChunkManagerWithConfig(ctx, f.config)
	case "remote":
		return NewRemoteChunkManager(ctx, f.config)
	default:
		return nil, errors.New("no chunk manager implemented with engine: " + engine)
	}
}

根据传入的engine新建对应的持久存储对象。

LocalChunkManager、MinioChunkManager、RemoteChunkManager。

// LocalChunkManager is responsible for read and write local file.
type LocalChunkManager struct {
	localPath string
}

// MinioChunkManager is responsible for read and write data stored in minio.
type MinioChunkManager struct {
	*minio.Client
	bucketName string
	rootPath   string
}

// RemoteChunkManager is responsible for read and write data stored in minio.
type RemoteChunkManager struct {
	client ObjectStorage
	bucketName string
	rootPath   string
}

消息中间件

msgstream.Factory是创建mq的工厂接口。

工厂接口:

// pkg\mq\msgstream\msgstream.go
type Factory interface {
	NewMsgStream(ctx context.Context) (MsgStream, error)
	NewTtMsgStream(ctx context.Context) (MsgStream, error)
	NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}

实现有:

CommonFactory、KmsFactory、PmsFactory

// CommonFactory is a Factory for creating message streams with common logic.
// It contains a function field named newer, which is a function that creates
// an mqwrapper.Client when called.
// pkg\mq\msgstream\common_mq_factory.go
type CommonFactory struct {
	Newer             func(context.Context) (mqwrapper.Client, error) // client constructor
	DispatcherFactory ProtoUDFactory
	ReceiveBufSize    int64
	MQBufSize         int64
}

// pkg\mq\msgstream\mq_factory.go
// kafka工厂
type KmsFactory struct {
	dispatcherFactory ProtoUDFactory
	config            *paramtable.KafkaConfig
	ReceiveBufSize    int64
	MQBufSize         int64
}

// PmsFactory is a pulsar msgstream factory that implemented Factory interface(msgstream.go)
// pkg\mq\msgstream\mq_factory.go
// pulsar工厂
type PmsFactory struct {
	dispatcherFactory ProtoUDFactory
	// the following members must be public, so that mapstructure.Decode() can access them
	PulsarAddress    string
	PulsarWebAddress string
	ReceiveBufSize   int64
	MQBufSize        int64
	PulsarAuthPlugin string
	PulsarAuthParams string
	PulsarTenant     string
	PulsarNameSpace  string
	RequestTimeout   time.Duration
	metricRegisterer prometheus.Registerer
}

mq产品

mq的产品接口是msgstream.MsgStream

// MsgStream is an interface that can be used to produce and consume message on message queue
type MsgStream interface {
	Close()

	AsProducer(channels []string)
	Produce(*MsgPack) error
	SetRepackFunc(repackFunc RepackFunc)
	GetProduceChannels() []string
	Broadcast(*MsgPack) (map[string][]MessageID, error)

	AsConsumer(ctx context.Context, channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) error
	Chan() <-chan *MsgPack
	Seek(ctx context.Context, offset []*MsgPosition) error

	GetLatestMsgID(channel string) (MessageID, error)
	CheckTopicValid(channel string) error

	EnableProduce(can bool)
}

具体产品实现有:

msgstream.mqMsgStream、msgstream.MqTtMsgStream

type mqMsgStream struct {
	ctx              context.Context
	client           mqwrapper.Client
	producers        map[string]mqwrapper.Producer
	producerChannels []string
	consumers        map[string]mqwrapper.Consumer
	consumerChannels []string

	repackFunc    RepackFunc
	unmarshal     UnmarshalDispatcher
	receiveBuf    chan *MsgPack
	closeRWMutex  *sync.RWMutex
	streamCancel  func()
	bufSize       int64
	producerLock  *sync.RWMutex
	consumerLock  *sync.Mutex
	closed        int32
	onceChan      sync.Once
	enableProduce atomic.Value
}

// MqTtMsgStream is a msgstream that contains timeticks
type MqTtMsgStream struct {
	*mqMsgStream
	chanMsgBuf         map[mqwrapper.Consumer][]TsMsg
	chanMsgPos         map[mqwrapper.Consumer]*msgpb.MsgPosition
	chanStopChan       map[mqwrapper.Consumer]chan bool
	chanTtMsgTime      map[mqwrapper.Consumer]Timestamp
	chanMsgBufMutex    *sync.Mutex
	chanTtMsgTimeMutex *sync.RWMutex
	chanWaitGroup      *sync.WaitGroup
	lastTimeStamp      Timestamp
	syncConsumer       chan int
}

存储产品

存储的产品接口是storag.ChunkManagere

// ChunkManager is to manager chunks.
// Include Read, Write, Remove chunks.
type ChunkManager interface {
	// RootPath returns current root path.
	RootPath() string
	// Path returns path of @filePath.
	Path(ctx context.Context, filePath string) (string, error)
	// Size returns path of @filePath.
	Size(ctx context.Context, filePath string) (int64, error)
	// Write writes @content to @filePath.
	Write(ctx context.Context, filePath string, content []byte) error
	// MultiWrite writes multi @content to @filePath.
	MultiWrite(ctx context.Context, contents map[string][]byte) error
	// Exist returns true if @filePath exists.
	Exist(ctx context.Context, filePath string) (bool, error)
	// Read reads @filePath and returns content.
	Read(ctx context.Context, filePath string) ([]byte, error)
	// Reader return a reader for @filePath
	Reader(ctx context.Context, filePath string) (FileReader, error)
	// MultiRead reads @filePath and returns content.
	MultiRead(ctx context.Context, filePaths []string) ([][]byte, error)
	ListWithPrefix(ctx context.Context, prefix string, recursive bool) ([]string, []time.Time, error)
	// ReadWithPrefix reads files with same @prefix and returns contents.
	ReadWithPrefix(ctx context.Context, prefix string) ([]string, [][]byte, error)
	Mmap(ctx context.Context, filePath string) (*mmap.ReaderAt, error)
	// ReadAt reads @filePath by offset @off, content stored in @p, return @n as the number of bytes read.
	// if all bytes are read, @err is io.EOF.
	// return other error if read failed.
	ReadAt(ctx context.Context, filePath string, off int64, length int64) (p []byte, err error)
	// Remove delete @filePath.
	Remove(ctx context.Context, filePath string) error
	// MultiRemove delete @filePaths.
	MultiRemove(ctx context.Context, filePaths []string) error
	// RemoveWithPrefix remove files with same @prefix.
	RemoveWithPrefix(ctx context.Context, prefix string) error
}

具体产品实现有:

LocalChunkManager、MinioChunkManager、RemoteChunkManager

// LocalChunkManager is responsible for read and write local file.
type LocalChunkManager struct {
	localPath string
}

// MinioChunkManager is responsible for read and write data stored in minio.
type MinioChunkManager struct {
	*minio.Client

	//	ctx        context.Context
	bucketName string
	rootPath   string
}

// RemoteChunkManager is responsible for read and write data stored in minio.
type RemoteChunkManager struct {
	client ObjectStorage

	//	ctx        context.Context
	bucketName string
	rootPath   string
}

总结

从代码框架可以看出每一种mq都有一个工厂,存储只有一个工厂

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1626339.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

抓住四月小尾巴,拿个offer~

首先声明一下~本人是个双非二本大三在校生。 从三月份就开始了苦哈哈的找实习之旅&#xff0c;快三月中旬才敢投大厂&#xff0c;为什么嘞&#xff1f;因为学校要求必须参加完期末考试才能出去实习&#xff08;差不多七月初&#xff09;&#xff0c;因为这个好多公司一听就不安…

算法模版自用(杂)

文章目录 算法库函数next_permutation(start,end) prev_permutation(start,end) (全排列函数)nth_element &#xff08;求第k小值&#xff09;next(it,num),prev(it,num)min_element(begin(),end()),max_element(begiin(),end()) (取最小值最大值) _int128的输入输出STLlist 数…

serdes 同轴电缆和双绞线接法

1、同轴电缆 Coaxial Cable 2、双绞线STP&#xff08;Shielded Twisted Pair&#xff09; 比如我们用的车载camera一般就只需要接一路即可&#xff0c;RIN接camera&#xff0c; RIN-通过电容接地。

Android 使用 GeckoView 并实现 js 交互、权限交互

参考文档&#xff1a; geckoview版本 引入文档&#xff08;有坑 下面会给出正确引入方式&#xff09; 官方示例代码1 官方示例代码2 参考了两位大神的博客和demo&#xff1a; GeckoView js交互实现 geckoview-jsdemo 引入方式&#xff1a; maven {url "https://maven.…

MySQL中的死锁预防和解决

MySQL中的死锁预防和解决 死锁是数据库管理系统中常见的问题&#xff0c;特别是在高并发的应用场景下。MySQL数据库中的死锁会导致事务处理速度减慢&#xff0c;甚至完全停止&#xff0c;因此理解并预防死锁至关重要。本文将详细介绍如何预防MySQL中的死锁&#xff0c;包括常用…

【算法基础实验】图论-深度优先搜索和深度优先路径

深度优先(DFS) 理论基础 深度优先搜索&#xff08;DFS, Depth-First Search&#xff09;是图和树的遍历算法中的一种&#xff0c;它从一个节点开始&#xff0c;沿着树的边走到尽可能深的分支&#xff0c;直到节点没有子节点为止&#xff0c;然后回溯继续搜索下一个分支。DFS …

网络安全实训Day17and18

写在前面 第17和18天都讲的sql注入&#xff0c;故合并 ​​​​​​ 网络空间安全实训-渗透测试 Web渗透 定义 针对Web站点的渗透攻击&#xff0c;以获取网站控制权限为目的 Web渗透的特点 Web技术学习门槛低&#xff0c;更容易实现 Web的普及性决定了Web渗透更容易找到目…

JavaEE 初阶篇-深入了解 I/O 高级流(缓冲流、交换流、数据流和序列化流)

&#x1f525;博客主页&#xff1a; 【小扳_-CSDN博客】 ❤感谢大家点赞&#x1f44d;收藏⭐评论✍ 文章目录 1.0 缓冲流概述 1.1 缓冲流的工作原理 1.2 使用缓冲流的步骤 1.3 字节缓冲流于字符缓冲流的区别 1.4 字节缓冲流的实例 1.5 字符缓冲流的实例 2.0 转换流概述 2.1 字符…

MySQL函数之单行函数

1.前言 我们在使用 SQL 语言的时候&#xff0c;不是直接和这门语言打交道&#xff0c;而是通过它使用不同的数据库软件&#xff0c;即DBMS。DBMS 之间的差异性很大&#xff0c;远大于同一个语言不同版本之间的差异。实际上&#xff0c;只有很少的函数是被 DBMS 同时支持的。比…

MySQL基础知识——MySQL索引

深入浅出索引 索引的意义 索引的意义&#xff1a;在大量数据中&#xff0c;加速访问少量特定数据&#xff1b; 使用索引的前提条件&#xff1a; 1&#xff09;索引块数量小于数据块数量&#xff1b; 2&#xff09;索引键有序&#xff0c;故可以使用二分查找等高效的查找方式&…

go语言并发实战——日志收集系统(十) 重构tailfile模块实现同时监控多个日志文件

前言 在上一篇文章中&#xff0c;我们实现了通过etcd来同时指定多个不同的有关分区与日志文件的路径&#xff0c;但是锁着一次读取配置的增多&#xff0c;不可避免的出现了一个问题&#xff1a;我们如何来监控多个日志文件&#xff0c;这样原来的tailFile模块相对于当下场景就…

前端到全栈进阶之“前端框架”

从前端入门到全栈-系列介绍 你会学到什么&#xff1f; 可能学不到什么东西&#xff0c;该系列是作者本人工作和学习积累&#xff0c;用于复习 系列介绍 现在的 Web 前端已经离不开 Node.js&#xff0c;我们广泛使用的 Babel、Webpack、工程化都是基于 Node 的&#xff0c;各…

【Linux】驱动_2_字符驱动

1. Linux设备分类 字符设备: 指应用程序按字节/字符来读写数据的设备。通常为传真、虚拟终端和串口调制解调器、键盘之类设备提供流通信服务&#xff0c;通常不支持随机存取数据。字符设备在实现时大多不使用缓存器。系统直接从设备读/写每一个字符。块设备: 通常支持随机存取…

【程序分享1】LAMMPS + OVITO + 晶体缺陷识别 + 点缺陷 + 分子动力学模拟

分享2个分子动力学模拟相关的程序。 1. 一种识别体心立方晶体缺陷的新方法。 2. 无后处理的分子动力学模拟中的并行点缺陷识别: lammps的计算和转储方式 。 感谢论文的原作者&#xff01; 第1个程序 关键词&#xff1a; 1. Atomistic simulations, 2. Molecular dynamics…

让客服工作开挂的8个客服办公高效率神器

做客服工作&#xff0c;经常需要写文案&#xff0c;做图片做视频&#xff0c;还要能快捷回复客户&#xff0c;都需要有靠谱的客服办公软件支持&#xff0c;本文介绍了8个高效神器&#xff0c;希望能帮到做客服的亲 前言 做客服工作&#xff0c;在回答客户咨询的同时&#xff0…

2024.4.28 机器学习周报

目录 引言 Abstract 文献阅读 1、题目 2、引言 3、创新点 4、总体流程 5、网络结构 5.1、损失函数 5.2、Confidence Maps 5.3、Part Affinity Fields(PAFs) 5.4、多人的PAFs 6、实验 7、结论 深度学习 yolov8实现目标检测和人体姿态估计 Yolov8网络结构 yaml…

【亲测可用】配置镜像源

文章目录 配置镜像源1. 手动添加镜像源2. 永久配置&#xff08;推荐&#xff09;方法1&#xff1a;方法2 &#xff1a; 小结 配置镜像源 配置镜像源会让资源下载的更快一些 我实验了一下&#xff0c;都成功了的方法&#xff0c;推荐给你们 1.手动添加 2.永久配置 前提是你的…

好看到爆炸的弹窗公告源码

源码介绍 好看到爆炸的弹窗公告源码&#xff0c;源码由HTMLCSSJS组成&#xff0c;记事本打开源码文件可以进行内容文字之类的修改&#xff0c;双击html文件可以本地运行效果&#xff0c;也可以上传到服务器里面&#xff0c; 源码截图 源码下载 好看到爆炸的弹窗公告源码

新标准日本语初下 课后练习作业

新版标准日本语初下 第二十五課 これは明日会議で使う資料です 第二十五課 これは明日会議で使う資料です &#xff12;&#xff14;&#xff0d;&#xff10;&#xff14;&#xff0d;&#xff12;&#xff16; 練習&#xff12;&#xff15;&#xff0d;1&#xff0d;1 例…

Vuforia AR篇(四)— AR虚拟按钮

目录 前言一、创建虚拟按钮二、创建脚本三、效果 前言 在当今互联网和移动设备普及的背景下&#xff0c;**增强现实&#xff08;AR&#xff09;**技术正迅速成为连接现实世界与数字信息的重要桥梁。AR虚拟按钮作为这一技术的创新应用&#xff0c;不仅提供了一种全新的用户交互…