Flink写入数据到Doris

news2025/1/18 19:01:40

文章目录

      • 1.Doris建表
      • 2.Doris依赖
      • 3.Bean实体类
      • 4.Doris业务写入逻辑
      • 5.测试写入类
      • 6.发送数据

1.Doris建表

Doris中建表

CREATE TABLE IF NOT EXISTS demo.user
(
 `id`   INT NOT NULL,
 `name` VARCHAR(255),
 `age`  INT
) DISTRIBUTED BY HASH(`id`)
PROPERTIES (
 "replication_num" = "1"
);

2.Doris依赖

Flink开发相关依赖

    <properties>
        <flink.version>1.12.1</flink.version>
        <scala.version>2.12.13</scala.version>
        <mysql.version>8.0.25</mysqlc.version>
        <lombok.version>1.18.12</lombok.version>
    </properties>

    <dependencies>
        <!-- 写入数据到doris -->
         <dependency>
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
             <version>${mysql.version</version>
         </dependency>
        <!-- flink核心API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
        </dependency>
    </dependencies>

3.Bean实体类

User.java

package com.daniel.bean;

import lombok.Builder;
import lombok.Data;

/**
 * @Author Daniel
 * @Date: 2023/7/3 15:35
 * @Description
 **/

@Data
@Builder
public class User {
    public int id;
    public String name;
    public int age;
}

4.Doris业务写入逻辑

DorisSinkFunction.java

package com.daniel.util;

import com.daniel.bean.User;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
* @Author Daniel
* @Date: 2023/7/3 15:36
* @Description
**/


public class DorisSinkFunction extends RichSinkFunction<User> {
 Connection conn = null;
 String sql;

 public DorisSinkFunction(String sql) {
     this.sql = sql;
 }

 @Override
 public void open(Configuration parameters) throws Exception {
     super.open(parameters);
     conn = getConn("localhost", 9030, "demo");
 }

 @Override
 public void close() throws Exception {
     super.close();
     if (conn != null) {
         conn.close();
     }
 }

 // 定义具体的操作
 @Override
 public void invoke(User user, Context context) throws Exception {
     // 批量插入
     PreparedStatement preparedStatement = conn.prepareStatement(sql);
     preparedStatement.setLong(1, user.id);
     preparedStatement.setString(2, user.name);
     preparedStatement.setLong(3, user.age);
     preparedStatement.addBatch();

     long startTime = System.currentTimeMillis();
     int[] batchResult = preparedStatement.executeBatch();
     long endTime = System.currentTimeMillis();
     System.out.println("批量插入用时:" + (endTime - startTime) + "ms -- 插入数据行数:" + batchResult.length);
 }

 public Connection getConn(String host, int port, String database) throws SQLException, ClassNotFoundException {
     Class.forName("com.mysql.cj.jdbc.Driver");
     String address = "jdbc:mysql://" + host + ":" + port + "/" + database;
     conn = DriverManager.getConnection(address, "root", "");
     return conn;
 }
}
  • open():在SinkFunction实例化后调用,用于初始化连接或资源。这在处理每个并行任务的子任务之前只被调用一次。

  • invoke():定义了在每个元素到达Sink操作时所执行的逻辑。用户需要实现这个方法来定义如何将数据写入外部存储系统或执行其他操作。

  • close():在SinkFunction关闭之前调用,用于释放资源、关闭连接等操作。

5.测试写入类

DorisWriteTest.java

package com.daniel;

import com.daniel.bean.User;
import com.daniel.util.DorisSinkFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


/**
* @Author Daniel
* @Date: 2023/7/3 15:37
* @Description
**/

public class DorisWriteTest {
 public static void main(String[] args) throws Exception {
     StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
     // Source
     DataStream<String> ds = env.socketTextStream("localhost", 9999);

     // Transform
     SingleOutputStreamOperator<User> dataStream = ds.map((MapFunction<String, User>) data -> {
         String[] split = data.split(",");
         return User.builder()
                 .id(Integer.parseInt(split[0]))
                 .name(split[1])
                 .age(Integer.parseInt(split[2]))
                 .build();
     });

     // Sink
     String sql = "INSERT INTO demo.user (id, name, age) VALUES (?,?,?)";
     DorisSinkFunction jdbcSink = new DorisSinkFunction(sql);
     dataStream.addSink(jdbcSink);
     env.execute("flink-doris-write");
 }
}


6.发送数据

使用nc或者任意工具向指定端口发送数据
例如

nc -L -p 9999

发送数据

1,Daniel,25
2,David,38
3,James,16
4,Robert,27

然后启动DorisWriteTest.java程序

在这里插入图片描述
查询数据

 select *
 from demo.user;

由于这里是并行插入,所以没有顺序可言

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/779852.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

前端工程化第三章:webpack5基础(下)

文章目录 1. TypeScript支持&#xff08;ts-loader&#xff09;1.1. ts-loader1.1.1. webpack.config.js1.1.2. tsconfig.json1.1.3. src/index.ts 1.2. 使用babel-loader将ts转换为js1.2.1. webpack.config.js1.2.2. src/index.ts 2. 代码规范检查&#xff08;Eslint&#xff…

「深度学习之优化算法」(十八)头脑风暴算法

1. 头脑风暴算法简介 (以下描述,均不是学术用语,仅供大家快乐的阅读)   可能大家对“头脑风暴”这个词不怎么熟,毕竟是外来词汇,其大概含义就是分组讨论,畅所欲言。   头脑风暴算法(Brain Storm Optimization)是根据人们进行“头脑风暴”讨论困难问题的解决方案的过…

RabbitMQ消息可靠性问题及解决

说明&#xff1a;在RabbitMQ消息传递过程中&#xff0c;有以下问题&#xff1a; 消息没发到交换机 消息没发到队列 MQ宕机&#xff0c;消息在队列中丢失 消息者接收到消息后&#xff0c;未能正常消费&#xff08;程序报错&#xff09;&#xff0c;此时消息已在队列中移除 …

Android WiFi框架概览

概览 Android 提供默认 Android 框架实现&#xff0c;其中包括对各种 WLAN 协议和模式的支持&#xff0c;这些协议和模式包括&#xff1a; WLAN 基础架构 (STA)网络共享模式或仅限本地模式下的 WLAN 热点 (Soft AP)WLAN 直连&#xff08;点对点&#xff09;WLAN 感知 (NAN)WL…

3.19 Bootstrap 面板(Panels)

文章目录 Bootstrap 面板&#xff08;Panels&#xff09;面板标题面板脚注带语境色彩的面板带表格的面板带列表组的面板 Bootstrap 面板&#xff08;Panels&#xff09; 本章将讲解 Bootstrap 面板&#xff08;Panels&#xff09;。面板组件用于把 DOM 组件插入到一个盒子中。创…

Python采集某网站小视频内容, m3u8视频内容下载

目录标题 前言环境使用:模块使用:代码实现步骤代码展示尾语 前言 嗨喽~大家好呀&#xff0c;这里是魔王呐 ❤ ~! 环境使用: python 3.8 运行代码 pycharm 2021.2 辅助敲代码 模块使用: import requests >>> pip install requests 内置模块 你安装好python环境就…

【机器学习】支持向量机SVM入门

优化目标 相较于之前学习的线性回归和神经网络&#xff0c;支持向量机&#xff08;Supprot Vector Machine&#xff0c;简称SVM&#xff09;在拟合复杂的非线性方程的时候拥有更出色的能力&#xff0c;该算法也是十分经典的算法之一。接下来我们需要学习这种算法 首先我们回顾…

ffplay播放器剖析(4)----音频输出和音频重采样流程

文章目录 1. 音频输出模块1.1 音频输出流程1.2 音频输出模型图 2. 打开SDL音频设备audio_open详解sdl_audio_callbackaudio_decode_frame 3. 音频重采样样本补偿 1. 音频输出模块 1.1 音频输出流程 打开SDL音频设备,设置参数启动SDL音频设备播放SDL音频回调函数读取数据,也就…

虚拟仿真实验室未授权获取账号密码

你应该在以后短暂的岁月里&#xff0c;真正活的不负众爱 漏洞描述 虚拟仿真实验室存在未授权访问漏洞&#xff0c;通过访问构造的Url可以获取敏感信息 漏洞复现 访问漏洞url&#xff1a; /admin/student/studentlist.html?page1成功获取所有用户的账号密码信息 文笔生疏…

遥感目标检测(2)--SCRDet

目录 一、概述 二、三个挑战 三、网络结构 1、SF-Net 2、MDA-Net&#xff08;Multi-Dimensional Attention Network&#xff09; 3、Rotation Branch 四、损失函数 五、实验 一、概述 SCRDet&#xff08;Towards More Robust Detection for Small,Cluttered and Rotate…

实验数据origin作图使用经验总结

使用Origin绘制实验数据图表时&#xff0c;可以遵循以下经验总结&#xff1a; 选择合适的图表类型&#xff1a; 根据实验数据的性质和目的&#xff0c;选择合适的图表类型&#xff0c;例如散点图、折线图、柱状图、饼图等。确保图表类型能够清晰地展示数据趋势和关系。 规范坐…

jenkins中运行python脚本时,报错:collecting ... collected 0 items

【问题描述】&#xff1a;jenkins在windows环境下运行python脚本时总是报collecting … collected 0 items 【问题定位】&#xff1a;jenkins工作目录和python文件目录不一样导致 【解决办法】&#xff1a;需要先把路径切换到项目目录下&#xff0c;再进行运行xxx.py文件&…

zabbix钉钉报警

登录钉钉客户端,创建一个群,把需要收到报警信息的人员都拉到这个群内. 然后点击群右上角 的"群机器人"->"添加机器人"->"自定义", 记录该机器人的webhook值。 添加机器人 在钉钉群中&#xff0c;找到只能群助手 添加机器人 选择自定义机…

springboot 根据不同环境 ,配置不同日志输出路径

logback-spring.xml<?xml version"1.0" encoding"UTF-8"?> <!-- scan&#xff1a;当此属性设置为true时&#xff0c;配置文件如果发生改变&#xff0c;将会被重新加载&#xff0c;默认值为true。 scanPeriod&#xff1a;设置监测配置文件是否有…

《零基础入门学习Python》第057讲:论一只爬虫的自我修养5:正则表达式

如果你在课后有勤加练习&#xff0c;那么你对于字符串的查找应该是已经深恶痛绝了&#xff0c;你发现下载一个网页是很容易的&#xff0c;但是要在网页中查找到你需要的内容&#xff0c;那就是困难的&#xff0c;你发现字符串查找并没有你想象的那么简单&#xff0c;并不是说直…

macOS mysql 8.0 忘记密码

╰─➤ mysql -V mysql Ver 8.0.33 for macos13.3 on arm64 (Homebrew)mysql.server status mysql.server stopskip-grant-tables 启动mysql ─➤ /opt…

Elemui表单合并

原代码形式 <template><el-table:data"tableData"borderstyle"width: 100%"><el-table-columnprop"date"label"日期"width"180"></el-table-column><el-table-columnprop"name"label…

Qt6 Qt Quick UI原型学习QML第五篇

文章目录 效果QML语法父文件 MyQML.qmlQML语法子文件 TLineEditV1.qmlQML语法子文件 TTextEdit.qml 效果 QML语法父文件 MyQML.qml import QtQuick 2.12 import QtQuick.Window 2.12 import QtQuick.Controls 2.12Window {id: windowvisible: truewidth: 600height: 600title:…

【043】解密C++ STL:深入理解并使用 list 容器

解密C STL&#xff1a;深入理解并使用list容器 引言一、list 容器概述二、list容器常用的API2.1、构造函数2.2、数据元素插入和删除操作2.3、大小操作2.4、赋值操作2.5、数据的存取2.6、list容器的反转和排序 三、使用示例总结 引言 &#x1f4a1; 作者简介&#xff1a;一个热爱…

详细解析python视频选择--【思维导图知识范围】

C ,JAVA JAVAWEB ,微信小程序等 都有视频选择的分析。 语言视频选择收录专辑链接C张雪峰推荐选择了计算机专业之后-在大学期间卷起来-【大学生活篇】JAVA黑马B站视频JAVA部分的知识范围、学习步骤详解JAVAWEB黑马B站视频JAVAWEB部分的知识范围、学习步骤详解SpringBootSpringB…