PHP小白搭建Kafka环境以及初步使用rdkafka

news2025/1/12 6:49:18

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 前言
  • 一、安装java(Kafka必须安装java,因为kafka依赖java核心)
  • 二、安装以及配置Kafka、zookeeper
    • 1.下载Kafka(无需下载zookeeper,使用kafka自带的即可)
    • 2.配置topid
    • 3.安装PHP的rdkafka,这个网上教程很多,基本上都是正确的


前言

提示:windows环境安装失败,Linux环境安装成功(以下并没有windows安装示例)

一、安装java(Kafka必须安装java,因为kafka依赖java核心)

下载地址:链接: https://www.oracle.com/java/technologies/downloads/#jdk20-linux
在这里插入图片描述
将文件放在Linux目录中后进行解压:

假设我把[jdk-20_linux-x64_bin.tar.gz]包放在了/root/src/uap/web/third 目录下
1、tar -zxvf jdk-20_linux-x64_bin.tar.gz
2、mv jdk.0.20 ./jdk
3、vim /etc/profile 
 JAVA_HOME=/root/src/uap/web/third/jdk
 PATH=/root/src/uap/web/third/jdk/bin:$PATH
 export JAVA_HOME
4、source /ect/profile
5、java -version (出现下图极为成功)

在这里插入图片描述

二、安装以及配置Kafka、zookeeper

1.下载Kafka(无需下载zookeeper,使用kafka自带的即可)

下载地址:https://kafka.apache.org/downloads
提示:不要下载带src的那个,具体我也不知道,因为我也是个小白
在这里插入图片描述

假设我把[kafka_2.12-3.5.1.tgz]包放在了/root/src/uap/web/third 目录下
1、tar -zxvf kafka_2.12-3.5.1.tgz
2、mv kafka.2.12 ./kafka
3、创建kafka日志文件
 mkdir -p ./kafka_data/log/kafka
 mkdir -p ./kafka_data/log/zookeeper
 mkdir -p ./kafka_data/zookeeper
4、cd ./kafka/config
vim server.properties
 listeners=PLAINTEXT://localhost:9092 (34行左右,添加对应的host、port)
 broker.id=0
 port=9092
 host.name=192.168.1.241
 log.dirs=/root/src/uap/web/third/kafka_data/log/kafka
 zookeeper.connect=localhost:2181
wd
vim zookeeper.properties
 dataDir=/root/src/uap/web/third/kafka_data/zookeeper
 dataLogDir=/root/src/uap/web/third/kafka_data/log/zookeeper
 clientPort=2181
 maxClientCnxns=100
 tickTimes=2000
 initLimit=10
 syncLimit=5
wd
5、cd ../ 进入kafka目录下
#启动zookeeper
./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
//如果其中报错,大部分应该是报JAVA_HOME 这个说明你没有配置 /etc/profile 上面有
./bin/kafka-server-start.sh -daemon ./config/server.properties &

2.配置topid

代码如下(示例):

kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic myt
返回值:Created topic myt.  创建成功/否则失败

3.安装PHP的rdkafka,这个网上教程很多,基本上都是正确的

例如:阿里云开发者社区,php安装rdkafka教程
剩下逻辑就直接贴代码了

生产者:
public function producer(){
        $conf = new RdKafka\Conf();
        $conf->set('metadata.broker.list', 'localhost:9092');
        $producer = new RdKafka\Producer($conf);
        $topic = $producer->newTopic("mytest");
        //获取数据库数据,存入kafka中
        $wanchk = $this->db->query("SELECT * FROM hf_alarm_wanchk");
        foreach ($wanchk as $k => $v){
            $topic->produce(RD_KAFKA_PARTITION_UA, 0, array2json($v));
            $producer->poll(0);
        }
        
        $result = $producer->flush(10000);
        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
            throw new \RuntimeException('Was unable to flush, messages might be lost!');
        }
        $producer->purge(RD_KAFKA_PURGE_F_QUEUE);
        $producer->flush(10000);
    }
消费者:
//这个代码需要使用终端运行:
// /bin/php -c /etc/php.ini  -f  /入口文件目录/index.php (类)consumer (方法)consumer
 public function consumer()
    {
        $conf = new \RdKafka\Conf();

        $conf->set('group.id', 'mytest');

        $rk = new \RdKafka\Consumer($conf);

        $rk->addBrokers("127.0.0.1");

        $topicConf = new \RdKafka\TopicConf();

        $topicConf->set('auto.commit.interval.ms', 100);

        $topicConf->set('offset.store.method', 'broker');

        $topicConf->set('auto.offset.reset', 'smallest');

        $topic = $rk->newTopic('mytest', $topicConf);

        $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

        while (true) {
            $message = $topic->consume(0, 120 * 10000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    var_dump($message);
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "No more messages; will wait for more\n";
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    echo "Timed out\n";
                    break;
                default:
                    throw new \Exception($message->errstr(), $message->err);
                    break;
            }
        }
    } 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/927567.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

从零开始配置Jenkins与GitLab集成:一步步实现持续集成

在软件开发中,持续集成是确保高效协作和可靠交付的核心实践。以下是在CentOS上安装配置Jenkins与GitLab集成的详细步骤: 1.安装JDK 解压JDK安装包并设置环境变量: JDK下载网址 Java Downloads | Oracle 台灣 tar zxvf jdk-11.0.5_linux-x64_b…

在Linux中使用gcc/g++编译代码

gcc/g 1.方法速记2.具体过程2.1 预处理阶段2.2 编译阶段2.3 汇编阶段2.4链接阶段2.4.1 链接的细节 gcc和g的操作一样,g的方法就仅需把gcc换成g即可。 1.方法速记 直接编译语法:将text.c文件或者text.cpp文件直接编译成text文件。 gcc text.c -o text /…

微软 Visual Studio 现已内置 Markdown 编辑器,可直接修改预览 .md 文件

Visual Studio Code V1.66.0 中文版 大小:75.30 MB类别:文字处理 本地下载 Markdown 是一种轻量级标记语言,当开发者想要格式化代码但又不想牺牲易读性时,Markdown 是一个很好的解决方案,比如 GitHub 就使用 Markdo…

公司阿里云服务器被暴力破解

公司阿里云服务器被暴力破解? 公司云服务器跑了3年了,从来没改过密码,而且基本所有服务器密码都是同一个,只把公司IP添加白名单了。(确实不严谨,但至少限制了连接源) 突然就收到阿里云短信提醒…

ppt如何转pdf文档?用这个方法可将ppt转pdf

在现代社会中,PPT(幻灯片)已成为一种常见的演示工具,被广泛应用于学术、商务、培训等领域。然而,PPT文件的使用和分享存在一些问题,例如文件格式不兼容、内容修改易被篡改等。为了解决这些问题,将PPT转换为PDF格式已成…

【一文读懂】 Java并发 - 锁升级原理

要明白锁的原理,首先要知道对象头 Java对象头 在Java中,一个对象一般由两部分组成 :1、对象头 ; 2、对象的成员变量信息 在32位的虚拟机中: (1)普通对象的对象头长度64bit(8字节&…

Javascript 编写一个简单的聊天机器人

在本 Web 开发教程中,我们将了解如何使用 HTML、CSS 和 vanilla JavaScript 创建基本的聊天机器人。本练习侧重于 JS 基础知识,而不是任何类型的人工智能 (AI)。为了使该过程更简单,更易于学习,我没有使用任…

ffmpeg windows环境MinGW+msys2编译so库

一、安装MinGW 1.1、下载MinGW 1.2、下载完成后,会得到一个名为 mingw-get-setup.exe 的安装包,双击打开它,可以看到如下的对话框: 1.3、直接点击“Install”,进入下面的对话框 1.4、可根据自己操作系统的实际情况&am…

数据库MySQL中left join多个条件下的执行

1 基础表 创建表A 表B create table testA(id int, name varchar(10)); create table testB(id int, name varchar(10)); 2 插入数据 insert into testA values(1,zhangssa),(2,lisi),(3,wangwu) insert into testB values(2,zhangssa2),(3,lisi2),(4,wangwu4) 3 left joi…

WPF自定义命令及属性改变处理

1、项目建构 2、自定义命令 namespace WpfDemo.Base {public class MyCommand : ICommand{Action executeAction;public MyCommand(Action action){executeAction action;}public event EventHandler? CanExecuteChanged;public bool CanExecute(object? parameter){retu…

ssm会员管理系统源码和论文

ssm会员管理系统源码和论文062 开发工具:idea 数据库mysql5.7 数据库链接工具:navcat,小海豚等 技术:ssm 摘 要 随着科学技术的飞速发展,各行各业都在努力与现代先进技术接轨,通过科技手段提高自身的优势&#x…

36k字从Attention讲解Transformer及其在Vision中的应用(pytorch版)

文章目录 0.卷积操作1.注意力1.1 注意力概述(Attention)1.1.1 Encoder-Decoder1.1.2 查询、键和值1.1.3 注意力汇聚: Nadaraya-Watson 核回归1.2 注意力评分函数1.2.1 加性注意力1.2.2 缩放点积注意力1.3 自注意力(Self-Attention)1.3.1 自注意力的定义和计算1.3.2 自注意…

邀请函 | 区块链如何助力建设“健康中国”?ESG系列研讨会“医疗”专场来袭!

党的十九大报告指出,要全面实施健康中国战略,为人民群众提供全方位全周期健康服务。今年7月,国家卫生健康委等六部门联合印发了《深化医药卫生体制改革2023年下半年重点工作任务》,明确指出要开展全国医疗卫生机构信息互通共享三年…

基于 vue2 发布 npm包

背景:组件化开发需要,走了一遍发布npm包的过程,采用很简单的模式实现包的发布流程,记录如下。 项目参考:基于vue的时间播放器组件,并发布到npm_timeplay.js_xmy_wh的博客-CSDN博客 1、项目初始化 首先&a…

AKM10-58C大电流TVS二极管参数:58V 10000A

东沃(DOWO)AKM10-76C是什么二极管? 东沃生产AKM10-76C大电流TVS二极管吗?有现货吗? 除了AKM10-76C外,东沃(DOWO)生产的贴片大电流二极管还有哪些型号? …… AKM10-76C是厂…

VMware 新装 CentOS 7 连不上网络的【解决方法】

文章目录 1)虚拟机设置2)虚拟网络编辑器3)Linux 网卡设置4)检查网络状态参考资料: 安装好虚拟机之后,将来会在虚拟机内的系统中安装各种应用,如果虚拟机内的系统连不上网,则无从谈起…

PythonJS逆向解密——实现翻译软件+语音播报

前言 嗨喽,大家好呀~这里是爱看美女的茜茜呐 环境使用: python 3.8 pycharm 模块使用: requests --> pip install requests execjs --> pip install PyExecJS ttkbootstrap --> pip install ttkbootstrap pyttsx3 --> pip install pyttsx3 第三…

interview2-框架篇

一、Spring篇 1、Spring (1)Bean线程安全问题 不是线程安全的。Spring框架中有一个Scope注解,默认的值就是singleton,单例的。因为一般在spring的bean的中都是注入无状态的对象,没有线程安全问题,如果在b…

【Focal Loss】解决类别不平衡问题,增加对困难样本的挖掘

Focal Loss是在交叉熵损失函数的基础上增加了一个平衡因子 α \alpha α和一个聚焦因子 γ \gamma γ,分别用来调节不同类别样本的权重以及难分样本和易分样本之间的权重一个样本的交叉熵损失函数如下: p t p_t pt​表示将该样本分类为t的概率一个样本的…

8个好用的产品设计工具,收藏一下

产品设计在设计工作中是非常重要的一个环节,它是对产品的初步构思,能明确规定产品设计的细节。今天本文将为大家推荐8个好用的产品设计工具,不仅操作方便,而且简单好用,能充分提高工作效率,一起来看看吧&am…