大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka

news2024/9/20 20:37:36

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Sink 的基本概念等内容
  • Sink的相关信息 配置与使用
  • Sink案例写入Redis

在这里插入图片描述

JDBC Sink

在 Apache Flink 中,通过 JDBC Sink,可以将处理后的数据写入到 MySQL 数据库中。这对于将实时处理的数据持久化或与其他系统进行集成非常有用。

Flink JDBC Sink 简介

Flink 提供了 JdbcSink,它是基于 JDBC 协议的 Sink,可以将数据写入各种关系型数据库,包括 MySQL。在使用 JDBC Sink 时,需要提供数据库连接信息和 SQL 语句,通过这些信息,Flink 将数据流中的记录插入或更新到 MySQL 表中。

Flink 到 MySQL 的基本步骤

将数据流写入 MySQL 的步骤主要包括以下几点:

  • 依赖库配置:确保在项目中引入了 Flink 和 MySQL 相关的依赖库,通常需要配置 Maven 或 Gradle。
  • 定义数据源和数据流:创建并处理数据流。
  • 配置 JDBC Sink:提供数据库的连接信息和插入 SQL 语句。
  • 启动任务:将数据流写入 MySQL。

优化建议

在实际项目中,向 MySQL 插入大量数据时,应考虑以下优化策略:

  • 批量插入:通过 JdbcExecutionOptions 配置批量插入,可以大幅提升写入性能。
  • 连接池:对于高并发的写入操作,建议使用连接池来减少数据库连接开销。
  • 索引优化:为插入的表配置合适的索引,可以提高查询性能,但在大量写入时,索引可能会降低- 插入速度,因此需要权衡。
  • 数据分片:对于非常大规模的数据,可以考虑将数据分片并行写入不同的 MySQL 实例或分区表中。

案例:流数据下沉到MySQL

添加依赖

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.28</version>
</dependency>

编写代码

一个Person的类,对应MySQL中的一张表的字段。
模拟几条数据流,写入到 MySQL中。

package icu.wzk;


import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class SinkSqlTest {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Person> data = env.getJavaEnv().fromElements(
                new Person("wzk", 18, 1),
                new Person("icu", 20, 1),
                new Person("wzkicu", 13, 2)
        );
        data.addSink(new MySqlSinkFunction());

        env.execute();
    }

    public static class MySqlSinkFunction extends RichSinkFunction<Person> {

        private PreparedStatement preparedStatement = null;

        private Connection connection = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            String url = "jdbc:mysql://h122.wzk.icu:3306/flink-test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC";
            String username = "hive";
            String password = "hive@wzk.icu";
            connection = DriverManager.getConnection(url, username, password);
            String sql = "INSERT INTI PERSON(name, age, sex) VALUES(?, ?, ?)";
            preparedStatement = connection.prepareStatement(sql);
        }

        @Override
        public void invoke(Person value, Context context) throws Exception {
            preparedStatement.setString(1, value.getName());
            preparedStatement.setInt(2, value.getAge());
            preparedStatement.setInt(3, value.getSex());
            preparedStatement.executeUpdate();
        }

        @Override
        public void close() throws Exception {
            if (null != connection) {
                connection.close();
            }
            if (null != preparedStatement) {
                preparedStatement.close();
            }
        }
    }

    public static class Person {
        private String name;
        private Integer age;
        private Integer sex;

        public Person() {

        }

        public Person(String name, Integer age, Integer sex) {
            this.name = name;
            this.age = age;
            this.sex = sex;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Integer getAge() {
            return age;
        }

        public void setAge(Integer age) {
            this.age = age;
        }

        public Integer getSex() {
            return sex;
        }

        public void setSex(Integer sex) {
            this.sex = sex;
        }
    }
}

数据库配置

我们新建一张表出来,person表,里边有我们需要的字段。
在这里插入图片描述

运行代码

我们运行代码,等待运行结束。
在这里插入图片描述

查看结果

查看数据库中的数据,我们可以看到刚才模拟的数据已经成功写入了。
在这里插入图片描述

案例:写入到Kafka

编写代码

package icu.wzk;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class SinkKafkaTest {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> data = env.socketTextStream("localhost", 9999, '\n', 0);
        String brokerList = "h121.wzk.icu:9092";
        String topic = "flink_test";
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(brokerList, topic, new SimpleStringSchema());
        data.addSink(producer);
        env.execute("SinkKafkaTest");
    }

}

运行代码

启动一个 nc

nc -lk 9999

我们通过回车的方式,可以发送数据。
在这里插入图片描述
Java 程序中等待
在这里插入图片描述

查看结果

我们登录到服务器查看信息

./kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic flink_test --from-beginning

可以看到刚才的数据已经写入了:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2103751.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LLM常见问题(RAG部分)

1. 什么是 Graph RAG&#xff1f; Graph RAG 是由悦数图数据提出的概念&#xff0c;是一种基于知识图谱的检索增强技术&#xff0c;通过构建图模型的知识表达&#xff0c;将实体和关系之间的联系用图的形式进行展示&#xff0c;然后利用大语言模型 LLM进行检索增强。 通过图技…

鸿蒙Harmony--状态变量更改通知--@Watch装饰器详解

风雨飘摇中&#xff0c;我心起伏&#xff0c; 万丈雄心&#xff0c;却难以施展。 天高地远&#xff0c;路途迷茫&#xff0c; 理想如星&#xff0c;却遥不可及。 千百次跌倒&#xff0c;千百次爬起&#xff0c; 在命运的手掌中&#xff0c;挣扎前行。 谁知我心中的热血滚烫&…

什么是科学碳目标(SBTI认证)

科学碳目标&#xff08;SBTI认证&#xff09;&#xff0c;这一绿色发展的璀璨明珠&#xff0c;是企业迈向可持续未来的重要里程碑。它不仅是全球环境信息研究中心(CDP)、联合国全球契约组织(UNGC)、世界资源研究所(WRI)与世界自然基金会(WWF)共同铸就的智慧结晶&#xff0c;更是…

【C++模板初阶】

文章目录 一、泛型编程二、函数模板1.函数模板概念2.函数模板格式3.函数模板的原理4 函数模板的实例化1. 隐式实例化2. 显式实例化不同类型形参传参时的处理 5.模板参数的匹配原则 三、类模板1 类模板的定义格式2 类模板的实例化 一、泛型编程 首先大家先思考一个问题&#xff…

Django form.save 方法的详细分析

在 Django 中&#xff0c;form.save() 方法是用于将表单中的数据保存到数据库的核心方法。它的功能和实现可以分为几个重要的部分&#xff0c;下面就是我对 form.save() 方法的详细分析&#xff1a; 1、问题背景 在 Django 中&#xff0c;我们经常会使用 Form 来处理用户提交的…

yum无法使用解决Could not resolve host: mirrorlist.centos.org; Unknown error

报错如下 2、问题原因 CentOS 7 的官方仓库在 2024 年 6 月 30 日之后已经停止维护。CentOS 7 的官方支持已经结束&#xff0c;部分仓库已被移至归档库。导致yum 命令无法找到所需的元数据文件。 3、解决方法 进入/etc/yum.repos.d目录下找到 CentOS-Base.repo cd /etc/yum.…

【鸿蒙蓝牙连接】

鸿蒙蓝牙连接 ble.startBLEScan startBLEScan(filters: Array, options?: ScanOptions): void 发起BLE扫描流程。 需要权限&#xff1a;ohos.permission.ACCESS_BLUETOOTH import { BusinessError } from ohos.base; function onReceiveEvent(data: Array<ble.ScanResu…

【iOS】暑期学习总结

暑期学习总结 前言无限轮播图换头像简单的网络请求UISearchController 前言 暑假在学校完成了五个项目&#xff0c;总的来说学习到了很多新的知识&#xff0c;这里对暑假中学习的内容进行一个小的总结&#xff0c;整理一些个人认为比较重点的内容。 无限轮播图 无限轮播图的…

PDF转换成Excel哪家强?热门工具大比拼

现在咱们周围电脑和手机一大堆&#xff0c;PDF和Excel这两种文件格式特别流行&#xff0c;不管是学习、工作还是平时生活&#xff0c;都能派上用场。但有时候&#xff0c;我们需要把PDF文件里的表格信息转换成Excel格式&#xff0c;这样处理和分析起来更方便。市面上有很多工具…

LabVIEW如何用数字判断布尔数组位置

问&#xff1a;在LabVIEW中&#xff0c;我有一个布尔数组&#xff0c;每次只有一位为True。我想通过判断这个数组对应的数字值来确定哪个位置为True&#xff0c;该如何实现&#xff1f; 答&#xff1a; 你可以将布尔数组转换为一个数字&#xff0c;并通过判断该数字的值来确定…

Java学习中误用 == 比较对象怎么办?

在Java编程中&#xff0c;操作符的误用是一个常见的陷阱&#xff0c;尤其是在比较对象时。理解和正确使用对象的比较机制对编写健壮的、无错误的Java程序至关重要。 一、Java中的操作符 在Java中&#xff0c;操作符用于比较两个变量的值。对于基本数据类型&#xff08;如int、…

element table 表格 span-method 某一列进行相同合并 支持树结构表格

须知 这是 vue2 版本&#xff0c;不过理论上 vue3 也能参考使用 可以直接打开 codepen 查看代码 效果图 代码 打不开 codepen 或者codepen 失效&#xff0c;查看下面代码参考 <script src"//unpkg.com/vue2/dist/vue.js"></script> <script src&…

【加密社】比特币海量数据问题解决方案

加密社 比特币是无敌的存在&#xff0c;刚翻了一遍中本聪的论文&#xff08;其实以前看过一次&#xff0c;那时不明觉厉&#xff09;&#xff0c;发现咱们一直在考虑的问题&#xff0c;基本都能在其论文上找到解决方案了。。 现在出现的这些问题&#xff0c;完全是因为bitcoin…

GS-SLAM论文阅读笔记--GSFusion

介绍 GS-SLAM是最近比较新的方向&#xff0c;由于传统SLAM的研究变得很少&#xff0c;拥抱与新的技术结合的方法也许是个好主意。之前总结了大部分GS-SLAM的文章。但是这个方向在不断发展&#xff0c;而发展初期的很多论文值得参考。所以用博客记录一下比较新的论文阅读笔记。…

代码随想录算法训练营第七天|LeetCode 334.反转字符串、541反转字符串II、151反转字符串中的单词

一、LeetCode 334.反转字符串 题目链接&#xff1a;. - 力扣&#xff08;LeetCode&#xff09; 题目描述&#xff1a;编写一个函数&#xff0c;其作用是将输入的字符串反转过来。输入字符串以字符数组 s 的形式给出。不要给另外的数组分配额外的空间&#xff0c;你必须原地修…

万字长文浅谈三高系统建设方法论和实践

1 概述 整个软件的发展历程是一部软件复杂性对抗史&#xff0c;软件的复杂性分为技术复杂性和业务复杂性&#xff0c;业务复杂性主要是建模和抽象设计&#xff0c;技术复杂性主要是三高&#xff08;高性能&#xff0c;高并发&#xff0c;高可用&#xff09;的应对&#xff0c;…

高中生护眼台灯哪个牌子好?五款性价比高的护眼台灯测评结果

现在市面上形形色色的打着“护眼”口号的台灯太多了&#xff0c;因为眼睛对于我们来说很重要&#xff0c;我们看到美丽的事物都因为有他&#xff0c;所以大家一听到护眼就会选择购买。很多商家为了赚钱&#xff0c;随便贴个标签就说护眼&#xff0c;其实一点用都没有。高中生护…

九源基因第二大客户是供应商:国内分销商减少250家,押注减肥药

《港湾商业观察》廖紫雯 日前&#xff0c;杭州九源基因工程股份有限公司&#xff08;以下简称&#xff1a;九源基因&#xff09;二闯港交所&#xff0c;保荐机构为华泰国际、中信证券。此前&#xff0c;公司曾于1月向港交所提交过上市申请&#xff0c;但后续未有相关进展。 招…

Vue Element Plus el-select 使用 filterable 搜索下 @blur 事件绑定失效

失效原因 使用 filterable 导致 blur 事件绑定在输入框上&#xff0c;而不是整个选择器上 当点击选项时&#xff0c;输入框失去焦点触发 blur 事件 而点击其她位置收起下拉款的时候&#xff0c;并不会触发输入款的 blur 事件 解决方案 使用 element 提供的 visible-change …

【嵌入式学习笔记】---- 时钟源时钟树 RCC

微观引入时钟源 在芯片内部&#xff0c;布满了各种逻辑电路&#xff0c;通过数字信号进行通信 假设芯片内部存在如下这种逻辑电路&#xff1a; ①状态&#xff1a;当A、B均输入1时&#xff0c;与门输出1&#xff0c;异或门输出0&#xff0c;故此时寄存器的值为0 ②状态&…