kafka微服务学习

news2024/12/24 12:19:15

消息中间件对比:
1、吞吐、可靠性、性能
在这里插入图片描述

Kafka安装

Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper

  • Docker安装zookeeper

下载镜像:

docker pull zookeeper:3.4.14

创建容器

docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
  • Docker安装kafka

下载镜像:

docker pull wurstmeister/kafka:2.12-2.3.1

创建容器

docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1

kafka入门

  • 生产者发送消息,多个消费者只能有一个消费者接收到消息
  • 生产者发送消息,多个消费者都可以接收到消息

(1)创建kafka-demo项目,导入依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

(2)生产者发送消息

package com.heima.kafka.sample;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * 生产者
 */
public class ProducerQuickStart {

    public static void main(String[] args) {
        //1.kafka的配置信息
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
        //发送失败,失败的重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG,5);
        //消息key的序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //消息value的序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        //2.生产者对象
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);

        //封装发送的消息
        ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");

        //3.发送消息
        producer.send(record);

        //4.关闭消息通道,必须关闭,否则消息发送不成功
        producer.close();
    }

}

(3)消费者接收消息

package com.heima.kafka.sample;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * 消费者
 */
public class ConsumerQuickStart {

    public static void main(String[] args) {
        //1.添加kafka的配置信息
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
        //消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
        //消息的反序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        //2.消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        //3.订阅主题
        consumer.subscribe(Collections.singletonList("itheima-topic"));

        //当前线程一直处于监听状态
        while (true) {
            //4.获取消息
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
            }
        }

    }

}

kafka高可用设计

1、设计集群模式:

Kafka的服务器端由被称为 Broker 的服务进程构成,即一个 Kafka 集群由多个Broker 组成。当一个机器宕机了,另外一个机器就会替补山
在这里插入图片描述

2、备份机制:

Kafka定义了两类副本

  1. 领导者副本(Leader Replica)
  2. 追随者副本 (Follower Replica)
    追随者副本分为两类:
    1、一种是ISR副本,同步保存
    2、普通的副本,异步保存
    出现主节点宕机,会先选ISR副本中的一个成为新的主节点,保证数据一致性,没有ISR节点,再从普通节点中挑选
    针对全部节点宕机的情况,有两种策略:
    1、等待第一个ISR副本,保证了数据的尽可能一致
    2、等待一个复活的追随者,无论是ISR还是普通,提高系统的高可用性。

kafka生产者详解

1发送类型

  • 同步发送

    使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功

RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();
System.out.println(recordMetadata.offset());
  • 异步发送

    调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数

//异步消息发送
producer.send(kvProducerRecord, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if(e != null){
            System.out.println("记录异常信息到日志表中");
        }
        System.out.println(recordMetadata.offset());
    }
});

2参数详解

  • ack

代码的配置方式:

//ack配置  消息确认机制
prop.put(ProducerConfig.ACKS_CONFIG,"all");

参数的选择说明

确认机制说明
acks=0生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险,但是速度最快
acks=1(默认值)只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应
acks=all只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应
  • retries

生产者从服务器收到的错误有可能是临时性错误,在这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试返回错误,默认情况下,生产者会在每次重试之间等待100ms

代码中配置方式:

//重试次数
prop.put(ProducerConfig.RETRIES_CONFIG,10);
  • 消息压缩

默认情况下, 消息发送时不会被压缩。

代码中配置方式:

//数据压缩
prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
压缩算法说明
snappy占用较少的 CPU, 却能提供较好的性能和相当可观的压缩比, 如果看重性能和网络带宽,建议采用
lz4占用较少的 CPU, 压缩和解压缩速度较快,压缩比也很客观
gzip占用较多的 CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法

使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。

kafka消费者

消息的有序性

方法:一个topic分区能保证自己的数据是按照先后消费的,但是不能保证跨分区消息处理的先后顺序。我么只能使用一个分区,在单分区种,消息可以保证严格顺序消费

提交和偏移量

在这里插入图片描述
自动提交:
当enable.auto.commit被设置为true,提交方式就是让消费者自动提交偏移量,每隔5秒消费者会自动把从poll0方法接收的最大偏移量提交上去,这样只是记录了规定时间内的最大偏移量,其实与数据提交的偏移量存在偏差,因此可能会出现数据的重复提交或者丢失
手动提交
当enableauto.commit被设置为false可以有以下三种提交方式

  • 提交当前偏移量(同步提交)
  • 异步提交
  • 同步和异步组合提交

同步提交:commitSync()方法会一直尝试直至提交成功,如果提交失败也可以记录到错误日志里。

while (true){
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
        System.out.println(record.key());
        try {
            consumer.commitSync();//同步提交当前最新的偏移量
        }catch (CommitFailedException e){
            System.out.println("记录提交失败的异常:"+e);
        }

    }
}

异步提交:手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。消息没有重试机制

while (true){
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
        System.out.println(record.key());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
            if(e!=null){
                System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e);
            }
        }
    });
}

同步和异步组合提交

异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖

举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。

try {
    while (true){
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
            System.out.println(record.key());
        }
        consumer.commitAsync();
    }
}catch (Exception e){+
    e.printStackTrace();
    System.out.println("记录错误信息:"+e);
}finally {
    try {
        consumer.commitSync();
    }finally {
        consumer.close();
    }
}

springboot整合kafka

1、在父类中的pop文件中导入依赖包

```xml
<!-- kafkfa -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

2、在需要用到kafka的微服务的naco中分别配置生产者和消费者配置

spring:
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
spring:
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    consumer:
      group-id: ${spring.application.name}
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

传递消息为对象

目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式

方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,本章节不介绍

方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式

  • 发送消息
@GetMapping("/hello")
public String hello(){
    User user = new User();
    user.setUsername("xiaowang");
    user.setAge(18);

    kafkaTemplate.send("user-topic", JSON.toJSONString(user));

    return "ok";
}
  • 接收消息
package com.heima.kafka.listener;

import com.alibaba.fastjson.JSON;
import com.heima.kafka.pojo.User;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

@Component
public class HelloListener {

    @KafkaListener(topics = "user-topic")
    public void onMessage(String message){
        if(!StringUtils.isEmpty(message)){
            User user = JSON.parseObject(message, User.class);
            System.out.println(user);
        }

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1185848.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux 下最主流的文件系统格式——ext

硬盘分成相同大小的单元&#xff0c;我们称为块&#xff08;Block&#xff09;。一块的大小是扇区大小的整数倍&#xff0c;默认是 4K。在格式化的时候&#xff0c;这个值是可以设定的。 一大块硬盘被分成了一个个小的块&#xff0c;用来存放文件的数据部分。这样一来&#xf…

操作系统 day06(进程控制、原语)

进程控制的概念 原语 怎么实现进程控制—用原语实现 如果不能一气呵成&#xff0c;那么会出现操作系统中的某些关键数据结构信息不统一的情况&#xff0c;这会影响操作系统进行别的管理工作&#xff0c;如下图所示&#xff1a; 原语的原子性怎么实现 正常情况下&#xff…

Source insight 创建工程研读库源码

下载库文件解压 创建工程添加路径 添加文件到工程 同步文件 下载库文件解压 创建工程添加路径 添加文件到工程 同步文件 数据库更新 强制重新分析所有文件仅同步当前源文件 添加和删除文件 自动添加新文件从项目中删除丢失的文件

程序员想要网上接单?那这几点注意事项你可要记好了!不看后悔!

相信网上接单对于程序员来说并不陌生&#xff0c;甚至有些程序员还以此为主业&#xff0c;靠网上接单来增加收入&#xff0c;维持生计&#xff0c;但是你真的确定你懂网上接单的套路吗&#xff1f;你知道网上接单的注意事项吗&#xff1f;这期文章就来盘点一下&#xff0c;无论…

跟着森老师学React Hooks(1)——使用Vite构建React项目

Vite是一款构建工具&#xff0c;对ts有很好的支持&#xff0c;最近也是在前端越来越流行。 以往的React项目的初始化方式大多是通过脚手架create-react-app(本质是webpack)&#xff0c;其实比起Vite来构建&#xff0c;启动会慢一些。 所以这次跟着B站的一个教程&#xff0c;使用…

JavaWeb Day05 前后端请求响应与分层解耦

目录 一、请求与响应 &#xff08;一&#xff09;请求的参数接收 ①数组参数 ②集合参数 ③日期参数 ④json参数 ⑤路径参数 总结 &#xff08;二&#xff09;响应 ①简单文本text ②数组 ③列表 ④同一响应数据格式 ⑤总结 二、三层架构与分层解耦 &#xff0…

Windows下MSYS2下载与安装

下载地址&#xff1a; 官网下载地址 https://www.msys2.org/阿里云镜像下载 https://mirrors.aliyun.com/msys2/distrib/x86_64/https://mirrors.aliyun.com/msys2/distrib/x86_64/msys2-x86_64-20231026.exe?spma2c6h.25603864.0.0.12b92551XW5OSM官网下载 ![官网下载](htt…

程序员怎样才能学好算法,推荐好书送给大家

前言 数据结构和算法是计算机科学的基石&#xff0c;是计算机的灵魂 要想成为计算机专业人员&#xff0c;学习和掌握算法是十分必要的。不懂数据结构和算法的人不可能写出效率更高的代码。计算机科学的很多新行业都离不开数据结构和算法作为基石&#xff0c;比如大数据、人工…

【AICFD案例教程】轴流风扇仿真分析

AICFD是由天洑软件自主研发的通用智能热流体仿真软件&#xff0c;用于高效解决能源动力、船舶海洋、电子设备和车辆运载等领域复杂的流动和传热问题。软件涵盖了从建模、仿真到结果处理完整仿真分析流程&#xff0c;帮助工业企业建立设计、仿真和优化相结合的一体化流程&#x…

Jmeter接口自动化测试操作流程

在企业使用jmeter开展实际的接口自动化测试工具&#xff0c;建议按如下操作流程&#xff0c; 可以使整个接口测试过程更规范&#xff0c;更有效。 接口自动化的流程&#xff1a; 1、获取到接口文档&#xff1a;swagger、word、excel ... 2、熟悉接口文档然后设计测试用例&am…

QT第2课-GUI程序实例分析

GUI程序开发概述 不同的操作系统GUI开发原理相同不同的操作系统GUI SDK 不同 GUI 程序开发原理 GUI程序在运行时会创建一个消息队列系统内核将用户的键盘鼠标操作翻译成对应的程序消息程序在运行过程中需要实时处理队列中的消息当队列中没有消息时&#xff0c;程序将处于停滞…

第23章(上)_索引原理之索引与约束

文章目录 索引索引分类主键选择索引的代价 约束外键约束约束与索引的区别 索引使用场景不要使用索引的场景总结 索引 索引的概念&#xff1a;索引是一种有序的存储结构。索引按照单个或多个列的值进行排序。 索引的目的&#xff1a;提升搜索效率。 索引分类 按照数据结构分为…

通达OA V12版,引入thinkphp5.1框架,及获取session

通达OA V12版&#xff0c;引入thinkphp5.1框架 如下过程引入 如下过程引入 在D:/MYOA/webroot目录下&#xff0c;通过composer安装thinkphp5.1框架。在tp框架下&#xff0c;找到文件&#xff1a;thinkphp\library\think下的Error.php。方案1&#xff1a;截断异常输出 代码如…

js处理赎金信

给你两个字符串&#xff1a;ransomNote 和 magazine &#xff0c;判断 ransomNote 能不能由 magazine 里面的字符构成。 如果可以&#xff0c;返回 true &#xff1b;否则返回 false 。 magazine 中的每个字符只能在 ransomNote 中使用一次。 示例 1&#xff1a; 输入&…

栈的应用:表达式求值(中缀表达式,后缀表达式,前缀表达式)

目录 1.三种算术表达式1.中缀表达式2.后缀表达式3.前缀表达式 2.后缀表达式相关考点1.中缀表达式转后缀表达式1.手算方法2.例题3.机算 2.后缀表达式求值1.手算方法2.机算 3.前缀表达式相关考点1.中缀表达式转前缀表达式1.手算方法2.例题 2.前缀表达式求值 4.中缀表达式求值 1.三…

无人机航拍技术基础入门,无人机拍摄的方法与技巧

一、教程描述 买了无人机&#xff0c;可是我不敢飞怎么办&#xff1f;禁飞区越来越多&#xff0c;到底哪儿才能飞&#xff1f;我的无人机跟你一样&#xff0c;为什么我拍不出大片&#xff1f;厂家的说明书看不进去&#xff0c;有没有一套无人机的课程&#xff0c;可以快速上手…

【开源】基于Vue和SpringBoot的超市商品管理系统

项目编号&#xff1a; S 001 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S001&#xff0c;文末获取源码。} 项目编号&#xff1a;S001&#xff0c;文末获取源码。 目录 一、摘要1.1 简介1.2 项目录屏 二、研究内容2.1 数据中心模块2.2 超市区域模块2.3 超市货…

Python测试框架之pytest快速入门

pytest是一种流行的Python测试框架&#xff0c;支持创建简单的单元测试&#xff0c;也支持创建复杂的功能和集成测试。它提供了一系列有用的功能&#xff0c;能够方便地编写&#xff0c;组织和运行测试用例&#xff0c;并生成丰富的测试报告。 pytest的主要特点包括&#xff1…

2023年11月编程语言流行度排名

点击查看最新编程语言流行度排名&#xff08;每月更新&#xff09; 2023年11月编程语言流行度排名 编程语言流行度排名是通过分析在谷歌上搜索语言教程的频率而创建的 一门语言教程被搜索的次数越多&#xff0c;大家就会认为该语言越受欢迎。这是一个领先指标。原始数据来自…

『 MySQL数据库 』数据库基础之表的基本操作

文章目录 创建表&#x1f5e1;查看表&#x1f5e1;✒ 查看表内所有信息(描述\表结构等)✒ 根据条件查看表内数据✒ 查看表的具体详细信息: 修改表&#x1f5e1;✒ 修改表名:✒ 修改表的存储引擎、编码集(字符集和校验集):✒ 表内插入数据:insert into✒ 在表中新添一个字段(列)…