第五十四章 DWM层数据处理-Flink异步IO提升性能
第1集 IP解析地理位置问题点和Flink异步IO介绍
简介: IP解析地理位置问题点和Flink异步IO介绍
-
Flink实时计算处理存在的问题
-
IP解析地理位置信息,查询是同步查询,存在阻塞,性能不高
-
在构建实时数仓等应用场景下,与外部维表等的关联需要大量外部存储的交互,去补充更多维度属性信息,如HTTP网络、Redis、Mysql数据库、Hbase等进行查询
-
默认Flink里面用 MapFunction进行对象关联,只能用同步方式去进行IO调用,需要等请求完成才进行发下一个请求,这种等待占了函数时间的绝大部分;
-
一种方式是通过提高Flink并行度,是可以提高效率但是会浪费更多资源,高并行度MapFunction意味着更多的subtask,线程,网络连接,数据库连接
-
-
解决方案
- 在Flink需要与外部系统打交道,由于外部系统的问题,可能导致时间耗时比较长,为了不影响flink的处理性能,flink引入了异步IO来处理这个问题
- Flink 1.2里面引入了Async-IO 异步IO(阿里巴巴贡献的特性),利用异步交互意味着【单个并行函数可以同时处理多个请求并同时接收响应】,哪个请求先返回就先处理,在连续的请求的时候不需要阻塞式等待。
- 异步等待的时间可以与发送其他请求和接收响应重叠,等待时间分摊到多个请求中,系统就有更高的吞吐量
- 先决条件
- 对数据库(或键/值存储)实现适当的异步 I/O 需要该数据库的客户端支持异步请求,许多流行的数据库都提供这样的客户端。
- 在没有这样的客户端的情况下,可以通过【线程池】处理同步调用来,将同步客户端变为支持的异步并发客户端,但是这种方法肯定不如自带的异步客户端有效(一般厂商的会做更多优化)
第2集 Flink异步IO使用步骤和注意事项讲解
简介: Flink异步IO使用步骤和注意事项讲解
-
使用步骤
- 文档:https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/asyncio
An implementation of AsyncFunction that dispatches the requests 实现接口 AsyncFunction 用于请求分发 A callback that takes the result of the operation and hands it to the ResultFuture 定义一个callback回调函数,该函数用于取出异步请求的返回结果,并将返回的结果传递给ResultFuture Applying the async I/O operation on a DataStream as a transformation 对DataStream的数据使用Async操作
-
例子
/** * An implementation of the 'AsyncFunction' that sends requests and sets the callback. * 通过向数据库发送异步请求并设置回调方法 */ class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> { /** The database specific client that can issue concurrent requests with callbacks 可以异步请求的特定数据库的客户端 */ private transient DatabaseClient client; @Override public void open(Configuration parameters) throws Exception { client = new DatabaseClient(host, post, credentials); } @Override public void close() throws Exception { client.close(); } @Override public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception { // issue the asynchronous request, receive a future for result // 发起一个异步请求,返回结果的 future final Future<String> result = client.query(key); // set the callback to be executed once the request by the client is complete // the callback simply forwards the result to the result future // 设置请求完成时的回调.将结果传递给 result future CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { return result.get(); } catch (InterruptedException | ExecutionException e) { // Normally handled explicitly. return null; } } }).thenAccept( (String dbResult) -> { resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult))); }); } } // create the original stream // 创建一个原始的流 DataStream<String> stream = ...; // apply the async I/O transformation // 添加一个 async I/O ,指定超时时间,和进行中的异步请求的最大数量 DataStream<Tuple2<String, String>> resultStream = AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
-
注意事情
- Timeout:定义请求超时时间,异步请求多久没完成会被认为是超时了
- Capacity:定义了同时进行的异步请求的数量,可以限制并发请求数量,不会积压过多的请求
- 超时处理:默认当一个异步 I/O 请求超时时,会引发异常并重新启动作业。 如果要处理超时,可以覆盖该
AsyncFunction的timeout
方法来自定义超时之后的处理方式 - 响应结果的顺序:AsyncDataStream包含两种输出模式,
- unorderedWait无序:响应结果的顺序与异步请求的顺序不同
- orderedWait有序:响应结果的顺序与异步请求的顺序相同
第3集 带你掌握异步编程CompletableFuture核心知识
简介: 带你掌握异步编程CompletableFuture核心知识
-
什么是CompletableFuture
- JDK1.5有了Future和Callable的实现,想要异步获取结果,通常会以轮询的方式去获取结果
@Test public void testFuture() throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); //定义一个异步任务 Future<String> future = executorService.submit(()->{ Thread.sleep(2000); return "我要学海量数据项目"; }); //轮询获取结果,耗费的CPU资源 while (true){ if(future.isDone()) { System.out.println(future.get()); break; } } }
-
JDK8里面引入的CompletableFuture,帮助我们简化异步编程复杂性,函数式编程让代码更加简洁
-
CompletableFuture类实现了Future和CompletionStage接口
Future 表示异步计算的结果,它提供了检查计算是否完成的方法,以等待计算的完成,计算完成后只能使用 get 方法来获取结果,有cancel、get、isDone、isCancelled等方法
-
方法API
-
CompletableFuture静态方法,执行异步任务的API
//无返回值,默认使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码 public static CompletableFuture<Void> runAsync(Runnable runnable) //无返回值,可以自定义线程池 public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) //有返回值,默认使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) //有返回值,可以自定义线程池 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
-
CompletableFuture对象,获取结果的API
//如果返回值没有返回,一直阻塞 V get() //设置等待超时的时间 V get(long timeout,Timeout unit); //有返回值就返回, 线程抛出异常就返回设置的默认值 T getNow(T defaultValue);
-
CompletableFuture对象,其他重点API
//无返回值,当前任务正常完成以后执行,当前任务的执行结果可以作为下一任务的输入参数 thenAccept //有返回值,当前任务正常完成以后执行,当前任务的执行的结果会作为下一任务的输入参数 thenApply //对不关心上一步的计算结果,执行下一个操作 thenRun
-
第4集 异步编程CompletableFuture核心API实战
简介: 异步编程CompletableFuture核心API实战
- 案例
@Test
public void testFuture2() throws ExecutionException, InterruptedException {
//有返回值,默认使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() ->{
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) { }
System.out.println(Thread.currentThread()+"执行,返回 ");
return "执行,返回,";
});
System.out.println("future1返回值:" + future1.get()); //
}
@Test
public void testFuture3() throws ExecutionException, InterruptedException, TimeoutException {
//有返回值,默认使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() ->{
System.out.println("执行任务一");
return "冰冰一,";
});
//有返回值,当前任务正常完成以后执行,当前任务的执行的结果会作为下一任务的输入参数
CompletableFuture<String> future2 = future1.thenApply((element) -> {
System.out.println("入参:"+element);
System.out.println("执行任务二");
return "冰冰二";
});
System.out.println("future2返回值:" + future2.get(1, TimeUnit.SECONDS));
}
@Test
public void testFuture4() throws ExecutionException, InterruptedException, TimeoutException {
//有返回值,默认使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() ->{
System.out.println("执行任务一");
return "冰冰一,";
});
//无返回值,当前任务正常完成以后执行,当前任务的执行结果可以作为下一任务的输入参数
CompletableFuture<Void> future2 = future1.thenAccept((element) -> {
System.out.println("入参:"+element);
System.out.println("执行任务二");
});
//System.out.println("future2返回值:" + future2.get(1, TimeUnit.SECONDS));
System.out.println("future2返回值:" + future2.get());
}
第5集 Flink异步IO优化Ip地理位置解析实战
简介: Flink异步IO优化Ip地理位置解析实战
-
asyncInvoke方法两种方式实现异步
-
ExecutorService线程池
-
HTTP异步客户端
-
-
引入异步http客户端
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>4.1.5</version>
</dependency>
- 编码开发
@Slf4j
public class AsyncLocationRequestFunction extends RichAsyncFunction<ShortLinkWideDO, String> {
private static final String IP_PARSE_URL = "https://restapi.amap.com/v3/ip?ip=%s&output=json&key=4f6e1b4212a5fdec6198720f261892bd";
private CloseableHttpAsyncClient httpClient;
@Override
public void open(Configuration parameters) throws Exception {
this.httpClient = createAsyncHttpClient();
}
@Override
public void close() throws Exception {
if (httpClient != null) {
httpClient.close();
}
}
@Override
public void asyncInvoke(ShortLinkWideDO shortLinkWideDO, ResultFuture<String> resultFuture) throws Exception {
String ip = shortLinkWideDO.getIp();
String url = String.format(IP_PARSE_URL, ip);
HttpGet httpGet = new HttpGet(url);
try {
// 发起异步请求,获取异步请求的future对象, callback是回调函数,也可通过回调函数拿结果
Future<HttpResponse> future = httpClient.execute(httpGet, null);
// 从Future中取数据
CompletableFuture<ShortLinkWideDO> completableFuture =
CompletableFuture.supplyAsync(new Supplier<ShortLinkWideDO>() {
@Override
public ShortLinkWideDO get() {
try {
HttpResponse response = future.get();
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == HttpStatus.SC_OK) {
HttpEntity entity = response.getEntity();
String result = EntityUtils.toString(entity, "UTF-8");
JSONObject locationObj = JSON.parseObject(result);
String city = locationObj.getString("city");
String province = locationObj.getString("province");
shortLinkWideDO.setProvince(province);
shortLinkWideDO.setCity(city);
}
return shortLinkWideDO;
} catch (Exception e) {
log.error("异步请求异常:{}",shortLinkWideDO);
return null;
}
}
});
// 取出的数据,存入ResultFuture,返回给方法
completableFuture.thenAccept(new Consumer<ShortLinkWideDO>() {
@Override
public void accept(ShortLinkWideDO result) {
//complete()里面需要的是Collection集合,集合使用singleton单例模式
resultFuture.complete(Collections.singleton(JSON.toJSONString(result)));
}
});
} catch (Exception e) {
log.error("ip解析错误,value={},msg={}", shortLinkWideDO, e.getMessage());
resultFuture.complete(Collections.singleton(null));
}
}
private CloseableHttpAsyncClient createAsyncHttpClient() {
try {
RequestConfig requestConfig = RequestConfig.custom()
//返回数据的超时时间
.setSocketTimeout(20000)
//连接上服务器的超时时间
.setConnectTimeout(10000)
//从连接池中获取连接的超时时间
.setConnectionRequestTimeout(1000)
.build();
ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor();
PoolingNHttpClientConnectionManager connManager = new PoolingNHttpClientConnectionManager(ioReactor);
//设置连接池最大是500个连接
connManager.setMaxTotal(500);
//MaxPerRoute是对maxtotal的细分,每个主机的并发最大是300,route是指域名
connManager.setDefaultMaxPerRoute(300);
CloseableHttpAsyncClient httpClient = HttpAsyncClients.custom().setConnectionManager(connManager)
.setDefaultRequestConfig(requestConfig)
.build();
httpClient.start();
return httpClient;
} catch (IOReactorException e) {
log.error("初始化 CloseableHttpAsyncClient异常:{}",e.getMessage());
return null;
}
}
}
第6集 Flink异步IO优化Ip地理位置解析链路测试
简介: Flink异步IO优化Ip地理位置解析链路测试
- 开发
SingleOutputStreamOperator<String> shortLinkWideDS =
AsyncDataStream.unorderedWait(deviceWideDS, new AsyncLocationRequestFunction(),
1000, TimeUnit.MILLISECONDS, 100);
- 测试
{"ip":"113.68.152.139","ts":1646145133665,"event":"SHORT_LINK_TYPE","udid":null,"bizId":"026m8O3a","data":{"referer":null,"accountNo":"693100647796441088","user-agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.109 Safari/537.36"}}
第五十五章 DWM层数据处理-UV访客统计实战
第1集 大厂里面的日活跃用户-UV-活跃留存用户讲解
简介: 大厂里面的日活跃用户-UV-活跃留存用户讲解
-
在互联网公司里面,高级开发或者技术Leader都是有两个指标
- 技术指标:RT响应时间、服务器利用率、接口2xx、5xx的占比等
- 业务指标:UV、PV、留存、日活、点击率、转化率等
-
概念(产品视野的拓展)
-
UV(Uniqued Visitor)
- 独立访客就是独立IP访客(Unique Visitor),访问网站的一台电脑客户端为一个访客,在 00:00-24:00内相同的客户端只被计算一次。
- 记录独立访客数的时间标准一般可为一天,一个月,一般不计算年UV数
-
PV(Page View)
- 页面浏览量或点击量,用户每次刷新即被计算一次。指某站点总共有被浏览多少个页面,它是重复累计的,同一个页面被重复浏览也被计入PV。
-
独立IP数
- 是指1天内多少个独立的IP浏览了页面,即统计不同的IP浏览用户数量。
- 同一IP不管访问了几个页面,独立IP数均为1;不同的IP浏览页面,计数会加1,如果用户不断更换IP,则有可能被多次统计
- IP是基于用户广域网IP地址来区分不同的访问者的,多个用户(多个局域网IP)在同一个路由器(同一个广域网IP)内上网,可能被记录为一个独立IP访问者
-
什么是活跃用户 Active User
- 分为 【DAU 日活跃用户数】、【WAU 周活跃用户数】、【MAU 月活跃用户数】
- 不同产品是不一样的概念,好比你访问了【新浪】网站,停留了几秒钟,这个算活跃用户?需要有活跃用户的标准,定义好关键行为
- 不同产品类型,不同的阶段,不同的场景,对活跃用户的定义差别很大
- 例子
- 社交类产品,前期:注册登录过,后期:添加好有多少个才算等
- 内容类 新闻、小说:打开网站的不算,阅读过多少分钟或者多少篇才算
-
留存用户
- 1日留存、7日留存、30日留存等,一般不要求连续做【某个关键路径】
- 第N日留存:指的是新增用户日之后的第N日依然活跃的用户占新增用户的比例
- 第7日留存率:(当天新增的用户中,新增日之后的第7天还活跃的用户数)/第一天新增总用户数;
-
-
这些有啥用呢?
-
数据可视化、数据仓库里等核心的统计指标(熟悉概念)
-
作为产品的核心指标,公司可以指定目标值和正常值,进一步优化提升
-
根据产品的增长情况,可以判断产品是否要进行大规模推广或者暂缓
-
第2集 短链平台里面的UV用户统计需求和实现思路
简介: 短链平台里面的UV用户统计需求和实现思路
-
短链里面统计日活UV
- 日活跃用户:访问过短链的即可算入
- 怎么统计
- 需要知道用户的唯一ID
- 需要知道访问时间
- 如果是同一天访问的就可以去重
-
实现思路和注意事项
- 利用KeyState,分组存储是否当天访问过
- 程序一直运行下去,要定期清理内存里的过时数据
-
Flink中的状态
- 算子状态(Operatior State)
- 数据结构:ListState、BroadcaseState
- 键控状态(Keyed State)
- 数据结构:ValueState、ListState、MapState
- KeyedStream上有任务出现的状态,定义的不同的key来维护这个状态;
- 不同的key是独立访问的,一个key只能访问它自己的状态,不同key之间也不能互相访问
- Flink为每个key维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中
- 状态后端(State Backends)
- 当检查点(checkpoint)机制启动时,状态将在检查点中持久化来应对数据丢失以及恢复
- MemoryStateBackend 、FsStateBackend、RocksDBStateBackend
Flink中的状态管理器包括MemoryStateBackend、FsStateBackend和RocksDBStateBackend。 MemoryStateBackend:将状态数据全部存储于JVM堆内存中,是Flink的默认状态管理器。状态数据包括用户使用的DataStream API创建的Key/Value State、窗口中缓存的状态数据和触发器等数据。基于内存的状态管理器速度快且高效,但具有内存容量有限等限制。一旦存储的状态数据过多就会导致系统内存溢出等问题。因此MemoryStateBackend存在数据安全性问题,不建议用户在生产环境中使用。 FsStateBackend:基于文件系统的状态管理器。如果使用FsStateBackend,默认是异步的,比较稳定,有3个副本,比较安全。不会出现任务无法恢复等问题。状态大小受磁盘容量限制,将工作state保存在TaskManager的内存中,并将checkpoint数据存储在文件系统中。适用场景包括状态比较大、窗口比较长、大的KV状态等。 RocksDBStateBackend:一种类似于hbase的kv存储本地数据库,依赖于lsm实现,可以将数据保存到本地磁盘上。读写状态时会涉及到序列化反序列化操作,与内存相比,性能会偏低些。但其可以保存比较大的状态,受限于磁盘大小。其key value依赖于byte数组,大小受byte[]限制。同时rocksdb后端支持增量checkpoint。
- 算子状态(Operatior State)
-
状态存活时间 State Time-To-Live (TTL)
- 文档:https://www.ververica.com/blog/state-ttl-for-apache-flink-how-to-limit-the-lifetime-of-state
- Flink 1.6版本开始引入State TTL机制,可以允许作业中定义的keyed状态进行超时自动处理
- State TTL 功能给每个 Flink 的 Keyed 状态增加了一个“时间戳”,而 Flink 在状态创建、写入或读取(可选)时更新这个时间戳,并且判断状态是否过期
- 如果状态过期,会根据可见性参数,来决定是否返回已过期但还未清理的状态等等。
- 可以将存活时间(TTL)分配给任何类型的keyed state,如果配置了TTL并且状态值已过期,则将尽力清除存储的值
- 状态的清理并不是即时的,是使用了一种 Lazy 的算法来实现(类似Redis惰性删除key),从而减少状态清理对性能的影响
- 默认情况下,只有在明确读出过期值时才会删除过期值,例如通过调用
ValueState.value()
, 默认情况下,如果未读取过期状态,则不会删除它,可能会导致状态不断增长 - 构建
StateTtlConfig
配置对象,可以在任何状态描述符中启用TTL功能
StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.seconds(1)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build(); Time.seconds(1) 周期过期时间 setUpdateType 更新类型 setStateVisibility 是否在访问state的时候返回过期值 表示状态时间戳的更新的时机 setUpdateType: StateTtlConfig.UpdateType.OnCreateAndWrite - 只在创建和写的时候清除 (默认) StateTtlConfig.UpdateType.OnReadAndWrite - 在读和写的时候清除 表示对已过期但还未被清理掉的状态如何处理 setStateVisibility: StateTtlConfig.StateVisibility.NeverReturnExpired - 从不返回过期值 StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 如果仍然可用,
第3集 DWM层数据处理-短链UV统计开发实战《上》
简介: DWM层数据处理-短链UV统计开发实战《上》
- 开发思路
- 获取数据
- 转换对象
- 分组统计
- 排重过滤
- 写入Kafka
- 编码实战
public class DwmUniqueVisitorApp {
/**
* 定义source topic
*/
public static final String SOURCE_TOPIC = "dwm_link_visit_topic";
/**
* 定义消费者组
*/
public static final String GROUP_ID = "dwm_unique_visitor_group";
/**
* 定义输出
*/
public static final String SINK_TOPIC = "dwm_unique_visitor_topic";
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//DataStream<String> ds = env.socketTextStream("127.0.0.1",8888);
// 1、获取流
FlinkKafkaConsumer<String> kafkaConsumer = KafkaUtil.getKafkaConsumer(SOURCE_TOPIC, GROUP_ID);
DataStreamSource<String> ds = env.addSource(kafkaConsumer);
//2、对象转换
SingleOutputStreamOperator<JSONObject> jsonObjDS = ds.map(jsonStr -> JSON.parseObject(jsonStr));
jsonObjDS.print("获取数据");
//3、分组
KeyedStream<JSONObject, String> keyedStream = jsonObjDS.keyBy(new KeySelector<JSONObject, String>() {
@Override
public String getKey(JSONObject value) throws Exception {
return value.getString("udid");
}
});
//4、排重过滤
SingleOutputStreamOperator<String> uniqueVisitorDS = keyedStream.filter(new UniqueVisitorFilterFunction());
uniqueVisitorDS.print("独立访客》》》");
FlinkKafkaProducer<String> kafkaProducer = KafkaUtil.getKafkaProducer(SINK_TOPIC);
//数据写到kafka存储
uniqueVisitorDS.addSink(kafkaProducer);
env.execute();
}
}
第4集 DWM层数据处理-短链UV统计开发实战《下》
简介: DWM层数据处理-短链UV统计开发实战《下》
- Filter函数开发
public class UniqueVisitorFilterFunction extends RichFilterFunction<JSONObject> {
//定义状态
private ValueState<String> lastVisitDateState = null;
@Override
public void open(Configuration parameters) throws Exception {
//初始化状态
ValueStateDescriptor<String> lastVisitDateStateDes = new ValueStateDescriptor<>("visitDateState", String.class);
//统计日活DAU,状态数据当天有效
StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.seconds(20)).build();
lastVisitDateStateDes.enableTimeToLive(stateTtlConfig);
this.lastVisitDateState = getRuntimeContext().getState(lastVisitDateStateDes);
}
@Override
public boolean filter(JSONObject jsonObj) throws Exception {
//获取当前访问时间
Long visitTime = jsonObj.getLong("visitTime");
String udid = jsonObj.getString("udid");
//转换为日期字符串 yyyy-MM-dd
String currentVisitDate = TimeUtil.format(visitTime);
//获取上次状态日期时间
String lastVisitDate = lastVisitDateState.value();
//用当前页面的访问时间和状态时间进行对比
if (StringUtils.isNotBlank(lastVisitDate) && lastVisitDate.equals(currentVisitDate)) {
//System.out.println(udid + "已经在" + lastVisitDate + "时间访问过");
return false;
} else {
//System.out.println(udid + "在" + currentVisitDate + "时间初次访问");
lastVisitDateState.update(currentVisitDate);
return true;
}
}
}
第5集 DWM层数据处理-短链UV统计链路测试
简介: DWM层数据处理-短链UV统计链路测试
- 链路测试
//ODS层数据
{"ip":"113.68.152.139","ts":1646145133665,"event":"SHORT_LINK_TYPE","udid":null,"bizId":"026m8O3a","data":{"referer":null,"accountNo":"693100641796441088","user-agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.109 Safari/537.36"}}
- UV统计和新老用户标记的区分说明
- 新老用户目前是用天进行区分,后续可根据业务规则改为周或者月维度区分新老访客
- 比如超过一周才来的就是新访客
- UV可以用天维度进行统计,月UV可以用月维度进行统计;
- 如果新老用户是天区分,UV也是天区分,则两者是一样的
- 新老用户目前是用天进行区分,后续可根据业务规则改为周或者月维度区分新老访客
第五十六章 Flink多流合并和DWS层数据聚合实战
第1集 DWS层数据处理-多流数据聚合和Union算子讲解
简介: DWS层数据处理-多流数据聚合讲解
-
数据分析中最重要、最基础的2个概念:维度Dimensions 和度量Measures
- 度量是数据表中的数值数据,维度是类别数据
- 例子:各个城市访问的UV数量
- 一个数据指标一般由一种或多种维度加上一种度量组成
- 度量是数据表中的数值数据,维度是类别数据
-
需求统计点
- DWS层数据是做啥的?
- 度量:PV、UV
- 维度 : 新老用户、地区信息、设备信息等
- DWS层数据处理后-存储到ClickHouse、Redis、Mysql、ElasticSearch等都可以
- ADS层需要使用的数据就从上述的DWS层进行读取,主要是根据各种报表及可视化来生成统计数据。
- DWS层数据是做啥的?
-
数据分层
数据分层 | 分层描述 | 数据生成计算工具 | 存储 |
---|---|---|---|
ODS | 原生数据,短链访问基本信息 | SpringBoot生成 | Kafka |
DWD | 对 ODS 层做数据清洗和规范化,新老访客标记等 | Flink | Kafka |
DWM | 对DWD数据进一步加工补齐数据,独立访客统计,操作系统/ip/城市,做宽表 | Flink | kafka |
DWS | 对DWM进行处理,多流合并,分组|聚合|开窗|统计,形成主题宽表 | Flink | ClickHouse |
ADS | 从ClickHouse中读取数据,根据需求进行筛选聚合,可视化展示 | ClickHouseSql | web可视化展示 |
-
- 统计指标
- PV -》从DWM层 dwm_link_visit_topic统计
- UV-》从DWM层 dwm_unique_visitor_topic统计
- 统计指标
-
Flink多流合并
- Flink提供了多流转换算子,方便我们对多个数据流进行整合处理
- union
- 可以合并多个【同类型】的数据流,并生成【同类型】的数据流
- 数据将按照先进先出(First In First Out)的模式合并,且不去重
- connect
- 和union类型,但是各有利弊
- connect只能连接两个不同类型的数据流,union可以连接多个同类型数据流
- 经过connect之后被转化为ConnectedStreams,会对两个流的数据应用不同的处理方法
第2集 DWS层数据处理-业务Topic主题开发实战《上》
简介: DWS层数据处理-业务Topic主题开发实战《上》
- 创建Bean对象 ShortLinkVisitStatsDO
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class ShortLinkVisitStatsDO {
/**
* 窗口开始时间 Clickhouse里面会用到
*/
String startTime;
/**
* 窗口结束时间
*/
String endTime;
/**
* ================================================
*/
/**
* 短链压缩码
*/
private String code;
/**
* 租户id
*/
private Long accountNo;
/**
* 访问时间
*/
private Long visitTime;
/**
* 站点来源,只记录域名
*/
private String referer;
/**
* 1是新访客,0是老访客
*/
private Integer isNew;
/**
* 唯一标识
*/
private String udid;
//==============RegionInfoDO==================
/**
* 省份
*/
private String province;
/**
* 城市
*/
private String city;
/**
* 运营商
*/
private String isp;
/**
* 访问ip
*/
private String ip;
//==============DeviceInfoDO==================
/**
* 浏览器名称
*/
private String browserName;
/**
* 操作系统
*/
private String os;
/**
* 系统版本
*/
private String osVersion;
/**
* 设备类型
*/
private String deviceType;
/**
* 设备厂商
*/
private String deviceManufacturer;
//==============度量==================
private Long uv=0L;
private Long pv=0L;
}
- 编码实战
//1、获取多个数据
//2、结构转换 uniqueVisitorDS、shortLinkDS
//3、多流合并(合并相同结构的流)
//4、设置WaterMark
//5、多维度、多个字段分组
//6、开窗 15秒一次数据插入到 ck
//7、聚合统计(补充统计起止时间)
//8、输出Clickhouse
public class DwsShortLinkVisitStatsApp {
/**
* 宽表
*/
public static final String SHORT_LINK_SOURCE_TOPIC = "dwm_link_visit_topic";
public static final String SHORT_LINK_SOURCE_GROUP = "dws_link_visit_group";
/**
* uv的数据流
*/
public static final String UNIQUE_VISITOR_SOURCE_TOPIC = "dwm_unique_visitor_topic";
public static final String UNIQUE_VISITOR_SOURCE_GROUP = "dws_unique_visitor_group";
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//1、获取多个数据
FlinkKafkaConsumer<String> shortLinkSource = KafkaUtil.getKafkaConsumer(SHORT_LINK_SOURCE_TOPIC,SHORT_LINK_SOURCE_GROUP);
DataStreamSource<String> shortLinkDS = env.addSource(shortLinkSource);
FlinkKafkaConsumer<String> uniqueVisitorSource = KafkaUtil.getKafkaConsumer(UNIQUE_VISITOR_SOURCE_TOPIC, UNIQUE_VISITOR_SOURCE_GROUP);
DataStreamSource<String> uniqueVisitorDS = env.addSource(uniqueVisitorSource);
//2、结构转换 uniqueVisitorDS、shortLinkDS
SingleOutputStreamOperator<ShortLinkVisitStatsDO> shortLinkMapDS = shortLinkDS.map(new MapFunction<String, ShortLinkVisitStatsDO>() {
@Override
public ShortLinkVisitStatsDO map(String value) throws Exception {
ShortLinkVisitStatsDO visitStatsDO = parseVisitStats(value);
visitStatsDO.setPv(1L);
visitStatsDO.setUv(0L);
return visitStatsDO;
}
});
SingleOutputStreamOperator<ShortLinkVisitStatsDO> uniqueVisitorMapDS = uniqueVisitorDS.map(new MapFunction<String, ShortLinkVisitStatsDO>() {
@Override
public ShortLinkVisitStatsDO map(String value) throws Exception {
ShortLinkVisitStatsDO visitStatsDO = parseVisitStats(value);
visitStatsDO.setPv(0L);
visitStatsDO.setUv(1L);
return visitStatsDO;
}
});
//3、多流合并(合并相同结构的流)
DataStream<String> unionDS = shortLinkDS.union(uniqueVisitorDS);
//4、设置WaterMark
//5、多维度、多个字段分组
//6、开窗 15秒一次数据插入到 ck
//7、聚合统计(补充统计起止时间)
//8、输出Clickhouse
}
private static ShortLinkVisitStatsDO parseVisitStats(String value) {
JSONObject jsonObj = JSON.parseObject(value);
ShortLinkVisitStatsDO visitStatsDO = ShortLinkVisitStatsDO.builder()
.code(jsonObj.getString("code"))
.accountNo(jsonObj.getLong("accountNo"))
.visitTime(jsonObj.getLong("visitTime"))
.referer(jsonObj.getString("referer"))
.isNew(jsonObj.getInteger("isNew"))
.udid(jsonObj.getString("udid"))
//地理位置信息
.province(jsonObj.getString("province"))
.city(jsonObj.getString("city"))
.isp(jsonObj.getString("isp"))
.ip(jsonObj.getString("ip"))
//设备信息
.browserName(jsonObj.getString("browserName"))
.os(jsonObj.getString("os"))
.osVersion(jsonObj.getString("osVersion"))
.deviceType(jsonObj.getString("deviceType"))
.build();
return visitStatsDO;
}
}
第3集 DWS层数据处理-业务Topic主题开发实战《下》
简介: DWS层数据处理-业务Topic主题开发实战《下》
在Apache Flink中,Watermark是一个用于处理时间序列数据的概念。Watermark的主要作用是解决数据的时间相关性问题,特别是处理乱序事件和延迟事件的情况。
在流处理中,事件是按照时间顺序到达的,但有时候由于网络延迟、系统故障或其他原因,某些事件可能会延迟到达。Watermark允许我们处理这些延迟到达的事件,而不会因为等待延迟事件而阻塞整个处理流程。
Watermark的基本原理是在每个事件中添加一个时间戳,表示该事件的时间属性。然后,Flink会根据Watermark来决定如何处理这些事件。当Watermark到达时,Flink会等待直到所有早于该Watermark的事件都已到达,然后一起处理这些事件。这样可以确保事件按照时间顺序进行处理,而不会因为某个事件的延迟到达而影响整个处理流程。
使用Watermark时,需要注意以下几点:
确定合适的Watermark生成策略:需要根据实际情况确定如何生成Watermark,以确保能够准确地反映事件的时间属性。
合理设置Watermark延迟:Watermark延迟是为了处理网络延迟、系统故障等导致的事件延迟到达的情况。需要根据实际情况合理设置Watermark延迟,以避免过度等待或错过重要事件。
考虑乱序事件的影响:如果存在乱序事件,需要采取适当的策略来处理它们,以确保数据处理的正确性。
监控和调整Watermark配置:需要定期监控Watermark的使用情况,并根据实际需要调整配置参数,以确保数据处理的效率和准确性。
总之,Watermark是Apache Flink中一个重要的概念,用于解决时间序列数据处理中的时间相关性问题。通过合理使用Watermark,可以确保数据按照时间顺序进行处理,并提高数据处理的效率和准确性。
- 编码实战
//1、获取多个数据
//2、结构转换 uniqueVisitorDS、shortLinkDS
//3、多流合并(合并相同结构的流)
//4、设置WaterMark
//5、多维度、多个字段分组
//6、开窗 15秒一次数据插入到 ck
//7、聚合统计(补充统计起止时间)
//8、输出Clickhouse
//3、多流合并(合并相同结构的流)
DataStream<ShortLinkVisitStatsDO> unionDS = shortLinkMapDS.union(uniqueVisitorMapDS);
//4、设置WaterMark
SingleOutputStreamOperator<ShortLinkVisitStatsDO> watermarkDS = unionDS.assignTimestampsAndWatermarks(WatermarkStrategy
//指定允许乱序延迟的最大时间 3 秒
.<ShortLinkVisitStatsDO>forBoundedOutOfOrderness(Duration.ofSeconds(3))
//指定POJO事件时间列,毫秒
.withTimestampAssigner((event, timestamp) -> event.getVisitTime()));
//5、多维度、多个字段分组
// code、referer、isNew
// province、city、ip
// browserName、os、deviceType
KeyedStream<ShortLinkVisitStatsDO, Tuple9<String, String, Integer, String, String, String, String, String, String>> keyedStream = watermarkDS
.keyBy(new KeySelector<ShortLinkVisitStatsDO,
Tuple9<String, String, Integer, String, String, String, String, String, String>>() {
@Override
public Tuple9<String, String, Integer, String, String, String, String, String, String> getKey(ShortLinkVisitStatsDO obj) throws Exception {
return Tuple9.of(obj.getCode(), obj.getReferer(), obj.getIsNew(),
obj.getProvince(), obj.getCity(), obj.getIp(),
obj.getBrowserName(), obj.getOs(), obj.getDeviceType());
}
});
//6、开窗 10秒一次数据插入到 ck
WindowedStream<ShortLinkVisitStatsDO, Tuple9<String, String, Integer, String, String, String, String, String, String>, TimeWindow> windowedStream = keyedStream
.window(TumblingEventTimeWindows.of(Time.seconds(10)));
//7、聚合统计(补充统计起止时间)
SingleOutputStreamOperator<ShortLinkVisitStatsDO> reduceDS = windowedStream.reduce(
new ReduceFunction<ShortLinkVisitStatsDO>() {
@Override
public ShortLinkVisitStatsDO reduce(ShortLinkVisitStatsDO statsDO1, ShortLinkVisitStatsDO statsDO2) throws Exception {
statsDO1.setPv(statsDO1.getPv() + statsDO2.getPv());
statsDO1.setUv(statsDO1.getUv() + statsDO2.getUv());
return statsDO1;
}
}, new ProcessWindowFunction<ShortLinkVisitStatsDO, ShortLinkVisitStatsDO, Tuple9<String, String, Integer, String, String, String, String, String, String>, TimeWindow>() {
@Override
public void process(Tuple9<String, String, Integer,String, String, String, String, String, String> tuple,
Context context, Iterable<ShortLinkVisitStatsDO> elements, Collector<ShortLinkVisitStatsDO> out) throws Exception {
for (ShortLinkVisitStatsDO visitStatsDO : elements) {
//窗口开始时间
String startDate = TimeUtil.format(context.window().getStart());
//窗口结束时间
String endDate = TimeUtil.format(context.window().getEnd());
visitStatsDO.setStartTime(startDate);
visitStatsDO.setEndTime(endDate);
out.collect(visitStatsDO);
}
}
}
);
//8、输出nosql
reduceDS.print(">>>>>");
env.execute();
第4集 短链访问-模拟IP数据处理链路测试实战
简介: 短链访问-模拟IP数据处理链路测试实战
- 链路测试需求
- 访问短链->记录日志->KAFKA->ODS->DWD->DWM(设备补齐、地区补齐、UV统计、PV统计)->DWS
- 问题
- 本地访问是局域网ip,没法解析地理位置信息,临时编写死ip
- 编码
private static List<String> ipList = new ArrayList<>();
static {
//深圳
ipList.add("14.197.9.110");
//广州
ipList.add("113.68.152.139");
}
private static List<String> refererList = new ArrayList<>();
static {
refererList.add("https://taobao.com");
refererList.add("https://douyin.com");
}
//ip、浏览器信息
//String ip = CommonUtil.getIpAddr(request);
Random random = new Random();
String ip = ipList.get(random.nextInt(ipList.size())) ;
String referer = refererList.get(random.nextInt(refererList.size()));
//全部请求头
Map<String,String> headerMap = CommonUtil.getAllRequestHeader(request);
Map<String,String> availableMap = new HashMap<>();
availableMap.put("user-agent",headerMap.get("user-agent"));
//availableMap.put("referer",headerMap.get("referer"));
availableMap.put("referer",referer);
availableMap.put("accountNo",accountNo.toString());
- 测试链路
- 短链地址 http://localhost:8003/026m8O3a
- A浏览器 访问短链
- B浏览器 访问短链
- Bug修复
- KeyBy分组字段修复
- 时间工具类修复
public class TimeUtil {
/**
* 默认日期格式
*/
private static final String DEFAULT_PATTERN = "yyyy-MM-dd";
private static final String DEFAULT_PATTERN_WITH_TIME = "yyyy-MM-dd hh:mm:ss";
/**
* 默认日期格式
*/
private static final DateTimeFormatter DEFAULT_DATE_FORMATTER = DateTimeFormatter.ofPattern(DEFAULT_PATTERN);
/**
* 默认日期时间格式
*/
private static final DateTimeFormatter DEFAULT_DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern(DEFAULT_PATTERN_WITH_TIME);
private static final ZoneId DEFAULT_ZONE_ID = ZoneId.systemDefault();
/**
* LocalDateTime 转 字符串,指定日期格式
* @param time
* @param pattern
* @return
*/
public static String format(LocalDateTime localDateTime, String pattern){
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
String timeStr = formatter.format(localDateTime.atZone(DEFAULT_ZONE_ID));
return timeStr;
}
/**
* Date 转 字符串, 指定日期格式
* @param time
* @param pattern
* @return
*/
public static String format(Date time, String pattern){
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
String timeStr = formatter.format(time.toInstant().atZone(DEFAULT_ZONE_ID));
return timeStr;
}
/**
* Date 转 字符串,默认日期格式
* @param time
* @return
*/
public static String format(Date time){
String timeStr = DEFAULT_DATE_FORMATTER.format(time.toInstant().atZone(DEFAULT_ZONE_ID));
return timeStr;
}
/**
* Date 转 字符串,默认日期格式
* @param time
* @return
*/
public static String format(Long timestamp){
String timeStr = DEFAULT_DATE_FORMATTER.format(new Date(timestamp).toInstant().atZone(DEFAULT_ZONE_ID));
return timeStr;
}
/**
* timestamp 转 字符串,指定日期格式
*
* @param time
* @return
*/
public static String formatWithTime(long timestamp) {
String timeStr = DEFAULT_DATE_TIME_FORMATTER.format(new Date(timestamp).toInstant().atZone(DEFAULT_ZONE_ID));
return timeStr;
}
/**
* 字符串 转 Date
*
* @param time
* @return
*/
public static Date strToDate(String time) {
LocalDateTime localDateTime = LocalDateTime.parse(time, DEFAULT_DATE_FORMATTER);
return Date.from(localDateTime.atZone(DEFAULT_ZONE_ID).toInstant());
}
}
第五十七章 海量数据存储Flink整合ClickHouse开发实战
第1集 海量数据存储ClickHouse数据库介绍
简介: 海量数据存储ClickHouse数据库介绍
- ClickHouse的由来和应用场景
- 俄罗斯Yandex在2016年开源,使用C++编写的列式存储数据库,近几年在OLAP领域大范围应用
- 官网:https://clickhouse.com/
- 中文:https://clickhouse.com/docs/zh/
- GitHub: https://github.com/ClickHouse/ClickHouse
- 特点和应用场景
- 不依赖Hadoop 生态、安装和维护简单
- 列存储在聚合、统计等操作性能会优于行存储
- 列存储将多行记录的列连续存储在一起,一列接着一列
- 列式存储是同个数据类型,会进行数据压缩率更高,更省空间
- 列存储数据更新成本较高,一般适合读多写少的场景,适合 OLAP 分析型系统
- 拓展性强,在生产中经过实战测试,从单服务器部署到具有数千个节点的集群的线性水平可扩展性
- 支持主流的大部分SQL语法和函数、吞吐能力强,官方测试支持,支持多种存储引擎,满足多数业务场景
- 广泛应用:互联网电商、在线教育、金融等领域用,户行为数据记录和分析,搭建数据可视化平台
-
有谁在用?
- 国内:阿里、字节、腾讯 、虎牙、青云、新浪等
- 国外:优步、Ebay、Spotify、思科、等
-
支持多种部署方式
- 系统:Linux或者Mac OS 源码部署,或者 Docker部署
- 如果是Window的需要成功安装Docker或者安装Linux虚拟机
第2集 Linux云服务器-ClickHouse部署安装实战
简介: Linux云服务器-ClickHouse部署安装实战
- RPM包安装
- Linux机器安装ClickHouse,版本:ClickHouse 22.1.2.2,保持一致即可
- 文档地址:https://clickhouse.com/docs/zh/getting-started/install/
- 课程资料提供安装包,上传到Linux服务器
- 直接使用rpm -ivh后面跟上所有的包安装就可以了
- 基本上不缺少其他依赖,安装之后clickhouse会自动加到systemd启动当中
- Linux机器安装ClickHouse,版本:ClickHouse 22.1.2.2,保持一致即可
#各个节点上传到新建文件夹
/usr/local/software/*
#安装
sudo rpm -ivh *.rpm
#启动
systemctl start clickhouse-server
#停止
systemctl stop clickhouse-server
#重启
systemctl restart clickhouse-server
#状态查看
sudo systemctl status clickhouse-server
#查看端口占用,如果命令不存在 yum install -y lsof
lsof -i :8123
#查看日志
tail -f /var/log/clickhouse-server/clickhouse-server.log
tail -f /var/log/clickhouse-server/clickhouse-server.err.log
#开启远程访问,取消下面的注释
vim /etc/clickhouse-server/config.xml
#编辑配置文件
<listen_host>0.0.0.0</listen_host>
#重启
systemctl restart clickhouse-server
-
网络安全组记得开放http端口是8123,tcp端口是9000, 同步端口9009
- 常规企业内网通信则不用,我们是阿里云部署,本地测试
- web可视化界面:http://ip:port/play
-
通过ClickHouse可视化工具连接
- 资料文件夹提供软件
- win和mac苹果都有
- 资料文件夹提供软件
-
其他安装方式
- Docker
docker run -d --name xdclass_clickhouse --ulimit nofile=262144:262144 \ -p 8123:8123 -p 9000:9000 -p 9009:9009 --privileged=true \ -v /mydata/docker/clickhouse/log:/var/log/clickhouse-server \ -v /mydata/docker/clickhouse/data:/var/lib/clickhouse clickhouse/clickhouse-server:22.2.3.5
第3集 海量数据存储-ClickHouse引擎知识回顾
简介: 海量数据存储-ClickHouse引擎特点知识回顾
-
MergeTree系列【王炸重点】
- CLickhouse最强大的表引擎,有多个不同的种类
- 适用于高负载任务的最通用和功能最强大的表引擎,可以快速插入数据并进行后续的后台数据处理
- 支持主键索引、数据分区、数据副本等功能特性和一些其他引擎不支持的其他功能
-
去重合并树ReplaceMergeTree
-
MergeTree的拓展,该引擎和 MergeTree 的不同之处在它会删除【排序键值】相同重复项,根据OrderBy字段
-
数据的去重只会在数据合并期间进行,合并会在后台一个不确定的时间进行
-
ReplacingMergeTree 适用于在后台清除重复的数据以节省空间,但是它不保证没有重复的数据出现
-
如何判断数据重复
- 在去除重复数据时,是以ORDER BY排序键为基准的,而不是PRIMARY KEY
- 若排序字段为两个,则两个字段都相同时才会去重
-
何时删除重复数据
- 在执行分区合并时触发删除重复数据,optimize的合并操作是在后台执行的,无法预测具体执行时间点,除非是手动执行
-
不同分区的重复数据不会被去重
- ReplacingMergeTree是以分区为单位删除重复数据的,在相同的数据分区内重复的数据才会被删除,而不同数据分区之间的重复数据依然不能被删除的
-
删除策略
- ReplacingMergeTree() 填入的参数为版本字段,重复数据就会保留版本字段值最大的。
- 如果不填写版本字段,默认保留插入顺序的最后一条数据
-
参数
-
ver
— 版本列。类型为UInt*
,Date
或DateTime
。可选参数。在数据合并的时候,
ReplacingMergeTree
从所有具有相同排序键的行中选择一行留下:- 如果
ver
列未指定,保留最后一条。 - 如果
ver
列已指定,保留ver
值最大的版本
- 如果
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] ( name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1], name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2], ... ) ENGINE = ReplacingMergeTree([ver]) [PARTITION BY expr] [ORDER BY expr] [SAMPLE BY expr] [SETTINGS name=value, ...]
-
-
第4集 短链数据统计-ClickHouse引擎选择和建表语句
简介: 短链数据统计-ClickHouse引擎选择和建表语句
-
引擎选择
- ReplacingMergeTree
- SummingMergeTree
- 分析
- 需要保证幂等性,合并的时候一样要sum,不知道分区啥时候合并
- SummingMergeTree其他的非聚合字段不相同,在聚合时会【保留最初】那条数据,新插入的数据对应的那个字段值会被舍弃,如果后续统计维度更多,没保留原始数据则会产生更多问题
- 所以选择 ReplacingMergeTree
-
分区选择
- 去重合并树
- 都是基于分区内的数据进行操作去重的,根据orderBy字段进行操作去重判断
- ReplacingMergeTree() 填入的参数为版本字段,重复数据就会保留版本字段值最大的。
- 如果按照小时秒 分区,会造成分区太多,容易产生异常
- 分区使用天进行分区,查询更多也是基于天进行
- 去重合并树
-
想实现数据统计需求
- 同个指标数据,Flink有很多种方式进行处理获取的、包括建表规则等
- 好比 老王-想钓鱼,可以去海边、家里的金鱼池、水库、鱼塘、溪流 等,多种方式都可以完成钓鱼
-
建表语句
CREATE TABLE default.visit_stats
(
`code` String,
`referer` String,
`is_new` UInt64,
`account_no` UInt64,
`province` String,
`city` String,
`ip` String,
`browser_name` String,
`os` String,
`device_type` String,
`pv` UInt64,
`uv` UInt64,
`start_time` DateTime,
`end_time` DateTime,
`ts` UInt64
)
ENGINE = ReplacingMergeTree(ts)
PARTITION BY toYYYYMMDD(start_time)
ORDER BY (
start_time,
end_time,
code,
province,
city,
referer,
is_new,
ip,
browser_name,
os,
device_type);
- 先建立表,后续有问题再调整
第5集 DWS层数据存储-Flink整合ClickHouse写入实战
简介: DWS层数据处理-Flink整合ClickHouse写入实战
- 添加依赖
<!--引入clickhouse依赖,排查版本影响-->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>
- 数据库配置
#----------clickhouse配置--------------
clickhouse.servers=jdbc:clickhouse://120.79.150.146:8123/default
- Sink输出ClickHouse
@Slf4j
public class MyClickHouseSink {
/**
* CLICK_HOUSE地址
*/
private static String CLICK_HOUSE_SERVER = null;
static{
Properties properties = new Properties();
// 使用ClassLoader加载properties配置文件生成对应的输入流
InputStream in = MyClickHouseSink.class.getClassLoader().getResourceAsStream("application.properties");
// 使用properties对象加载输入流
try {
properties.load(in);
} catch (IOException e) {
log.error("加载ClickHouse配置文件失败,{}",e);
}
//获取key对应的value值
CLICK_HOUSE_SERVER = properties.getProperty("clickhouse.servers");
}
/**
* 获取向Clickhouse中写入数据的SinkFunction
*
*
* @param sql
* @param <T>
* @return
*/
public static SinkFunction getJdbcSink(String sql) {
/**
* 8、输出Clickhouse
* `code` String,
* `referer` String,
* `is_new` UInt64,
* `account_no` UInt64,
* `province` String,
* `city` String,
* `ip` String,
* `browser_name` String,
* `os` String,
* `device_type` String,
* `pv` UInt64,
* `uv` UInt64,
* `start_time` DateTime,
* `end_time` DateTime,
* `ts` UInt64
*/
SinkFunction<ShortLinkVisitStatsDO> sinkFunction = JdbcSink.sink(
sql,
//执行写入操,设置占位符
new JdbcStatementBuilder<ShortLinkVisitStatsDO>() {
@Override
public void accept(PreparedStatement ps, ShortLinkVisitStatsDO obj) throws SQLException {
ps.setObject(1, obj.getCode());
ps.setObject(2, obj.getReferer());
ps.setObject(3, obj.getIsNew());
ps.setObject(4, obj.getAccountNo());
ps.setObject(5, obj.getProvince());
ps.setObject(6, obj.getCity());
ps.setObject(7, obj.getIp());
ps.setObject(8, obj.getBrowserName());
ps.setObject(9, obj.getOs());
ps.setObject(10, obj.getDeviceType());
ps.setObject(11, obj.getPv());
ps.setObject(12, obj.getUv());
ps.setObject(13, obj.getStartTime());
ps.setObject(14, obj.getEndTime());
ps.setObject(15, obj.getVisitTime());
}
},
//batchSize属性,执行批次大小,默认5000
new JdbcExecutionOptions.Builder().withBatchSize(10).build(),
//连接配置相关
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(CLICK_HOUSE_SERVER)
.withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
.withUsername("default")
.build()
);
return sinkFunction;
}
}
- SQL编写
reduceDS.addSink(
MyClickHouseSink.getJdbcSink("insert into visit_stats values(?,?,?,? ,?,?,?,? ,?,?,?,? ,?,?,?)"));
第6集 短链访问-Flink整合ClickHouse链路测试
简介: 短链访问-Flink整合ClickHouse链路测试
-
全链路测试
- http://localhost:8003/026m8O3a
-
bug修复
-
用户名配置错误
-
deviceManufacturer字段为空
-
地理位置信息接口空返回
-
shortLinkWideDO.setProvince("-"); shortLinkWideDO.setCity("-"); return shortLinkWideDO;
-
resultFuture.complete(Collections.singleton(null));
-
-
-
测试数据
INSERT INTO default.visit_stats (code,referer,is_new,account_no,province,city,ip,browser_name,os,device_type,pv,uv,start_time,end_time,ts) VALUES ('026m8O3a','douyin.com',0,693100647796441088,'广东省','深圳市','14.197.9.110','Chrome','Mac OS X','COMPUTER',1,0,'2022-03-29 01:34:50.000','2022-03-29 01:35:00.000',1648618495887);
INSERT INTO default.visit_stats (code,referer,is_new,account_no,province,city,ip,browser_name,os,device_type,pv,uv,start_time,end_time,ts) VALUES ('026m8O3a','baidu.com',0,693100647796441088,'广东省','广州市','113.68.152.139','Chrome','Mac OS X','COMPUTER',1,1,'2022-03-28 01:35:00.000','2022-03-28 01:35:10.000',1648618505585);
INSERT INTO default.visit_stats (code,referer,is_new,account_no,province,city,ip,browser_name,os,device_type,pv,uv,start_time,end_time,ts) VALUES ('026m8O3a','taobao.com',0,693100647796441088,'广东省','广州市','113.68.152.139','Safari','Mac OS X','COMPUTER',1,0,'2022-03-27 01:35:00.000','2022-03-27 01:35:10.000',1648618503041);
INSERT INTO default.visit_stats (code,referer,is_new,account_no,province,city,ip,browser_name,os,device_type,pv,uv,start_time,end_time,ts) VALUES ('026m8O3a','taobao.com',0,693100647796441088,'广东省','深圳市','14.197.9.110','Chrome','Mac OS X','COMPUTER',2,0,'2022-03-30 01:35:00.000','2022-03-30 01:35:10.000',1648618508068);
INSERT INTO default.visit_stats (code,referer,is_new,account_no,province,city,ip,browser_name,os,device_type,pv,uv,start_time,end_time,ts) VALUES ('026m8O3a','taobao.com',0,693100647796441088,'广东省','深圳市','14.197.9.110','Chrome','Mac OS X','Mobile',1,1,'2022-03-30 02:35:10.000','2022-03-30 02:35:20.000',1648618510247);
INSERT INTO default.visit_stats (code,referer,is_new,account_no,province,city,ip,browser_name,os,device_type,pv,uv,start_time,end_time,ts) VALUES ('026m8O3a','douyin.com',0,693100647796441088,'广东省','广州市','113.68.152.139','Safari','Mac OS X','COMPUTER',1,0,'2022-03-30 01:36:30.000','2022-03-30 01:36:40.000',1648618596394);
INSERT INTO default.visit_stats (code,referer,is_new,account_no,province,city,ip,browser_name,os,device_type,pv,uv,start_time,end_time,ts) VALUES ('026m8O3a','douyin.com',0,693100647796441088,'广东省','广州市','113.68.152.139','Chrome','Mac OS X','COMPUTER',1,2,'2022-03-30 01:38:20.000','2022-03-30 01:38:30.000',1648618704683);
INSERT INTO default.visit_stats (code,referer,is_new,account_no,province,city,ip,browser_name,os,device_type,pv,uv,start_time,end_time,ts) VALUES ('026m8O3a','xdclass.net',0,693100647796441088,'广东省','广州市','113.68.152.139','Chrome','Mac OS X','Mobile',1,0,'2022-03-31 01:38:20.000','2022-03-31 01:38:30.000',1648618708996);
INSERT INTO default.visit_stats (code,referer,is_new,account_no,province,city,ip,browser_name,os,device_type,pv,uv,start_time,end_time,ts) VALUES ('026m8O3a','douyin.com',0,693100647796441088,'四川','成都','14.197.9.110','Chrome','Mac OS X','COMPUTER',2,0,'2022-03-30 03:38:20.000','2022-03-30 03:38:30.000',1648618701386);
INSERT INTO default.visit_stats (code,referer,is_new,account_no,province,city,ip,browser_name,os,device_type,pv,uv,start_time,end_time,ts) VALUES ('026m8O3a','xdclass.net',0,693100647796441088,'湖南','长沙','113.68.152.129','Safari','Mac OS X','COMPUTER',2,0,'2022-03-30 01:38:30.000','2022-03-30 01:38:40.000',1648618711433);
INSERT INTO default.visit_stats (code,referer,is_new,account_no,province,city,ip,browser_name,os,device_type,pv,uv,start_time,end_time,ts) VALUES ('026m8O3a','taobao.com',0,693100647796441088,'广东省','广州市','113.68.152.139','Chrome','Mac OS X','COMPUTER',1,0,'2022-03-30 04:38:30.000','2022-03-30 04:38:40.000',1648618714086);
INSERT INTO default.visit_stats (code,referer,is_new,account_no,province,city,ip,browser_name,os,device_type,pv,uv,start_time,end_time,ts) VALUES ('026m8O3a','douyin.com',0,693100647796441088,'广东省','深圳市','14.197.9.110','Chrome','Mac OS X','Mobile',2,0,'2022-03-30 01:38:30.000','2022-03-30 01:38:40.000',1648618710117);
eferer,is_new,account_no,province,city,ip,browser_name,os,device_type,pv,uv,start_time,end_time,ts) VALUES (‘026m8O3a’,‘douyin.com’,0,693100647796441088,‘广东省’,‘广州市’,‘113.68.152.139’,‘Chrome’,‘Mac OS X’,‘COMPUTER’,1,2,‘2022-03-30 01:38:20.000’,‘2022-03-30 01:38:30.000’,1648618704683);
INSERT INTO default.visit_stats (code,referer,is_new,account_no,province,city,ip,browser_name,os,device_type,pv,uv,start_time,end_time,ts) VALUES (‘026m8O3a’,‘xdclass.net’,0,693100647796441088,‘广东省’,‘广州市’,‘113.68.152.139’,‘Chrome’,‘Mac OS X’,‘Mobile’,1,0,‘2022-03-31 01:38:20.000’,‘2022-03-31 01:38:30.000’,1648618708996);
INSERT INTO default.visit_stats (code,referer,is_new,account_no,province,city,ip,browser_name,os,device_type,pv,uv,start_time,end_time,ts) VALUES (‘026m8O3a’,‘douyin.com’,0,693100647796441088,‘四川’,‘成都’,‘14.197.9.110’,‘Chrome’,‘Mac OS X’,‘COMPUTER’,2,0,‘2022-03-30 03:38:20.000’,‘2022-03-30 03:38:30.000’,1648618701386);
INSERT INTO default.visit_stats (code,referer,is_new,account_no,province,city,ip,browser_name,os,device_type,pv,uv,start_time,end_time,ts) VALUES (‘026m8O3a’,‘xdclass.net’,0,693100647796441088,‘湖南’,‘长沙’,‘113.68.152.129’,‘Safari’,‘Mac OS X’,‘COMPUTER’,2,0,‘2022-03-30 01:38:30.000’,‘2022-03-30 01:38:40.000’,1648618711433);
INSERT INTO default.visit_stats (code,referer,is_new,account_no,province,city,ip,browser_name,os,device_type,pv,uv,start_time,end_time,ts) VALUES (‘026m8O3a’,‘taobao.com’,0,693100647796441088,‘广东省’,‘广州市’,‘113.68.152.139’,‘Chrome’,‘Mac OS X’,‘COMPUTER’,1,0,‘2022-03-30 04:38:30.000’,‘2022-03-30 04:38:40.000’,1648618714086);
INSERT INTO default.visit_stats (code,referer,is_new,account_no,province,city,ip,browser_name,os,device_type,pv,uv,start_time,end_time,ts) VALUES (‘026m8O3a’,‘douyin.com’,0,693100647796441088,‘广东省’,‘深圳市’,‘14.197.9.110’,‘Chrome’,‘Mac OS X’,‘Mobile’,2,0,‘2022-03-30 01:38:30.000’,‘2022-03-30 01:38:40.000’,1648618710117);
``