消息中间件Kafka(PHP版本)

news2025/1/13 10:14:47

        小编最近需要用到消息中间件,有需要要复习一下以前的东西,有需要的自取,强调一点,如果真的想了解透彻,一定要动手脑袋会了不代表就会写了

        Kafka是由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息

Kafka 的特性

高吞吐、低延迟:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。
高伸缩性: 每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。
持久性、可靠性: Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,Kafka 底层的数据存储是基于 Zookeeper 存储的,Zookeeper 我们知道它的数据能够持久存储。
容错性: 允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作
高并发: 支持数千个客户端同时读写

Kafka 的使用场景

活动跟踪:Kafka 可以用来跟踪用户行为,比如我们经常回去淘宝购物,你打开淘宝的那一刻,你的登陆信息,登陆次数都会作为消息传输到 Kafka ,当你浏览购物的时候,你的浏览信息,你的搜索指数,你的购物爱好都会作为一个个消息传递给 Kafka ,这样就可以生成报告,可以做智能推荐,购买喜好等。
传递消息:Kafka 另外一个基本用途是传递消息,应用程序向用户发送通知就是通过传递消息来实现的,这些应用组件可以生成消息,而不需要关心消息的格式,也不需要关心消息是如何发送的。
度量指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
日志记录:Kafka 的基本概念来源于提交日志,比如我们可以把数据库的更新发送到 Kafka 上,用来记录数据库的更新时间,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
流式处理:流式处理是有一个能够提供多种应用程序的领域。
限流削峰:Kafka 多用于互联网领域某一时刻请求特别多的情况下,可以把请求写入Kafka 中,避免直接请求后端程序导致服务崩溃。

消息:
Kafka中的数据单元被称为消息,也被称为记录,可以把它看作数据库表中某一行的记录。
批次:为了提高效率,消息会分批次写入Kafka,批次就代指的是一组消息。

主题:
消息的种类称为主题(Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类。主题就像是数据库中的表。

分区:
主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,
由此来实现 kafka 的伸缩性,单一主题中的分区有序,但是无法保证主题中所有的分区有序

生产者:
向主题发布消息的客户端应用程序称为生产者(Producer),生产者用于持续不断的向某个主题发送消息。

消费者:
订阅主题消息的客户端程序称为消费者(Consumer),消费者用于处理生产者产生的消息。

消费者群组:
生产者与消费者的关系就如同餐厅中的厨师和顾客之间的关系一样,一个厨师对应多个顾客,也就是一个生产者对应多个消费者,
消费者群组(Consumer Group)指的就是由一个或多个消费者组成的群体。

偏移量:
偏移量(Consumer Offset)是一种元数据,它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据。

broker:
一个独立的 Kafka 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。

broker 集群:
broker 是集群 的组成部分,broker 集群由一个或多个 broker 组成,每个集群都有一个 broker 同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。
副本:Kafka 中消息的备份又叫做 副本(Replica),副本的数量是可以配置的,Kafka 定义了两类副本:领导者副本(Leader Replica) 和 追随者副本(Follower Replica),前者对外提供服务,后者只是被动跟随。
重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

安装

下载地址

librdkafka获取地址:https://github.com/edenhill/librdkafka
kafka获取地址:https://github.com/arnaud-lb/php-rdkafka

安转java环境

下载地址:https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html
下载解压完成之后,设置系统变量:path(路径为:安装目录/bin)
设置环境变量:JAVA_HOME(路径为:安装目录\bin)

查看是否安装成功:java -version

Zookeeper安装

下载地址:https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.6.4/apache-zookeeper-3.6.4-bin.tar.gz
设置环境变量:path(路径为:安装目录\bin)
新建data文件夹,新建logs文件夹
config文件夹:zoo_sample.cfg  新复制一个:zoo.cfg
编辑zoo.cfg文件:
新增(配置路径【一定要配置\\,要不然不识别】:安装路径\\zookeeper\\apache-zookeeper-3.6.4-bin\\):
dataDir= 安装路径\zookeeper\apache-zookeeper-3.6.4-bin\data
dataLogDir=安装路径\zookeeper\apache-zookeeper-3.6.4-bin\log
audit.enable=truezookeeper/conf/zoo.cfg 参数详解

tickTime=2000:
        这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳,单位是毫秒
        
initLimit=10:
        这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒,10秒内要启动集群并出现leader和floower。
syncLimit=5:
        这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*2000=10秒,超出时间认为是死机。
dataDir:
        快照日志的存储路径
dataLogDir:
        事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多
clientPort=12181:
        这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。修改他的端口改大点

启动zkServer

启动目录:\bin\zkServer.sh

启动命令:zkServer.sh start

查找看到:binding to port .0.0.0.0/0.0.0.2181能表示成功

安装Scala

下载地址:https://downloads.lightbend.com/scala/2.11.12/scala-2.11.12.msi
安装:一键安装(一直next,直到完成)
配置环境变量(这个需要配置):安装目录/bin
判断是否安装完成:scala -version

Kafka安装

下载地址:https://kafka.apache.org/downloads
Kafka安装目录下新建目录logs
编辑config\server.properties文件
log.dirs=安装目录\\logs(注意双斜线,如果是cmd命令出现命令行太长,那就把Kafka安装安装在磁盘的最外面,D盘的最外层)
新增参数:listeners=PLAINTEXT://localhost:9092

启动

一定要先启动zookeeper(命令:zkServer)
然后启动kafka(命令(cmd进入Kafka安装目录):.\bin\windows\kafka-server-start.bat .\config\server.properties)
查找看到:from now on will use node localhost:9092
能表示成功(如果启动不了,删除logs文件夹下的文件)

操作:

创建topics(主题):

kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test


查看主题:

kafka-topics.bat --bootstrap-server localhost:9092 --list

生产者:

cmd进入:安装目录\bin\windows
打开生产者:kafka-console-producer.bat --broker-list localhost:9092 --topic test

消费者:

cmd进入:安装目录\bin\windows
打开消费者:kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

测试:

生产者发送消息,消费者订阅收到消息

注意:
1.一共打开四个窗口,1.zookeeper 2.kafka  3.生产者  4.消费者  (注意,这四个窗口不能关闭,要一直开着)
2.一定是生产者生产消息,消费者才会收到消息(注意,生产者和消费者的topics一定要是一样的,要不然收不到消息)

方法

getOutQlen方法

使用方法:$producer->getOutQLen();
作用:
1.用于获取生产者(Producer)内部队列中等待发送到Kafka broker的消息数量。
2.getOutQLen() 方法允许你查询这个内部队列中当前待发送的消息数量。通常用于监控和调试目的,帮助了解生产者的发送速率和队列积压情况
3.getOutQLen() 返回的是近似值,它可能在调用之间发生变化
输出数据:
int(0)


poll方法

使用方法:
while ($producer->getOutQLen() > 0) {
    $producer->poll(1);
}
作用:
用于从Kafka集群中拉取消息,当消费者调用poll()方法时,如果在规定的时间内没有收到任何消息,它会立即返回,并且没有任何消息被拉取到(轮询一次就相当于拉取一定时间段broker中可消费的数据)


 
flush方法

使用方法:$producer->flush(10000);
作用:将生产者内部缓冲区中的消息强制发送到Kafka broker的过程。


consumerstart方法

使用方法:$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
作用:在Kafka中,consumerstart方法是用于启动消费者线程并开始从Kafka集群中拉取消息的方法。


Consume方法

使用方法:$topic->consume(0, 120*10000);
作用:是指Kafka消费者从Kafka集群中读取消息的过程。首先需要从集群中先拉取数据


Purge方法

使用方法:$producer->purge(RD_KAFKA_PURGE_F_QUEUE);
作用:是指清除已完成或已过期的请求,以释放缓存资源。


initTransactions方法

作用:用于初始化一个事务。Kafka从0.11.0.0版本开始支持事务性生产者API,允许生产者将多个消息组合成一个事务,确保这些消息被原子性地写入Kafka。这意味着要么所有消息都成功写入,要么都不写入,保证了消息的一致性


beginTransaction方法

作用:用于开始一个新的生产者事务。Kafka从0.11.0.0版本开始支持事务性生产者API,它允许你将多个消息组合成一个事务,确保这些消息被原子性地写入Kafka。这意味着要么所有消息都成功写入,要么都不写入,这保证了消息的一致性


commitTransaction方法

作用:它用于提交一个事务。当你使用 Kafka 的事务性生产者 API 时,你可以将一系列的消息发送操作组合成一个原子性的事务。这意味着这些操作要么全部成功,要么全部失败,从而确保数据的一致性和顺序性


abortTransaction方法

作用:中止事务,当发送消息或提交事务过程中发生错误时使用

getMetadata方法

使用方法:$producer->getMetadata(false, $topic, 10*1000);

作用:
用于获取Kafka集群的元数据
获取数据包括:1.主题(topics)2.分区(partitions)3.副本(replicas)4.ISR(In-Sync Replicas)等信息。
通常,客户端库(如PHP的php-kafka)会在初始化时或需要时自动执行此操作,以便了解集群的状态和可用主题。


代码:

class Kafka extends CI_Controller {

    //定义变量(分区)
    private $borker_list = "";
    //定义变量(配置)
    private $conf = "";
    //定义变量(主题)
    private $topics = "";
    //定义变量(分组)
    private $topics_group = "";

    //构造
    public function __construct(){
        parent::__construct();
        //初始化数据
        $this->borker_list = "localhost:9092";
        $this->topics = "test";
        $this->topics_group = "test-group";
    }

    //消息生产者
    public function producter(){
        //初始化
        $conf=  new RdKafka\Conf();
        //设置分区
        $conf->set('metadata.broker.list', $this->borker_list);
        //初始化生产者
        $producer = new RdKafka\Producer($conf);
        //设置主题
        $topic = $producer->newTopic($this->topics);
        //产生信息
        for ($i = 0; $i < 10; $i++) {
            $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
            $producer->poll(0);
        }
        //消息刷新
        for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
            $result = $producer->flush(10000);
            if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
                break;
            }
        }
        //刷新结果
        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
            throw new \RuntimeException('Was unable to flush, messages might be lost!');
        }
    }

    //消息订阅者
    public function consumer(){
        set_time_limit(0);
        //初始化
        $conf = new RdKafka\Conf();
        //设置分区
        $conf->set('metadata.broker.list', $this->borker_list);
        $conf->set('group.id',$this->topics_group);
        //初始化消费者
        $rk = new RdKafka\Consumer($conf);

        //主题配置
        $topicConf = new RdKafka\TopicConf();
        $topicConf->set('auto.commit.interval.ms', 100);
        $topicConf->set('offset.store.method', 'file');
        $topicConf->set('offset.store.path', sys_get_temp_dir());
        $topicConf->set('auto.offset.reset', 'smallest');

        $topic = $rk->newTopic($this->topics, $topicConf);
        $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

        while (true) {
            $message = $topic->consume(0, 120*10000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    //没有错误打印信息
                    var_dump($message);
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "等待接收信息\n";
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    echo "超时\n";
                    break;
                default:
                    throw new \Exception($message->errstr(), $message->err);
                    break;
            }
        }
    }

    //获取元数据(包括主题(topics)、分区(partitions)、副本(replicas)和ISR(In-Sync Replicas)等信息)
    public function gettest(){
        //初始化
        $conf=  new RdKafka\Conf();
        //设置分区
        $conf->set('metadata.broker.list', $this->borker_list);
        //初始化生产者
        $producer = new RdKafka\Producer($conf);
        $topic = $producer->newTopic($this->topics);
//        $result = $producer->getMetadata(false, $topic, 10*1000);
        $result = $producer->getOutQLen();
        var_dump($result);die;
    }


    //获取元数据
    public function metadata(){
        $conf = new RdKafka\Conf();
        $conf->setDrMsgCb(function ($kafka, $message) {
            file_put_contents("./xx.log", var_export($message, true), FILE_APPEND);
        });
        $conf->setErrorCb(function ($kafka, $err, $reason) {
            printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
        });

        $conf->set('group.id', 'myConsumerGroup');

        $rk = new RdKafka\Consumer($conf);
        $rk->addBrokers("127.0.0.1");

        $allInfo = $rk->getMetadata(true, NULL, 60e3);
        $topics = $allInfo->getTopics();

        //循环输出
        foreach ($topics as $topic) {
            $topicName = $topic->getTopic();
            if ($topicName == "__consumer_offsets") {
                continue ;
            }
            $partitions = $topic->getPartitions();
            foreach ($partitions as $partition) {
                $topPartition = new RdKafka\TopicPartition($topicName, $partition->getId());
                echo  "当前的话题:" . ($topPartition->getTopic()) . " - " . $partition->getId() . " - ";
                echo  "offset:" . ($topPartition->getOffset()) . PHP_EOL;
            }
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1668928.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

py黑帽子学习笔记_环境准备

1 下载os装os 下载一个kali虚机镜像然后用虚机管理软件创虚机&#xff0c;装完如下图&#xff0c;我用的版本是2024.1的版本kali-linux-2024.1-installer-amd64&#xff0c;可以从镜像站下载&#xff0c;官网下的慢还断网Index of /kali-images/kali-2024.1/ | 清华大学开源软…

AI 问答 API 对接说明

我们知道&#xff0c;市面上一些问答 API 的对接还是相对没那么容易的&#xff0c;比如说 OpenAI 的 Chat Completions API&#xff0c;它有一个 messages 字段&#xff0c;如果要完成连续对话&#xff0c;需要我们把所有的上下文历史全部传递&#xff0c;同时还需要处理 Token…

47岁古天乐唯一承认女友约「御用阿妈」过母亲节

日前关宝慧在IG晒出一张聚会照&#xff0c;并写道&#xff1a;「预祝各位#母亲节快乐&#x1f339;#dinner #happy #friends #好味」相中所见&#xff0c;前TVB金牌监制潘嘉德、卢宛茵、黄&#x28948;莹、黎萨达姆都有出席饭局。 当中黄&#x28948;莹身穿卡其色西装褛&…

【35分钟掌握金融风控策略24】定额策略实战

目录 基于客户风险评级的定额策略 确定托底额度和盖帽额度 确定基础额度 基于客户风险评级确定风险系数 计算最终授信额度 确定授信有效期 基于客户风险评级的定额策略 在开发定额策略时&#xff0c;精准确定客户的基础额度是一个关键步骤&#xff0c;通常会基于客户的收…

整体安全设计

人员和资产的安全是当今许多组织的最高优先事项之一。随着暴力事件在美国各地盛行——枪击事件、袭击、内乱等——建筑物业主必须为其建筑物及其居住者的安全做好计划。 为了创造一个安全的环境&#xff0c;新设施或园区的安全设计必须超越基本的摄像头和访问控制设备&#xf…

纯血鸿蒙APP实战开发——首页下拉进入二楼效果案例

介绍 本示例主要介绍了利用position和onTouch来实现首页下拉进入二楼、二楼上划进入首页的效果场景&#xff0c;利用translate和opacity实现动效的移动和缩放&#xff0c;并将界面沉浸式&#xff08;全屏&#xff09;显示。 效果图预览 使用说明 向下滑动首页页面超过触发距…

【Linux】centos7安装软件(rpm、yum、编译安装),补充:查找命令的相关文件路径,yum安装mysql

【Linux】技术上&#xff0c;Linux是内核。而术语上&#xff0c;我们通常说的Linux是完整的操作系统&#xff0c;其实称为"Linux发行版"&#xff0c;是将Linux内核和应用系统打包&#xff0c;由不同的发行家族发行了不同版本。Linux发行版众多&#xff0c;主要有RedH…

HCIP-Datacom-ARST自选题库_07_割接【35道题】

一、单选题 1.在割接的测试阶段&#xff0c;符合以下哪一种情况的可以判断为割接成功? 网络承载的上层应用业务测试正常 网络设备的配置查看结果正常 网络流量路径正常 路由协议运行正常 2.在割接的测试阶段中&#xff0c;表明已经完成测试的标准是: IP设备的配置查看结…

org.postgresql.util.PSQLException: 错误: 关系 “dual“ 不存在

springboot 项目连接 postgreps&#xff0c;启动时报错 org.postgresql.util.PSQLException: 错误: 关系 "dual" 不存在。 查阅资料后发现这是由配置文件中的配置 datasource-dynamic-druid-validationQuery 导致的 spring:datasource:druid:stat-view-servlet:ena…

二叉树介绍

引入 定义 区别 定义不同 形态不同 基本形态

Ubuntu18.04解决有线网卡连接问题(不更新内核成功版)

https://www.realtek.com/Download/List?cate_id584 &#xff08;需要翻一下&#xff09; 不想自己去下载&#xff0c;直接去我资源里下载我上传的包就好啦(&#x1f602;&#x1f602;&#x1f602;刚刚看了下别人下载要VIP还是自己去网站下很快的) 下载后解压&#xff0c;在…

FreeRTOS二值信号量

目录 一、信号量的概念 1、信号量的基本概念 2、信号量的分类 二、二值信号量简介 三、二值信号量相关API 1、创建二值信号量 2、释放二值信号量 3、获取二值信号量 四、二值信号量实操 1、实验需求 2、CubeMX配置 3、代码实现 一、信号量的概念 1、信号量的基本概…

从零开始的软件测试学习之旅(七)接口测试流程及原则案例

接口测试三要素及案例 接口测试介绍接口预定义接口测试的主要作用测试接口流程如下接口测试三要素接口测试分类RESTful架构风格RESTful架构三要素要素一要素二要素三 RESTful架构风格实现restful架构案例接口测试流程接口测试原则功能测试自动化测性能测试 复习复盘 接口测试介…

MYSQL:MySQL 事务隔离级别详解

一、MySQL事务是什么&#xff1f; MySQL事务是一组在数据库中执行的操作&#xff0c;这些操作要么全部成功执行&#xff0c;要么全部不执行&#xff0c;以确保数据库的完整性和一致性。 事务的 ACID 事务具有四个特征&#xff1a;原子性&#xff08; Atomicity &#xff09;、…

COX回归特征筛选

任务&#xff1a;利用cox筛选出P值小于0.05的特征 数据的格式第一列为标签&#xff0c;第二列为时间&#xff0c;第三列及后为特征 先想一想&#xff0c;想好了再更新 这里我们先举一个例子&#xff1a; import pandas as pd from lifelines import CoxPHFitter# 创建示例数…

【随笔】Git 高级篇 -- 远程跟踪分支 git checkout -b | branch -u(三十五)

&#x1f48c; 所属专栏&#xff1a;【Git】 &#x1f600; 作  者&#xff1a;我是夜阑的狗&#x1f436; &#x1f680; 个人简介&#xff1a;一个正在努力学技术的CV工程师&#xff0c;专注基础和实战分享 &#xff0c;欢迎咨询&#xff01; &#x1f496; 欢迎大…

Github 2024-05-12 开源项目日报 Top10

根据Github Trendings的统计,今日(2024-05-12统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量TypeScript项目5Python项目2非开发语言项目2Vue项目1Rust项目1AFFiNE: 下一代知识库 创建周期:649 天开发语言:TypeScript协议类型:OtherSta…

2024最新最全【NMAP】零基础入门到精通

一、Nmap介绍 Nmap(Network Mapper&#xff0c;网络映射器)是一款开放源代码的网络探测和安全审核工具。它被设计用来快速扫描大型网络&#xff0c;包括主机探测与发现、开放的端口情况、操作系统与应用服务指纹识别、WAF识别及常见安全漏洞。它的图形化界面是Zenmap&#xff…

AJAX前端与后端交互技术知识点以及案例

Promise promise对象用于表示一个异步操作的最终完成&#xff08;或失败&#xff09;及其结果值 好处&#xff1a; 逻辑更清晰了解axios函数内部运作机制成功和失败状态&#xff0c;可以关联对应处理程序能解决回调函数地狱问题 /*** 目标&#xff1a;使用Promise管理异步任…

基于JAVA的微信小程序二手车交易平台(源码)

博主介绍&#xff1a;✌程序员徐师兄、8年大厂程序员经历。全网粉丝15w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;…