java阻塞队列/kafka

news2025/1/8 5:29:56

queue增加删除元素

  • 增加元素
    • add方法在添加元素的时候,若超出了度列的长度会直接抛出异常:
    • put方法,若向队尾添加元素的时候发现队列已经满了会发生阻塞一直等待空间,以加入元素
    • offer方法在添加元素时,如果发现队列已满无法添加的话,会直接返回false
  • 删除元素
    • poll: 若队列为空,返回null。
    • remove:若队列为空,抛出NoSuchElementException异常。
    • take:若队列为空,发生阻塞,等待有元素

BlockingQueue:

  • 解决线程通信的问题
  • 阻塞方法:put、take

其他实现类:

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • PriorityBlockingQueue/ SynchronousQueue/ DelayQueue

BlockingQueue实例

package com.nowcoder.mycommunity;

import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueTests {
    public static void main(String[] args) {
        BlockingQueue queue = new ArrayBlockingQueue(10);
        new Thread(new Producer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
    }
}

class Producer implements Runnable{

    private BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for(int i = 0; i < 100; ++ i){
                queue.put(i);
                Thread.sleep(20);
                System.out.println(Thread.currentThread().getName() + "   producer" + queue.size());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

class Consumer implements Runnable{

    public BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                queue.take();
                Thread.sleep(new Random().nextInt(1000));
                System.out.println(Thread.currentThread().getName() + "   consuer" + queue.size());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

kafka

  • kafka是一个分布式的流媒体平台
  • 主要应用:消息系统、日志收集、用户行为追踪、流式处理
  • 特点:高吞吐量、消息持久化(存放在磁盘上,btw,磁盘顺序读写速度并不慢)、高可靠性、高扩展性

Broker

kafka的服务器,每一台服务器称为一个Broker

Zookeeper

管理其他集群,包括kafka的集群。可以单独下载

Topic/ Partition/ Offset

消息队列可能是一对多的形式,生产者将一条消息放在多个队列中,然后消费者从各自的队列中取消息。
下图为一个Topic,Topic中可能会含有很多Partition,Offset为Partition的索引
在这里插入图片描述

Leader Replica/ Follower Replica

kafka的数据不止存储一份,他会存为多份,即使某一个分区坏了还可以有备份。
leader Replica(祖副本):当尝试从分区获取数据时,祖副本可以处理请求,返回数据
Follower Replica(随从副本):只能备份,不能响应请求
如果祖副本挂掉,集群会从Follower Replica中选一个作为新的leader

kafka命令

官方文档

使用

进入到kafka的目录中

// 启动zookeeper
> ./bin/zookeeper-server-start.sh config/zookeeper.properties 

// 启动kafka
> ./bin/kafka-server-start.sh config/server.properties 

// --create:创建主题
// --bootstrap-server localhost:9092:在哪个服务器创建主题,kafka默认端口为9092
// --replication-factor 1:副本为1
// --partitions 1:分区为1
// --topic test:主题的名字
> ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
Created topic test.

// 查看该服务器上的主题
> ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092            
test

// 创建生产者向某个服务器的某个主题中发消息
> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test        
>hello
>world

// 创建一个消费者,读取某个服务器上某个主题下的消息队列,从头开始读取
> ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
hello
world

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/709285.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PCL点云处理之沿着法向量投影点云到平面 (一百九十四)

即使你很爱她,也要保留一些尊严 PCL点云处理之沿着法向量投影点云到平面 (一百九十四) 一、算法介绍二、具体实现1.代码(详细注释)2.效果一、算法介绍 实现这样的目标:沿着法向量,将点云投影到一个平面上 具体步骤: 1计算整个点云的法向量,来确定平面方向 2选择某一点…

IP编址数据转发

目录 一、IP编址 1.1、二进制、十进制和十六进制 1.2、进制之间的转换 1.3、IP编址 1.4、子网掩码 1.5、二进制和十进制转换 1.6、IP地址分类 1.7、IP地址类型 1.8、地址规划 二、VLSM与CIDR 2.1、有类IP编址的缺陷 2.2、变长子网掩码 VLSM 2.3、缺省情况下的掩码 …

红色通信史(四):永不消逝的电波

年龄稍大一点的读者&#xff0c;相信对下面这张剧照并不会感到陌生。 没错&#xff0c;这张经典的剧照&#xff0c;来自曾经家喻户晓的一部红色电影——《永不消逝的电波》。 这部电影上映于1958年&#xff0c;由八一电影制片厂摄制&#xff0c;王苹执导&#xff0c;孙道临、袁…

Qt和MySQL的连接

具体视频的教程 视频教程 我的文章是做一下补充的 流程&#xff1a; 第一点你要确保你的数据库的位数&#xff0c;如果你数据库位数的是32位&#xff0c;mingw编译的时候就需要选择的是32位**&#xff08;在下面会提到&#xff09;** 去到所在的文件 点击.pro文件进行编译…

ModaHub魔搭社区:为什么选择Zilliz Cloud?

目录 Zilliz Cloud是什么&#xff1f; 为什么选择Zilliz Cloud&#xff1f; 基于Milvus构建&#xff0c;针对性能进行优化 弹性和可扩展 按需付费 多云支持&#xff08;AWS、GCP&#xff09; 云原生的可靠性 企业安全和治理 Zilliz Cloud基于Milvus的云原生服务 Zilliz…

深蓝学院C++基础与深度解析笔记 第 8 章 动态内存管理

第 8 章 动态内存管理 1. 动态内存基础 ● 栈内存 V.S. 堆内存 – 栈内存的特点&#xff1a;更好的局部性&#xff0c;用于语言的固有类型&#xff0c;对象自动销毁,由低到高开辟 – 堆内存的特点&#xff1a;运行期动态扩展&#xff0c;需要显式释放&#xff0c;由高到低开辟…

使用 TailwindCSS 中的 color-mix() 构建自定义调色板

在这篇文章中&#xff0c;我们将了解如何使用 CSS 函数color-mix()和 CSS 变量&#xff0c;通过 TailwindCSS 高效地为 Nuxt 应用程序生成自定义调色板。 先决条件 最好使用以下命令设置 Nuxt 应用程序&#xff1a; npx nuxi init tailwindcss-color-mix 在安装提示期间选择 …

新项目即将启动!小灰做个市场调研

熟悉小灰的小伙伴们都知道&#xff0c;在2019年初&#xff0c;做了整整10年程序员的小灰离开职场&#xff0c;成为了一名自由职业者。 2021年末&#xff0c;小灰注册了自己的公司&#xff0c;名为北京小灰大黄科技有限公司。 公司虽然注册了&#xff0c;但是整个公司只有小灰一…

【C2】文件,时间,多线程,动静态库

文章目录 1.文件&#xff1a;fprint/fgets/fwrite/fread&#xff0c;ftell/rewind/fseek/fflush1.1 文本文件&#xff1a;FILE结构体1.2 二进制文件&#xff1a;没有行概念1.3 文件定位&#xff1a;linux下文本文件模式和二进制文件模式没有区别。fgets和fprintf以行方式读写文…

【Flutter】Flutter 国际化入门 使用 intl 包 格式化日期

文章目录 一、 前言二、 版本信息三、 什么是 intl 包四、 如何安装和使用 intl 包1. 安装 intl 包2. 使用 intl 包进行基本的日期和数字格式化3. 使用 intl 包进行消息翻译 五、 一个简单的使用示例六、 总结 一、 前言 在全球化的今天&#xff0c;为你的 Flutter 应用添加国…

快速上手MATLAB图像处理:100种项目全覆盖

本教程涵盖了MATLAB图像处理的广泛内容。我们学习了图像读取、显示和保存,图像的基本操作(如缩放、裁剪、旋转和翻转),以及图像的基本增强(如亮度调整、对比度调整和颜色空间转换)。本教程还介绍了常见的图像滤波技术(如均值滤波、中值滤波和高斯滤波),图像的直方图均…

JAVA临时文件的使用

目录 什么是临时文件&#xff1f; 临时文件在编程中有各种妙用 java在缓存目录创建临时文件的方式 1 按照指定文件名随机数字共同作为文件名创建 2 按照指定文件名创建 3 通过获取临时文件夹的真实路径 什么是临时文件&#xff1f; 临时文件是在计算机系统中用于临时存储数…

Spring Cloud - Gateway统一网关、断言工厂、过滤器工厂、全局过滤器、跨域问题

目录 一、什么是网关&#xff1f;为什么选择 Gateway? 二、Gateway 网关 2.1、搭建网关服务 1.创建新的module&#xff0c;引入SpringCloudGateway的依赖和nacos的服务发现依赖 2.编写nacos地址和路由配置 2.2、路由断言工厂PredicateFactory 2.3、路由过滤器 GatewayF…

2015年全国硕士研究生入学统一考试管理类专业学位联考逻辑试题——纯享题目版

&#x1f3e0;个人主页&#xff1a;fo安方的博客✨ &#x1f482;个人简历&#xff1a;大家好&#xff0c;我是fo安方&#xff0c;考取过HCIE Cloud Computing、CCIE Security、CISP、RHCE、CCNP RS、PEST 3等证书。&#x1f433; &#x1f495;兴趣爱好&#xff1a;b站天天刷&…

如何删除Git仓库中的敏感文件及其历史记录

本文主要介绍如何使用 git filter-branch 命令删除 Git 仓库中的敏感文件及其历史记录。在 Git 中&#xff0c;我们通常会将敏感信息(如密码、私钥等)存储在 .gitignore 文件中&#xff0c;以防止这些信息被意外提交到仓库。有时候&#xff0c;因为疏忽或私有仓库转公开仓库&am…

SQL15 查看学校名称中含北京的用户

SELECT device_id,age,university FROM user_profile WHERE university LIKE %北京%下划线 代表匹配任意一个字符&#xff1b; % &#xff1a;百分号 代表匹配0个或多个字符&#xff1b; []: 中括号 代表匹配其中的任意一个字符&#xff1b; [^]: ^尖冒号 代表 非&#xff0c;取…

CRM系统通过哪三步增加销售团队协作?

销售团队的协作是企业成功的重要保障。协调一致的销售团队能够提升销售效率&#xff0c;提高销售转化&#xff0c;获得更多业绩收入。那么企业要如何增加销售团队的协作&#xff1f;可以用CRM销售管理系统。 CRM系统如何增加销售团队协作&#xff1a; 1、建立统一的客户数据库…

SAP ABAP 如果某字段没有参数ID,如自开发程序使用的自建表 新建参数ID

1&#xff09;新建参数ID sm30 TPARA 维护 输入ID和描述 2&#xff09; 参数ID和Se11数据元素 绑定

【EasyX】扫雷

目录 扫雷1. 主体功能描述2、主要实现步骤3、效果图 扫雷 本博客介绍利用EasyX加上图片、音乐素材实现一个传统的扫雷小游戏。 1. 主体功能描述 1、全局变量&#xff1a;时间、地图、图片资源、状态&#xff1b; 2、绘图初始化函数drawinit&#xff1a;载入图片资源&#xf…