1、Kafka
1、kafka集群架构
producer
消息生产者,发布消息到Kafka集群的终端或服务
broker
Kafka集群中包含的服务器,一个borker就表示kafka集群中的一个节点
topic
每条发布到Kafka集群的消息属于的类别,即Kafka是面向 topic 的。
更通俗的说Topic就像一个消息队列,生产者可以向其写入消息,消费者可以从中读取消息,一个Topic支持多个生产者或消费者同时订阅它,所以其扩展性很好。
partition
每个 topic 包含一个或多个partition。Kafka分配的单位是partition
replica
partition的副本,保障 partition 的高可用。
consumer
从Kafka集群中消费消息的终端或服务
consumer group
每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。
leader
每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。 producer 和 consumer 只跟 leader 交互
follower
Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。
controller
知道大家有没有思考过一个问题,就是Kafka集群中某个broker宕机之后,是谁负责感知到他的宕机,以及负责进行Leader Partition的选举?如果你在Kafka集群里新加入了一些机器,此时谁来负责把集群里的数据进行负载均衡的迁移?包括你的Kafka集群的各种元数据,比如说每台机器上有哪些partition,谁是leader,谁是follower,是谁来管理的?如果你要删除一个topic,那么背后的各种partition如何删除,是谁来控制?还有就是比如Kafka集群扩容加入一个新的broker,是谁负责监听这个broker的加入?如果某个broker崩溃了,是谁负责监听这个broker崩溃?这里就需要一个Kafka集群的总控组件,Controller。他负责管理整个Kafka集群范围内的各种东西。
zookeeper
(1) Kafka 通过 zookeeper 来存储集群的meta元数据信息
(2)一旦controller所在broker宕机了,此时临时节点消失,集群里其他broker会一直监听这个临时节点,发现临时节点消失了,就争抢再次创建临时节点,保证有一台新的broker会成为controller角色。
offset
-
偏移量
消费者在对应分区上已经消费的消息数(位置),offset保存的地方跟kafka版本有一定的关系。
kafka0.8 版本之前offset保存在zookeeper上。
kafka0.8 版本之后offset保存在kafka集群上。
它是把消费者消费topic的位置通过kafka集群内部有一个默认的topic,
名称叫 __consumer_offsets,它默认有50个分区。
ISR机制
光是依靠多副本机制能保证Kafka的高可用性,但是能保证数据不丢失吗?不行,因为如果leader宕机,但是leader的数据还没同步到follower上去,此时即使选举了follower作为新的leader,当时刚才的数据已经丢失了。
ISR是:in-sync replica,就是跟leader partition保持同步的follower partition的数量,只有处于ISR列表中的follower才可以在leader宕机之后被选举为新的leader,因为在这个ISR列表里代表他的数据跟leader是同步的。
2、kafka的命令行的管理使用
-
1、创建topic
kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic test --zookeeper node01:2181,node02:2181,node03:2181
2、查询所有的topic
kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181
3、查看topic的描述信息
kafka-topics.sh --describe --topic test --zookeeper node01:2181,node02:2181,node03:2181
4、删除topic
kafka-topics.sh --delete --topic test --zookeeper node01:2181,node02:2181,node03:2181
5、模拟生产者写入数据到topic中
kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
6、模拟消费者拉取topic中的数据
kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --topic test --from-beginning
或者
kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic test --from-beginning
3、kafka的生产者和消费者api代码开发
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
1、生产者
package com.kaikeba.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.concurrent.ExecutionException;/**
* 需求:开发kafka生产者代码
*/
public class KafkaProducerStudy {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//准备配置属性
Properties props = new Properties();
//kafka集群地址
props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
//acks它代表消息确认机制 // 1 0 -1 all
/**
* acks = 0: 表示produce请求立即返回,不需要等待leader的任何确认。
* 这种方案有最高的吞吐率,但是不保证消息是否真的发送成功。
*
* acks = 1: 表示leader副本必须应答此produce请求并写入消息到本地日志,之后produce请求被认为成功. 如果leader挂掉有数据丢失的风险* acks = -1或者all: 表示分区leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为produce请求成功。
* 这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的。
*/
props.put("acks", "1");//重试的次数
props.put("retries", 0);
//缓冲区的大小 //默认32M
props.put("buffer.memory", 33554432);
//批处理数据的大小,每次写入多少数据到topic //默认16KB
props.put("batch.size", 16384);
//可以延长多久发送数据 //默认为0 表示不等待 ,立即发送
props.put("linger.ms", 1);
//指定key和value的序列化器
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 100; i++) {
//这里需要三个参数,第一个:topic的名称,第二个参数:表示消息的key,第三个参数:消息具体内容
producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hello-kafka-"+i));
}
producer.close();
}}
2、带消息回调的生产者
package kafka.product; import org.apache.kafka.clients.producer.*; import java.util.Properties; import java.util.concurrent.ExecutionException; /** * 需求:异步发送 */ public class KafkaProducerStudySyn { public static void main(String[] args) throws ExecutionException, InterruptedException { //准备配置属性 Properties props = new Properties(); //kafka集群地址 props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092"); //acks它代表消息确认机制 // 1 0 -1 all /** * acks = 0: 表示produce请求立即返回,不需要等待leader的任何确认。 * 这种方案有最高的吞吐率,但是不保证消息是否真的发送成功。 * * acks = 1: 表示leader副本必须应答此produce请求并写入消息到本地日志,之后produce请求被认为成功. 如果leader挂掉有数据丢失的风险 * acks = -1或者all: 表示分区leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为produce请求成功。 * 这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的。 */ props.put("acks", "1"); //重试的次数 props.put("retries", 0); //缓冲区的大小 //默认32M props.put("buffer.memory", 33554432); //批处理数据的大小,每次写入多少数据到topic //默认16KB props.put("batch.size", 16384); //可以延长多久发送数据 //默认为0 表示不等待 ,立即发送 props.put("linger.ms", 1); //指定key和value的序列化器 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<String, String>(props); for (int i = 0; i < 10; i++) { //这里需要三个参数,第一个:topic的名称,第二个参数:表示消息的key,第三个参数:消息具体内容 producer.send(new ProducerRecord<String, String>("jhj", Integer.toString(i), "hello-kafka-" + i), new Callback() { public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null) { System.out.println("消息发送成功"); String topic = recordMetadata.topic(); long offset = recordMetadata.offset(); int partition = recordMetadata.partition(); System.out.println("topic:" + topic + " offset" + offset + "partition:" + partition); } } }); } producer.close(); } }
3、消费者自动提交偏移量( props.put("enable.auto.commit", "true");)
package kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; //todo:需求:开发kafka消费者代码(自动提交偏移量) public class KafkaConsumerStudy { public static void main(String[] args) { //准备配置属性 Properties props = new Properties(); //kafka集群地址 props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092"); //消费者组id props.put("group.id", "consumer-test"); //自动提交偏移量 props.put("enable.auto.commit", "true"); //自动提交偏移量的时间间隔 props.put("auto.commit.interval.ms", "1000"); //默认是latest //earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 //latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 //none : topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 props.put("auto.offset.reset","earliest"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); //指定消费哪些topic consumer.subscribe(Arrays.asList("jhj")); while (true) { //不断的拉取数据 ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { //该消息所在的分区号 int partition = record.partition(); //该消息对应的key String key = record.key(); //该消息对应的偏移量 long offset = record.offset(); //该消息内容本身 String value = record.value(); System.out.println("partition:"+partition+"\t key:"+key+"\toffset:"+offset+"\tvalue:"+value); } } } }
4、手动提交偏移量( consumer.commitSync();//提交偏移量)
package kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; //todo:需求:开发kafka消费者代码(手动提交偏移量) public class KafkaConsumerControllerOffset { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092"); props.put("group.id", "controllerOffset"); //关闭自动提交,改为手动提交偏移量 props.put("enable.auto.commit", "false"); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); //指定消费者要消费的topic consumer.subscribe(Arrays.asList("jhj")); //定义一个数字,表示消息达到多少后手动提交偏移量 final int minBatchSize = 20; //定义一个数组,缓冲一批数据 List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { String value = record.value(); System.out.println("获取到的:" + value); buffer.add(record); } if (buffer.size() >= minBatchSize) { //insertIntoDb(buffer); 拿到数据之后,进行消费 System.out.println("缓冲区的数据条数:" + buffer.size()); System.out.println("我已经处理完这一批数据了..."); consumer.commitSync();//自动提交偏移量 buffer.clear(); } } } }
kafka分区策略
分区是实现负载均衡以及高吞吐量的关键,故在生产者这一端就要仔细盘算合适的分区策略,避免造成消息数据的“倾斜”,使得某些分区成为性能瓶颈,这样极易引发下游数据消费的性能下降
-
kafka的分区策略决定了producer生产者产生的一条消息最后会写入到topic的哪一个分区中
-
1、指定具体的分区号
//1、给定具体的分区号,数据就会写入到指定的分区中
producer.send(new ProducerRecord<String, String>("test", 0,Integer.toString(i), "hello-kafka-"+i));
2、不给定具体的分区号,给定key的值
//2、不给定具体的分区号,给定一个key值, 这里使用key的 hashcode%分区数=分区号
producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hello-kafka-"+i));
3、不给定具体的分区号,也不给对应的key
//3、不给定具体的分区号,也不给定对应的key ,这个它会进行轮训的方式把数据写入到不同分区中
producer.send(new ProducerRecord<String, String>("test", "hello-kafka-"+i));
4、自定义分区
package com.kaikeba.partitioner;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;
//todo:需求:自定义kafka的分区函数
public class MyPartitioner implements Partitioner{
/**
* 通过这个方法来实现消息要去哪一个分区中
* @param topic
* @param key
* @param bytes
* @param value
* @param bytes1
* @param cluster
* @return
*/
public int partition(String topic, Object key, byte[] bytes, Object value, byte[] bytes1, Cluster cluster) {
//获取topic分区数
int partitions = cluster.partitionsForTopic(topic).size();
//key.hashCode()可能会出现负数 -1 -2 0 1 2
//Math.abs 取绝对值
return Math.abs(key.hashCode()% partitions);}
public void close() {
}public void configure(Map<String, ?> map) {
}
}
props.put("partitioner.class","com.kaikeba.partitioner.MyPartitioner");
kafka高效文件存储设计特点
-
Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
-
通过索引信息可以快速定位message
-
通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。
-
通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。
为什么Kafka速度那么快
Kafka是大数据领域无处不在的消息中间件,目前广泛使用在企业内部的实时数据管道,并帮助企业构建自己的流计算应用程序。
Kafka虽然是基于磁盘做的数据存储,但却具有高性能、高吞吐、低延时的特点,其吞吐量动辄几万、几十上百万,这其中的原由值得我们一探究竟。
1、顺序读写
磁盘顺序读写性能要高于内存的随机读写
众所周知Kafka是将消息记录持久化到本地磁盘中的,一般人会认为磁盘读写性能差,可能会对Kafka性能如何保证提出质疑。实际上不管是内存还是磁盘,快或慢关键在于寻址的方式,磁盘分为顺序读写与随机读写,内存也一样分为顺序读写与随机读写。基于磁盘的随机读写确实很慢,但磁盘的顺序读写性能却很高,一般而言要高出磁盘随机读写三个数量级,一些情况下磁盘顺序读写性能甚至要高于内存随机读写。
2、Page Cache(页缓存)
为了优化读写性能,Kafka利用了操作系统本身的Page Cache,就是利用操作系统自身的内存而不是JVM空间内存。这样做的好处有:
(1)避免Object消耗:如果是使用Java堆,Java对象的内存消耗比较大,通常是所存储数据的两倍甚至更多。
(2)避免GC问题:随着JVM中数据不断增多,垃圾回收将会变得复杂与缓慢,使用系统缓存就不会存在GC问题。
3、零拷贝(sendfile)
零拷贝并不是不需要拷贝,而是减少不必要的拷贝次数。通常是说在IO读写过程中。
4、总结
Kafka采用顺序读写、Page Cache、零拷贝以及分区分段等这些设计,再加上在索引方面做的优化,另外Kafka数据读写也是批量的而不是单条的,使得Kafka具有了高性能、高吞吐、低延时的特点。这样Kafka提供大容量的磁盘存储也变成了一种优点
Java的NIO提供了FileChannle,它的transferTo、transferFrom方法就是Zero Copy。
kafka整合flume
-
1、安装flume
-
2、添加flume的配置
-
vi flume-kafka.conf
-
#为我们的source channel sink起名
a1.sources = r1
a1.channels = c1
a1.sinks = k1#指定我们的source数据收集策略
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /kkb/install/flumeData/files
a1.sources.r1.inputCharset = utf-8#指定我们的source收集到的数据发送到哪个管道
a1.sources.r1.channels = c1#指定我们的channel为memory,即表示所有的数据都装进memory当中
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#指定我们的sink为kafka sink,并指定我们的sink从哪个channel当中读取数据
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = kaikeba
a1.sinks.k1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
3、创建topic
kafka-topics.sh --create --topic kaikeba --partitions 3 --replication-factor 2 --zookeeper node01:2181,node02:2181,node03:2181
4、启动flume
bin/flume-ng agent -n a1 -c conf -f conf/flume-kafka.conf -Dflume.root.logger=info,console
5、启动kafka控制台消费者,验证数据写入成功
kafka-console-consumer.sh --topic kaikeba --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning
6、测试数据
echo hadoop >test.txt
1、Kafka Eagle
-
下载Kafka Eagle安装包
-
Kafka Eagle
-
kafka-eagle-bin-1.2.3.tar.gz
-
-
-
2、解压
-
tar -zxvf kafka-eagle-bin-1.2.3.tar.gz -C /kkb/install
-
解压之后进入到kafka-eagle-bin-1.2.3目录中
-
得到kafka-eagle-web-1.2.3-bin.tar.gz
-
然后解压 tar -zxvf kafka-eagle-web-1.2.3-bin.tar.gz
-
重命名 mv kafka-eagle-web-1.2.3 kafka-eagle-web
-
-
-
3、修改配置文件
-
进入到conf目录
-
修改system-config.properties
-
-
# 填上你的kafka集群信息
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=node01:2181,node02:2181,node03:2181# kafka eagle页面访问端口
kafka.eagle.webui.port=8048# kafka sasl authenticate
kafka.eagle.sasl.enable=false
kafka.eagle.sasl.protocol=SASL_PLAINTEXT
kafka.eagle.sasl.mechanism=PLAIN
kafka.eagle.sasl.client=/kkb/install/kafka-eagle-bin-1.2.3/kafka-eagle-web/conf/kafka_client_jaas.conf# 添加刚刚导入的ke数据库配置,我这里使用的是mysql
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://node03:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456
4、配置环境变量
-
vi /etc/profile
export KE_HOME=/kkb/install/kafka-eagle-bin-1.2.3/kafka-eagle-web
export PATH=$PATH:$KE_HOME/bin
source /etc/profile
-
5、启动kafka-eagle
-
进入到$KE_HOME/bin目录
-
执行脚本sh ke.sh start
-
-
-
6、访问地址
-
启动成功后在浏览器中输入
http://node01:8048/ke
就可以访问kafka eagle 了。-
用户名:admin
-
password:123456
-
-
如何保证kafka数据不丢失
-
Kafka是一种高吞吐量的分布式发布订阅消息系统。在使用过程中如果使用不当,经常会出现消息丢失的情况,这是业务系统不能容忍的,消息系统最重要的是保证数据不丢失。
-
三个维度来保证:消息发送端保证数据不丢失,kafka服务保证消息不丢失,消费者保证消息不丢失。
-
1、Producer 生产端保证数据不丢失
-
1、发送消息API使用
-
在生产中Kafka生产者的开发我们都会用异步调用的方式,异步调用方式有如下两个API:
-
(1)producer.send(msg) 不带回调方法
(2)producer.send(msg,callback) 带回调方法
记得要使用带有回调方法的API,我们可以根据回调函数得知消息是否发送成功,如果发送失败了我们要进行异常处理,比如存储到其他介质来保证消息不丢。
2、ack的配置策略
设置 acks = all或者-1。
生产者在发送消息之后,需要等待ISR中所有的副本都成功写入消息之后才能够收到来自服务端的成功响应,在配置环境相同的情况下此种配置可以达到最强的可靠性。即:在发送消息时,需要leader 向fllow 同步完数据之后,也就是ISR队列中所有的broker全部保存完这条消息后,才会向ack发送消息,表示发送成功。
3、retries参数设置
设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
在kafka中错误分为2种,一种是可恢复的,另一种是不可恢复的。
可恢复性的错误:
如遇到在leader的选举、网络的抖动等这些异常时,如果我们在这个时候配置的retries大于0的,也就是可以进行重试操作,那么等到leader选举完成后、网络稳定后,这些异常就会消息,错误也就可以恢复,数据再次重发时就会正常发送到broker端。需要注意retries(重试)之间的时间间隔,以确保在重试时可恢复性错误都已恢复。
不可恢复性的错误:
如:超过了发送消息的最大值(max.request.size)时,这种错误是不可恢复的,如果不做处理,那么数据就会丢失,因此我们需要注意在发生异常时把这些消息写入到DB、缓存本地文件中等等,把这些不成功的数据记录下来,等错误修复后,再把这些数据发送到broker端
Broker服务端保证数据不丢失
1、replication-factor 参数设置
这个参数设置的是partition副本的个数,如果我们要想保证数据不丢,这个副本数需要设置成大于1。
2、unclean.leader.election.enable 参数设置
是否允许从非ISR队列中选举leader副本,默认值是false,如果设置成true,则可能会造成数据丢失。
3、min.insync.replicas 参数设置
分区ISR队列集合中最少有多少个副本,默认值是1
这个参数要跟生产者里的acks参数配合使用,当生产者acks=-1时,服务端的ISR列表里的所有副本都写入成功,才会给生产者返回成功的响应。而min.insync.replicas这个参数就是控制ISR列表的,假设min.insync.replicas=1,这就意味着ISR列表里可以只有一个副本,这个副本就是leader replica,这个时候即使acks设置的是-1,但其实消息只发送到leader replica,以后就返回成功的响应了。
因为ISR只有一个副本,我们知道这种情况是有可能会丢数据的,所以min.insync.replicas这个值需要大于1的。 如果ISR列表里面副本的个数小于min.insync.replicas,生产者发送消息是失败的
确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
Consumer消费者保证数据不丢失
1、enable.auto.commit 参数设置
消费者是可以自动提交offset的,但是如果是自动提交offset,可能会丢数据
确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。
SparkStreaming
-
离散数据流或者DStream是SparkStreaming提供的基本抽象。其表现数据的连续流,这个输入数据流可以来自于源,也可以来自于转换输入流产生的已处理数据流。内部而言,一个DStream以一系列连续的RDDs所展现,这些RDD是Spark对于不变的,分布式数据集的抽象。一个DStream中的每个RDD都包含来自一定间隔的数据,如下图:
在DStream上使用的任何操作都会转换为针对底层RDD的操作。例如:之前那个将行的流转变为词流的例子中,flatMap操作应用于行DStream的每个RDD上 从而产生words DStream的RDD。如下图:
DStream算子操作
1、Transformations
-
实现把一个DStream转换生成一个新的DStream,延迟加载不会触发任务的执行
Transformation | Meaning |
---|---|
map(func) | 对DStream中的各个元素进行func函数操作,然后返回一个新的DStream |
flatMap(func) | 与map方法类似,只不过各个输入项可以被输出为零个或多个输出项 |
filter(func) | 过滤出所有函数func返回值为true的DStream元素并返回一个新的DStream |
repartition(numPartitions) | 增加或减少DStream中的分区数,从而改变DStream的并行度 |
union(otherStream) | 将源DStream和输入参数为otherDStream的元素合并,并返回一个新的DStream. |
count() | 通过对DStream中的各个RDD中的元素进行计数,然后返回只有一个元素的RDD构成的DStream |
reduce(func) | 对源DStream中的各个RDD中的元素利用func进行聚合操作,然后返回只有一个元素的RDD构成的新的DStream. |
countByValue() | 对于元素类型为K的DStream,返回一个元素为(K,Long)键值对形式的新的DStream,Long对应的值为源DStream中各个RDD的key出现的次数 |
reduceByKey(func, [numTasks]) | 利用func函数对源DStream中的key进行聚合操作,然后返回新的(K,V)对构成的DStream |
join(otherStream, [numTasks]) | 输入为(K,V)、(K,W)类型的DStream,返回一个新的(K,(V,W))类型的DStream |
cogroup(otherStream, [numTasks]) | 输入为(K,V)、(K,W)类型的DStream,返回一个新的 (K, Seq[V], Seq[W]) 元组类型的DStream |
transform(func) | 通过RDD-to-RDD函数作用于DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD |
updateStateByKey(func) | 根据key的之前状态值和key的新值,对key进行更新,返回一个新状态的DStream |
reduceByKeyAndWindow | 窗口函数操作,实现按照window窗口大小来进行计算 |
Output Operations
-
输出算子操作,触发任务的真正运行
Output Operation | Meaning |
---|---|
print() | 打印到控制台 |
saveAsTextFiles(prefix, [suffix]) | 保存流的内容为文本文件,文件名为"prefix-TIME_IN_MS[.suffix]". |
saveAsObjectFiles(prefix, [suffix]) | 保存流的内容为SequenceFile,文件名为 "prefix-TIME_IN_MS[.suffix]". |
saveAsHadoopFiles(prefix, [suffix]) | 保存流的内容为hadoop文件,文件名为 "prefix-TIME_IN_MS[.suffix]". |
foreachRDD(func) | 对Dstream里面的每个RDD执行func |
数据源
socket数据源
sparkStreaming实时接收socket数据,实现单词计数
安装socket服务
-
首先在linux服务器node01上用yum 安装nc工具,nc命令是netcat命令的简称,它是用来设置路由器。我们可以利用它向某个端口发送数据。
yum -y install nc
执行命令向指定的端口发送数据
nc -lk 9999
<properties>
<scala.version>2.11.8</scala.version>
<spark.version>2.3.3</spark.version>
</properties><dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency><dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency></dependencies>
接收socket 数据源,实现单词统计
package com.kaikeba.streaming
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}/**
* sparkStreaming接受socket数据实现单词计数程序
*/
object SocketWordCount {def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)// todo: 1、创建SparkConf对象
val sparkConf: SparkConf = new SparkConf().setAppName("TcpWordCount").setMaster("local[2]")// todo: 2、创建StreamingContext对象
val ssc = new StreamingContext(sparkConf,Seconds(2))//todo: 3、接受socket数据
val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999)//todo: 4、对数据进行处理
val result: DStream[(String, Int)] = socketTextStream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)//todo: 5、打印结果
result.print()//todo: 6、开启流式计算
ssc.start()
ssc.awaitTermination()
}
}
监听hdfs上的数据实现单词统计
package com.kaikeba.streaming
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}/**
* HDFS数据源
*/
object HdfsWordCount {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)// todo: 1、创建SparkConf对象
val sparkConf: SparkConf = new SparkConf().setAppName("HdfsWordCount").setMaster("local[2]")// todo: 2、创建StreamingContext对象
val ssc = new StreamingContext(sparkConf,Seconds(2))//todo: 3、监控hdfs目录数据
val textFileStream: DStream[String] = ssc.textFileStream("hdfs://node01:8020/data")
//todo: 4、对数据进行处理
val result: DStream[(String, Int)] = textFileStream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)//todo: 5、打印结果
result.print()
//todo: 6、开启流式计算
ssc.start()
ssc.awaitTermination()
}
}
自定义数据源
/** * 自定义一个Receiver,这个Receiver从socket中接收数据 * 使用方式:nc -lk 8888 */ package sparkStreaming import java.io.{BufferedReader, InputStreamReader} import org.apache.log4j.{Level, Logger} import java.net.Socket import java.nio.charset.StandardCharsets import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.receiver.Receiver /** * 自定义数据源 */ object CustomReceiver { def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.ERROR) // todo: 1、创建SparkConf对象 val sparkConf: SparkConf = new SparkConf() .setAppName("CustomReceiver") .setMaster("local[2]") // todo: 2、创建StreamingContext对象 val ssc = new StreamingContext(sparkConf,Seconds(2)) //todo: 3、调用 receiverStream api,将自定义的Receiver传进去 val receiverStream = ssc.receiverStream(new CustomReceiver("node03",9999)) //todo: 4、对数据进行处理 val result: DStream[(String, Int)] = receiverStream .flatMap(_.split(" ")) .map((_,1)) .reduceByKey(_+_) //todo: 5、打印结果 result.print() //todo: 6、开启流式计算 ssc.start() ssc.awaitTermination() } } /** * 自定义source数据源 * @param host * @param port */ class CustomReceiver(host:String,port:Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) with Logging{ override def onStart(): Unit ={ //启动一个线程,开始接受数据 new Thread("socket receiver"){ override def run(): Unit = { receive() } }.start() } /** Create a socket connection and receive data until receiver is stopped */ private def receive() { var socket: Socket = null var userInput: String = null try { logInfo("Connecting to " + host + ":" + port) socket = new Socket(host, port) logInfo("Connected to " + host + ":" + port) val reader = new BufferedReader( new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)) userInput = reader.readLine() while(!isStopped && userInput != null) { store(userInput) userInput = reader.readLine() } reader.close() socket.close() logInfo("Stopped receiving") restart("Trying to connect again") } catch { case e: java.net.ConnectException => restart("Error connecting to " + host + ":" + port, e) case t: Throwable => restart("Error receiving data", t) } } override def onStop(): Unit ={ } }
SparkStreaming 任务提交集群运行
开发wordcount程序,然后打包上传到集群,并打开任务运行界面,查看一下任务运行情况。
bin/spark-submit \
--master spark://node01:7077 \
--class com.kaikeba.streaming.Demo \
--executor-memory 1g \
--total-executor-cores 2 \
original-sparkStreamingStudy-1.0-SNAPSHOT.jar
Transformation高级算子
updateStateByKey
需求
-
sparkStreaming接受socket数据实现所有批次的单词次数累加
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}/**
* todo: 实现把所有批次的单词出现的次数累加
*/
object UpdateStateBykeyWordCount {def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)// todo: 1、创建SparkConf对象
val sparkConf: SparkConf = new SparkConf().setAppName("TcpWordCount").setMaster("local[2]")// todo: 2、创建StreamingContext对象
val ssc = new StreamingContext(sparkConf,Seconds(2))//需要设置checkpoint目录,用于保存之前批次的结果数据,该目录一般指向hdfs路径
ssc.checkpoint("hdfs://node01:8020/ck")//todo: 3、接受socket数据
val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999)//todo: 4、对数据进行处理
val wordAndOneDstream: DStream[(String, Int)] = socketTextStream.flatMap(_.split(" ")).map((_,1))val result: DStream[(String, Int)] = wordAndOneDstream.updateStateByKey(updateFunc)
//todo: 5、打印结果
result.print()//todo: 6、开启流式计算
ssc.start()
ssc.awaitTermination()}
//currentValue:当前批次中每一个单词出现的所有的1
//historyValues:之前批次中每个单词出现的总次数,Option类型表示存在或者不存在。 Some表示存在有值,None表示没有
def updateFunc(currentValue:Seq[Int], historyValues:Option[Int]):Option[Int] = {val newValue: Int = currentValue.sum + historyValues.getOrElse(0)
Some(newValue)
}
}
mapWithState
-
需求
-
sparkStreaming接受socket数据实现所有批次的单词次数累加
-
package com.kaikeba.streaming
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, MapWithStateDStream, ReceiverInputDStream}
import org.apache.spark.streaming._/**
* todo: mapWithState实现把所有批次的单词出现的次数累加
* --性能更好
*/
object MapWithStateWordCount {def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)// todo: 1、创建SparkConf对象
val sparkConf: SparkConf = new SparkConf().setAppName("MapWithStateWordCount").setMaster("local[2]")// todo: 2、创建StreamingContext对象
val ssc = new StreamingContext(sparkConf,Seconds(2))val initRDD: RDD[(String, Int)] = ssc.sparkContext.parallelize((List(("hadoop",10),("spark",20))))
//需要设置checkpoint目录,用于保存之前批次的结果数据,该目录一般指向hdfs路径
ssc.checkpoint("hdfs://node01:8020/ck")//todo: 3、接受socket数据
val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999)//todo: 4、对数据进行处理
val wordAndOneDstream: DStream[(String, Int)] = socketTextStream.flatMap(_.split(" ")).map((_,1))
val stateSpec=StateSpec.function((time:Time,key:String,currentValue:Option[Int],historyState:State[Int])=>{//当前批次结果与历史批次的结果累加
val sumValue: Int = currentValue.getOrElse(0)+ historyState.getOption().getOrElse(0)
val output=(key,sumValue)
if(!historyState.isTimingOut()){
historyState.update(sumValue)
}Some(output)
//给一个初始的结果initRDD
//timeout: 当一个key超过这个时间没有接收到数据的时候,这个key以及对应的状态会被移除掉
}).initialState(initRDD).timeout(Durations.seconds(5))
//todo: 使用mapWithState方法,实现累加
val result: MapWithStateDStream[String, Int, Int, (String, Int)] = wordAndOneDstream.mapWithState(stateSpec)//todo: 5、打印所有批次的结果数据
result.stateSnapshots().print()//todo: 6、开启流式计算
ssc.start()
ssc.awaitTermination()}
}
特点:
(1)若要清除某个key的状态,可在自定义的方法中调用state.remove()
(2)若要设置状态超时时间,可以调用StateSpec.function(mappingFunc).timeout()方法设置
(3)若要添加初始化的状态,可以调用StateSpec.function(mappingFunc).initialState(initialRDD)方法
(4)性能比updateStateByKey好
transform
-
需求
-
获取每一个批次中单词出现次数最多的前3位
-
-
代码开发
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}/**
* todo: 获取每一个批次中单词出现次数最多的前3位
*/
object TransformWordCount {def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
// todo: 1、创建SparkConf对象
val sparkConf: SparkConf = new SparkConf().setAppName("TransformWordCount").setMaster("local[2]")// todo: 2、创建StreamingContext对象
val ssc = new StreamingContext(sparkConf,Seconds(2))//todo: 3、接受socket数据
val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999)//todo: 4、对数据进行处理
val result: DStream[(String, Int)] = socketTextStream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
//todo: 5、将Dstream进行transform方法操作
val sortedDstream: DStream[(String, Int)] = result.transform(rdd => {
//对单词出现的次数进行排序
val sortedRDD: RDD[(String, Int)] = rdd.sortBy(_._2, false)val top3: Array[(String, Int)] = sortedRDD.take(3)
println("------------top3----------start")
top3.foreach(println)
println("------------top3------------end")
sortedRDD
})//todo: 6、打印该批次中所有单词按照次数降序的结果
sortedDstream.print()//todo: 7、开启流式计算
ssc.start()
ssc.awaitTermination()}
}
window
-
Window Operations可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态。
-
所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。
-
窗口时长:计算内容的时间范围
-
滑动步长:每隔多久触发一次计算
-
-
注意:这两者都必须为采集周期大小的整数倍。
-
需求
-
2秒一个批次,实现每隔4秒统计6秒窗口的数据结果
-
-
代码开发
package com.kaikeba.streaming
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}/**
* todo: 2秒一个批次,实现每隔4秒统计6秒窗口的数据结果
*/
object ReduceByKeyAndWindowWordCount {def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)// todo: 1、创建SparkConf对象
val sparkConf: SparkConf = new SparkConf().setAppName("ReduceByKeyAndWindowWordCount").setMaster("local[2]")// todo: 2、创建StreamingContext对象
val ssc = new StreamingContext(sparkConf,Seconds(2))//todo: 3、接受socket数据
val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999)//todo: 4、对数据进行处理
val result: DStream[(String, Int)] = socketTextStream.flatMap(_.split(" ")).map((_,1))
//todo: 5、每隔4秒统计6秒的数据
/**
* 该方法需要三个参数:
* reduceFunc: (V, V) => V, ----------> 就是一个函数
* windowDuration: Duration, ----------> 窗口的大小(时间单位),该窗口会包含N个批次的数据
* slideDuration: Duration ----------> 滑动窗口的时间间隔,表示每隔多久计算一次
*/
val windowDStream: DStream[(String, Int)] = result.reduceByKeyAndWindow((x:Int,y:Int)=>x+y,Seconds(6),Seconds(4))//todo: 6、打印该批次中所有单词按照次数降序的结果
windowDStream.print()//todo: 7、开启流式计算
ssc.start()
ssc.awaitTermination()
}}
Output 算子
import java.sql.DriverManagerimport org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}/**
* todo: 将WordCount案例中得到的结果通过foreachRDD保存结果到mysql中
*/
object WordCountForeachRDD {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)// todo: 1、创建SparkConf对象
val sparkConf: SparkConf = new SparkConf().setAppName("WordCountForeachRDD").setMaster("local[2]")// todo: 2、创建StreamingContext对象
val ssc = new StreamingContext(sparkConf,Seconds(2))//todo: 3、接受socket数据
val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999)//todo: 4、对数据进行处理
val result: DStream[(String, Int)] = socketTextStream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
//todo: 5、保存结果到mysql表中//todo:方案一(有问题)
result.foreachRDD(rdd =>{
//注意这里创建的对象都是在Driver端
val conn = DriverManager.getConnection("jdbc:mysql://node03:3306/test", "root", "123456")
val statement = conn.prepareStatement(s"insert into wordcount(word, count) values (?, ?)")rdd.foreach { record =>
//这一块代码的执行是在executor端,需要进行网络传输,会出现task not serializable 异常
statement.setString(1, record._1)
statement.setInt(2, record._2)
statement.execute()
}
statement.close()
conn.close()
})//todo: 方案二
result.foreachRDD(rdd=>{
//遍历
rdd.foreach { record =>
val conn = DriverManager.getConnection("jdbc:mysql://node03:3306/test", "root", "123456")
val statement = conn.prepareStatement(s"insert into wordcount(word, count) values (?, ?)")
statement.setString(1, record._1)
statement.setInt(2, record._2)
statement.execute()statement.close()
conn.close()
}
})//todo: 方案三
result.foreachRDD(rdd=>{
rdd.foreachPartition( iter =>{
val conn = DriverManager.getConnection("jdbc:mysql://node03:3306/test", "root", "123456")
val statement = conn.prepareStatement(s"insert into wordcount(word, count) values (?, ?)")iter.foreach( record =>{
statement.setString(1, record._1)
statement.setInt(2, record._2)
statement.execute()})
statement.close()
conn.close()
})
})
//todo: 方案四
result.foreachRDD(rdd=>{
rdd.foreachPartition( iter =>{
val conn = DriverManager.getConnection("jdbc:mysql://node03:3306/test", "root", "123456")
val statement = conn.prepareStatement(s"insert into wordcount(word, count) values (?, ?)")
//关闭自动提交
conn.setAutoCommit(false);
iter.foreach( record =>{
statement.setString(1, record._1)
statement.setInt(2, record._2)
//添加到一个批次
statement.addBatch()})
//批量提交该分区所有数据
statement.executeBatch()
conn.commit()
// 关闭资源
statement.close()
conn.close()
})
})//todo: 6、开启流式计算
ssc.start()
ssc.awaitTermination()}
}
SparkStreaming消费kafka1.0的数据, 采用Direct直连方式
package sparkStreaming.kafka import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.{Seconds, StreamingContext} object KafkaDirect10 { def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.ERROR) //1、创建StreamingContext对象 val sparkConf = new SparkConf() .setAppName("KafkaDirect10") .setMaster("local[2]") val ssc = new StreamingContext(sparkConf, Seconds(2)) //2、使用direct接受kafka数据 //准备配置 val topic = Set("jhj") val kafkaParams = Map( "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092", "group.id" -> "KafkaDirect111", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "enable.auto.commit" -> "false" ) val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String]( ssc, //数据本地性策略 LocationStrategies.PreferConsistent, //指定要订阅的topic ConsumerStrategies.Subscribe[String, String](topic, kafkaParams) ) //3、对数据进行处理 //如果你想获取到消息消费的偏移,这里需要拿到最开始的这个Dstream进行操作 //如果你对该DStream进行了其他的转换之后,生成了新的DStream,新的DStream不在保存对应的消息的偏移量 // val kafkaStream = kafkaDStream.map(record => (record.key, record.value)).print() kafkaDStream.foreachRDD(rdd => { println("该运行") //获取消息内容 val dataRDD: RDD[String] = rdd.map(_.value()) // println("dataRDD"+dataRDD) //打印 dataRDD.foreach(line => { println("数据:" + line) }) //4、提交偏移量信息,把偏移量信息添加到kafka中 val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) }) //5、开启流式计算 ssc.start() ssc.awaitTermination() } }