业务分析
秒杀业务中,通常会有很多用户同时蜂拥而上去抢购热卖商品,经常会出现抢购人数远大于商品库存。其实在秒杀过程中,热卖商品并不多,几乎只占1%,而99%的流量都源自热卖商品,很有可能因为这1%的热卖商品导致服务器宕机,因此针对热卖商品我们要做特殊处理。
热卖商品我们这里称为热点商品,针对热点商品的处理,有这么几种思路,一是优化,二是限制,三是隔离。
优化:优化热点数据最有效的办法就是缓存热点数据。
限制:限制其实是一种削峰手段,我们可以把热点商品抢单采用队列来存储用户抢单信息,将热点抢单限制在一个队列里,防止热点商品抢单占用太多的资源服务,而使得其他服务无法获取抢单机会。
隔离:隔离其实就是将热点商品和非热点商品进行数据源的隔离、操作流程的隔离,不要因为1%的热点数据影响到另外的99%数据。我们可以把热点商品数据存储到缓存中和非热点数据分开,抢单程序也可以和非热点抢单分开。
热点数据又分为离线热点数据和实时热点数据,离线热点数据主要是分析过往热点商品信息,这个统计起来并无难度,可以直接从历史数据库中查询分析。但根据用户抢单实时数据进行分析是一个很困难的事,首先要存储大量的访问信息,同时还能高效的实时统计访问日志信息,从中获取热点商品信息。
Apache Druid介绍
介绍
Apache Druid 是一个分布式的、支持实时多维 OLAP 分析的数据处理系统。它既支持高速的数据实时摄入,也支持实时且灵活的多维数据分析查询。因此 Druid 最常用的场景是大数据背景下、灵活快速的多维 OLAP 分析。 另外,Druid 还有一个关键的特点:它支持根据时间戳对数据进行预聚合摄入和聚合分析,因此也有用户经常在有时序数据处理分析的场景中用到它。
OLTP与OLAP的区别:
OLTP是传统的关系型数据库的主要应用,主要是基本的、日常的事务处理。
OLAP是数据仓库系统的主要应用,支持复杂的分析操作,侧重决策支持,并且提供直观易懂的分析查询结果。
OLAP和OLTP区别:
OLTP就是面向我们的应用系统数据库的,OLAP是面向数据仓库的。
Apache Druid 特性:
亚秒响应的交互式查询,支持较高并发。
支持实时导入,导入即可被查询,支持高并发导入。
采用分布式 shared-nothing 的架构,可以扩展到PB级。
支持聚合函数,count 和 sum,以及使用 javascript 实现自定义 UDF。
支持复杂的 Aggregator,近似查询的 Aggregator 例如 HyperLoglog 以及 Yahoo 开源的 DataSketches。
支持Groupby,Select,Search查询。
开源OLAP数据处理系统性能方面我们做个对比:
Apache Druid 架构设计
Druid自身包含下面4类节点:
1.Realtime Node:即时摄入实时数据,生成Segment(LSM-Tree实现与Hbase基本一致)文件。
2.Historical Node:加载已生成好的数据文件,以供数据查询。
3.Broker Node:对外提供数据查询服务,并同时从Realtime Node和Historical Node查询数据,合并后返回给调用方。
4.Coordinator Node:负责Historical Node的数据负载均衡,以及通过Rule管理数据生命周期。
同时,Druid集群还包含以下3类外部依赖:
1.元数据库(Metastore):存储druid集群的元数据信息,如Segment的相关信息,一般使用MySQL或PostgreSQL
2.分布式协调服务(Coordination):为Druid集群提供一致性服务,通常为zookeeper
3.数据文件存储(DeepStorage):存储生成的Segment文件,供Historical Node下载,一般为使用HDFS
数据摄入
Apache Druid同时支持流式和批量数据摄入。通常通过像 Kafka 这样的消息总线(加载流式数据)或通过像 HDFS 这样的分布式文件系统(加载批量数据)来连接原始数据源。
Apache Druid安装
Apache Druid的安装方面,我们可以参考官方文档实现。
JDK:java8(8u92+)
下载地址:https://druid.apache.org/downloads.html
解压该压缩包:
tar -xf apache-druid-0.17.0-bin.tar.gz
cd apache-druid-0.17.0
包文件如下:
启动单机版Apache Druid:
./bin/start-micro-quickstart
启动后,访问:http://192.168.211.137:8888
数据摄入
离线数据摄入
从一个文件中将数据加载到Apache Druid,参考地址:https://druid.apache.org/docs/latest/tutorials/tutorial-batch.html,如下操作:
1)点击Load data->Local disk->Connect data
2)选择要导入的数据
我们要导入的数据在/usr/local/server/apache-druid-0.17.0/quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz,需要把该文件的相对路径填写到右边表单中,再点击Apply,如下图:
3)解析数据
在上一个步骤上点击Next:Parse data,此时会解析导入的数据,如下图:
4)解析时间
在上一个步骤上点击Next: Parse time,Apache Druid要求每条数据都有一个time列,如果我们导入的数据没有该列,Apache Druid会自动帮助我们创建该列,如下图:
5)数据分区设置
点击下一步一直到Partition,我们根据需要设置数据分区方式,如下图:
讲解:
Type:数据粒度使用的类型
Segment granularity:分片文件每个segment包含的时间戳范围
Force guaranteed rollup:是否启用批量推送模式
Partitioning type:分区类型
Max rows per segment:用于分片。确定每个段中的行数。
更多参数如下图:
6)设置数据源
Publish设置,注意设置数据源名字,这里类似数据库中数据库名字。
7)提交配置
最后一步需要提交配置,如下图,点击submit即可。
实时数据摄入
前面的案例是离线数据的摄入,接着我们实现实时数据摄入,我们以收集用户访问商品详情页的访问记录为例,如下图:
参考地址:https://druid.apache.org/docs/latest/tutorials/tutorial-kafka.html
1)load data
2)配置Kafka源
3)配置数据源名字
其他的步骤和之前文件摄入一样,直到配置数据源名字,我们配置数据源名字叫itemlogs,最后一步submit和之前一样,如下图:
查询效果如下:
Druid SQL
简介
Apache Druid SQL是一个内置的SQL层,是Druid基于JSON的查询语言的替代品,由基于Apache Calcite的解析器和规划器提供支持。Druid SQL将SQL转换为Broker本机Druid查询,然后将其传递给数据进程。除了在Broker上转换SQL的(轻微)开销之外,与本机查询相比,没有额外的性能损失。
语法
每个Druid数据源都显示为“Druid”模式,这也是默认模式,Druid数据源引用为druid.dataSourceName或者简单引用dataSourceName。
可以选择使用双引号引用数据源和列名等标识符。要在标识符中转义双引号,请使用另一个双引号,例如"My ““cat”” identifier",所有标识符都区分大小写。
文字字符串应引用单引号,如’foo’,文字数字可以用100(表示整数),100.0(表示浮点值)或1.0e5(科学记数法)等形式编写。时间戳可以写成TIMESTAMP ‘2000-01-01 00:00:00’。时间算法,可以这样写INTERVAL ‘1’ HOUR,INTERVAL ‘1 02:03’ DAY TO MINUTE,INTERVAL ‘1-2’ YEAR TO MONTH,等等。
Druid SQL支持具有以下结构的SELECT查询:
[ EXPLAIN PLAN FOR ]
[ WITH tableName [ ( column1, column2, ... ) ] AS ( query ) ]
SELECT [ ALL | DISTINCT ] { * | exprs }
FROM table
[ WHERE expr ]
[ GROUP BY exprs ]
[ HAVING expr ]
[ ORDER BY expr [ ASC | DESC ], expr [ ASC | DESC ], ... ]
[ LIMIT limit ]
[ UNION ALL <another query> ]
查询所有:
SELECT * FROM "itemlogs"
查询count列:
SELECT "count" FROM "itemlogs"
查询前5条:
SELECT * FROM "itemlogs" LIMIT 5
分组查询:
SELECT ip FROM "itemlogs" GROUP BY ip
排序:
SELECT * FROM "itemlogs" ORDER BY __time DESC
求和:
SELECT SUM("count") FROM "itemlogs"
最大值:
SELECT MAX("count") FROM "itemlogs"
平均值:
SELECT AVG("count") FROM "itemlogs"
查询6年前的数据:
SELECT * FROM "wikiticker" WHERE "__time" >= CURRENT_TIMESTAMP - INTERVAL '6' YEAR
去除重复查询:
SELECT DISTINCT "count" FROM "accessitem"
JDBC查询Apache Druid
Apache Calcite是面向Hadoop新的查询引擎,它提供了标准的SQL语言、多种查询优化和连接各种数据源的能力,除此之外,Calcite还提供了OLAP和流处理的查询引擎。
如果使用java,可以使用Calcite JDBC驱动程序进行Druid SQL查询。可以下载Avatica客户端jar后,将其添加到类路径并使用连接字符串jdbc:avatica:remote:url=http://192.168.211.137:8082/druid/v2/sql/avatica/
如果是Maven项目,需要引入avatica-core包,如下:
<dependency>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica-core</artifactId>
<version>1.15.0</version>
</dependency>
使用案例:
public static void main(String[] args) throws Exception{
//链接地址
String url = "jdbc:avatica:remote:url=http://192.168.211.137:8082/druid/v2/sql/avatica/";
AvaticaConnection connection = (AvaticaConnection) DriverManager.getConnection(url);
//SQL语句,查询2020-4-10 11:50:30之后的访问uri和访问数量
String sql="SELECT uri,count(*) AS \"viewcount\" FROM(SELECT * FROM \"itemlogs\" WHERE __time>'2020-4-10 11:50:30' ORDER BY __time DESC) GROUP BY uri LIMIT 100";
//创建Statment
AvaticaStatement statement = connection.createStatement();
//执行查询
ResultSet resultSet = statement.executeQuery(sql);
while (resultSet.next()) {
//获取uri
String uri = resultSet.getString("uri");
String viewcount = resultSet.getString("viewcount");
System.out.println(uri+"--------->"+viewcount);
}
}
知识点:
Druid的时区和国内时区不一致,会比我们的少8个小时,我们需要修改配置文件,批量将时间+8,代码如下:
sed -i "s/Duser.timezone=UTC/Duser.timezone=UTC+8/g" `grep Duser.timezone=UTC -rl ./`