- 👏作者简介:大家好,我是爱吃芝士的土豆倪,24届校招生Java选手,很高兴认识大家
- 📕系列专栏:Spring源码、JUC源码、Kafka原理
- 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
- 🍂博主正在努力完成2023计划中:源码溯源,一探究竟
- 📝联系方式:nhs19990716,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬👀
文章目录
- 练习
- 生产者
- Ack 应答机制
- 消费者重要参数
- API 开发:topic 管理
- 列出主题
- 查看主题信息
- 创建主题
- 删除主题
- 其他管理
- 练习
- 消费者
- 消费者
练习
需求:
写一个生产者,不断地去生成“用户行为事件” 并写入kafka
{“guid”:1,“eventId”:“pageview”,“timestamp”:163786834789}
{“guid”:1,“eventId”:“pageview”,“timestamp”:163786834789}
{“guid”:1,“eventId”:“pageview”,“timestamp”:163786834789}
…
写一个消费者,不断地从kafka种取消费如上 “ 用户行为事件” 数据,并做统计计算,每五分钟,输出一次截至到当时的数据中出现过的用户总数。
生产者
public class KafkaTest{
public static void main(String[] args){
MyDataGen myDataGen = new MyDataGen();
myDataGen.genData();
}
}
/*
业务数据生成器
*/
class MyDataGen{
Producer<String, String> produce;
public MyDataGen(){
Properties props = new Properties();
//设置 kafka 集群的地址
props.put("bootstrap.servers", "doitedu01:9092,doitedu02:9092,doitedu03:9092");
//序列化器
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
//ack 模式,取值有 0,1,-1(all) , all 是最慢但最安全的
props.put("acks", "all")
producer = new KafkaProducer<>(props);
}
public void genData(){
UserEvent userEvent = new UserEvent();
while(true){
// 造一条随机用户行为事件数据对象
userEvent.setGuid(RandomUtils.nextInt(1,10000));
userEvent.setEventId(RandomStringUtils.randomAlphabetic(5,8));
userEvent.setTimeStamp(System.currentTimeMills());
// 转成json串
String json = JSON.toJSONString(userEvent);
// 讲业务数据封装成ProducerRecord对象
ProducerRecord<String,String> record = new ProducerRecord<String,String>("test",json);
// 用producer写入kafka
producer.send(record);
Thread.sleep(RandomUtils.nextInt(500,2000));
}
}
}
@NoArgsConStructor
@AllArgConstructor
@Getter
@Setter
class UserEvent{
private long guid;
private String eventId;
private long timeStamp;
}
Ack 应答机制
Ack 应答机制参数配置
- 0 : 生产者发出消息后不等待服务端的确认
- 1 : 生产者发出消息后要求服务端的分区 leader 确保数据存储成功后发送一个确认信息
- -1: 生产者发出消息后要求服务端的分区的 ISR 副本全部同步成功后发送一个确认信息
生产者的 ack=all,也不能完全保证数据发送的 100%可靠性
为什么?因为,如果服务端目标 partition 的同步副本只有 leader 自己了,此时,它收到数据就会给生产者反馈成功!
可以修改服务端的一个参数(分区最小 ISR 数[min.insync.replicas]>=2),来避免此问题;
消费者重要参数
fetch.min.bytes=1B 一次拉取的最小字节数
fetch.max.bytes=50M 一次拉取的最大数据量
fetch.max.wait.ms=500ms 拉取时的最大等待时长
max.partition.fetch.bytes = 1MB 每个分区一次拉取的最大数据量
max.poll.records=500 一次拉取的最大条数
connections.max.idle.ms=540000ms 网络连接的最大闲置时长
request.timeout.ms=30000ms 一次请求等待响应的最大超时时间 consumer 等待请求响应的最长时间
metadata.max.age.ms=300000 元数据在限定时间内没有进行更新,则会被强制更新
reconnect.backoff.ms=50ms 尝试重新连接指定主机之前的退避时间
retry.backoff.ms=100ms 尝试重新拉取数据的重试间隔
isolation.level=read_uncommitted 隔离级别! 决定消费者能读到什么样的数据
read_uncommitted:可以消费到 LSO(LastStableOffset)位置;
read_committed:可以消费到 HW(High Watermark)位置
max.poll.interval.ms 超过时限没有发起 poll 操作,则消费组认为该消费者已离开消费组
enable.auto.commit=true 开启消费位移的自动提交
auto.commit.interval.ms=5000 自动提交消费位移的时间间隔
API 开发:topic 管理
如果希望将管理类的功能集成到公司内部的系统中,打造集管理、监控、运维、告警为一体的生态平台,那么就需要以程序调用 API 方式去实现。如下所示:
为什么在网页上,或者用命令,都能做到topic的创建、删除、列出、查看详情,其底层逻辑是什么?
其本质上是通过不同方式调用了kafka提供的创建topic api
底层逻辑:web的后端java程序中,去调用相关的api
工具类 KafkaAdminClient 可以用来管理 broker、配置和 ACL (Access Control List),管理 topic
构造一个 KafkaAdminClient
AdminClient adminClient = KafkaAdminClient.create(props);
列出主题
ListTopicsResult listTopicsResult = adminClient.listTopics();
Set<String> topics = listTopicsResult.names().get();
System.out.println(topics);
查看主题信息
DescribeTopicsResult describeTopicsResult =
adminClient.describeTopics(Arrays.asList("tpc_4", "tpc_3"));
Map<String, TopicDescription> res = describeTopicsResult.all().get();
Set<String> ksets = res.keySet();
for (String k : ksets) {
System.out.println(res.get(k));
}
创建主题
// 参数配置
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"doit01:9092,doit02:9092,doit03:
9092");
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);
// 创建 admin client 对象
AdminClient adminClient = KafkaAdminClient.create(props);
// 由服务端 controller 自行分配分区及副本所在 broker
NewTopic tpc_3 = new NewTopic("tpc_3", 2, (short) 1);
// 手动指定分区及副本的 broker 分配
HashMap<Integer, List<Integer>> replicaAssignments = new HashMap<>();
// 分区 0,分配到 broker0,broker1
replicaAssignments.put(0,Arrays.asList(0,1));
// 分区 1,分配到 broker0,broker2
replicaAssignments.put(1,Arrays.asList(0,1));
NewTopic tpc_4 = new NewTopic("tpc_4", replicaAssignments);
CreateTopicsResult result = adminClient.createTopics(Arrays.asList(tpc_3,tpc_4));
// 从 future 中等待服务端返回
try {
result.all().get();
} catch (Exception e) {
e.printStackTrace();
}
adminClient.close();
删除主题
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("tpc_1",
"tpc_1"));
Map<String, KafkaFuture<Void>> values = deleteTopicsResult.values();
System.out.println(values);
其他管理
除了进行 topic 管理外,KafkaAdminClient 也可进行诸如动态参数管理,分区管理等各类管理操作;
练习
需求:
写一个生产者,不断地去生成“用户行为事件” 并写入kafka
{“guid”:1,“eventId”:“pageview”,“timestamp”:163786834789}
{“guid”:1,“eventId”:“pageview”,“timestamp”:163786834789}
{“guid”:1,“eventId”:“pageview”,“timestamp”:163786834789}
…
写一个消费者,不断地从kafka种取消费如上 “ 用户行为事件” 数据,并做统计计算,每五分钟,输出一次截至到当时的数据中出现过的用户总数。 本质 去重guid数
消费者
首先分析思路,可以使用hashmap来进行统计思路,如下所示:
如果存在线程安全的问题,可以使用ConcurrentHashMap。
还有一个业务需求就是 每五分钟去输出一个结果,这个时机、节奏,如何去把控。
实际上就是一个线程源源不断的拉数据,一个线程定期的输出结果。
public class consumer{
public static void main(String[] args){
ConcurrentHashMap<Long, String> guidMap = new ConcurrentHashMap<>();
// 启动数据消费线程
new Thread(new ConsumeRunnable(guidMap)).start();
// 启动统计及消费输出结果的线程(每5s输出一次)
// 优雅一点实现定时调度,可以用各种定时调度器(有第三方的,也可以用JDK自己的,Timer)
Timer timer = new Timer();
timer.scheduleAtFixedRate(new StatisticTask(guidMap),0,1000);
}
}
class ConsumeRunnable implements Runnable{
ConcurrentHashMap<Long, String> guidMap;
public ConsumeRunnable(ConcurrentHashMap<Long, String> guidMap){
this.guidMap = guidMap;
}
public void run(){
Properties props = new Properties();
// 定义 kakfa 服务的地址,不需要将所有 broker 指定上
props.put("bootstrap.servers", "doitedu01);
// key 的反序列化类
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
// value 的反序列化类
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
// 如果没有消费偏移量记录,则自动重设为起始 offset:latest, earliest, none
props.put("auto.offset.reset","latest");
// 制定 consumer group
props.put("group.id", "g1");
while (true) {
// 读取数据,读取超时时间为 5000ms
ConsumerRecords<String, String> records = consumer.poll(5000);
for (ConsumerRecord<String, String> record : records)
String eventJson = record.value();
UserEvent userEvent = JSON.parseObject(eventJson,UserEvent.class);
guidMap.put(userEvent.getGuid(), "");
}
}
}
class StatisticTask extends TimerTask{
ConcurrentHashMap<Long, String> guidMap;
public StatisticTask(ConcurrentHashMap<Long, String> guidMap){
this.guidMap = guidMap;
}
public void run(){
System.out.println("截止到当前的用户总数为:"+guidMap.size());
}
}
ConcurrentHashMap 中无论 key 还是 Value都不能填 null值,具体在源码中有体现:
其 put流程如下:
public V put(K key, V value) {
return putVal(key, value, false);
// onlyIfAbsent如果是false,那么每次都会用新值替换掉旧值
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 其中 spread 方法会综合高位低位, 具有更好的 hash 性
int hash = spread(key.hashCode());
int binCount = 0;
// 死循环
for (Node<K,V>[] tab = table;;) {
// f 是链表头节点
// fh 是链表头结点的 hash
// i 是链表在 table 中的下标
Node<K,V> f; int n, i, fh;
// 要创建 table
if (tab == null || (n = tab.length) == 0)
// 初始化 table 使用了 cas, 无需 synchronized 创建成功, 进入下一轮循环
// 因为是懒惰初始化的,所以直到现在才开始创建 初始化使用cas 创建,其它失败得再次进入循环,没有用syn 我们得线程并没有被阻塞住
tab = initTable();
// 要创建链表头节点
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 添加链表头使用了 cas, 无需 synchronized
// 用cas将头节点加进去,如果加入失败了,继续循环
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break;
}
// 帮忙扩容
// 其实就是看你的头结点是不是 ForwardingNode,其对应得MOVED是一个负数
else if ((fh = f.hash) == MOVED)
// 帮忙之后, 进入下一轮循环
// 锁住当前的链表,帮助去扩容
tab = helpTransfer(tab, f);
// 能进入这个else,说明 table既不处于扩容中,也不是处于table的初始化过程中,而且这时肯定发生了锁下标的冲突
else {
V oldVal = null;
// 锁住链表头节点
// 并没有锁住整个tab,而是锁住这个桶链表的头节点
synchronized (f) {
// 再次确认链表头节点没有被移动
if (tabAt(tab, i) == f) {
// 链表
// 链表的头节点hash码大于等于 0
if (fh >= 0) {
binCount = 1;
// 遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 找到相同的 key
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
// 更新
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 已经是最后的节点了, 新增 Node, 追加至链表尾
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// putTreeVal 会看 key 是否已经在树中, 是, 则返回对应的 TreeNode
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
// 释放链表头节点的锁
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
// 如果链表长度 >= 树化阈值(8), 进行链表转为红黑树
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 增加 size 计数
addCount(1L, binCount);
return null;
}
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 这个hash有没有被创建
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
// 让出cpu的使用权,如果cpu的时间片没有其它线程了,那么还是会分给这个线程,只是让他不至于充分利用cpu,少占用一点cpu的时间。
Thread.yield();
// 尝试将 sizeCtl 设置为 -1(表示初始化 table)
// 而其它的线程,再次进入循环,首先 不小于0了,其次,之前的 sc也已经变了,cas失败,再次循环的时候,发现 tab已经不为空了,结束循环
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// 获得锁, 创建 table, 这时其它线程会在 while() 循环中 yield 直至 table 创建
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 计算出下一次要扩容的阈值
sc = n - (n >>> 2);
}
} finally {
// 计算出下一次要扩容的阈值
sizeCtl = sc;
}
break;
}
}
return tab;
}
// check 是之前 binCount 的个数
// 运用了 longadder 的思想
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if (
// 已经有了 counterCells, 向 cell 累加
// 累加单元数组不为空
(as = counterCells) != null ||
// 还没有, 向 baseCount 累加
// 一个基础数值累加
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)
) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (
// 还没有 counterCells
as == null || (m = as.length - 1) < 0 ||
// 还没有 cell
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
// cell cas 增加计数失败
!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
) {
// 创建累加单元数组和cell, 累加重试
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
// 获取元素个数
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 看看元素的个数是否大于扩容的阈值
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// newtable 已经创建了,帮忙扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
// 首次调用,因为是懒惰初始化的,所以还没有创建
transfer(tab, nt);
}
// 需要扩容,这时 newtable 未创建
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
但是这里使用一个hashmap去记录去重的guid是有弊端的。
利用map或者set来记录guid本身,比较占用内存还有性能。
guid是一个Long值,8个byte,如果咱们的用户规模达到了10亿,那你这个set或者map中存储的guid就将达到10亿个。
相当于8 * 1000000000 /1024/1024 = 7629MB相当于一个Hashmap中存储了7G数据
那么有一些经典的数据结构,bitmap、bloomfilter、hyperloglog都可以解决去统计个数、判重的场景。
输出个数:有几个1,就是出现过几种 guid:
之前使用hashmap存,直接存guid,8个byte一个guid == 64bit 现在bitmap里面,记一个guid,只要一个bit
并且hashmap是按需扩容的,这个过程是很浪费性能的,而bigmap要一开始初始化10亿个bit,有点浪费空间。
但是api会对最原始的bitmap进行一些优化,比如说稀疏型的向量的场景中,例如roaringbitmap工具包进行了优化,慢慢的进行了扩容。
public class consumer{
public static void main(String[] args){
// 使用roaringbitmap来记录
RoaringBitmap bitmap = RoaringBitmap.bigmapOf();
// 启动数据消费线程
new Thread(new ConsumeRunnable(bitmap)).start();
// 启动统计及消费输出结果的线程(每5s输出一次)
// 优雅一点实现定时调度,可以用各种定时调度器(有第三方的,也可以用JDK自己的,Timer)
Timer timer = new Timer();
timer.scheduleAtFixedRate(new StatisticTask(bitmap),0,1000);
}
}
class ConsumeRunnable implements Runnable{
RoaringBitmap bitmap;
public ConsumeRunnable(RoaringBitmap bitmap){
this.bitmap = bitmap;
}
public void run(){
Properties props = new Properties();
// 定义 kakfa 服务的地址,不需要将所有 broker 指定上
props.put("bootstrap.servers", "doitedu01);
// key 的反序列化类
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
// value 的反序列化类
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
// 如果没有消费偏移量记录,则自动重设为起始 offset:latest, earliest, none
props.put("auto.offset.reset","latest");
// 制定 consumer group
props.put("group.id", "g1");
while (true) {
// 读取数据,读取超时时间为 5000ms
ConsumerRecords<String, String> records = consumer.poll(5000);
for (ConsumerRecord<String, String> record : records)
String eventJson = record.value();
UserEvent userEvent = JSON.parseObject(eventJson,UserEvent.class);
bitmap.add(userEvent.getGuid());
}
}
}
class StatisticTask extends TimerTask{
RoaringBitmap bitmap;
public StatisticTask(RoaringBitmap bitmap){
this.bitmap = bitmap;
}
public void run(){
System.out.println("截止到当前的用户总数为:"+bitmap.getCardinality());
}
}
但是如果涉及到了多维聚合,那么单纯的使用bitmap可能还不够
比如 省市区 用户数 、 省市 用户数
江苏省,南京市,鼓楼区,5
江苏省,南京市,下关区,6
…
从用户访问记录中统计各区域的用户数
江苏省,南京市,鼓楼区,[0,1,0,1,1,1,0,1,0,0,0,0] 5
江苏省,南京市,下关区,[1,1,0,0,0,0,0,0,1,1,1,1] 6
要到的:江苏省,南京市 [1,1,0,1,1,1,0,1,1,1,1,1] 10
这也就是使用bitmap来实现高效率的,维度再均衡
所以这也就是数据结构与应用解耦
消费者
需求2:写一个消费者,不断地从kafka中消费如上"用户行为事件",并做出如下加工处理
给每一条数据,添加一个字段来标识,该条数据所属的用户今天是否第一次出现,如是,则标注1,否则标注0
{“guid”:1,“eventId”:“pageview”,“timestamp”:163786834789,“flag”:1}
{“guid”:1,“eventId”:“pageview”,“timestamp”:163786834789,“flag”:0}
{“guid”:1,“eventId”:“pageview”,“timestamp”:163786834789,“flag”:1}
这个需求可以用HashMap来做
也可以用bitmap来做,启动bitmap.contain()能够判断是是否包含
如果是判重字符串,用bitmap就不太方便了,可以使用bloomfilter,但是bloomfilter会牺牲一定的准确率(因为bloomfilter会误判)
在判重方面,布隆过滤器比bitmap.contain()更适合。
布隆过滤器具有以下优势:
- 内存占用更小:布隆过滤器使用位数组和哈希函数来表示数据集合,相比于bitmap.contain()方法,它可以以较小的内存占用来存储大规模的数据集合。
- 高效的查询性能:布隆过滤器可以在常数时间内完成判重操作,即使数据集合非常庞大,也能以较高的速度进行查询。而bitmap.contain()方法可能需要遍历整个位图数组,时间复杂度较高。
- 可扩展性:布隆过滤器可以动态地添加新的元素,并且支持删除操作。相比之下,bitmap.contain()方法需要预先确定数据范围,不便于动态扩展。
虽然布隆过滤器具有以上优势,但也需要注意它存在一定的误判率(即可能将不存在的元素判断为存在)。因此,在判重的关键场景中,如果对结果的准确性要求非常高,可以使用其他更精确的去重方法进行验证。
public class consumer{
public static void main(String[] args){
// 启动数据消费线程
new Thread(new ConsumeRunnable()).start();
}
}
class ConsumeRunnable implements Runnable{
BloomFilter<Long> bloomFilter;
Properties props;
public ConsumeRunnable(){
bloomFilter = BloomFilter.create(Funnels.longFunnel(),1000000000,0.01);
props = new Properties();
// 定义 kakfa 服务的地址,不需要将所有 broker 指定上
props.put("bootstrap.servers", "doitedu01);
// key 的反序列化类
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
// value 的反序列化类
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
// 如果没有消费偏移量记录,则自动重设为起始 offset:latest, earliest, none
props.put("auto.offset.reset","latest");
// 制定 consumer group
props.put("group.id", "g1");
}
public void run(){
while (true) {
// 读取数据,读取超时时间为 5000ms
ConsumerRecords<String, String> records = consumer.poll(5000);
for (ConsumerRecord<String, String> record : records)
String eventJson = record.value();
UserEvent userEvent = JSON.parseObject(eventJson,UserEvent.class);
// 去布隆过滤器判断一下
boolean mightContain = bloomFilter.mightContain(userEvent.getGuid());
if(mightContain){
userEvent.setFlag(0);
}else{
userEvent.setFlag(1);
bloomFilter.put(userEvent.getGuid());
}
}
}
}