11_Pulsar Adaptors适配器、kafka适配器、Spark适配器

news2024/12/24 20:40:33

2.3. Pulsar Adaptors适配器
2.3.1.kafka适配器
2.3.2.Spark适配器

2.3. Pulsar Adaptors适配器

2.3.1.kafka适配器

Pulsar 为使用 Apache Kafka Java 客户端 API 编写的应用程序提供了一个简单的解决方案。
在生产者中, 如果想不改变原有kafka的代码架构, 就切换到Pulsar的平台中, 那么Pulsar adaptor on kafka就变的非常的有用了, 它可以帮助我们在不改变原有kafka的代码基础上, 即可接入pulsar, 但是需要注意, 相关配置信息需要进行一些调整, 例如: 地址与topic

  • 1- 需要导入Pulsar集成kafka的依赖包, 删除掉原有Kafka-client包
<dependency> 
     <groupId>org.apache.pulsar</groupId> 
     <artifactId>pulsar-client-kafka</artifactId> 
     <version>2.8.0</version> 
</dependency>

注: 目前Pulsar并在Maven中央仓库中并没有提供Pulsar-client-kafka 2.8.1的包, 故此处导入2.8.0

  • 2-编写生产者
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaAdaptorProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1. 创建kafka生产者的核心类对象: KafkaProducer
        // 1.1: 创建生产者配置对象: 设置相关配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "pulsar://node1:6650,node2:6650,node3:6650");
        // 消息的确认方案
        props.put("acks", "all");
        // key序列化类型
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value 序列化类型
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props); 
        //2. 发送数据 
        for (int i = 0; i < 10; i++) { 
            //2.1: 创建 生产者数据承载对象 一个对象代表是一条消息数据
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("persistent://public/default/txn_t1",Integer.toString(i), Integer.toString(i)); 
            producer.send(producerRecord).get(); 
        }
        
        //3. 释放资源 
        producer.close();
    }

}
  • 3-编写消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaAdaptorConsumer {
    public static void main(String[] args) {
        //1. 创建kafka的消费者的核心对象: KafkaConsumer
        //1.1: 创建消费者配置对象, 并设置相关的参数:
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "pulsar://node1:6650,node2:6650,node3:6650");
        //消费者组的 id
        props.setProperty("group.id", "test");
        //是否启动消费者自动提交消费偏移量
        props.setProperty("enable.auto.commit", "true");
        //每间隔多长时间提交一次偏移量:单位 毫秒
        props.setProperty("auto.commit.interval.ms","1000");
        //key 反序列化
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //val 发序列化
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //2. 给消费者设置订阅topic:
        consumer.subscribe(Arrays.asList("persistent://public/default/txn_t1"));
        //3. 循环获取相关的消息数据
        while (true) {
            //3.1: 从kafka中获取消息数据: 参数表示等待超时时间
            //注意: 如果没有获取到数据, 返回一个空集合对象, 如果数据集合中有多个 ConsumerRecord 对象
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            //3.2 遍历ConsumerRecords 获取每一个 ConsumerRecord 对象 : ConsumerRecord 消费者数据承载对象, 一个对象就是一条消息
            for (ConsumerRecord<String, String> record : records) {
                String massage = record.value();
                System.out.println("消息数据为:"+massage);
            }
        } 
    } 
}
  • 4- 先运行消费者, 进行监听, 然后运行生产者, 观察消费者是否可以正常消费到数据
    在这里插入图片描述

2.3.2.Spark适配器

Pulsar 的 Spark Streaming 接收器是一个自定义的接收器,它使用 Apache Spark Streaming 能够从 Pulsar 接
收原始数据。

应用程序可以通过 Spark Streaming receiver 接收 Resilient Distributed Dataset (RDD) 格式的数据,并可
以通过多种方式对其进行处理。

  • 1-导入相关的依赖包
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-spark</artifactId>
    <version>2.8.0</version>
</dependency>
  • 2-编写spark的流式代码
String serviceUrl = "pulsar://localhost:6650/"; 
String topic = "persistent://public/default/test_src"; 
String subs = "test_sub"; 
SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Example"); 
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(60)); 
ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData(); 
Set<String> set = new HashSet<>(); 
set.add(topic); 
pulsarConf.setTopicNames(set); 
pulsarConf.setSubscriptionName(subs); 
SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver( 
serviceUrl, 
pulsarConf, 
new AuthenticationDisabled()); 
JavaReceiverInputDStream<byte[]> lineDStream = jsc.receiverStream(pulsarReceiver);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/850685.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C#之控制台版本得贪吃蛇

贪吃蛇小时候大家都玩过&#xff0c;具体步骤如下: 1.给游戏制造一个有限得空间。 2.生成墙壁&#xff0c;小蛇碰撞到墙壁或者咬到自己的尾巴&#xff0c;游戏结束。 3.生成随机的食物。 4.吃掉食物&#xff0c;增加自身的体长&#xff0c;并生成新的食物。 具体代码如下&…

一文解决JWT相同签名不匹配问题【JWT signature does not match locally computed signature.】

今天做项目的时候&#xff0c;涉及到一个支付记账的功能&#xff0c;想着不能将这些金额数据显示暴露的通过常规的请求体封装来进行传输&#xff0c;想着要是被中途抓包修改了不就麻烦了&#xff0c;所以考虑到这种安全性的需求&#xff0c;就利用上了JWT来进行数据的封装传递&…

IO学习-消息队列

1&#xff0c;要求用消息队列实现AB进程间的对话 a,A进程先发送一句话给B进程&#xff0c;B进程接收后打印。 b,B进程回复一句话给A进程&#xff0c;A进程接收后打印。 c,重复a,b步骤&#xff0c;当收到quit后要结束进程。 A进程 B进程 运行结果&#xff1a; 2&#xff0c;…

力扣 494. 目标和

题目来源&#xff1a;https://leetcode.cn/problems/target-sum/description/ C题解&#xff08;来源代码随想录&#xff09;&#xff1a;将该问题转为01背包问题。 假设加法的总和为x&#xff0c;那么减法对应的总和就是sum - x。所以我们要求的是 x - (sum - x) target。x …

学会RabbitMQ的延迟队列,提高消息处理效率

系列文章目录 手把手教你&#xff0c;本地RabbitMQ服务搭建&#xff08;windows&#xff09; 消息队列选型——为什么选择RabbitMQ RabbitMQ灵活运用&#xff0c;怎么理解五种消息模型 RabbitMQ 能保证消息可靠性吗 推或拉&#xff1f; RabbitMQ 消费模式该如何选择 死信是什么…

[CVPR-23-Highlight] Magic3D: High-Resolution Text-to-3D Content Creation

目录 Abstract Background: DreamFusion High-Resolution 3D Generation Coarse-to-fine Diffusion Priors Scene Models Coarse-to-fine Optimization NeRF optimization Mesh optimization Experiments Controllable 3D Generation Personalized text-to-3D Prom…

自动化实践-全量Json对比在技改需求提效实践

1 背景 随着自动化测试左移实践深入&#xff0c;越来越多不同类型的需求开始用自动化测试左移来实践&#xff0c;在实践的过程中也有了新的提效诉求&#xff0c;比如技改类的服务拆分项目或者BC流量拆分的项目&#xff0c;在实践过程中&#xff0c;这类需求会期望不同染色环境…

当前服务器版本不支持该功能,请联系经销商升级服务器 - - 达梦数据库报错

当前服务器版本不支持该功能&#xff0c;请联系经销商升级服务器 - - 达梦数据库报错 环境介绍1 搭建测试环境2 报错内容3 标准版介绍 环境介绍 某项目使用标准版数据库中&#xff0c;使用insert into 正常操作表&#xff0c;插入数据时报错&#xff0c;表为普通表。 1 搭建测…

【云原生•监控】基于Prometheus实现自定义指标弹性伸缩(HPA)

【云原生•监控】基于Prometheus实现自定义指标弹性伸缩(HPA) 什么是弹性伸缩 「Autoscaling即弹性伸缩&#xff0c;是Kubernetes中的一种非常核心的功能&#xff0c;它可以根据给定的指标&#xff08;例如 CPU 或内存&#xff09;自动缩放Pod副本&#xff0c;从而可以更好地管…

new function是什么?(小写function)

参考链接&#xff1a;https://juejin.cn/post/7006232342398238733

【SpringBoot笔记】定时任务(cron)

定时任务就是在固定的时间执行某个程序&#xff0c;闹钟的作用。 1.在启动类上添加注解 EnableScheduling 2.创建定时任务类 在这个类里面使用表达式设置什么时候执行 cron 表达式&#xff08;也叫七子表达式&#xff09;&#xff0c;设置执行规则 package com.Lijibai.s…

地震预警系统全平台开通攻略

大家好&#xff0c;我是熊哥。 最近地震频发&#xff0c;做为一个有社为责任感的人&#xff0c;我认为我有必要为大家总结这一份安全手册&#xff1b;做为一个小V有必要让更多的人看见&#xff0c;积德攒人品。希望大家平安健康。 效果 在地震来临前&#xff0c;手机、电视会…

每天一道leetcode:剑指 Offer 59 - II. 队列的最大值(中等)

今日份题目&#xff1a; 请定义一个队列并实现函数 max_value 得到队列里的最大值&#xff0c;要求函数max_value、push_back 和 pop_front 的均摊时间复杂度都是O(1)。 若队列为空&#xff0c;pop_front 和 max_value 需要返回 -1 示例1 输入: ["MaxQueue",&qu…

centos7实现负载均衡

目录 一、基于 CentOS 7 构建 LVS-DR 集群。 1.1 配置lvs负载均衡服务 1.1.1 下载ipvsadm 1.1.2 增加vip 1.1.3 配置ipvsadm 1.2 配置rs1 1.2.1 编写测试页面 1.2.2 手工在RS端绑定VIP、添加路由 1.2.3 抑制arp响应 1.3 配置rs2 1.4 测试 二、配置nginx负载…

Springboot后端通过路径映射获取本机图片资源

项目场景&#xff1a; 项目中对图片的处理与查看是必不可少的&#xff0c;本文将讲解如何通过项目路径来获取到本机电脑的图片资源 如图所示&#xff0c;在我的本机D盘的图片测试文件夹(文件夹名字不要有中文)下有一些图片&#xff0c; 我们要在浏览器上访问到这些图片&#…

Flutter系列文章-实战项目

在本篇文章中&#xff0c;我们将通过一个实际的 Flutter 应用来综合运用最近学到的知识&#xff0c;包括保存到数据库、进行 HTTP 请求等。我们将开发一个简单的天气应用&#xff0c;可以根据用户输入的城市名获取该城市的天气信息&#xff0c;并将用户查询的城市列表保存到本地…

灵活利用ChatAI,减轻工作任务—语言/翻译篇

前言 ChatAI在语言和翻译方面具有重要作用。它能够帮助用户进行多语言交流、纠正错误、学习新语言、了解不同文化背景&#xff0c;并提供文本翻译与校对等功能。通过与ChatAI互动&#xff0c;我们能够更好地利用技术来拓展自己在语言领域的能力和知识&#xff0c;实现更加无障…

P11-Transformer学习1.1-《Attention Is All You Need》

Transformer目录:《Transformer Paper》1.0 CV Transformer必读论文5篇_汉卿HanQ的博客-CSDN博客 前文参考:Transformer1.0-预热_汉卿HanQ的博客-CSDN博客 全文1w3字左右&#xff0c;按照论文翻译个人理解精读&#xff0c;如果对你有所帮助&#xff0c;欢迎点个赞哦&#xff…

【C语言】初识C语言+进阶篇导读

✨个人主页&#xff1a; Anmia.&#x1f389;所属专栏&#xff1a; C Language &#x1f383;操作环境&#xff1a; Visual Studio 2019 版本 本篇目的是面向编程新手&#xff0c;没接触过编程的人。以及C进阶的导读。 内容是C语言重要知识点的简单解释&#xff0c;不做详解。给…

征稿 | 第三届粤港澳大湾区人工智能与大数据论坛(AIBDF 2023)

第三届粤港澳大湾区人工智能与大数据论坛&#xff08;AIBDF 2023&#xff09; 2023 3rd Guangdong-Hong Kong-Macao Greater Bay Area Artificial Intelligence And Big Data Forum 本次高端论坛围绕建设国家数字经济创新发展试验区进行选题。全面贯彻落实党的二十大精神&…