Kafka框架快速入门及异步通知

news2024/9/25 9:31:33

文章目录

  • 1、异步通信原理
    • 1.1 观察者模式
    • 1.2 生产者消费者模式
    • 1.3 缓冲区
    • 1.4 数据单元
  • 2、消息系统原理
    • 2.1 点对点消息传递
    • 2.2 发布订阅消息传递
  • 3、Kafka简介
    • 3.1 设计目标
    • 3.2 Kafka的优点
  • 4、Kafka系统架构
    • 4.1 Broker
    • 4.2 Topic
    • 4.3 Partition
    • 4.4 Leader
    • 4.5 Follower
    • 4.6 replication
    • 4.7 producer
    • 4.8 consumer
    • 4.9 Consumer Group
    • 4.10 offset偏移量
    • 4.11 Zookeeper
  • 5、Kafka环境搭建
  • 6、Spring Boot集成Kafka
    • 1 入门
      • 导入spring-kafka依赖信息
      • 在resources下创建文件application.yml
      • 消息生产者
      • 消息消费者
    • 2 传递消息为对象

1、异步通信原理

1.1 观察者模式

在这里插入图片描述

1.2 生产者消费者模式

在这里插入图片描述

1.3 缓冲区

在这里插入图片描述

1.4 数据单元

在这里插入图片描述

2、消息系统原理

一个消息系统负责将数据从一个应用传递到另一个应用,应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的

2.1 点对点消息传递

在这里插入图片描述

2.2 发布订阅消息传递

在这里插入图片描述

3、Kafka简介

官网:http://kafka.apache.org
在这里插入图片描述

3.1 设计目标

在这里插入图片描述

3.2 Kafka的优点

4、Kafka系统架构

在这里插入图片描述

4.1 Broker

在这里插入图片描述

4.2 Topic

在这里插入图片描述

4.3 Partition

在这里插入图片描述

4.4 Leader

在这里插入图片描述

4.5 Follower

在这里插入图片描述

4.6 replication

在这里插入图片描述

4.7 producer

在这里插入图片描述

4.8 consumer

在这里插入图片描述

4.9 Consumer Group

在这里插入图片描述

4.10 offset偏移量

在这里插入图片描述

4.11 Zookeeper

在这里插入图片描述

5、Kafka环境搭建

Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper

  • Docker安装Zookeeper
    下载镜像
docker pull zookeeper:3.4.14

创建容器

docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14

Docker安装Kafka

docker pull wurstmeister/kafka:2.12-2.3.1

创建容器

docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--restart always=true
--net=host wurstmeister/kafka:2.12-2.3.1

(1)创建Kafka-demo项目,导入依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

(2)生产者发送消息

package com.heima.kafka.sample;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * 生产者
 */
public class ProducerQuickStart {

    public static void main(String[] args) {
        //1.kafka的配置信息
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
        //发送失败,失败的重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG,5);
        //消息key的序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //消息value的序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        //2.生产者对象
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);

        //封装发送的消息
        ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");

        //3.发送消息
        producer.send(record);

        //4.关闭消息通道,必须关闭,否则消息发送不成功
        producer.close();
    }

}

(3)消费者接收消息

package com.heima.kafka.sample;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * 消费者
 */
public class ConsumerQuickStart {

    public static void main(String[] args) {
        //1.添加kafka的配置信息
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
        //消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
        //消息的反序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        //2.消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        //3.订阅主题
        consumer.subscribe(Collections.singletonList("itheima-topic"));

        //当前线程一直处于监听状态
        while (true) {
            //4.获取消息
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
            }
        }

    }

}

6、Spring Boot集成Kafka

1 入门

导入spring-kafka依赖信息

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- kafkfa -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
</dependencies>

在resources下创建文件application.yml

server:
  port: 9991
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

消息生产者

package com.heima.kafka.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @GetMapping("/hello")
    public String hello(){
        kafkaTemplate.send("itcast-topic","黑马程序员");
        return "ok";
    }
}

消息消费者

package com.heima.kafka.listener;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

@Component
public class HelloListener {

    @KafkaListener(topics = "itcast-topic")
    public void onMessage(String message){
        if(!StringUtils.isEmpty(message)){
            System.out.println(message);
        }

    }
}

2 传递消息为对象

目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式

方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,本章节不介绍

方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式

  • 发送消息
@GetMapping("/hello")
public String hello(){
    User user = new User();
    user.setUsername("xiaowang");
    user.setAge(18);

    kafkaTemplate.send("user-topic", JSON.toJSONString(user));

    return "ok";
}
  • 接受消息
package com.heima.kafka.listener;

import com.alibaba.fastjson.JSON;
import com.heima.kafka.pojo.User;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

@Component
public class HelloListener {

    @KafkaListener(topics = "user-topic")
    public void onMessage(String message){
        if(!StringUtils.isEmpty(message)){
            User user = JSON.parseObject(message, User.class);
            System.out.println(user);
        }

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/354797.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ACL与NAT

ACL---访问控制列表&#xff0c;是一种策略控制工具 功能&#xff1a;1.定义感兴趣流量&#xff08;数据层面 &#xff09; 2.定义感兴趣路由&#xff08;控制层面&#xff09; ACL 条目表项组成&#xff1a; 编号规则&#xff1a;步数或者跳数默认值为5&#xff0c;…

Rancher 部署 MySQL

文章目录创建 pvc部署 MySQL前置条件&#xff1a;安装 rancher&#xff0c;可参考文章 docker 部署 rancher 创建 pvc MySQL 数据库是需要存储的&#xff0c;所以必须先准备 pvc 创建 pvc 自定义 pvc 名称选择已经新建好的 storageclass&#xff0c;storageclass 的创建可参考…

Mac os如何安装绿盾客户端

环境&#xff1a; Apple Mac mini 八核M1芯片 8G 256G Mac os 11.0 问题描述&#xff1a; Mac os如何安装绿盾客户端 解决方案&#xff1a; 一、关闭系统保护 1.关机电脑&#xff0c;按住“开机键”不放直到屏幕上出现“选项” 点击“继续”&#xff0c;等待进入恢复模…

github 使用

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录一、git与github二、出错的地方1.GitHub没有css样式2、git clone出现错误3、明明创建了responsibility 但git 不显示一、git与github 这个博客写的很好&#xff01;…

剔除绿化后的比较可靠的评价模型

sheet4权重实际评分&#xff08;越高越不好&#xff09;&#xff08;0-5分&#xff09;正向化&#xff08;5-X&#xff09;后分别与权重相乘得到该地的总分总分高&#xff0c;排名高为好&#xff0c;即污染小排序后

【Kubernetes】第三篇 - ci-server 构建节点 Docker、Jenkins 环境搭建

一&#xff0c;前言 上一篇&#xff0c;主要介绍了阿里云服务器的采购和简单配置&#xff1a; 三台服务器规划如下&#xff1a; 服务配置内网IP外网IP说明ci-server2c4g172.17.178.104182.92.4.158Jenkins Nexus Dockerk8s-master2c4g172.17.178.10547.93.9.45Kubernetes …

ARM uboot 源码分析5 -启动第二阶段

一、start_armboot 解析6 1、console_init_f (1) console_init_f 是 console&#xff08;控制台&#xff09;的第一阶段初始化。_f 表示是第一阶段初始化&#xff0c;_r 表示第二阶段初始化。有时候初始化函数不能一次一起完成&#xff0c;中间必须要夹杂一些代码&#xff0c;…

ccc-pytorch-回归问题(1)

文章目录1.简单回归实战&#xff1a;2.手写数据识别1.简单回归实战&#xff1a; 用 线性回归拟合二维平面中的100个点 公式&#xff1a;ywxbywxbywxb 损失函数&#xff1a;∑(yreally−y)2\sum(y_{really}-y)^2∑(yreally​−y)2 迭代方法&#xff1a;梯度下降法&#xff0c;…

【QA】[Vue/复选框全选] v-model绑定每一项的赋初值问题

发生场景&#xff1a;不只是复选框的状态改变&#xff0c;还有的功能要用到复选框的选中状态&#xff0c;比如&#xff1a;购物车计算总价&#xff0c;合计等等。 引入&#xff1a;复选框 checkbox 在使用时&#xff0c;需要用v-model绑定布尔值&#xff0c;来获取选中状态&…

一台电脑安装26个操作系统(windows,macos,linux,chromeOS,Android,静待HarmonyOS)

首先看看安装了哪些操作系统1-4: windows系统 四个5.Ubuntu6.deepin7.UOS家庭版8.fydeOS9.macOS10.银河麒麟11.红旗OS12.openSUSE Leap13.openAnolis14.openEuler(未安装桌面UI)15.中标麒麟&#xff08;NeoKylin&#xff09;16.centos17.debian Edu18.fedora19.oraclelinux(特别…

Rust Web入门(一):TCP 和 HTTP Server

本教程笔记来自 杨旭老师的 rust web 全栈教程&#xff0c;链接如下&#xff1a; https://www.bilibili.com/video/BV1RP4y1G7KF?p1&vd_source8595fbbf160cc11a0cc07cadacf22951 学习 Rust Web 需要学习 rust 的前置知识可以学习杨旭老师的另一门教程 https://www.bili…

【原创】java+swing+mysql图书管理系统设计与实现

图书管理系统是一个比较常见的系统&#xff0c;今天我们主要介绍如何使用javaswiingmysql去开发一个cs架构的图书管理系统&#xff0c;方便学生进行图书借阅。 功能分析&#xff1a; 宿舍报修管理系统的使用角色&#xff0c;一般分为管理员和学生&#xff0c;管理员主要进行学…

学习OpenGL图形2D/3D编程

环境&#xff1a;WindowsVisual Studio 2019最流行的几个库&#xff1a;GLUT&#xff0c;SDL&#xff0c;SFML和GLFWGLFWGLAD库查看显卡OPENGL支持情况VS2019glfwgladopenGL3.3顶点着色器片段着色器VAO-VBO-(EBO)->渲染VAO-VBO-EBO->texture纹理矩阵matrix对图形transfor…

jmx prometheus引起的一次cpu飙高

用户接入了jmx agent进行prometheus监控后&#xff0c;在某个时间点出现cpu飙高 排查思路&#xff1a; 1、top&#xff0c;找到java进程ID 2、top -Hp 进程ID&#xff0c;找到java进程下占用高CPU的线程ID 3、jstack 进程ID&#xff0c;找到那个高CPU的线程ID的堆栈。 4、分析堆…

jenkins基础部署

一、jenkins是什么1.Jenkins的前身是Hudson&#xff0c;采用JAVA编写的持续集成开源工具。Hudson由Sun公司在2004年启动&#xff0c;第一个版本于2005年在java.net发布。2007年开始Hudson逐渐取代CruiseControl和其他的开源构建工具的江湖地位。在2008年的JavaOne大会上在开发者…

【Vue3源码】第二章 effect功能的完善下

【Vue3源码】第二章 effect功能的完善下 前言 上一章节我们实现了effect函数的runner 和 scheduler&#xff0c;这一章我们继续完善effect函数的功能&#xff0c;stop和onstop。 1、实现effect的stop功能 顾名思义&#xff0c;stop就是让effect停下来的函数。那么怎么才能让…

系统分享|分享几个Windows系统镜像下载网站

&#x1f4e3;今日作品&#xff1a;如何关闭Microsoft start方法介绍&#x1f466;创作者&#xff1a;Jum朱⏰预计花费&#xff1a;10分钟&#x1f4d6;个人主页&#xff1a;Jum朱博客的个人主页系统之家传送门&#xff1a;https://www.xitongzhijia.net/这个是老牌一直还在运营…

聊聊RocketMQ 的功能特性

这是RocketMQ的第三篇文章&#xff0c;前两篇文章我们说了一下rocketmq的入门安装和开发配置&#xff0c;以及他的一些名词解释&#xff0c;RocketMQ入门第一次&#xff0c;RocketMQ&#xff08;二&#xff09; 领域名词。今天我们来说说的他的一些功能特性。明确区分这些功能特…

[AI生成图片] 效果最好的Midjourney 的介绍和使用

Midjourney介绍&#xff1a; 是一个文本生成图片的扩散模型&#xff0c;能够根据输入的任何文本生成令人难以置信的图像&#xff0c;让数十亿人在几秒钟内创造惊人的艺术。为方便用户控制和快速生成图片&#xff0c;打开后在页面底部输入文本内容&#xff0c;稍等一小会&#…

基于easyexcel的MySQL百万级别数据的excel导出功能

前言最近我做过一个MySQL百万级别数据的excel导出功能&#xff0c;已经正常上线使用了。这个功能挺有意思的&#xff0c;里面需要注意的细节还真不少&#xff0c;现在拿出来跟大家分享一下&#xff0c;希望对你会有所帮助。原始需求&#xff1a;用户在UI界面上点击全部导出按钮…