Mq之pulsar的入门使用(一)

news2025/1/23 3:46:48

目录

一、linux集群安装pulsar

  • 注意事项
    • 编辑 /etc/hostname与/etc/hosts
    • 执行初始化命令

二、创建应用程序对消息的生产和消费进行测试

  • 物理主机启动应用发送消息时报错处理
  • 程序的搭建及说明
  • 使用到的pom依赖
  • springboot中pulsar配置
  • 接收消息
  • 模拟发送消息
  • 发送与接收消息打印输出

总结

  • docker容器的部署效率会更高
  • 错误问题继续排查
  • pulsar的部署需要搭配的内容真不少
  • websocket与pulsar还可以进行搭配

一、linux集群安装pulsar 详情可查看这里 》》

注:也采用docker进行安装后期会补充, 并挂载链接 》》
注:按照教程我的部署策略是对一台系统部署后,再对此台系统进行克隆,后修改其他两台系统的配置即可

  • 搭建vmware虚拟环境, 并进行克隆(在虚拟系统进行安装)。
    搭建环境访问此篇文章即可 》》

注意事项

注:跟着做完全没有问题,下面我会说一下安装时的注意事项

1. 编辑 /etc/hostname与/etc/hosts

注:这里编辑的两处文件 hostname 是编辑主机名称, hosts是编辑主机名称与ip地址的映射

2. 执行初始化命令

注:执行下方命令会出现如下错误,但是,,,可以继续往下部署,不会影像pulsar的集群安装使用
注:具体为什么会出现还需再进行研究,若发现解决方法,会在这里挂链接 》》

Unable to read additional data from server sessionid 0x0, likely server has closed socket
./pulsar initialize-cluster-metadata \
--cluster pulsar-cluster \
--zookeeper pulsarCluster3:2181 \
--configuration-store pulsarCluster3:2181 \
--web-service-url http://pulsarCluster3:8080,pulsarCluster4:8080,pulsarCluster5:8080 \
--web-service-url-tls https://pulsarCluster3:8443,pulsarCluster4:8443,pulsarCluster5:8443 \
--broker-service-url pulsar://pulsarCluster3:6650,pulsarCluster4:6650,pulsarCluster5:6650 \
--broker-service-url-tls pulsar+ssl://pulsarCluster3:6651,pulsarCluster4:6651,pulsarCluster5:6651

二、创建应用程序对消息的生产和消费进行测试(物理主机建立应用程序运行)

1. 物理主机启动应用发送消息时报错处理。

注:按照教程中集群部署后,物理主机建立应用启动程序会报错
在这里插入图片描述注:解决方法就是在hosts中进行ip地址映射
在这里插入图片描述

2. 程序的搭建及说明。

注:主类是这样的, 启用了pulsar注解和定时任务注解

package comp;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.pulsar.annotation.EnablePulsar;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnablePulsar
@EnableScheduling
public class PulsarApplication {
	public static void main(String[] args) {
		SpringApplication.run(PulsarApplication.class, args);
	}
}

注:使用的application.yml是这样的,其中mongo可以不用

server:
  port: 8200
  servlet:
    context-path: /
spring:
  pulsar:
    client:
      service-url: pulsar://192.168.249.3:6650,192.168.249.4:6650,192.168.249.5:6650
3. 使用到的pom依赖

注:此次测试使用的springboot版本:3.2.2
注:此次测试使用的open jdk版本:17.0.9

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <artifactId>pulsar</artifactId>
    <packaging>jar</packaging>

    <name>pulsar</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-pulsar</artifactId>
        </dependency>
        <!--mq 消费-->
        <dependency>
            <groupId>io.github.majusko</groupId>
            <artifactId>pulsar-java-spring-boot-starter</artifactId>
            <version>1.1.2</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.pulsar</groupId>
                    <artifactId>pulsar-client-original</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.32</version>
        </dependency>
    </dependencies>
</project>
4. springboot中pulsar配置
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

@Slf4j
@Component
public class PulsarComponent {

    private PulsarTemplate<byte[]> template;

    @Autowired
    private void setTemplate(PulsarTemplate<byte[]> template) {
        this.template = template;
    }

    /**
     * 根据topicName发送消息
     * @param topic
     * @param message
     */
    public void sendMessage(String topic, String message) {
        try {
            template.sendAsync(topic, message.getBytes("utf-8"));
        } catch (Exception e) {
            log.error("发送信息出错{}", e.getMessage());
            e.printStackTrace();
        }
    }

}

5. 接收消息

注:使用注解@PulsarListener

@Service
@Slf4j
public class ConsumeServiceImpl {
    @PulsarListener(
            subscriptionName = "hell",
            topics = "zf",
            subscriptionType = SubscriptionType.Shared,
            schemaType = SchemaType.STRING
    )
    public void saveOperation(String operationLogStr) {
        log.info(operationLogStr);
    }
}
6. 模拟发送消息

注:使用定时任务模拟发送消息,3s一次
注:生产内容 hell zf
注:topic为 zf

import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class ProductServiceImpl {

    @Resource
    private PulsarComponent pulsarComponent;

    @Scheduled(cron = "0/3 * * * * ?")
    public void spell() {
        log.info("已发送");
        pulsarComponent.sendMessage("zf", "hell zf");
    }

}
7. 发送与接收消息打印输出

在这里插入图片描述

总结

  1. docker容器的部署效率会更高。
    若使用docker容器搭建效率应该会更高。后面会使用容器进行搭建发布出来。》》
    docker容器的官网使用docker compose插件方式部署,可以尝试一下 》》
  2. 错误问题继续排查。
    那个[错误](#err)虽然不影响使用, 但是挺纠结,在尝试使用docker部署时关注这一点,进行纠错, 并在这个文章里进行更新.
    pulsar集群安装的文章中提到了如何解决这个问题,但是不起作用
  3. pulsar的部署需要搭配的内容真不少。
    注:bookeeper,主要用于持久化存储等
    注:zookeeper,任务的协调,元数据存储
    注:broker,负责处理生产和消费的请求
  4. websocket与pulsar还可以进行搭配
    后期还会发布websocket与pulsar消息队列配合的技术文章 》》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1536136.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java-SSM医院在线预约系统

Java-SSM医院在线预约系统 1.服务承诺&#xff1a; 包安装运行&#xff0c;如有需要欢迎联系&#xff08;VX:yuanchengruanjian&#xff09;。 2.项目所用框架: 前端:JSP、layui等。 后端:SSM,即Spring、SpringMvc、Mybatis等。 3.项目功能点: 1.管理员功能: a.修改个人信息…

【图解物联网】第6章 物联网与数据分析

6.1 传感器数据与分析 从前几章中我们已经了解到&#xff0c;只要把配备传感器的设备连接到网络&#xff0c;就能把所有的信息采集到物联网服务之中&#xff08;图6.1&#xff09;。 从工业角度而言&#xff0c;给工厂中的生产流水线和流通的产品打上电子标签&#x…

Linux 系统是如何收发⽹络包的

Linux 系统是如何收发⽹络包的&#xff1f; ⽹络模型 为了使得多种设备能通过⽹络相互通信&#xff0c;和为了解决各种不同设备在⽹络互联中的兼容性问题&#xff0c;国际标准化组织制定了开放式系统互联通信参考模型&#xff08;Open System Interconnection Reference Mode…

了解Kafka位移自动提交的秘密:避免常见陷阱的方法

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 了解Kafka位移自动提交的秘密&#xff1a;避免常见陷阱的方法 前言位移自动提交简介自动提交的优缺点自动提交位移的优点&#xff1a;自动提交位移的缺点&#xff1a;自动提交与手动提交的对比分析&am…

安捷伦Agilent E4440A频谱分析仪

181/2461/8938产品概述&#xff1a; 这是一篇关于安捷伦Agilent E4440A频谱分析仪的详细指南。在这篇文章中&#xff0c;您将了解该设备的基本概述、技术规格、使用方法、应用场景以及与其他类似设备的比较。让我们一起深入了解Agilent E4440A频谱分析仪的各个方面。 让我们简…

软件杯 深度学习 机器视觉 人脸识别系统 - opencv python

文章目录 0 前言1 机器学习-人脸识别过程人脸检测人脸对其人脸特征向量化人脸识别 2 深度学习-人脸识别过程人脸检测人脸识别Metric Larning 3 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 深度学习 机器视觉 人脸识别系统 该项目…

DashVector - 阿里云向量检索服务

DashVector 文章目录 DashVector一、关于 DashVector二、使用 DashVector 前提准备1、创建Cluster&#xff1a;2、获得API-KEY3、安装最新版SDK 三、快速使用 DashVector1. 创建Client2. 创建Collection3、插入Doc4、相似性检索5、删除Doc6. 查看Collection统计信息7. 删除Coll…

js中多重引号会导致函数的参数失效报错-Invalid or unexpected token

在js使用中我们经常会使动态添加html信息到元素对象中&#xff0c;且还加入了函数及其&#xff0c;函数对应参数&#xff0c;这个时候就会使用多重引号去拼接&#xff0c;如果拼接中没有做引号的转义&#xff0c;就会出现Invalid or unexpected token。 例如以下代码&#xff0…

【嵌入式——QT】Charts常见的图表的绘制

【嵌入式——QT】Charts常见的图表的绘制 柱状图QBarSetQBarSeriesQBarCategoryAxis图示 饼图堆叠柱状图百分比柱状图散点图和光滑曲线图代码示例 柱状图 QBarSet 用于创建柱状图的数据集。 主要函数 setLabel()&#xff1a;设置数据集标签 &#xff1b;setLabelBrush()&am…

复习斐波那契(用C++写)

或者这样写&#xff1a; 斐波那契数列 题目描述 斐波那契数列是指这样的数列&#xff1a;数列的第一个和第二个数都为 1 1 1&#xff0c;接下来每个数都等于前面 2 2 2 个数之和。 给出一个正整数 a a a&#xff0c;要求斐波那契数列中第 a a a 个数是多少。 输入格式…

Windows下MySQL服务启动常见的两种方式,完美适配Mysql5.7,MySql8.0

文章目录 一、图形界面下启动mysql服务二、在命令行重新启动mysql服务3 推荐阅读4 源码获取&#xff1a; Windows系统下&#xff0c;MySQL服务的启动&#xff0c;常见的两种启动方式如下&#xff1a; 一、图形界面下启动mysql服务 在图形界面下启动mysql服务的流程如下&#x…

算法体系-13 第十三 二叉树的基本算法+二叉树的递归套路

一 完全二叉树的判断 1.1 描述 完全二叉树&#xff1a;他每一层都是满的&#xff0c;即使不满也是最后一层不满&#xff0c;最后一层不满也是从左到右变满的&#xff1b;话句话说就是 完全二叉树从根结点到倒数第二层满足完美二叉树&#xff0c;最后一层可以不完全填充&#x…

Elasticsearch数据存储优化方案

优化Elasticsearch数据存储有助于提升系统性能、降低成本、提高数据查询效率以及增强系统的稳定性和可靠性。通常我们再优化Elasticsearch数据存储会遇到一些问题&#xff0c;导致项目卡壳。以下是优化Elasticsearch数据存储的一些重要作用&#xff1a; 1、问题背景 在某些场景…

我的春招求职面经

智能指针在面试时经常被问到&#xff0c;最近自己也在写&#xff0c;有一点思考&#xff0c;于是找到了这样一个题目&#xff0c;可以看看&#xff0c;上面这个代码有什么问题&#xff1f;留言区说出你的答案吧&#xff01; 最后分享一下之前的实习->春招->秋招等文章汇总…

地质灾害在线监测,精准预警智能化

自然灾害无情且威力巨大,对人类生命财产安全造成严重威胁。地质灾害作为重要的自然灾害类型之一,给人类社会带来了沉重的经济损失和生命威胁。及时掌握地质灾害信息,提高预警能力和监测水平,是保障人民群众生命财产安全的当务之急。&#xff08;key-iot.com.cn/18703.html&…

Juniper SRX 防火墙基础上网配置

简介 基于PNET-LAB模拟器&#xff0c;使用 vSRX-NG 23.4R1.9 镜像进行实验。 博客&#xff1a;https://songxwn.com/Juniper-SRX-snat/ 实验需求 配置WAN口 LAN口&#xff0c;实现基础的上网功能。配置NAT、DHCP。 ISP 路由器使用Cisco IOS模拟&#xff0c;与SRX对接口配置…

docker镜像安装空间不足no space left on device

报错&#xff1a;Error processing tar file(exit status 1): open /usr/local/lib/libmkl_tbb_thread.so.1: no space left on device 原先docker模型保存位置&#xff1a; docker info -f ‘{{ .DockerRootDir}}’ docker 高点版本&#xff0c;这里26.0 解决参考&#xf…

力扣---零钱兑换---动态规划

思路&#xff1a; 这是一道典型的动态规划问题&#xff08;希望下次不用提示&#xff0c;能直接认出来&#xff09;&#xff1a;我将g[i]定义为总金币为i所需的最少硬币个数。所以递推公式可以表示为&#xff1a;g[i]min(g[i-1],g[i-2],g[i-5])1&#xff0c;也就是g[i]min(g[i-…

demo版多人聊天系统

目录 ​编辑 一&#xff0c;引入 二&#xff0c;在Server端修改的代码 1&#xff0c;保存用户信息功能实现 2&#xff0c;拼接消息 3&#xff0c;广播消息 三&#xff0c; Client端要修改的代码 四&#xff0c;效果演示 一&#xff0c;引入 在上一篇文章udp网络服务器中&a…

Java-Java基础学习(4)-多线程(2)

3.7. Lambda表达式 为什么要使用lambda表达式 避免匿名内部类定义过多&#xff1b;可以让代码看起来更简洁&#xff1b;去掉一堆没有意义的代码&#xff0c;只留下核心逻辑 属于函数式编程的概念&#xff0c;格式 (params) -> expression [表达式](params) -> statement…