flink单机部署和简单使用

news2025/1/12 13:27:54

flink单机部署

Java版本:1.8.0_45

flink下载:https://archive.apache.org/dist/flink/flink-1.7.2/flink-1.7.2-bin-scala_2.11.tgz

解压安装包:

[root@vm-9f-mysteel-dc-ebc-test03 opt]# tar -zxvf flink-1.7.2-bin-scala_2.11.tgz 
flink-1.7.2/
flink-1.7.2/opt/
flink-1.7.2/opt/flink-table_2.11-1.7.2.jar
flink-1.7.2/opt/flink-metrics-graphite-1.7.2.jar
flink-1.7.2/opt/flink-queryable-state-runtime_2.11-1.7.2.jar
flink-1.7.2/opt/flink-swift-fs-hadoop-1.7.2.jar
flink-1.7.2/opt/flink-metrics-statsd-1.7.2.jar
flink-1.7.2/opt/flink-ml_2.11-1.7.2.jar
flink-1.7.2/opt/flink-s3-fs-hadoop-1.7.2.jar

进入到bin目录,启动flink:

[root@vm-9f-mysteel-dc-ebc-test03 bin]# ./start-cluster.sh

访问web界面:http://192.168.201.143:8081

  实时处理socket流数据

 执行以下命令构建flink项目:

D:\\\apache-maven-3.6.0\\\bin\\\mvn.cmd archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java  -DarchetypeVersion=1.7.2

构建完之后导入至idea,项目结构如下:

 编写实时处理类:

package com.fen;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamingJob {

	public static void main(String[] args) throws Exception {
		//参数检查
		if (args.length != 2) {
			System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
			return;
		}
		String hostname = args[0];
		Integer port = Integer.parseInt(args[1]);

		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStreamSource<String> stream = env.socketTextStream(hostname, port);
		//计数
		SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
				.keyBy(0)
				.sum(1);
		sum.print();
		env.execute("Java WordCount from SocketTextStream Example");
	}
	public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
		@Override
		public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
			String[] tokens = s.toLowerCase().split("\\W+");

			for (String token: tokens) {
				if (token.length() > 0) {
					collector.collect(new Tuple2<String, Integer>(token, 1));
				}
			}
		}
	}
}

这个实时处理作业的source 是 scoket ,slink是print,对实时接收数据中的单词进行个数统计

打包:

 

 把jar包上传至安装了flink的服务器中:

 在linux中安装nc:

[root@vm-9f-mysteel-dc-ebc-test03 bin]# yum install nc

 利于nc启动socket server

[root@vm-9f-mysteel-dc-ebc-test03 ~]# nc -l 127.0.0.1  8888

 执行flink job:

[root@vm-9f-mysteel-dc-ebc-test03 bin]# ./flink run -c com.fen.StreamingJob  ./com.fen-1.0-SNAPSHOT.jar  127.0.0.1 8888

 看streaming作业会一直处于running运行中

 再来看看怎么对实时数据进行统计的:

停止flink job

1、可以通过以下界面的形式进行取消:

 2、使用命令取消:

[root@vm-9f-mysteel-dc-ebc-test03 bin]# ./flink list
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-1.7.2/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/apply/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Waiting for response...
------------------ Running/Restarting Jobs -------------------
13.12.2022 19:17:49 : c99f75881ce909ef625c8311e0f5e575 : Java WordCount from SocketTextStream Example (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
[root@vm-9f-mysteel-dc-ebc-test03 bin]# ./flink cancel -m 127.0.0.1:8081 c99f75881ce909ef625c8311e0f5e575
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-1.7.2/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/apply/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Cancelling job c99f75881ce909ef625c8311e0f5e575.
Cancelled job c99f75881ce909ef625c8311e0f5e575.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/86058.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

莽荒纪人物出场数据统计

今天继续给大家介绍Python相关知识&#xff0c;本文主要内容莽荒纪人物出场数据统计。 一、中文文本词频统计思路 在上文Python英文词频统计&#xff08;哈姆雷特&#xff09;程序示例中&#xff0c;我们进行了英文单词的统计。今天&#xff0c;我们进行中文人物出场频率统计…

java服装经销系统服装进销系统

简介 Ssm服装经销系统&#xff0c;主要分为6个角色&#xff1a;管理员、资料员、采购员、仓库员、售卖员、财务。采购员进行采购入库&#xff1b;仓库员进行采购入库、退货入库、提货出库、折损出库等库存管理&#xff1b;售卖员进行填单的创建&#xff0c;然后去仓库那里提货…

Score Matching

目录简介Score Function求解方法emm参考简介 score matching算法是一种求解概率密度函数的参数的算法。 在很多情况下&#xff0c;概率密度函数可以表示为&#xff1a; p(ξ;θ)1Z(θ)q(ξ;θ)p(\xi;\theta)\frac{1}{Z(\theta)}q(\xi;\theta) p(ξ;θ)Z(θ)1​q(ξ;θ) 假设我…

[Java] 什么是锁?什么是并发控制?线程安全又是什么?锁的本质是什么?如何实现一个锁?

文章目录前言并发控制并发访问控制是什么&#xff1f;如何实现并发访问控制&#xff1f;并发访问控制 与 线程安全锁是什么&#xff1f;1. 加锁操作2. 解锁操作锁状态是什么&#xff1f;如何实现一个锁&#xff1f;笔者相关博客连接结语前言 多线程编程中&#xff0c;锁是最重要…

oracle (+)学习

最近工作需要将oracle的存储过程转化为hive的sql脚本。遇到很多不一样的地方&#xff0c;例如oracle连接中有()号的用法。 借鉴这篇文章&#xff0c;但是这个排版比较烂。。。 oracle ()的,Oracle中()的作用_大雪菜的博客-CSDN博客 先建表和插入数据 --生成部门表CREATE TA…

2014年蓝桥杯Java C组——猜年龄

2014年蓝桥杯Java C组——猜年龄 标题&#xff1a;猜年龄 小明带两个妹妹参加元宵灯会。别人问她们多大了&#xff0c;她们调皮地说:“ 我们俩的年龄之积是年龄之和的6倍”。 小明又补充说:“她们可不是双胞胎&#xff0c;年龄差肯定也不超过8岁啊。” 请你写出:小明的较小的…

状态模式

文章目录状态模式1.状态模式的本质2.何时选用状态模式3.优缺点4.状态模式的结构5.实现上下文中维护状态及转换状态上下文中维护状态处理类中转换状态状态模式 状态模式说白了就是不同的状态&#xff0c;执行不同的行为&#xff0c;也就是状态和行为分离 1.状态模式的本质 状态模…

Eclipse+Java+Swing+Mysql实现自助存取款机(ATM)系统

EclipseJavaSwingMysql实现自助存取款机ATM系统一、系统介绍1.系统功能2.环境配置3.数据库4.工程截图二、系统展示1.登录页1.1登录成功2.注册系统3.取款3.1取款成功4.存款4.1 存款成功5.转账6.余额查询7.退出系统三、部分代码DBUtil.javaLoginFrame.javaAccount.java四、其他获…

python编程 for循环注意点与大写转换案例

作者简介&#xff1a;一名在校计算机学生、每天分享Python的学习经验、和学习笔记。 座右铭&#xff1a;低头赶路&#xff0c;敬事如仪 个人主页&#xff1a;网络豆的主页​​​​​​ 目录 前言 range方法 一for循环表达式 for循环表达式 与数字有关 与单位有关 前言…

2023最新SSM计算机毕业设计选题大全(附源码+LW)之java基于信息安全的无锡旅游服务系统5l83d

面对老师五花八门的设计要求&#xff0c;首先自己要明确好自己的题目方向&#xff0c;并且与老师多多沟通&#xff0c;用什么编程语言&#xff0c;使用到什么数据库&#xff0c;确定好了&#xff0c;在开始着手毕业设计。 1&#xff1a;选择课题的第一选择就是尽量选择指导老师…

MHA的故障切换你掌握了吗?

MHA的概述 什么是MHA MHA&#xff08;MasterHigh Availability&#xff09;是一套优秀的MySQL高可用环境下故障切换和主从复制的软件。 MHA 的出现就是解决MySQL 单点的问题。 MySQL故障切换过程中&#xff0c;MHA能做到0-30秒内自动完成故障切换操作。 MHA能在故障切换的过程…

用Python构建Amazon产品推荐系统!这项目价值能有2000吗?

引言 该项目的目标是部分重建Amazon电子类产品的推荐系统。 现在是十二月&#xff01;你是什么类型的购物者&#xff1f;双十二都购买了吗&#xff1f;你是把当天想购买的所有产品都保存起来&#xff0c;还是宁愿打开网站&#xff0c;查看带有巨大折扣的现场优惠呢&#xff1…

从单服务器模式到负载均衡设计

从单服务器模式到负载均衡设计 作者&#xff1a;Grey 原文地址&#xff1a; 博客园&#xff1a;从单服务器模式到负载均衡设计 CSDN&#xff1a;从单服务器模式到负载均衡设计 单服务器模型是最简单的一种架构&#xff0c;参考如下图 用户访问一个 URL&#xff0c;URL 会…

java计算机毕业设计基于安卓Android的校园外卖点餐APP

项目介绍 餐饮行业是一个传统的行业。根据当前发展现状,网络信息时代的全面普及,餐饮行业也在发生着变化,单就点餐这一方面,利用手机点单正在逐步进入人们的生活。传统的点餐方式,不仅会耗费大量的人力、时间,有时候还会出错。Android系统伴随智能手机为我们提供了新的方向。手…

基于转换器 (MMC) 技术和电压源转换器 (VSC) 的高压直流 (HVDC) 模型(MatlabSimulink实现)

目录 1 概述 2 主要模块说明 2.1 简化电网 2.2 转换器 1 2.3 直流电路 2.4 控制器 2.5 示波器和测量 3 讲解 3.1 参数设置 3.2 SPS 比较 3.3 结果比较 3.4 参考文献 4 Matlab代码实现 1 概述 1000 MW HVDC-MMC 互连的 SPS 模型。本文基于模块化多电平转换器 (MMC)…

计算机网络-数据链路层:以太网协议、ARP协议、MAC、MTU

目录 一、以太网协议 1. 认识以太网 2. 协议格式 二、 MAC地址 1. 认识MAC地址 2. 对比MAC地址与IP地址 三、ARP协议 1. 认识ARP协议 2. ARP协议的作用 3. ARP协议的工作流程 4. ARP欺骗攻击 四、MTU 1. 认识MTU 2. MTU对IP协议的影响&#xff08;了解&#xff…

在线教育系统源码讲解与代码分析

目前&#xff0c;许多行业已经开始向直播领域靠拢&#xff0c;例如直播带货、教育直播、娱乐直播等领域&#xff0c;想要在此分一杯羹&#xff0c;以在线教育系统来说&#xff0c;在2020年以后便进入了“白热化”&#xff0c;更多的直播、教育展现在大众视野中。在粉丝经济的时…

Linux的进程空间管理

用户态和内核态的划分 进程的虚拟地址空间&#xff0c;其实就是站在项目组的角度来看内存&#xff0c;所以我们就从task_struct出发来看。这里面有一个struct mm_struct结构来管理内存。 struct mm_struct *mm; 在struct mm_struct里面&#xff0c;有这样一个成员变量&#…

如何利用ArcGIS实现电子地图可视化表达?分析空间数据?提升SCI论文的层次?探究环境与生态因子对水体、土壤、大气污染物等影响?

查看原文>>>如何利用ArcGIS探究环境与生态因子对水体、土壤、大气污染物等影响 如何利用ArcGIS实现电子地图可视化表达&#xff1f;如何利用ArcGIS分析空间数据&#xff1f;如何利用ArcGIS提升SCI论文的层次&#xff1f;制图是地理数据展现的直观形式&#xff0c;也是…

ARM存储模型(数据存储、指令存储)

目录 1、ARM数据存储 (1) ARM数据类型 (2) ARM数据存储的方式 2、ARM的指令存储 (1) 指令集的分类 (2) 为什么ARM指令集的PC值与低2位无关&#xff1f; 1、ARM数据存储 (1) ARM数据类型 ARM采用32位架构&#xff0c;即ARM一次可以处理32bit的数据&#xff0c;基本的数据…