RabbitMQ系列【18】对象序列化机制

news2025/1/19 3:14:47

有道无术,术尚可求,有术无道,止于术。

文章目录

    • 前言
    • 发送对象
    • 接收对象
    • 使用Jackson 序列化

前言

使用RabbitMQ原生API,发送消息时,发送的是二进制byte[]数据。

    void basicPublish(String var1, String var2, byte[] var4) throws IOException;

使用RabbitTemplate.send方法发送Message对象,也是二进制byte[]数据。

    public Message(byte[] body) {
        this(body, new MessageProperties());
    }

在接收时,需要将二进制数据转为你想要的数据格式。在JAVA 编程中都是基于对象操作,一般消息都是对象,比如订单、日志。

所以RabbitTemplate提供了convertAndSend方法,可以直接发送对象,那么对象在网络传输,就涉及到了序列化机制。

发送对象

首先我们看下RabbitTemplate.convertAndSend是如何工作及序列化对象的。

发送一个用户User 对象,该对象需要实现Serializable序列化接口。

        User user = new User();
        user.setName("张三");
        rabbitTemplate.convertAndSend("bbdbdbdb","aaa.key",user);

convertAndSend也是调用send方法,只是多了一个convertMessageIfNecessary,将对象转为二进制数组,并封装到Message对象中。

   public void convertAndSend(String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData) throws AmqpException {
   		// this.convertMessageIfNecessary(object) 将JAVA 消息对象转为`Message`
        this.send(exchange, routingKey, this.convertMessageIfNecessary(object), correlationData);
    }

convertMessageIfNecessary会判断当前消息是否是Message类型,如果是直接返回,不是则调用消息转换器进行转换。

   protected Message convertMessageIfNecessary(Object object) {
        return object instanceof Message ? (Message)object : this.getRequiredMessageConverter().toMessage(object, new MessageProperties());
    }

获取消息转换器,直接通过RabbitTemplate.getMessageConverter获取其成员属性,也就是SimpleMessageConverter,这是默认值。

    private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
    	//  private MessageConverter messageConverter = new SimpleMessageConverter();
        MessageConverter converter = this.getMessageConverter();
        if (converter == null) {
            throw new AmqpIllegalStateException("No 'messageConverter' specified. Check configuration of RabbitTemplate.");
        } else {
            return converter;
        }
    }

接着调用消息转换器的toMessage方法,

    public final Message toMessage(Object object, @Nullable MessageProperties messagePropertiesArg, @Nullable Type genericType) throws MessageConversionException {
    	// 1. 创建消息属性对象
        MessageProperties messageProperties = messagePropertiesArg;
        if (messagePropertiesArg == null) {
            messageProperties = new MessageProperties();
        }
		// 2. 创建Message对象
        Message message = this.createMessage(object, messageProperties, genericType);
        messageProperties = message.getMessageProperties();
        if (this.createMessageIds && messageProperties.getMessageId() == null) {
            messageProperties.setMessageId(UUID.randomUUID().toString());
        }

        return message;
    }

createMessage创建Message 对象并返回。如果不是 byte[]String类型,最后会查看消息对象是否实现了Serializable接口,如果是,则进行序列化,并设置ContentType:application/x-java-serialized-object,以上都是不是则会抛出IllegalArgumentException异常。

    protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        byte[] bytes = null;
        // 1. byte[] 类型
        if (object instanceof byte[]) {
            bytes = (byte[])object;
            // 设置消息属性 ContentType:application/octet-stream
            messageProperties.setContentType("application/octet-stream");
        } else if (object instanceof String) {
        	// 2. String 类型
            try {
            	// 转为字节
                bytes = ((String)object).getBytes(this.defaultCharset);
            } catch (UnsupportedEncodingException var6) {
                throw new MessageConversionException("failed to convert to Message content", var6);
            }
			// 设置消息属性 ContentType:text/plain
            messageProperties.setContentType("text/plain");
            // 设置消息属性 ContentEncoding:UTF-8
            messageProperties.setContentEncoding(this.defaultCharset);
        } else if (object instanceof Serializable) {
        	// 3. 实现了 Serializable接口
            try {
            	// 转为byte[] 
                bytes = SerializationUtils.serialize(object);
            } catch (IllegalArgumentException var5) {
                throw new MessageConversionException("failed to convert to serialized Message content", var5);
            }
			// 设置消息属性 ContentType:application/x-java-serialized-object
            messageProperties.setContentType("application/x-java-serialized-object");
        }

        if (bytes != null) {
        	// 4. 设置长度
            messageProperties.setContentLength((long)bytes.length);
            // 5. 返回`Message`对象 
            return new Message(bytes, messageProperties);
        } else {
            throw new IllegalArgumentException(this.getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
        }
    }

Message 创建成功后,调用原生的channel.basicPublish方法,发送消息对象、属性。

    protected void sendToRabbit(Channel channel, String exchange, String routingKey, boolean mandatory, Message message) throws IOException {
        AMQP.BasicProperties convertedMessageProperties = this.messagePropertiesConverter.fromMessageProperties(message.getMessageProperties(), this.encoding);
        channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());
    }

查看控制台,可以看到对象消息的相关信息:
在这里插入图片描述

接收对象

在消费者接收消息时,可以直接接收业务对象。

    @RabbitListener(queues = {"dsfsf"})
    public void receive003(User user) {
        System.out.println("收到消息" + user);
    }

容器监听消息,调用消息转换器SimpleMessageConverter将二进制数据转为相应的对象。

调用的是SimpleMessageConverter.fromMessage方法。

    public Object fromMessage(Message message) throws MessageConversionException {
        Object content = null;
        // 1. 处理消息属性
        MessageProperties properties = message.getMessageProperties();
        if (properties != null) {
        	// 获取contentType ,这里为:application/x-java-serialized-object
            String contentType = properties.getContentType();
            // 2. contentType 以text 开头(字符串),二进制转为字符串返回
            if (contentType != null && contentType.startsWith("text")) {
                String encoding = properties.getContentEncoding();
                if (encoding == null) {
                    encoding = this.defaultCharset;
                }

                try {
                    content = new String(message.getBody(), encoding);
                } catch (UnsupportedEncodingException var8) {
                    throw new MessageConversionException("failed to convert text-based Message content", var8);
                }
            // 3. contentType为 application/x-java-serialized-object(序列化对象),
            } else if (contentType != null && contentType.equals("application/x-java-serialized-object")) {
                try {
                	// 反序列化为对象
                    content = SerializationUtils.deserialize(this.createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));
                } catch (IllegalArgumentException | IllegalStateException | IOException var7) {
                    throw new MessageConversionException("failed to convert serialized Message content", var7);
                }
            }
        }
		// 4. 以上都不是,直接返回二进制
        if (content == null) {
            content = message.getBody();
        }
        return content;
    }

使用Jackson 序列化

可是使用其他序列化方式,比如Jackson

只需要在RabbitTemplate 、监听容器工厂RabbitListenerContainerFactory中设置转换器即可。

    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate();
        configurer.configure(template, connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/48039.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

1.2 监督学习

1.2 监督学习监督学习的定义监督学习的相关概念监督学习流程图监督学习的定义 监督学习(Supervised Learning&#xff09;是指从标注数据中学习预测模型的机器学习问题&#xff0c;其本质是学习输入到输出的映射的统计规律。 输入空间 (Input Space&#xff09;&#xff1a;输…

11.29总结

目录 一.连续子数组最大和 方法2动态规划 二.查找最小的k对数字 一.从先序遍历还原二叉树 二.完全二叉树 三.判断对称二叉树 四 回文 五.连续子数组最大和 六.TopK问题 思路一如果数据特别大.排序的时间复杂度会很大 思路二:用大根堆或者小根堆然后分别弹出. 思路三…

CANoe-vTESTstudio之State Diagram编辑器(功能介绍)

1. 阶段 State Diagram从测试设计到测试执行,分为4个阶段: Test Design 在测试设计期间,测试设计人员使用图形元素和分配的测试代码来对要测试的SUT的状态和转换进行建模。这个阶段的结果是生成状态图 Evaluation 在评估期间,将验证各个元素及其关系并创建路径。评估产…

SpringBoot中如何实现业务校验,这种方式才叫优雅!

大家好&#xff0c;在日常的接口开发中&#xff0c;为了保证接口的稳定安全&#xff0c;我们一般需要在接口逻辑中处理两种校验&#xff1a; 参数校验 业务规则校验 首先我们先看看参数校验。 参数校验 参数校验很好理解&#xff0c;比如登录的时候需要校验用户名密码是否为…

Opencv边缘检测、轮廓发现、绘制轮廓

Opencv边缘检测、轮廓发现、绘制轮廓 提取图像轮廓的2个步骤 1、 findContours函数找轮廓&#xff0c; 2、 drawContours函数画轮廓 轮廓的查找——cv::findContours() 函数cv::findContour是从二值图像中来计算轮廓的&#xff0c;它可以使用cv::Canny()函数处理的图像&am…

【华为上机真题 2022】字符串分隔

&#x1f388; 作者&#xff1a;Linux猿 &#x1f388; 简介&#xff1a;CSDN博客专家&#x1f3c6;&#xff0c;华为云享专家&#x1f3c6;&#xff0c;Linux、C/C、云计算、物联网、面试、刷题、算法尽管咨询我&#xff0c;关注我&#xff0c;有问题私聊&#xff01; &…

Java---线程详解(并发并行,Thread类,Runnable接口,同步机制,线程死锁......)

目录 一、概念 1、进程 2、线程 &#xff08;1&#xff09;单线程 &#xff08;2&#xff09;多线程 &#xff08;3&#xff09;并发 &#xff08;4&#xff09;并行 二、线程基本使用 1、创建线程的两种方式 &#xff08;1&#xff09;继承Thread类 &#xff08;2&am…

esp8266用arduino连上阿里云(图文操作,100%成功)

最近学习了esp8266/esp32单片机。第一次使用arduino这个IDE&#xff0c;搞多了Keil5&#xff0c;这个实在是有点不习惯。进步都是困难的&#xff0c;现在回想起来&#xff0c;发现也没多难&#xff0c;回到正题。 准备软件&#xff1b;Arduino IDE 准备硬件&#xff1a;esp82…

面试:插件化相关---activity

我们先来看下Android常规Activity的启动流程 如何评价360的Android插件化框架RePlugin&#xff1f; - 知乎 1、调用Context.startActivity -> ActivityManagerProxy -> AMS, AMS通过Intent从PMS拿到ActivityInfo并创建ActivityRecord和token放入前台ActivityStack&…

macOS端React的项目WebPack热更新(HMR)失效问题分析及解决,原因竟是Windows文件系统不区分大小写导致

项目场景&#xff1a; 最近做的项目是一个使用UmiJS搭建的React的前端老项目&#xff0c;项目是上一个开发团队遗留下来的老项目&#xff0c;我们接着在原来的基础上开发。团队成员中有的是Windows电脑&#xff0c;有的是Mac电脑&#xff0c;所以存在规范不统一的情况。 问题描…

[附源码]计算机毕业设计springboot基于web的建设科技项目申报管理系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

微服务框架 SpringCloud微服务架构 5 Nacos 5.6 环境隔离

微服务框架 【SpringCloudRabbitMQDockerRedis搜索分布式&#xff0c;系统详解springcloud微服务技术栈课程|黑马程序员Java微服务】 SpringCloud微服务架构 文章目录微服务框架SpringCloud微服务架构5 Nacos5.6 环境隔离5.6.1 环境隔离 - namespace5.6.2 使用 namespace5.6.…

Python的PyQt框架的使用-创建主窗体篇

Python的PyQt框架的使用-构建环境篇一、前言二 、创建主窗体一、前言 个人主页: ζ小菜鸡大家好我是ζ小菜鸡&#xff0c;小伙伴们&#xff0c;让我们一起来学习Python的PyQt框架的使用。如果文章对你有帮助、欢迎关注、点赞、收藏(一键三连) 二 、创建主窗体 &#xff08;1&am…

【Linux】高频指令及简单的vim使用(0基础带你快速入门)

目录 一、目录操作指令 1.1、ls 1.2、pwd 1.3、cd 1.4、touch 1.5、cat 1.6、echo 1.7、mkdir 1.8、rm 1.9、mv 1.10、cp 二、Linux中如何手动安装插件 三、vim 3.1、打开文件 3.2、编辑文件 3.3、保存退出 一、目录操作指令 1.1、ls 语法&#xff1a; 第一种&#…

高维多元时序数据之间的相似性度量

1. 简介 时间序列作为一种按时间顺序排列的特殊数据&#xff0c;是数据挖掘的重要研究内容&#xff0c;其中包括数据准备、数据选择、数据预处理、数据缩减、数据挖掘目标确定、挖掘算法确定、数据挖掘、模式解释及知识评价&#xff19;个处理步骤&#xff37;。数据挖掘方面的…

@SentinelResource注解的使用

SentinelResource注解的使用 1、按资源名称限流后续处理 前置条件&#xff1a; 启动Nacos启动Sentinel 1.1、修改cloudalibaba-sentinel-service8401 引入自己的API通用包 <!--自己的公共包,可以使用Payment支持Entuty--> <dependency><groupId>com.zcl.s…

word目录怎么自动生成?用这个方法,快速自动生成

当我们在写论文或者是编写文档的时候&#xff0c;都需要生成导航目录。很多人写完文档之后想要将其自动生成目录&#xff0c;但是不知道该怎么操作&#xff1f;word目录怎么自动生成&#xff1f;下面我就为大家分享三个步骤&#xff0c;快速自动设置文档的目录。 操作环境&…

SpringBoot配置文件

文章目录配置文件的作用配置文件的格式.properties 配置文件说明.properties 基本语法.properies 读取配置信息.yml 配置说明.yml 基本语法.yml 进阶使用.yml 读取配置信息.properties VS .yml配置文件的作用 整个项目中所有重要的数据都是在配置⽂件中配置的&#xff0c;比如…

社区系统项目复盘-6

文章目录什么是Elasticsearch&#xff1f;Spring是怎么整合Elasticsearch的&#xff1f;开发社区搜索功能Elasticsearch实现全文搜索功能什么是Elasticsearch&#xff1f; Elasticsearch简介 一个分布式的、Restful风格的搜索引擎支持对各种类型的数据的检索搜索速度快&#xf…

FSR-Unity-URP 1.0 的性能和兼容性问题

1&#xff09;FSR-Unity-URP 1.0 的性能和兼容性问题 ​2&#xff09;计算大文件MD5耗时问题 3&#xff09;如何监听Unity即将Reload Script 4&#xff09;如何对Unity游戏的Android崩溃和ANR问题进行符号化解析 这是第315篇UWA技术知识分享的推送。今天我们继续为大家精选了若…