消息队列 ---nsq

news2024/9/22 19:41:51

设计

topic和channel

单个nsqd实例旨在一次处理多个数据流。流称为“主题”,一个主题有 1 个或多个“通道”。每个通道都会收到一个主题的所有消息的_副本_。

主题和通道道_不是_提前配置的。主题是在首次使用时通过发布到指定主题或订阅指定主题的通道来创建的。频道是通过订阅指定频道在首次使用时创建的。

主题和通道都相互独立地缓冲数据,防止缓慢的消费者导致其他通道的积压(同样适用于主题级别)。

一个通道可以并且通常确实连接了多个客户端。假设所有连接的客户端都处于准备接收消息的状态,则每条消息将被传递给随机客户端。

消息是从主题 -> 通道多播的(每个频道都接收该主题的所有消息的副本),但从通道 -> 消费者均匀分布(每个消费者接收该频道的部分消息)

消息传递保证

NSQ保证一条消息至少被传递一次,尽管重复消息是可能的。消费者应该预料到这一点并进行重复数据删除或执行幂等操作。

工作方式(假设客户端已成功连接并订阅了主题):

  1. 客户端表示他们已准备好接收消息
  2. NSQ发送消息并将数据临时存储在本地(在重新排队或超时的情况下)
  3. 客户端回复 FIN(完成)或 REQ(重新排队)分别指示成功或失败。如果客户端没有回复NSQ将在可配置的持续时间后超时并自动重新排队消息)

这确保了唯一会导致消息丢失的边缘情况是 nsqd进程的不正常关闭。在这种情况下,内存中的任何消息(或任何未刷新到磁盘的缓冲写入)都将丢失。

完全解决方案是:建立冗余nsqd对(在不同的主机上)接收相同部分消息的副本。因为您已将消费者编写为幂等,所以对这些消息执行双重时间不会对下游产生影响,并且允许系统承受任何单节点故障而不会丢失消息。

消息持久化

nsqd提供了一个配置选项--mem-queue-size,用于确定队列在内存中保留的消息数量。如果队列的深度超过此阈值,消息将透明地写入磁盘。这将给定进程的内存占用限制为 :nsqd mem-queue-size * #_of_channels_and_topics

通过将此值设置为较低的值(如 1 甚至 0),这是一种获得更高交付保证的便捷方法。磁盘支持的队列被设计为在不干净的重启后仍然存在(尽管消息可能会被传递两次)。

此外,与消息传递保证相关,干净关闭(通过向nsqd进程发送 TERM 信号)能够安全地将当前在内存中、运行中、延迟和各种内部缓冲区中的消息持久保存。

请注意,名称以字符串结尾的主题/频道#ephemeral的消息不会缓冲到磁盘,而是会在传递mem-queue-size. 这使得不需要消息保证的消费者能够订阅频道。这些临时通道也将在其最后一个客户端断开连接后消失。对于一个临时主题,这意味着至少有一个频道被创建、消费和删除(通常是一个临时频道)。

消息传输

NSQ旨在通过“类似 memcached”的命令协议与简单的大小前缀响应进行通信。所有消息数据都保存在核心中,包括尝试次数、时间戳等元数据。这消除了从服务器到客户端来回复制数据,这是重新排队消息时先前工具链的固有属性。这也简化了客户端,因为它们不再需要负责维护消息状态。

对于数据协议,我们做出了一个关键的设计决策,即通过将数据推送到客户端而不是等待它拉取来最大化性能和吞吐量。这个概念,我们称之为RDY状态,本质上是客户端流控制的一种形式。

当客户端连接nsqd并订阅通道时,它的RDY状态为 0。这意味着不会向客户端发送任何消息。当客户端准备好接收消息时,它会发送一条命令,将其RDY状态更新为它准备处理的某个 #,比如 100。如果没有任何其他命令,100 条消息将在可用时推送到客户端(每次递减该客户端的服务器端 RDY 计数)。

客户端库旨在在达到可配置设置的约 25% 时发送更新RDY计数的命令max-in-flight(并适当考虑到多个nsqd 实例的连接,适当划分)。

值得注意的是,因为消息既是缓冲的又是基于推送的,能够满足对流(通道)的独立副本的需求,我们制作了一个行为类似于simplequeuepubsub _组合_的守护进程。

nsqd

nsqd是接收、排队和传递消息给客户端的守护进程。

它可以独立运行,但通常配置在带有nsqlookupd 实例的集群中(在这种情况下,它将宣布主题和发现通道)。

它监听两个 TCP 端口,一个用于客户端,另一个用于 HTTP API。它可以选择在第三个端口上侦听 HTTPS。

nsqd监听两个端口:

4151 HTTP Producer使用HTTP协议的curl等工具生产数据;Consumer使用HTTP协议的curl等工具消费数据;
4150 TCP Producer使用TCP协议的nsq-j等工具生产数据;Consumer使用TCP协议的nsq-j等工具消费数据;

nsqlookupd

nsqlookupd是管理拓扑信息的守护进程。客户端查询nsqlookupd以发现 nsqd特定主题的生产者,nsqd节点广播主题和频道信息。

有两个端口:用于nsqd广播的 TCP 端口和用于客户端执行发现和管理操作的 HTTP 接口。

nsqlookupd 监听两个端口:

4160 TCP 用于接收nsqd的广播,记录nsqd的地址以及监听TCP/HTTP端口等。
4161 HTTP 用于接收客户端发送的管理和发现操作请求(增删话题,节点等管理查看性操作等)。当Consumer进行连接时,返回对应存在Topic的nsqd列表。

nsqadmin

nsqadmin监听一个端口
4171 HTTP 用于管理页面

代码分析

/ping

使用了装饰器模式


// http请求的各个handler,从这里创建请求处理的handler------------nsqd.go Main()
httpServer := newHTTPServer(n, false, n.getOpts().TLSRequired == TLSRequired)
router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
...
// 很多handler

//http_api.Decorate看一下这个方法:
func Decorate(f APIHandler, ds ...Decorator) httprouter.Handle {
  decorated := f
  for _, decorate := range ds {
    decorated = decorate(decorated)
  }
  return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
    decorated(w, req, ps)
  }
}
//s.pingHandler
func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  health := s.nsqd.GetHealth()
  if !s.nsqd.IsHealthy() {
    return nil, http_api.Err{500, health}
  }
  return health, nil
}

//log
func Log(logf lg.AppLogFunc) Decorator {
  return func(f APIHandler) APIHandler {
    return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
      start := time.Now()
      response, err := f(w, req, ps)
      elapsed := time.Since(start)
      status := 200
      if e, ok := err.(Err); ok {
        status = e.Code
      }
      logf(lg.INFO, "%d %s %s (%s) %s",
        status, req.Method, req.URL.RequestURI(), req.RemoteAddr, elapsed)
      return response, err
    }
  }
}
//http_api.PlainText
func PlainText(f APIHandler) APIHandler {
  return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
    code := 200
    data, err := f(w, req, ps)
    if err != nil {
      code = err.(Err).Code
      data = err.Error()
    }
    switch d := data.(type) {
    case string:
      w.WriteHeader(code)
      io.WriteString(w, d)
    case []byte:
      w.WriteHeader(code)
      w.Write(d)
    default:
      panic(fmt.Sprintf("unknown response type %T", data))
    }
    return nil, nil
  }
}

PUB

// tcp_server.go 文件中 使用协程创建这个连接的客户端
go func() {
  handler.Handle(clientConn)
  wg.Done()
}()
// 创建客户端
client := prot.NewClient(conn)
p.conns.Store(conn.RemoteAddr(), client)
err = prot.IOLoop(client)
  // 开启一个消息泵
  go p.messagePump(client, messagePumpStartedChan)
  
  response, err = p.Exec(client, params)
      // 根据参数执行对应的
      case bytes.Equal(params[0], []byte("PUB")):
        return p.PUB(client, params)
      // PUB方法中:
      topic := p.nsqd.GetTopic(topicName)//获取Topic,如果没有就会创建topic。在这里还会将所有的channel放到一个map里,如果channel还没有,那么就去创建出来
            NewTopic(topicName, n, deleteCallback)//创建topic,创建之后会开启一个协程,循环从topic消息管道中放到channel中
                t.waitGroup.Wrap(t.messagePump)// 开启一个协程 循环从topic消息管道中放到channel中
      msg := NewMessage(topic.GenerateID(), messageBody) // 创建一个消息
      err = topic.PutMessage(msg)// 将消息放到管道中
  // 响应
  p.Send(client, frameTypeResponse, response)
  
消息put到memoryMsgChan(主题的管道),管道满了之后,就会持久化到磁盘中。这个管道的大小由启动时参数MemQueueSize
func (t *Topic) put(m *Message) error {
  select {
  case t.memoryMsgChan <- m:
  default:
    将消息写入磁盘
    err := writeMessageToBackend(m, t.backend)
    t.nsqd.SetHealth(err)
    if err != nil {
      t.nsqd.logf(LOG_ERROR,
        "TOPIC(%s) ERROR: failed to write message to backend - %s",
        t.name, err)
      return err
    }
  }
  return nil
}

SUB

// 开始和上面相同,创建客户端,然后执行
   //这个协程内,开启一个消息泵,循环从channel中获取消息,发送到客户端
  go p.messagePump(client, messagePumpStartedChan)
// 匹配到SUB,执行
case bytes.Equal(params[0], []byte("SUB")):
    return p.SUB(client, params)
        //从参数中获取
        topicName := string(params[1])
        channelName := string(params[2])
        topic := p.nsqd.GetTopic(topicName)
        channel = topic.GetChannel(channelName)
        channel.AddClient(client.ID, client)

推送消息

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
  var err error
  var memoryMsgChan chan *Message
  var backendMsgChan <-chan []byte
  var subChannel *Channel
  // NOTE: `flusherChan` is used to bound message latency for
  // the pathological case of a channel on a low volume topic
  // with >1 clients having >1 RDY counts
  var flusherChan <-chan time.Time
  var sampleRate int32


  for {
  // 判断客户端是 消费者 还是 生产者
    if subChannel == nil || !client.IsReadyForMessages() {
      // the client is not ready to receive messages...
      memoryMsgChan = nil
      backendMsgChan = nil
      flusherChan = nil
     // ....

    select {
    // 省略部分代码...
      // 当memoryMsgChan管道满了之后,新来的消息会放到这个管道中
    case b := <-backendMsgChan:
      subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
      client.SendingMessage()
      err = p.SendMessage(client, msg)
    case msg := <-memoryMsgChan:
      if sampleRate > 0 && rand.Int31n(100) > sampleRate {
        continue
      }
      msg.Attempts++

      //在向客户端发送消息之前,将消息设置为在飞翔中,如果处理成功就把这个消息从飞翔中的状态中去掉,
      //如果在规定的时间内没有收到客户端的反馈,则认为这个消息超时,然后重新归队。 其实就是将消息放到一个map中
      subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
      client.SendingMessage()
      err = p.SendMessage(client, msg)
      if err != nil {
        goto exit
      }
      flushed = false
    case <-client.ExitChan:
      goto exit
    }
  }
}

如果不考虑负载情况,把随机的把消息发送到某一个客服端去处理消息,如果机器的性能不同,可能发生的情况就是某一个或几个客户端处理速度慢,但还有大量新的消息需要处理,其他的客户端处于空闲状态。理想的状态是,找到当前相对空闲的客户端去处理消息。

nsq的处理方式是客户端主动向nsqd报告自已的可处理消息数量(也就是RDY命令)。nsqd根据每个连接的客户端的可处理消息的状态来随机把消息发送到可用的客户端,来进行消息处理

同时订阅同一topic的客户端(comsumer)有很多个,每个客户端根据自己的配置或状态发送RDY命令到nsqd表明自己能处理多少消息量nsqd服务端会检查每个客户端的的状态是否可以发送消息。也就是IsReadyForMessages方法,判断inFlightCount是否大于readyCount,如果大于或者等于就不再给客户端发送数据,等待Ready后才会再给客户端发送数据

func (c *clientV2) IsReadyForMessages() bool {
  if c.Channel.IsPaused() {
    return false
  }

  readyCount := atomic.LoadInt64(&c.ReadyCount)
  inFlightCount := atomic.LoadInt64(&c.InFlightCount)

  c.ctx.nsqd.logf(LOG_DEBUG, "[%s] state rdy: %4d inflt: %4d", c, readyCount, inFlightCount)

  if inFlightCount >= readyCount || readyCount <= 0 {
    return false
  }

  return true

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
  // ...
  for {
    // 检查订阅状态和消息是否可处理状态  
    if subChannel == nil || !client.IsReadyForMessages() {
      // the client is not ready to receive messages...
      memoryMsgChan = nil
      backendMsgChan = nil
      flusherChan = nil
      // ...
      flushed = true
    } else if flushed {
      memoryMsgChan = subChannel.memoryMsgChan
      backendMsgChan = subChannel.backend.ReadChan()
      flusherChan = nil
    } else {
      memoryMsgChan = subChannel.memoryMsgChan
      backendMsgChan = subChannel.backend.ReadChan()
      flusherChan = outputBufferTicker.C
    }

    select {
    case <-flusherChan:
      // ...
    // 消息处理      
    case b := <-backendMsgChan:
      client.SendingMessage()
      // ...
    case msg := <-memoryMsgChan:
      client.SendingMessage()    
      //...
    }
  }
// ...
}

处理超时消息

nsq启动的时候启动协程去处理channel的过期数据

func (n *NSQD) Main() error {
  // ...
  // 启动协程去处理channel的过期数据    
  n.waitGroup.Wrap(n.queueScanLoop)
  n.waitGroup.Wrap(n.lookupLoop)
  if n.getOpts().StatsdAddress != "" {
    n.waitGroup.Wrap(n.statsdLoop)
  }

  err := <-exitCh
  return err
}

func (n *NSQD) queueScanLoop() {
  //resizePool调整queueScanWorker goroutines池的大小
  // 随机选取一个channel放到workCh中,queueScanWorker()进行扫描
  n.resizePool(len(channels), workCh, responseCh, closeCh)
}



// 随机
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
  for {
    select {
    case c := <-workCh:
      now := time.Now().UnixNano()
      dirty := false
     // 处理飞翔中的消息,如果过期了重新入队
      if c.processInFlightQueue(now) {
        dirty = true
      }
      // 处理延迟期的消息
      if c.processDeferredQueue(now) {
        dirty = true
      }
      responseCh <- dirty
    case <-closeCh:
      return
    }
  }
}



持久化消息

默认的情况下,只有内存队列不足时MemQueueSize:10000时,才会把数据保存到文件内进行持久到硬盘。

如果将 --mem-queue-size 设置为 0,所有的消息将会存储到磁盘。我们不用担心消息会丢失,nsq 内部机制保证在程序关闭时将队列中的数据持久化到硬盘,重启后就会恢复。

nsq自己开发了一个库go-diskqueue来持久会消息到内存

func (t *Topic) put(m *Message) error {
  select {
  case t.memoryMsgChan <- m:
  default:
    // 持久化消息
    err := writeMessageToBackend(m, t.backend)
    t.nsqd.SetHealth(err)
    if err != nil {
      t.nsqd.logf(LOG_ERROR,
        "TOPIC(%s) ERROR: failed to write message to backend - %s",
        t.name, err)
      return err
    }
  }
  return nil
}

集群实现

nsqd 启动命令:

./nsqd -tcp-address ":8000"  -http-address ":8001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200 -data-path=./a

--lookupd-tcp-address 用于指定nsqlookupdtcp监听地址。

nsqd启动后连接nsqlookupd,连接成功后,要发送一个魔法标识nsq.MagicV1,这个标识有啥魔法么,当然不是,他只是用于标明,客户端和服务端双方使用的信息通信版本,不能的版本有不同的处理方式,为了后期做新的消息处理版本方便.

func (p *tcpServer) Handle(clientConn net.Conn) {  
  // ...
  buf := make([]byte, 4)
  _, err := io.ReadFull(clientConn, buf)
  // ...
  protocolMagic := string(buf)
  // ...
  var prot protocol.Protocol
  switch protocolMagic {
  case "  V1":
    prot = &LookupProtocolV1{ctx: p.ctx}
  default:
    // ...
    return
  }
  err = prot.IOLoop(clientConn)
  //...
}

这个时候的nsqd已经和nsqlookupd建立好了连接,但是这时,仅仅说明他俩连接成功。nsqlookupd也并没有把这个连接加到可用的nsqd列表里。建立连接完成后,nsqd会发送IDENTIFY命令,这个命令里包含了nsq的基本信息nsqd的代码

ci := make(map[string]interface{})
    ci["version"] = version.Binary
    ci["tcp_port"] = n.RealTCPAddr().Port
    ci["http_port"] = n.RealHTTPAddr().Port
    ci["hostname"] = hostname
    ci["broadcast_address"] = n.getOpts().BroadcastAddress

    cmd, err := nsq.Identify(ci)
    if err != nil {
      lp.Close()
      return
    }
    resp, err := lp.Command(cmd)

包含了nsqd 提供的tcphttp端口,主机名,版本等等,发送给nsqlookupd,nsqlookupd收到IDENTIFY命令后,解析信息然后加到nsqd的可用列表里nsqlookupd 的代码块

func (p *LookupProtocolV1) IDENTIFY(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
  var err error
  if client.peerInfo != nil {
    return nil, protocol.NewFatalClientErr(err, "E_INVALID", "cannot IDENTIFY again")
  }
  var bodyLen int32
  err = binary.Read(reader, binary.BigEndian, &bodyLen)
  // ...
  body := make([]byte, bodyLen)
  _, err = io.ReadFull(reader, body)
  // ...  
  peerInfo := PeerInfo{id: client.RemoteAddr().String()}
  err = json.Unmarshal(body, &peerInfo)
  // ...
  client.peerInfo = &peerInfo
  // 把nsqd的连接加入到可用列表里    
  if p.ctx.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{peerInfo: client.peerInfo}) {
    p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "client", "", "")
  }
  // ...
  return response, nil
}

然后每过15秒,nsqd会发送一个PING心跳命令给nsqlookupd,这样保持存活状态,nsqlookupd每次收到发过来的PING命令后,也会记下这个nsqd的最后更新时间,这样做为一个筛选条件,如果长时间没有更新,就认为这个节点有问题,不会把这个节点的信息加入到可用列表。

nsqlookupd 挂掉的处理方式

目前的处理方式是这样的,无论是心跳,还是其他命令,nsqd会给所有的nsqlookup发送信息,当nsqd发现nsqlookupd出现问题时,在每次发送命令时,会不断的进行重新连接:

func (lp *lookupPeer) Command(cmd *nsq.Command) ([]byte, error) {
  initialState := lp.state
  if lp.state != stateConnected {
    err := lp.Connect()
    if err != nil {
      return nil, err
    }
    lp.state = stateConnected
    _, err = lp.Write(nsq.MagicV1)
    if err != nil {
      lp.Close()
      return nil, err
    }
    if initialState == stateDisconnected {
      lp.connectCallback(lp)
    }
    if lp.state != stateConnected {
      return nil, fmt.Errorf("lookupPeer connectCallback() failed")
    }
  }
  // ...
}

如果连接成功,会再次调用connectCallback方法,进行IDENTIFY命令的调用等。

当有nsqd出现故障时怎么办?

  • nsqdlookupd会把这个故障节点从可用列表中去除,客户端从接口得到的可用列表永远都是可用的。

  • 客户端会把这个故障节点从可用节点上移除,然后要去判断是否使用了nsqlookup进行了连接,如果是则case r.lookupdRecheckChan <- 1 去刷新可用列表queryLookupd,如果不是,然后启动一个协程去定时做重试连接,如果故障恢复,连接成功,会重新加入到可用列表.

    客户端实现的代码

func (r *Consumer) onConnClose(c *Conn) {
  // ...
  // remove this connections RDY count from the consumer's total
  delete(r.connections, c.String())
  left := len(r.connections)
  // ...
  r.mtx.RLock()
  numLookupd := len(r.lookupdHTTPAddrs)
  reconnect := indexOf(c.String(), r.nsqdTCPAddrs) >= 0
  // 如果使用的是nslookup则去刷新可用列表
  if numLookupd > 0 {
    // trigger a poll of the lookupd
    select {
    case r.lookupdRecheckChan <- 1:
    default:
    }
  } else if reconnect {
    // ... 
    }(c.String())
  }
}

消息传输

生产者生产数据的过程 消息是字节流: 消息类型+" "+ 主题名 + " " + channel名 + “/n”

//将单条消息同步发送到指定主题,消息也可以异步/批量发送
err = producer.Publish(topicName, messageBody)

func (w *Producer) Publish(topic string, body []byte) error {
  return w.sendCommand(Publish(topic, body))
}

//Publish创建一个新的命令来向给定的主题写入消息
func Publish(topic string, body []byte) *Command {
  var params = [][]byte{[]byte(topic)}
  return &Command{[]byte("PUB"), params, body}
}
// 省略一些代码片段。。。
// 下面时消息序列化代码 发送到服务端
func (c *Command) WriteTo(w io.Writer) (int64, error) {
  var total int64
  var buf [4]byte
  // 1,写入消息的类型
  n, err := w.Write(c.Name)
  total += int64(n)
  if err != nil {
    return total, err
  }

  for _, param := range c.Params {
    // 2,写入一个空字符串
    n, err := w.Write(byteSpace)
    total += int64(n)
    if err != nil {
      return total, err
    }
    // 3,写入 主题名称,这里循环写入是因为 params中 可能有主题名,通道名
    n, err = w.Write(param)
    total += int64(n)
    if err != nil {
      return total, err
    }
  }
  // 4,写入 换行符
  n, err = w.Write(byteNewLine)
  total += int64(n)
  if err != nil {
    return total, err
  }

  if c.Body != nil {
    bufs := buf[:]
    binary.BigEndian.PutUint32(bufs, uint32(len(c.Body)))
    // 5,写入  消息的内容的长度
    n, err := w.Write(bufs)
    total += int64(n)
    if err != nil {
      return total, err
    }
    // 6,写入 消息的内容
    n, err = w.Write(c.Body)
    total += int64(n)
    if err != nil {
      return total, err
    }
  }

  return total, nil
}

服务端如何接收消息反序列化的, 按规定的格式读取字节

// 读取客户端发送来的消息,首先读取换行符前面的内容就是params
line, err = client.Reader.ReadSlice('\n')
    if err != nil {
      if err == io.EOF {
        err = nil
      } else {
        err = fmt.Errorf("failed to read command - %s", err)
      }
      break
    }

    // trim the '\n'
    line = line[:len(line)-1]
    // optionally trim the '\r'
    if len(line) > 0 && line[len(line)-1] == '\r' {
      line = line[:len(line)-1]
    }
    params := bytes.Split(line, separatorBytes)// [消息类型, 主题名, channel名]
    
    bodyLen, err := readLen(client.Reader, client.lenSlice)
    messageBody := make([]byte, bodyLen) // 消息内容
  _, err = io.ReadFull(client.Reader, messageBody)

服务端推送消息及消息如何序列化, 响应:data的长度+ data(消息的类型+ 消息)

type Message struct {
  ID        MessageID
  Body      []byte
  Timestamp int64
  Attempts  uint16

  // for in-flight handling
  deliveryTS time.Time
  clientID   int64
  pri        int64
  index      int
  deferred   time.Duration
}

func (p *protocolV2) SendMessage(client *clientV2, msg *Message) error {
  p.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): writing msg(%s) to client(%s) - %s", msg.ID, client, msg.Body)

  buf := bufferPoolGet()
  defer bufferPoolPut(buf)

  _, err := msg.WriteTo(buf)// msg->字节流
  if err != nil {
    return err
  }

  err = p.Send(client, frameTypeMessage, buf.Bytes())
  if err != nil {
    return err
  }

  return nil
}
func (p *protocolV2) Send(client *clientV2, frameType int32, data []byte) error {
  client.writeLock.Lock()

  var zeroTime time.Time
  if client.HeartbeatInterval > 0 {
    client.SetWriteDeadline(time.Now().Add(client.HeartbeatInterval))
  } else {
    client.SetWriteDeadline(zeroTime)
  }

  _, err := protocol.SendFramedResponse(client.Writer, frameType, data)
  if err != nil {
    client.writeLock.Unlock()
    return err
  }

  if frameType != frameTypeMessage {
    err = client.Flush()
  }

  client.writeLock.Unlock()

  return err
}

func SendFramedResponse(w io.Writer, frameType int32, data []byte) (int, error) {
  beBuf := make([]byte, 4)
  size := uint32(len(data)) + 4

  binary.BigEndian.PutUint32(beBuf, size)
  n, err := w.Write(beBuf)// 写入消息的长度
  if err != nil {
    return n, err
  }

  binary.BigEndian.PutUint32(beBuf, uint32(frameType))
  n, err = w.Write(beBuf) // 写入frameType,这是响应的类型,有:三种类型
                                    //frameTypeResponse int32 = 0
                                    //frameTypeError    int32 = 1
                                    //frameTypeMessage  int32 = 2
  if err != nil {
    return n + 4, err
  }

  n, err = w.Write(data) // 写入消息
  return n + 8, err
}

//    [x][x][x][x][x][x][x][x]...
//    |  (int32) || (binary)
//    |  4-byte  || N-byte
//    ------------------------...
//        size       data

消费者接收消息如何反序列化

func ReadUnpackedResponse(r io.Reader) (int32, []byte, error) {
  resp, err := ReadResponse(r)
  if err != nil {
    return -1, nil, err
  }
  return UnpackResponse(resp)
}

//    [x][x][x][x][x][x][x][x]...
//    |  (int32) || (binary)
//    |  4-byte  || N-byte
//    ------------------------...
//        size       data

func ReadResponse(r io.Reader) ([]byte, error) {
  var msgSize int32

  // message size
  err := binary.Read(r, binary.BigEndian, &msgSize)
  if err != nil {
    return nil, err
  }

  if msgSize < 0 {
    return nil, fmt.Errorf("response msg size is negative: %v", msgSize)
  }
  // message binary data
  buf := make([]byte, msgSize) // data
  _, err = io.ReadFull(r, buf)
  if err != nil {
    return nil, err
  }

  return buf, nil
}

//
//    [x][x][x][x][x][x][x][x]...
//    |  (int32) || (binary)
//    |  4-byte  || N-byte
//    ------------------------...
//      frame ID     data
//
// Returns a triplicate of: frame type, data ([]byte), error
func UnpackResponse(response []byte) (int32, []byte, error) {
  if len(response) < 4 {
    return -1, nil, errors.New("length of response is too small")
  }

  return int32(binary.BigEndian.Uint32(response)), response[4:], nil
}

常用工具

上游如何保证消息的可靠性?

  • nsq_to _file:消费指定的话题(topic)/通道(channel),并写到文件中,有选择地滚动和/或压缩文件。
  • nsq_to _http:消费指定的话题(topic)/通道(channel)和执行 HTTP requests (GET/POST) 到指定的端点。
  • nsq_to _nsq:消费者指定的话题/通道和重发布消息到目的地 nsqd 通过 TCP。

消息备份/回放
消息是写到一个单一的 nsqd 的,如果那个 nsqd 部署机器挂掉无法恢复,可能会导致这个 nsqd 中积压的消息丢失。所以我们还需要设计消息备份和异常回放的机制。

  • 备份

NSQ 提供了 nsq_to_file 工具,可以用来做消息备份。所有消息实时备份到本地文件,按小时切割文件,消息文件三备份,备份数据存储 n 天。

需要备份的消息使用 nsq_to_nsq 工具,同步写入备份队列,备份机器上运行 nsq_to_file 订阅备份队列的数据,写到磁盘备份。

  • 回放

1.机器挂掉能重启恢复,重启恢复后重启服务即可,nsqd 的内存队列大小设置为 0,数据全部落盘,重启不会丢失数据。
2.机器不能重启恢复的情况下,从备份数据中回放该 nsqd 的备份消息。 需要注意回放时对消息去重,因为写备份时采用全部备份写成功才算成功的方案,可能会导致消息重复。

集群监控
同时我们还需要部署监控,实时监控 NSQ 集群的运行情况,以便出现问题时能技术感知修复。

机器状态:cpu、内存、磁盘、网络等。
服务状态:进程、端口存活等。
消息队列:队列数量、队列积压情况等。
这样就可以做到整个系统无单点,数据三备份,及时感知集群异常。做到一个基础的可用性要求,能够满足绝大部分的场景使用了。

高可用

队列长度设置为0,所有的消息都存到磁盘中

nsq_to_file,在一个主机上,消费指定主题的消息,按小时全部存到文件中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/171775.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何看懂行业分析报告?

从下面几部分聊聊行业分析&#xff1a;1.什么时候需要做行业分析&#xff1f;2.如何做行业分析&#xff1f;3.案例学习4.在工作中如何应用&#xff1f;5.在生活中如何应用&#xff1f;1.什么时候需要做行业分析呢&#xff1f;当你在对自己进行职业规划的时候&#xff0c;会思考…

【SpringCloud15】SpringCloud Stream消息驱动

1.消息驱动概述 1.1 为什么要引入消息驱动 1.2 是什么 概述&#xff1a;屏蔽底层消息中间件的差异&#xff0c;降低切换成本&#xff0c;统一消息的编程模型 官网 Spring Cloud Stream是用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务框架&#xff0c;该框架提…

一 、Qml开发之环境搭建

进入官网下载相应版本的qtcreator &#xff1a;https://download.qt.io/archive/qt/5.12/5.12.6/ 1.1 安装的时候注意如下对话框&#xff0c;需要选择下图所示的必须选项&#xff0c;因为我是mac 所以选择的macOS下载完之后进行点击安装&#xff0c;安装后运行软件图片如下&…

小程序uni-app介绍

uni-app介绍 uni-app简介 uni-app 是一个使用**Vue.js **开发所有前端应用的框架&#xff0c;开发者编写一套代码&#xff0c;可发布到iOS、Android、Web&#xff08;响应式&#xff09;、以及各种小程序&#xff08;微信/支付宝/百度/头条/QQ/钉钉/淘宝&#xff09;、快应用…

C++|读写xml文件开源库tingxml2的使用

参考&#xff1a; TinyXML使用方法[通俗易懂] https://cloud.tencent.com/developer/article/2037579 TinyXML2 入门教程&#xff08;这篇写很好&#xff0c;本文侧重讲解使用不过做多介绍&#xff09; 不了解xml的建议自行查阅&#xff0c;在此不赘述。 开源库github链接&…

Python中的列表、元组、字典

​​​​​​​列表是一种让程序员在代码中批量表示/保存数据的方式&#xff0c;元组和列表相比&#xff0c;是非常相似的&#xff0c;只是列表中放哪些元素可以修改调整&#xff0c;元组中放的元素是创建元组的时候就设定好的&#xff0c;不能修改调整。 列表和元组类似于其他…

SpringBoot 2-9-2 ServletAPI

使用27个解析器中 ServletRequestMethodArgumentResolver Step1 页面请求 注意RestController ResponseBody Controller Controller 将当前修饰的类注入SpringBoot IOC容器&#xff0c;使得从该类启动后就被实例化 ResponseBody 表示它会以Json字符串的形式返回给客户…

【日常系列】LeetCode《27·动态规划2》

数据规模->时间复杂度 <10^4 &#x1f62e;(n^2) <10^7:o(nlogn) <10^8:o(n) 10^8<:o(logn),o(1) 内容 1&#xff09;爬楼梯、打家劫舍问题 2&#xff09;0-1&#xff0c;多重&#xff0c;完全&#xff0c;二维被动背包问题 lc 70【剑指 10 - 2】【top100】&…

Maven仓库集成与使用

1.概念:Maven主要服务于基于java平台的项目构建(编译、测试、生成文档、打包、部署等)&#xff0c;依赖管理和项目信息管理。 2.四大特性: 2.1:依赖管理系统(jar包管理, jar 升级时修改配置文件即可) 依赖(Coordination):由groupId、artifactId、version组成 …

PHP MySQL 预处理语句

预处理语句对于防止 MySQL 注入是非常有用的。 预处理语句及绑定参数 预处理语句用于执行多个相同的 SQL 语句&#xff0c;并且执行效率更高。 预处理语句的工作原理如下&#xff1a; 预处理&#xff1a;创建 SQL 语句模板并发送到数据库。预留的值使用参数 "?" 标…

Python 实现 JSON 解析器

Json 解析 文章目录Json 解析Json 的组成对象结构数组结构词法分析逻辑性解析解析对象类型解析数组类型完整代码小结Json 的组成 JSON结构共有2种 对象结构数组结构 一个合法的JSON字符串可以包含这几种元素: 特殊符号,如"{" “}“表示一个JSON Object&#xff0…

将DataFrame进行转置的DataFrame.transpose()方法

【小白从小学Python、C、Java】 【计算机等级考试500强双证书】 【Python-数据分析】 将DataFrame进行转置 DataFrame.transpose() 选择题 关于以下python代码说法错误的一项是? import pandas as pd dfpd.DataFrame({a:[a1,a2],b:[b1,b2]}) print("【显示】df:\n"…

高德地图红绿灯读秒是怎么实现的?(一)

关于这个读秒实现功能众说风云&#xff0c;目前有两种说法&#xff0c;一种说是靠大数据分析&#xff0c;一种说是靠交管部门数据。 我们先看一下官方的回应&#xff1a;可以自行去抖音看官方号的解释。 以下为原答&#xff1a; 有人说是接入了地方交管数据&#xff0c;其实政策…

2022年度 FinClip 扩展 SDK 推荐!

2022年&#xff0c;FinClip 团队进行了24个产品迭代&#xff0c;为了丰富FinClip 的平台能力&#xff0c;除了核心SDK之外&#xff0c;我们还为开发者们提供了扩展SDK&#xff0c;扩展SDK是一个依赖核心SDK的库&#xff0c;里面提供了核心SDK中所没有的各种小程序API。 官方希…

arduino和物联网云端平台系列---物模型之事件

事件&#xff0c;先下个简单的定义就是发生了什么事件 系列文章都是已经完成了基本的库安装和使用为前提 物模型之事件 基本的添加步骤不描述了&#xff0c;设置一个测试用例 事件我已经设定好了&#xff0c;输出参数代表的是在云端得到的输出&#xff0c;需要我们在设备进行…

【程序环境和预处理】C语言

前言&#xff1a; 到此节便是我们C语言学习的终章了&#xff0c;对C语言的学习便告一段落了&#xff0c;到学完这一章节我们便要进入下一个主题的学习了。 目录1. 程序的翻译环境和执行环境2. 详解编译链接2.1 翻译环境2.2 编译本身也分为几个阶段2.3 运行环境3. 预处理详解3.1…

ESP32设备驱动-L3GD20三轴角速率传感器驱动

L3GD20三轴角速率传感器驱动 1、L3GD20介绍 L3GD20 是一款低功耗三轴角速率传感器。 它包括一个传感元件和一个 I2C 接口,能够通过数字接口 (I2C/SPI) 向外部世界提供测量的角速率。传感元件采用意法半导体开发的专用微加工工艺制造,用于在硅晶片上生产惯性传感器和执行器。…

高通Qualcomm处理器的手机或设备进EDL 9008模式的办法

适用于变砖的设备 由于我们有很多基于 Qualcomm 的设备&#xff0c;其中一些设备可能会古怪地猜测如何进入 EDL 模式&#xff0c;或者如何正确进入。 例如&#xff0c;对于 Alcatel&#xff0c;您必须先按住两个音量键&#xff0c;然后再按住其中一个&#xff0c;对于 CAT B35…

DocPrompt代码实现细节

数据预处理阶段 PaddleOCR PP-Structure&#xff1a;这个库其实是用于版面分析的一个开源库&#xff0c;参见&#xff1a;github: Layout-Parser/layout-parserhttps://github.com/Layout-Parser/layout-parser 代码推理阶段 Paddle-Inferencehttps://paddle-inference.readt…

[JavaEE]定时器

专栏简介: JavaEE从入门到进阶 题目来源: leetcode,牛客,剑指offer. 创作目标: 记录学习JavaEE学习历程 希望在提升自己的同时,帮助他人,,与大家一起共同进步,互相成长. 学历代表过去,能力代表现在,学习能力代表未来! 目录: 1.定时器的概念 2.标准库中的定时器 3.实现定时…