kafak集群搭建-基于kRaft方式

news2024/9/26 5:20:57

kafak集群搭建-基于kRaft方式

  • 1、服务器规划
  • 2、kafka集群部署配置
    • 2.1、解压三个kafka
    • 2.2、配置/config/kraft/server.properties
  • 3、启动kafka集群
  • 4、SpringBoot集成kafka的kRaft集群
    • 4.1、消费者
    • 4.2、生产者
    • 4.3、配置类
    • 4.4、实体类
    • 4.5、JSON工具类
    • 4.6、项目配置文件
    • 4.7、测试类

在这里插入图片描述

1、服务器规划

在这里插入图片描述

2、kafka集群部署配置

至少需要三个节点

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

2.1、解压三个kafka

tar -zxvf kafka_2.13-3.7.0.tgz -C /usr/local/

2.2、配置/config/kraft/server.properties

其中马赛克处需要修改为你的服务器真实IP

配置第一台

在这里插入图片描述在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

配置第二台:在这里插入图片描述

在这里插入图片描述在这里插入图片描述
在这里插入图片描述

配置第三台:
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

3、启动kafka集群

在这里插入图片描述
1、生成Cluster UUID(集群UUID)

./kafka-storage.sh random-uuid

在这里插入图片描述

2、格式化日志目录

./kafka-storage.sh format -t UBLFE9CCRwauv0Mc3tU7qQ -c ../config/kraft/server.properties

在这里插入图片描述

3、启动kafka

//后台启动
./kafka-server-start.sh ../config/kraft/server.properties &

4、关闭kafka

./kafka-server-stop.sh ../config/kraft/server.properties

4、SpringBoot集成kafka的kRaft集群

在这里插入图片描述

4.1、消费者

package com.power.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class EventConsumer {

    /**
     * topics 用于指定从哪个主题中消费消息
     * concurrency 用于指定有多少个消费者
     * @param record
     */
    @KafkaListener(topics = {"kraftClusterTopic"}, groupId = "kraftclusterGroup")
    public void onEventA(ConsumerRecord<String, String> record) {
        System.out.println(Thread.currentThread().getId()+"---> 消费消息 record = " + record);
    }
}

4.2、生产者

package com.power.producer;

import com.power.model.User;
import com.power.util.JSONUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String,Object> kafkaTemplate;

    public void sendEvent(){
        for (int i = 0; i < 2; i++) {
            User user = User.builder().id(i).phone("1567676767"+i).birthday(new Date()).build();
            String userJson = JSONUtils.toJSON(user);
            kafkaTemplate.send("kraftClusterTopic","k"+i, userJson);
        }
    }

}

4.3、配置类

package com.power.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic newTopic(){
        //设置副本个数不能为0,也不能大于节点个数,否则将不能创建Topic
        return new NewTopic("kraftClusterTopic",2, (short)2);
    }
}

4.4、实体类

package com.power.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;

@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {

    private Integer id;

    private String phone;

    private Date birthday;

}

4.5、JSON工具类

package com.power.util;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JSONUtils {

    private static final ObjectMapper OBJECTMAPPER = new ObjectMapper();

    public static String toJSON(Object object){
        try {
            return OBJECTMAPPER.writeValueAsString(object);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    public static <T> T toBean(String json,Class<T> clazz){
        try {
            return OBJECTMAPPER.readValue(json,clazz);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}

4.6、项目配置文件

spring:
  application:
    #应用名称
    name: spring-boot-08-kafka-kRaftCluster

  #kafka连接地址(ip+port)
  kafka:
    bootstrap-servers: <你的服务器IP>:9091,<你的服务器IP>:9092,<你的服务器IP>:9093

    #配置消费者的反序列化
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

4.7、测试类

package com.power;

import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest
public class SpringBoot08KafkaBaseApplication {

    @Resource
    private EventProducer eventProducer;

    @Test
    void sendInterceptor(){
        eventProducer.sendEvent();
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2089524.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【web开发】Spring Boot 快速搭建Web项目

Date: 2024.08.30 13:52:20 author: lijianzhan 简述&#xff1a;【Spring Boot 快速搭建Web项目应用】是一篇关于Java Web项目构建的文章&#xff0c;主要讲解了如何借助Maven工具来管理和构建Web应用程序。Maven是Java开发中广泛使用的自动化构建工具&#xff0c;能够帮助开…

顺序循环队列

顺序循环队列 队头插入元素&#xff0c;队尾删除元素 本来应该判空和判断是否存满的条件都是&#xff1a;队头 队尾&#xff0c;但这样就没办法区分了&#xff0c;所以&#xff0c;就牺牲一个空间&#xff08;比如长度为10&#xff0c;但只能存9个&#xff09;&#xff0c;这…

基层医疗云HIS系统源码:云计算、大数据等现代信息技术研发

云HIS源码&#xff0c;基层云HIS系统源码&#xff0c;基层医疗云HIS系统 利用云计算、大数据等现代信息技术研发的基层医疗云HIS系统实现了医院信息化从局域网向互联网转型&#xff0c;重新定义医疗卫生信息化建设的理念、构架、功能和运维体系。实现了医院信息化由局域网向互…

CAN协议通信 学习笔记

文章目录 1.CAN通信简介2.物理层2.1 CAN总线的电气特性2.2 CAN的位同步机制&#xff08;了解&#xff0c;用于理解CAN的初始化参数的配置原理&#xff09;硬同步方式重新同步方式 2.3 CAN对比其他常用协议的优势 3. 数据链路层3.1 CAN协议的数据帧3.2 仲裁机制3.3 访问控制3.4 …

python-FastApi框架

文章目录 FastApi一. 简介二. 特性三. 安装1. 安装fastapi模块2. 安装ASGI服务器( Uvicorn 或者 Hypercorn) 四. 实例1. 创建**main.py**文件(GET请求)2. 运行3. 测试4. 更新main_py(加入PUT请求) 五. 自动化API文档1. Swagger UI(交互式文档)2. ReDoc(可选式文档) FastApi 一…

华为云征文|Flexus云服务器X,云上性能新飞跃,开启业务增长新纪元

&#x1f3c6;作者简介&#xff0c;黑夜开发者&#xff0c;CSDN领军人物&#xff0c;全栈领域优质创作者✌&#xff0c;CSDN博客专家&#xff0c;阿里云社区专家博主&#xff0c;2023年6月CSDN上海赛道top4。 &#x1f3c6;数年电商行业从业经验&#xff0c;AWS/阿里云资深使用…

想告诉所有人,我找到脸书视频保存方法啦!

各位集美集帅们&#xff0c;我可算找到脸书视频的保存教学啦。作为社媒体人&#xff0c;在脸书看到有趣的素材却保存不了时真的要急的爆炸了。试了好多方式&#xff0c;这软件是最给力哒&#xff0c;我不管&#xff0c;下面的步骤介绍你一定要看完&#xff01; 打开脸书&#x…

JVM面试(一)什么是虚拟机?什么是class文件?

什么是java虚拟机&#xff1f; 如果通俗点来讲&#xff0c;我们在电脑上一行行敲出来的代码&#xff0c;电脑本身是不认识的&#xff0c;最终是要转成电脑可以运行的101001这种字节。 但是这些我们又不可能手动来转换&#xff0c;所以呢&#xff0c;就需要一个工具&#xff0…

关于redis存储数据类型选择

项目使用的spring-boot&#xff0c;操作redis使用的是spring redis的api 在序列化的时候&#xff0c;如果往redis存入的是比较小的数字&#xff0c;反序列化的时候&#xff0c;会是integer类型 如果字段定义的是Long类型&#xff0c;因为比较小&#xff0c;所以被反序列化成i…

Cadence高速板设计技巧(全志H3)

市场上一般的电视屏幕是4K的&#xff1a; cadence查找&#xff1a; 右侧的面板FIND里面输入要查找的名字就可以进行查找。 全局查找需要鼠标点击到.DSN的&#xff0c;进入全局。 在视图里选择一个层就可以单独查看这个层的东西&#xff0c;屏蔽掉其他层的东西&#xff1a; 共…

linux命令:用于删除空目录的命令行工具rmdir详细介绍

目录 一、概述 二、用法 1、基本语法 &#xff08;1&#xff09;选项 &#xff08;2&#xff09;目录... 2、主要选项 &#xff08;1&#xff09;-p, --parents &#xff08;2&#xff09; -v, --verbose &#xff08;3&#xff09; -h, --help &#xff08;4&#x…

Mysql基础练习题 596.查询至少有5个学生的所有班级 (力扣)

596.查询至少有5个学生的所有班级 建表插入数据&#xff1a; Create table If Not Exists Courses (student varchar(255), class varchar(255)) Truncate table Courses insert into Courses (student, class) values (A, Math) insert into Courses (student, class) value…

指针进阶(多级指针)

0.多级指针命名 多级指针命名&#xff0c;最主要的是要知道该指针指向的是什么数据。 一.1级指针 - 指向一个变量 若定义一个变量 int a&#xff0c;那么 目标类型就为 int。 所以该指针应该定义为 int *p; /* 目标 */ int a;/* 目标类型 *p */ int *p;/* 指向目标 */ p a;二…

服务器数据恢复—磁盘坏扇区导致raid6阵列崩溃的数据恢复案例

服务器存储数据恢复环境&#xff1a; 一台存储中有一组由12块SAS硬盘组建的raid6磁盘阵列&#xff0c;划分了1个卷&#xff0c;由数台Vmware ESXI主机共享存储。卷中存放了大量的Windows系统虚拟机。这些虚拟机系统盘大小一致&#xff0c;数据盘大小不确定&#xff0c;数据盘都…

安卓主板_MTK安卓主板定制_联发科主板/开发板方案

这款安卓主板采用了联发科的MTK8788、MTK8768及MTK8766系列芯片平台&#xff0c;运用了64位四核/八核 Cortex-A53/A73架构&#xff0c;主频高达2.0 GHz。主板配置了4GB LPDDR3内存和64GB eMMC存储&#xff0c;同时配备了ARM Mail-T450 MP2图形处理单元(GPU)&#xff0c;使其在4…

PbootCMS程序安全设置建议

近期遇到部分使用PbootCMS开源程序的客户反馈网站被挂马/入侵等情况&#xff0c;我司核实原因是由于此程序存在漏洞&#xff0c;用户可以按照以下建议进行安全设置。 虚拟主机 一、程序建议&#xff1a; 1、登录后台&#xff0c;将程序升级到最新版本&#xff0c;密码重置为…

算力网络痛点;对象存储OSS;CPN功能模块

目录 算力网络 算力网络痛点:度量困难、种类繁多、分布广泛、归属复杂。 CPN功能模块 对象存储OSS 算力网络 在分析算力资源的特点前,我们首先要明确算力的概念。算力,也称为计算力或计算能力。该词的最早来源已经不可查证,互联网上的资料大多与区块链相关。这是因为区…

MyBatis拦截器面试题

JDBC的执行流程 &#xff08;面试题一&#xff09; MyBatis执行流程&#xff08;面试题二&#xff09; (我的猜测:1执行器通过工厂执行Mapper类,,2.语句映射器处理mappe文件成对象,3把前端传过来的参数映射到对象里,4输出结果映射) Mybatis拦截器(四个) 拦截的执行顺序是Execu…

ubuntu设置为自己需要的屏幕分辨率

先说一下我处理该问题的大体背景&#xff1a;我是学习Linux的新手&#xff0c;刚学完嵌入式Linux驱动开发相关课程。现在想接着学习一下QT开发。我是在电脑上装了虚拟机之后安装的ubuntu系统。因为换了电脑&#xff0c;所以重新装了ubuntu系统。但是&#xff0c;装完ubuntu系统…

AI Agent从实操体验到代码理解

一、从体验出发 从chatbot到co-pilot&#xff0c;LLM应为的场景不断在扩大&#xff0c;形式也越来越多样化&#xff0c;到如今&#xff0c;chatbot的风头基本已过&#xff0c;co-pilot正在成为大模型嵌入到产品里面的主要形态&#xff0c;但随着PMF&#xff08;产品市场契合度&…