一、说明
衡量网站流量一个最简单的指标,就是网站的页面浏览量(Page View,PV)。用户每次打开一个页面便记录1次PV,多次打开同一页面则浏览量累计。
一般来说,PV与来访者的数量成正比,但是PV并不直接决定页面的真实来访者数量。例如,同一个来访者通过不断地刷新页面,也可以制造出非常高的PV。接下来我们就用Flink算子来实现PV的统计。
二、测试数据准备
把数据文件 UserBehavior.csv 复制到 project 的 input 目录下
用于封装数据的JavaBean类
package com.lyh.flink06;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * JavaBean holding one parsed line of the {@code UserBehavior.csv} data file.
 *
 * <p>The class lives in the same package as the jobs that use it
 * ({@code PVcount}, {@code PVprocess}); they reference it without an import.
 *
 * <p>Lombok generates the accessors, the no-arg constructor and the all-args
 * constructor. The all-args constructor takes its parameters in field
 * declaration order, and the jobs pass CSV columns 0..4 positionally, so the
 * field order below must not be changed.
 *
 * @author lizhenchao@atguigu.cn (2020/12/10 19:32)
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserBehavior {
    /** CSV column 0. */
    private Long userId;
    /** CSV column 1. */
    private Long itemId;
    /** CSV column 2. */
    private Integer categoryId;
    /** CSV column 3; the jobs keep only records whose value is {@code "pv"}. */
    private String behavior;
    /** CSV column 4. NOTE(review): unit (seconds vs. milliseconds) is not visible here - confirm against the data file. */
    private Long timestamp;
}
三、代码
pv实现思路1: WordCount
package com.lyh.flink06;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * PV (page view) counting, approach 1: the WordCount pattern.
 *
 * <p>Every page-view record is turned into a {@code ("pv", 1L)} tuple, the
 * tuples are grouped by their first field and the second field is summed.
 */
public class PVcount {
    /** Behavior value that marks a page view; also used as the grouping key. */
    private static final String PV = "pv";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.readTextFile("input/UserBehavior.csv")
                // Split each line and wrap the fields in the POJO.
                .map(PVcount::toUserBehavior)
                // Keep page-view records only.
                .filter(record -> PV.equals(record.getBehavior()))
                .map(record -> Tuple2.of(PV, 1L))
                // Lambdas lose the tuple's generic types, so declare them explicitly;
                // a tuple is used to make the following sum straightforward.
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                // Group by the tuple's key field.
                .keyBy(pair -> pair.f0)
                // Sum the counter at tuple position 1.
                .sum(1)
                .print();

        env.execute();
    }

    /**
     * Parses one comma-separated line into a {@link UserBehavior}.
     *
     * @param line raw text line with at least five comma-separated fields
     * @return the populated bean
     */
    private static UserBehavior toUserBehavior(String line) {
        String[] fields = line.split(",");
        Long userId = Long.valueOf(fields[0]);
        Long itemId = Long.valueOf(fields[1]);
        Integer categoryId = Integer.valueOf(fields[2]);
        String behavior = fields[3];
        Long timestamp = Long.valueOf(fields[4]);
        return new UserBehavior(userId, itemId, categoryId, behavior, timestamp);
    }
}
pv实现思路2: process
package com.lyh.flink06;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/**
 * PV (page view) counting, approach 2: a {@link KeyedProcessFunction}.
 *
 * <p>Page-view records are keyed by their behavior value and a process
 * function emits the running count after every record.
 */
public class PVprocess {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        env.readTextFile("input/UserBehavior.csv")
                // Split each line and wrap the fields in the POJO.
                .map(line -> {
                    String[] split = line.split(",");
                    return new UserBehavior(
                            Long.valueOf(split[0]),
                            Long.valueOf(split[1]),
                            Integer.valueOf(split[2]),
                            split[3],
                            Long.valueOf(split[4]));
                })
                // Keep page-view records only.
                .filter(behavior -> "pv".equals(behavior.getBehavior()))
                .keyBy(UserBehavior::getBehavior)
                .process(new KeyedProcessFunction<String, UserBehavior, Long>() {
                    // Keyed state instead of a plain field: a plain field is not part of
                    // Flink's checkpoints and would be shared by every key that is routed
                    // to the same parallel subtask.
                    private transient ValueState<Long> countState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        countState = getRuntimeContext().getState(
                                new ValueStateDescriptor<>("pv-count", Long.class));
                    }

                    @Override
                    public void processElement(UserBehavior userBehavior,
                                               Context ctx,
                                               Collector<Long> out) throws Exception {
                        // The state is empty (null) until the first record for this key arrives.
                        Long previous = countState.value();
                        long updated = (previous == null ? 0L : previous) + 1L;
                        countState.update(updated);
                        out.collect(updated);
                    }
                })
                .print();

        env.execute();
    }
}