某马头条——day06

news2025/1/16 13:54:53

自媒体文章上下架

使用消息队列在自媒体下架时通知文章微服务。 

kafka概述

 

kafka环境搭建

docker pull zookeeper:3.4.14
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14

安装kafka

docker pull wurstmeister/kafka:2.12-2.3.1
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1

kafka入门案例

 

依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

生产者类 

package com.heima.kafka.sample;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * 生产者
 */
public class ProducerQuickStart {

    public static void main(String[] args) {
        //1.kafka的配置信息
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
        //发送失败,失败的重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG,5);
        //消息key的序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //消息value的序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        //2.生产者对象
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);

        //封装发送的消息
        ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");

        //3.发送消息
        producer.send(record);

        //4.关闭消息通道,必须关闭,否则消息发送不成功
        producer.close();
    }

}

 消费者类

package com.heima.kafka.sample;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * 消费者
 */
public class ConsumerQuickStart {

    public static void main(String[] args) {
        //1.添加kafka的配置信息
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
        //消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
        //消息的反序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        //2.消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        //3.订阅主题
        consumer.subscribe(Collections.singletonList("itheima-topic"));

        //当前线程一直处于监听状态
        while (true) {
            //4.获取消息
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
            }
        }

    }

}

存在一个消费者组和多个消费者组的区别

 分区机制

相当于数据分片?redis也有这个机制。

高可用设计方案

kafka生产者详解

发送类型

参数详解

acks=0和UDP很像

kafka消费者详解

多个分区无法保证消息的有序性,相当于,a先后发了两条消息给b,两条消息进了不同的分区,结果因为网络原因导致b先接收到了第二条消息。

提交和偏移量

这里因为不是每次消费都提交就会出现丢失和重复的问题。

偏移量提交方式

SpringBoot集成kafka收发消息

1.导入spring-kafka依赖信息

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- kafkfa -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
</dependencies>

2.在resources下创建文件application.yml

server:
  port: 9991
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

 3.消息生产者

package com.heima.kafka.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @GetMapping("/hello")
    public String hello(){
        kafkaTemplate.send("itcast-topic","黑马程序员");
        return "ok";
    }
}

4.消息消费者

package com.heima.kafka.listener;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

@Component
public class HelloListener {

    @KafkaListener(topics = "itcast-topic")
    public void onMessage(String message){
        if(!StringUtils.isEmpty(message)){
            System.out.println(message);
        }

    }
}

传递消息为对象时

发送

@GetMapping("/hello")
public String hello(){
    User user = new User();
    user.setUsername("xiaowang");
    user.setAge(18);

    kafkaTemplate.send("user-topic", JSON.toJSONString(user));

    return "ok";
}

接受

@Component
public class HelloListener {

    @KafkaListener(topics = "user-topic")
    public void onMessage(String message){
        if(!StringUtils.isEmpty(message)){
            User user = JSON.parseObject(message, User.class);
            System.out.println(user);
        }

    }
}

自媒体文章上下架功能实现

  • 已发表且已上架的文章可以下架

  • 已发表且已下架的文章可以上架

流程说明

 文章表存在一个属性字段是否上架

功能接口开发

在heima-leadnews-wemedia工程下的WmNewsController新增方法

@PostMapping("/down_or_up")
public ResponseResult downOrUp(@RequestBody WmNewsDto dto){
    return wmNewsService.downOrUp(dto);
}

在WmNewsDto中新增enable属性 

    /**
     * 上下架 0 下架  1 上架
     */
    private Short enable;

在WmNewsService新增方法

/**
 * 文章的上下架
 * @param dto
 * @return
 */
public ResponseResult downOrUp(WmNewsDto dto);

实现方法

/**
 * 文章的上下架
 * @param dto
 * @return
 */
@Override
public ResponseResult downOrUp(WmNewsDto dto) {
    //1.检查参数
    if(dto.getId() == null){
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }

    //2.查询文章
    WmNews wmNews = getById(dto.getId());
    if(wmNews == null){
        return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章不存在");
    }

    //3.判断文章是否已发布
    if(!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())){
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"当前文章不是发布状态,不能上下架");
    }

    //4.修改文章enable
    if(dto.getEnable() != null && dto.getEnable() > -1 && dto.getEnable() < 2){
        update(Wrappers.<WmNews>lambdaUpdate().set(WmNews::getEnable,dto.getEnable())
                .eq(WmNews::getId,wmNews.getId()));
    }
    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

测试通过

消息通知article数据同步

 

在heima-leadnews-common模块下导入kafka依赖

<!-- kafkfa -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

在自媒体端的nacos配置中心配置kafka的生产者

spring:
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

在自媒体端文章上下架后发送消息

//发送消息,通知article端修改文章配置
if(wmNews.getArticleId() != null){
    Map<String,Object> map = new HashMap<>();
    map.put("articleId",wmNews.getArticleId());
    map.put("enable",dto.getEnable());
    kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));
}

在article端的nacos配置中心配置kafka的消费者

spring:
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    consumer:
      group-id: ${spring.application.name}
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

在article端编写监听,接收数据

import com.alibaba.fastjson.JSON;
import com.heima.article.service.ApArticleConfigService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@Slf4j
public class ArtilceIsDownListener {

    @Autowired
    private ApArticleConfigService apArticleConfigService;

    @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
    public void onMessage(String message){
        if(StringUtils.isNotBlank(message)){
            Map map = JSON.parseObject(message, Map.class);
            apArticleConfigService.updateByMap(map);
            log.info("article端文章配置修改,articleId={}",map.get("articleId"));
        }
    }
}

新建ApArticleConfigService

public interface ApArticleConfigService extends IService<ApArticleConfig> {

    /**
     * 修改文章配置
     * @param map
     */
    public void updateByMap(Map map);
}

实现

@Service
@Slf4j
@Transactional
public class ApArticleConfigServiceImpl extends ServiceImpl<ApArticleConfigMapper, ApArticleConfig> implements ApArticleConfigService {


    /**
     * 修改文章配置
     * @param map
     */
    @Override
    public void updateByMap(Map map) {
        //0 下架 1 上架
        Object enable = map.get("enable");
        boolean isDown = true;
        if(enable.equals(1)){
            isDown = false;
        }
        //修改文章配置
        update(Wrappers.<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId,map.get("articleId")).set(ApArticleConfig::getIsDown,isDown));

    }
}

测试成功tmd

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1398785.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CVE-2023-46226 Apache iotdb远程代码执行漏洞

项目介绍 Apache IoTDB 是针对时间序列数据收集、存储与分析一体化的数据管理引擎。它具有体量轻、性能高、易使用的特点&#xff0c;完美对接 Hadoop 与 Spark 生态&#xff0c;适用于工业物联网应用中海量时间序列数据高速写入和复杂分析查询的需求。 项目地址 https://io…

像 Google SRE 一样 OnCall

在 Google SRE 的著作《Google运维解密》(原作名&#xff1a;Site Reliability Engineering: How Google Runs Production Systems)中&#xff0c;Google SRE 的关键成员们几乎不惜用了三个章节的篇幅描述了在 Google 他们是如何 OnCall 的。 Google SRE 实践中&#xff0c;有…

python--re库

目录 re库初识 re库基础使用方法 compile()函数 基本用法 正则表达式常用规则字符 match与search方法 match search match/search findall与finditer方法 使用findall()返回所有匹配项 使用findall()提取多个组的匹配 使用finditer()逐个返回Match对象 使用findi…

会声会影2024旗舰版新功能介绍及2024最新视频制作教程

随着科技的不断发展&#xff0c;视频制作已经不再是专业人士的专属领域&#xff0c;越来越多的人开始使用各种视频制作软件来记录生活、创作内容。其中&#xff0c;会声会影是被广泛使用的一款视频制作软件&#xff0c;其旗舰版更是备受关注。 据悉&#xff0c;会声会影2024旗舰…

pyqt5+python子域名扫描程序

import sysfrom PyQt5 import uic from PyQt5.QtWidgets import * #requests库内置了不同的方法来发送不同类型的http请求 import requests#BS主要功能是从网页抓取数据&#xff0c;提供一些简单的、python 式的函数用来处理导航、搜索、修改分析树等功能 from bs4 import Beau…

Google Gemini API快速上手

一、前言 12月6日&#xff0c;谷歌发布新一代大模型Gemini的demo, 同时&#xff0c;Bard已将模型更新为Gemini Pro Gemini 是谷歌目前最新最强的大语言模型&#xff0c;支持多模态&#xff08;文字&#xff0c;图片&#xff0c;音频&#xff0c;视频等等&#xff09;处理 美…

K8S--部署Nacos

原文网址&#xff1a;K8S--部署Nacos-CSDN博客 简介 本文介绍K8S部署Nacos的方法。Nacos版本是&#xff1a;2.2.3。 部署方案 本文为了简单&#xff0c;使用此部署方式&#xff1a;使用本地pvconfigmap&#xff0c;以embedded模式部署单机nacos。以nodePort方式暴露端口。 …

如何使用 Typora 进行效率写作

引言 在数字化时代&#xff0c;写作已经成为我们生活中不可或缺的一部分。为了提高写作效率&#xff0c;寻找一款简单而功能强大的编辑器显得尤为重要。而Typora&#xff0c;作为一款所见即所得的Markdown编辑器&#xff0c;以其高效的特性和用户友好的界面吸引了众多写作者的…

Linux设备管理模型-02:sysfs

文章目录 sysfs1 使用sysfs控制GPIO2 sysfs编程2.1 完善sysfs属性文件的读写操作 上一篇文: 设备管理模型中的基础数据结构 sysfs sysfs是用于导出内核对象的文件系统&#xff0c;它是一个基于ram的文件系统&#xff0c;最初基于ramfs。 sysfs通常挂载在/sys目录下。它提供了一…

推荐新版AI智能聊天系统网站源码ChatGPT NineAi

Nine AI.ChatGPT是基于ChatGPT开发的一个人工智能技术驱动的自然语言处理工具&#xff0c;它能够通过学习和理解人类的语言来进行对话&#xff0c;还能根据聊天的上下文进行互动&#xff0c;真正像人类一样来聊天交流&#xff0c;甚至能完成撰写邮件、视频脚本、文案、翻译、代…

管理信息系统知识点复习

目录 一、名词解释题1.企业资源规划(ERP)2.面向对象方法&#xff1a;3.电子健康&#xff1a;4.供应链5.数据挖掘6.“自上而下”的开发策略&#xff1a;7.业务流程重组8.面向对象&#xff1a;9.决策支持系统10.聚类11.集成开发环境&#xff1a;12.供应商协同13.数据仓库14.深度学…

interpret,一个超酷的 Python 库

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com 大家好&#xff0c;今天为大家分享一个超酷的 Python 库 - interpret。 Github地址&#xff1a;https://github.com/interpretml/interpret Python Interpret 是一个强大的开源工具&#xff0c;它为 Python 开发…

我为什么不建议使用框架默认的 DefaultMeterObservationHandler

我为什么不建议使用框架默认的 DefaultMeterObservationHandler 个人创作公约&#xff1a;本人声明创作的所有文章皆为自己原创&#xff0c;如果有参考任何文章的地方&#xff0c;会标注出来&#xff0c;如果有疏漏&#xff0c;欢迎大家批判。如果大家发现网上有抄袭本文章的&a…

传统语音识别系统流程

文章目录 概述语音识别原理公式语音识别术语&#xff1a;分帧提取声学特征声学模型 概述 语音识别传统方法主要分两个阶段&#xff1a;训练和识别&#xff0c;训练阶段主要是生成声学模型和语言模型给识别阶段用。传统方法主要有五大模块组成&#xff0c;分别是特征提取&#…

JVM 如何判断一个对象可以被回收

Hi&#xff0c; 我是 浮生。 今天分享一道一线互联网公司必问的面试题。 ”JVM 如何判断一个对象可以被回收“ 关于这个问题&#xff0c;来看看高手的回答。 一、问题解析 在 JVM 里面&#xff0c;要判断一个对象是否可以被回收&#xff0c;最重要的是判断这个对象是否还在被…

XHCMS靶场小记(熊海)

文件包含漏洞 template下的header.php中存在文件包含漏洞&#xff08;该文件被file文件夹下的多数文件进行包含&#xff09; f参数可以包含任意文件通过php格式解析&#xff08;这是文件包含点&#xff09; 代码分析 根目录下的index.php文件&#xff1b;r参数用于获取包含文…

怎样使用崭新的硬盘

新买的一块硬盘&#xff0c;接到电脑上&#xff0c;打开机器&#xff0c;却找不到新的硬盘&#xff0c;怎么回事&#xff1f;新的硬盘是坏的么&#xff1f;怎样才能把新硬盘用起来&#xff1f; 可能有几种原因导致您的电脑无法识别新的硬盘。以下是一些建议的解决方法&#xff…

SOCKET编程和TCP通信案例三次握手四次挥手

文章目录 一、SOCKET1、网络套接字SOCKET2、网络字节序2.1、小端法2.2、大端法2.3、字节序转换3、IP地址转换函数3.1、本地字节序转网络字节序3.1.1、函数原型&#xff1a;3.1.2、返回值3.2、网络字节序转本地字节序3.2.1、函数原型3.2.2、返回值4、sockaddr地址结构&#xff0…

Android Termux技能大揭秘:安装MySQL并实现公网远程连接

&#x1f308;个人主页&#xff1a;聆风吟 &#x1f525;系列专栏&#xff1a;网络奇遇记、Cpolar杂谈 &#x1f516;少年有梦不应止于心动&#xff0c;更要付诸行动。 文章目录 &#x1f4cb;前言一. 安装MariaDB二. 安装cpolar内网穿透工具三. 创建安全隧道映射mysql四. 公网…

Linux下安装docker

1、查看系统版本 Docker支持64位版本的CentOS 7和CentOS 8及更高版本&#xff0c;它要求Linux内核版本不低于3.10。查看Linux版本的命令这里推荐两种&#xff1a;lsb_release -a或cat /etc/redhat-release。 显然&#xff0c;当前Linux系统为CentOS7。再查一下内核版本是否不低…