Flink DataStream之从Kafka读数据

news2024/10/6 18:34:37
  • 搭建Kafka

参考:centos7下kafka2.12-2.1.0的安装及使用_kafka2.12-2.1.0 steam_QYHuiiQ的博客-CSDN博客

  •  启动zookeeper
[root@localhost kafka_2.12-2.8.1]# pwd
/usr/local/wyh/kafka/kafka_2.12-2.8.1
[root@localhost kafka_2.12-2.8.1]# ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
  • 启动kafka
[root@localhost kafka_2.12-2.8.1]# ./bin/kafka-server-start.sh config/server.properties &
  • 查看进程

  •  创建topic
[root@localhost kafka_2.12-2.8.1]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-flink-topic
  • 查看topic列表
[root@localhost kafka_2.12-2.8.1]# ./bin/kafka-topics.sh --list --zookeeper localhost:2181                                             test-flink-topic
  • 导入pom依赖
<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
</dependency>
  • 新建类
package test01;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TestReadKafka {

    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        //从kafka读
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()//这里的泛型需要指定从kafka读数据的数据类型
                .setBootstrapServers("192.168.126.128:9092")//设置kafka server
                .setGroupId("test-consumer-group")//设置consumer groupid
                .setTopics("test-flink-topic")//设置要读取数据的topic
                .setValueOnlyDeserializer(new SimpleStringSchema())//从Kafka读数据时需要进行反序列化,由于kafka的数据一般是存在value中的,不是key中,所以这里我们使用的序列化器是只对Value进行反序列化。这里的参数是用的String类型的反序列化器,因为在前面build时我们设置了要读取的数据类型是String类型。
                .setStartingOffsets(OffsetsInitializer.latest())//设置flink读取kafka数据的读取策略,这里设置的是从最新数据消费
                .build();

        streamExecutionEnvironment.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource")
                        .print();
        streamExecutionEnvironment.execute();
    }
}
  • 启动程序
  • 在终端向kafka生产数据,同时观察程序控制台flink的读取情况
[root@localhost kafka_2.12-2.8.1]# ./bin/kafka-console-producer.sh --broker-list 192.168.126.128:9092 --topic test-flink-topic

 如图说明flink从kafka成功读取数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/718093.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Windows 环境下Docker 安装伪分布式 Hadoop

Windows 环境下Docker 安装伪分布式 Hadoop 1、环境2、拉取镜像3、启动容器4、预备操作4.1安装vim4.1.1 更新软件包信息4.1.2 安装vim 4.2 换源4.2.1 备份镜像源设置文件4.2.2 编辑镜像源设置文件4.2.3 重新更新一下软件包信息 4.3 同步上海时间4.3.1 安装 tzdata4.3.2 设置 tz…

[AJAX]原生AJAX——服务端如何发出JSON格式响应,客户端如何处理接收JSON格式响应

服务端代码&#xff1a; <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</title> &l…

IPV6使用越来越广,您会配置吗?

前面针对IPv6写过一篇文章&#xff0c;但是好多网友反映没有读懂&#xff0c;今天再给大家把内容浓缩一下&#xff0c;教给大家如何配置。 IPV6的推出主要是为了解决地址空间的不足&#xff0c;从而进一步的促进互联网的发展。IPV6地址空间大到惊人&#xff0c;有人比喻地球上…

Rust in Action笔记 第十章 进程、线程和容器

Rust的闭包也就是类似lambda表达式&#xff0c;大致的格式是|a, b| {...} &#xff0c;竖线里面的是参数&#xff0c;花括号里面的是函数逻辑&#xff1b;通过thread::spawn(|| {})产生的线程&#xff0c;括号内的参数实际上就是一个闭包&#xff0c;因为创建新的线程不需要参数…

【论文阅读】StyleganV1 算法理解

文章目录 为什么提出&#xff1f;具体是怎么做的&#xff1f;1.解耦的思想&#xff08;对应文章第四章4.Disentanglement studies&#xff09;1.1 感知路径长度&#xff08;对应4.1Perceptual path length&#xff09;1.2 线性可分离性&#xff08;对应4.2Linear separability&…

Chrome DevTools、Vue DevTools、vs和DevTools调试

目录 Elements DOM节点&#xff08;增删改&#xff09;调试 Styles DOM结构 增删属性 模拟元素的伪状态&#xff0c;方便调试 Computed Layout Event Listeners Network Application 资源列表&#xff08;可改&#xff09;本地存储Cookie、WebStorage&#xff08;loca…

人工智能学习07--pytorch22--目标检测:YOLO V3 SPP

视频链接&#xff1a; https://www.bilibili.com/video/BV1t54y1C7ra/?vd_sourceb425cf6a88c74ab02b3939ca66be1c0d yolov3 spp spp&#xff1a;空间金字塔池化 trick&#xff1a;实现的小技巧&#xff0c;方法。&#xff08; up&#xff1a;Bag of Freebies里有很多trick&…

【C++学习笔记】C++中的异常概念异常的使用注意事项异常的优缺点

异常 1 C语言传统的处理异常的方式2 C异常的概念3 异常的使用以及注意事项3.1 异常的简单使用3.2 使用异常的注意事项3.3 异常的重新抛出3.4 异常规范3.5 异常安全 4 C标准库的异常体系5 异常的优缺点6 总结 1 C语言传统的处理异常的方式 C语言传统的错误处理机制&#xff1a;…

https安全传输原理:

内容来自思学堂&#xff1a; 信息裸奔——>对称加密——>非对称加密——>非对称和对称加密——>权威第三方机构CA数字签名

C++图形开发(5):逐渐变大(小)的小球

文章目录 1.逐渐变大的小球2.逐渐变小的小球 今天所讲的逐渐变大&#xff08;小&#xff09;的小球实际上就是基于上次的缓慢下落的小球的基础上的&#xff08;下落的小球详见&#xff1a;C图形开发&#xff08;4&#xff09;&#xff1a;下落的小球&#xff09; 1.逐渐变大的…

蓝桥杯专题-试题版含答案-【数数小木块】【精挑细选】【国王的魔镜】【字符串逆序输出】

点击跳转专栏>Unity3D特效百例点击跳转专栏>案例项目实战源码点击跳转专栏>游戏脚本-辅助自动化点击跳转专栏>Android控件全解手册点击跳转专栏>Scratch编程案例点击跳转>软考全系列点击跳转>蓝桥系列 &#x1f449;关于作者 专注于Android/Unity和各种游…

每日浅读SLAM论文——简析Cartographer

文章目录 二维激光SLAM简单框架前端scan matchingSubmaps构建 后端分支定界优化csm&#xff08;CorrelativeScanMatch&#xff09; 代码实现框架 Cartographer&#xff08;论文名&#xff1a;Real-Time Loop Closure in 2D LIDAR SLAM&#xff09;是目前二维激光SLAM中应用最广…

postman不能进行并发测试

1.按照网上文档的配置 2.在登录接口里睡眠5s&#xff0c;如果是并发的话&#xff0c;所有的请求都会一起睡眠5s 3.测试结果&#xff1a;请求是每隔5s串行执行的

Linux定时任务-定时执行Shell脚本

主要是使用Linux的crontab工具来实现的&#xff0c;有两个方法&#xff0c;一个放在contab 列表里面&#xff0c;另一个是放在contab文件里面&#xff08;其实原理是一致的&#xff09;。 crontab 列表 cd /tmp # 创建一个shell脚本 vim hello.sh #!/bin/bash echo "hel…

钳形表校准装置单匝法校准钳形电流表

交流大电流源输出标准电流信号到直径为1 m的单匝半圆铜环&#xff0c;电流输出铜环分为大小铜环&#xff0c;适配于校准不同钳口大小的钳形电流表。 方案优势&#xff1a;完全符合《JJF 1075-2015 钳形电流表校准规范》中主要推荐的单匝法校准钳形电流表的要求&#xff0c;操控…

全景感知—让视图上云更便捷,存储更安全

6月15日&#xff0c;由腾讯云主办的“数实共进产业行浙江站”在杭州圆满开展&#xff0c;活动中腾讯云存储高级产品经理张泽南进行了“全景感知&#xff0c;让视图上云更便捷&#xff0c;存储更安全”主题演讲&#xff0c;与行业伙伴深度交流新一代视图计算解决方案&#xff0c…

Redis中的介绍和安装教程(配置文件)

1.Redis简单的介绍 redis是一种键值对的NoSql数据库&#xff0c;这里有两个关键字&#xff1a; 键值对 Nosql 其中键值型&#xff0c;是指Redis中存储的数据都是以key.value对的形式多种多样&#xff0c;可以实字符串、数值、甚至json&#xff0c;可以参考HashMap 然后NoSq…

TiDB(5):TiDB-读取历史数据

接下来介绍 TiDB 如何读取历史版本数据&#xff0c;包括具体的操作流程以及历史数据的保存策略。 1 功能说明 TiDB 实现了通过标准 SQL 接口读取历史数据功能&#xff0c;无需特殊的 client 或者 driver。当数据被更新、删除后&#xff0c;依然可以通过 SQL 接口将更新/删除前…

MySQL容器无法输入或显示中文异常解决

如果使用docker创建了MySQL容器&#xff0c;但是进入容器后发现无法输入中文&#xff0c;也就是在插入数据的时候中文直接显示为空&#xff0c;数据表里的中文也显示为空&#xff0c;解决方法是&#xff1a; 1&#xff0c;临时方法 该方法只在每一次进入容器的命令上添加参数&a…

Web服务器群集:使用Haproxy搭建Web集群

目录 一、理论 1.Haproxy集群 2.常见的web集群调度器 3.三种web集群调度器的区别 4.下载安装 二、部署Haproxy集群 1.部署 2.重新定义Haproxy集群的日志 三、实验 1.部署Haproxy集群 四、问题 1.nginx编译安装与yum安装的网页配置路径 五、总结 一、理论 1.Hapro…