NSQ和KAFKA的使用入门

news2025/1/12 20:48:13

【nsq vs kafka】https://zhuanlan.zhihu.com/p/46421050
【kafka】https://juejin.cn/post/6844903495670169607

NSQ

分布式内存消息队列
优势:

  1. NSQ提倡分布式和分散的拓扑,没有单点故障,支持容错和高可用性,提供可高的消息交付保证
  2. NSQ支持横向管理,没有任何集中式管理
  3. NSQ已于配置和部署,并且内置了管理界面

nsq的应用场景

  1. 异步处理: 把业务流程中非关键流程异步化,从而显著降低业务请求的响应时间。
  2. 应用解耦:消息队列将不同的业务逻辑解耦,降低系统间的耦合,提高系统的健壮性。后续有其他的业务使用订单但数据可以直接订阅消息队列,提高系统的灵活性
  3. 消息消峰:类似秒杀(大秒)等场景下,某一时间可能会产生大量的请求,使用消息队列能够为后端处理请求提供一定的缓冲区,保证后端服务的稳定性。

nsq组件

  1. nsqd
    nsqd是一个守护进程,它接收、排队并向客户端发送消息。
  2. nsqlookupd
    nsqlookupd是维护所有nsqd状态、提供服务发现的守护进程。它能为消费者查找特定topic下的nsqd提供了运行时的自动发现服务。 它不维持持久状态,也不需要与任何其他nsqlookupd实例协调以满足查询。因此根据你系统的冗余要求尽可能多地部署nsqlookupd节点。它们小豪的资源很少,可以与其他服务共存。我们的建议是为每个数据中心运行至少3个集群。
  3. nsqadmin
    一个实时监控集群状态、执行各种管理任务的Web管理平台。

nsq架构

  1. nsq工作模式

  2. Topic和Channel

    每个nsqd实例旨在一次处理多个数据流。这些数据流称为“topics”,一个topic具有1个或多个“channels”。每个channel都会收到topic所有消息的副本,实际上下游的服务是通过对应的channel来消费topic消息。

    topicchannel不是预先配置的。topic在首次使用时创建,方法是将其发布到指定topic,或者订阅指定topic上的channelchannel是通过订阅指定的channel在第一次使用时创建的。

    topicchannel都相互独立地缓冲数据,防止缓慢的消费者导致其他chennel的积压(同样适用于topic级别)。

    channel可以并且通常会连接多个客户端。假设所有连接的客户端都处于准备接收消息的状态,则每条消息将被传递到随机客户端。例如:


    总而言之,消息是从topic -> channel(每个channel接收该topic的所有消息的副本)多播的,但是从channel -> consumers均匀分布(每个消费者接收该channel的一部分消息)。

go操作nsq

  1. 生产者
// nsq_producer/main.go
package main

import (
	"bufio"
	"fmt"
	"os"
	"strings"

	"github.com/nsqio/go-nsq"
)

// NSQ Producer Demo

var producer *nsq.Producer

// 初始化生产者
func initProducer(str string) (err error) {
	config := nsq.NewConfig()
	producer, err = nsq.NewProducer(str, config)
	if err != nil {
		fmt.Printf("create producer failed, err:%v\n", err)
		return err
	}
	return nil
}

func main() {
	nsqAddress := "127.0.0.1:4150"
	err := initProducer(nsqAddress)
	if err != nil {
		fmt.Printf("init producer failed, err:%v\n", err)
		return
	}

	reader := bufio.NewReader(os.Stdin) // 从标准输入读取
	for {
		data, err := reader.ReadString('\n')
		if err != nil {
			fmt.Printf("read string from stdin failed, err:%v\n", err)
			continue
		}
		data = strings.TrimSpace(data)
		if strings.ToUpper(data) == "Q" { // 输入Q退出
			break
		}
		// 向 'topic_demo' publish 数据
		err = producer.Publish("topic_demo", []byte(data))
		if err != nil {
			fmt.Printf("publish msg to nsq failed, err:%v\n", err)
			continue
		}
	}
}
  1. 消费者
// nsq_consumer/main.go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/nsqio/go-nsq"
)

// NSQ Consumer Demo

// MyHandler 是一个消费者类型
type MyHandler struct {
	Title string
}

// HandleMessage 是需要实现的处理消息的方法
func (m *MyHandler) HandleMessage(msg *nsq.Message) (err error) {
	fmt.Printf("%s recv from %v, msg:%v\n", m.Title, msg.NSQDAddress, string(msg.Body))
	return
}

// 初始化消费者
func initConsumer(topic string, channel string, address string) (err error) {
	config := nsq.NewConfig()
	config.LookupdPollInterval = 15 * time.Second
	c, err := nsq.NewConsumer(topic, channel, config)
	if err != nil {
		fmt.Printf("create consumer failed, err:%v\n", err)
		return
	}
	consumer := &MyHandler{
		Title: "沙河1号",
	}
	c.AddHandler(consumer)

	// if err := c.ConnectToNSQD(address); err != nil { // 直接连NSQD
	if err := c.ConnectToNSQLookupd(address); err != nil { // 通过lookupd查询
		return err
	}
	return nil

}

func main() {
	err := initConsumer("topic_demo", "first", "127.0.0.1:4161")
	if err != nil {
		fmt.Printf("init consumer failed, err:%v\n", err)
		return
	}
	c := make(chan os.Signal)        // 定义一个信号的通道
	signal.Notify(c, syscall.SIGINT) // 转发键盘中断信号到c
	<-c                              // 阻塞
}

KAFKA

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/993031.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何理解Java中的自动拆箱和自动装箱?

个人主页&#xff1a;金鳞踏雨 个人简介&#xff1a;大家好&#xff0c;我是金鳞&#xff0c;一个初出茅庐的Java小白 目前状况&#xff1a;22届普通本科毕业生&#xff0c;几经波折了&#xff0c;现在任职于一家国内大型知名日化公司&#xff0c;从事Java开发工作 我的博客&am…

亚运会:举办次数最多的是泰国,观众满意度衡量赛事成功情况

本文由群狼调研&#xff08;长沙第三方评估&#xff09;整理出品&#xff0c;欢迎转载&#xff0c;请注明出处。8日起至9月20日&#xff0c;杭州第19届亚运会火炬传递启动&#xff0c;杭州亚运会是放开后的首个盛会&#xff0c;亚洲的健儿齐集于此&#xff0c;同场竞技&#xf…

【C++入门】C语言的不足之处

概要 C入门主要讲的是C语言的一些不足&#xff0c;C作为补充&#xff0c;来补充C的不足之处 C的关键字有63个&#xff0c;C语言有32个&#xff08;作为了解&#xff0c;不需要专门记&#xff09; 变量的命名规则&#xff1a; 变量名必须以字母或下划线开头。变量名只能包含字…

httprunner环境变量

前言 我的上一篇文章讲了httprunner的基本介绍&#xff0c;这篇文章呢主要来给大家介绍httprunner中的环境变量。 一般来说&#xff0c;在进行实际应用的开发过程中&#xff0c;应用会拥有不同的运行环境&#xff0c;通常会有以下环境&#xff1a; 本地开发环境测试环境生产…

Flask狼书笔记 | 07_留言板

文章目录 7 留言板7.1 使用包组织代码7.2 Web开发流程7.3 使用Bootstrap-Flask7.4 Flask-Moment本地化日期和时间7.5 使用Faker生成虚拟数据7.6 Flask_DebugToolbar调试程序7.7 Flask配置的两种组织形式小结 7 留言板 这是一个简单的程序&#xff0c;涉及到的大部分是之前所学…

windows弹出交互式服务检测一键取消bat脚本

现象 脚本命令 新建一个bat文件&#xff0c;将下面的脚本拷贝进去&#xff0c;保存&#xff0c;双击即可 禁用服务&#xff1a;重启电脑的时候不会启动 停止服务&#xff1a;立即停止服务&#xff0c;马上生效的 sc config UI0Detect start disabled net stop UI0Detect

AQS同步队列和等待队列的同步机制

理解AQS必须要理解同步队列和等待队列之间的同步机制&#xff0c;简单来说流程是&#xff1a; 获取锁失败的线程进入同步队列&#xff0c;成功的占用锁&#xff0c;占锁线程调用await方法进入条件等待队列&#xff0c;其他占锁线程调用signal方法&#xff0c;条件等待队列线程进…

攻防世界题目练习——Crypto密码难度1(一)

题目目录 1. base642. Caesar3. Morse4. Broadcast5. hidden key6. [简单] 初识RSA7. 简单的LFSR8. baigeiRSA 1. base64 下载文件&#xff0c;打开是一个txt文件&#xff0c;解密工具base64解码&#xff0c;如图&#xff1a; 2. Caesar 打开文件看到字符如下&#xff1a; …

道一云·七巧对接打通金蝶云星空查询七巧表单接口与供应商新增接口

道一云七巧对接打通金蝶云星空查询七巧表单接口与供应商新增接口 数据源平台:道一云七巧 道一云七巧拥有强大的自定义表单设计工具&#xff0c;并配置工作流程&#xff0c;通过流程智能流转&#xff0c;打通各个业务场景中的审批、协作环节&#xff0c;包含数据采集单、任务单…

strlen函数使用与模拟实现【进阶版】

strlen函数使用与模拟实现 1.strlen函数介绍 资源来源于cplusplus网站 翻译过来的大致意思就是&#xff1a; 获取字符串长度 2.strlen的使用 int main() { //strlen - 求字符串长度的 //字符串的结束标志是\0 //strlen统计的是\0之前出现的字符的个数 //基本功能 char arr[]…

【送书活动】全网超50万粉丝的Linux大咖良许,出书了!

前言 「作者主页」&#xff1a;雪碧有白泡泡 「个人网站」&#xff1a;雪碧的个人网站 「推荐专栏」&#xff1a; ★java一站式服务 ★ ★ React从入门到精通★ ★前端炫酷代码分享 ★ ★ 从0到英雄&#xff0c;vue成神之路★ ★ uniapp-从构建到提升★ ★ 从0到英雄&#xff…

母婴品牌的小红书投放策略有哪些,投放总结

最近有很多人想知道达人投放的一些相关知识&#xff0c;我们立马捕捉到了大家对这一干货内容的感兴趣程度&#xff0c;今天就来为大家分享下&#xff0c;母婴品牌的小红书投放策略有哪些&#xff0c;投放总结&#xff01; 什么是达人投放? 达人投放是一种常用于社交媒体营销的…

异步FIFO项目 UVM验证

文章目录 前言一、异步FIFO仿真过程1、异步FIFO设计2、UVM验证 二、脚本文件编写三、编译错误 前言 2022.11.15 记录自己开始使用UVM仿真异步FIFO项目 一、异步FIFO仿真过程 1、异步FIFO设计 首先对编写的异步FIFO设计代码进行验证&#xff0c;写了一个test.v文件 &#xff…

docker概念、安装与卸载

第一章 docker概念 Docker 是一个开源的应用容器引擎。 Docker 诞生于2013年初&#xff0c;基于 Go 语言实现&#xff0c;dotCloud 公司出品&#xff0c;后改名为 Docker Inc。 Docker 可以让开发者打包他们的应用以及依赖包到一个轻量级、可移植的容器中&#xff0c;然后发…

Python的命名空间和作用域

所谓作用域就是指变量名/对象名在多大范围内是可见的/有效的&#xff0c;一个变量名既不能作用域太大&#xff0c;这样会引发广泛的命名冲突&#xff0c;也不能太小&#xff0c;这样很难实现变量的共享&#xff0c;所以一般都有多层次的作用域。一个层次的作用域&#xff0c;在…

WebGL 同时使用多幅纹理

目录 前言 ​编辑 示例代码 颜色矢量的分量乘法来计算两个纹素最终的片元颜色 注册事件响应函数&#xff1a;loadTexture&#xff08;&#xff09;&#xff0c;最后一个参数是纹理单元编号。 请求浏览器加载图像&#xff1a; 配置纹理&#xff1a;loadTexture&#xff0…

linux 安装Docker

# 1、yum 包更新到最新 yum update # 2、安装需要的软件包&#xff0c; yum-util 提供yum-config-manager功能&#xff0c;另外两个是devicemapper驱动依赖的 yum install -y yum-utils device-mapper-persistent-data lvm2 # 3、 设置yum源 yum-config-manager --add-repo h…

基于SSM的汽车养护管理系统

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用JSP技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…

谈论浏览器内核

浏览器内核是指浏览器使用的渲染引擎&#xff0c;用于解析并显示网页的内容。主要有以下几种浏览器内核&#xff1a; Trident&#xff08;IE内核&#xff09;&#xff1a;由Microsoft开发&#xff0c;被用于Internet Explorer浏览器。目前已经被Edge取代。 Gecko&#xff1a;…

并发内存池(C++)

项目简介 这个项目是实现了一个高效的并发内存池。它的原型的goggle的一个开源项目tcmalloc&#xff0c;即thread-cache malloc&#xff08;线程缓存的malloc&#xff09;&#xff0c;实现了高效多线程的内存管理&#xff0c;可实现对系统提供的内存分配函数malloc和free的替代…