监听DB配置变更之go-broadcast简单实现

news2025/1/16 14:00:14

文章目录

  • 1. 前言
  • 2. 分析
  • 3. 实现
  • 4. 问题
  • 5. 小结
  • 6. 参考

1. 前言

之前遇到一个需求,因为配置的查找是基于db的,而db的更改却无法实时通知到具体利用到这条数据的使用方,为了实现db数据变动时,能够尽快让使用方知道这条数据发生了变更,从而进行后续数据变更等相关逻辑的运行,就需要实现db数据变动时的通知。

在观察者模式中,因为观察者模式是一种一对多的关系模式,即多个观察者观察同一个主题对象,当主题对象发生变化时,会通知所有的观察者对象。

2. 分析

使用观察者模式来实现的话,则需要实现如下四个部分的结构:

  1. 抽象主题
  2. 具体主题
  3. 抽象观察者
  4. 具体观察者

举个例子,在我们日常使用微信公众号中,当你关注了一个公众号,这个公众号如果有更新的话,则会推送给每一个关注过这个公众号的用户。此时我们可以将具体的部分的接收映射到微信公众号中,即:

  1. 抽象主题:公众号,具备订阅、取消订阅和发送消息的功能
  2. 具体主题:具体某一个公众号
  3. 抽象观察者:用户(泛指使用微信公众号的用户受众)
  4. 具体观察者:某一个具体的用户

分析了以上四个结构之后,我们需要实现的功能部分就清楚了。即我们需要实现一个抽象主题,这个主题需要有提供注册、取消注册以及提交信息的能力,当提交信息到抽象主题的时候,抽象主题需要将这个消息通知到所有已经注册过的具体观察者。

3. 实现

在明确了需求之后, 就开始进行功能的实现,因为使用的是go语言,则第一时间肯定是希望通过chan这样的功能来实现,因为chan天生具备监听的能力,我们可以通过监听注册到抽象主题的chan,从而实现抽象主题消息的实时监听。

但秉持着“你需要的功能,基本都有人实现过”的方针,第一时间还是上到了github,看看是否有现成的开源方案,经过一番查找,还真发现了一个开源库可以使用,这个库的名称是go-broadcast。

下面就来说下broadcaster是如何实现上面的功能逻辑的,broadcaster这个库的代码很简单,主体实现逻辑只有110行代码左右,但符合我们的功能逻辑实现需要。

type broadcaster struct {
	input chan interface{}
	reg   chan chan<- interface{}
	unreg chan chan<- interface{}

	outputs map[chan<- interface{}]bool
}

// The Broadcaster interface describes the main entry points to
// broadcasters.
type Broadcaster interface {
	// Register a new channel to receive broadcasts
	Register(chan<- interface{})
	// Unregister a channel so that it no longer receives broadcasts.
	Unregister(chan<- interface{})
	// Shut this broadcaster down.
	Close() error
	// Submit a new object to all subscribers
	Submit(interface{})
	// Try Submit a new object to all subscribers return false if input chan is fill
	TrySubmit(interface{}) bool
}

首先定义了一个接口叫做Broadcaster,然后定义了一个broadcaster实现了Broadcaster的所有方法逻辑。

func (b *broadcaster) Register(newch chan<- interface{}) {
	b.reg <- newch
}

func (b *broadcaster) Unregister(newch chan<- interface{}) {
	b.unreg <- newch
}

func (b *broadcaster) Close() error {
	close(b.reg)
	close(b.unreg)
	return nil
}

// Submit an item to be broadcast to all listeners.
func (b *broadcaster) Submit(m interface{}) {
	if b != nil {
		b.input <- m
	}
}
  • Register方法主要实现了将注册的chan直接放入到reg这个chan中,用于后续注册
  • Register方法主要实现了将注册的chan直接让如到ureg这个chan中,用于后续注销
  • Close方法主要是关闭reg和ureg两个chan
  • Submit方法主要实现对抽象主题broadcaster发送消息,将消息放入input这个chan中

上面的方法都是基于chan作为通信的,而chan中有了数据,后续需要消费数据。

// NewBroadcaster creates a new broadcaster with the given input
// channel buffer length.
func NewBroadcaster(buflen int) Broadcaster {
	b := &broadcaster{
		input:   make(chan interface{}, buflen),
		reg:     make(chan chan<- interface{}),
		unreg:   make(chan chan<- interface{}),
		outputs: make(map[chan<- interface{}]bool),
	}

	go b.run()

	return b
}

这里的run()方法则是消费所有chan数据的地方。

func (b *broadcaster) broadcast(m interface{}) {
	for ch := range b.outputs { // 遍历所有注册的chan,将消息发送到注册的chan中
		ch <- m
	}
}

func (b *broadcaster) run() {
	for {
		select {
		case m := <-b.input: // 如果有消息输入,则广播出去
			b.broadcast(m)
		case ch, ok := <-b.reg: // 如果有新注册的,则进行output的添加
			if ok {
				b.outputs[ch] = true
			} else {
				return
			}
		case ch := <-b.unreg: // 如果有注销的,则进行output的删除
			delete(b.outputs, ch)
		}
	}
}

整体的运行图如下:

在这里插入图片描述

  • 对应chan通过reg进行注册,注册后的chan记录在outputs中
  • 对应chan通过ureg进行注销,注销后的chan从output中移除
  • 对应的信息通过input输入,输入后的msg通过遍历outputs注册列表,从而通知到每一个注册者

4. 问题

在使用go-broadcast的过程中,看到之前有个pr加了一个TrySubmit的逻辑,这个逻辑主要是解决当input被装满了以后,broadcast会被阻塞,这个时候如果有新的消息进来,如何办呢?

// TrySubmit attempts to submit an item to be broadcast, returning
// true iff it the item was broadcast, else false.
func (b *broadcaster) TrySubmit(m interface{}) bool {
	if b == nil {
		return false
	}
	select {
	case b.input <- m:
		return true
	default:
		return false
	}
}

解决办法是采用select的方法尝试去塞入,塞入不成功则意味着消息提交失败,返回false,让使用者根据消息提交的结果进行后续的逻辑处理。

但这里还存在另外一个问题,库中给了一个样本case,这个样本case基于的条件都是消息传递给chan的时候没有阻塞。如下代码所示:

func (b *broadcaster) broadcast(m interface{}) {
	for ch := range b.outputs {
		ch <- m
	}
}

但一旦有注册的chan消费的时候阻塞了,这时候就会产生问题,会导致其它正常消费的chan因为一个异常chan而全部被阻塞住,导致其他chan都无法正常消费。

这个时候就会导致在input没有满的时候,即消息可以放入,但是消息无法被正常的消费,进而又反向导致input逐渐被塞满,最终导致input无法被塞入,消息也无法被发送到对应的chan中,导致run方法逻辑卡在broadcast中,导致整个运行出现问题。

解决办法:

func (b *broadcaster) broadcast(m interface{}) {
	for ch := range b.outputs {
		// if exist one output consume the chan message is too slow,
		// will block other output receive the msg.
		select {
		case ch <- m:
		default:
		}
	}
}

但这种虽然解决了一个chan满消费block其他chan的问题,随之也引入了丢消息的问题了,即有些消费慢的chan,由于chan消费慢导致无法接收新的消息,进而导致新消息丢失的问题。

5. 小结

因为需要实时监听db配置的变更,所以去探寻了一下方案,最终采用了go-broadcast的方案,但在使用go-broadcast的过程中,发现在broadcast消息的时候存在阻塞的行为,为了保证整个服务不被某个chan阻塞而停止运行,在broadcast消息的时候添加了select default条件来规避这个问题。

6. 参考

  • go-broadcast

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1810418.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

echarts组件x轴坐标显示不全解决方法

1.旋转: 修改前: option {xAxis: {type: category,data: [Mon, Tue, Wed, Thu, Fri, Sat, Sun,Mon, Tue, Wed, Thu, Fri, Sat, Sun,Mon, Tue, Wed, Thu, Fri, Sat, Sun]},yAxis: {type: value},series: [{data: [120, 200, 150, 80, 70, 110, 130,120, 200, 150, 80, 70, 1…

802.11漫游流程简单解析与笔记_Part3

原定计划在Part3分析ns3的Roaming流程抓包&#xff0c;但ns3并不支持漫游&#xff1a; 看过Part1的应该都知道&#xff0c;标准关联流程是auth*2 associate*2 key*4&#xff0c;但ns3里面没有与auth有关的部分&#xff0c;third脚本里面有cap字段&#xff0c;但无auth也无key&a…

elementUI el-table高度heght和总结summary 同时使用 表格样式异常

背景 同时使用height和 show-summary 样式错位 解决方案 在钩子函数updated 中重新渲染此表格 <el-table :height"autoHeight" show-summary ref"dataTable" >updated() {this.$nextTick(() >{this.$refs.dataTable.doLayout();})},更改后的效果 …

代码随想录算法训练营第三十三天| 1005.K次取反后最大化的数组和,134. 加油站,135. 分发糖果

1005. K 次取反后最大化的数组和 - 力扣&#xff08;LeetCode&#xff09; class Solution {public int largestSumAfterKNegations(int[] nums, int k) {Arrays.sort(nums);int i 0;while (i < nums.length && nums[i] < 0 && k > 0) {nums[i] num…

Apollo9.0 PNC源码学习之Control模块(二)

前面文章&#xff1a;Apollo9.0 PNC源码学习之Control模块&#xff08;一&#xff09; 本文将对具体控制器以及原理做一个剖析 1 PID控制器 1.1 PID理论基础 如下图所示&#xff0c;PID各参数(Kp,Ki,Kd)的作用&#xff1a; 任何闭环控制系统的首要任务是要稳、准、快的响…

LLM Algorithms(1): Flash Attention

目录 Background Flash Attention Flash Attention Algorithm 参考 NIPS-2022: Flash Attention: Fast and Memory-Efficient Exact Attention with IO-Awareness idea&#xff1a;减少资源消耗&#xff0c;提升或保持模型性能。普通attention的空间复杂度是 --》降低到F…

探究IOC容器刷新环节初始化前的预处理

目录 一、IOC容器的刷新环节快速回顾 二、初始化前的预处理prepareRefresh源码分析 三、初始化属性源 &#xff08;一&#xff09;GenericWebApplicationContext初始化属性源 &#xff08;二&#xff09;StaticWebApplicationContext初始化属性源 四、初始化早期事件集合…

亚马逊冗余库存处理

在亚马逊放置90天以上的产品&#xff0c;又不在正常的动销&#xff0c;就要采取一定的措施了。清库存方式&#xff1a; 最直接的方式——降价促销&#xff08;至少要降价百分之三十以上&#xff0c;库龄越久&#xff0c;降价越狠&#xff09;参加官方的活动促销的话是需要符合…

在 Word 中,如何有效调整文字与下划线之间的距离

&#x1f349; CSDN 叶庭云&#xff1a;https://yetingyun.blog.csdn.net/ 如果你在使用 Word 时&#xff0c;希望调整文字和下划线之间的距离&#xff0c;让它们看起来更加美观&#xff0c;可以按照以下步骤操作&#xff1a; 1. 在你想要加下划线的文字前后各加一个空格&…

Tdengine的时序数据库简介、单机部署、操作语句及java应用

Tdengine的时序数据库简介、单机部署、操作语句及java应用 本文介绍了Tdengine的功能特点、应用场景、超级表和子表等概念&#xff0c;讲述了Tdengine2.6.0.34的单机部署&#xff0c;并介绍了taos数据库的常见使用方法及特色窗口查询方法&#xff0c;最后介绍了在java中的应用。…

Harmony中的HAP、HAR、HSP区别

Harmony中的HAP、HAR、HSP区别 想要更加合理的开发一个企业级别的Harmony应用&#xff0c;那么就不得不提其中的HAP、HAR、HSP了。 前言 对于普通的用户来说&#xff0c;可能一个普通的应用就等于一个安装文件如安卓下的APK。但是对于Harmony应用开发工程师来讲&#xff0c;…

Python | Leetcode Python题解之第143题重排链表

题目&#xff1a; 题解&#xff1a; class Solution:def reorderList(self, head: ListNode) -> None:if not head:returnmid self.middleNode(head)l1 headl2 mid.nextmid.next Nonel2 self.reverseList(l2)self.mergeList(l1, l2)def middleNode(self, head: ListNo…

单田芳mp3百度网盘,单田芳评书下载百度云百度网盘

单老的评书还注重情感的表达。他善于运用声音、语气、语调等手段&#xff0c;将人物的情感刻画得淋漓尽致。无论是喜怒哀乐&#xff0c;他都能准确地把握人物的情感变化&#xff0c;并通过自己的表演将其传递给听众。这种情感的传递&#xff0c;使得听众能够更加深入地理解故事…

Springboot 开发之任务调度框架(一)Quartz 简介

一、引言 常见的定时任务框架有 Quartz、elastic-job、xxl-job等等&#xff0c;本文主要介绍 Spirng Boot 集成 Quartz 定时任务框架。 二、Quartz 简介 Quartz 是一个功能强大且灵活的开源作业调度库&#xff0c;广泛用于 Java 应用中。它允许开发者创建复杂的调度任务&…

Web--CSS基础

文章目录 定义方式选择器文本字体背景边框元素展示格式内边距与外边距盒子模型位置浮动flex布局响应式布局 定义方式 行内样式表 直接定义在style属性中&#xff0c;作用于当前标签 <img src "/imges/logo.jpg" alt "" style "width 400"…

react修改本地运行项目的端口

一、描述 如果你想让项目在你想要的端口打开的话&#xff0c;就需要进行设置 二、代码 设置一下pages.json文件就可以了&#xff0c;如下&#xff1a; 如果想打开项目不需要点击下面的链接地址&#xff0c;让他运行npm run dev之后自己直接打开到浏览器的话&#xff0c;在后…

智能楼宇的智慧心脏:ARMxy工业计算机在自动化控制中的应用

智能楼宇已成为现代化城市不可或缺的一部分。在这场数字化转型浪潮中&#xff0c;ARMxy工业计算机凭借其强大的处理能力、高度的系统兼容性和灵活的I/O配置&#xff0c;成为了推动楼宇自动化控制领域创新的重要力量。 某大型商业综合体项目&#xff0c;面临着传统HVAC系统效率低…

Android Studio历史版本

android studio的历史版本

OpenAI 宕机事件:GPT 停摆的影响与应对

引言 2024年6月4日&#xff0c;OpenAI 的 GPT 模型发生了一次全球性的宕机&#xff0c;持续时间长达8小时。此次宕机不仅影响了OpenAI自家的服务&#xff0c;还导致大量用户涌向竞争对手平台&#xff0c;如Claude和Gemini&#xff0c;结果也导致这些平台出现故障。这次事件的广…

conda 创建环境失败

conda create -n pylableimg python3.10在conda &#xff08;base&#xff09;环境下&#xff0c;创建新的环境&#xff0c;失败。 报错&#xff1a; LookupError: didn’t find info-scipy-1.11.3-py310h309d312_0 component in C:\Users\Jane.conda\pkgs\scipy-1.11.3-py310h…