大数据-玩转数据-Sink到Kafka

news2024/10/7 12:18:40

一、添加Kafka Connector依赖

pom.xml 中添加

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${kafka.version}</version>
    </dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.75</version>
</dependency>

二、启动Kafka集群

启动zookeeper

./bin/zookeeper-server-start.sh config/zookeeper.properties

启动 kafka

./bin/kafka-server-start.sh config/server.properties

启动一个消费者

./bin/kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic topic_sensor

三、Flink sink 到 kafka

package com.lyh.flink06;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class SinkToKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataStreamSource<String> dataStreamSource = env.fromElements("a-----------------------------", "b*****************************");
        DataStreamSource<Integer> integerDataStreamSource = env.fromElements(1, 2);
        ConnectedStreams<String, Integer> datain = dataStreamSource.connect(integerDataStreamSource);
        datain.getFirstInput().addSink(new FlinkKafkaProducer<String>("hadoop100:9092","topic_sensor",new SimpleStringSchema()));
        env.execute();
    }
}

运行程序后看到消费者消费成功
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/851338.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

IMV4.0

背景内容&#xff1a; 经历了多个版本&#xff0c;基础内容在前面&#xff0c;可以使用之前的基础环境&#xff1a; v1&#xff1a; https://blog.csdn.net/wtt234/article/details/132139454 v2&#xff1a; https://blog.csdn.net/wtt234/article/details/132144907 v3&#…

Docker mysql+nacos单机部署

docker 网络创建 由于nacos需要访问mysql的数据&#xff0c;因此mysql容器和nacos容器之间需要进行通信。容器间通信有很多方式&#xff0c;在这里采用同一网络下的方式进行实现。因此需要创建网络。创建网络的命令如下&#xff1a; docker network create --driver bridge n…

致远OA任意管理员登录

真的&#xff0c;如果痛苦不能改变生存&#xff0c;那还不如平静地将自己毁灭。毁灭。一切都毁灭了&#xff0c;只有生命还在苟延残喘。这样的生命还有什么存在的价值&#xff1f; 漏洞复现 访问漏洞url 构造payload POST /seeyon/thirdpartyController.do HTTP/1.1methoda…

【数据结构与算法】平衡二叉树(AVL树)

平衡二叉树&#xff08;AVL树&#xff09; 给你一个数列{1,2,3,4,5,6}&#xff0c;要求创建二叉排序树&#xff08;BST&#xff09;&#xff0c;并分析问题所在。 BST 存在的问题分析&#xff1a; 左子树全部为空&#xff0c;从形式上看&#xff0c;更像一个单链表。插入速度…

pg实现月累计

获取每月累计数据&#xff1a; ​​​ SELECT a.month, SUM(b.total) AS total FROM ( SELECT month, SUM(sum) AS total FROM ( SELECT to_char(date("Joinin"),YYYY-MM) AS month , COUNT(*) AS sum FROM "APP_HR_Staff_Basic_Info" GROUP BY month ) …

Vue中使用uuid生成唯一ID(脚手架创建自带的)

1.utils 说明&#xff1a;一般封装工具函数。 // 单例模式 import { v4 as uuidv4 } from uuid; // 要生成一个随机的字符串&#xff0c;且每次执行不能发生变化 // 游客身份还要持久存储 function getUUID(){// 先从本地获取uuid&#xff0c;本地存储里面是否有let uuid_tok…

淘宝整店商品如何批量获取?获取淘宝店铺所有商品接口item_search_shop

在竞争日益激烈的电商行业&#xff0c;不少商家出于以下的考虑&#xff0c;想要实现一键批量获取淘宝店铺的所有商品。 竞争分析&#xff1a;通过获取某个店铺内的所有商品信息&#xff0c;可以对竞争对手的产品进行全面的了解和分析。可以了解到对手的产品种类、价格、销量等情…

git和github学习

一、什么是git和github? 二、学会使用github desktop应用程序 初始使用&#xff1a; 一开始我们是新账户&#xff0c;里面是没有仓库的&#xff0c;需要手动创建一个仓库。此时&#xff0c;这个仓库是创建在本地仓库里面&#xff0c;需要用到push命令&#xff08;就是那个pub…

【C++进阶】:异常

异常 一.异常的概念二.基本使用三.异常重新抛出四.异常规范五.异常安全六.异常的优缺点 一.异常的概念 c语言 传统的错误处理机制&#xff1a; 1. 终止程序&#xff0c;如assert&#xff0c;缺陷&#xff1a;用户难以接受。如发生内存错误&#xff0c;除0错误时就会终止程序。…

PCL 大规模点云显示

文章目录 一、什么是LOD?二、基本思想三、LOD算法常见的实现方式四、PCL库中LOD的实现官方代码结果展示参考文献:随着三维激光扫描技术的发展,我们目前能采集到 海量的点云数据,但是如何将千万甚至上亿级别的点云进行流畅显示,一直以来都是困扰业界的一大难题。尤其在早期…

volte端到端问题分析(一)

1、MME专载保持功能验证 **描述&#xff1a;**当无线环境较差时&#xff0c;有可能由于“Radio_Connection_with_UE_Lost” 原因造成的VoLTE通话掉话&#xff0c;如果UE发生RRC重建成功&#xff0c;手机将不会掉话。 对MME1202进行功能验证&#xff1a;开启后&#xff0c;MME专…

react-virtualized可视化区域渲染的使用

介绍 github地址&#xff1a;https://github.com/bvaughn/react-virtualized 实例网址&#xff1a;react-virtualized如果体积太大&#xff0c;可以参考用react-window。 使用 安装&#xff1a; yarn add react-virtualized。在项目入口文件index.js中导入样式文件&#xff…

Linux下QtCreator勾选Use root user后出现error while loading shared libraries的问题

文章目录 背景解决办法其他解决办法 背景 在linux下调试程序时&#xff0c;有时候需要取得root权限才能连接操作某些设备。 之前我是通过脚本方式 [在QtCreator中先执行自定义命令再执行程序]来进行的。也就是在脚本中取得权限&#xff0c;脚本内容类似这样&#xff1a; echo…

图书管理借阅系统【Java简易版】Java三大特征封装,继承,多态的综合运用

前言 前几篇文章讲到了Java的基本语法规则&#xff0c;今天我们就用前面学到的数组&#xff0c;类和对象&#xff0c;封装&#xff0c;继承&#xff0c;多态&#xff0c;抽象类&#xff0c;接口等做一个图书管理借阅系统。 文章目录 &#x1f947;1.分析图书管理系统要实现的功…

ClickHouse(十五):Clickhouse MergeTree系列表引擎 - AggregatingMergeTree

进入正文前&#xff0c;感谢宝子们订阅专题、点赞、评论、收藏&#xff01;关注IT贫道&#xff0c;获取高质量博客内容&#xff01; &#x1f3e1;个人主页&#xff1a;含各种IT体系技术&#xff0c;IT贫道_Apache Doris,大数据OLAP体系技术栈,Kerberos安全认证-CSDN博客 &…

VMnet0 桥接设置

VMnet0 一定要设置为你的硬件物理网卡&#xff0c;不能设置自动&#xff0c;不然后&#xff0c;网线一断&#xff0c;就再也连不上了。必须重启电脑才能连上&#xff0c;这个问题找了很久才找到。 下面有个hyper-V虚拟网卡&#xff0c;如果选自动的话&#xff0c;物理网卡一掉…

关于MySQL中的binlog

介绍 undo log 和 redo log是由Inno DB存储引擎生成的。 在MySQL服务器架构中&#xff0c;分为三层&#xff1a;连接层、服务层&#xff08;server层&#xff09;、执行层&#xff08;存储引擎层&#xff09; bin log 是 binary log的缩写&#xff0c;即二进制日志。 MySQL…

交叉编译详细版总结

1.交叉编译 交叉编译&#xff1a;在一个平台生成另外一个平台可执行的代码。 编译&#xff1a;在一个平台上生成在该平台上的可执行代码。 C51/32 交叉编译发送在Keil&#xff08;集成环境上面&#xff09;&#xff0c;windows上面编写51/32代码 ,不是在windows上面运行 在…

[QT编程系列-41]:Qt QML与Qt widget 深入比较,快速了解它们的区别和应用场合

目录 1. Qt QML与Qt widget之争 1.1 出现顺序 1.2 性能比较 1.3 应用应用领域 1.4 发展趋势 1.5 QT Creator兼容上述两种设计风格 2. 界面描述方式的差别 3. QML和Widgets之间的一些比较 4. 选择QML和Widgets之间的Qt技术时&#xff0c;可以考虑以下几个因素&#xff…

protobuf中zigzag编码原理

前面两篇博客 varint原理 - 正数的编码和解码_YZF_Kevin的博客-CSDN博客 varint原理 - 负数的编码和解码_YZF_Kevin的博客-CSDN博客 我们分析了varint对正数&#xff0c;负数的编码解码方式&#xff0c;也知道了如果用varint表示负数的坑&#xff0c;那就是负数直接占10个字…