flinksql kafka到mysql累计指标练习

news2024/10/6 2:21:33

flinksql 累计指标练习

数据流向:kafka ->kafka ->mysql

模拟写数据到kafka topic:wxt中

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) throws Exception {
        // 设置kafka服务器地址和端口号
        String kafkaServers = "localhost:9092";

        // 设置producer属性
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaServers);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 创建Kafka producer对象
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // 发送消息
        String topic = "wxt";
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("id", 9);
        jsonObject.put("name", "王大大");
        jsonObject.put("age", 11);

        // 将JSON对象转换成字符串
        String jsonString = jsonObject.toString();

        // 输出JSON字符串
        System.out.println("JSON String: " + jsonString);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, jsonString);
        producer.send(record);

        // 关闭producer
        producer.close();
    }
}

kafka topic :wxt1
在这里插入图片描述

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;


public class KafkaToMysqlJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // 定义Kafka连接属性
        String kafkaBootstrapServers = "localhost:9092";
        String kafkaTopic = "wxt";
        String groupId = "wxt1";

        // 注册Kafka表
        tEnv.executeSql("CREATE TABLE kafka_table (\n" +
                "  id INT,\n" +
                "  name STRING,\n" +
                "  age INT,\n" +
                "  proctime as PROCTIME()\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = '" + kafkaTopic + "',\n" +
                "  'properties.bootstrap.servers' = '" + kafkaBootstrapServers + "',\n" +
                "  'properties.group.id' = '" + groupId + "',\n" +
                "  'format' = 'json',\n" +
                "  'scan.startup.mode' = 'earliest-offset'\n" +
                ")");

        // 注册Kafka表
        // latest-offset
        //earliest-offset
        tEnv.executeSql("CREATE TABLE kafka_table2 (\n" +
                "  window_start STRING,\n" +
                "  window_end STRING,\n" +
                "  name STRING,\n" +
                "  age INT\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'wxt2',\n" +
                "  'properties.bootstrap.servers' = '" + kafkaBootstrapServers + "',\n" +
                "  'properties.group.id' = 'kafka_table2',\n" +
                "  'format' = 'json',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'value.format' = 'csv'\n" +
                ")");


        tEnv.executeSql("CREATE TABLE mysql_sink_table (\n" +
                "  window_start String,\n" +
                "   window_end String,\n" +
                "    name String,\n" +
                "    age INT\n" +
                ") WITH (\n" +
                "   'connector' = 'jdbc',\n" +
                "   'url' = 'jdbc:mysql://localhost:3306/tests?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8',\n" +
                "   'username' = 'root',\n" +
                "   'password' = '12345678',\n" +
                "   'table-name' = 'leiji_age'\n" +
                ")");

         tEnv.executeSql("insert into kafka_table2 select cast(window_start as string) as window_start,cast(window_end as string) as window_end,name,sum(age) as age\n" +
                 "from TABLE( CUMULATE( TABLE kafka_table, DESCRIPTOR(proctime), INTERVAL '20' SECOND, INTERVAL '1' DAY))\n" +
                 "group by  window_start,window_end,name");

        tEnv.executeSql("insert into mysql_sink_table select window_start,window_end,name,age from kafka_table2");

        env.execute("KafkaToMysqlJob");
    }
}

kafka topic :wxt2
在这里插入图片描述
mysql结果数据:
在这里插入图片描述
pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flinksql</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.binary.version>2.12</scala.binary.version>
        <flink.version>1.14.3</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.15</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.15</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-math3</artifactId>
            <version>3.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <version>8.0.31</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.11.0</version>
        </dependency>

        <dependency>
            <groupId>org.antlr</groupId>
            <artifactId>antlr-runtime</artifactId>
            <version>3.5.2</version>
        </dependency>


        <dependency>
            <groupId>org.apache.thrift</groupId>
            <artifactId>libfb303</artifactId>
            <version>0.9.3</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.15</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <configuration>
                    <!-- put your configurations here -->
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>2.10</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1138732.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据结构—线性表(下)

文章目录 6.线性表(下)(4).栈与队列的定义和ADT#1.ADT#2.栈的基本实现#3.队列的形式#4.队列的几种实现 (5).栈与队列的应用#1.栈的应用i.后缀表达式求值ii.中缀表达式转后缀表达式 #2.队列的应用 (6).线性表的其他存储方式#1.索引存储#2.哈希存储i.什么是哈希存储ii.碰撞了怎么…

windows 安装小乌龟

这是什么 这里简单描述一下在windows上如何安装GIT代码管理工具和使用小乌龟版本来调用GIT&#xff0c;并且配置一下git相关信息&#xff0c;可以使用小乌龟来操作代码。也有一些常规git使用方法。 需要的资源 Git-2.42.0-64-bit.exe&#xff08;这个是git代码管理工具&…

Redis集群搭建真的很简单

背景 很多小伙伴在学习redis的时候都只在windows上搭建过redis&#xff0c;然后工作之后也只是在应用redis。那么redis在Linux上如何搭建呢&#xff1f;集群如何搭建呢&#xff1f;本文不讲原理&#xff0c;只讲实际操作。真的很简单。 环境 Linux-Ubuntu 20.04.6 LTS x86_6…

Flink Hive Catalog操作案例

在此对Flink读写Hive表操作进行逐步记录&#xff0c;需要指出的是&#xff0c;其中操作Hive分区表和非分区表的DDL有所不同&#xff0c;以下分别记录。 基础环境 Hive-3.1.3 Flink-1.17.1 基本操作与准备 1、上传依赖jar包到flink/lib目录下 cp flink-sql-connector-hive-…

C++设计模式_14_Facade门面模式

本篇介绍的Facade门面模式属于“接口隔离”模式的一种&#xff0c;以下进行详细介绍。 文章目录 1. “接口隔离”模式1. 1 典型模式 2. 系统间耦合的复杂度3. 动机(Motivation)4. 模式定义5. Facade门面模式的代码实现6. 结构7. 要点总结8. 其他参考 1. “接口隔离”模式 在组…

聚焦养老主业,平安养老险构建一体化养老生态圈

10月23日&#xff0c;是我们国家的传统节日重阳节&#xff0c;这一天也在1989年被我国政府正式定为“中国老人节”&#xff0c; 按照我国将60岁及以上人群定义为老年人的标准来看&#xff0c;我国老年人的占比已经达到了20%。根据国家统计局2022年度统计公报数据显示&#xff0…

【软件安装】Windows系统中使用miniserve搭建一个文件服务器

这篇文章&#xff0c;主要介绍如何在Windows系统中使用miniserve搭建一个文件服务器。 目录 一、搭建文件服务器 1.1、下载miniserve 1.2、启动miniserve服务 1.3、指定根目录 1.4、开启访问日志 1.5、指定启动端口 1.6、设置用户认证 1.7、设置界面主题 &#xff08;…

【OJ比赛日历】快周末了,不来一场比赛吗? #10.29-11.04 #7场

CompHub[1] 实时聚合多平台的数据类(Kaggle、天池…)和OJ类(Leetcode、牛客…&#xff09;比赛。本账号会推送最新的比赛消息&#xff0c;欢迎关注&#xff01; 以下信息仅供参考&#xff0c;以比赛官网为准 目录 2023-10-29&#xff08;周日&#xff09; #3场比赛2023-10-30…

竞赛选题 深度学习卷积神经网络垃圾分类系统 - 深度学习 神经网络 图像识别 垃圾分类 算法 小程序

文章目录 0 简介1 背景意义2 数据集3 数据探索4 数据增广(数据集补充)5 垃圾图像分类5.1 迁移学习5.1.1 什么是迁移学习&#xff1f;5.1.2 为什么要迁移学习&#xff1f; 5.2 模型选择5.3 训练环境5.3.1 硬件配置5.3.2 软件配置 5.4 训练过程5.5 模型分类效果(PC端) 6 构建垃圾…

布隆过滤器(Bloom Filter)初学习

目录 1、布隆过滤器是什么 2、布隆过滤器的优缺点 3、使用场景 4、⭐基于Redis的布隆过滤器插件安装 4.1 下载布隆过滤器 4.2 创建文件夹并上传文件 4.3 安装gcc 4.4 解压RedisBloom压缩包 4.5 在解压好的文件夹下输入make 4.6 将编译的好的插件拷贝到docker redis容…

Spring Cloud之API网关(Gateway)

目录 API网关 好处 解决方案 Gateway 简介 特征 核心概念 Route(路由) Predicate(断言) Filter(过滤器) 工作流程 Route(路由) 路由配置方式 1.yml配置文件路由 2.bean进行配置 3.动态路由 动态路由 Predicate(断言) 特点 常见断言 示例 Filter(过滤器) …

智能汽车安全:保护车辆远程控制和数据隐私

第一章&#xff1a;引言 智能汽车技术的快速发展为我们带来了许多便利&#xff0c;但也引发了一系列安全和隐私问题。本文将探讨智能汽车安全的重要性&#xff0c;以及如何保护车辆远程控制和数据隐私。 第二章&#xff1a;智能汽车远程控制 智能汽车的远程控制是一项令人兴…

解释一下React中的钩子(hooks),例如useState和useEffect。

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 欢迎来到前端入门之旅&#xff01;感兴趣的可以订阅本专栏哦&#xff01;这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…

第二章 基于模型的系统工程 P2|系统建模语言SysML实用指南学习

仅供个人学习记录 建模原则 模型与MBSE方法定义 模型描述的是domain of interest MBSE方法是指以系统模型作为主要制品来实现全部或者部分的系统工程过程 系统建模目的 模型的用途在系统的整个开发全生命周期中是不断完善的&#xff0c;是通过持续完整的需求来描述的。 …

Vue 父子组件传参、插槽

setup 函数中有两个主要的参数&#xff1a;props、context 。 props 用于接收父组件传递过来的数据&#xff0c;父传子。 context 指的是 setup 的上下文&#xff0c;它有三个属性&#xff1a;attrs、slots、emit 。 attrs 用于&#xff1a;当父组件传递过来的数据&#xff…

设计模式(18)桥接模式

一、介绍&#xff1a; 1、定义&#xff1a;桥接(Bridge)模式属于结构型设计模式。通过提供抽象化和实现化之间的桥接结构&#xff0c;来实现二者的解耦。把抽象(abstraction)与行为实现(implementation)分离开来&#xff0c;从而可以保持各部分的独立性以及应对它们的功能扩展…

游戏研发的解决方案有哪些?

游戏研发的解决方案可以根据不同的需求和情境而有所不同&#xff0c;以下是一些常见的游戏研发解决方案&#xff1a; 游戏引擎&#xff1a; 游戏引擎是游戏研发的基础&#xff0c;它提供了开发游戏所需的核心功能&#xff0c;如图形渲染、物理引擎、音效管理、动画等。一些流行…

css四种导入方式

1 行内样式 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title> </head> <body> <h1 style"color: blue">我是标题</h1> </body> </htm…

简单电子琴设计verilog蜂鸣器8音阶,视频/代码

名称&#xff1a;简单电子琴设计verilog 软件&#xff1a;QuartusII 语言&#xff1a;Verilog 代码功能&#xff1a; 简易电子琴电路 1、输入为8个按键&#xff0c;每个按键对应一个音阶 2、输出为speaker蜂鸣器&#xff0c;当其中一直按键按下时&#xff0c;输出特定频率…

k8s 部署nginx前端

1.构建docker镜像&#xff0c;k8s拉取镜像运行 ​​​​​​​docker自己安装 [rootmaster1 ~]# docker pull nginx:1.24.0 [rootmaster1 ~]# mkdir k8s-nginx [rootmaster1 ~]# cd k8s-nginx [rootmaster1 k8s-nginx]# vim nginx.conf server_tokens off;server {listen …