[flink] flink macm1pro 快速使用从零到一

news2025/1/22 16:50:00

文章目录

  • 快速使用

快速使用

  1. 打开 https://flink.apache.org/downloads/ 下载 flink

因为书籍介绍的是 1.12版本的,为避免不必要的问题,下载相同版本

image.png
image.png

  1. 解压
 tar -xzvf flink-1.11.2-bin-scala_2.11.tgz

image.png

  1. 启动 flink
./bin/start-cluster.sh

image.png

  1. 打开 flink web 页面 localhost:8081

image.png

  1. 编写结合 Kafka 词频统计程序

具体参考 https://weread.qq.com/web/reader/51032ac07236f8e05107816k1f032c402131f0e3dad99f3?

package org.example;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class WordCountKafkaInStdOut {

    public static void main(String[] args) throws Exception {

        // 设置Flink执行环境 
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka参数 
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "flink-group");
        String inputTopic = "Shakespeare";
        String outputTopic = "WordCount";

        // Source 
        FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(),
                properties);
        DataStream<String> stream = env.addSource(consumer);

        // Transformation 
        // 使用Flink  API对输入流的文本进行操作 
        // 按空格切词、计数、分区、设置时间窗口、聚合 
        DataStream<Tuple2<String, Integer>> wordCount = stream
            .flatMap((String line, Collector<Tuple2<String, Integer>> collector) -> {
                String[] tokens = line.split("\\s");
                // 输出结果  
                for (String token : tokens) {
                    if (token.length() > 0) {
                        collector.collect(new Tuple2<>(token, 1));
                    }
                }
            })
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            .sum(1);

        // Sink 
        wordCount.print();

        // execute 
        env.execute("kafka streaming word count");

    }
} 
  1. 打包应用(当然在这之前需要本地调试一下,至少得运行通吧😄)
  2. 使用Flink提供的命令行工具flink,将打包好的作业提交到集群上。命令行的参数 --class 用来指定哪个主类作为入口。
./bin/flink run --class org.example.WordCountKafkaInStdOut xxtarget/flink_study-1.0-SNAPSHOT.jar

class 建议直接拷贝引用
image.png

  1. web 页面查看作业提交成功

image.png

  1. kafka 生产者随便发点消息

image.png

  1. 查看作业日志,词频统计结果

image.png
image.png

  1. 关闭 flink
./bin/stop-cluster.sh

image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1552488.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【JavaSE】String类详解

目录 前言 1. 什么是String类 1.1 String的构造 1.2 String类的基本操作&#xff1a;打印、拼接、求字符串长度 2. String类的常用方法 2.1 字符串查找 2.2 字符串替换 2.3 字符串拆分 2.4 字符串截取 2.5 字符串和其他类型的转换 2.6 去除字符串左右两边的空格 3.…

大模型 智能体 智能玩具 智能音箱 构建教程 wukong-robot

视频演示 10:27 一、背景 继上文《ChatGPT+小爱音响能擦出什么火花?》可以看出大伙对AI+硬件的结合十分感兴趣,但上文是针对市场智能音响的AI植入,底层是通过轮询拦截,算是hack兼容,虽然官方有提供开发者接口,也免不了有许多局限性(比如得通过特定指令唤醒),不利于我…

计算机网络——29ISP之间的路由选择:BGP

ISP之间的路由选择&#xff1a;BGP 层次路由 一个平面的路由 一个网络中的所有路由器的地位一样通过LS&#xff0c;DV&#xff0c;或者其他路由算法&#xff0c;所有路由器都要知道其他所有路由器&#xff08;子网&#xff09;如何走所有路由器在一个平面 平面路由的问题 …

JavaEE初阶Day 4:多线程(2)

目录 Day4&#xff1a;多线程&#xff08;2&#xff09;1. catch语句2. sleep的处理3. Thread3.1 Thread构造方法3.2 Thread的属性3.2.1 ID3.2.2 优先级3.2.3 后台线程3.2.4 存活3.2.5 start3.2.6 中断3.2.6.1 控制线程结束代码3.2.6.2 interrupt和isInterrupted Day4&#xff…

学习笔记——微信小程序读取当前时间

<view class"box"><text>日期:</text><view class"date">{{obtaindate}}</view></view> wxml中定义了一个文本元素&#xff0c;通过{{obtaindate}}获取js页面传递的日期数据 data:{obtaindate:"" }, onlo…

公链角逐中突围,Solana 何以成为 Web3 世界的流量焦点?

在众多区块链公链中&#xff0c;Solana 凭借其创纪录的处理速度和极低的交易费用&#xff0c;成为了众多开发者和投资者的宠儿。就像网络上流行的那句话所说&#xff1a;“Why slow, when you can Solana?”&#xff0c;Solana 正以它的速度和强大的生态系统&#xff0c;重新定…

nacos的各种类型的配置文件 yml 、json、 Properties、 text 等文件类型 发生变化怎么热更新,实现实时监听nacos配置文件变化

本文用的是 Nacos作为配置中心注册监听器方法 实现热更新 nacos 配置文件 从而不用重启项目 依赖、工具类 这边就不写了 因为项目用的是 Json 类型的配置文件 所以下文 主要是对json文件进行实现 别的文件大同小异 先说扯淡的东西 在nacos 的配置文件中 dataId 这两种声明 是…

Postman传对象失败解决

文章目录 情景复现解决方案总结 情景复现 postman中调用 debug发现pId传入失败 分析解释&#xff1a; 实体类中存在pId、uid和num字段 controller层将GoodsCar作为请求体传入 解决方案 当时觉得很奇怪&#xff0c;因为uid和num可以被接收&#xff0c;而pId和num的数据类型相…

图腾柱PFC:HP1010为您的电动两轮车之旅提供绿色,高效,安全的动力

电动两轮车不仅为当今生活提供了便利&#xff0c;更是一种健康和绿色的出行方式。想象一下&#xff0c;在经过一整晚的充分休息&#xff0c;骑上爱车&#xff0c;满血复活的准备开始新的一天。您会愿意带着如何给心爱的两轮车充电的担心开始这一天吗&#xff1f; 随着越来越…

HackTheBox-Machines--Legacy

文章目录 1 端口扫描2 测试思路3 445端口漏洞测试4 flag Legacy 测试过程 1 端口扫描 nmap -sC -sV 10.129.227.1812 测试思路 目标开启了135、139、445端口&#xff0c;445 SMB服务存在很多可利用漏洞&#xff0c;所以测试点先从445端口开始。而且在Nmap扫描结果中&#xff0c…

Unity 窗口化设置

在Unity中要实现窗口化&#xff0c;具体设置如下&#xff1a; 在编辑器中&#xff0c;选择File -> Build Settings。在Player Settings中&#xff0c;找到Resolution and Presentation部分。取消勾选"Fullscreen Mode"&#xff0c;并选择"Windowed"。设…

Unity2018发布安卓报错 Exception: Gradle install not valid

Unity2018发布安卓报错 Exception: Gradle install not valid Exception: Gradle install not valid UnityEditor.Android.GradleWrapper.Run (System.String workingdir, System.String task, System.Action1[T] progress) (at <c67d1645d7ce4b76823a39080b82c1d1>:0) …

通用指南-营销和设计中的增强现实(AR)

原文作者&#xff1a;Superside 翻译&#xff1a;数字化营销工兵 --- 经典万字长文&#xff0c;权威解读&#xff0c;分享经典&#xff0c;预计阅读完需要30分钟&#xff0c;建议收藏&#xff01; 目录 一、引言 为什么要尝试AR AR到底是什么&#xff1f;营销人员和创意人…

网络工程师实验命令(华为数通HCIA)

VRP系统的基本操作 dis version #查看设备版本信息 sys #进入系统视图 system-name R1 #改设备名字为R1进入接口配置IP地址 int g0/0/0 ip address 192.168.1.1 255.255.255.0 #配置接口地址为192.168.1.1/255.255.255.0 ip address 192.168.1.2 24 sub #此…

Intellij IDEA安装配置Spark与运行

目录 Scala配置教程 配置Spark运行环境 编写Spark程序 1、包和导入 2、定义对象 3、主函数 4、创建Spark配置和上下文 5、定义输入文件路径 6、单词计数逻辑 7、输出结果 8、完整代码&#xff1a; Scala配置教程 IDEA配置Scala&#xff1a;教程 配置Spark运行环境 …

持续集成流程主要系统构成介绍(CI)

目录 一、概述 二、版本控制系统 2.1 概述 2.2 版本控制系统使用流程示意图 2.3 版本控制软件划分 2.3.1 集中式版本控制软件 2.3.2 分布式版本控制软件 2.3.3 总结 2.4 常用版本控制软件介绍 三、编译构建系统 3.1 概述 3.2 编译构建流程示意图 3.3 列举Java 源码…

uniApp使用XR-Frame创建3D场景(5)材质贴图的运用

上一篇讲解了如何在uniApp中创建xr-frame子组件并创建简单的3D场景。 这篇我们讲解在xr-frame中如何给几何体赋予贴图材质。 先看源码 <xr-scene render-system"alpha:true" bind:ready"handleReady"><xr-node><xr-assets><xr-asse…

向量法求点在直线上的投影

已知直线上两点a、b和直线外一点p&#xff0c;求p在直线ab上的投影点。 根据《计算几何之 点在直线上的投影 代码模板与证明》一文中所述&#xff0c;p的投影点p’就是a x ⃗ \vec x x &#xff08;直线的点向式&#xff09;&#xff0c;所以我们只要求出 x ⃗ \vec x x 就能…

基于单片机的二维码LCD显示控制设计

**单片机设计介绍&#xff0c;基于单片机的二维码LCD显示控制设计 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于单片机的二维码LCD显示控制设计是一个集硬件、软件与通信于一体的综合性项目。此设计的主要目标是实现单片机…

AI新工具 又一个开源大模型DBRX击败GPT3.5;根据音频和图像输入生成会说话、唱歌的动态视频

✨ 1: AniPortrait 腾讯开源&#xff1a;根据音频和图像输入生成会说话、唱歌的动态视频 AniPortrait 是个先进的框架&#xff0c;专门用来生成高质量的、由音频和参考肖像图片驱动的动画。如果你有视频&#xff0c;也可以用来实现面部的再现&#xff08;Face reenactment&am…