开发一个springboot项目
- 代码迭代整合工具 gitee
- 建模意义
- 程序处理方式
- 开发功能的步骤
- web服务
- 网络状态码
- web应用的开发分层
- springboot的作用
- springboot框架搭建
- 框架中各组件作用
- 框架的演变
- 如何提取hive中的表结构
- 创建springboot 工程的引导模版 要选择aliyun ,否则不支持java8
- springboot 工程
- spring 相关组件
- 代码生成器
- 动态数据源
- 整合druid连接池
- 具体代码:
- 代码说明:
- Controller层
- 注解 说明
- 小练习
- Redis(代码)
- 5大数据类型 场景
- 自测题
- HBase代码
- Region Server
- HBase写操作
- memstore 刷新机制
- StoreFile Compaction(文件合并)
- HBase读操作
- 布隆过滤器
- hbase 海量数据 的读取 高效是怎么做到的(面试重点)
- 数据倾斜
- 预分区(自定义分区)
- 小练习
- Flink
- Flink初级
- Yarn应用模式作业提交模式![啊啊](https://i-blog.csdnimg.cn/direct/9fe4a4dfb6cf48e187286c536d89fc56.png)
- union
- connect
- Flink高级
- 迟到数据的处理
- 非barrier对齐的精准一次 过程:
- 两次提交过程描述
- 设置事务的超时时间
- Kafka案例演示
代码迭代整合工具 gitee
建模意义
我们用最后一种方式
程序处理方式
开发功能的步骤
web服务
因为之前没有提过
所以现在简单提一下
网络状态码
1xx 正在处理中
2xx 200 成功
3xx 重定向 永久重定向 临时重定向
4xx 400 参数不匹配 404 资源找不到了 405 method not allowed 请求类型(get和post)没匹配上
5xx 500 后台程序抛异常 503 后端服务器等抛异常了
xxx 600 找第三方支付等问,这都是自定义的
web应用的开发分层
springboot的作用
springboot框架搭建
框架中各组件作用
- Springboot 作为一个web应用快速开发框架,整合了SpringMVC、Spring、Mybatis等框架。其中:
- SpringMVC负责接收返回web请求。
- Spring负责把业务处理类的组件化,管理生命周期。
- Mybatis负责连接数据库、封装参数、封装结果。
- Mybatis-plus是Mybatis的第三方扩展依赖,可以 方便的对数据单表进行插删改查操作。
- Mybatis-plus 提供了代码生成器generator ,辅助我们快速生成基础代码。
- DynamicDatasource是用来对接多个jdbc数据源的工具。
- Druid 是一个非常好用的数据连接池,可以使得数据连接循环利用减少开销。
- JSON 工具的使用
- SQL日志的控制打印
框架的演变
- php perl .net asp jsp
- ssh structs (1\2) + spring + hibernate
- ssm springmvc +spring + mybatis
- springboot:
- 简化开发流程 约定大于配置
- 内置了web服务容器 ,打好的jar包 直接运行即可 不用提前安装web容器
- 整了各种第三方的组件 starter application统一配置
如何提取hive中的表结构
hiveserver2 jdbc数据
metastore 元数据服务
HiveMetaStoreClient
package com.liang.dga.meta.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SimplePropertyPreFilter;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.liang.common.utils.SqlUtil;
import com.liang.dga.meta.bean.TableMetaInfo;
import com.liang.dga.meta.bean.TableMetaInfoQuery;
import com.liang.dga.meta.bean.TableMetaInfoVO;
import com.liang.dga.meta.mapper.TableMetaInfoMapper;
import com.liang.dga.meta.service.TableMetaInfoExtraService;
import com.liang.dga.meta.service.TableMetaInfoService;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* <p>
* 元数据表 服务实现类
* </p>
*
* @author liang
* @since 2024-07-24
*/
@Service
@DS("dga")
public class TableMetaInfoServiceImpl extends ServiceImpl<TableMetaInfoMapper, TableMetaInfo> implements TableMetaInfoService {
@Value("${hive.metastore.uris}")
String hiveMetaUri;
@Value("${default.database}")
String schemaName;
HiveMetaStoreClient hiveMetaStoreClient;
@Autowired
TableMetaInfoExtraService tableMetaInfoExtraService;
@PostConstruct
public void initHiveClient() {
HiveConf hiveConf = new HiveConf();
hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetaUri);
try {
hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf);
} catch (MetaException e) {
throw new RuntimeException(e);
}
}
public void initTableMetaInfo(String assessDate) throws TException {
List<String> allTableNames = hiveMetaStoreClient.getAllTables(schemaName);
List<TableMetaInfo> tableMetaInfoList = new ArrayList<>(allTableNames.size());
Table table = hiveMetaStoreClient.getTable(schemaName, allTableNames.get(0));
System.out.println("111111111111111111111tableMetaInfoList in initTableMetaInfo: " + JSON.toJSONString(table));
for (String tableName : allTableNames) {
// 调用方法提取元数据信息
TableMetaInfo tableMetaInfo = this.extractTableMetaInfoFromHive(tableName, schemaName);
hiveMetaStoreClient.getTable(schemaName, tableName);
// 调用方法提取HDFS环境信息
this.extractHdfsInfo(tableMetaInfo);
// 设置其他信息
tableMetaInfo.setCreateTime(new Date());
tableMetaInfo.setAssessDate(assessDate);
tableMetaInfoList.add(tableMetaInfo);
}
// 去重,把今天的都删了
this.remove(new QueryWrapper<TableMetaInfo>().eq("assess_date",assessDate));
// 保存元数据列表
saveBatch(tableMetaInfoList);
QueryWrapper<TableMetaInfo> wrapper = new QueryWrapper<TableMetaInfo>().eq("assess_date", assessDate)
.notInSql("concat(schema_name,table_name)",
"select concat(schema_name,table_name) from table_meta_info_extra");
List<TableMetaInfo> tableMetaInfoExtraWithoutList = this.list(wrapper);
System.out.println("222222222222222222tableMetaInfoListWithout in initTableMetaInfo" + JSON.toJSONString(tableMetaInfoExtraWithoutList));
// 补充辅助信息,把没填写的表格格式化一下
tableMetaInfoExtraService.initTableMetaInfoExtra(tableMetaInfoExtraWithoutList);
}
private TableMetaInfo extractTableMetaInfoFromHive(String tableName,String SchemaName){
try {
// 获取表信息
Table table = hiveMetaStoreClient.getTable(SchemaName, tableName);
TableMetaInfo tableMetaInfo = new TableMetaInfo();
tableMetaInfo.setTableName(tableName);
tableMetaInfo.setSchemaName(SchemaName);
// 获取分区字段信息
SimplePropertyPreFilter filter = new SimplePropertyPreFilter("comment", "name", "type");
tableMetaInfo.setPartitionColNameJson(JSON.toJSONString(table.getPartitionKeys(), filter));
// 获取表字段信息
StorageDescriptor sd = table.getSd();
tableMetaInfo.setColNameJson(JSON.toJSONString(sd.getCols(),filter));
// 获取表参数信息
tableMetaInfo.setTableParametersJson(JSON.toJSONString(table.getParameters()));
// 获取表格式信息
tableMetaInfo.setTableInputFormat(sd.getInputFormat());
tableMetaInfo.setTableOutputFormat(sd.getOutputFormat());
// 分桶字段
tableMetaInfo.setTableBucketColsJson(JSON.toJSONString(sd.getBucketCols()));
// 获取表分桶排序信息
tableMetaInfo.setTableSortColsJson(JSON.toJSONString(sd.getSortCols()));
// 获取表桶数
tableMetaInfo.setTableBucketNum(sd.getNumBuckets()+0L);
// 获取表创建时间 ts->Date->string "yyyy-MM-dd HH:mm:ss"
Date createTime = new Date(table.getCreateTime() * 1000L);
tableMetaInfo.setTableCreateTime(DateFormatUtils.format(createTime,"yyyy-MM-dd HH:mm:ss"));
// 获取表类型
tableMetaInfo.setTableType(table.getTableType());
// 获取表注释
tableMetaInfo.setTableComment(table.getParameters().get("comment"));
// 获取表拥有者
tableMetaInfo.setTableFsOwner(table.getOwner());
// 获取序列化类
tableMetaInfo.setTableRowFormatSerde(sd.getSerdeInfo().getSerializationLib());
//获取表路径
tableMetaInfo.setTableFsPath(sd.getLocation());
System.out.println("333333333333333333333333 tableMetaInfo in = extractTableMetaInfoFromHive: " + JSON.toJSONString(tableMetaInfo));
return tableMetaInfo;
} catch (Exception e){
e.printStackTrace();
throw new RuntimeException(e);
}
}
private void extractHdfsInfo(TableMetaInfo tableMetaInfo){
try {
// 获取表存储信息
URI uri = new URI(tableMetaInfo.getTableFsPath());
FileSystem fs = FileSystem.get(uri, new Configuration(), "atguigu");
// 当前目录下的一级子节点 的节点状态(目录和文件)
FileStatus[] fileStatuses = fs.listStatus(new Path(tableMetaInfo.getTableFsPath()));
// 递归获得该文件的信息
getHdfsInfoRec(fs, fileStatuses, tableMetaInfo);
System.out.println("tableMetaInfo = " +tableMetaInfo.getTableName()+":"+ tableMetaInfo.getTableSize()
+":accessTime:"+tableMetaInfo.getTableLastAccessTime()
+"modifyTIme:"+tableMetaInfo.getTableLastModifyTime());
tableMetaInfo.setFsCapcitySize(fs.getStatus().getCapacity());
tableMetaInfo.setFsRemainSize(fs.getStatus().getRemaining());
tableMetaInfo.setFsUsedSize(fs.getStatus().getUsed());
} catch (Exception e){
e.printStackTrace();
throw new RuntimeException(e);
}
}
private void getHdfsInfoRec(FileSystem fs, FileStatus[] fileStatuses, TableMetaInfo tableMetaInfo) {
for (FileStatus fileStatus : fileStatuses) {
if(fileStatus.isDirectory()){
try {
// 递归
FileStatus[] listStatus = fs.listStatus(fileStatus.getPath());
getHdfsInfoRec(fs, listStatus, tableMetaInfo);
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}else {
// 获取表大小信息
// 获取表最后修改时间
// 获取表最后访问时间
try {
// 获得文件大小
tableMetaInfo.setTableSize(tableMetaInfo.getTableSize()+fileStatus.getLen());
tableMetaInfo.setTableTotalSize(tableMetaInfo.getTableTotalSize()+fileStatus.getLen());
// 获取表访问信息
if (tableMetaInfo.getTableLastAccessTime() == null){
tableMetaInfo.setTableLastAccessTime(new Date(fileStatus.getAccessTime()));
} else {
if (fileStatus.getAccessTime() > tableMetaInfo.getTableLastAccessTime().getTime()){
tableMetaInfo.setTableLastAccessTime(new Date(fileStatus.getAccessTime()));
}
}
// 获取表最后修改时间
if (tableMetaInfo.getTableLastModifyTime() == null){
tableMetaInfo.setTableLastModifyTime(new Date(fileStatus.getModificationTime()));
} else {
if (fileStatus.getModificationTime() > tableMetaInfo.getTableLastModifyTime().getTime()){
tableMetaInfo.setTableLastModifyTime(new Date(fileStatus.getModificationTime()));
}
}
} catch (Exception e){
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
}
// 获取前端的需连接两个表和分页的信息
@Override
public List<TableMetaInfoVO> getListForQuery(TableMetaInfoQuery tableMetaInfoQuery) {
StringBuilder sb = new StringBuilder(2048);
sb.append("select tm.id ,tm.table_name,tm.schema_name,table_comment,table_size,table_total_size,tec_owner_user_name,\n" +
" busi_owner_user_name, table_last_access_time,table_last_modify_time from table_meta_info tm\n" +
" left join table_meta_info_extra te\n" +
" on tm.table_name=te.table_name and tm.schema_name=te.schema_name\n" +
"where assess_date = (select max( assess_date ) from table_meta_info tmi\n" +
" where tm.table_name = tmi.table_name\n" +
" and tm.schema_name=tmi.schema_name )");
// 接收动态条件
if(tableMetaInfoQuery.getSchemaName()!=null&&!tableMetaInfoQuery.getSchemaName().trim().equals("")){
sb.append(" and tm.schema_name like '%"+ SqlUtil.filterUnsafeSql(tableMetaInfoQuery.getSchemaName()) +"%'");
}
if (tableMetaInfoQuery.getTableName()!=null&&!tableMetaInfoQuery.getTableName().trim().equals("")){
sb.append(" and tm.table_name like '%"+SqlUtil.filterUnsafeSql(tableMetaInfoQuery.getTableName())+"%'");
}
if (tableMetaInfoQuery.getDwLevel()!=null&&!tableMetaInfoQuery.getDwLevel().trim().equals("")) {
sb.append(" and te.dw_level like '%" + SqlUtil.filterUnsafeSql(tableMetaInfoQuery.getDwLevel()) + "%'");
}
// 处理分页
sb.append("limit " + (tableMetaInfoQuery.getPageNo()-1) * tableMetaInfoQuery.getPageSize() + "," + tableMetaInfoQuery.getPageSize());
System.out.println("sb = " + sb);
List<TableMetaInfoVO> tableMetaInfoVOList = baseMapper.selectTableMetaListForQuery(sb.toString());
return tableMetaInfoVOList;
}
// 有个数据总条数得单独算
@Override
public Integer getTotalForQuery(TableMetaInfoQuery tableMetaInfoQuery) {
StringBuilder sb = new StringBuilder(2048);
sb.append("select count(*) from table_meta_info tm\n" +
" left join table_meta_info_extra te\n" +
" on tm.table_name=te.table_name and tm.schema_name=te.schema_name\n" +
"where assess_date = (select max( assess_date ) from table_meta_info tmi\n" +
" where tm.table_name = tmi.table_name\n" +
" and tm.schema_name=tmi.schema_name )");
// 接收动态条件
if(tableMetaInfoQuery.getSchemaName()!=null&&!tableMetaInfoQuery.getSchemaName().trim().equals("")){
sb.append(" and tm.schema_name like '%"+ SqlUtil.filterUnsafeSql(tableMetaInfoQuery.getSchemaName()) +"%'");
}
if (tableMetaInfoQuery.getTableName()!=null&&!tableMetaInfoQuery.getTableName().trim().equals("")){
sb.append(" and tm.table_name like '%"+SqlUtil.filterUnsafeSql(tableMetaInfoQuery.getTableName())+"%'");
}
if (tableMetaInfoQuery.getDwLevel()!=null&&!tableMetaInfoQuery.getDwLevel().trim().equals("")) {
sb.append(" and te.dw_level like '%" + SqlUtil.filterUnsafeSql(tableMetaInfoQuery.getDwLevel()) + "%'");
}
Integer total = baseMapper.selectTableMetaTotalForQuery(sb.toString());
return total;
}
@Override
public List<TableMetaInfo> getListWithExtra(String assessDate) {
List<TableMetaInfo> tableMetaInfoList = baseMapper.selectListWithExtra(assessDate);
return tableMetaInfoList;
}
}
创建springboot 工程的引导模版 要选择aliyun ,否则不支持java8
springboot 选2.7.x
springboot 工程
xxxxxxApplication
启动类 用于启动整个应用程序 ,里面不添加任何业务代码
第一次打开新工程可能会有乱码,是编码集的问题,都改成UTF-8就可以了
spring 相关组件
- spring内部有一个组件容器池
包含了所有组件的单态实例
组件 :类上 有 @RestController @Servcie @Mapper @Component
这些组件在应用启动时都会以单态的形式创建并保存在容器中 - 如果想引用这些对象 利用@Autowired把对象 装配在引用上
这些组件都是固定的模式,所以有了代码生成器
代码生成器
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.generator.FastAutoGenerator;
import com.baomidou.mybatisplus.generator.config.TemplateConfig;
import com.baomidou.mybatisplus.generator.config.rules.DateType;
import com.baomidou.mybatisplus.generator.engine.FreemarkerTemplateEngine;
import org.apache.ibatis.annotations.Mapper;
import java.util.function.Consumer;
public class CodeGen3531 {
public static void main(String[] args) {
String[] tables = {"table_meta_info"};
FastAutoGenerator.create("jdbc:mysql://hadoop102:3306/dga", "root", "000000")
.globalConfig(builder -> {
builder.author("liang") //作者
.outputDir("D:\\sgg\\19-数据治理Git\\dga02\\src\\main\\java") //输出路径(写到java目录)
.commentDate("yyyy-MM-dd")
.dateType(DateType.ONLY_DATE); //选择实体类中的日期类型 ,Date or LocalDatetime
})
.packageConfig(builder -> { //各个package 名称
builder.parent("com.atguigu.dga")
.moduleName("meta")
.entity("bean") //目录名
.service("service") //目录名
.serviceImpl("service.impl") //目录名
.controller("controller") //目录名
.mapper("mapper"); //目录名
})
.strategyConfig(builder -> {
builder.addInclude(tables)
.serviceBuilder()
.formatServiceFileName("%sService") //类后缀
.formatServiceImplFileName("%sServiceImpl") //类后缀
.entityBuilder()
.enableLombok() //允许使用lombok
.controllerBuilder()
.formatFileName("%sController") //类后缀
.enableRestStyle() //生成@RestController 否则是@Controller
.mapperBuilder()
//生成通用的resultMap 的xml映射
.enableBaseResultMap() //生成xml映射
.superClass(BaseMapper.class) //标配
.formatMapperFileName("%sMapper") //类后缀
//.enableFileOverride() //生成代码覆盖已有文件 谨慎开启
.mapperAnnotation(Mapper.class); //生成代码Mapper上自带@Mapper
})
.templateConfig(new Consumer<TemplateConfig.Builder>() {
@Override
public void accept(TemplateConfig.Builder builder) {
// 实体类使用我们自定义模板
builder.entity("templates/myentity.java");
}
})
.templateEngine(new FreemarkerTemplateEngine()) // 使用Freemarker引擎模板,默认的是Velocity引擎模板
.execute();
}
}
配合其他配置即可使用
首先是在pom.xml中增加依赖
增加完后:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>3.3.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-generator</artifactId>
<version>3.5.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.velocity</groupId>
<artifactId>velocity-engine-core</artifactId>
<version>2.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-freemarker</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-common</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.4</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
<exclusions>
<exclusion>
<artifactId>hadoop-annotations</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-yarn-server-common</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-yarn-server-web-proxy</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-yarn-common</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>jackson-core-asl</artifactId>
<groupId>org.codehaus.jackson</groupId>
</exclusion>
<exclusion>
<artifactId>jackson-mapper-asl</artifactId>
<groupId>org.codehaus.jackson</groupId>
</exclusion>
<exclusion>
<artifactId>jersey-core</artifactId>
<groupId>com.sun.jersey</groupId>
</exclusion>
<exclusion>
<artifactId>jersey-server</artifactId>
<groupId>com.sun.jersey</groupId>
</exclusion>
<exclusion>
<artifactId>zookeeper</artifactId>
<groupId>org.apache.zookeeper</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
把这个配置文件复制一份
放到这个位置
修改名称和文件中的两个地方即可
package ${package.Entity};
<#list table.importPackages as pkg>
import ${pkg};
</#list>
<#if springdoc>
import io.swagger.v3.oas.annotations.media.Schema;
<#elseif swagger>
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
</#if>
<#if entityLombokModel>
import lombok.Data;
<#if chainModel>
import lombok.experimental.Accessors;
</#if>
</#if>
/**
* <p>
* ${table.comment!}
* </p>
*
* @author ${author}
* @since ${date}
*/
<#if entityLombokModel>
@Data
<#if chainModel>
@Accessors(chain = true)
</#if>
</#if>
<#if table.convert>
@TableName("${schemaName}${table.name}")
</#if>
<#if springdoc>
@Schema(name = "${entity}", description = "$!{table.comment}")
<#elseif swagger>
@ApiModel(value = "${entity}对象", description = "${table.comment!}")
</#if>
<#if superEntityClass??>
public class ${entity} extends ${superEntityClass}<#if activeRecord><${entity}></#if> {
<#elseif activeRecord>
public class ${entity} extends Model<${entity}> {
<#elseif entitySerialVersionUID>
public class ${entity} implements Serializable {
<#else>
public class ${entity} {
</#if>
<#if entitySerialVersionUID>
private static final long serialVersionUID = 1L;
</#if>
<#-- ---------- BEGIN 字段循环遍历 ---------->
<#list table.fields as field>
<#if field.keyFlag>
<#assign keyPropertyName="${field.propertyName}"/>
</#if>
<#if field.comment!?length gt 0>
<#if springdoc>
@Schema(description = "${field.comment}")
<#elseif swagger>
@ApiModelProperty("${field.comment}")
<#else>
/**
* ${field.comment}
*/
</#if>
</#if>
<#if field.keyFlag>
<#-- 主键 -->
<#if field.keyIdentityFlag>
@TableId(value = "${field.annotationColumnName}", type = IdType.AUTO)
<#elseif idType??>
@TableId(value = "${field.annotationColumnName}", type = IdType.${idType})
<#elseif field.convert>
@TableId("${field.annotationColumnName}")
</#if>
<#-- 普通字段 -->
<#elseif field.fill??>
<#-- ----- 存在字段填充设置 ----->
<#if field.convert>
@TableField(value = "${field.annotationColumnName}", fill = FieldFill.${field.fill})
<#else>
@TableField(fill = FieldFill.${field.fill})
</#if>
<#elseif field.convert>
@TableField("${field.annotationColumnName}")
</#if>
<#-- 乐观锁注解 -->
<#if field.versionField>
@Version
</#if>
<#-- 逻辑删除注解 -->
<#if field.logicDeleteField>
@TableLogic
</#if>
private ${field.propertyType} ${field.propertyName};
</#list>
<#------------ END 字段循环遍历 ---------->
<#if !entityLombokModel>
<#list table.fields as field>
<#if field.propertyType == "boolean">
<#assign getprefix="is"/>
<#else>
<#assign getprefix="get"/>
</#if>
public ${field.propertyType} ${getprefix}${field.capitalName}() {
return ${field.propertyName};
}
<#if chainModel>
public ${entity} set${field.capitalName}(${field.propertyType} ${field.propertyName}) {
<#else>
public void set${field.capitalName}(${field.propertyType} ${field.propertyName}) {
</#if>
this.${field.propertyName} = ${field.propertyName};
<#if chainModel>
return this;
</#if>
}
</#list>
</#if>
<#if entityColumnConstant>
<#list table.fields as field>
public static final String ${field.name?upper_case} = "${field.name}";
</#list>
</#if>
<#if activeRecord>
@Override
public Serializable pkVal() {
<#if keyPropertyName??>
return this.${keyPropertyName};
<#else>
return null;
</#if>
}
</#if>
<#if !entityLombokModel>
@Override
public String toString() {
return "${entity}{" +
<#list table.fields as field>
<#if field_index==0>
"${field.propertyName} = " + ${field.propertyName} +
<#else>
", ${field.propertyName} = " + ${field.propertyName} +
</#if>
</#list>
"}";
}
</#if>
}
最后运行 CodeGen3531.java 的主函数就可以生成代码了。
动态数据源
步骤:
- 加依赖
- application.properties 加配置
- 在Service实现类 和Mapper上 加@DS(“xx”)
- 如果是不同数据库类型 ,修改不同的jdbc协议 和 不同的驱动包
- 方法和类上都有@DS 以方法为准
在pom.xml中增加依赖
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>3.3.2</version>
</dependency>
在service层和 mapper的类上增加默认数据源
@Service
@DS("mysql001")
public class CustomerServiceImpl extends ServiceImpl<CustomerMapper, Customer> implements CustomerService {
@Mapper //实现类由mybatis 提供
@DS("mysql001")
public interface CustomerMapper extends BaseMapper<Customer> {
在特定的方法上增加特定数据源
@Insert("insert into customer(name,age) values (#{customer.name}, #{customer.age} )")
@DS("mysql002")
public void insertCustomer002(@Param("customer") Customer customer);
spring.datasource.dynamic.datasource.demo.url=jdbc:mysql://bigdata01:3306/demo?characterEncoding=utf-8&useSSL=false
spring.datasource.dynamic.datasource.demo.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.dynamic.datasource.demo.username=root
spring.datasource.dynamic.datasource.demo.password=000000
spring.datasource.dynamic.datasource.demo2.url=jdbc:mysql://bigdata01:3306/demo2?characterEncoding=utf-8&useSSL=false
spring.datasource.dynamic.datasource.demo2.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.dynamic.datasource.demo2.username=root
spring.datasource.dynamic.datasource.demo2.password=000000
整合druid连接池
数据连接池的主要目的是当应用反复访问数据库时,不会每次都重复创建连接。而是复用已有的连接。
假设应用程序访问一次数据库 100ms
查询流程:
- 格式转换 <1ms (视情况而定:内存、CPU、外存、网页前端、后端代码……)
- 网络传输 双向 60ms (很费时所以要用sql语句提前过滤)
- 建立连接 20ms(连接池在此处节约时间)
- sql的执行 20ms(根据主键进行查询 10万级)
首先要引入依赖
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.15</version>
</dependency>
配文件中加入
spring.autoconfigure.exclude=com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
spring.datasource.dynamic.datasource.demo.url=jdbc:mysql://bigdata01:3306/demo?characterEncoding=utf-8&useSSL=false
spring.datasource.dynamic.datasource.demo.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.dynamic.datasource.demo.username=root
spring.datasource.dynamic.datasource.demo.password=000000
spring.datasource.dynamic.datasource.demo.druid.initial-size=5
spring.datasource.dynamic.datasource.demo.druid.min-idle=5
spring.datasource.dynamic.datasource.demo.druid.max-active=20
spring.datasource.dynamic.datasource.demo.druid.max-wait=60000
spring.datasource.dynamic.datasource.demo.druid.test-on-borrow=true
spring.datasource.dynamic.datasource.demo.druid.test-while-idle=true
spring.datasource.dynamic.datasource.demo.druid.test-on-return=false
spring.datasource.dynamic.datasource.demo2.url=jdbc:mysql://bigdata01:3306/demo2?characterEncoding=utf-8&useSSL=false
spring.datasource.dynamic.datasource.demo2.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.dynamic.datasource.demo2.username=root
spring.datasource.dynamic.datasource.demo2.password=000000
spring.datasource.dynamic.datasource.demo2.druid.initial-size=5
spring.datasource.dynamic.datasource.demo2.druid.min-idle=5
spring.datasource.dynamic.datasource.demo2.druid.max-active=20
spring.datasource.dynamic.datasource.demo2.druid.max-wait=60000
spring.datasource.dynamic.datasource.demo2.druid.test-on-borrow=true
spring.datasource.dynamic.datasource.demo2.druid.test-while-idle=true
spring.datasource.dynamic.datasource.demo2.druid.test-on-return=false
参数 | 含义 |
---|---|
initial-size | 初始连接数 |
min-idle | 最少保持空闲连接数 |
max-active | 最大可活动的连接数 |
max-wait | 最长等待时长 |
test-on-borrow | 借走时测试 |
test-while-idle | 空闲时测试 |
test-on-return | 归还时测试 |
具体代码:
gitee仓库地址
代码说明:
用hiveMetaStoreClient提取hive中所有表名,然后根据表名即可获取该表所有信息
然后放入我们事先准备好的TableMetaInfo bean中
分两部分放,普通字段信息和HDFS环境信息
体力活儿,挨个取
Controller层
注解 说明
注****解 | 位****置 | |
---|---|---|
@RestController | 类 | 标识入口类 |
@RequestMapping | 方法 | 标识入口方法 |
@GetMapping | 方法 | 标识入口方法GET请求专用 |
@PostMapping | 方法 | 标识入口方法POST请求专用 |
@RequestParam | 参数 | 接收路径上的键值对参数 http://xxxxx/xx?name=xxx&age=xxx |
@PathVariable | 参数 | 接收路径上的值http://xxxxx/customer/123 |
@RequestBody | 参数 | 接收请求体(payload) 中的参数 |
小练习
1 关于web开发描述正确的是:bce
a post请求的路径中不能携带键值对参数
b 请求路径中的键值对参数使用@RequestParam接收到方法参数中
c /customer/123 中的123可以通过@PathVariable 接收到方法参数中
d @ResponseBody 可以负责接收请求体中的数据。
e get请求中一般不携带请求体数据
2 关于springboot描述正确的是ad
a @Service标识对象默认会在web容器启动时以单态的方式加载到内存中
b 被@Autowired标识的接口,默认只有在其方法被调用时springboot才会为其装配实现类。(@Lazy)
c 控制层直接访问数据层并不违反开发规范
d 服务层的各个服务之间可以互相调用
3 关于状态码描述正确的是abcd
a 见到页面500的报错,直接查web服务器端控制台日志,
b 405的状态,一般是浏览器请求的get或post 与服务器的get或post不匹配
c 400的状态,往往是因为参数传递缺失或者类型不匹配造成的。
d 404的状态,有可能是请求路径填错了
4 关于sprinboot分层描述正确的是abcd
a 一个控制层的方法代表一个请求
b 一个服务层的方法代表一个业务操作
c 一个数据层的类代表一张表
d 一个数据层的方法代表一条sql语句
5 关于开发调试正确的是abcdf
a idea 进入当前断点的方法的快捷键是F7
b idea 从当前断点位置执行一行代码的快捷键是F8
c idea 从当前断点位置继续执行后面的代码的快捷键是F9
d evaluate expression 可以帮助查看断点方法的执行结果
e 通过run执行java代码,比debug方式执行,运行性能有明显提升
f 在各种主流的浏览器上执行F12,都可以打开开发者工具窗口。
6 关于web开发描述正确的是abcde
A.只要是支持jdbc的数据库,都可以使用mybatis
B.如果使用#{} 在SQL中作为参数替代符,即使是字符串参数也不用加单引
C.如果存在多张表join的操作,则无法使用mybatis-plus ,只能使用mybatis
D.mybatis-plus不仅封装了很多现成的数据层方法,还封装了很多服务层的方法
E.mybatis-plus中提供的数据层的方法,一定是使用select、insert、update、delete开头的,没有例外。
7 以下对数据库连接池描述正确的是acd
A 可以节省程序反复创建连接的开销
B 连接池的初始连接数 主要看平均在线人数(平均访问并发数)
C 连接池的最大活跃数 主要看后台数据库能承担的压力
D 如果后台数据库发生了重启,所有的连接都会立刻失效。
E test-while-idle 可以绝对避免用户借走坏掉的连接 on-borrow可以
1 以下为程序开发时的注意事项正确的是:
A 避免即席查询中在循环中操作数据库 对 场景 避免1+N关联查询 解决 1 join 2 从数据库查出来在java中拼接
B 避免循环使用String频繁拼接字符串 对 解决 stringbuilder stringbuffer
C mybatis中执行sql中涉及#{}时, 要手工对用户提交的参数进行过滤特殊字符. 错 只有${}时才涉及sql注入问题 才需要过滤特殊字符
D 大量数据存入一个集合对象时,要对集合对象进行容量初始化。 对 避免扩容
E 对数据库的插入操作较多时可以通过建立数据库索引提高速度。错 索引的目的是优化查询性能 对于插入操作来说 索引往往都是负担
2 向以下数据集合对象频繁写入操作会造成扩容的是
A hashMap put 会扩容 初始容量 16 扩容比例 翻倍 扩容因子 0.75
B Stringbuilder append 会扩容 初始容量16 新值2倍+2 不够就加
C ArrayList add 会扩容 初始容量10 扩容比例+50% 不够就加
D LinkedList add 不会扩容
3 以下时间复杂度描述正确的是
A 判断HashSet中是否存在某个元素 O(N) 错
B 判断HashSet中是否存在某个元素 O(1) 对
C 有序数组的折半查找指定值 O(logN) 对
D 无序List进行遍历查找指定值 O(N) 对
E 数组按下标查询指定值 O(N) 错 O(1)
F 链表按下标进行查询 O(logN) 错 O(N)
G 冒泡排序 O(N^2) 对
H 快速排序 O(N) 错 O(NlogN)
I Mysql 如果用主键查询 时间复杂度 O(1) 错 O(LogN) b+tree 非二叉树
J Hashmap中 的 红黑树 时间复杂度 O(1) 错 O(LogN) 二叉树
为什么mysql选择了非二叉树 而hashmap选择二叉树 原因 二叉树非常的高 如果放在磁盘操作会增加很多的io
4 以下关于mapper层映射的描述正确的是
A 如果java对象的属性刚好与数据库字段对应(驼峰对蛇形),不需要任何手动映射处理,否则需要手动干预字段与属性的映射。
B 如果单表查询,实体类中存在某些属性,不存在于数据库中,需要增加@TableField(exists=false)
C @DS 可以放在方法上,当方法上的@DS和类上的@DS指向不同数据源时,以类上的为准。
D Mybatis-plus 执行getById方法时,一定是根据数据库表的主键进行查询。
E 当对象中还有子对象时,select语句查询的结果需要借助<association>封装到子对象中。
5 以下信息哪些是java程序可以和Hdfs中获得
A 数据表的单副本大小
B 数据表含副本的大小
C 数据表下的文件数
D 数据表下的分区数
E 数据表的行数
F 数据表的最后访问和写入时间
G 数据库表的字段数
1 以下关于设计模式描述正确的是
A 单例模式的初衷是减少对象重复创建,节省内存开销。 对
B 在多线程场景下,单例模式要考虑内部属性的线程安全问题。 对 i++(不安全) (使用原子对象) hashmap.put (不安全) cocurrentHashMap (线程安全) 或者使用锁
C 模板模式践行了开闭原则,封装不变的部分,扩展可变的部分。 对
D 模板模式践行了单一职责原则,每个子类实现一个职责。 对
E 模板模式由抽象父类进行控制,子类实现核心逻辑。 对
F 模板模式必要时子类可以重写父类的主控制方法。 错 为了避免子类重写父类的控制方法 可以在父类控制方法中加final
2 以下对多线程场景描述正确的是
A 需要事务之间没有先后依赖
B 适合每个事务有大量io的操作
C 线程越多并行效果越明显
D 适合需要频繁访问共享资源的场景
3 以下可以从dolphinscheduler获得的信息
A 任务的执行耗时
B 任务的sql定义
C 任务的yarn_id
D 任务的stage个数
4 以下关于正则表达式的描述,哪些是正确的?
A. ^[a] 表示字符串中不能含有a字符
B. \d+ 表示至少有一个数字字符
C. .* 表示匹配任意长度的任意字符
D. \w 表示匹配任意字母和数字
E a{2,4} 可以匹配ababaa
F. (abc|def) 表示匹配字符串 "abc" 或 "def"
5 关于数据治理考评平台
A 致力于对数据事前、事中、事后三个阶段进行治理。
B 比起庞大数据中台,搭建数据治理考评平台更快更直观且开发成本更低的暴露数据治理的问题。
C 如果有数据中台,则不需要进行数据考评治理。
D 如果有数据中台,则可以更好的对数据考评进行治理。
E 数据治理考评平台属于技术型项目,不需要管理制度的支持。
6 关于语法树解析的描述正确的是
A 凡是使用java开发的框架使用的语法树解析工具都是通用的
B hive的语法树解析器中的walker遍历器使用后序遍历。
C 不同的需求要开发不同的自定义Dispatcher,放入遍历器,在遍历中收集信息。
D 在语法树中只用TOK_TABREF可以定位来源表
E 在语法树中只用TOK_TABLE_OR_COL可以定位where条件的字段
7 关于数据考评指标的描述正确的是
A 所有指标分成【规范】、【存储】、【计算】、【质量】、【安全】五个板块。
B “是否数据倾斜”属于计算板块。
C “是否长期无产出” 属于存储板块。
D “生命周期合理” 属于存储版块
E 如果增加一个“产出数据行数监控”指标应该放在质量板块。
Redis(代码)
5大数据类型 场景
1 string 1对1 单值的kv 存不太容易变化的对象 key –json
2 list 1对多v 轻量级的队列 栈
3 set 1对多v 判存 去重 集合运算
4 zset 1对多v 排序
5 hash 1对多kv 可以存储 对象 需要字段独立管理 变化的场景
很简单,就是创建连接,创建对象,提交
看不懂可以用插件 通义灵码 解释一下,如图
gitee仓库地址
自测题
1 以下对 redis 描述正确的是
A redis 的默认端口号是6379 ,并且可以在配置文件中修改。 对
B 因为redis可以备份数据到磁盘中,所以不会丢失数据。 错 rdb有可能 aof也会丢概率极小 always不会丢 单代价极大
C 启动redis-server时如果不声明配置文件,则使用安装目录中的redis.conf 错 如果不指定配置文件 则使用程序中硬编码的配置
D redis默认是单线程的,可以通过配置改为多线程。错 redis就是单线程 配置6.x配置多线程只是辅助工作 核心工作线程还是单线程
2 以下关于redis 命令描述正确的是
A keys * 命令 可以查看所有库下的key 错 当前库 当前默认0号库
B expire 设置过期时间默认单位是毫秒 错 默认秒
C lrange mylst 0 3 含义是从左边取前4个元素 对 左闭右闭
D zrevrange 是从大到小 ,按下标范围取值 对
3 以下redis命令 为幂等操作的是
A sadd sadd s1 v1 幂等
B append 非幂等
C set 幂等
D incr 非幂等
E hmset 幂等
F lpush lpush l1 v1 非幂等
G zadd 幂等
H zincrby 非幂等
4 需求:在实时计算中利用redis记录当日访问过网站的用户id,并且统计人数 ,则最好选用哪个数据类型
A string 需要一对多
B list 不能去重
C zset 不需要排序
D hash 没有kv结构 不需要
E set 推荐 可以去重 也可以去个数
5 以下对 redis 描述正确的是
A redis默认不写错误日志,必须手动开启 对 配置logfile
B redis的内存淘汰策略 maxmemory-policy ,只有在内存使用达到了阈值才会触发 对
D 在命令行中执行config set 操作修改的配置,重启redis后就会失效 对
E 如果给redis设置了timeout 时间,会有可能导致应用系统中的连接池的对象连接失效 对 为了避免连接池失效 testOnBorrow(true)
6 redis 在正常使用一段时间后,抛出如下异常:
redis.clients.jedis.exceptions.JedisConnectionException : Could not get a resource from the pool
应该考虑以下哪个解决方案 :
A 检查配置文件 bind 是否注释,保护模式是否关闭 错 如果有该问题 不会能够正常使用一段时间
B 检查redis.conf配置中,对于超时时间过短可能造成连接池中的连接损坏 timeout
C 检查程序中使用过的连接是否close掉 一开始可以从连接池中获得连接 但是因为没有close也就是没有归还连接 造成连接池中的连接 枯竭 所有报 题干上的错误
D 检查redis服务内存使用过大,超过阈值 OOM out of memory
7 以下关于rdb描述正确的是
A redis的rdb备份位置可以通过配置文件配置。 可以 dir "/xxx/xx"
B 当redis进行rdb备份时,会瞬间造成内存总量翻倍。 不会翻倍 copy on write 可以共享内存
C redis自动备份数据时,会阻塞主线程,一定程度影响用户使用。 错 自动备份使用独立进程 不影响用户使用
D flushall和shutdown 命令会触发rdb备份。 对
8 以下关于aof描述正确的是
A 相对于rdb,aof更占用存储空间。 对
B aof文件可以通过文本编辑器查看,但是不能修改。 错 可以改
C aof本质上是由rdb加命令文本日志组成。 对 通过rewrite 周期性的压缩为rdb
D aof和rdb同时启用,redis会优先加载rdb。 错 优先aof
E aof重写可以压缩aof的数据量。 对
9 以下关于主从复制描述正确的是
A 主从复制的主要作用是缓解服务器压力,扩展容量。 错 作用是高可用
B master如果宕机, slave不会自己自动升级为master。 对 原地等
C slave无法继承master建立关系前的历史数据。 错 可以获得
D slave一旦宕机,再次启动并成为slave后,无法获得失联这段时间的master数据。 错 可获得
E 如果已有自己数据的节点,被设定成为另个节点的从机,那么原先自己的数据依然还会保留。 错 不会保留
10 关于哨兵描述正确的是
A 一个主从集群中通常会有一个或多个哨兵,监控master及slave的健康状态。 对
B 当master出现健康问题时,哨兵们会发起投票,以默认过半数原则确定是否进行主从切换。 错 决定切换的票数 是 配置的 没有过半数的默认值
C 哨兵会选择优先级数值更小的slave 对
D 当旧master再次上线后,哨兵会把旧master转为slave。 对
E 哨兵可以为客户端提供路由功能,会把写操作分给master ,读操作分给slave,从而实现压力的分摊。
错 哨兵 会把所有的请求都给master 不会做读写分离 读写分离会存在数据不一致的情况 而且还有线程不安全的情况
HBase代码
很简单,就是创建连接,创建对象,提交
gitee仓库地址
Region Server
1)MemStore
写缓存,由于HFile中的数据要求是有序的,所以数据是先存储在MemStore中,排好序后,等到达刷写时机才会刷写到HFile,每次刷写都会形成一个新的HFile,写入到对应的文件夹store中。
2)WAL
由于数据要经MemStore排序后才能刷写到HFile,但把数据保存在内存中会有很高的概率导致数据丢失,为了解决这个问题,数据会先写在一个叫做Write-Ahead logfile的文件中,然后再写入MemStore中。所以在系统出现故障的时候,数据可以通过这个日志文件重建。(write ahead log 预写日志 防治出现崩溃造成数据丢失
一般情况下是不会读取该日志 顺序写入(性能好) (随机读写非常难) 只有原主不在了 才会读 , WAL一定保存在Hdfs
MasterProcWAL 是记录了master日常的ddl操作
防止master崩溃 backupmaster可以通过读取wal 快速接手master 未完成事情 )
3)BlockCache
读缓存,每次查询出的数据会缓存在BlockCache中,方便下次查询。
HBase写操作
1 节点定位阶段 : 定位regionServer
1首次访问需要查询zookeeper获得meta的regionserver
2在访问regionServer (meta) 获得meta表信息
3 根据数据的rowkey 查询meta表 定位到regionServer (data)
4 把数据提交给regionServer
优化: 在客户端会缓存meta信息 meta cache ,以后同样的region 不会再问zookeeper 跳过 1-2 环节 提高访问效率 减少通信 io
2 写操作提交阶段 : 向regionserver提交数据
1 为什么写wal 避免regionserver 挂掉 刚刚写入缓存的数据丢失
2 为什么不直接写磁盘 因为wal是日志 是顺序写 写入速度非常快
3 先写wal还是缓存memstore 先写wal 如果先写缓存 会有数据丢失风险
如果完成wal和缓存的写入 本次put就完成了 会等数据真实落盘到hdfs
3 数据落盘阶段 : 写入hdfs
把memstore中的数据 排序 写入到对应store下 形成一个文件, store会有多个文件 ,是因为memstore 不同时间点的flush 产生 ,可能大也可能效 ,不同的文件可能 保存的是同样rowkey 数据 只是version不同
4 数据整理阶段 compaction 把文件进行合并整理
memstore 刷新机制
1 到达阈值 128 日常刷写
2 高频访问压力大 堆内的40%
3 空闲 每小时刷写一次
4 wal个数 超过32 刷写
StoreFile Compaction(文件合并)
memstore每次刷写都会生成一个新的HFile,文件过多读取不方便,所以会进行文件的合并,清理掉过期和删除的数据,会进行StoreFile Compaction。
major 会最终把一个store文件合并为一个 而且还会清理失效数据 非常的重 不好估计执行时间 ,建议设置为手动 ,根据企业情况 执行
minor 把文件 分为小 中大 大的一定不合并 小的一定合并(128mb) 中 :看和其他文件的相对大小 相对小的合并
合并个数 3到10 ,避免一次合并消耗太大
HBase读操作
布隆过滤器
hbase 海量数据 的读取 高效是怎么做到的(面试重点)
- 规划:hbase 把一个大表的数据分成多个region 分布在不同的regionsServer节点
- 定位:hbase的客户端可以缓存metacache 快速定位 数据节点
- 一张表的数据根据store(列族)进行划分,避免读取没有必要的列族
- 该文件有索引 根据rowkey 判断是否要扫描该文件
- 还可以根据布隆过滤器 判断是否要扫描该文件
- 如果确实要扫描该文件 该文件有block cache 可以直接从内存中读取
- hbase 还对已经存储的文件进行compaction ,进一步优化读取效率
数据倾斜
-
放在分布式批处理系统 产生的问题是 长尾效应
-
放在即席查询的场景 产生热点问题(hot pot)
-
预分区 + rowkey的设计 解决热点问题 让数据散列分布
预分区(自定义分区)
系统自动分区一般是伴随着数据量的增长,但是根据不同数据特征我们可以提前规划,直接把数据热点问题扼杀在前期。
每一个region维护着startRow与endRowKey,如果加入的数据符合某个region维护的rowKey范围,则该数据交给这个region维护。那么依照这个原则,我们可以将数据所要投放的分区提前大致的规划好,以提高HBase性能。
1)手动设定预分区
create ‘staff1’,‘info’, SPLITS => [‘1000’,‘2000’,‘3000’,‘4000’]
生成效果如下:
注意:范围是左闭右开的。
2)按照文件中设置的规则预分区
(1)创建splits.txt文件内容如下:
aaaa
bbbb
dddd
cccc
(2)然后执行:
create ‘staff3’, ‘info’,SPLITS_FILE => ‘/xx/xx/splits.txt’
小练习
1 以下是Hbase的主要特征的是
A 大规模数据的随机读写 对
B 在线分析处理 (online-analysis-process) 错
C NoSQL 对
D 经过配置可以实现原子性事务 错 mysql oracle postgresql tidb
E 适合存储稀疏的数据 对
F 对于增加列族有非常高的弹性 错 对于增加列有非常高的弹性
2 以下哪个组件会负责存储数据
A HMaster 错
B RegionServer 对 memstore
C Zookeeper 错
D HDFS 对 wal+region
3 以关于建立hbase表下描述正确的是
A 每个表都至少有一个列族 对
B 列族中的数据都至少会保留3个版本 错 1个版本
C 列族的名称可以随意更改 错 不能改 只能删掉在加新的
D 建议每张表有5个以上列族为宜 错 1-3个
4 对于hbase写流程描述正确的是
A 数据提交到RegionServer中先保存MemStore,再保存WAL。 错 先写wal 保证数据不丢失
B MemStore每次写入Hdfs都会经过排序。 对
C flush时每个store对应一个memstore,会产生一个文件。 对
D 当某个memstore达到阈值触发flush时,该region下所有的memstore都会一起保存。 对
E flush时,都会阻塞往regionserver提交数据。 错 达到更高的阈值的时候才会阻塞住写操作
5 以下哪些因素会影响flush的触发
A 某个memstore的大小是否超过阈值 对 128
B 所有memstore占regionserver总内存的比例超过阈值 对 40%
C 超过某个阈值时长未进行flush 。 对 1小时
D WAL 文件数达到某个阈值。 对 32 个
6 影响触发小压缩 minor compaction的因素
A 有足够小的文件 对
B 有足够大的文件 错
C 需要压缩的文件个数 对
D 足够多的未合并天数 错
7 以下哪些是hbase对读取性能的优化
A 海量数据行被切分为多个region 对 分布式
B meta cache 对 客户端快速定位节点
C 列族 对 减少查询的列
D HFile上有rowkey范围索引 对 不在范围不扫描
E HFile上有布隆过滤器 对 过滤不属于该文件的rowkey
F Block Cache 对 内存
G Compaction压缩 对 减少小文件
8 关于region 拆分描述正确的是
A 拆分的目的是为了避免过多的数据集中。 对 分布式
B 目前版本拆分规则采用阶梯制。 错 上个版本阶梯制
C 目前首次拆分以memstore flush为准,之后以region最大文件大小为准。 错 根据当前rs下是否是一个region memstore flush *2
D 刚刚拆分后只是逻辑拆分, 直到compaction才物理拆分。 对
E 预分区为了解决数据热点问题 对
F 预分区的region不会再发生拆分 错 大了还是会拆
9 rowkey设计要考虑的
A 唯一性 对
B 满足任意查询场景 错 只能满足个别查询场景
C 散列避免热点 对
D 长度尽可能优化 对
Flink
Flink学习仓库地址
Flink初级
Flink
是一个框架
是一个分布式处理引擎
对有界或者无界流数据进行实时计算
Flink和Sparkstreaming对比
Flink Sparkstreaming
时间语义 事件时间、处理时间 处理时间
窗口 灵活 不够灵活
状态 有状态 无状态(updateStateByKey)
容错 检查点 弱
动态SQL 支持 不支持
Flink的分层API
SQL
TableAPI
DataStreamAPI|DataSetAPI(Flink1.12后,基本不用)
处理函数 process
越顶层,使用越简单
越底层,使用越灵活(可以处理较复杂的业务)
通过WordCount对比DataStreamAPI和DataSetAPI
DataStreamAPI DataSetAPI
准备环境 StreamExecutionEnvironment ExecutionEnvironment
分组 keyBy groupBy
作业提交 env.execute() 自动提交
处理方式 流中每来一条数据都会对其进行处理 收集齐所有数据后,处理一次
Flink运行模式(指定Flink程序在什么地方执行)
Standalone、k8s、mesos、Yarn...
Flink集群中的角色
客户端
JobManager
TaskManager
Flink部署模式
会话模式-session
需要先启动集群
多个job共享集群资源,作业在共享资源的时候,相互之间会有影响
当作业取消的时候,对集群是没有影响的
单作业模式-per-job
不需要先启动集群
当作业提交的时候,会给每一个作业单独启动一个集群
当作业取消的时候,集群也会停掉
应用模式-application
不需要先启动集群
当应用提交的时候,会给每一个应用(一个应用下(app),可能会有多个作业(job))单独启动一个集群
当作业取消的时候,集群也会停掉
如果是会话模式以及单作业模式,在客户端会执行一些转换操作,客户端压力比较大
如果是应用模式,转换操作会到服务器(JobManager)上执行,客户端压力变小
在开发的时候,首选应用模式
Flink On Yarn
Yarn会话
启动集群 bin/yarn-session.sh
WEBUI提交作业
命令行提交作业 bin/flink run -d -c 类的全限定名 jar包路径
Yarn-Per-Job
不需要启动集群
只能通过命令行提交作业 bin/flink run -d -t yarn-per-job -c 类的全限定名 jar包路径
YarnApplication
不需要启动集群
只能通过命令行提交作业 bin/flink run-application -d -t yarn-application -c 类的全限定名 jar包路径
Yarn应用模式作业提交模式
流程描述:
客户端将作业(做一些参数的解析和封装)以yarn-application形式打包并上传至Flink后去Yarn的ResourceManager
Yarn的ResourceManager接收到请求后根据作业封装一个AM,也是Flink的JobMaster,共包含分发器(针对每一个job封装一个jobMaster对象)、JobMaster(根据提交的作业生成逻辑、作业、执行流图)、资源管理器(根据执行流图向Yarn的资源管理器申请资源)
Yarn的ResourceManager启动TaskManager,看看哪一个TaskManager有足够的空间就创建slot,并将slot返回给对应的Task去运行job
该任务
并行度
当要处理的数据量非常大时,我们可以把一个算子操作,“复制”多份到多个节点,数据来了之后就可以到其中任意一个执行。
这样一来,一个算子任务就被拆分成了多个并行的“子任务”(subtasks)
一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)
一个Flink应用程序的并行度取决于并行度最大的算子的并行度个数
默认情况下,如果在本地(Idea)中运行程序,如果没有指定并行度,并行度的个数为当前CPU的线程数
如何设置并行度
在代码中全局设置
env.setParallelism(3)
针对某一个算子单独设置并行度
算子.setParallelism(3)
在flink的配置文件 flink-conf.yaml
parallelism.default: 1
在提交作业的时候通过-p参数指定
并行度优先级
算子单独设置并行度 > 全局设置 > 在提交作业的时候通过-p参数指定 > 在flink的配置文件
算子链
在Flink中,并行度相同的一对一(one to one)算子操作,可以直接链接在一起形成一个“大”的任务(task)
这样原来的算子就成为了真正任务里的一部分,这样的技术被称为“算子链”(Operator Chain)
算子链是Flink提供一种非常有效的优化手段,不需要我们做什么处理,默认就会进行合并
前提:
算子间必须是one-to-one的关系
并行度相同
不能存在重分区操作
禁用算子链
全局禁用算子链 env.disableOperatorChaining();
算子禁用 算子.disableChaining()
开始新链 算子.startNewChain()
任务槽
Flink程序在执行的时候,会被划分为多个算子子任务,每个子任务在执行的时候,需要TaskManager提供资源
TaskManager是一个JVM进程,开始开启多个线程执行任务,每一个线程叫做任务槽
任务槽可以均分TaskManager的内存资源
在flink-conf.yaml的配置文件中可以设置一个tm上slot的数量
taskmanager.numberOfTaskSlots: 1
在设置slot数量的时候,除了要考虑内存资源外,也需要考虑CPU情况,一般slot的数量和cpu核数之间的关系是1:1或者2:1
slot共享
只要属于同一个作业,那么对于不同任务节点(算子)的并行子任务,就可以放到同一个slot上执行
一个Flink应用执行需要的Slot的数量 = 各个共享组中并行度最大的算子并行度个数之和
四张图
逻辑流程---程序执行拓扑
作业图 ---合并算子链
执行图 ---作业图的并行化版本
物理"图" --- 程序的执行过程
以Yarn的应用模式为例描述Flink作业提交流程
DataStreamAPI
环境准备
创建本地环境
StreamExecutionEnvironment.createLocalEnvironment();
创建本地环境带WEBUI
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())
需要在pom.xml文件中提前添加flink-runtime-web依赖
创建远程环境
StreamExecutionEnvironment.createRemoteEnvironment(远程服务器ip,端口号,jar)
根据实际执行场景自动创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
->如果需要自动创建本地带webUI的执行环境,需要传递Configuration,并显式指定端口号
关于运行模式设置
STREAMING
BATCH
AUTOMATIC
源算子-Source
从集合中读取数据
env.fromCollection(集合)
从指定的元素中读取数据
env.fromElements(元素...)
从指定的网络端口读取数据
从文件中读取数据
env.readTextFile() 已过时
文件连接器
FileSource<String> source = FileSource
.forRecordStreamFormat(new TextLineInputFormat(), new Path("D:\\dev\\workspace\\bigdata-0318\\input\\words.txt"))
.build();
env.fromSource(数据源对象,WaterMark,数据源名)
Flink1.12前
env.addSource(SourceFunction)
Flink1.12后
env.fromSource(Source)
从kafka主题中读取数据
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
//设置kafka集群的地址
.setBootstrapServers("hadoop102:9092")
//设置消费的主题
.setTopics("first")
//设置消费者组
.setGroupId("test")
//设置消费位点
//作为消费者,如何保证消费的精准一次:手动维护偏移量
KafkaSource->KafkaSourceReader->SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>>
// 从最早位点开始消费
//.setStartingOffsets(OffsetsInitializer.earliest())
// 从最末尾位点开始消费
.setStartingOffsets(OffsetsInitializer.latest())
// 从时间戳大于等于指定时间戳(毫秒)的数据开始消费
//.setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))
// 从消费组提交的位点开始消费,不指定位点重置策略
//.setStartingOffsets(OffsetsInitializer.committedOffsets())
// 从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点
//.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
//设置反序列化器
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
从DataGen中读取数据
自定义Source数据源
转换算子-Transform
基本的转化算子
map、filter、flatMap
聚合算子
必须先进行keyBy,经过keyBy之后,得到的流的类型是KeyedStream,分组流或者键控流,只有键控流才能直接调用聚合算子
聚合算子会将原来的计算结果保留到状态中,用于计算
简单聚合算子
sum、min、minBy、max、maxBy
规约聚合
reduce
如果流中只有一条数据,reduce方法不会被执行
reduce(value1,value2)
value1:累加的结果
value2:新来的数据
输出算子-Sink
提交作业
同步提交作业 env.execute()
异步提交作业 env.executeAsync()
以异步提交作业为例,如果一个应用App下面有多个作业Job,部署到Yarn的不同模式下效果
会话-session
先启动集群,多个job都部署到同一个集群中,共享集群资源
作业停止,不会对集群产生影响
单作业-perjob
不需要先启动集群,给每一个作业启动一个集群
如果停止某一个作业,当前作业对应的集群也会停止,对其它的作业对应的集群没有影响
应用-application
不需要先启动集群,给每一个应用启动一个集群
同一个应用下的多个作业,共享这一个集群
如果停止某一个作业,整个集群停止,对其它的作业也会有影响
富函数
在流中数据进行处理的时候,相关算子要求传递一个处理函数实体作为参数
例如:map(MapFunction)、filter(FilterFunction)、flatMap(FlatMapFunction)
默认参数声明的形式是接口类型,其实除了接口之外,每一个声明的函数类,都有一个对应的富函数(抽象类)实现
富函数的形式 Rich +接口名 例如: MapFunction -> RichMapFunction
富函数和普通的接口比起来,多出了如下功能
提供了上下文对象,可以通过上下文对象获取更丰富的信息
提供了带生命周期的方法
open
方法初始化的时候执行的代码
在每一个算子子任务(并行度)上,只执行一次
用于进行连接的初始化
close
方法执行结束的时候执行的代码
在每一个算子子任务(并行度)上,只执行一次
用于资源的释放
如果处理的是无界数据,close方法不会被执行
分区操作(算子)
shuffle
rebalance
rescale
broadcast
global
one-to-one --- forward
keyBy---hash
custom---custom
分流
将一条流拆分为2条流或者多条流
filter
涉及对同一条流的数据处理多次,效率较低
侧输出流
首先定义侧输出流标签OutputTag
在定义侧输出流标签的时候,存在泛型擦除
可以通过匿名内部类的方式创建对象
可以在创建对象的传递侧输出流中的数据类型
注意:如果要向使用侧输出流,只能通过process算子对流中数据进行处理,因为在底层的processElement方法中,可以获取上下文对象
合流
union
可以合并2条或者多条流
要求:参与合并的流中数据类型必须一致
connect
只能对2条流进行合并
参与连接的两条流数据类型可以不一致
Flink读写外部系统的方式
Flink1.12前
env.addSource(SourceFunction)
流.addSink(SinkFunction)
从Flink1.12开始
env.fromSource(Source)
流.sinkTo(Sink)
Flink从kafka中读取数据
创建KafkaSource
env.fromSource(kafkaSource)
KafkaSource可以保证读取的精准一次,KafkaSource->KafkaSourceReader->成员变量维护偏移量
Flink向kafka中写入数据,一致性如何保证(先了解)
检查点必须要开启
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("xxx")
//检查点超时时间 < 事务超时时间 <= 事务最大超时时间(15min)
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,15*60*1000 + "")
在消费端,设置消费的隔离级别read_committed
.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed")
union
connect
Flink高级
Jdbc连接器
SinkFunction jdbcSinkFunction = JdbcSink.sink(
sql,
给问号占位符赋值,
攒批配置(可选),
连接选项
);
流.addSink(jdbcSinkFunction)
构造者设计模式
对象创建和赋值一步搞定
链式调用
自定义Sink
class 类 implements SinkFunction|extends RichSinkFunction{
invoke:向不同外部系统写入数据的逻辑
}
时间语义
事件时间:数据真正产生的时间
处理时间:Flink中算子对数据进行处理的操作
摄入时间(了解):数据进入到Flink的Source时间
从Flink1.12开始,默认时间语义事件时间
窗口
将无限的流数据划分一个个有限的数据库进行处理,就是所谓的窗口
在理解窗口的时候,窗口是"桶"不是"框"
窗口分类
按照驱动方式分
时间窗口:以时间作为窗口起始或者结束的标记
计数窗口:以元素的个数作为窗口的结束标记
按照数据划分的方式分
滚动窗口
特点:窗口和窗口之间首尾相接,不会重叠,同一个元素不会同时属于多个窗口
需要传递的参数:窗口大小
滑动窗口
特点:窗口和窗口会重叠,同一个元素同时属于多个窗口
需要传递的参数:窗口大小、滑动步长
会话窗口
特点:窗口和窗口不会重叠,同一个元素不会同时属于多个窗口,窗口和窗口之间应该有一个间隙(Gap)
需要传递的参数:时间间隔
全局窗口
特点:将数据放到同一个窗口中,默认是没有结束的,所以在使用全局窗口的时候,需要自定义触发器
计数窗口的底层就是通过全局窗口实现的
窗口API
开窗前是否进行了keyBy操作
没有keyBy,针对整条流进行开窗,相当于并行设置为1
wsDS.windowAll()
wsDS.countWindowAll()
keyBy,针对keyBy之后的每一组进行单独开窗,窗口之间相互不影响
wsDS.keyBy().window()
wsDS.keyBy().countWindow()
窗口分配器---开什么样的窗口
滚动处理时间窗口
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
滑动处理时间窗口
.window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(2)))
处理时间会话窗口
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
滚动事件时间窗口
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
滑动事件时间窗口
.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(2)))
事件时间会话窗口
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
滚动计数窗口
.countWindow(10)
滑动计数窗口
.countWindow(10,2)
全局窗口
.window(GlobalWindows.create())
窗口处理函数---如何处理窗口中的元素
增量处理函数---窗口中来一条数据处理一次,不会对数据进行缓存
优点:不会缓存数据,省空间
缺点:获取不到窗口更详细的信息
.reduce()
窗口中元素的类型以及向下游传递的类型是一致的
如果窗口中只有一个元素,reduce方法不会被执行
reduce(value1,value2)
value1:中间累加结果
value2:新来的数据
.aggregate()
窗口中元素的类型、累加器的类型以及向下游传递的数据的类型可以不一致
createAccumulator:属于当前窗口的第一个元素进来的时候执行
add:窗口中每来一条数据都会被执行一次
getResult: 窗口触发计算的时候执行一次
全量处理函数---会将当前窗口的数据全部缓存下来,等窗口触发计算的时候再进行处理
优点:可以获取窗口更详细的信息
缺点:会将窗口的数据进行缓存,比较占用空间
.apply()
apply(String key, TimeWindow window, Iterable<WaterSensor> input, Collector<String> out)
.process()
process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out)
process更底层,通过context对象除了可以获取窗口对象之外,还可以获取更丰富的信息
在实际使用的过程中,可以增量 + 全量
reduce + apply (WindowFunction)
reduce + process (ProcessWindowFunction)
aggregate + apply (WindowFunction)
aggregate + process (ProcessWindowFunction)
窗口触发器---何时触发窗口的计算
.trigger()
窗口移除器---在窗口触发计算后,在处理函数执行前或者后执行一些移除操作
.evictor()
以滚动处理时间窗口为例
什么时候创建窗口对象
当有属于这个窗口的第一个元素到来的时候,创建窗口对象
窗口对象是左闭右开的[ )
窗口对象的起始和结束时间
起始时间:向下取整
final long remainder = (timestamp - offset) % windowSize;
// handle both positive and negative cases
if (remainder < 0) {
return timestamp - (remainder + windowSize);
} else {
return timestamp - remainder;
}
结束时间: 起始时间 + 窗口大小
最大时间: 结束时间 - 1ms
窗口如何触发计算
当系统时间到了窗口的最大时间的时候触发计算
水位线
前提:事件时间语义
在Flink内部维护一个逻辑时钟,用于衡量事件时间的进展,我们称这个逻辑时钟为水位线(Watermark)
主要用于事件时间窗口的触发、事件时间定时器的执行
水位线也会作为流中的元素向下游传递
flink会任务水位线前的数据都已经处理完了
水位线是递增的,不会变小
maxTimestamp - outOfOrdernessMillis - 1
Flink提供水位线生成的两种方式
单调递增--流中的数据不会出现乱序
.<WaterSensor>forMonotonousTimestamps()
new AscendingTimestampsWatermarks ---> extends ---> BoundedOutOfOrdernessWatermarks
有界乱序--流中的数据可能出现乱序
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofMillis(3))
new BoundedOutOfOrdernessWatermarks
单调递增是有界乱序的子类,单调递增是有界乱序的一种特殊情况,乱序程度是0
水位线的传递
上游是一个并行度,下游是多个并行度
广播
上游是多个并行度,下游是一个并行度
将上游所有的并行度上水位值拿过来取最小
上游是多个并行度,下游也是多个并行度
先广播,再取最小
空闲数据源
以滚动事件时间窗口为例
窗口对象什么时候创建
当属于这个窗口的第一个元素到来的时候创建窗口对象
窗口起始时间
向下取整的算法
窗口的结束时间
起始时间 + 窗口大小
窗口最大时间
maxTimestamp = 结束时间 - 1ms
窗口什么时候触发计算
水位线到了窗口的最大时间
window.maxTimestamp() <= ctx.getCurrentWatermark()
窗口什么时候关闭
水位线到了 window.maxTimestamp() + allowedLateness
迟到数据的处理
指定水位线的生成策略为有界乱序,指定乱序程度
在开窗的时候,指定窗口的允许迟到时间
侧输出流
基于时间双流join
基于窗口
滚动
滑动
会话
基于状态
IntervalJoin
基本语法:
keyedA
.intervalJoin(keyedB)
.between(下界,上界)
.process()
底层实现:
connect + 状态
具体处理步骤
判断是否迟到
将当前数据放到状态中缓存起来
用当前数据和另外一条流中缓存的数据进行关联
清状态
局限性:只支持内连接
迟到数据的处理
- 指定水位线的生成策略为有界乱序,指定乱序程度
- 在开窗的时候,指定窗口的允许迟到时间
- 侧输出流
处理函数
.process(处理函数)
处理函数可以获取上下文对象。通过上下文对象可以操作侧输出流、可以获取TimeService
处理函数分类
ProcessFunction
最基本的处理函数,基于DataStream直接调用.process()时作为参数传入。
KeyedProcessFunction
对流按键分组后的处理函数,基于KeyedStream调用.process()时作为参数传入。要想使用定时器,必须基于KeyedStream。
ProcessWindowFunction
开窗之后的处理函数,也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入。
ProcessAllWindowFunction
同样是开窗之后的处理函数,基于AllWindowedStream调用.process()时作为参数传入。
CoProcessFunction
合并(connect)两条流之后的处理函数,基于ConnectedStreams调用.process()时作为参数传入。关于流的连接合并操作,我们会在后续章节详细介绍。
ProcessJoinFunction
间隔连接(interval join)两条流之后的处理函数,基于IntervalJoined调用.process()时作为参数传入。
BroadcastProcessFunction
广播连接流处理函数,基于BroadcastConnectedStream调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未keyBy的普通DataStream与一个广播流(BroadcastStream)做连接(conncet)之后的产物。关于广播流的相关操作,我们会在后续章节详细介绍。
KeyedBroadcastProcessFunction
按键分组的广播连接流处理函数,同样是基于BroadcastConnectedStream调用.process()时作为参数传入。
定时器
当处理时间或者事件时间到了定时器指定的时间点的时候
定时器会触发执行---onTimer
注意:只有KeyedProcessFunction中才能使用定时器
状态
作用:用于保存程序运行的中间结果
状态的分类
原始状态
托管状态(我们主要学习)
算子状态
作用范围:和普通的成员变量作用范围相同,算子子任务(每个并行度/每个slot) 普通的成员变量只能在内存中保存,状态可以被持久化
ListState状态的使用流程
实现CheckpointedFunction接口
在成员变量的位置声明状态
重写initializeState和snapshotState方法
在initializeState方法中给状态进行初始化
context.getOperatorStateStore().getListState(listStateDescriptor)
在snapshotState方法中将成员变量放到状态中存起来
在处理函数中使用的还是成员变量
BroadcastState状态的使用流程
准备两条流(非广播流-主流、广播流-规则|配置流)
广播流进行广播---broadcast(广播状态描述器)
将主流和广播流进行关联---connect
对关联后的数据进行处理---process
【Keyed】BroadcastProcessFunction
processElement:处理主流数据 --- 从广播状态取规则|配置对主流数据进行处理
processBroadCastElement:处理广播流数据 --- 将数据放到广播状态中
将处理结果向下游传递
键控状态
作用范围:经过keyBy之后的每一个组单独维护一个状态,组和组之间状态不共享,相互隔离
值状态
ValueState
列表状态
ListState
map状态
MapState
规约状态
ReducingState
聚合状态
AggregatingState
键控状态的使用流程
在成员变量的位置声明状态
在open方法中给状态进行初始化
getRuntimeContext().getState(状态描述器对象)
在处理函数中使用状态
状态后端
主要决定状态以及检查点存储位置
Flink1.13前
状态 检查点
Memory TaskManager堆内存 JobManager堆内存
Fs TaskManager堆内存 文件系统 ,例如HDFS
RocksDB rocksDB库 文件系统 ,例如HDFS
Flink1.13后
HashMap TaskManager堆内存 JobManager堆内存||文件系统 ,例如HDFS
RocksDB rocksDB库 文件系统 ,例如HDFS
检查点
检查点是对状态进行的备份
是状态的副本
底层使用异步分界线快照算法
核心:barrier
工作原理
当到了检查点触发时机后,JobManager中的检查点协调器会向Source发送barrier
当source接收到barrier后,会将source的状态进行备份
barrier也会随着流中的流动向下游传递
下游算子接收到barrier后,也会对状态进行备份
直到barrier走到sink,对sink上的状态做完备份后,说明barrier之前的数据肯定是已经处理完了
相关的状态也已经都备份成功,这次检查点结束
barrier的传递(指的是barrier对齐的情况)
上游是一个并行度,下游是多个并行度,广播
上游是多个并行度,下游是一个并行度,将上游所有的barrier收齐后再进行备份
上游是多个并行度,下游是多个并行度,先广播,再对齐
检查点算法
Barrier对齐的精准一次
在等待barrier对齐的过程中,如果已经到达的barrier上,又有新的数据进来
不会对其进行处理,会将新来的数据缓存起来的,等到barrier对齐后再进行处理
优点:可以保证精准一次
缺点:时效性差
Barrier对齐的至少一次
在等待barrier对齐的过程中,如果已经到达的barrier上,又有新的数据进来
直接对其进行处理,如果出现故障,可能会出现重复处理的情况
优点:时效性好
缺点:保证不了精准一次
非Barrier对齐的精准一次(Flink1.11后)
只要上游并行度的barrier过来,就开始进行备份,但是在备份前,需要做如下几件事
将已经到达的barrier移到算子的输出缓存区末端
标记当前barrier跳过的数据
标记其它未到的barrier之前的数据
将标记的数据以及状态都保存到检查点
优点:时效性好,能够保证精准一次
缺点:保存数据量大,如果数据量大、算子链长、性能底
非barrier对齐的精准一次 过程:
当barrier来时,将其从输入缓冲区直接放到输出缓冲区末端,并将其(barrier)和跳过的数据(已处理的、待处理的)、和其他流的同一批未到来barrier之前的数据都保存到当前检查点(即备份)
检查点相关的配置
启用检查点
检查点存储
检查点模式(CheckpointingMode)
超时时间(checkpointTimeout)
最小间隔时间(minPauseBetweenCheckpoints)
最大并发检查点数量(maxConcurrentCheckpoints)
开启外部持久化存储(enableExternalizedCheckpoints)--job取消后,检查点是否保留
检查点连续失败次数(tolerableCheckpointFailureNumber)
非对齐检查点(enableUnalignedCheckpoints)
对齐检查点超时时间(alignedCheckpointTimeout)
通用增量 checkpoint (changelog)
最终检查点
检查点配置位置
在flink-conf.yaml文件中
在代码中通过API指定
保存点
底层原理实现和检查点一样
检查点是程序自动进行快照
保存点是程序员手动进行快照
--必须指定uid
一致性级别
最多一次(At-Most-Once)
至少一次(At-Least-Once)
精确一次(Exactly-Once)
Flink的端到端一致性
Source(读取数据的外部系统)
可重置偏移量
Flink流处理框架本身
检查点
Sink(写入数据的外部系统)
幂等
外部系统必须支持幂等
事务
WAL:外部系统不支持事务
*2PC:外部系统支持事务
两次提交过程描述
当处理完的数据到达Flink程序的SInk端时,要在接收端(如Kafka)开启一个事务,然后再将数据写过去(预提交)
一直写,直到barrier到达Sink,将当前检查点的状态(事务执行状态:未提交)保存起来,并开启新的状态,当所有并行度的检查点做完这两件事之后,JobManager向所有节点发送“本轮成功”的回调消息,预提交结束,进行真正提交,将写入Kafka的数据标记为已提交(若该步骤未完成,就从上次快照(检查点,就是本轮的检查点)的位置重新提交),提交成功后,Kafka会给SInk返回一个值,此时,Kafka将检查点(事务执行)状态修改为:已完成
设置事务的超时时间
检查点超时时间 < 事务的超时时间 <= 事务的最大超时时间
Kafka案例演示
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;
/**
* 该案例演示了两阶段提交_Sink
* 如果将Flink流中的数据写到Kafka主题中,要想保证写入的精准一次,需要做如下操作
* 开启检查点
* 设置一致性级别为精准一次(默认barrier对齐)
* .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
* 设置事务id的前缀
* .setTransactionalIdPrefix("2pc_sink")
* 设置事务的超时时间
* 检查点超时时间 < 事务的超时时间 <= 事务的最大超时时间
* 在消费端设置消费隔离级别
* .setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed") *
*/
public class Flink02_2pc_Sink {
public static void main(String[] args) throws Exception {
//TODO 1.环境准备
//1.1 指定流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//1.2 设置并行度
env.setParallelism(1);
//开启检查点
env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000L));
//TODO 2.从指定的网络端口读取数据
DataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 8888);
//TODO 3.将流的数据写到kafka的主题
//3.1 创建KafkaSink
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("hadoop102:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("first")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("2pc_sink")
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,15*60*1000 + "")
.build();
//3.2 写入
socketDS.sinkTo(kafkaSink);
//TODO 4.提交作业
env.execute();
}
}
启动yarn的历史服务器bin/historyserv
flink客户端:bin/sql-client.sh -i conf/sql-client-init.sql