queue增加删除元素
- 增加元素
- add方法在添加元素的时候,若超出了度列的长度会直接抛出异常:
- put方法,若向队尾添加元素的时候发现队列已经满了会发生阻塞一直等待空间,以加入元素
- offer方法在添加元素时,如果发现队列已满无法添加的话,会直接返回false
- 删除元素
- poll: 若队列为空,返回null。
- remove:若队列为空,抛出NoSuchElementException异常。
- take:若队列为空,发生阻塞,等待有元素
BlockingQueue:
- 解决线程通信的问题
- 阻塞方法:put、take
其他实现类:
- ArrayBlockingQueue
- LinkedBlockingQueue
- PriorityBlockingQueue/ SynchronousQueue/ DelayQueue
BlockingQueue实例
package com.nowcoder.mycommunity;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueTests {
public static void main(String[] args) {
BlockingQueue queue = new ArrayBlockingQueue(10);
new Thread(new Producer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
}
}
class Producer implements Runnable{
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue){
this.queue = queue;
}
@Override
public void run() {
try {
for(int i = 0; i < 100; ++ i){
queue.put(i);
Thread.sleep(20);
System.out.println(Thread.currentThread().getName() + " producer" + queue.size());
}
}catch (Exception e){
e.printStackTrace();
}
}
}
class Consumer implements Runnable{
public BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue){
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
queue.take();
Thread.sleep(new Random().nextInt(1000));
System.out.println(Thread.currentThread().getName() + " consuer" + queue.size());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
kafka
- kafka是一个分布式的流媒体平台
- 主要应用:消息系统、日志收集、用户行为追踪、流式处理
- 特点:高吞吐量、消息持久化(存放在磁盘上,btw,磁盘顺序读写速度并不慢)、高可靠性、高扩展性
Broker
kafka的服务器,每一台服务器称为一个Broker
Zookeeper
管理其他集群,包括kafka的集群。可以单独下载
Topic/ Partition/ Offset
消息队列可能是一对多的形式,生产者将一条消息放在多个队列中,然后消费者从各自的队列中取消息。
下图为一个Topic,Topic中可能会含有很多Partition,Offset为Partition的索引
Leader Replica/ Follower Replica
kafka的数据不止存储一份,他会存为多份,即使某一个分区坏了还可以有备份。
leader Replica(祖副本):当尝试从分区获取数据时,祖副本可以处理请求,返回数据
Follower Replica(随从副本):只能备份,不能响应请求
如果祖副本挂掉,集群会从Follower Replica中选一个作为新的leader
kafka命令
官方文档
配置
进入到configure目录下,修改consumer.properties
使用
进入到kafka的目录中
// 启动zookeeper
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
// 启动kafka
> ./bin/kafka-server-start.sh config/server.properties
// --create:创建主题
// --bootstrap-server localhost:9092:在哪个服务器创建主题,kafka默认端口为9092
// --replication-factor 1:副本为1
// --partitions 1:分区为1
// --topic test:主题的名字
> ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
Created topic test.
// 查看该服务器上的主题
> ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
test
// 创建生产者向某个服务器的某个主题中发消息
> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hello
>world
// 创建一个消费者,读取某个服务器上某个主题下的消息队列,从头开始读取
> ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
hello
world
Spring整合Kafka
引入依赖
pom.xml
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.0.7</version>
</dependency>
配置Kafka
- 配置server
- 配置consumer
# Kafka Properties
# 服务器地址
spring.kafka.bootstrap-servers==localhost:9092
#消费者id,可以在consumer.properties查看
spring.kafka.consumer.group.id=mycommunity-consumer-group
# 是否自动提交
spring.kafka.consumer.enable-auto-commit=true
# 自动提交的时间间隔,单位毫秒
spring.kafka.consumer.auto-commit-interval=3000
访问Kafka
- producer
- consumer
Spring整合Kafka的例子
package com.nowcoder.mycommunity;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = MyCommunityApplication.class)
public class KafkaTests {
@Autowired
private KafkaProducer kafkaProducer;
@Test
public void testKafka(){
kafkaProducer.sendMessage("test", "hello");
kafkaProducer.sendMessage("test", "world");
try {
Thread.sleep(1000*10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Component
class KafkaProducer{
@Autowired
public KafkaTemplate kafkaTemplate;
public void sendMessage(String topic, String content){
kafkaTemplate.send(topic, content);
}
}
@Component
class KafkaConsumer{
// 加上listener注解,Spring会自动注入
@KafkaListener(topics = {"test"})
public void handleMessage(ConsumerRecord record){
System.out.println(record.value());
}
}