1、打开BigData-KongGuan项目
- 打开BigData-KongGuan项目,在上一个任务(“用户登录”)的基础上继续完成本阶段任务。
- 初始化加载SpringBoot项目的代码所在位置src/main/java/com/qrsoft/BigDataKongGuanApplication.java ,代码如下:
package com.qrsoft;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
 * Spring Boot entry point of the BigData-KongGuan project.
 *
 * <p>{@code @EnableScheduling} is declared here so that the {@code @Scheduled}
 * methods of the service classes are picked up.
 */
@SpringBootApplication
@EnableScheduling
public class BigDataKongGuanApplication {
    /** Application context created by {@link #main}; {@code null} until the application has started. */
    public static ConfigurableApplicationContext appConfig;

    /**
     * Boots the Spring application and keeps a reference to its context.
     *
     * <p>No checked exceptions are declared: {@code SpringApplication.run} does not
     * throw any, and the previously declared types were not imported in this file.
     *
     * @param args command-line arguments, forwarded to Spring Boot
     */
    public static void main(String[] args) {
        appConfig = SpringApplication.run(BigDataKongGuanApplication.class, args);
    }
}
2、编写TimeTaskService类,并配置HBase参数和基础参数
- 代码所在位置src/main/java/com/qrsoft/service/TimeTaskService.java。
- 在com.qrsoft.service包下,创建TimeTaskService类,首先,定义一些变量用于临时保存查询结果:
package com.qrsoft.service;
import com.alibaba.fastjson.JSON;
import com.qrsoft.common.Constants;
import com.qrsoft.util.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * Scheduled service skeleton: holds the per-table paging state used by the
 * query methods that are added to this class later.
 */
@Service
@Slf4j
public class TimeTaskService {

    // ---- Last row key seen by the most recent query of each table (null until set) ----

    // Kg_PlanData: flight plan data
    private static byte[] planDataLastRowKey;
    // Kg_WarnFlightHistory: flight instruction warning data
    private static byte[] warnFlightLastRowKey;
    // Kg_ATCDutyInfo: controller duty roster data
    private static byte[] aTCDutyLastRowKey;
    // Kg_WarnSimilarHistory: similar flight number warning data
    private static byte[] warnSimilarLastRowKey;
    // Kg_AFTN: AFTN message data
    private static byte[] aFTNLastRowKey;
    // Kg_ATC: ATC message data
    private static byte[] aTCLastRowKey;
    // Kg_CallSaturation: sector call saturation data
    private static byte[] callSaturationLastRowKey;

    // ---- Date currently being read for each table (all start at "20180101") ----

    // Kg_ATC sector data
    private static String atcDataTime = "20180101";
    // Kg_AFTN message data
    private static String aftnDataTime = "20180101";
    // Kg_CallSaturation sector call saturation data
    private static String callDataTime = "20180101";
    // Kg_PlanData flight plan data
    private static String planDataTime = "20180101";
    // Kg_WarnFlightHistory flight instruction warning data
    private static String warnFlightDataTime = "20180101";
    // Kg_WarnSimilarHistory similar flight number warning data
    private static String warnSimilarDataTime = "20180101";
    // Kg_ATCDutyInfo controller duty data
    private static String atcDutyDataTime = "20180101";

    // Number of rows fetched per query
    private static int selectLimit = 30;

    // ... the following methods are added later ...
    // ... getDateAndDepositInKafka ...
    // ... timingGetData ...
}
3、在TimeTaskService类中,添加getDateAndDepositInKafka方法,实现HBase分页查询,查询指定数量的数据,将数据放到Kafka中
- 在getDateAndDepositInKafka方法中会使用到其他一些工具类,例如:常量工具类、分页模型工具类、HBase数据访问工具类等;
1)编写getDateAndDepositInKafka方法,用于读取HBase的数据,并将数据放到不同的Kafka Topic中:
/**
 * Placeholder for the method that reads rows from HBase and pushes them to a Kafka topic.
 * In this form it does nothing and always returns {@code null}; the real body is supplied
 * later in the tutorial.
 *
 * @param topic      Kafka topic name
 * @param tableName  HBase table name
 * @param lastRowKey row key to start the query from
 * @param date       flight-plan date to query
 * @param family     HBase column family
 * @param column     HBase column to query
 * @return intended to be the last row key of this page (the start key of the next query);
 *         the stub returns {@code null}
 * @throws SQLException declared in the signature; the stub itself throws nothing
 */
private static byte[] getDateAndDepositInKafka(String topic, String tableName,byte[] lastRowKey, String date, String family, String column) throws SQLException {
return null;
// ... the implementation still has to be added here ...
}
类名 | 方法名 | 参数 | 返回值 |
---|---|---|---|
TimeTaskService | getDateAndDepositInKafka | topic:kafka topic名称 tableName:HBase中的表名 lastRowKey:起始行键 date:查询的飞行计划时间 family:HBase表的列族 column:查询HBase表的列 | HBase本次分页查询记录最后的行键值 lastrowkey,该值将作为下次查询的起始行键 |
类引用/依赖 | Constants:定义常量的工具类,例如,Kafka Topic的名称。 HBasePageModel:自定义的HBase表数据分页模型工具类。 HQueryFilterUtil:HBase查询filter组合工具类。 HBaseUtils:HBase数据访问的工具类。 KafkaUtils:Kafka操作的工具类。 DateUtils:日期操作的工具类。 MultiRadar:雷达数据表对应的实体类。 MultiRadarMapper:读取雷达数据表的数据访问类。 |
2)编写com.qrsoft.common.Constants类,该类为当前项目中使用的常量的定义,包括当前任务阶段使用的Kafka Topic名称等:
代码所在位置src/main/java/com/qrsoft/common/Constants.java,核心代码如下:
package com.qrsoft.common;
/**
 * Project-wide constants: polling intervals plus, for every data set, the Kafka
 * topic name (TASK_*), HBase table name (TABLE_*), column family (FAMILY_*) and
 * the column used for date filtering (COLUMN_*).
 */
public class Constants {

    // ---- Interval lengths in milliseconds ----
    public static final int INTERVAL_TIME_10MIN = 10 * 60 * 1000; // 10 minutes
    public static final int INTERVAL_TIME_5MIN = 5 * 60 * 1000;   // 5 minutes
    public static final int INTERVAL_TIME_1MIN = 60 * 1000;       // 1 minute
    public static final int INTERVAL_TIME_30SEC = 30 * 1000;      // 30 seconds
    public static final int INTERVAL_TIME_10SEC = 10 * 1000;      // 10 seconds
    public static final int INTERVAL_TIME_5SEC = 5 * 1000;        // 5 seconds

    // Number of records read per minute
    public static final int READ_COUNT = 10;

    // ---- Reference tables ----
    // Airports
    public static final String TABLE_AIRPORT = "kg_airport";
    // Airline companies
    public static final String TABLE_AIRLINECOMPANY = "kg_airlinecompany";

    // ---- Kg_PlanData: flight plan data ----
    public static final String TASK_PlANDATA = "task_PlanData";
    public static final String TABLE_PlANDATA = "Kg_PlanData";
    public static final String FAMILY_PlANDATA = "ReportHome";
    public static final String COLUMN_PlANDATA = "EXECUTE_DATE";

    // ---- Kg_MultiRadarData: combined track data ----
    public static final String TASK_RADAR = "task_Radar";
    public static final String TABLE_RADAR = "Kg_MultiRadarData";
    public static final String FAMILY_RADAR = "RadarHome";
    public static final String COLUMN_RADAR = "EXECUTE_DATE";

    // ---- Kg_AFTN: AFTN message data ----
    public static final String TASK_AFTN = "task_Aftn";
    public static final String TABLE_AFTN = "Kg_AFTN";
    public static final String FAMILY_AFTN = "AFTNHome";
    public static final String COLUMN_AFTN = "EXECUTE_DATE";

    // ---- Kg_ATCDutyInfo: controller duty roster data ----
    public static final String TASK_ATCDUTY = "task_ATCDuty";
    public static final String TABLE_ATCDUTY = "Kg_ATCDutyInfo";
    public static final String FAMILY_ATCDUTY = "ATCDutyHome";
    public static final String COLUMN_ATCDUTY = "SEND_TIME";

    // ---- Kg_WarnFlightHistory: flight instruction warning data ----
    public static final String TASK_WARNFLIGHT = "task_WarnFlight";
    public static final String TABLE_WARNFLIGHT = "Kg_WarnFlightHistory";
    public static final String FAMILY_WARNFLIGHT = "WarnFlightHome";
    public static final String COLUMN_WARNFLIGHT = "GJ_DATE";

    // ---- Kg_WarnSimilarHistory: similar flight number warning data ----
    public static final String TASK_WARNSIMILAR = "task_WarnSimilar";
    public static final String TABLE_WARNSIMILAR = "Kg_WarnSimilarHistory";
    public static final String FAMILY_WARNSIMILAR = "WarnSimilarHome";
    public static final String COLUMN_WARNSIMILAR = "GJ_DATE";

    // ---- Kg_ATC: sector information ----
    public static final String TASK_ATC = "task_ATC";
    public static final String TABLE_ATC = "Kg_ATC";
    public static final String FAMILY_ATC = "ATCHome";
    public static final String COLUMN_ATC = "EXECUTE_DATE";

    // ---- Kg_CallSaturation: sector call saturation information ----
    public static final String TASK_CALLSATURATION = "task_CallSaturation";
    public static final String TABLE_CALLSATURATION = "Kg_CallSaturation";
    public static final String FAMILY_CALLSATURATION = "SaturationHome";
    public static final String COLUMN_CALLSATURATION = "SEND_TIME";
}
3)编写com.qrsoft.util.HBasePageModel类,该类是自定义的HBase表数据分页模型工具类,可参考以下代码实现分页功能,也可以根据这个思路自行查找资料,独立完成此类的编写。此类主要用于构建HBase分页模型:
代码所在位置src/main/java/com/qrsoft/util/HBasePageModel.java,核心代码如下:
package com.qrsoft.util;
import org.apache.hadoop.hbase.client.Result;
import java.io.Serializable;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * Paging state for an HBase table scan: page size and index, the row keys that
 * bound the current page, the rows fetched for it, and timing information.
 */
public class HBasePageModel implements Serializable {
    private static final long serialVersionUID = 330410716100946538L;

    private int pageSize = 100;
    private int pageIndex = 0;
    private int prevPageIndex = 1;
    private int nextPageIndex = 1;
    private int pageCount = 0;
    private int pageFirstRowIndex = 1;
    private byte[] pageStartRowKey = null;
    private byte[] pageEndRowKey = null;
    private boolean hasNextPage = true;
    private int queryTotalCount = 0;
    private long startTime = System.currentTimeMillis();
    private long endTime = System.currentTimeMillis();
    private List<Result> resultList = new ArrayList<>();
    private List<Map<String, String>> list = new ArrayList<>();

    /**
     * @param pageSize number of rows per page
     */
    public HBasePageModel(int pageSize) {
        this.pageSize = pageSize;
    }

    /** @return number of rows per page */
    public int getPageSize() {
        return this.pageSize;
    }

    /** @param pageSize number of rows per page */
    public void setPageSize(int pageSize) {
        this.pageSize = pageSize;
    }

    /** @return index of the current page */
    public int getPageIndex() {
        return this.pageIndex;
    }

    /** @param pageIndex index of the current page */
    public void setPageIndex(int pageIndex) {
        this.pageIndex = pageIndex;
    }

    /** @return total number of pages */
    public int getPageCount() {
        return this.pageCount;
    }

    /** @param pageCount total number of pages */
    public void setPageCount(int pageCount) {
        this.pageCount = pageCount;
    }

    /**
     * Recomputes and returns the ordinal of the first row on the current page,
     * as {@code (pageIndex - 1) * pageSize + 1}.
     */
    public int getPageFirstRowIndex() {
        int pagesBefore = getPageIndex() - 1;
        pageFirstRowIndex = pagesBefore * getPageSize() + 1;
        return pageFirstRowIndex;
    }

    /** @return row key of the first row of the page */
    public byte[] getPageStartRowKey() {
        return this.pageStartRowKey;
    }

    /** @param pageStartRowKey row key of the first row of the page */
    public void setPageStartRowKey(byte[] pageStartRowKey) {
        this.pageStartRowKey = pageStartRowKey;
    }

    /** @return row key of the last row of the page */
    public byte[] getPageEndRowKey() {
        return this.pageEndRowKey;
    }

    /** @param pageEndRowKey row key of the last row of the page */
    public void setPageEndRowKey(byte[] pageEndRowKey) {
        this.pageEndRowKey = pageEndRowKey;
    }

    /** @return index of the previous page, never lower than 1 */
    public int getPrevPageIndex() {
        prevPageIndex = getPageIndex() > 1 ? getPageIndex() - 1 : 1;
        return prevPageIndex;
    }

    /** @return index of the page after the current one */
    public int getNextPageIndex() {
        nextPageIndex = getPageIndex() + 1;
        return nextPageIndex;
    }

    /**
     * Reports whether another page is expected, judged by whether the current page is full.
     * This is only a heuristic: the remaining rows may happen to fill exactly one page,
     * in which case the following page is empty.
     */
    public boolean isHasNextPage() {
        hasNextPage = getResultList().size() == getPageSize();
        return hasNextPage;
    }

    /** @return number of rows retrieved so far */
    public int getQueryTotalCount() {
        return this.queryTotalCount;
    }

    /** @param queryTotalCount number of rows retrieved so far */
    public void setQueryTotalCount(int queryTotalCount) {
        this.queryTotalCount = queryTotalCount;
    }

    /** Stamps the start time with the current time in milliseconds. */
    public void initStartTime() {
        startTime = System.currentTimeMillis();
    }

    /** Stamps the end time with the current time in milliseconds. */
    public void initEndTime() {
        endTime = System.currentTimeMillis();
    }

    /** @return elapsed time between the start and end stamps, formatted in milliseconds */
    public String getTimeIntervalByMilli() {
        long elapsedMillis = endTime - startTime;
        return elapsedMillis + "毫秒";
    }

    /** @return elapsed time between the start and end stamps, in seconds with up to two decimals */
    public String getTimeIntervalBySecond() {
        double elapsedSeconds = (endTime - startTime) / 1000.0;
        return new DecimalFormat("#.##").format(elapsedSeconds) + "秒";
    }

    /** @return raw HBase results of the current page */
    public List<Result> getResultList() {
        return this.resultList;
    }

    /** @param resultList raw HBase results of the current page */
    public void setResultList(List<Result> resultList) {
        this.resultList = resultList;
    }

    /** @return rows of the current page as string maps */
    public List<Map<String, String>> getList() {
        return this.list;
    }

    /** @param list rows of the current page as string maps */
    public void setList(List<Map<String, String>> list) {
        this.list = list;
    }
}
4)编写com.qrsoft.util.HQueryFilterUtil类,该类是HBase查询过滤器的组合工具。
代码所在位置src/main/java/com/qrsoft/util/HQueryFilterUtil.java,核心代码如下:
package com.qrsoft.util;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.List;
public class HQueryFilterUtil {
public static final int subStringFilter=1;
public static Filter newFilter(int type, String family, String column, String value){
switch (type) {
case subStringFilter:
return newSubStringFilter(family, column, value);
default:
throw new RuntimeException("filter type not exists");
}
}
//family:column中包含字符串value
public static Filter newSubStringFilter(String family, String column, String value){
SubstringComparator comp = new SubstringComparator(value);
return new SingleColumnValueFilter(Bytes.toBytes(family), Bytes.toBytes(column), CompareOp.EQUAL,comp);
}
//查询最多返回行数
public static Filter rowLimitFilter(int rowcount){
return new PageFilter(rowcount);
}
public static FilterList or(List<Filter> filterList){
return newList(FilterList.Operator.MUST_PASS_ONE,filterList);
}
public static FilterList and(List<Filter> filterList){
return newList(FilterList.Operator.MUST_PASS_ALL,filterList);
}
private static FilterList newList(FilterList.Operator oper, List<Filter> filterList){
FilterList list = new FilterList(oper);
for(Filter f:filterList){
list.addFilter(f);
}
return list;
}
}
5)编写com.qrsoft.util.KafkaUtils类,该类是Kafka操作的工具类。
代码所在位置src/main/java/com/qrsoft/util/KafkaUtils.java,核心代码如下:
package com.qrsoft.util;
import kafka.serializer.StringEncoder;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.core.io.support.PropertiesLoaderUtils;
import java.io.IOException;
import java.util.Properties;
/**
 * Kafka helper: lazily creates one shared producer configured from
 * {@code kafka.properties} on the classpath and sends string messages through it.
 */
public class KafkaUtils {
    /** Shared producer, created on first use. */
    private static KafkaProducer<String, String> _producer;

    /**
     * Returns the shared producer, creating it on the first call.
     */
    private static KafkaProducer<String, String> GetProducer() {
        if (_producer == null) {
            _producer = createProducer();
        }
        return _producer;
    }

    /**
     * Hands a message to the Kafka producer.
     *
     * <p>{@code KafkaProducer.send} is asynchronous, so {@code true} means the record
     * was accepted by the producer, not that a broker has acknowledged it.
     *
     * @param topic   topic name
     * @param message message body; the record key is {@code "message" + message}
     * @return {@code true} if the send call did not throw, {@code false} otherwise
     * @throws IllegalStateException if the producer cannot be created because
     *         {@code kafka.properties} is unreadable or lacks a required key
     */
    public static boolean SendMessage(String topic, String message) {
        // Obtained outside the try block on purpose: a configuration error must
        // propagate instead of being reported as a per-message failure.
        Producer<String, String> producer = GetProducer();
        try {
            producer.send(new ProducerRecord<>(topic, "message" + message, message));
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Creates the producer from {@code kafka.properties}.
     *
     * @throws IllegalStateException if the file cannot be read or a required key is missing
     */
    private static KafkaProducer<String, String> createProducer() {
        Properties properties;
        try {
            properties = PropertiesLoaderUtils.loadAllProperties("kafka.properties");
        } catch (IOException e) {
            // Previously the error was only printed and the null properties were
            // dereferenced right after, which hid the real cause behind an NPE.
            throw new IllegalStateException("Unable to load kafka.properties from the classpath", e);
        }
        String bootstrap = requireProperty(properties, "bootstrap.servers");
        String zookeeper = requireProperty(properties, "zookeeper.connect");
        String metadata = requireProperty(properties, "metadata.broker.list");

        Properties props = new Properties();
        // Subset of the cluster addresses, used to discover the cluster.
        props.put("bootstrap.servers", bootstrap);
        // ZooKeeper address
        props.put("zookeeper.connect", zookeeper);
        props.put("serializer.class", StringEncoder.class.getName());
        // Broker addresses
        props.put("metadata.broker.list", metadata);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(props);
    }

    /**
     * Reads a mandatory entry from the loaded properties.
     *
     * @throws IllegalStateException if the key is absent
     */
    private static String requireProperty(Properties properties, String key) {
        Object value = properties.get(key);
        if (value == null) {
            throw new IllegalStateException("Missing required property '" + key + "' in kafka.properties");
        }
        return value.toString();
    }
}
6)编写com.qrsoft.util.DateUtils类,该类是日期操作的工具类。
代码所在位置src/main/java/com/qrsoft/util/DateUtils.java,核心代码如下:
package com.qrsoft.util;
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
/**
 * Date helper used to step the replay date forward one day at a time.
 */
public class DateUtils {
    /**
     * Returns the day after {@code time}, wrapping from the last day of 2018
     * back to the first.
     *
     * @param time date in {@code yyyyMMdd} form; may be {@code null}
     * @return {@code "20180101"} when {@code time} is {@code "20181231"}; otherwise the
     *         following calendar day in {@code yyyyMMdd} form; {@code time} itself when it
     *         is {@code null} or cannot be parsed
     */
    public static String myDateUtils(String time) {
        if (time == null) {
            return null;
        }
        if ("20181231".equals(time)) {
            return "20180101";
        }
        SimpleDateFormat sd = new SimpleDateFormat("yyyyMMdd");
        try {
            Calendar calendar = Calendar.getInstance();
            calendar.setTime(sd.parse(time));
            // Step by one calendar day rather than by 24h of milliseconds: on a
            // 25-hour daylight-saving day, midnight + 24h is still the same date.
            calendar.add(Calendar.DAY_OF_MONTH, 1);
            return sd.format(calendar.getTime());
        } catch (ParseException e) {
            e.printStackTrace();
            return time;
        }
    }
}
7)编写com.qrsoft.util.HBaseUtils类,该类是HBase数据访问的工具类。
代码所在位置src/main/java/com/qrsoft/util/HBaseUtils.java。
初始化HBase连接:
package com.qrsoft.util;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.core.io.support.PropertiesLoaderUtils;
import java.io.IOException;
import java.util.*;
/**
 * HBase access helper holding one shared connection configured from
 * {@code hbase.properties} on the classpath.
 */
public class HBaseUtils {
    private static Configuration conf = null;
    private static Connection conn = null;

    // Try to connect as soon as the class is loaded.
    static { getInstance(); }

    /**
     * Returns the shared HBase connection, creating it if it does not exist yet.
     *
     * <p>The properties file is read only when a connection has to be created,
     * not on every call.
     *
     * @return the shared connection, or {@code null} if it could not be created
     *         (the failure is printed to standard output)
     */
    public static Connection getInstance() {
        if (conn != null) {
            return conn;
        }
        try {
            Properties properties =
                    PropertiesLoaderUtils.loadAllProperties("hbase.properties");
            String hbase = properties.get("hbase.zookeeper.quorum").toString();
            String zookeeper = properties.get("zookeeper.znode.parent").toString();
            conf = HBaseConfiguration.create();
            conf.setInt("hbase.rpc.timeout", 24000);
            conf.setInt("hbase.client.operation.timeout", 300000);
            conf.setInt("hbase.client.scanner.timeout.period", 240000);
            conf.set("hbase.zookeeper.quorum", hbase);
            conf.set("zookeeper.znode.parent", zookeeper);
            conn = ConnectionFactory.createConnection(conf);
        } catch (Exception e) {
            System.out.println("Hbase连接异常:" + e.getMessage());
        }
        return conn;
    }
    // ... scanResultByPageFilter: paged retrieval of table data ...
    // ... selectFirstResultRow: retrieval of the first row of a table ...
}
8)添加scanResultByPageFilter方法实现分页检索表数据
/**
 * Retrieves one page of rows from an HBase table.
 * (If the table was created in a non-default namespace, the name must be given
 * as {@code namespace:tablename}.)
 *
 * <p>If the page model has no start row key yet, the scan starts at {@code startRowKey}
 * and that row is part of the page. Otherwise the scan starts at the model's end row key
 * (when set), one extra row is read, and the first row returned is skipped.
 *
 * <p>Note: when {@code filterList} is non-null, the page filter is added to it, so the
 * caller's object is modified.
 *
 * @param tableName   table name (required); a blank name returns the model untouched
 * @param startRowKey start row key; if {@code null}, the first row matching
 *                    {@code filterList} is used
 * @param endRowKey   stop row key, may be {@code null}
 * @param filterList  query filters without a page filter, may be {@code null}
 * @param maxVersions {@code Integer.MAX_VALUE} reads all versions; values {@code <= 0}
 *                    leave the scan's version setting untouched; any other value reads
 *                    that many versions
 * @param pageModel   page model (required); if {@code null}, a model with page size 10 is used
 * @return the page model, filled with the rows of this page
 */
public static HBasePageModel scanResultByPageFilter(String tableName, byte[] startRowKey, byte[] endRowKey, FilterList filterList, int maxVersions, HBasePageModel pageModel) {
    if (pageModel == null) {
        pageModel = new HBasePageModel(10);
    }
    if (maxVersions <= 0) {
        // Marker value: do not set a version count on the scan.
        maxVersions = Integer.MIN_VALUE;
    }
    pageModel.initStartTime();
    pageModel.initEndTime();
    if (StringUtils.isBlank(tableName)) {
        return pageModel;
    }
    // try-with-resources closes the table only if it was actually opened; the old
    // finally block called close() on a null reference when getTable failed.
    try (Table table = getInstance().getTable(TableName.valueOf(tableName.getBytes()))) {
        int tempPageSize = pageModel.getPageSize();
        boolean isEmptyStartRowKey = false;
        if (startRowKey == null) {
            // No start key given: begin at the first row that matches the filters.
            Result firstResult = selectFirstResultRow(tableName, filterList);
            if (firstResult == null) {
                return pageModel;
            }
            startRowKey = firstResult.getRow();
        }
        if (pageModel.getPageStartRowKey() == null) {
            isEmptyStartRowKey = true;
            pageModel.setPageStartRowKey(startRowKey);
        } else {
            if (pageModel.getPageEndRowKey() != null) {
                pageModel.setPageStartRowKey(pageModel.getPageEndRowKey());
            }
            // From the second page on, read one extra row because the first row
            // returned is dropped below.
            tempPageSize += 1;
        }

        Scan scan = new Scan();
        scan.setStartRow(pageModel.getPageStartRowKey());
        if (endRowKey != null) {
            scan.setStopRow(endRowKey);
        }
        PageFilter pageFilter = new PageFilter(pageModel.getPageSize() + 1);
        if (filterList != null) {
            filterList.addFilter(pageFilter);
            scan.setFilter(filterList);
        } else {
            scan.setFilter(pageFilter);
        }
        if (maxVersions == Integer.MAX_VALUE) {
            scan.setMaxVersions();
        } else if (maxVersions != Integer.MIN_VALUE) {
            scan.setMaxVersions(maxVersions);
        }

        List<Result> resultList = new ArrayList<Result>();
        List<Map<String, String>> rowList = new ArrayList<Map<String, String>>();
        // The scanner is closed even when reading a row fails.
        try (ResultScanner scanner = table.getScanner(scan)) {
            int index = 0;
            for (Result rs : scanner.next(tempPageSize)) {
                if (!isEmptyStartRowKey && index == 0) {
                    index += 1;
                    continue;
                }
                if (!rs.isEmpty()) {
                    // Keep the raw result
                    resultList.add(rs);
                    // Flatten the row into a map: the row key plus one entry per
                    // column qualifier, taken across all column families.
                    Map<String, String> rowMap = new HashMap<String, String>();
                    rowMap.put("rowKey", Bytes.toString(rs.getRow()));
                    for (NavigableMap<byte[], byte[]> colMap : rs.getNoVersionMap().values()) {
                        for (Map.Entry<byte[], byte[]> cell : colMap.entrySet()) {
                            rowMap.put(Bytes.toString(cell.getKey()), Bytes.toString(cell.getValue()));
                        }
                    }
                    rowList.add(rowMap);
                }
                index += 1;
            }
        }
        pageModel.setResultList(resultList);
        pageModel.setList(rowList);
    } catch (Exception e) {
        e.printStackTrace();
    }

    int pageIndex = pageModel.getPageIndex() + 1;
    pageModel.setPageIndex(pageIndex);
    if (pageModel.getResultList().size() > 0) {
        // Record the row keys of the first and last row of this page
        byte[] pageStartRowKey = pageModel.getResultList().get(0).getRow();
        byte[] pageEndRowKey = pageModel.getResultList().get(pageModel.getResultList().size() - 1).getRow();
        pageModel.setPageStartRowKey(pageStartRowKey);
        pageModel.setPageEndRowKey(pageEndRowKey);
    }
    int queryTotalCount = pageModel.getQueryTotalCount() + pageModel.getResultList().size();
    pageModel.setQueryTotalCount(queryTotalCount);
    pageModel.initEndTime();
    return pageModel;
}
9)编写selectFirstResultRow方法, 实现检索指定表的第一行记录
/**
 * Retrieves the first row of a table.
 * (If the table was created in a non-default namespace, the name must be given
 * as {@code namespace:tablename}.)
 *
 * @param tableName  table name (required)
 * @param filterList filters to apply, may be {@code null} or empty
 * @return the first row that passes the filters, or {@code null} if the table name is
 *         blank, no row matches, or an {@code IOException} occurs (it is printed)
 */
public static Result selectFirstResultRow(String tableName, FilterList filterList) {
    if (StringUtils.isBlank(tableName)) {
        return null;
    }
    Scan scan = new Scan();
    if (filterList != null && filterList.getFilters().size() != 0) {
        scan.setFilter(filterList);
    }
    // Both resources are closed on every path. Previously the scanner stayed open
    // whenever a row was found, and close() was called on a null table when
    // getTable failed.
    try (Table table = getInstance().getTable(TableName.valueOf(tableName.getBytes()));
         ResultScanner scanner = table.getScanner(scan)) {
        Iterator<Result> iterator = scanner.iterator();
        return iterator.hasNext() ? iterator.next() : null;
    } catch (IOException e) {
        e.printStackTrace();
    }
    return null;
}
10)编写com.qrsoft.entity.MultiRadar类,该类是访问雷达数据的数据实体类。
代码所在位置src/main/java/com/qrsoft/entity/MultiRadar.java,核心代码如下:
package com.qrsoft.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
 * Entity mapped to the {@code multiradar_number} table. Each field is bound to the
 * column named in its annotation; accessors and constructors are generated by Lombok.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@TableName("multiradar_number")
public class MultiRadar implements Serializable {

    /** Primary key, generated by the database (auto increment). */
    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;

    @TableField("TRACK_NUMBER")
    private String trackNumber;

    @TableField("AREA_SOURCE")
    private String areaSource;

    @TableField("SEND_RADAR_TIME")
    private String sendRadarTime;

    @TableField("RADAR_TYPE")
    private String radarType;

    @TableField("ACID")
    private String acid;

    @TableField("SSR_CODE")
    private String ssrCode;

    @TableField("ZHIJIAO_X")
    private String zhijiaoX;

    @TableField("ZHIJIAO_Y")
    private String zhijiaoY;

    @TableField("RADAR_LONGTITUDE")
    private String radarLongtitude;

    @TableField("RADAR_LATITUDE")
    private String radarLatitude;

    @TableField("RADAR_HEIGHT")
    private String radarHeight;

    @TableField("SPEED_X")
    private String speedX;

    @TableField("SPEED_Y")
    private String speedY;

    @TableField("RADAR_SPEED")
    private String radarSpeed;

    @TableField("DIRECTION")
    private String direction;

    @TableField("RADAR_CFL")
    private String radarCfl;

    @TableField("FCU")
    private String fcu;

    @TableField("TIME")
    private String time;

    @TableField("FLYSTATUS")
    private String flystatus;

    @TableField("CLIMBORDOWN_SPEED")
    private String climbordownSpeed;
}
11)编写com.qrsoft.mapper.MultiRadarMapper接口,该接口是访问雷达数据的数据访问接口。
代码所在位置src/main/java/com/qrsoft/mapper/MultiRadarMapper.java。
package com.qrsoft.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.qrsoft.entity.MultiRadar;
import org.apache.ibatis.annotations.Mapper;
/**
 * MyBatis mapper for {@link MultiRadar}. It declares no methods of its own;
 * everything it offers is inherited from {@code BaseMapper}.
 */
@Mapper
public interface MultiRadarMapper extends BaseMapper<MultiRadar> {
}
- 回到com.qrsoft.service.TimeTaskService类中,继续编写getDateAndDepositInKafka方法,执行分页查询,每次查询指定数量的数据,将查询的HBase中的数据,推送到Kafka的topic中
/**
 * Reads the next page of rows for one date from an HBase table and publishes each
 * row to a Kafka topic as JSON.
 *
 * <p>{@code selectLimit + 1} rows are requested. On a full page the first
 * {@code selectLimit} rows are published and the extra row only supplies the start
 * key of the next call, which publishes it. On a page that is not full there is no
 * following page for this date, so every row is published (the final row used to be
 * dropped) and the date of the matching topic is moved to the next day.
 *
 * @param topic      Kafka topic name
 * @param tableName  HBase table name
 * @param lastRowKey row key to start from, or {@code null} to start at the first matching row
 * @param date       date to match in {@code family:column}; {@code null} disables the date filter
 * @param family     HBase column family
 * @param column     HBase column holding the date
 * @return the row key at which the next call should start; {@code lastRowKey} unchanged
 *         if no rows were read
 * @throws SQLException declared for callers; not thrown by the current body
 */
private static byte[] getDateAndDepositInKafka(String topic, String tableName, byte[] lastRowKey, String date, String family, String column)
        throws SQLException {
    // Build the filter: rows whose date column contains the requested date
    FilterList filterList = new FilterList(new ArrayList<Filter>());
    if (date != null) {
        filterList.addFilter(HQueryFilterUtil.newSubStringFilter(family, column, date));
    }

    // Fetch one row more than is published so the extra row can serve as the next start key
    HBasePageModel pageModel = new HBasePageModel(selectLimit + 1);
    HBasePageModel hbasePageModel = HBaseUtils.scanResultByPageFilter(
            tableName, lastRowKey, null, filterList, 0, pageModel);

    List<Map<String, String>> rows = hbasePageModel.getList();
    int dataSize = rows.size();
    // Fewer rows than requested means this date has no further page
    boolean lastPage = dataSize <= selectLimit;

    if (dataSize > 0) {
        String rowkey = rows.get(dataSize - 1).get("rowKey");
        // The key was decoded as UTF-8 when the row map was built; encode it the same way
        lastRowKey = rowkey.getBytes(java.nio.charset.StandardCharsets.UTF_8);

        // On a full page, hold back the extra row: the next call starts at it and sends it
        int sendCount = lastPage ? dataSize : dataSize - 1;
        for (int i = 0; i < sendCount; i++) {
            String json = JSON.toJSONString(rows.get(i));
            KafkaUtils.SendMessage(topic, json);
            System.out.println("============" + json + "==============");
        }
        System.out.println(tableName + "此批次最后一个rowkey:" + rowkey);
    }
    System.out.println(dataSize + "===================================" + selectLimit);

    // NOTE(review): the start key is carried over when the date changes. That presumably
    // relies on row keys sorting by date; after the wrap from 20181231 back to 20180101
    // the scan may start past the wanted rows. Confirm against the row key design.
    if (lastPage && date != null) {
        switch (topic) {
            case Constants.TASK_ATC:
                atcDataTime = DateUtils.myDateUtils(date);
                System.out.println(Constants.TASK_ATC + "更新数据时间为" + atcDataTime);
                break;
            case Constants.TASK_AFTN:
                aftnDataTime = DateUtils.myDateUtils(date);
                System.out.println(Constants.TASK_AFTN + "更新数据时间为" + aftnDataTime);
                break;
            case Constants.TASK_PlANDATA:
                planDataTime = DateUtils.myDateUtils(date);
                System.out.println(Constants.TASK_PlANDATA + "更新数据时间为" + planDataTime);
                break;
            case Constants.TASK_WARNFLIGHT:
                warnFlightDataTime = DateUtils.myDateUtils(date);
                System.out.println(Constants.TASK_WARNFLIGHT + "更新数据时间为" + warnFlightDataTime);
                break;
            case Constants.TASK_WARNSIMILAR:
                warnSimilarDataTime = DateUtils.myDateUtils(date);
                System.out.println(Constants.TASK_WARNSIMILAR + "更新数据时间为" + warnSimilarDataTime);
                break;
            case Constants.TASK_CALLSATURATION:
                callDataTime = DateUtils.myDateUtils(date);
                System.out.println(Constants.TASK_CALLSATURATION + "更新数据时间为" + callDataTime);
                break;
            case Constants.TASK_ATCDUTY:
                atcDutyDataTime = DateUtils.myDateUtils(date);
                System.out.println(Constants.TASK_ATCDUTY + "更新数据时间为" + atcDutyDataTime);
                break;
            default:
                // Other topics keep no date of their own
                break;
        }
    }
    return lastRowKey;
}
- 在resources目录下创建或导入HBase和Kafka相关配置文件
1)创建或导入hbase.properties文件,内容如下:
#hbase.rpc.timeout=24000
#hbase.client.operation.timeout=300000
#hbase.client.scanner.timeout.period=240000
# 注意:zookeeper.quorum请根据你当前环境所安装zookeeper的服务器名称进行设置
hbase.zookeeper.quorum=node1,node2,node3
zookeeper.znode.parent=/hbase
2)创建或导入kafka.properties文件,内容如下:
# 注意:请根据你当前环境所安装zookeeper和kafka的服务器名称进行设置
bootstrap.servers=node1:9092,node2:9092,node3:9092
zookeeper.connect=node1:2181,node2:2181,node3:2181
metadata.broker.list=node1:9092,node2:9092,node3:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
4、创建定时任务,启动数据获取,设置Topic名称
代码所在位置src/main/java/com/qrsoft/service/TimeTaskService.java。
- 回到com.qrsoft.service.TimeTaskService类中,继续编写timingGetData方法(每间隔30秒执行一次定时任务),核心代码如下:
/**
 * Scheduled job (every 30 seconds): for each data set, reads the next page from HBase,
 * pushes it to the matching Kafka topic, and stores the returned row key for the next run.
 */
@Scheduled(cron = "*/30 * * * * *")
public void timingGetData() {
    System.out.println("定时任务启动获取数据");
    try {
        planDataLastRowKey = getDateAndDepositInKafka(Constants.TASK_PlANDATA, Constants.TABLE_PlANDATA, planDataLastRowKey, planDataTime, Constants.FAMILY_PlANDATA, Constants.COLUMN_PlANDATA);
        warnFlightLastRowKey = getDateAndDepositInKafka(Constants.TASK_WARNFLIGHT, Constants.TABLE_WARNFLIGHT, warnFlightLastRowKey, warnFlightDataTime, Constants.FAMILY_WARNFLIGHT, Constants.COLUMN_WARNFLIGHT);
        aTCDutyLastRowKey = getDateAndDepositInKafka(Constants.TASK_ATCDUTY, Constants.TABLE_ATCDUTY, aTCDutyLastRowKey, atcDutyDataTime, Constants.FAMILY_ATCDUTY, Constants.COLUMN_ATCDUTY);
        warnSimilarLastRowKey = getDateAndDepositInKafka(Constants.TASK_WARNSIMILAR, Constants.TABLE_WARNSIMILAR, warnSimilarLastRowKey, warnSimilarDataTime, Constants.FAMILY_WARNSIMILAR, Constants.COLUMN_WARNSIMILAR);
        aFTNLastRowKey = getDateAndDepositInKafka(Constants.TASK_AFTN, Constants.TABLE_AFTN, aFTNLastRowKey, aftnDataTime, Constants.FAMILY_AFTN, Constants.COLUMN_AFTN);
        aTCLastRowKey = getDateAndDepositInKafka(Constants.TASK_ATC, Constants.TABLE_ATC, aTCLastRowKey, atcDataTime, Constants.FAMILY_ATC, Constants.COLUMN_ATC);
        callSaturationLastRowKey = getDateAndDepositInKafka(Constants.TASK_CALLSATURATION, Constants.TABLE_CALLSATURATION, callSaturationLastRowKey, callDataTime, Constants.FAMILY_CALLSATURATION, Constants.COLUMN_CALLSATURATION);
        // Note:
        // 1) The topic for "real-time flight data" is not fed from here. Pushing the data straight
        //    from HBase to Kafka leaves too little time between samples, so aircraft barely move
        //    when their state and position are shown on the map.
        // 2) To make demos look better, the line below is commented out and the feature lives in
        //    a separate class, TimeTask2Service.java.
        // 3) TimeTask2Service uses flight data that was read from HBase, pre-processed and saved
        //    locally; its samples are further apart, so the aircraft movement is clearly visible.
        //radarLastRowKey=getDateAndDepositInKafka(Constants.TASK_RADAR,Constants.TABLE_RADAR,radarLastRowKey,multiRadarDataTime,Constants.FAMILY_RADAR,Constants.COLUMN_RADAR);
    } catch (SQLException e) {
        // Pass the exception as the throwable argument: the message has no "{}" placeholder,
        // so passing e.getMessage() as a format argument dropped it from the log.
        log.error("获取数据失败!", e);
    }
}
5、单独处理“实时飞行数据”,将数据推送到相应的Kafka Topic
注意: 1)处理“实时飞行数据”推送对应的Topic。由于直接从HBase读取实时飞行数据推送到Kafka后,在使用时会因为实时飞行数据的时间间隔太短,导致在地图上显示飞行状态和位置时,移动得不明显; 2)为了在演示的时候提高体验度,这部分功能单独写了一个TimeTask2Service.java的类; 3)在TimeTask2Service.java的类中,使用的实时飞行数据是从HBase中读取的已经处理过的保存到本地的数据,实时飞行数据的时间间隔变大,在使用该数据时能明显感觉到飞机在移动;
- 准备数据
将提供的数据文件放到/opt/data/目录(也可以放到任意目录下,但是需要修改下代码中读取文件的路径),可以查看一下数据是否存在,如图:
- 代码所在位置src/main/java/com/qrsoft/service/TimeTask2Service.java
- 创建com.qrsoft.service.TimeTask2Service类,核心代码如下:
package com.qrsoft.service;
import cn.hutool.core.io.file.FileReader;
import com.qrsoft.util.KafkaUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
/**
 * Replays recorded flight tracks to the {@code task_Radar} Kafka topic.
 *
 * <p>Each of the twelve text files holds one aircraft's track, one sample per line.
 * Every ten seconds the next line of each track is sent; a track restarts from its
 * first line once its end is reached.
 */
@Service
@Slf4j
public class TimeTask2Service {
    // Lines of each track file (strN) and the index of the next line to send (countN).
    public static List<String> str0 = null;
    public static int count0 = 0;
    public static List<String> str1 = null;
    public static int count1 = 0;
    public static List<String> str2 = null;
    public static int count2 = 0;
    public static List<String> str3 = null;
    public static int count3 = 0;
    public static List<String> str4 = null;
    public static int count4 = 0;
    public static List<String> str5 = null;
    public static int count5 = 0;
    public static List<String> str6 = null;
    public static int count6 = 0;
    public static List<String> str7 = null;
    public static int count7 = 0;
    public static List<String> str8 = null;
    public static int count8 = 0;
    public static List<String> str9 = null;
    public static int count9 = 0;
    public static List<String> str10 = null;
    public static int count10 = 0;
    public static List<String> str11 = null;
    public static int count11 = 0;

    /**
     * Loads all track files from {@code /opt/data}. When the application is not
     * deployed on Linux, point the paths at the local copy of the data instead.
     */
    public TimeTask2Service() throws URISyntaxException, IOException, InterruptedException {
        str0 = readTrack("/opt/data/part-00000");
        str1 = readTrack("/opt/data/part-00001");
        str2 = readTrack("/opt/data/part-00002");
        str3 = readTrack("/opt/data/part-00003");
        str4 = readTrack("/opt/data/part-00004");
        str5 = readTrack("/opt/data/part-00005");
        str6 = readTrack("/opt/data/part-00006");
        str7 = readTrack("/opt/data/part-00007");
        str8 = readTrack("/opt/data/part-00008");
        str9 = readTrack("/opt/data/part-00009");
        str10 = readTrack("/opt/data/part-00010");
        str11 = readTrack("/opt/data/part-00011");
    }

    /**
     * Scheduled job (every 10 seconds): sends the next sample of every track to Kafka.
     */
    @Scheduled(cron = "*/10 * * * * *")
    public void timingGetData() {
        System.out.println("实时轨迹任务");
        count0 = sendNext(str0, count0);
        count1 = sendNext(str1, count1);
        count2 = sendNext(str2, count2);
        count3 = sendNext(str3, count3);
        count4 = sendNext(str4, count4);
        count5 = sendNext(str5, count5);
        count6 = sendNext(str6, count6);
        count7 = sendNext(str7, count7);
        count8 = sendNext(str8, count8);
        count9 = sendNext(str9, count9);
        count10 = sendNext(str10, count10);
        count11 = sendNext(str11, count11);
        System.out.println("----------------------航迹数据更新----------------------");
    }

    /** Reads every line of one track file. */
    private static List<String> readTrack(String path) {
        return new FileReader(path).readLines();
    }

    /**
     * Sends the line at {@code cursor} of a track to Kafka, restarting from the first
     * line when the end has been reached.
     *
     * @param track  lines of one track file
     * @param cursor index of the line to send
     * @return index of the line to send on the next tick
     */
    private static int sendNext(List<String> track, int cursor) {
        // An empty file has nothing to replay. Skip it so the other tracks are still
        // sent; get(0) on an empty list used to abort the whole tick.
        if (track == null || track.isEmpty()) {
            return 0;
        }
        if (cursor >= track.size()) {
            cursor = 0;
            System.out.println("数据归零");
        }
        KafkaUtils.SendMessage("task_Radar", track.get(cursor));
        return cursor + 1;
    }
}
6、测试
- 确保Hadoop、HBase、Kafka集群都已经正常启动,如果没有正常启动,请参考前面环境安装部署的相应任务完成集群的启动。
- 启动SpringBoot项目
- 在控制台显示类似下面的结果:
- 在node1节点上,查看kafka中的topic是否有数据
[root@node1 ~]# kafka-console-consumer.sh --bootstrap-server node1:9092 --from-beginning --topic task_CallSaturation