Java版Flink使用指南——定制RabbitMQ数据源的序列化器

news2024/12/30 4:11:10

大纲

  • 新建工程
    • 新增依赖
    • 数据对象
    • 序列化器
    • 接入数据源
  • 测试
    • 修改Slot个数
    • 打包、提交、运行
  • 工程代码

在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们从RabbitMQ队列中读取了字符串型数据。如果我们希望读取的数据被自动化转换为一个对象,则需要定制序列化器。本文我们就将讲解数据源序列化器的定制方法。

新建工程

我们在IntelliJ中新建一个工程SourceSerializer。
Archetype填入:org.apache.flink:flink-quickstart-java
版本填入与Flink的版本:1.19.1
在这里插入图片描述

新增依赖

在pom.xml中新增RabbitMQ连接器

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-rabbitmq</artifactId>
			<version>3.0.1-1.17</version>
		</dependency>

新增Json库依赖

		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-core</artifactId>
			<version>2.17.1</version>
		</dependency>

新增lombok库,主要是为了使用它的一些注解

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.32</version>
            <scope>provided</scope>
        </dependency>

数据对象

我们新建一个简单的数据对象SampleData
src/main/java/org/example/vo/SampleData.java

package org.example.vo;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class SampleData {
    private Long id;
    private String name;
    private int age;
    private Boolean married;
    private Double salary;

    public String toJson() throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.writeValueAsString(this);
    }

    public static SampleData fromJson(String json) throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.readValue(json, SampleData.class);
    }
}

这个方法包含两个方法,一个是将SampleData 转换成字符串,另一个是将字符串转成SampleData 对象。

序列化器

我们定义的数据源序列化器要实现AbstractDeserializationSchema接口,主要是通过deserialize方法将二进制数组转换成SampleData 对象。

src/main/java/org/example/serializer/SampleDataRabbitMQSourceSerializer.java

package org.example.serializer;

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.example.vo.SampleData;

import java.io.IOException;

public class SampleDataRabbitMQSourceSerializer extends AbstractDeserializationSchema<SampleData> {
    @Override
    public SampleData deserialize(byte[] message) throws IOException {
        return SampleData.fromJson(new String(message));
    }

    @Override
    public boolean isEndOfStream(SampleData nextElement) {
        return false;
    }

    @Override
    public TypeInformation<SampleData> getProducedType() {
        return TypeInformation.of(SampleData.class);
    }
}

接入数据源

我们在《Java版Flink使用指南——定制RabbitMQ的Sink序列化器》一文中,往data.to.rbtmq对了写入了大量SampleData 数据。这次我们将其作为数据源来做测试
这次我们在创建RMQSource时传入序列化器SampleDataRabbitMQSourceSerializer。它会将从RabbitMQ获取的数据转换成SampleData对象。
然后我们获取所有“已婚”(filter.getMarried() == true)的数据,将其打印到日志中。

		String queueName = "data.to.rbtmq";
		String host = "172.21.112.140"; // IP of the rabbitmq server
		int port = 5672;
		String username = "admin";
		String password = "fangliang";
		String virtualHost = "/";
		int parallelism = 1;

		// create a RabbitMQ source
		RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder()
				.setHost(host)
				.setPort(port)
				.setUserName(username)
				.setPassword(password)
				.setVirtualHost(virtualHost)
				.build();

		RMQSource<SampleData> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SampleDataRabbitMQSourceSerializer());
		final DataStream<SampleData> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);

		stream.filter(filter -> filter.getMarried() == true).print().name(username + "'s sink to stdout").setParallelism(parallelism);

测试

修改Slot个数

由于我们要运行两个流式计算任务,于是需要两个Slot。

vim conf/config.yaml 

将numberOfTaskSlots的值改成2。

打包、提交、运行

我们将本例和《Java版Flink使用指南——定制RabbitMQ的Sink序列化器》中的包都提交运行
在这里插入图片描述
然后在日志中可以看到“已婚”的数据都在输出

 tail -f log/*

在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1908461.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

white-space属性换行

white-space 属性可以控制元素中文本的换行方式。常用的取值有&#xff1a; normal&#xff08;默认值&#xff09;&#xff1a;根据容器的大小自动换行。nowrap&#xff1a;文本不进行换行&#xff0c;超过容器宽度时会溢出。pre&#xff1a;保留原始的空白符&#xff08;空格…

5.Python学习:面向对象

1.面向对象和面向过程的区别 以下五子棋为例&#xff1a; 2.类和实例 &#xff08;1&#xff09;类是抽象的模板&#xff0c;实例是根据模板创建出来的具体的对象 &#xff08;2&#xff09;比如人类就是一个类&#xff0c;刘亦菲就是人类的一个实例 2.1 新建类和类的实例…

【uniapp-ios】App端与webview端相互通信的方法以及注意事项

前言 在开发中&#xff0c;使用uniapp开发的项目开发效率是极高的&#xff0c;使用一套代码就能够同时在多端上线&#xff0c;像笔者之前写过的使用Flutter端和webview端之间的相互通信方法和问题&#xff0c;这种方式本质上实际上是h5和h5之间的通信&#xff0c;网上有非常多…

计算机的错误计算(二十五)

摘要 介绍&#xff08;不&#xff09;停机问题。给了一个算式&#xff0c;当计算机的输出为0时&#xff0c;一般需要提高计算精度继续计算&#xff0c;一直到获得非0值或有效数字。但是&#xff0c;由于事先不清楚算式的准确值是否为0或不为0&#xff0c;因此往往陷入两难境地…

LLM - Transformer 的 多头自注意力(MHSA) 理解与源码

欢迎关注我的CSDN:https://spike.blog.csdn.net/ 本文地址:https://spike.blog.csdn.net/article/details/140281680 免责声明:本文来源于个人知识与公开资料,仅用于学术交流,欢迎讨论,不支持转载。 在 Transformer 中,多头自注意力机制 (MHSA, Multi-Head Self-Attenti…

关系型数据库MySQL和时序数据库的区别?

时序数据库和关系型数据库是两种不同类型的数据库系统&#xff0c;它们在设计理念、存储结构、性能优化等方面有显著差异&#xff0c;以适应不同的应用场景和需求。具体对比如下&#xff1a; 数据存储结构 时序数据库&#xff1a;使用列式存储&#xff0c;每条记录通常包含时间…

Johnson Counter

目录 描述 输入描述&#xff1a; 输出描述&#xff1a; 参考代码 描述 请用Verilog实现4位约翰逊计数器&#xff08;扭环形计数器&#xff09;&#xff0c;计数器的循环状态如下。 电路的接口如下图所示。 输入描述&#xff1a; input clk , input …

力扣喜刷刷--day1

1.无重复字符的最长子串 知识点&#xff1a;滑动窗口 基本概念 窗口&#xff1a;窗口是一个连续的子序列&#xff0c;可以是固定长度或可变长度。滑动&#xff1a;窗口在数据序列上移动&#xff0c;可以是向左或向右。边界&#xff1a;窗口的起始和结束位置。 应用场景 字符…

YOLOv10改进 | Conv篇 | 利用DualConv二次创新C2f提出一种轻量化结构(轻量化创新)

一、本文介绍 本文给大家带来的改进机制是利用DualConv改进C2f提出一种轻量化的C2f&#xff0c;DualConv是一种创新的卷积网络结构&#xff0c;旨在构建轻量级的深度神经网络。它通过结合33和11的卷积核处理相同的输入特征映射通道&#xff0c;优化了信息处理和特征提取。Dual…

关于 off-by-one 的学习

pwn的功底还很浅&#xff0c;仅仅是记录自己学习的一点心得体会。 后续随着学习深入&#xff0c;还会补知识点和题目上来。 知识点 优秀的学习资料 关于off by null的学习总结 | ZIKH26 Chunk Extend and Overlapping | ctfwiki 一点理解 与off-by-one联系很紧密的就是上…

Fastapi在docekr中进行部署之后,uvicorn占用的CPU非常高

前一段接点小活&#xff0c;做点开发&#xff0c;顺便学了学FASTAPI框架&#xff0c;对比flask据说能好那么一些&#xff0c;至少并发什么的不用研究其他的asgi什么的&#xff0c;毕竟不是专业开发&#xff0c;能少研究一个东西就省了很多的事。 但是部署的过程中突然之间在do…

典型案例 | 基于全数字实时仿真的嵌入式DevOps解决方案

为丰富浙江省信息技术应用创新&#xff08;以下简称“信创”&#xff09;产业生态&#xff0c;在全社会各领域形成示范效应&#xff0c;浙江省经信厅联合省密码管理局开展2023年浙江省深化信创典型案例评选工作。 经过征集申报、专家评选、名单公示等程序&#xff0c;确定36个…

秒懂设计模式--学习笔记(6)【创建篇-建造者模式】

目录 5、建造者模式5.1 介绍5.2 建造步骤的重要性5.3 地产开发商的困惑5.4 建筑施工方5.5 工程总监5.6 项目实施5.7 建造者模式的各角色定义5.8 建造者模式 5、建造者模式 5.1 介绍 建造者模式&#xff08;Builder&#xff09;又称为生成器模式&#xff0c;主要用于对复杂对象…

20.呼吸灯:利用PWM控制小灯在相同时间段内的不同占空比

&#xff08;1&#xff09;设计一段代码&#xff0c;实现led灯在一秒内由完全熄灭到完全点亮&#xff0c;在第二秒由完全点亮转为完全熄灭&#xff0c;循环往复。 &#xff08;2&#xff09;Verilog代码&#xff1a; module breath_led(clk,reset_n,led);input clk;input res…

Open3D 计算点云的欧式距离

目录 一、概述 1.1欧式距离定义 1.2作用和用途 二、代码实现 2.1关键函数 2.2完整代码 三、实现效果 3.1原始点云 3.2处理后点云 一、概述 在Open3D中&#xff0c;compute_point_cloud_distance函数用于计算两个点云之间的距离。具体来说&#xff0c;它计算的是源点云…

进程 VS 线程(javaEE篇)

&#x1f341; 个人主页&#xff1a;爱编程的Tom&#x1f4ab; 本篇博文收录专栏&#xff1a;JavaEE初阶&#x1f449; 目前其它专栏&#xff1a;c系列小游戏 c语言系列--万物的开始_ 等 &#x1f389; 欢迎 &#x1f44d;点赞✍评论⭐收藏&#x1f496;三连支…

一.4 处理器读并解释储存在内存中的指令

此刻&#xff0c;hello.c源程序已经被编译系统翻译成了可执行目标文件hello&#xff0c;并被存放在硬盘上。要想在Unix系统上运行该可执行文件&#xff0c;我们将它的文件名输入到称为shell的应用程序中&#xff1a; linux>./hello hello, world linux> shell是一个命令…

YOLOv10改进 | 添加注意力机制篇 | 添加FocusedLinearAttention助力yolov10实现有效涨点(含二次创新PSA机制)

一、本文介绍 本文给大家带来的改进机制是Focused Linear Attention&#xff08;聚焦线性注意力&#xff09;是一种用于视觉Transformer模型的注意力机制(但是其也可以用在我们的YOLO系列当中从而提高检测精度)&#xff0c;旨在提高效率和表现力。其解决了两个在传统线性注意力…

博美犬插画:成都亚恒丰创教育科技有限公司

​博美犬插画&#xff1a;萌动心灵的细腻笔触 在浩瀚的艺术海洋中&#xff0c;有一种艺术形式总能以它独有的温柔与细腻&#xff0c;触动人心最柔软的部分——那便是插画。而当插画遇上博美犬这一萌宠界的明星&#xff0c;便诞生了一幅幅令人爱不释手的作品&#xff0c;成都亚…

云计算【第一阶段(28)】DNS域名解析服务

一、DNS解析的定义与作用 1.1、DNS解析的定义 DNS解析&#xff08;Domain Name System Resolution&#xff09;是互联网服务中的一个核心环节&#xff0c;它负责将用户容易记住的域名转换成网络设备能够识别和使用的IP地址。一般来讲域名比 IP 地址更加的有含义、也更容易记住…