JavaWeb_LeadNews_Day6-Kafka

news2024/12/26 20:50:46

JavaWeb_LeadNews_Day6-Kafka

  • Kafka
    • 概述
    • 安装配置
    • kafka入门
    • kafka高可用方案
    • kafka详解
      • 生产者同步异步发送消息
      • 生产者参数配置
      • 消费者同步异步提交偏移量
    • SpringBoot集成kafka
  • 自媒体文章上下架
    • 实现思路
    • 具体实现
  • 来源
  • Gitee

Kafka

概述

  • 对比
  • 选择
  • 介绍
    • producer: 发布消息的对象称之为主题生产者 (Kafka topic producer)
    • topic: Kafka将消息分门别类,每一类的消息称之为一个主题 (Topic)
    • consumer:订阅消息并处理发布的消息的对象称之为主题消费者 (consumers)
    • broker:已发布的消息保存在一组服务器中,称之为Kafka集群,集群中的每一个服务器都是一个代理(Broker)。消费者可以订阅个或多个主题 (topic),并从Broker拉数据,从而消费这些已发布的消息

安装配置

  • 安装zookeeper
    // 下载zookeeper镜像
    docker pull zookeeper:3.4.14
    // 创建容器
    docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
    
  • 安装kafka
    // 下载kafka镜像
    docker pull wurstmeister/kafka:2.12-2.3.1
    // 创建容器
    docker run -d --name kafka \
    --env KAFKA_ADVERTISED_HOST_NAME=192.168.174.133 \
    --env KAFKA_ZOOKEEPER_CONNECT=192.168.174.133:2181 \
    --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.174.133:9092 \
    --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
    --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
    --net=host wurstmeister/kafka:2.12-2.3.1
    
    // 解释
    --net=host,直接使用容器宿主机的网络命名空间,即没有独立的网络环境。它使用宿主机的ip和端口(云主机会不好使)
    

kafka入门

  • 依赖
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
    
  • Producer
    public class ProducerQuickStart {
        public static void main(String[] args) {
            // 1. kafka链接配置信息
            Properties prop = new Properties();
            // 1.1 kafka链接地址
            prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.133:9092");
            // 1.2 key和value的序列化
            prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            // 2. 创建kafka生产者对象
            KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
            // 3. 发送信息
            // 参数列表: topic, key, value
            ProducerRecord<String, String> record = new ProducerRecord<>("topic-first", "key1", "Hello Kafka!");
            producer.send(record);
            // 4. 关闭消息通道
            // 必须关闭, 否则消息发送bucg
            producer.close();
        }
    }
    
  • Consumer
    public class ConsumerQuickStart {
        public static void main(String[] args) {
            // 1. kafka的配置信息
            Properties prop = new Properties();
            // 1.1 链接地址
            prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.133:9092");
            // 1.2 key和value的反序列化器
            prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            // 1.3 设置消费者组
            prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
            // 2. 创建消费者对象
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
            // 3. 订阅主题
            consumer.subscribe(Collections.singleton("topic-first"));
            // 4. 拉取信息
            while(true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.key());
                    System.out.println(record.value());
                }
            }
        }
    }
    
  • 总结
    • 同一组只有一个消费者能够接收到消息, 如果需要所有消费者都能接收到消息, 需要消费者在不同的组

kafka高可用方案

  • 集群

  • 备份

    kafka定义了两类副本:

    • 领导者副本
    • 追随者副本

    数据在领导者副本存储后, 会同步到追随者副本

    同步方式
    leader失效后, 选择leader的原则

    1. 优先从ISR中选取, 因为ISR的数据和leader是同步的.
    2. ISR中的follower都不行了, 就从其他的follower中选取.
    3. 当所有的follower都失效了, 第一种是等待ISR中的follower活过来, 数据可靠, 但等待时间不确定, 第二种是等待任意follower活过来, 最快速度恢复可用性, 但数据不一定完整.

kafka详解

生产者同步异步发送消息

// 同步发送
RecordMetadata metadata = producer.send(record).get();
System.out.println(metadata.offset());

// 异步发送
producer.send(record, new Callback(){
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if(e != null) {
            System.out.println("记录异常信息到日志表中");
        }
        System.out.println(recordMetadata.offset());
    }
});

生产者参数配置

  • 消息确认
    确认机制说明
    acks=0生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险,但是速度最快
    acks=1(默认值)只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应
    acks=all只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应
    prop.put(ProducerConfig.ACKS_CONFIG, "all");
    
  • 消息重传
    设置消息重传次数, 默认每次重试之间等待100ms
    prop.put(ProducerConfig.RETRIES_CONFIG, 10);
    
  • 消息压缩
    默认情况, 消息发送不会压缩
    使用压缩可以降低网络传输开销和存储开销, 而这往往是向kafka发送消息的瓶颈所在
    压缩算法说明
    snappy占用较少的 CPU,却能提供较好的性能和相当可观的压缩比,如果看重性能和网络带宽,建议采用
    lz4占用较少的 CPU,压缩和解压缩速度较快,压缩比也很客观
    gzip占用较多的CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法
    prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
    

消费者同步异步提交偏移量

// 同步提交偏移量
consumer.commitSync();

// 异步提交偏移量
consumer.commitAsync(new OffsetCommitCallback(){
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
        if(e!=null){
            System.out.println("记录错误的提交偏移量"+map+", 异常信息为"+e);
        }
    }
});

// 同步异步提交
try {
    while(true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.key());
            System.out.println(record.value());
            System.out.println(record.partition());
            System.out.println(record.offset());
        }
        // 异步提交偏移量
        consumer.commitAsync();
    }
} catch (Exception e) {
    e.printStackTrace();
    System.out.println("记录错误的信息:"+e);
}finally {
    // 同步
    consumer.commitSync();
}

SpringBoot集成kafka

  • 依赖
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
    
  • 配置
    server:
      port: 9991
    spring:
      application:
        name: kafka-demo
      kafka:
        bootstrap-servers: 192.168.174.133:9092
        producer:
          retries: 10
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          group-id: ${spring.application.name}-test
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    
  • Producer
    @RestController
    public class HelloController {
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        @GetMapping("/hello")
        public String hello()
        {
            kafkaTemplate.send("itcast-topic", "黑马程序员");
            return "ok";
        }
    }
    
  • Consumer
    @Component
    public class HelloListener {
    
        @KafkaListener(topics = "itcast-topic")
        public void onMessage(String message)
        {
            if(!StringUtils.isEmpty(message)){
                System.out.println(message);
            }
        }
    }
    
  • 传递对象
    // Producer
    User user = new User();
    user.setName("tom");
    user.setAge(18);
    kafkaTemplate.send("itcast-topic", JSON.toJSONString(user));
    
    // Consumer
    System.out.println(JSON.parseObject(message, User.class));
    

自媒体文章上下架

实现思路

具体实现

  • Producer
    public ResponseResult downOrUp(WmNewsDto dto) {
        // 1. 检验参数
        // 1.0 检查文章dto是否为空
        if(dto == null){
            return ResponseResult.errorRe(AppHttpCodeEnum.PARAM_INVALID, "不可缺少");
        }
        // 1.1 检查文章上架参数是否合法
        if(dto.getEnable() != 0 && dto.getEnabl!= 1){
            // 默认上架
            dto.setEnable((short) 1);
        }
        // 2. 查询文章
        WmNews news = getById(dto.getId());
        if(news == null){
            return ResponseResult.errorRe(AppHttpCodeEnum.DATA_NOT_EXIST, 存在");
        }
        // 3. 查询文章状态
        if(news.getStatus() != WmNews.StaPUBLISHED.getCode()){
            return ResponseResult.errorRe(AppHttpCodeEnum.PARAM_INVALID, 章不是发布状态, 不能上下架");
        }
        // 4. 上下架
        news.setEnable(dto.getEnable());
        updateById(new
        // 5. 发送消息, 通知article修改文章的配置
        if(news.getArticleId() != null){
            HashMap<String, Object> map = HashMap<>();
            map.put("articleId", news.getArtic());
            map.put("enable", news.getEnable());
            kafkaTemplate.(WmNewsMessageConstaWM_NEWS_UP_OR_DOWN_TOPIC, JtoJSONString(map));
    
        return ResponseResult.okRe(AppHttpCodeEnum.SUCCESS);
    }
    
  • Consumer
// Listener
@KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
public void onMessage(String message)
{
    if(StringUtils.isNotBlank(message)){
        Map map = JSON.parseObject(message, Map.class);
        apArticleConfigService.updateByMap(map);
    }
}

// Service
public void updateByMap(Map map) {
    // 0 下架, 1 上架
    Object enable = map.get("enable");
    boolean isDown = true;
    if(enable.equals(1)){
        isDown = false;
    }
    // 修改文章
    update(Wrappers.<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId, map.get("articleId")).
            set(ApArticleConfig::getIsDown, isDown));
}

来源

黑马程序员. 黑马头条

Gitee

https://gitee.com/yu-ba-ba-ba/leadnews

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/892026.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JVM——配置常用参数,GC调优策略

文章目录 JVM 配置常用参数Java内存区域常见配置参数概览堆参数回收器参数项目中常用配置常用组合 常用 GC 调优策略GC 调优原则GC 调优目的GC 调优策略 JVM 配置常用参数 Java内存区域常见配置参数概览堆参数&#xff1b;回收器参数&#xff1b;项目中常用配置&#xff1b;常…

一、数学建模之线性规划篇

1.定义 2.例题 3.使用软件及解题 一、定义 1.线性规划&#xff08;Linear Programming&#xff0c;简称LP&#xff09;是一种数学优化技术&#xff0c;线性规划作为运筹学的一个重要分支&#xff0c;专门研究在给定一组线性约束条件下&#xff0c;如何找到一个最优的决策&…

YOLOv1基础

目录 深度学习经典检测方法指标分析核心思想网络架构损失函数非极大值抑制优缺点 深度学习经典检测方法 预选框&#xff0c;在论文中叫RPN&#xff0c;也就是区域建议网络 指标分析 核心思想 网络架构 损失函数 非极大值抑制 优缺点

【java毕业设计】基于Spring Boot+Vue+mysql的论坛管理系统设计与实现(程序源码)-论坛管理系统

基于Spring BootVuemysql的论坛管理系统设计与实现&#xff08;程序源码毕业论文&#xff09; 大家好&#xff0c;今天给大家介绍基于Spring BootVuemysql的论坛管理系统设计与实现&#xff0c;本论文只截取部分文章重点&#xff0c;文章末尾附有本毕业设计完整源码及论文的获取…

ssh远程连接慢解决方法

一、关闭SERVER上的GSS认证 将GSSAPIAuthentication改为no ,如果在配置文件中&#xff0c;以下值是被注释的就拿掉注释&#xff0c;因为默认开关就是yes # vi /etc/ssh/sshd_config GSSAPIAuthentication no二、关闭SERVER上DNS反向解析 在linux中&#xff0c;默认就是开启了S…

java代码审计11.1之反序列化基础学习

文章目录 1、 序列化与反序列化2、序列化与反序列化案例2.1、使用idea生成代码与serialVersionUID2.2、实例化对象2.3、序列化对象2.4、反序列化 3、稍微深入serialVersionUID综上小结&#xff0c; 4、transient 作⽤5、反序列化漏洞 之前的文章&#xff0c; php代码审计15.1之…

部署piwigo网页 通过cpolar分享本地电脑上的图片

通过cpolar分享本地电脑上有趣的照片&#xff1a;发布piwigo网页 文章目录 通过cpolar分享本地电脑上有趣的照片&#xff1a;发布piwigo网页前言1. 设定一条内网穿透数据隧道2. 与piwigo网站绑定3. 在创建隧道界面填写关键信息4. 隧道创建完成 总结 前言 首先在本地电脑上部署…

微服务最佳实践,零改造实现 Spring Cloud Apache Dubbo 互通

作者&#xff1a;孙彩荣 很遗憾&#xff0c;这不是一篇关于中间件理论或原理讲解的文章&#xff0c;没有高深晦涩的工作原理分析&#xff0c;文后也没有令人惊叹的工程数字统计。本文以实际项目和代码为示例&#xff0c;一步一步演示如何以最低成本实现 Apache Dubbo 体系与 S…

k8s集群监控方案--node-exporter+prometheus+grafana

目录 前置条件 一、下载yaml文件 二、部署yaml各个组件 2.1 node-exporter.yaml 2.2 Prometheus 2.3 grafana 2.4访问测试 三、grafana初始化 3.1加载数据源 3.2导入模板 四、helm方式部署 前置条件 安装好k8s集群&#xff08;几个节点都可以&#xff0c;本人为了方便实验k8s集…

搭载KaihongOS的工业平板、机器人、无人机等产品通过3.2版本兼容性测评,持续繁荣OpenHarmony生态

近日&#xff0c;搭载深圳开鸿数字产业发展有限公司&#xff08;简称“深开鸿”&#xff09;KaihongOS软件发行版的工业平板、机器人、无人机等商用产品均通过OpenAtom OpenHarmony&#xff08;以下简称“OpenHarmony”&#xff09;3.2 Release版本兼容性测评&#xff0c;获颁O…

探索Perfetto:开源性能追踪工具的未来之光

探索Perfetto&#xff1a;开源性能追踪工具的未来之光 1. 引言 A. 介绍Perfetto的背景和作用 随着移动应用、桌面软件和嵌入式系统的不断发展&#xff0c;软件性能优化变得愈发重要。在这个背景下&#xff0c;Perfetto作为一款开源性能追踪工具&#xff0c;日益引起了开发者…

LangChain手记 Agent 智能体

整理并翻译自DeepLearning.AILangChain的官方课程&#xff1a;Agent&#xff08;源代码可见&#xff09; “人们有时会将LLM看作是知识库&#xff0c;因为它被训练所以记住了来自互联网或其他地方的海量信息&#xff0c;因而当你向它提问时&#xff0c;它可以回答你的问题。有一…

centos7 yum获取软件所有依赖包 创建本地yum源 yum离线安装软件

centos7 yum获取软件所有依赖包 创建本地yum源 离线安装软件 1、以安装docker 20.10为例2、centos7 yum获取docker 20.10 所有依赖包3、创建本地docker yum源4、yum使用本地docker源 离线安装docker 1、以安装docker 20.10为例 参考链接&#xff1a; 添加docker 清华软件源 y…

Spring Clould 搜索技术 - elasticsearch

视频地址&#xff1a;微服务&#xff08;SpringCloudRabbitMQDockerRedis搜索分布式&#xff09; 初识ES-什么是elasticsearch&#xff08;P77&#xff0c;P78&#xff09; 1.elasticsearch的作用 elasticsearch是一款非常强大的开源搜索引擎&#xff0c;具备非常多强大功能…

【论文阅读】 Model Sparsity Can Simplify Machine Unlearning

Model Sparsity Can Simplify Machine Unlearning 背景主要内容Contribution Ⅰ&#xff1a;对Machine Unlearning的一个全面的理解Contribution Ⅱ&#xff1a;说明model sparsity对Machine Unlearning的好处Pruning方法的选择sparse-aware的unlearning framework Experiments…

选择大型语言模型自定义技术

推荐&#xff1a;使用 NSDT场景编辑器 助你快速搭建可二次编辑器的3D应用场景 企业需要自定义模型来根据其特定用例和领域知识定制语言处理功能。自定义LLM使企业能够在特定的行业或组织环境中更高效&#xff0c;更准确地生成和理解文本。 自定义模型使企业能够创建符合其品牌…

Android Studio实现解析HTML获取图片URL将图片保存到本地

目录 效果activity_main.xmlMainActivityImageItemImageAdapter 效果 项目本来是要做成图片保存到手机然后读取数据后瀑布流展示&#xff0c;但是有问题&#xff0c;目前只能做到保存到手机 activity_main.xml <?xml version"1.0" encoding"utf-8"?…

《python编程基础及应用》,python编程基础及应用pdf

大家好&#xff0c;小编为大家解答python编程基础课后答案上海交通大学出版社周志化的问题。很多人还不知道python编程基础及应用课后答案高等教育出版社&#xff0c;现在让我们一起来看看吧&#xff01; 单项选择题 第一章python语法基础 1. Python 3.x 版本的保留字总数是C A…

UDP TCP 报文内容

1.UDP 2.TCP 源/目的端口号:表示数据是从哪个进程来,到哪个进程去; 32位序号/32位确认号:后面详细讲;4位TCP报头长度:表示该TCP头部有多少个32位bit(有多少个4字节);所以TCP头部最大长度是15*460 6位标志位: o URG:紧急指针是否有效 ——urgent 紧急的 o ACK:确认号是否有…

K8S核心组件etcd详解(上)

1 介绍 https://etcd.io/docs/v3.5/ etcd是一个高可用的分布式键值存储系统&#xff0c;是CoreOS&#xff08;现在隶属于Red Hat&#xff09;公司开发的一个开源项目。它提供了一个简单的接口来存储和检索键值对数据&#xff0c;并使用Raft协议实现了分布式一致性。etcd广泛应用…