Kafka(三)生产者发送消息

news2025/1/6 20:02:58

文章目录

  • 生产者发送思路
  • 自定义序列化类
  • 配置生产者参数
    • 提升吞吐量
  • 发送消息
  • 关闭生产者
  • 结语
  • 示例源码仓库

生产者发送思路

如何确保消息格式正确的前提下最终一定能发送到Kafka? 这里的实现思路是

  1. ack使用默认的all
  2. 开启重试
  3. 在一定时间内重试不成功,则入库,后续由定时任务继续发送
  4. 这里在某些异常情况下一定会生产重复消息,如何确保消息只消费一次,后续在Consumer实现中详细展开
  5. 这里我们只要确保生产的消息,不论重试多少次,最终都只会被发送到同一分区。Kafka的确定消息的分区策略是: 如果提供了key,则根据hash(key)计算分区。由于我们每个消息都有一个消息ID,不管是重试多少次,ID是不会变的,同时我们不会在消息高峰阶段调整分区数量。所以基于这些,我们保证一个消息无论多少次,都会发送到同一分区。

自定义序列化类

消息格式为JSON, 使用Jackson将类序列化为JSON字符串

public class UserDTOSerializer implements Serializer<UserDTO> {
    
    @Override
    @SneakyThrows
    public byte[] serialize(final String s, final UserDTO userDTO) {
        ObjectMapper objectMapper = new ObjectMapper();
        return objectMapper.writeValueAsBytes(userDTO);
    }
}

配置生产者参数

有几点需要注意

  1. 开启压缩
  2. retries 官方建议不配置, 官方建议使用delivery.timeout.ms 参数来控制重试时间, 默认2分钟
  3. buffer.memory 如果没有什么特别情况,使用默认的即可, 32MB
  4. ack使用默认的all
    /**
     * 以下配置建议搭配 官方文档 + kafka权威指南相关章节 + 实际业务场景吞吐量需求 自己调整
     * 如果是本地, bootstrap.server的IP地址和docker-compose.yml中的EXTERNAL保持一致
     * @return
     */
    public static Properties loadProducerConfig(String valueSerializer) {
        Properties result = new Properties();
        result.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "l192.168.0.102:9093");
        result.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        result.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        result.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        // 每封邮件消息大小大约20KB, 使用默认配置吞吐量不高,下列配置增加kafka的吞吐量
        // 默认16384 bytes,太小了,这会导致邮件消息一个一个发送到kafka,达不到批量发送的目的,不符合发送邮件的场景
        result.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576 * 10);
        // 默认1048576 bytes,限制的是一个batch的大小,对于20KB的消息来说,消息太小
        result.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576 * 10);
        // 等10ms, 为了让更多的消息聚合到一个batch中,提高吞吐量
        result.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        return result;
    }

提升吞吐量

  • 在实际场景中,我们的邮件消息一个大概20KB,而batch.size默认是16KB,也就是说,在不修改该参数的情况下,生产者只能一个一个的发消息,这会导致我们的吞吐量上不去, 所以修改batch.size为10MB
  • 只修改这个参数还不行, max.request.size 限制了单次请求的大小,默认为1MB,也就是说即使batch.size为10MB,但是由于一次只能最多发1MB,吞吐量也上不去,所以这里将max.request.size也改为10MB
  • 由于我们将一个批次可发送的数量大大提高,所以可以让生产者等一会再发,等更多的数据到达。linger.ms默认是为0,也就是立刻发送,根据实际情况适当增加等待时间

发送消息

@Log
public class MessageProducer {
    
    public static final KafkaProducer<String, UserDTO> PRODUCER = new KafkaProducer<>(KafkaConfiguration.loadProducerConfig(UserDTOSerializer.class.getName()));
    
    private MessageFailedService messageFailedService = new MessageFailedService();

    /**
     * kafka producer 发送失败时会进行重试,相关参数 retries 和 delivery.timeout.ms, 官方建议使用delivery.timeout.ms,默认2分钟
     * 也就是说在2分钟之后,下列代码中的回调函数会被调用,重试多少次回调函数就会被调用多少次,所以我们在重试期间只要保存一次失败的消息就好,如果在重试期间成功,则去更新
     * @param userDTO
     */
    public void sendMessage(final UserDTO userDTO) {
        ProducerRecord<String, UserDTO> user = new ProducerRecord<>("email", userDTO.getMessageId(),  userDTO);
        try {
            PRODUCER.send( user, (recordMetadata, e) -> {
                // 第一次失败, 应该只保存一次,不应该每次都保存
                if (Objects.nonNull(e) && !ProducerMessageIdCache.contains(userDTO.getMessageId())) {
                    log.severe("message has sent failed");
                    saveOrUpdateFailedMessage(userDTO);
                    ProducerMessageIdCache.add(userDTO.getMessageId());
//                    重试时成功了,应该去更新
                }else if (ProducerMessageIdCache.contains(userDTO.getMessageId()) && Objects.isNull(e)) {
                    saveOrUpdateFailedMessage(userDTO);
                    ProducerMessageIdCache.remove(userDTO.getMessageId());
                } else {
                    log.info("message has sent to topic: " + recordMetadata.topic() + ", partition: " + recordMetadata.partition() );
                }
            });
        } catch (TimeoutException e) {
            log.info("send message to kafka timeout, message: ");
            // TODO: 自定义逻辑,比如发邮件通知kafka管理员
        }
    }
    
    /**
     * @param userDTO
     */
    @SneakyThrows
    private void saveOrUpdateFailedMessage(final UserDTO userDTO) {
        MessageFailedEntity messageFailedEntity = new MessageFailedEntity();
        messageFailedEntity.setMessageId(userDTO.getMessageId());
        ObjectMapper mapper = new ObjectMapper();
        messageFailedEntity.setMessageContentJsonFormat(mapper.writeValueAsString(userDTO));
        messageFailedEntity.setMessageType(MessageType.EMAIL);
        messageFailedEntity.setMessageFailedPhrase(MessageFailedPhrase.PRODUCER);
        messageFailedService.saveOrUpdateMessageFailed(messageFailedEntity);
    }
}

对上述代码做几点解释

  1. 我们使用异步的方式发送,如果发送成功,打印一条消息
  2. 关键在于重试,如果第一次失败了,在KafkaProducer重试期间,只第一次去向数据库中插入数据,这样做不会频繁去更新数据库;如果在重试期间,消息发送成功了,则去更新数据库;如果在重试期间没成功,则不更新
  3. 对于是不是第一次失败,使用全局的缓存,我这里为了演示,使用的是Map代替。实际场景中可以使用Ehcache或Redis自己实现
public class ProducerMessageIdCache {
    
    private static final Map<String, Integer> MESSAGE_IDS = new ConcurrentHashMap<>();
    
    public static void add(String messageId) {
        MESSAGE_IDS.put(messageId, 0);
    }
    
    public static void remove(String messageId) {
        MESSAGE_IDS.remove(messageId);
    }
    
    public static boolean contains(String messageId) {
        return MESSAGE_IDS.containsKey(messageId);
    }
    
    // TODO 定时清理过期的messageId
    
    
}

关闭生产者

实现ServletContextListener接口, 然后在web.xml的listener元素中配置

public class KafkaListener implements ServletContextListener {

    private static final List<KafkaProducer> KAFKA_PRODUCERS = new LinkedList<>();

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        KAFKA_PRODUCERS.add(MessageProducer.PRODUCER);
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        KAFKA_PRODUCERS.forEach(KafkaProducer::close);
    }
}
<?xml version="1.0" encoding="UTF-8" ?>
<web-app xmlns="https://jakarta.ee/xml/ns/jakartaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaee
                      https://jakarta.ee/xml/ns/jakartaee/web-app_6_0.xsd"
         version="6.0">

  <listener>
    <listener-class>com.business.server.listener.KafkaListener</listener-class>
  </listener>
</web-app>

结语

  1. 在实际编码过程中,可以参考官方写的Kafka权威指南对应章节书写,或者参考各大云服务厂商的Kafak的开发者文档。不过我建议还是看Kafka权威指南, 我看了阿里云和华为云的,虽然都号称兼容开源Kafka,但是发现其版本和开源版本之间存在一定的滞后性,许多最佳实践已经过时
  2. Kafka生产者端没什么特别的,主要是根据业务场景设计消息格式,以及如何尽可能的减小消息体积
  3. 如果你的消息很大,比我的场景还大,达到了1M以上,生产者的吞吐量是个问题,消费者的消费速度也是个问题。你要是问我有什么好的想法,没有具体场景,我确实想不出什么好的方式

示例源码仓库

  1. Github地址
  2. 项目下business-server module代表生产者
  3. 运行时IDEA配置如下在这里插入图片描述
    注意Application context的路径, 启动之后访问端口+Application context, 例如
http://localhost:8999/business-server

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1204116.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于Python实现,调用百度通用翻译API-详解

概述 在工作上需要各个国家语言的翻译方面很多地方用的上。 获取API权限: 登录百度账号,在个人信息界面,包括修改密码、绑定手机、身份人证等 https://api.fanyi.baidu.com/api/trans/product/desktop?req=developer 百度翻译开放平台 在开发者中心:需要开通个人账号…

13.利用辗转相除法求两个整数的最大公约数和最小公倍数。如96,36

文章目录 前言一、题目描述 二、题目分析 三、解题 前言 本系列为循环结构编程题&#xff0c;点滴成长&#xff0c;一起逆袭。 一、题目描述 利用辗转相除法求两个整数的最大公约数和最小公倍数,如96,36 二、题目分析 最小公倍数(输入的两个数之积)除(它们的最大公约数) 三…

【斗破年番】老婆们见面针锋相对,万蝎门勾结魂殿,大战一触即发

Hello,小伙伴们&#xff0c;我是小郑继续为大家深度解析国漫资讯。 深度爆料&#xff0c;斗破苍穹年番70集剧情内容。萧炎对小医仙的信任可谓是深不可测&#xff0c;然而美杜莎与小医仙之间的隔阂却始终存在。尽管美杜莎和紫妍都已加入毒宗&#xff0c;但两人之间的冲突从未停…

模型剪枝Lab

这里是MIT 6.5940 Fall 2023的第一个实验Lab1的一些笔记&#xff0c;课程传送门&#xff1a;Han Lab Setup First, install the required packages and download the datasets and pretrained model. Here we use CIFAR10 dataset and VGG network which is the same as what…

java 之 泛型的详细介绍

文章目录 1. 为什么需要泛型&#xff1f;2. 泛型的基本概念3. 泛型类的例子4. 泛型方法的例子5. 泛型的好处 泛型使用的位置1. 泛型类&#xff08;Generic Class&#xff09;&#xff1a;2. 泛型接口&#xff08;Generic Interface&#xff09;&#xff1a;3. 泛型方法&#xf…

vue2按需导入Element(vite打包)

1.安装element 说明&#xff1a;-S是生产依赖。 npm install element-ui2 -S 2.安装babel-plugin-component 说明&#xff1a;-D是开发模式使用。 npm install babel-plugin-component -D 3. vite.config.js 说明&#xff1a;借助 babel-plugin-component &#xff0c;我们可…

人工智能极简史:一文读懂ChatGPT的前世今生

2022年11月30日&#xff0c;OpenAI推出的一款人工智能技术驱动的自然语言处理工具——ChatGPT&#xff0c;迅速在社交媒体上走红&#xff0c;短短5天&#xff0c;注册用户数就超过100万。 2023年1月末&#xff0c;ChatGPT的月活用户已突破1亿&#xff0c;一度成为史上增长最快的…

JSON.parse --- 搜索框

一 &#xff0c; JSON.parse this.num_normsTwo JSON.parse(res.result.normsTwo) 二. 搜索框 <template><div class"app-container"><span style"margin-left:120px;margin-right: 20px;width: 100px; display: inline-block;">物…

大语言模型可以学习规则11.13

大型语言模型可以学习规则 摘要1 引言2 准备3 从假设到理论3.1 诱导阶段&#xff1a;规则生成和验证3.2 演绎阶段&#xff1a;使用规则库进行显式推理 4 实验评估实验装置4.2 数字推理 5 相关工作 摘要 当提示一些例子和中间步骤时&#xff0c;大型语言模型&#xff08;LLM&am…

【网络豆送书第六期】《嵌入式虚拟化技术与应用》

作者简介&#xff1a;一名云计算网络运维人员、每天分享网络与运维的技术与干货。 公众号&#xff1a;网络豆云计算学堂 座右铭&#xff1a;低头赶路&#xff0c;敬事如仪 个人主页&#xff1a; 网络豆的主页​​​​​ 本期好书推荐&#xff1a;《嵌入式虚拟化技术与应用…

面试 | 再也不怕被问 Binder 机制了

Binder 机制 Binder 机制是 Android 特有的一种进程间通信&#xff08;IPC&#xff09;方式 1.1 Binder 机制的作用和原理&#xff1f; Linux系统将一个进程分为用户空间和内核空间。对于进程之间来说&#xff0c;用户空间的数据不可共享&#xff0c;内核空间的数据可共享&a…

Django下的Race Condition漏洞

目录 环境搭建 无锁无事务的竞争攻击复现 无锁有事务的竞争攻击复现 悲观锁进行防御 乐观锁进行防御 环境搭建 首先我们安装源码包&#xff1a;GitHub - phith0n/race-condition-playground: Playground for Race Condition attack 然后将源码包上传到Ubuntu 为了方便使…

python双端队列_中间是头两边是尾_两边是头中间是尾

双端队列的顺序表存储结构以及两种特殊的双端队列 双端队列 是一种允许我们同时从前端和后端添加和删除元素的特殊队列&#xff0c;它是队列和栈的结合体。 双端队列&#xff08;deque&#xff09;与队列&#xff08;queue&#xff09;就差了两个字&#xff0c;队列里元素只能…

uniapp——项目day04

购物车页面——商品列表区域 渲染购物车商品列表的标题区域 1. 定义如下的 UI 结构&#xff1a; 2.美化样式 渲染商品列表区域的基本结构 1. 通过 mapState 辅助函数&#xff0c;将 Store 中的 cart 数组映射到当前页面中使用&#xff1a; import badgeMix from /mixins/tab…

2023年【建筑电工(建筑特殊工种)】找解析及建筑电工(建筑特殊工种)复审考试

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 建筑电工(建筑特殊工种)找解析是安全生产模拟考试一点通生成的&#xff0c;建筑电工(建筑特殊工种)证模拟考试题库是根据建筑电工(建筑特殊工种)最新版教材汇编出建筑电工(建筑特殊工种)仿真模拟考试。2023年【建筑电…

1 Supervised Machine Learning Regression and Classification

文章目录 Week1OverViewSupervised LearningUnsupervised LearningLinear Regression ModelCost functionGradient Descent Week2Muliple FeatureVectorizationGradient Descent for Multiple RegressionFeature ScalingGradient DescentFeature EngineeringPolynomial Regress…

数据结构线性表——栈

前言&#xff1a;哈喽小伙伴们&#xff0c;今天我们将一起进入数据结构线性表的第四篇章——栈的讲解&#xff0c;栈还是比较简单的哦&#xff0c;跟紧博主的思路&#xff0c;不要掉队哦。 目录 一.什么是栈 二.如何实现栈 三.栈的实现 栈的初始化 四.栈的操作 1.数据入栈…

CTFhub-RCE-读取源代码

源代码&#xff1a; <?php error_reporting(E_ALL); if (isset($_GET[file])) { if ( substr($_GET["file"], 0, 6) "php://" ) { include($_GET["file"]); } else { echo "Hacker!!!"; } } else {…

重磅!2024QS亚洲大学排名出炉!北大蝉联榜首,港大反超新国立、清华!

2023年11月8日&#xff0c;全球高等教育分析机构QS Quacquarelli Symonds发布了2024年QS世界大学排名&#xff1a;亚洲大学排名。 本次排名全方位评估了来自亚洲25个国家和地区的856所大学在全球认可度、研究实力、教学资源和国际化等方面的表现&#xff0c;有148所院校首次跻…

绝望了,软件测试的行业基本盘,崩了......

不得不承认&#xff0c;现在工作不好找 去年很多人都觉得今年的就业环境会好很多&#xff0c;但是到了现在都发现之前想错了&#xff0c;实际上是一塌糊涂… 于是有了很多年轻人焦虑日常的灵魂一问&#xff1a;“快半年了&#xff0c;找不到工作&#xff0c;我好焦虑&#xf…