NATS-研究学习

news2025/1/21 7:15:31

NATS-研究学习


文章目录

    • NATS-研究学习
    • @[toc]
      • 介绍说明
      • 提供的服务内容
      • 各模式介绍测试使用
        • 发布订阅(Publish Subscribe)
        • 请求响应(Request Reply)
        • 队列订阅&分享工作(Queue Subscribers & Sharing Work)
        • 小杭写的Demo
      • 简单安装使用与测试
      • JetStream 简单使用Demo
      • Spring 项目整合
      • Nkey 认证连接
      • 参考资料

介绍说明

NATS是一个go语言开发的开源的、轻量、高性能的原生消息系统。消息由主题处理,不依赖于网络位置。它提供了应用程序或服务与底层物理网络之间的抽象层。数据被编码并作为消息,由发布者发送。消息由一个或多个订阅者接收、解码和处理。

NATS使程序可以很容易地跨不同的环境、语言、云提供商和内部系统进行通信。客户机通常通过单个URL连接到NATS系统,然后向主题订阅或发布消息。通过这种简单的设计,NATS允许程序共享通用的消息处理代码,隔离资源和相互依赖。

NATS核心提供最多一次的服务质量。
默认情况下,NATS是一种即发即弃的消息传递系统。

如果订户没有收听主题(没有主题匹配),或者在发送消息时未激活,则不会收到消息。

如果需要高级的东东,可以试用NATS Streaming 进行,属于NATS的一个服务模块了。

**优点:**使用简单,配置简单。速度极快,性能良好。
多语言支持,不依赖于网络位置,client端只需知道nats的节点和约定好的subject名称即可。

**缺点:**对服务器稳定性要求较高,机房出现故障,导致nats server端需要重连。可能需要重启nats-server。
在消息timeout后,需要在reconnection里要重新初始化连接,不方便。


提供的服务内容

NATS支持各种消息传递模型,包括:

发布订阅(Publish Subscribe)
请求回复(Request Reply)
队列订阅(Queue Subscribers )

提供的功能:

纯粹的发布订阅模型(Pure pub-sub)
服务器集群(Cluster mode server)
自动精简订阅者(Auto-pruning of subscribers)
基于文本协议(Text-based protocol)
多服务质量保证(Multiple qualities of service - QoS)

各模式介绍测试使用

		<dependency>
            <groupId>io.nats</groupId>
            <artifactId>jnats</artifactId>
            <version>2.16.13</version>
        </dependency>
发布订阅(Publish Subscribe)

请添加图片描述

NATS将publish/subscribe消息分发模型实现为一对多通信,发布者在 subject 上发送消息,并且监听该Subject在任何活动的订阅者都会收到该消息。

Demo:【测试可用】

//publish
Connection nc = Nats.connect("nats://127.0.0.1:4222");
nc.publish("subject", "hello world".getBytes(StandardCharsets.UTF_8));
 
//subscribe [这个时间内就之后收到一个,就结束了]
Subscription sub = nc.subscribe("subject");
Message msg = sub.nextMessage(Duration.ofMillis(500));
String response = new String(msg.getData(), StandardCharsets.UTF_8);
 
//或者是基于回调的subscribe [这个程序可以保持,持续接收信息]
//subscribe
Dispatcher d = nc.createDispatcher(msg ->{
 String response = new String(msg.getData(), StandardCharsets.UTF_8);
 //do something
})
d.subscribe("subject");
请求响应(Request Reply)

请添加图片描述

Request-Reply是现代分布式系统中的常见模式。发布者(crm)发送一个请求,应用程序(ybind,fpga-agent)要么在响应时等待一定的超时,要么异步接收响应。Request()是一个简单方便的API,它提供了一个伪同步的方式,使用了超时timeout设置。它创建了一个收件箱(收件箱是一种subject类型,对请求者唯一),订阅subject,然后发布你的请求消息(消息带reply地址)设置为收件箱的subject,然后等待响应,或者超时取消。

Demo:【测试可用】

// publish 
Connection nc = Nats.connect("nats://127.0.0.1:4222");
String reply = "replyMsg";   // 这个相当于回到的主题
//请求回应方法回调
Dispatcher d = nc.createDispatcher(msg -> {
 System.out.println("reply: " +  JSON .toJSONString(msg));
}) ;
d.unsubscribe(reply , 1);
//订阅请求
d.subscribe(reply);
//发布请求
nc.publish("requestSub", reply, "request".getBytes(StandardCharsets.UTF_8));
 
//subscribe
Connection nc = Nats.connect("nats://127.0.0.1:4222");
//注册订阅
Dispatcher dispatcher = nc.createDispatcher(msg -> {
 System.out.println(JSON.toJSONString(msg));
 nc.publish(msg.getReplyTo(), "this is reply".getBytes(StandardCharsets.UTF_8));
});
dispatcher.subscribe("requestSub");
队列订阅&分享工作(Queue Subscribers & Sharing Work)

NATS提供称为队列订阅的负载均衡功能。

主要功能是将具有相同queue名字的subject进行负载均衡。

请添加图片描述

要创建一个消息队列,订阅者需注册一个队列名。所有的订阅者用同一个队列名,形成一个队列组。当消息发送到主题后,队列组会自动选择一个成员接收消息。尽管队列组有多个订阅者,但每条消息只能被组中的一个订阅者接收。

Demo:【测试可用】

// Subscribe
Connection nc = Nats.connect();
Dispatcher d = nc.createDispatcher(msg -> {
 //do something
 System.out.println("msg: " + new String(msg.getData(),StandardCharsets.UTF_8));
});
d.subscribe("subject", "queName");  //差别就是这个了
小杭写的Demo
/**
 * 发布Demo
 */
public class NatsPublish {
    public static void main(String[] args) throws IOException, InterruptedException {
//        publishSubscribe();
        requestReply();
    }
    /**
     * test 请求响应(Request Reply) 模式
     */
    public static void requestReply() throws IOException, InterruptedException {
        Connection nc = Nats.connect("nats://192.168.137.xxx:4222");
        // 这个相当于回到的主题
        String reply = "replyMsg-qingqiuxinxi";
        //请求回应方法回调
        Dispatcher d = nc.createDispatcher(msg ->{
            System.out.println("=========收到返回的信息============");
            System.out.println("reply:get retuen: " +  JSON.toJSONString(msg));
            System.out.println( JSON.parseObject(JSON.toJSONString(msg)).get("data") );
            String data = (String) JSON.parseObject(JSON.toJSONString(msg)).get("data");
            System.out.println( new String(Base64.decode( data ))  );
        });
        d.unsubscribe(reply , 1);
        //订阅请求
        d.subscribe(reply);
        //发布请求
        System.out.println( "订阅信息:"+reply );
        nc.publish("requestSub", reply, "请求参数,巴拉巴拉1".getBytes(StandardCharsets.UTF_8));
        // 下面这些用来负载测试的
        nc.publish("requestSub", reply, "请求参数,巴拉巴拉2".getBytes(StandardCharsets.UTF_8));
        nc.publish("requestSub", reply, "请求参数,巴拉巴拉3".getBytes(StandardCharsets.UTF_8));
        nc.publish("requestSub", reply, "请求参数,巴拉巴拉4".getBytes(StandardCharsets.UTF_8));
        nc.publish("requestSub", reply, "请求参数,巴拉巴拉5".getBytes(StandardCharsets.UTF_8));
        nc.publish("requestSub", reply, "请求参数,巴拉巴拉6".getBytes(StandardCharsets.UTF_8));
    }

    /**
     * test 发布订阅(Publish Subscribe) 模式
     */
    public static void publishSubscribe() throws IOException, InterruptedException {
        Connection nc = Nats.connect("nats://192.168.137.xxx:4222");
        nc.publish("subject", "hello world1111122211111111".getBytes(StandardCharsets.UTF_8));
    }
}
/**
 * 订阅Demo
 */
public class NatsSubscribe {
    public static void main(String[] args) throws IOException, InterruptedException {
//        publishSubscribe();
        requestReply();
    }

    /**
     * test 请求响应(Request Reply) 模式
     */
    public static void requestReply() throws IOException, InterruptedException {
        //subscribe
        Connection nc = Nats.connect("nats://192.168.137.xxx:4222");
        //注册订阅
        Dispatcher dispatcher = nc.createDispatcher(msg -> {
            System.out.println("=======收到请求信息===========");
            System.out.println(JSON.toJSONString(msg));
            String data = (String) JSON.parseObject(JSON.toJSONString(msg)).get("data");
            System.out.println( new String(Base64.decode( data ))  );
            nc.publish(msg.getReplyTo(), "这个是返回的数据,啦啦啦啦啦".getBytes(StandardCharsets.UTF_8));
        });
        dispatcher.subscribe("requestSub");
        // 队列订阅就换成下面这个,负载测试,都启动几个服务,就可以看到接受效果了
//         dispatcher.subscribe("requestSub", "queName");
    }

    /**
     * test 发布订阅(Publish Subscribe) 模式
     */
    public static void publishSubscribe() throws IOException, InterruptedException {
        Connection nc = Nats.connect("nats://192.168.137.xxx:4222");

//        //subscribe [这个时间内就之后收到一个,就结束了]
//        Subscription sub = nc.subscribe("subject");
//        Message msg = sub.nextMessage(Duration.ofMillis(50000));
//        String response = new String(msg.getData(), StandardCharsets.UTF_8);
//        System.out.println(response);

        //subscribe  [这个程序可以保持,持续接收信息]
        Dispatcher d = nc.createDispatcher(msg ->{
            String response = new String(msg.getData(), StandardCharsets.UTF_8);
            //do something
            System.out.println(response);
        });
        d.subscribe("subject");
    }
}

简单安装使用与测试

# 官方安装NATS[单台]
docker pull nats
docker network create nats
docker run --name nats --network nats -p 4222:4222 -p 8222:8222 nats --http_port 8222  -js

# 192.168.137.xxx : 4222
# 然后用,上文中小杭的Demo试试,基础的功能就可以了解了。

JetStream 简单使用Demo

目前这个的Demo使用的是官方的封装例子方法。

结果是,创建流之后,发送数据。消费端接入会获取全部数据。除非消息被删除,否则每次都是全部获取。

当然,正常获取的时候,由于持久化,只要没有删除,消费端都可以请求再次获取的。

// 创建发送流 和 数据 
public static void main(String[] args) throws Exception {
        jetStream(args);
    }
    public static void jetStream(String[] args) throws Exception {
        ExampleArgs exArgs = ExampleArgs.builder("Publish", args, "")
            .defaultStream("example-stream")
            .defaultSubject("example-subject")
            .defaultMessage("hello")
            .defaultMsgCount(10)
            .defaultServer("nats://192.168.137.xxx:4222")
            .build();

        String hdrNote = exArgs.hasHeaders() ? ", with " + exArgs.headers.size() + " header(s)" : "";
        System.out.printf("\nPublishing to %s%s. Server is %s\n\n", exArgs.subject, hdrNote, exArgs.server);

        try (Connection nc = Nats.connect(ExampleUtils.createExampleOptions(exArgs.server))) {

            JetStream js = nc.jetStream();
            // Create the stream
            NatsJsUtils.createStreamOrUpdateSubjects(nc, exArgs.stream, exArgs.subject);

            int stop = exArgs.msgCount < 2 ? 2 : exArgs.msgCount + 1;
            for (int x = 1; x < stop; x++) {
                // make unique message data if you want more than 1 message
                String data = exArgs.msgCount < 2 ? exArgs.message : exArgs.message + "-" + x;

                // create a typical NATS message
                Message msg = NatsMessage.builder()
                    .subject(exArgs.subject)
                    .headers(exArgs.headers)
                    .data(data, StandardCharsets.UTF_8)
                    .build();

                PublishAck pa = js.publish(msg);
                System.out.printf("Published message %s on subject %s, stream %s, seqno %d.\n",
                    data, exArgs.subject, pa.getStream(), pa.getSeqno());
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
// 消费,并删除流中已处理的数据 
public static void main(String[] args) throws Exception {
        jetStream();
    }
    public static void jetStream() throws IOException, InterruptedException, JetStreamApiException {
        Connection nc = Nats.connect("nats://192.168.137.xxx:4222");
        Dispatcher disp = nc.createDispatcher(msg -> {
            System.out.println("ddddddd"+msg);
        });
        JetStream js = nc.jetStream();

        MessageHandler handler = (msg) -> {
            // Process the message.
            // Ack the message depending on the ack model
            String response = new String(msg.getData(), StandardCharsets.UTF_8);
            //do something
            System.out.println(response);
            System.out.println(msg);
            System.out.println("处理一下数据,然后要删除掉!!");
            System.out.println(msg.metaData());
            try {
                // 处理完数据,要把数据删除掉的,否则会一直在持久队列中。
                JetStreamManagement jsm = nc.jetStreamManagement();
                jsm.deleteMessage(msg.metaData().getStream(),msg.metaData().streamSequence());
            } catch (IOException | JetStreamApiException e) {
                e.printStackTrace();
            }
        };
        boolean autoAck = true;
        js.subscribe("example-subject", disp, handler, autoAck);
    }

一些复杂的功能,还是有需要的时候,再研究一下官方的Demo程序,比文档好理解多了。。。。


Spring 项目整合

参考开源项目:wanlinus/nats-streaming-spring

代码直接打包了;这里就记录一下使用。

// pom 
        <dependency>
            <groupId>io.nats</groupId>
            <artifactId>jnats</artifactId>
            <version>2.16.13</version>
            <scope>compile</scope>
        </dependency>
            
// 配置
spring:
  nats:
    natsUrls: nats://192.168.137.xxx:4222

// 启动类
@EnableNats
@SpringBootApplication
public class AppApplication {
    
    
// 测试类
@Component
@RestController
@RequestMapping("/test")
public class TestController extends BaseController {

    @Autowired
    private Connection cconnection;

    @GetMapping("/test")
    public String test(HttpServletRequest request){
        String msg = "send msg " + DateUtil.now();
        // 测试发送普通消息
        cconnection.publish("xixi", msg.getBytes(StandardCharsets.UTF_8));

        return "test-success";
    }

    /**
     * 接收 JetStream 的消息
     * @param message
     */
    @Subscribe(value="haha",type = "JetStream")
    public void message1(Message message) {
        System.out.println("接收 JetStream 的消息,进行处理。。。。。。");
        System.out.println(message);
        System.out.println(message.getSubject() + " : " + new String(message.getData()));
    }

    /**
     * 接收普通消息
     * @param message
     */
    @Subscribe(value="xixi")
    public void message2(Message message) {
        System.out.println("接收普通消息,进行处理。。。。。。");
        System.out.println(message);
        System.out.println(message.getSubject() + " : " + new String(message.getData()));
    }
}

其他类型的封装 和 发送操作,就真实需要的时候再继续完善一下了。


Nkey 认证连接

AuthHandler authHandler = Nats.staticCredentials("UCVU4OEHWAxxxxxxxxxxxxDDIxxxxxBMYxxxxxxxxxxxxxxxxxx".toCharArray(),"SUAMMIOB6xxxxxxxxxxxxxxxxxSHYxxxx7MUxxxxxxxxxxx5FCI".toCharArray());
        Options.Builder builder = new Options.Builder()
            // 配置 nats 服务器地址
            .servers(new String[]{"nats://xxxx.xxxx.xxx:4222"})
            .authHandler(authHandler);
        Connection nc = Nats.connect(builder.build());

参考资料

  • 简单看看:https://www.jianshu.com/p/341082dadd3e
  • 详细点说明:http://www.guoxiaolong.cn/blog/?id=10376
  • JetStream:https://docs.nats.io/nats-concepts/jetstream
  • Developing With NATS:https://docs.nats.io/using-nats/developer
  • https://docs.nats.io/running-a-nats-service/nats_docker/nats-docker-tutorial
  • 官方javademo:https://github.com/nats-io/nats.java 参考这个 【这个重点的样子】
  • 发布订阅:https://blog.csdn.net/qq_47848696/article/details/117746807
  • https://zhuanlan.zhihu.com/p/628371358 用户+密码连接

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1719368.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue+vant移动端显示table表格加横向滚动条

vant移动端显示table效果&#xff0c;增加复选框&#xff0c;可以进行多选和全选。加横向滚动条&#xff0c;可以看全部内容。 <template><div class"app-container"><div class"nav_text" style"position: relative;"><…

简单介绍QKeySequenceEdit的使用

QKeySequenceEdit是Qt框架中的一个便捷用户界面组件&#xff0c;用于输入和显示键盘快捷键。它提供了一个简单的界面&#xff0c;允许用户输入一个键盘快捷键&#xff0c;并将其显示为一个字符串。这在需要配置快捷键的应用程序中非常有用。在本文中&#xff0c;我们将详细介绍…

【C++】——string模拟实现

前言 string的模拟实现其实就是增删改查&#xff0c;只不过加入了类的概念。 为了防止与std里面的string冲突&#xff0c;所以这里统一用String。 目录 前言 一 初始化和销毁 1.1 构造函数 1.2 析构函数 二 迭代器实现 三 容量大小及操作 四 运算符重载 4.1 bool…

二叉树的顺序实现-堆

一、什么是堆 在数据结构中&#xff0c;堆&#xff08;Heap&#xff09;是一种特殊的树形数据结构&#xff0c;用数组存储&#xff0c;通常被用来实现优先队列。 堆具有以下特点&#xff1a; 堆是一棵完全二叉树&#xff08;Complete Binary Tree&#xff09;&#xff0c;即…

uni-app的网络请求库封装及使用(同时支持微信小程序)

其实uni-app中内置的uni.request()已经很强大了&#xff0c;简单且好用。为了让其更好用&#xff0c;同时支持拦截器&#xff0c;支持Promise 写法&#xff0c;特对其进行封装。同时支持H5和小程序环境&#xff0c;更好用啦。文中给出使用示例&#xff0c;可以看到使用变得如此…

算法(六)计数排序

文章目录 计数排序技术排序简介算法实现 计数排序 技术排序简介 计数排序是利用数组下标来确定元素的正确位置的。 假定数组有10个整数&#xff0c;取值范围是0~10&#xff0c;可以根据这有限的范围&#xff0c;建立一个长度为11的数组。数组下标从0到10&#xff0c;元素初始…

智慧校园有哪些特征

随着科技的飞速进步&#xff0c;教育领域正经历着一场深刻的变革。智慧校园&#xff0c;作为这场变革的前沿代表&#xff0c;正在逐步重塑我们的教育理念和实践方式。它不仅仅是一个概念&#xff0c;而是一个集成了物联网、大数据、人工智能等先进技术的综合生态系统&#xff0…

Nginx(openresty) 开启目录浏览 以及进行美化配置

1 nginx 安装 可以参考:Nginx(openresty) 通过lua结合Web前端 实现图片&#xff0c;文件&#xff0c;视频等静态资源 访问权限验证&#xff0c;进行鉴权 &#xff0c;提高安全性-CSDN博客 2 开启目录浏览 location /file{alias /data/www/; #指定目录所在路径autoindex on; …

差旅游记|绵阳印象:与其羡慕他人,不如用力活好自己。

哈喽&#xff0c;你好啊&#xff0c;我是雷工&#xff01; 来绵阳之前同事就问: “雷工&#xff0c;能吃辣嘛&#xff1f;”。 “还行&#xff0c;能吃点辣。” “那你去了四川别说能吃点辣&#xff0c;那边的能吃点比跟你说的能吃点不太一样” 01 你好 今天打车&#xff0c;上…

信息学奥赛初赛天天练-17-阅读理解-浮点数精准输出与海伦公式的巧妙应用

PDF文档公众号回复关键字:20240531 1 2023 CSP-J 阅读程序1 阅读程序&#xff08;程序输入不超过数组成字符串定义的范围&#xff1a;判断题正确填√&#xff0c;错误填&#xff1b;除特殊说明外&#xff0c;判断题1.5分&#xff0c;选择题3分&#xff0c;共计40分&#xff0…

python 如何判断一组数呈上升还是下降趋势

目录 一、引言 二、基本概念 三、判断方法 直接比较法 斜率法 统计检验法 四、方法比较与选择 五、案例分析 六、总结 一、引言 在数据处理和分析的领域中&#xff0c;判断一组数是否呈现上升或下降趋势是一个重要的环节。这不仅能够帮助我们了解数据的基本变化…

蓝桥杯高频考点-与日期相关的题目

文章目录 前言1. 如何枚举合法日期1.1 预存每个月的天数1.2 封装一个判断日期是否合法的函数1.3 枚举日期并判断日期是否合法 2. 判断日期是否为回文日期2.1 将日期当作字符串进行处理2.2 将日期当作一个8位数进行处理 3. 给定初始日期&#xff0c;计算经过n天后对应的日期3.1 …

Ai晚班车531

1.中央网信办等三部门&#xff1a;加快推进大模型、生成式人工智能标准研制。 2.中国石油与中国移动、华为、科大讯飞签署合作协议。 3.Opera浏览器与谷歌云合作&#xff0c;接入 Gemini 大模型。 4.谷歌 Gemini 加持Chromebook Plus。 5.英飞凌&#xff1a;开发 8kW和12kW…

5.25.1 用于组织病理学图像分类的深度注意力特征学习

提出了一种基于深度学习的组织病理学图像分类新方法。我们的方法建立在标准卷积神经网络 (CNN) 的基础上,并结合了两个独立的注意力模块,以实现更有效的特征学习。 具体而言,注意力模块沿不同维度推断注意力图,这有助于将 CNN 聚焦于关键图像区域,并突出显示判别性特征通…

Redis 探索之旅(进阶)

目录 今日良言&#xff1a;从不缺乏从头开始的勇气 一、持久化 1、RDB 2、AOF 二、Redis 的事务 三、主从复制 四、哨兵模式 五、集群模式 六、缓存 七、分布式锁 今日良言&#xff1a;从不缺乏从头开始的勇气 一、持久化 持久化就是把数据存储在硬盘上&#xff0c;无…

使用 DuckDuckGo API 实现多种搜索功能

在日常生活中&#xff0c;我经常使用搜索引擎来查找信息&#xff0c;如谷歌和百度。然而&#xff0c;当我想通过 API 来实现这一功能时&#xff0c;会发现这些搜索引擎并没有提供足够的免费 API 服务。如果有这样的免费 API, 就能定时获取“关注实体”的相关内容&#xff0c;并…

高通开发系列 - ubuntu中的docker安装debian镜像

By: fulinux E-mail: fulinux@sina.com Blog: https://blog.csdn.net/fulinus 喜欢的盆友欢迎点赞和订阅! 你的喜欢就是我写作的动力! 返回:专栏总目录 目录 概述当前状态Ubuntu中安装dockerDebian镜像Debian容器中操作更改Debian源安装应用程序

推荐:4本易发表的优质SSCI期刊,含期刊官网!

01、Risk Management and Healthcare Policy 开源四区&#xff0c;国人发表占比25%&#xff0c;发表量前三的国家分别是中国、埃塞俄比亚和美国。 该期刊对国人友好&#xff0c;年度发文量400多&#xff0c;影响因子3.6。 主要刊发公共卫生相关的文章。 研究者可以围绕居民…

【第十三节】C++控制台版本坦克大战小游戏

目录 一、游戏简介 1.1 游戏概述 1.2 知识点应用 1.3 实现功能 1.4 开发环境 二、项目设计 2.1 类的设计 2.2 各类功能 三、程序运行截图 3.1 游戏主菜单 3.2 游戏进行中 3.3 双人作战 3.4 编辑地图 一、游戏简介 1.1 游戏概述 本项目是一款基于C语言开发的控制台…

【学习笔记】Windows GDI绘图(九)Graphics详解(上)

文章目录 Graphics 定义创建Graphics对象的方法通过Graphics绘制不同的形状、线条、图像和文字等通过Graphics操作对象坐标 Graphics属性Clip(裁切/绘制区域)ClipBounds获取裁切区域矩形范围CompositiongMode合成方式CompositingQuality渲染质量DpiX和DpiY 水平、垂直分辨率Int…