java阻塞队列/kafka/spring整合kafka

news2025/1/20 18:22:33

queue增加删除元素

  • 增加元素
    • add方法在添加元素的时候,若超出了度列的长度会直接抛出异常:
    • put方法,若向队尾添加元素的时候发现队列已经满了会发生阻塞一直等待空间,以加入元素
    • offer方法在添加元素时,如果发现队列已满无法添加的话,会直接返回false
  • 删除元素
    • poll: 若队列为空,返回null。
    • remove:若队列为空,抛出NoSuchElementException异常。
    • take:若队列为空,发生阻塞,等待有元素

BlockingQueue:

  • 解决线程通信的问题
  • 阻塞方法:put、take

其他实现类:

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • PriorityBlockingQueue/ SynchronousQueue/ DelayQueue

BlockingQueue实例

package com.nowcoder.mycommunity;

import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueTests {
    public static void main(String[] args) {
        BlockingQueue queue = new ArrayBlockingQueue(10);
        new Thread(new Producer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
    }
}

class Producer implements Runnable{

    private BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for(int i = 0; i < 100; ++ i){
                queue.put(i);
                Thread.sleep(20);
                System.out.println(Thread.currentThread().getName() + "   producer" + queue.size());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

class Consumer implements Runnable{

    public BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                queue.take();
                Thread.sleep(new Random().nextInt(1000));
                System.out.println(Thread.currentThread().getName() + "   consuer" + queue.size());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

kafka

  • kafka是一个分布式的流媒体平台
  • 主要应用:消息系统、日志收集、用户行为追踪、流式处理
  • 特点:高吞吐量、消息持久化(存放在磁盘上,btw,磁盘顺序读写速度并不慢)、高可靠性、高扩展性

Broker

kafka的服务器,每一台服务器称为一个Broker

Zookeeper

管理其他集群,包括kafka的集群。可以单独下载

Topic/ Partition/ Offset

消息队列可能是一对多的形式,生产者将一条消息放在多个队列中,然后消费者从各自的队列中取消息。
下图为一个Topic,Topic中可能会含有很多Partition,Offset为Partition的索引
在这里插入图片描述

Leader Replica/ Follower Replica

kafka的数据不止存储一份,他会存为多份,即使某一个分区坏了还可以有备份。
leader Replica(祖副本):当尝试从分区获取数据时,祖副本可以处理请求,返回数据
Follower Replica(随从副本):只能备份,不能响应请求
如果祖副本挂掉,集群会从Follower Replica中选一个作为新的leader

kafka命令

官方文档

配置

进入到configure目录下,修改consumer.properties

使用

进入到kafka的目录中

// 启动zookeeper
> ./bin/zookeeper-server-start.sh config/zookeeper.properties 

// 启动kafka
> ./bin/kafka-server-start.sh config/server.properties 

// --create:创建主题
// --bootstrap-server localhost:9092:在哪个服务器创建主题,kafka默认端口为9092
// --replication-factor 1:副本为1
// --partitions 1:分区为1
// --topic test:主题的名字
> ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
Created topic test.

// 查看该服务器上的主题
> ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092            
test

// 创建生产者向某个服务器的某个主题中发消息
> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test        
>hello
>world

// 创建一个消费者,读取某个服务器上某个主题下的消息队列,从头开始读取
> ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
hello
world

Spring整合Kafka

引入依赖

pom.xml

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.0.7</version>
</dependency>

配置Kafka

  • 配置server
  • 配置consumer
# Kafka Properties
# 服务器地址
spring.kafka.bootstrap-servers==localhost:9092
#消费者id,可以在consumer.properties查看
spring.kafka.consumer.group.id=mycommunity-consumer-group
# 是否自动提交
spring.kafka.consumer.enable-auto-commit=true
# 自动提交的时间间隔,单位毫秒
spring.kafka.consumer.auto-commit-interval=3000

访问Kafka

  • producer
  • consumer

Spring整合Kafka的例子

package com.nowcoder.mycommunity;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = MyCommunityApplication.class)
public class KafkaTests {

    @Autowired
    private KafkaProducer kafkaProducer;

    @Test
    public void testKafka(){
        kafkaProducer.sendMessage("test", "hello");
        kafkaProducer.sendMessage("test", "world");

        try {
            Thread.sleep(1000*10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

@Component
class KafkaProducer{

    @Autowired
    public KafkaTemplate kafkaTemplate;

    public void sendMessage(String topic, String content){
        kafkaTemplate.send(topic, content);
    }
}

@Component
class KafkaConsumer{

	// 加上listener注解,Spring会自动注入
    @KafkaListener(topics = {"test"})
    public void handleMessage(ConsumerRecord record){
        System.out.println(record.value());
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/718930.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

FPGA实验三:状态机的设计

目录 一、实验目的 二、实验要求 三、实验代码 1.design source文件部分代码 2.测试文件代码 四、实验结果及分析 1、引脚锁定 2、仿真波形及分析 &#xff08;1&#xff09;设计好序列检测器 &#xff08;2&#xff09;仿真波形&#xff08;检测11010&#xff09; 3…

Linux-vim与gdb与make/makefile

三个模式&#xff1a;命令模式 文本模式 底行模式 yum :instell 安装 remove 卸载 gcc -o执行后生成文件命名 gcc 1.c -o fst.out -E预编译 -S汇编 -c生成机器码 Linux 中 静态库&#xff1a;.a&#xff1b;动态库&#xff1a;.so Linux默认动态库&#xff0c;…

Redis的安装,启动,关闭

一&#xff0c;redis安装linux 1&#xff0c;安装gcc环境 yum -y install gcc-c2,上传压缩包到/usr/soft目录&#xff0c;并解压 cd /soft tar -xvf redis-3.2.11.tar.gz3&#xff0c;进入redis-5.0.7目录&#xff0c;使用make命令编译redis [rootlocalhost soft]# cd re…

【DBA课程-笔记】第1章:MongoDB数据库入门

一、MongoDB 概览及新特性 1. MongoDB 简介 目前最流行的NoSQL数据库&#xff08;NO.1&#xff09;MongoDB是一个基于分布式文件存储的数据库&#xff0c;由C语言编写&#xff0c;特点是高性能、易部署、易使用、存储数据非常方便&#xff0c;旨在为Web应用提供可扩展的高性能…

企业该如何防止数据泄漏问题

关键词&#xff1a;企业网盘、知识文档管理系统、群晖NAS、数据安全 根据Verizon《2022 数据泄露调查报告》显示&#xff0c;2022年数据泄露事件中82%的违规行为涉及人为因素&#xff0c;勒索软件泄露事件增加了13%&#xff0c;超过过去五年的总和&#xff0c;数据安全已变成关…

【JUC-7】ReentrantLock (可重入锁)基础

ReentrantLock (可重入锁) ReentrantLock实现了Lock接口, 内部通过继承AQS, 实现了一个同步器. 可以通过同步器来创建Condition条件变量, 可以用作容器, 存放不同条件的等待线程. 说明ReentrantLock与AQS的关系 类图: 相对于synchronized, 都支持可重入. 它还具备如下特点: …

【算法练习】24:凯撒密码

一、凯撒密码介绍&#xff1a; 采用替换的方式对英文字母进行处理&#xff0c;将每一个英文字符循环替换为字母表序列中该字符的后面的第三个字符&#xff0c;即循环右移3位。 明文字母表&#xff1a;ABCDEFGHIJKLMNOPQRSTUVWXYZ 密文字母表&#xff1a;DEFGHIJKLMNOPQRSTUV…

微信小程序如何读取本地云存储txt数据,避免乱码

第一步 找到你的txt文件&#xff0c;重命名为json文件 第二步 上传到云存储中&#xff0c;获取File ID 第三步 编写js代码 相关技术文档&#xff1a; https://developers.weixin.qq.com/miniprogram/dev/api/file/FileSystemManager.readFile.html onShow(){wx.cloud.d…

《Redis 核心技术与实战》课程学习笔记(三)

高性能 IO 模型&#xff1a;为什么单线程 Redis 能那么快&#xff1f; Redis 是单线程&#xff0c;主要是指 Redis 的网络 IO 和键值对读写是由一个线程来完成的&#xff0c;这也是 Redis 对外提供键值存储服务的主要流程。但 Redis 的其他功能&#xff0c;比如持久化、异步删…

CDS Core Data Services S4 CDS view--2

7.2 怎么加注释 首先要看懂注释&#xff0c;comparefilter 一般都是true,这样在association 里的join只被验证一次&#xff0c;如果是FALSE就会不停的被验证。 preservekey, 验证和数据库表的key是否一致。 authorizationcheck, 需要验证权限。不过我们没有设access control…

STM32F1 GPIO 简介

GPIO 是控制或者采集外部器件的信息的外设&#xff0c;即负责输入输出。它按组分配&#xff0c;每组 16 个 IO 口&#xff0c;组数视芯片而定。STM32F103ZET6 芯片是 144 脚的芯片&#xff0c;具有 GPIOA、GPIOB、GPIOC、 GPIOD、GPIOE、GPIOF 和 GPIOG 七组 GPIO 口&#xff0…

13---罗马数字转整数

罗马数字包含以下七种字符: I&#xff0c; V&#xff0c; X&#xff0c; L&#xff0c;C&#xff0c;D 和 M。 字符 数值 I 1 V 5 X 10 L 50 C 100 D 500 M 1000 例如&#xf…

【DBA课程-笔记】MongoDB入门到云上开发

课程目的&#xff1a;成为专业MongoDB的DBA nosql第一&#xff1a;MongoDB 一、讲师&#xff1a; 二、课程目录 第1章&#xff1a;MongoDB数据库入门 第2章&#xff1a;MongoDB数据数据查询与分析 第3章&#xff1a;MongoDB数据库核心知识 第4章&#xff1a;MongoDB数据库管…

时间序列预测 | Matlab基于粒子群算法优化门控循环单元(PSO-GRU)的时间序列预测,PSO-GRU时间序列预测,单列数据集

文章目录 效果一览文章概述部分源码参考资料效果一览 文章概述 时间序列预测 | Matlab基于粒子群算法优化门控循环单元(PSO-GRU)的时间序列预测,PSO-GRU时间序列预测,单列数据集。 优化参数为学习率,隐藏层节点个数,正则化参数,要求2020b及以上版本&#

Redhat7.6安装mysql5.7

环境准备&#xff1a;硬盘剩余空间最少8G,内存剩余最少2G Mysql官网下载地址&#xff1a;https://dev.mysql.com/downloads/mysql/5.7.html 在Mysql官网下载列表中选择需要安装的版本: RedHat7.6安装MySQL5.7 安装之前&#xff0c;先要保证系统环境是干净的&#xff0c;不能存…

Dual In-Line Package(双列直插式封装)和Pin Grid Array Package(针栅阵列插入式封装)

DIP封装示意图 1.Dual In-Line Package&#xff08;双列直插式封装&#xff09; DIP的详细介绍&#xff1a; 1.封装形式&#xff1a;DIP是一种插件式封装&#xff0c;它由一个狭长的塑料或陶瓷封装体组成&#xff0c;具有在两侧排列的引脚。引脚通常是分布均匀的&#xff0c…

17. 订单金额趋势分析

文章目录 题目需求思路一实现一实现二&#xff1a;使用 over(range)学习链接题目来源 题目需求 查询截止每天的最近3天内的订单金额总和以及订单金额日平均值&#xff0c;保留两位小数&#xff0c;四舍五入。 最近三天 的业务逻辑通常是基于当天往前推2天 期望结果如下&#x…

Kepware.KEPServer安装

1.1 Kepware.KEPServer安装 1.1.1 解压并安装 首先解压并安装KEPServerEX v4.500.465.zip,右键点击KEPServer执行文件进行安装,如图2-2-14所示, 图2-2-14 2) 运行KEPServer安装文件之后出现如图2-2-15所示:点击Next继续。 图2-2-15 3) 选择I accept the tems of the lice…

Keras-5-深度学习用于文本和序列-处理文本数据

深度学习用于文本和序列 说明: 本篇学习记录为&#xff1a;《Python 深度学习》第6章第1节&#xff08;处理文本数据&#xff09; 知识点: 深度学习处理文本或序列数据的基本方法是&#xff1a;循环神经网络 (recurrent neural network) 和 一维卷积神经网络 (1D convert)&…

Python中怎样用索引和切片取出字符串片段?

Python 语言为字符串中的元素编号&#xff0c;以实现对字符串中的单个字符或字符片段的索引。按照不同的方向&#xff0c;索引分为正向索引和逆向索引。假设字符串的长度为L&#xff0c;正向索引中字符串的字符编号从左至右由0递增为L-1&#xff0c;逆向索引中字符串的字符编号…