Java版Flink使用指南——将消息写入到RabbitMQ的队列中

news2024/9/9 1:14:18

大纲

  • 新建工程
    • 新增依赖
  • 编码
    • 自动产生数据
    • 写入RabbitMQ
  • 测试
  • 工程代码

在 《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们介绍了如何使用Java在Flink中读取RabbitMQ中的数据,并将其写入日志中。本文将通过代码产生一些数据,然后将它们写入到另外一个RabbitMQ队列中。

新建工程

我们在IntelliJ中新建一个工程SinkToRabbitMQ。
Archetype填入:org.apache.flink:flink-quickstart-java
版本填入与Flink的版本:1.19.1
在这里插入图片描述

新增依赖

在pom.xml中新增RabbitMQ连接器

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-rabbitmq</artifactId>
			<version>3.0.1-1.17</version>
		</dependency>

编码

自动产生数据

这段代码将产生两个字符串数据,后续这些数据会被写入到RabbitMQ的队列中。

List<String> data = new ArrayList<>();
data.add("Hello, World!");
data.add("Hello, Flink!");
DataStream<String> stream = env.fromCollection(data);

写入RabbitMQ

不同于《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》中创建RMQSource用来接收RabbitMQ队列中数据,这次我们创建RMQSink用来发布数据。

String sinkQueueName = "data.to.rbtmq"; // name of the queue to send data to
String host = "172.21.112.140"; // IP of the rabbitmq server
int port = 5672;
String username = "admin";
String password = "fangliang";
String virtualHost = "/";
int parallelism = 1;

RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder()
		.setHost(host)
		.setPort(port)
		.setUserName(username)
		.setPassword(password)
		.setVirtualHost(virtualHost)
		.build();

RMQSink<String> stringRMQSink = new RMQSink<>(rmqConnectionConfig, sinkQueueName, new SimpleStringSchema());
stream.addSink(stringRMQSink).name(username + "'s sink to " + sinkQueueName).setParallelism(parallelism);	

测试

打包、提交并运行任务
在这里插入图片描述
然后在RabbitMQ的后台可以看到收到两条消息
在这里插入图片描述
其内容也是我们之前在代码中生成的内容
在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1917245.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OTA与OTA升级

目录 一、OTA简介 二、OTA升级 三、操作方式 一、OTA简介 在嵌入式领域当中&#xff0c;OTA&#xff08;Over-The-Air&#xff09;指的是通过无线通信技术对嵌入式设备的软件进行远程更新和管理。这种技术广泛应用于物联网设备、智能家电、汽车电子、智能手机等领域。通过OTA…

基于 PyTorch 的迁移学习介绍 (图像分类实战演示)

1. 介绍 迁移学习&#xff08;Transfer Learning&#xff09;允许我们采用另一个模型从另一个问题中学到的模式&#xff08;也称为权重&#xff09;并将它们用于我们自己的问题。 例如&#xff0c;我们可以采用计算机视觉模型从 ImageNet&#xff08;包含数百万张不同对象的图…

51单片机嵌入式开发:8、 STC89C52RC 操作LCD1602原理

STC89C52RC 操作LCD1602原理 1 LCD1602概述1.1 LCD1602介绍1.2 LCD1602引脚说明1.3 LCD1602指令介绍 2 LCD1602外围电路2.1 LCD1602接线方法2.2 LCD1602电路原理 3 LCD1602软件操作3.1 LCD1602显示3.2 LCD1602 protues仿真 4 总结 1 LCD1602概述 1.1 LCD1602介绍 LCD1602是一种…

java如何判断某个数在区间是否存在?

&#x1f3c6;本文收录于《CSDN问答解惑-专业版》专栏&#xff0c;主要记录项目实战过程中的Bug之前因后果及提供真实有效的解决方案&#xff0c;希望能够助你一臂之力&#xff0c;帮你早日登顶实现财富自由&#x1f680;&#xff1b;同时&#xff0c;欢迎大家关注&&收…

数字园区新视界:智慧管理的全景体验

通过图扑可视化技术&#xff0c;智慧园区管理实现实时监控与数据分析&#xff0c;优化各类资源的配置与使用&#xff0c;提高整体运营效率与智能决策能力。

Java中关于File类的详解

File类 File类是文件和目录路径名称的抽象表示&#xff0c;主要用于文件和目录的创建、查找和删除等操作。在创建File对象的时候&#xff0c;需要传递一个路径&#xff0c;这个路径定位到哪个文件或者文件夹上&#xff0c;File就代表哪个对象。 File file new File("D:…

记一次Ueditor上传Bypss

前言 前一段时间和小伙伴在某内网进行渗透测试&#xff0c;目标不给加白&#xff0c;只能进行硬刚了&#xff0c;队友fscan一把梭发现某资产疑似存在Ueditor组件&#xff0c;但初步测试是存在waf和杀软的&#xff0c;无法进行getshell&#xff0c;经过一番折腾最终getshell&am…

揭秘”大模型加速器”如何助力大模型应用

文章目录 一、大模型发展面临的问题二、“大模型加速器”助力突破困难2.1 现场效果展示2.1.1 大模型加速器——文档解析引擎2.2.2 图表数据提取 三、TextIn智能文档处理平台3.1 在线免费体验3.1.1 数学公式提取3.1.2 表格数据提取 四、acge文本向量化模型4.1 介绍4.2 技术创新4…

Python基础语法:运算符详解(算术运算符、比较运算符、逻辑运算符、赋值运算符)②

文章目录 Python中的运算符详解一、算术运算符二、比较运算符三、逻辑运算符四、赋值运算符五、综合示例结论 Python中的运算符详解 在Python编程中&#xff0c;运算符用于执行各种操作&#xff0c;例如算术计算、比较、逻辑判断和赋值。了解并掌握这些运算符的使用方法是编写…

CTF php RCE (四)

0x08 取反以及异或、或 这两个东西呢相当的好玩&#xff0c;也能够达到一下小极限的操作 <?php error_reporting(0); if(isset($_GET[code])){$code$_GET[code];if(strlen($code)>40){die("This is too Long.");}if(preg_match("/[A-Za-z0-9]/",$…

Firealpaca 解锁版下载及安装教程 (火焰羊驼绘画软件)

前言 FireAlpaca是一款简单易用的电脑绘画软件&#xff0c;采用了类似于Photoshop的图层绘画方式。对于喜欢手绘和创作漫画的朋友来说&#xff0c;FireAlpaca的多图层功能使得绘画过程更加便捷和简单。作为一个小型图像编辑软件&#xff0c;它能够轻松处理多个图层或手绘图&am…

拥抱UniHttp,规范Http接口对接之旅

前言 如果你项目里还在用传统的编程式Http客户端比如HttpClient、Okhttp去直接对接第三方Http接口&#xff0c; 那么你项目一定充斥着大量的对接逻辑和代码&#xff0c; 并且针对不同的对接渠道方需要每次封装一次调用的简化&#xff0c; 一旦封装不好系统将会变得难以维护&am…

策略模式(大话设计模式)C/C++版本

策略模式 商场收银软件 根据客户所购买商品的单价和数量来收费 需求分析&#xff1a; 1. 输入单价数量 > 界面逻辑 2. 计算&#xff08;可能打折或者促销&#xff09; > 业务逻辑 3. 输出结果 > 界面逻辑感觉和计算器的逻辑流程差不多&#xff0c;可以用简单工厂模式…

浪潮天启防火墙TQ2000远程配置方法SSL-xxx、L2xx 配置方法

前言 本次设置只针对配置VXX&#xff0c;其他防火墙配置不涉及。建议把防火墙内外网都调通后再进行Vxx配置。 其他配置可参考&#xff1a;浪潮天启防火墙配置手册 配置SSLVxx 在外网端口开启SSLVxx信息 开启SSLVxx功能 1、勾选 “启用SSL-Vxx” 2、设置登录端口号&#xff0…

Unity3D 太空大战射击游戏

一、前言 本案例是初级案例&#xff0c;意在帮助想使用unity的初级开发者能较快的入门&#xff0c;体验unity开发的方便性和简易性能。 本次我们将使用团结引擎进行开发&#xff0c;帮助想体验团结引擎的入门开发者进行较快的环境熟悉。 本游戏案例以太空作战为背景&#xff0c…

如何分析软件测试中发现的Bug!

假如你是一名软件测试工程师&#xff0c;每天面对的就是那些“刁钻”的Bug&#xff0c;它们像是隐藏在黑暗中的敌人&#xff0c;时不时跳出来给你一个“惊喜”。那么&#xff0c;如何才能有效地分析和处理这些Bug&#xff0c;让你的测试工作变得高效且有趣呢&#xff1f;今天我…

Threadlocal使用获取最后更新人信息

Threadlocal 的作用范围是一个线程&#xff0c;tomcat启动默认开启一个线程 首先点击登录&#xff0c;登录方法会返回token 拿到token后放在请求头中发送商品的插入请求&#xff0c;在插入是设置拿到token中的nickName&#xff08;花名&#xff09;放入&#xff08;lastUpdate…

C 语言中如何实现字符串的拼接?

&#x1f345;关注博主&#x1f397;️ 带你畅游技术世界&#xff0c;不错过每一次成长机会&#xff01; &#x1f4d9;C 语言百万年薪修炼课程 【https://dwz.mosong.cc/cyyjc】通俗易懂&#xff0c;深入浅出&#xff0c;匠心打磨&#xff0c;死磕细节&#xff0c;6年迭代&…

轻松搭建RAG:澳鹏RAG开发工具

我们很高兴地宣布推出RAG开发工具&#xff0c;这是澳鹏大模型智能开发平台的一项新功能。此功能可帮助团队轻松创建高质量的检索增强生成 (RAG) 模型。 什么是 RAG&#xff1f; 检索增强生成 (RAG) 通过利用大量外部数据源&#xff08;例如企业的知识库&#xff09;显著增强了…

git查看版本,查看安装路径、更新版本

git version 查看版本 git update-git-for-windows 更新版本 git version 查看版本