Apache-Pulsar安装操作说明

news2025/1/27 13:07:19

说明

Pulsar 是一种用于服务器到服务器消息传递的多租户高性能解决方案。

Pulsar 的主要特性如下:

对 Pulsar 实例中的多个集群的本机支持,并跨集群无缝地复制消息。
极低的发布和端到端延迟。
无缝可扩展至超过一百万个主题。
一个简单的客户端 API,具有Java、Go、Python和C++的绑定。
主题的多种订阅类型(独占、共享和故障转移)。
通过Apache BookKeeper提供的持久消息存储来保证消息传递。无服务器轻量级计算框架Pulsar Functions提供流原生数据处理功能。
基于 Pulsar Functions 构建的无服务器连接器框架Pulsar IO可以更轻松地将数据移入和移出 Apache Pulsar。
当数据老化时,分层存储将数据从热/温存储卸载到冷/长期存储(例如S3和GCS)。

安装包下载

本文使用的是apache-pulsar-3.2.2-bin.tar.gz版本

csdn下载 也可以自行去官网下载

解压目录

tar -zxvf apache-pulsar-3.2.2-bin.tar.gz

目录说明

目录描述
bin入口pulsar点脚本和许多其他命令行工具
conf配置文件,包括broker.conf
libPulsar 使用的 JAR
examplesPulsar 函数示例
instancesPulsar 函数的工件

启动Pulsar

bin/pulsar standalone

注意:需要保证jdk在17+

创建Topic

创建一个名为my-topic的topic

bin/pulsar-admin topics create persistent://public/default/my-topic

生产者发送消息

bin/pulsar-client produce my-topic --messages 'Hello Pulsar!'

消费者消费消息

测试批量发送消息

bin/pulsar-client produce my-topic --messages "$(seq -s, -f 'Message NO.%g' 1 10)"


重新消费

bin/pulsar-client consume my-topic -s 'my-subscription' -p Earliest -n 0

java生产消息

pom.xml

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>3.2.2</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-annotations</artifactId>
    <version>2.14.2</version>
</dependency>

代码

package com.pulsar.demo;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
 
public class PulsarProducer {
    private static final Logger log = LoggerFactory.getLogger(PulsarProducer.class);
    private static final String SERVER_URL = "pulsar://192.168.xxx:6650";
    public static void main(String[] args) throws Exception {
        // 构造Pulsar Client
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(SERVER_URL)
                .enableTcpNoDelay(true)
                .build();
        // 构造生产者
        Producer<String> producer = client.newProducer(Schema.STRING)
                .producerName("my-producer")
                .topic("my-topic")
                .batchingMaxMessages(1024)
                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                .enableBatching(true)
                .blockIfQueueFull(true)
                .maxPendingMessages(512)
                .sendTimeout(10, TimeUnit.SECONDS)
                .blockIfQueueFull(true)
                .create();
        // 同步发送消息
        MessageId messageId = producer.send("Hello World");
        log.info("message id is {}",messageId);
        System.out.println(messageId.toString());
        // 异步发送消息
        CompletableFuture<MessageId> asyncMessageId = producer.sendAsync("This is a async message");
        // 阻塞线程,直到返回结果
        log.info("async message id is {}",asyncMessageId.get());
 
        
        producer.close();
 
        // 关闭licent的方式有两种,同步和异步
        // client.close();
        client.closeAsync();
 
    }
}

java消费消息

package com.pulsar.demo;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

import java.util.concurrent.TimeUnit;
 

public class PulsarConsumer {
    private static final String SERVER_URL = "pulsar://192.168.xxx:6650";
    private static final String topic = "persistent://public/default/my-topic"; // 要订阅的topic
 
    public static void main(String[] args) throws Exception {
        // 构造Pulsar Client
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(SERVER_URL)
                .enableTcpNoDelay(true)
                .build();
        Consumer consumer = client.newConsumer()
                .consumerName("my-consumer")
                .topic("my-topic")
                .subscriptionName("my-subscription")
                .ackTimeout(10, TimeUnit.SECONDS)
                .maxTotalReceiverQueueSizeAcrossPartitions(10)
                .subscriptionType(SubscriptionType.Exclusive)
                .subscribe();
        while (true) {
            Message msg = consumer.receive();
 
            try {
                System.out.printf("Message received: %s\n", new String(msg.getData()));
 
                consumer.acknowledge(msg);
            } catch (Exception e) {
                consumer.negativeAcknowledge(msg);
            }
        }
    }
}

停止Pulsar

完成后,您可以关闭 Pulsar 集群。在启动集群的终端窗口中按Ctrl-C 。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1575753.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

arcgis10.5安装步骤

目录 一、安装License 二、安装ArcGIS_Desktop 三、安装汉化包&#xff0c;解压后&#xff0c;直接双击等待安装即可 一、安装License 双击ArcGIS_License_Manager_Windows_105_154033 选择【Next】 勾选I accept&#xff0c;然后选择【Next】 选择License的安装目录&#x…

实战webSocket压测(三)Jmeter真实接口联调

背景&#xff1a; 接口地址为&#xff1a;ws://sunlei.demo 接口说明&#xff1a;websocket接口&#xff0c;首次连接&#xff0c;通过Text请求设置开启标志&#xff0c;然后通过wav文件流传输&#xff0c;达到后端服务可以根据传输信息进行解析满足指定标准后&#xff0c;web…

SpringBoot响应式RedisClient配置

大多数场景&#xff0c;默认配置的Redis客户端不满足业务场景&#xff0c;根源在于Redis key、value 序列化反序列化问题。因此&#xff0c;有必要配置自定义的客户端来满足需求。 默认配置源码如下&#xff0c;采用jdk序列化/反序列化方式进行&#xff0c;我们只需要配置相同…

SpringMVC数据响应和请求

文章目录 1.SpringMVC简介2. SpringMVC快速入门3. SpringMVC执行的流程4.SpringMVC注解解释5. 视图解析器6.SpringMVC的数据响应6.1返回ModelView对象6.2直接返回字符串6.3返回json字符串 7.SpringMVC获得请求数据7.1 获得基本类型参数7.2获得POJO类型参数7.3获取数组类型参数7…

基于Swin Transformers的乳腺癌组织病理学图像多分类

乳腺癌的非侵入性诊断程序涉及体检和成像技术&#xff0c;如乳房X光检查、超声检查和磁共振成像。成像程序对于更全面地评估癌症区域和识别癌症亚型的敏感性较低。 CNN表现出固有的归纳偏差&#xff0c;并且对于图像中感兴趣对象的平移、旋转和位置有所不同。因此&#xff0c;…

WPS解决插入公式在正文带来行间距变大问题

问题描述 写论文解释公式时&#xff0c;插入对应的变量&#xff0c;导致行间距变大&#xff0c;如图 显然上文与下文行间距不等。但无法通过修改数值修改下文行间距。 解决办法

(已解决)引入本地bootstrap无效,bootstrap和jquery的引入

问题&#xff1a; 首先我是跟着张天宇老师下载的bootstrap文件&#xff0c;新建了一个css文件夹&#xff0c;但是这样子<link rel"stylesheet" type"text/css" src"./css/bootstrap.css">在index.html引入没有用。 解决办法: 1.把建立的…

贪心算法|1005.K次取反后最大化的数组和

力扣题目链接 class Solution { static bool cmp(int a, int b) {return abs(a) > abs(b); } public:int largestSumAfterKNegations(vector<int>& A, int K) {sort(A.begin(), A.end(), cmp); // 第一步for (int i 0; i < A.size(); i) { // 第二步if…

DSOX3034T是德科技DSOX3034T示波器

181/2461/8938产品概述&#xff1a; 特点: 带宽:350 MHz频道:4存储深度:4 Mpts采样速率:5 GSa/s更新速率:每秒1000000个波形波形数学和FFT自动探测接口用于连接、存储设备和打印的USB主机和设备端口 触摸: 8.5英寸电容式触摸屏专为触摸界面设计 发现: 业界最快的无损波形更…

MSOLSpray:一款针对微软在线账号(AzureO365)的密码喷射与安全测试工具

关于MSOLSpray MSOLSpray是一款针对微软在线账号&#xff08;Azure/O365&#xff09;的密码喷射与安全测试工具&#xff0c;在该工具的帮助下&#xff0c;广大研究人员可以直接对目标账户执行安全检测。支持检测的内容包括目标账号凭证是否有效、账号是否启用了MFA、租户账号是…

[开源]基于SVM的时间序列预测python代码

整理了SVM的时间序列预测python代码分享给大家。记得点赞哦 #!/usr/bin/env python # coding: utf-8import numpy as np import matplotlib.pyplot as plt import pandas as pd from sklearn import preprocessing from sklearn.metrics import mean_squared_error from math i…

华为海思校园招聘-芯片-数字 IC 方向 题目分享——第九套

华为海思校园招聘-芯片-数字 IC 方向 题目分享&#xff08;有参考答案&#xff09;——第九套 部分题目分享&#xff0c;完整版获取&#xff08;WX:didadidadidida313&#xff0c;加我备注&#xff1a;CSDN huawei数字芯片题目&#xff0c;谢绝白嫖哈&#xff09; 单选 1&…

MTK i500p AIoT解决方案

一、方案概述 i500p是一款强大而高效的AIoT平台&#xff0c;专为便携式、家用或商用物联网应用而设计&#xff0c;这些应用通常需要大量的边缘计算&#xff0c;需要强大的多媒体功能和多任务操作系统。该平台集成了Arm Cortex-A73 和 Cortex-A53 的四核集群&#xff0c;工作频…

2024 Tuxera NTFS for Mac功能介绍及如何安装使用

随着科技的发展&#xff0c;我们的日常生活和工作越来越依赖于电子设备。而在这些设备中&#xff0c;Mac由于其出色的稳定性和易用性&#xff0c;成为了许多用户的首选。然而&#xff0c;尽管Mac自带的文件系统已经足够强大&#xff0c;但仍有一些用户希望获得更加高效、稳定的…

Ubuntu22.04平台编译完美解决问题“error: GLSL 4.5 is not supported.”【GLSL(OpenGL着色器语言)】

GLSL介绍 GLSL&#xff08;OpenGL着色器语言&#xff09;是用于编写OpenGL着色器程序的语言。GLSL 4.5 是 GLSL 的一个版本&#xff0c;引入了许多新的特性和改进&#xff0c;旨在提高着色器编程的灵活性和性能。GLSL 4.5 工具通常是用于编写、调试和优化 GLSL 4.5 着色器代码…

网络基础知识入门

目录 一、局域网与广域网 1、局域网 2、广域网 二、协议 1、概念 2、协议的理解 3、协议的分层 1、分层 2、OSI七层模型 三、网络传输基本流程 1、报头 2、局域网通信原理 3、跨网络传输流程 四、IP地址和MAC地址 1、IP地址 2、MAC地址 3、两者的区别 一、局域…

基于微信小程序的无中介租房系统

本基于微信小程序的无中介租房系统主要实现了房东功能模块、租客功能模块和管理员功能模块三大部分&#xff0c;系统结构图如图4-1所示。 4.3.2 数据库表设计 本基于微信小程序的无中介租房系统在开发过程中使用MySQL数据库系统进行系统数据的储存&#xff0c;以下是本系统的主…

倍讯科技Ethernet/iP转modbusTCP网关在电厂里为通信设备搭建桥梁

在电厂里&#xff0c;Ethernet/IP网关扮演着极为重要的角色&#xff0c;它作为不同设备和系统之间的通信桥梁&#xff0c;使得基于不同协议的设备能够无缝交流&#xff0c;从而实现整个电厂的高效、可靠运行。Ethernet/IP网关的使用提高了数据的实时性和可靠性&#xff0c;优化…

蓝桥杯 历届真题 杨辉三角形【第十二届】【省赛】【C组】

资源限制 内存限制&#xff1a;256.0MB C/C时间限制&#xff1a;1.0s Java时间限制&#xff1a;3.0s Python时间限制&#xff1a;5.0s 思路&#xff1a; 由于我第一写没考虑到大数据的原因&#xff0c;直接判断导致只得了40分&#xff0c;下面是我的代码&#xff1a; #…

Flutter第六弹 基础列表ListView

目标&#xff1a; 1&#xff09;Flutter有哪些常用的列表组建 2&#xff09;怎么定制列表项Item&#xff1f; 一、ListView简介 使用标准的 ListView 构造方法非常适合只有少量数据的列表。我们还将使用内置的 ListTile widget 来给我们的条目提供可视化结构。ListView支持…