【源码阅读】事件订阅包v2

news2025/1/13 2:48:26

代码位置

1、Feed

Feed 实现一对多订阅,其中事件的载体是通道。发送到 Feed 的值会同时传送到所有订阅的通道。
订阅
与Typemux的对比
链接: link
TypeMux是一个同步的事件框架,当有一个被订阅的事件发生的时候,会遍历该事件对应的订阅者通道,通知其中的订阅者,但是当订阅者1没有接受该消息的时候,发送进程会被阻塞,会影响对订阅者2的发送。
所以Feed作为流式事件框架,是否是异步的取决于是否有缓存通道,当设计有缓存通道的时候是异步的,否则就是同步的。

type Feed struct {
	once      sync.Once        // ensures that init only runs once
	sendLock  chan struct{}    // sendLock has a one-element buffer and is empty when held.It protects sendCases.
	removeSub chan interface{} // interrupts Send
	sendCases caseList         // the active set of select cases used by Send

	// The inbox holds newly subscribed channels until they are added to sendCases.
	mu    sync.Mutex
	inbox caseList
	etype reflect.Type
}
  1. once字段是用来确保init只会执行一次

  2. sendLock是一个缓存通道,

  3. 这是一个名为Feed的结构体,它包含以下字段:

  4. once:sync.Once类型,确保init只运行一次。

  5. sendLock:chan struct{}类型,sendLock具有一个元素缓冲区,当持有时为空。可以保护sendCases

  6. removeSub:chan interface{}类型,用于中断Send

  7. sendCases:caseList类型,Send使用的活动选择集。

  8. mu:sync.Mutex类型,用于保护inbox

  9. inbox:caseList类型,用于存储新订阅的通道,直到它们被添加到sendCases中。

  10. etype:reflect.Type类型,表示结构体的反射类型。

1.1 init

func (f *Feed) init(etype reflect.Type) {
	f.etype = etype
	f.removeSub = make(chan interface{})
	f.sendLock = make(chan struct{}, 1)
	f.sendLock <- struct{}{}
	f.sendCases = caseList{{Chan: reflect.ValueOf(f.removeSub), Dir: reflect.SelectRecv}}
}

初始化一个名为Feed的结构体。该方法接受一个参数etype,表示结构体的反射类型。

  1. 将传入的etype赋值给f.etype,表示结构体的类型。
  2. 创建一个名为f.removeSub的通道,用于接收移除订阅的通知。
  3. 创建一个名为f.sendLock的通道,并设置其缓冲区大小为1,以确保只有一个发送操作可以同时进行。
  4. f.sendLock通道发送一个空的结构体,以解锁该通道。
  5. 创建一个名为f.sendCasescaseList类型的变量,并将其初始化为包含一个元素的列表。这个元素是一个case结构体,其中Chan字段表示要监听的通道(即f.removeSub),Dir字段表示监听的方向(即接收方向)。
    初始化一个Feed结构体,并设置其相关属性和状态。

1.2 Subscribe

func (f *Feed) Subscribe(channel interface{}) Subscription

订阅将channel添加到feed中,添加一个订阅。 未来的发送将在通道上传送,直到subscription为止。 添加的所有通道必须具有相同的元素类型。
通道应该有足够的缓冲空间以避免阻塞其他订阅者。
慢速订阅者不会被丢弃。
这里大量用到了reflect类型,针对不同类型数据的相同处理,就可以用到反射。反射中的send,recv,default分别对应case中的不同情况。而reflect中还有其他类型,Bool、String 和 所有数字类型的基础类型;Array 和 Struct 对应的聚合类型;Chan、Func、Ptr、Slice 和 Map 对应的引用类型。

  1. 将channel的值映射给chanval
  2. 将类型赋值给chantyp
  3. 进行判断是否是通道类型,并且发送方向是否是想要的。
    因为对应的case中其实有三种情况:send,recv,default。其中的send和recv中的chan都是channel,如果是default就是0。
    chantyp.ChanDir()&reflect.SendDir == 0用来判断是不是发送方向,有一个有错就会报错
if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 {
		panic(errBadChannel)
	}
  1. 创建一个feedSub类型的对象sub,并将其与当前的Feed对象和channel关联起来。同时,初始化一个错误通道err,用于接收可能的错误信息。
sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)}
  1. 调用Feed对象的init方法,对channel的元素类型进行初始化。如果初始化后的元素类型与预期不符,也会抛出一个错误。
  2. 使用互斥锁保护对Feed对象的操作,确保线程安全
  3. 将一个新的选择情况添加到inbox中,表示当有数据发送到这个频道时,需要进行相应的处理。
  4. 返回创建的sub对象。

1.3 remove

func (f *Feed) remove(sub *feedSub)

首先会从inbox中删除,其中涵盖尚未添加到 f.sendCases 的频道。
该方法的作用是从Feed对象的内部数据结构中移除一个订阅对象(feedSub)。这个方法使用了Go语言的反射机制和并发控制机制来实现多通道的同步发送操作。

  1. ch := sub.channel.Interface()是将订阅对象的通道中的值给变量ch,可以方便后续处理
  2. 需要添加锁
  3. index中查找要移除的订阅对象index := f.inbox.find(ch)(参考3.1),如果可以找到,就将其从index中删除f.inbox = f.inbox.delete(index)(参考3.2),释放锁,并返回
index := f.inbox.find(ch)
if index != -1 {
	f.inbox = f.inbox.delete(index)
	f.mu.Unlock()
	return
}
  1. 如果没有找到,就会进入select语句
  2. 从ch通道发送给f.removeSub,表示将要执行移除订阅对象
  3. 等待sendCases通道的信号,如果有正在发送的操作,这个sendLock就不会为空,就无法获得这个锁,只有没有正在发送的操作的时候才会得到该锁进入执行:
  4. sendCases中查找该订阅f.sendCases.find(ch)(参考3.1),删除f.sendCases = f.sendCases.delete(f.sendCases.find(ch))(参考3.2)
  5. 释放sendLock

1.4 Send

func (f *Feed) Send(value interface{}) (nsent int)

Send 同时发送到所有订阅的频道,返回发送到的订阅者数量。

  1. 将传入的值转换为反射类型的值rvaluervalue := reflect.ValueOf(value)
  2. 用传入的值的类型进行唯一一次的注册(通过f.once来限制)
  3. 检查是否符合定义的类型
  4. 获取发送锁sendLock,以确保在发送过程中不会发生竞争条件<-f.sendLock
  5. inbox中的值添加到sendCases列表中,并清空inbox
f.mu.Lock()
f.sendCases = append(f.sendCases, f.inbox...)
f.inbox = nil
f.mu.Unlock()
  1. 遍历sendCases列表,将每个通道的发送值设置为rvalue,将一个值发送给所有订阅者
  2. 尝试发送,有可能会一次成功trysend,遍历一轮,成功的就从case中删除cases = cases.deactivate(i)(参考3.3),且计数nsent加一
  3. 使用reflect.Select(cases)语句,类似case语句,获得返回值chosen, recv, _ := reflect.Select(cases)
  4. 如果chosen的值为0,表示通道关闭,就需要找到该对象的位置(参考3.1),并将该对象从sendCases中删除(参考3.2),并且缩小大小,重新赋值给cases
if chosen == 0 /* <-f.removeSub */ {
	index := f.sendCases.find(recv.Interface())
	f.sendCases = f.sendCases.delete(index)
	if index >= 0 && index < len(cases) {
		// Shrink 'cases' too because the removed case was still active.
		cases = f.sendCases[:len(cases)-1]
	}
} 
  1. 否则就表示重新发送发送成功了,就使用cases.deactivate(i)从cases中删除(参考3.3),并且nsent计数加一
  2. 使用f.sendCases[i].Send = reflect.Value{}来清空要发送的数据
  3. f.sendLock通道发送一个空的结构体,以解锁该通道
  4. 返回成功发送的值的数量nsent

2、feedSub

是Subscription的实现,Subscription中有两个函数:Unsubscribe(取消订阅)和Err(错误)。

type feedSub struct {
	feed    *Feed
	channel reflect.Value
	errOnce sync.Once
	err     chan error
}
  1. feed:一个指向Feed类型的指针。
  2. channel:一个反射值,表示通道类型。
  3. errOnce:一个同步原语,用于确保错误处理只执行一次。
  4. err:一个错误通道,用于传递错误信息。

2.1 Unsubscribe

func (sub *feedSub) Unsubscribe() {
	sub.errOnce.Do(func() {
		sub.feed.remove(sub)
		close(sub.err)
	})
}
  1. 确认错误处理只能处理一次
  2. 遇到该事件的时候,要使用remove(参考1.3)从订阅对象列表中删除
  3. 给出错误信息

2.2 Err

func (sub *feedSub) Err() <-chan error {
	return sub.err
}

从通道中获取错误信息并返回

3、caseList

type caseList []reflect.SelectCase

caseList是反射机制中的selectcase,类似case,但是可以动态添加所有管道

3.1 find

func (cs caseList) find(channel interface{}) int {
	for i, cas := range cs {
		if cas.Chan.Interface() == channel {
			return i
		}
	}
	return -1
}

find返回包含给定通道的事例的索引。

  1. 遍历所有caselist
  2. 如果能找到case中的Chan与所给定的通道一样的,就返回该case对应的索引index
  3. 如果都没有,就返回-1

3.2 delete

func (cs caseList) delete(index int) caseList {
	return append(cs[:index], cs[index+1:]...)
}

从cs列表中删除指定位置的case并返回新的caselist,使用append来进行

3.3 deactivate

func (cs caseList) deactivate(index int) caseList {
	last := len(cs) - 1
	cs[index], cs[last] = cs[last], cs[index]
	return cs[:last]
}

deactivate将索引处的case移动到cs切片的不可访问部分。
先将index和最后一部分last的case进行对换,然后将cs的长度减一,这样就相当于将最后一部分删除了,也就是函数传入的index部分。
-----------------------------subscription.go----------------------------------------

4、Subscription

type Subscription interface {
	Err() <-chan error // returns the error channel
	Unsubscribe()      // cancels sending of events, closing the error channel
}

只有两个方法,一个是取消订阅,而是错误函数
代码注释:

subscription表示一系列事件。事件的载体通常是一个通道,但不是接口的一部分。
建立订阅时可能会失败。故障通过错误通道报告。如果订阅存在问题(例如,传递事件的网络连接已关闭)。只发送一个值。当订阅成功结束时(即,当事件源关闭时),错误通道将关闭。当调用“取消订阅”时,它也会关闭。
Unsubscribe方法取消发送事件。在任何情况下,您都必须调用Unsubscribe以确保与订阅相关的资源得到释放。它可以被调用任意次数。

4.1 NewSubscription

func NewSubscription(producer func(<-chan struct{}) error) Subscription

NewSubscription在新的goroutine中作为订阅运行生产者函数。当调用取消订阅时,提供给制作人的频道将关闭。如果fn返回错误,则在订阅的错误通道上发送。
该函数接受一个参数producer,该参数是一个生产者函数,返回值是error。这个函数的作用是创建一个新的订阅对象,并启动一个协程来执行生产者函数。通过通道将数据发送给消费者。

  1. 创建一个名为s的指针变量,指向一个名为funcSub的结构体实例(参考5)
  2. 使用make(chan struct{})创建一个名为unsub的通道,用于取消订阅
  3. 使用make(chan error, 1)创建一个名为err的错误通道,用于传递错误信息
  4. 启动一个协程来执行生产者函数
  5. 使用defer close(s.err)确保错误通道在协程结束时被关闭
  6. 调用生产者函数producer(s.unsub),使用的通道是unsub通道,并将返回的错误赋值给变量err
  7. 使用s.mu.Lock()s.mu.Unlock()对结构体实例进行加锁和解锁操作
  8. 检查是否已经取消订阅,如果没有取消订阅,则将错误信息发送到错误通道,并将s.unsubscribed设置为true表示已取消订阅
  9. 返回创建的订阅对象s

5、funcSub

封装匿名函数

type funcSub struct {
	unsub        chan struct{}
	err          chan error
	mu           sync.Mutex
	unsubscribed bool
}
  1. 一个unsub通道,用来取消订阅
  2. 一个错误通道err
  3. 一个锁用来保证安全
  4. 布尔值来标志是否已经完成了取消订阅

5.1 Unsubscribe

func (s *funcSub) Unsubscribe() {
	s.mu.Lock()
	if s.unsubscribed {
		s.mu.Unlock()
		return
	}
	s.unsubscribed = true
	close(s.unsub)
	s.mu.Unlock()
	// Wait for producer shutdown.
	<-s.err
}
  1. 上锁和释放锁用来保证安全,
  2. 通过bool unsubscribed来确定是否已经完成了取消订阅
  3. 如果已经完成了订阅,就释放锁,并return
  4. 否则就修改值为true,关闭unsub通道,并通过s.mu.Unlock()释放锁
  5. 开启err通道

5.2 Err

func (s *funcSub) Err() <-chan error {
	return s.err
}

用于返回错误信息

6、resubscribeSub

type resubscribeSub struct {
	fn                   ResubscribeErrFunc
	err                  chan error
	unsub                chan struct{}
	unsubOnce            sync.Once
	lastTry              mclock.AbsTime
	lastSubErr           error
	waitTime, backoffMax time.Duration
}
  1. fn:一个类型为ResubscribeErrFunc的函数
  2. err:一个错误通道,用于接收错误信息
  3. unsub:一个结构体通道,用于取消订阅
  4. unsubOnce:一个sync.Once类型的变量,确保unsub只被执行一次
  5. lastTry:一个mclock.AbsTime类型的变量,表示上次尝试的时间
  6. lastSubErr:一个error类型的变量,表示上次订阅时发生的错误
  7. waitTime:一个time.Duration类型的变量,表示等待时间
  8. backoffMax:一个time.Duration类型的变量,表示最大退避时间

6.1 Resubscribe&ResubscribeErr

Resubscribe会反复调用fn以保持订阅已建立。当订阅建立时,Resubscribe会等待它失败,然后再次调用fn。此过程重复进行,直到调用“取消订阅”或活动订阅成功结束。
重新订阅在对fn的调用之间应用回退。调用之间的时间根据错误率进行调整,但永远不会超过backoffMax。

ResubscribeErr反复调用fn以保持订阅已建立。当订阅建立时,ResubscribeErr等待它失败并再次调用fn。此过程重复进行,直到调用“取消订阅”或活动订阅成功结束。
Resubscribe和ResubscribeErr之间的区别在于,使用ResubscripteErr,回调可以使用失败订阅的错误进行日志记录。
ResubscribeErr在对fn的调用之间应用回退。调用之间的时间根据错误率进行调整,但永远不会超过backoffMax。

type ResubscribeFunc func(context.Context) (Subscription, error)

6.2 ResubscribeErr

func ResubscribeErr(backoffMax time.Duration, fn ResubscribeErrFunc) Subscription {
	s := &resubscribeSub{
		waitTime:   backoffMax / 10,
		backoffMax: backoffMax,
		fn:         fn,
		err:        make(chan error),
		unsub:      make(chan struct{}),
	}
	go s.loop()
	return s
}
  1. 创建resubscribeSub对象:
  • waitTime是等待时间,是backoffMax的十分之一
  • backoffMax是最大的时间限制
  • fn是传入的重新订阅函数ResubscribeErrFunc类型
type ResubscribeErrFunc func(context.Context, error) (Subscription, error)
  • err是错误通道,用于接收错误信息
  • unsub是取消订阅通道
  1. 执行loop函数(参考6.5)
  2. 返回创建的s对象

6.3 Unsubscribe

func (s *resubscribeSub) Unsubscribe() {
	s.unsubOnce.Do(func() {
		s.unsub <- struct{}{}
		<-s.err
	})
}

取消订阅

  1. 因为unsubOnce sync.Once所以取消订阅只能执行一次
  2. 读取信息放入取消订阅通道s.unsub
  3. 开启err通道

6.4 Err

func (s *resubscribeSub) Err() <-chan error {
	return s.err
}

用于返回错误信息

6.5 loop

func (s *resubscribeSub) loop() {
	defer close(s.err)
	var done bool
	for !done {
		sub := s.subscribe()
		if sub == nil {
			break
		}
		done = s.waitForError(sub)
		sub.Unsubscribe()
	}
}
  1. defer close(s.err) 用于延迟关闭一个名为 s.err 的错误通道。当函数执行完成时,defer 关键字会确保 close(s.err) 被调用,从而关闭错误通道并释放相关资源
  2. 定义一个bool变量done确定是否完成
  3. 只要还没有完成(done为false)就会一直执行for循环
    每一次循环中:
  • 使用sub := s.subscribe()返回订阅对象(参考6.6)
  • 如果返回的对象为nil就break跳出循环
  • 否则继续执行done = s.waitForError(sub)(参考6.7)
  • 使用sub.Unsubscribe()取消订阅

6.6 subscribe

func (s *resubscribeSub) subscribe() Subscription

实现一个自动重试订阅的逻辑,当订阅失败时会进行一定的退避等待,并在等待结束后再次尝试订阅。

  1. 创建一个名为subscribed的错误通道,用于接收订阅结果
  2. 初始化一个名为subSubscription变量(参考4)(取消订阅+错误)
  3. 进入一个无限循环,每次循环执行以下操作:
    • 更新上一次尝试时间s.lastTry为当前时间
    • 创建一个上下文ctx和一个取消函数cancel
    • 启动一个新的goroutine,调用s.fn(ctx, s.lastSubErr)来获取订阅结果(参考6.2),并将结果赋值给rsuberr
    • rsub赋值给sub,并将err信息发送到subscribed通道。
    • 使用select语句等待以下情况之一发生:
      • subscribed通道接收到信号,取消上下文并检查错误是否为nil。如果是,则返回sub作为订阅结果,没有错误;否则,根据s.backoffWait()的结果决定是否继续尝试订阅(参考6.8)
      • s.unsub通道接收到取消信号,取消上下文并,使用<-subscribed接受消息但不 使用。然后返回nil表示已取消订阅。
  4. 如果循环结束仍未成功订阅,则抛出异常或返回错误信息。

6.7 waitForError

func (s *resubscribeSub) waitForError(sub Subscription) bool
  1. 使用defer sub.Unsubscribe()确保函数执行完成的时候会执行取消订阅函数
  2. select语句
  • 当从sub.Err()通道中接收到错误时,将错误赋值给s.lastSubErr,并返回是否有错误
  • 当从s.unsub通道中接收到取消信号时,直接返回true表示已取消

6.8 backoffWait

func (s *resubscribeSub) backoffWait() bool

用于在订阅失败时进行重试等待,这个方法通常用于网络通信中,当订阅失败时,通过退避算法逐渐增加等待时间,直到达到最大等待时间或收到取消信号为止。

  1. 判断当前时间与上次尝试的时间差s.lastTry是否大于退避的最大时间s.backoffMax。如果是,则将等待时间设置为退避最大时间的1/10;否则,将等待时间翻倍,但不超过退避最大时间
s.waitTime *= 2
if s.waitTime > s.backoffMax {
	s.waitTime = s.backoffMax
}
  1. 创建一个定时器t,并设置等待时间为更新后的等待时间,它会在 s.waitTime 时间后触发。
  2. 使用select语句等待定时器超时或收到取消信号
  3. 如果定时器超时,返回false表示继续重试;如果收到取消信号,返回true表示已取消

7、SubscriptionScope&scopeSub

type SubscriptionScope struct {
	mu     sync.Mutex
	subs   map[*scopeSub]struct{}
	closed bool
}

type scopeSub struct {
	sc *SubscriptionScope
	s  Subscription
}

SubscriptionScope提供了一种同时取消订阅多个订阅的功能。
对于处理多个订阅的代码,可以使用作用域通过一个调用方便地取消订阅所有订阅。该示例演示了在大型程序中的典型使用。

  • 名为SubscriptionScope的结构体,它包含以下字段:
  1. mu:一个sync.Mutex类型的变量,用于实现互斥锁,确保在多线程环境下对subsclosed字段的操作是线程安全的。
  2. subs:一个map[*scopeSub]struct{}类型的变量,用于存储订阅者信息。键为scopeSub类型的指针,值为空结构体。
  3. closed:一个布尔类型的变量,表示订阅范围是否已关闭。如果为true,则表示已关闭;如果为false,则表示未关闭。
  • 名为scopeSub的结构体,它包含以下字段:
  1. sc:一个指向SubscriptionScope类型的指针,表示订阅范围。
  2. s:一个Subscription类型的变量,表示订阅信息。

7.1 sc.Track

func (sc *SubscriptionScope) Track(s Subscription) Subscription {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	if sc.closed {
		return nil
	}
	if sc.subs == nil {
		sc.subs = make(map[*scopeSub]struct{})
	}
	ss := &scopeSub{sc, s}
	sc.subs[ss] = struct{}{}
	return ss
}

Track开始跟踪订阅。如果作用域已关闭,Track将返回nil。返回的订阅是一个包装。取消订阅包装会将其从作用域中删除。

  1. 上锁和延迟当函数完成的时候释放锁
  2. 如果订阅范围已经关闭,就返回nil
  3. 如果sc.subs为nil,就要重新创建一个
  4. 以sc和s为参数创建一个scopeSub的对象
  5. 将其加入到sc中的subs存储中
    这个方法的主要作用是将订阅信息与订阅范围关联起来,并将它们存储在一个映射中。

7.2 sc.Close

func (sc *SubscriptionScope) Close() {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	if sc.closed {
		return
	}
	sc.closed = true
	for s := range sc.subs {
		s.s.Unsubscribe()
	}
	sc.subs = nil
}

Close调用所有跟踪订阅的Unsubscribe,并阻止进一步添加到跟踪集。关闭后对Track的调用返回nil。

  1. 上锁和延迟当函数完成的时候释放锁
  2. 如果sc已经关闭,就直接return
  3. 否则将closed字段设置为true
  4. 同时遍历sc.subs中的所有订阅信息,进行取消订阅
  5. 最后将sc.subs的订阅map设置为nil

7.3 sc.Count

func (sc *SubscriptionScope) Count() int {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	return len(sc.subs)
}

Count返回跟踪的订阅数

  1. 上锁和延迟当函数完成的时候释放锁
  2. 返回其中的sc.subs的长度

7.4 s.Unsubscribe

func (s *scopeSub) Unsubscribe() {
	s.s.Unsubscribe()
	s.sc.mu.Lock()
	defer s.sc.mu.Unlock()
	delete(s.sc.subs, s)
}
  1. 使用其中的Subscription类型的s的Unsubscribe()函数来取消订阅
  2. 上锁和延迟当函数完成的时候释放锁
  3. 使用delete从sc.subs中删除s

7.5 s.Err

func (s *scopeSub) Err() <-chan error {
	return s.s.Err()
}

返回错误信息
----------------------------event.go-----------------------------------

8、Event

type Event struct {
	once sync.Once

	feeds      map[string]*Feed
	feedsLock  sync.RWMutex
	feedsScope map[string]*SubscriptionScope
}
  1. once sync.Once:一个只执行一次的同步原语,用于确保某个操作只执行一次。
  2. feeds map[string]*Feed:一个字符串到Feed指针的映射,用于存储订阅信息。
  3. feedsLock sync.RWMutex:一个读写互斥锁,用于保护feeds字段的并发访问。
  4. feedsScope map[string]*SubscriptionScope:一个字符串到SubscriptionScope指针的映射,用于存储订阅范围信息。

8.1 init

func (e *Event) init() {
	e.feeds = make(map[string]*Feed)
	e.feedsScope = make(map[string]*SubscriptionScope)
}

初始化函数,创建feeds和feedsscope

8.2 initKey

func (e *Event) initKey(key string) {
	e.feedsLock.Lock()
	defer e.feedsLock.Unlock()
	if _, ok := e.feeds[key]; !ok {
		e.feeds[key] = new(Feed)
		e.feedsScope[key] = new(SubscriptionScope)
	}
}

初始化一个键值对,其中键为传入的字符串参数key,值为一个新的Feed对象和一个新的SubscriptionScope对象

  1. 上锁和延迟释放锁
  2. 如果feeds中还没有key对应的值,就创建一个新的Feed对象和一个新的SubscriptionScope对象

8.3 Subscribe

func (e *Event) Subscribe(channel interface{}) Subscription

是订阅一个通道(channel),并返回一个订阅对象(Subscription)。

  1. 调用e.once.Do(e.init)来确保在第一次调用Subscribe方法时执行e.init()函数进行初始化操作且只执行一次
  2. 通过reflect.TypeOf(channel).Elem().String()获取通道的类型字符串作为键值key
  3. 调用e.initKey(key)方法对键值进行初始化,传入key作为参数
  4. 使用e.feedsLock.RLock()加锁和延迟释放锁
  5. 通过e.feedsScope[key].Track(e.feeds[key].Subscribe(channel))创建一个订阅对象sub,并将其与对应的通道关联起来。
  6. 返回创建的订阅对象sub

8.4 Send

func (e *Event) Send(value interface{}) int

向订阅对象发送消息

  1. 调用e.once.Do(e.init)来确保在第一次调用Subscribe方法时执行e.init()函数进行初始化操作且只执行一次
  2. 通过key := reflect.TypeOf(value).String()获取value的类型字符串作为键值key
  3. 调用e.initKey(key)方法对键值进行初始化,传入key作为参数
  4. 定义nsent来计数已发送的数量
  5. 使用e.feedsLock.RLock()加锁和延迟释放锁
  6. 如果订阅范围为0就返回nsent
  7. 否则调用send函数进行发送(参考1.4)

8.5 Close

func (e *Event) Close() {
	e.feedsLock.Lock()
	defer e.feedsLock.Unlock()

	for _, scope := range e.feedsScope {
		scope.Close()
	}
}
  1. 上锁和延迟释放锁
  2. 遍历所有的订阅范围,全部执行close函数,Close调用所有跟踪订阅的Unsubscribe(参考7.2)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1377224.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ZigBee快速入门——外部中断(Key)

外部中断 :::tips 理解三道锁&#xff1a;EA——IENx——PxIEN EA-总开关 IENx-中断使能功能配置&#xff0c;可以配置程总线 IO中断&#xff08;P0、P1、P2&#xff09;&#xff0c;也可以配置程定时器等中断 PxIEN-总线中具体某一位的中断允许&#xff0c;如刚刚已经配置了 I…

vue3项目部署到服务器,刚打开没事,一刷新页面就404

vue3项目部署到服务器&#xff0c;刚打开没事&#xff0c;一刷新页面就404 vue3项目&#xff0c;在本地调试时各方面都没毛病&#xff0c;刷新也没毛病&#xff0c;但是&#xff0c;扔到服务器上&#xff0c;第一次打开是正常的&#xff0c;再刷新下就404了&#xff0c;不知道什…

软件测评中心▏性能测试之压力测试、负载测试的区别和联系简析

在如今的信息时代&#xff0c;软件已经成为人们日常工作和生活不可或缺的一部分。然而&#xff0c;随着软件的发展和应用范围的不断扩大&#xff0c;软件性能的优劣也成为了影响用户使用体验的重要因素。 软件性能测试即对软件在不同条件下的性能进行评估和验证的过程。通过模…

《MCtalk·CEO对话》正式上线!首期对话高成资本

2015 年 10 月&#xff0c;网易智企发布第一款产品&#xff0c;正式踏上了 ToB 商业化之路。从那以后&#xff0c;我们每年举办不同主题的科技峰会&#xff0c;分享最新的行业体感和洞察&#xff1b;访谈各界企业领导者&#xff0c;记录他们的创新与创业经历&#xff1b;走过大…

黑帽SEO简介

什么是黑帽 SEO&#xff1f; 黑帽SEO是一种违反搜索引擎指南的做法&#xff0c;用于使网站在搜索结果中排名更高。这些不道德的策略并不能解决搜索者的问题&#xff0c;并且通常以搜索引擎的惩罚而告终。黑帽技术包括关键字填充、伪装和使用专用链接网络。 出现在搜索结果中对…

确定性网络技术怎样实现网络的可靠性?

确定性网络技术通过采用特定的协议、机制和策略&#xff0c;有助于提高网络的可靠性。本文通过一些关键的方面&#xff0c;来说明确定性网络技术如何实现这一目标。 时钟同步机制 时钟同步机制是确定性网络中的核心角色。为了实现高度可靠的通信&#xff0c;需要采用先进的时钟…

如何进行有竞争力的SEO审计以超越行业竞争对手

许多营销人员都有兴趣密切关注竞争对手的搜索引擎优化 &#xff08;SEO&#xff09;。这是有道理的——无论你是刚开始做SEO&#xff0c;还是已经做了一段时间&#xff0c;你都希望对搜索引擎结果页面&#xff08;SERP&#xff09;的竞争格局有一个清晰的认识&#xff0c;这样你…

构建基于RHEL9系列(CentOS9,AlmaLinux9,RockyLinux9等)的MySQL8.0.32的RPM包

本文适用&#xff1a;rhel9系列&#xff0c;或同类系统(CentOS9,AlmaLinux9,RockyLinux9等) 文档形成时期&#xff1a;2023年 因系统版本不同&#xff0c;构建部署应略有差异&#xff0c;但本文未做细分&#xff0c;对稍有经验者应不存在明显障碍。 因软件世界之复杂和个人能力…

2024年软件测试面试八股文【含答案】

Part1 1、你的测试职业发展是什么&#xff1f;【文末有面试文档免费领取】 测试经验越多&#xff0c;测试能力越高。所以我的职业发展是需要时间积累的&#xff0c;一步步向着高级测试工程师奔去。而且我也有初步的职业规划&#xff0c;前3年积累测试经验&#xff0c;按如何做…

SpringBoot3.X源码分析(启动流程)

SpringBootApplication(scanBasePackages {"com.javaedge.base"} ) public class BaseApplication {public BaseApplication() {}public static void main(String[] args) {SpringApplication.run(BaseApplication.class, args);} } 1 启动入口 静态辅助类&#x…

博弈类问题

巴什博弈(Bash Game) String bashGame2(int n, int m) {return n % (m 1) ! 0 ? "先手" : "后手";} #include<iostream> #include<string> using namespace std;string compute(int n) {return n % 6 ! 0 ? "October wins!" : &q…

iOS开发进阶(六):Xcode14 使用信号量造成线程优先级反转问题修复

文章目录 一、前言二、关于线程优先级反转三、优先级反转会造成什么后果四、怎么避免线程优先级反转五、使用信号量可能会造成线程优先级反转&#xff0c;且无法避免六、延伸阅读&#xff1a;iOS | Xcode中快速打开终端6.1 .sh绑定6.2 执行 pod install 脚本 七、延伸阅读&…

adrv9009使用记录

这里写自定义目录标题 1.首先下载cygwin&#xff0c;CSDN可以直接搜索&#xff0c;按照对应的安装就可以&#xff0c;最后记得加一个make安装包&#xff0c;不然在make时候会导致指令不存在 2.下载完成之后&#xff0c;去adi-github官网找到对应版本的adrv9009工程 https://git…

为什么要进行漏洞扫描工作

随着互联网的普及和信息技术的飞速发展&#xff0c;网络安全问题愈发引人关注。其中&#xff0c;漏洞扫描作为保障网络安全的重要手段&#xff0c;受到了广泛的关注和应用。本文将详细介绍漏洞扫描的概念、效果、使用场景等&#xff0c;以期为读者提供有关漏洞扫描的全面了解。…

PPT自动化处理

python-pptx模块 可以创建、修改PPT(.pptx)文件非Python标准模块&#xff0c;需要单独安装 在线安装方式 pip install python-pptx 读取slide幻灯片 .slides 获取shape形状 slide.shapes 判断一个shape中是否存在文字 shape.has_text_frame 获取文字框 shape.text_f…

查看磁盘里的大文件

查看磁盘里的大文件 在PowerShell中 命令1&#xff1a; gci -r| sort -descending -property length | select -first 10 name, length 命令2&#xff1a; Get-ChildItem -Path C:\ -Recurse | Where-Object { $_.Length -gt 1GB } | Sort-Object -Property Length -Descendin…

微服务自动化docker-compose

一、docker-compose介绍 Docker Compose是一个用来定义和运行多个复杂应用的Docker编排工具。例如&#xff0c;一个使用Docker容器的微服务项目&#xff0c;通常由多个容器应用组成。那么部署时如何快速启动各个微服务呢&#xff0c;一个个手动启动&#xff1f;假如有上百个微服…

Docker进阶数据卷目录挂载及在线部署

前言 为了很好的实现数据保存和数据共享&#xff0c; Docker 提出了 Volume 这个概念&#xff0c;简单的说就是绕过默认的联合 文件系统&#xff0c;而以正常的文件或者目录的形式存在于宿主机上。又被称作数据卷 一. 数据卷介绍 Docker 中的数据卷&#xff08;Volume&#x…

这是一款户外可充电多功能LED地摊灯 手电筒方案

1,信息来源&#xff1a;深圳市世微半导体有限公司 Augus 2,产品的特性有&#xff1a; 全集成单芯片控制 5 照明循环模式可选 0.5A/1A 固定充电电流可选 内置 MOS 1.8A 驱动电流 可外置 MOS 驱动更大电流 充电指示/低电提示/短路提示 3A 手电筒过流保护? 预设 4.22V 电…

【一文搞懂JVM的内存屏障】

要命的问题&#xff1a; 什么是线程的安全性&#xff1f;怎么保证&#xff1f;jvm什么是的内存屏障&#xff1f;他有什么作用&#xff1f; **线程的安全性是指&#xff1a;**指在多线程环境下&#xff0c;多个线程同时访问同一资源时不会产生意外结果或导致数据出错的状态。其…