【nsq vs kafka】https://zhuanlan.zhihu.com/p/46421050
【kafka】https://juejin.cn/post/6844903495670169607
NSQ
分布式内存消息队列
优势:
- NSQ提倡分布式和分散的拓扑,没有单点故障,支持容错和高可用性,提供可高的消息交付保证
- NSQ支持横向管理,没有任何集中式管理
- NSQ已于配置和部署,并且内置了管理界面
nsq的应用场景
- 异步处理: 把业务流程中非关键流程异步化,从而显著降低业务请求的响应时间。
- 应用解耦:消息队列将不同的业务逻辑解耦,降低系统间的耦合,提高系统的健壮性。后续有其他的业务使用订单但数据可以直接订阅消息队列,提高系统的灵活性
- 消息消峰:类似秒杀(大秒)等场景下,某一时间可能会产生大量的请求,使用消息队列能够为后端处理请求提供一定的缓冲区,保证后端服务的稳定性。
nsq组件
- nsqd
nsqd是一个守护进程,它接收、排队并向客户端发送消息。 - nsqlookupd
nsqlookupd是维护所有nsqd状态、提供服务发现的守护进程。它能为消费者查找特定topic下的nsqd提供了运行时的自动发现服务。 它不维持持久状态,也不需要与任何其他nsqlookupd实例协调以满足查询。因此根据你系统的冗余要求尽可能多地部署nsqlookupd节点。它们小豪的资源很少,可以与其他服务共存。我们的建议是为每个数据中心运行至少3个集群。 - nsqadmin
一个实时监控集群状态、执行各种管理任务的Web管理平台。
nsq架构
-
nsq工作模式
-
Topic和Channel
每个nsqd实例旨在一次处理多个数据流。这些数据流称为
“topics”
,一个topic
具有1个或多个“channels”
。每个channel
都会收到topic
所有消息的副本,实际上下游的服务是通过对应的channel
来消费topic
消息。topic
和channel
不是预先配置的。topic
在首次使用时创建,方法是将其发布到指定topic
,或者订阅指定topic
上的channel
。channel
是通过订阅指定的channel
在第一次使用时创建的。topic
和channel
都相互独立地缓冲数据,防止缓慢的消费者导致其他chennel
的积压(同样适用于topic
级别)。channel
可以并且通常会连接多个客户端。假设所有连接的客户端都处于准备接收消息的状态,则每条消息将被传递到随机客户端。例如:
总而言之,消息是从topic -> channel(每个channel接收该topic的所有消息的副本)多播的,但是从channel -> consumers均匀分布(每个消费者接收该channel的一部分消息)。
go操作nsq
- 生产者
// nsq_producer/main.go
package main
import (
"bufio"
"fmt"
"os"
"strings"
"github.com/nsqio/go-nsq"
)
// NSQ Producer Demo
var producer *nsq.Producer
// 初始化生产者
func initProducer(str string) (err error) {
config := nsq.NewConfig()
producer, err = nsq.NewProducer(str, config)
if err != nil {
fmt.Printf("create producer failed, err:%v\n", err)
return err
}
return nil
}
func main() {
nsqAddress := "127.0.0.1:4150"
err := initProducer(nsqAddress)
if err != nil {
fmt.Printf("init producer failed, err:%v\n", err)
return
}
reader := bufio.NewReader(os.Stdin) // 从标准输入读取
for {
data, err := reader.ReadString('\n')
if err != nil {
fmt.Printf("read string from stdin failed, err:%v\n", err)
continue
}
data = strings.TrimSpace(data)
if strings.ToUpper(data) == "Q" { // 输入Q退出
break
}
// 向 'topic_demo' publish 数据
err = producer.Publish("topic_demo", []byte(data))
if err != nil {
fmt.Printf("publish msg to nsq failed, err:%v\n", err)
continue
}
}
}
- 消费者
// nsq_consumer/main.go
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/nsqio/go-nsq"
)
// NSQ Consumer Demo
// MyHandler 是一个消费者类型
type MyHandler struct {
Title string
}
// HandleMessage 是需要实现的处理消息的方法
func (m *MyHandler) HandleMessage(msg *nsq.Message) (err error) {
fmt.Printf("%s recv from %v, msg:%v\n", m.Title, msg.NSQDAddress, string(msg.Body))
return
}
// 初始化消费者
func initConsumer(topic string, channel string, address string) (err error) {
config := nsq.NewConfig()
config.LookupdPollInterval = 15 * time.Second
c, err := nsq.NewConsumer(topic, channel, config)
if err != nil {
fmt.Printf("create consumer failed, err:%v\n", err)
return
}
consumer := &MyHandler{
Title: "沙河1号",
}
c.AddHandler(consumer)
// if err := c.ConnectToNSQD(address); err != nil { // 直接连NSQD
if err := c.ConnectToNSQLookupd(address); err != nil { // 通过lookupd查询
return err
}
return nil
}
func main() {
err := initConsumer("topic_demo", "first", "127.0.0.1:4161")
if err != nil {
fmt.Printf("init consumer failed, err:%v\n", err)
return
}
c := make(chan os.Signal) // 定义一个信号的通道
signal.Notify(c, syscall.SIGINT) // 转发键盘中断信号到c
<-c // 阻塞
}