【Kafka中间件】ubuntu 部署kafka,实现Django生产和消费

news2024/11/24 3:08:59

原文作者:我辈李想
版权声明:文章原创,转载时请务必加上原文超链接、作者信息和本声明。

文章目录

  • 前言
  • 一、Kafka安装
    • 1.下载并安装Java
    • 2.下载和解压 Kafka
    • 3.配置 Kafka
    • 4.启动 Kafka
    • 5.创建主题和生产者/消费者
    • 6.发布和订阅消息
  • 二、Kafka+Django生产和消费
    • 1.Django配置文件
    • 2.通过django命令实现消费
    • 3.通过Django生成


前言

Apache Kafka是一个开源消息系统,由Scala写成。是由Apache软件基金会开发的一个开源消息系统项目。Kafka是一个分布式消息队列:生产者、消费者的功能。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。

一、Kafka安装

1.下载并安装Java

Kafka 是基于 Java 开发的,因此需要先安装 Java 环境。如果你已经安装了 Java 环境,可以跳过这一步。

在命令行中输入以下命令:

sudo apt-get update
# linux命令行下,安装jdk
sudo apt-get install openjdk-8-jdk
# 查看安装结果
java -version

2.下载和解压 Kafka

下载 Kafka 压缩包,并解压到 /opt 目录下。

cd ~
sudo wget https://archive.apache.org/dist/kafka/3.4.0/kafka_2.13-3.4.0.tgz
sudo tar -xzf kafka_2.13-3.4.0.tgz
cd /opt
sudo mv cd ~/kafka_2.13-3.4.0 kafka

3.配置 Kafka

接下来需要更改 Kafka 的配置文件。打开 config/server.properties 文件,并进行以下更改:

sudo vim /opt/kafka/config/server.properties

advertised.listeners=PLAINTEXT://<your-server-IP-address>:9092
listeners=PLAINTEXT://0.0.0.0:9092

确保将 <your-server-IP-address> 替换为实际的服务器 IP 地址。(ifconfig查看本机ip)

4.启动 Kafka

Kafka 启动有两种方式:单机模式和分布式模式。

  • 单机模式

在单机模式下,Kafka 只有一个 broker。在命令行中输入以下命令:

cd /opt/kafka
sudo /opt/kafka/bin/zookeeper-server-start.sh  /opt/kafka/config/zookeeper.properties  # 在一个窗口,或后台运行
sudo /opt/kafka/bin/kafka-server-start.sh  /opt/kafka/config/server.properties  # 另起一个窗口,或后台运行
  • 分布式模式

在分布式模式中,Kafka 包含多个 broker。在命令行中输入以下命令:

首先,编辑 config/server.properties 文件,设置以下属性:

broker.id=0 # 设置当前 broker 的 id,不能重复
listeners=PLAINTEXT://your.server.ip.address:9092 # 设置监听地址和端口
log.dirs=/tmp/kafka-logs # 设置日志目录

复制 broker.id=0 的行,修改 broker.id 的值,设置多个 broker 的 id。

接下来,启动 ZooKeeper:

cd /opt/kafka
sudo /opt/kafka/bin/zookeeper-server-start.sh config/zookeeper.properties

最后,每个 broker 启动 Kafka:

cd /opt/kafka
sudo /opt/kafka/bin/kafka-server-start.sh config/server.properties

5.创建主题和生产者/消费者

可以使用 Kafka 自带的命令行工具创建主题、发送消息和消费消息。

  • 创建主题
cd /opt/kafka
sudo /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server your.server.ip.address:9092 --replication-factor 1 --partitions 1 --topic test-topic
  • 查看
sudo /opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
  • 删除
sudo /opt/kafka/bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic test-topic

6.发布和订阅消息

现在已经成功地部署了 Kafka,并创建了一个 topic。可以使用以下命令在 topic 中发布和订阅消息:

发布消息:# 另起一个窗口,或后台运行

sudo /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic 

订阅消息:# 另起一个窗口,或后台运行

sudo /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning

以上是在 Ubuntu 上部署 Kafka 的基本步骤,你可以根据实际情况进行修改。

二、Kafka+Django生产和消费

confluent-kafka 和kafka-python是python中处理kafka的三方包,这里以confluent-kafka为例。

pip install confluent-kafka

1.Django配置文件

在settings.py中加入配置

KAFKA_SETTINGS = {
    'bootstrap.servers': 'localhost:9092', # localhost替换为kafka服务的ip
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest',
}

2.通过django命令实现消费

  1. kafka消费处理kafka_consumer.py文件
from confluent_kafka import Consumer, KafkaError
from django.conf import settings

def kafka_handler():
    c = Consumer(settings.KAFKA_SETTINGS)
    c.subscribe(['test-topic'])

    while True:
        msg = c.poll(1.0)
		print(111,msg)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print('End of partition reached')
            else:
                print('Error: {}'.format(msg.error()))
        else:
            print('Received message: {}'.format(msg.value()))

  1. 使用django的startapp新增kafka对app

     cd ptoject  #  manage.py的同级目录
     django-admin startapp kafka
     python manage.py startapp kafka
    
  2. 在django配置文件settings中添加kafka至INSTALLED_APPS

     INSTALLED_APPS = [
         'django.contrib.admin',
         'django.contrib.auth',
         'django.contrib.contenttypes',
         'django.contrib.sessions',
         'django.contrib.messages',
         'django.contrib.staticfiles',
         'kafka'
     ]
    
  3. 自定义django命令
    可以参看这个链接:https://www.osgeo.cn/django/howto/custom-management-commands.html

    在kafka下新建 management/commands二级文件夹
    在这里插入图片描述

其中kafka_consumer是第一步创建的消费处理程序,my_消费是我们自定义的django命令程序。my_消费.py的内容如下:

# -*- coding:utf-8 _*-
"""
@author:Administrator
@file: my_消费.py
@time: 2023/7/16 0016  12:32
"""
from django.core.management.base import BaseCommand, CommandError

from kafka_consumer import kafka_handler


class Command(BaseCommand):
    help = 'Closes the specified poll for voting'

    def handle(self, *args, **options):
        kafka_handler()
  1. 启动消费命令

切换至虚拟环境,目录切换至django项目目录,及mnange.py的同级目录。

	python manage.py my_消费

在这里插入图片描述

在这里插入图片描述

3.通过Django生成

在这里插入图片描述

  1. 创建kafka_producer.py生产文件
# -*- coding:utf-8 _*-
"""
@author:Administrator
@file: kafka_producer.py
@time: 2023/7/16 0016  12:28
"""
from confluent_kafka import Producer
from django.conf import settings


def send_message(message):
    p = Producer({'bootstrap.servers': settings.KAFKA_SETTINGS.get('bootstrap.servers')})
    topic = 'test-topic'
    p.produce(topic, message.encode('utf-8'))
    p.flush()


if __name__ == '__main__':
    send_message('测试')
  1. 通过django命令生产消息
    这里还是用的自定义命令,自定义文件为my_消费.py
# -*- coding:utf-8 _*-
"""
@author:Administrator
@file: my_消费.py
@time: 2023/7/16 0016  12:32
"""
from django.core.management.base import BaseCommand, CommandError

from kafka_producer import send_message


class Command(BaseCommand):
    help = 'Closes the specified poll for voting'

    def handle(self, *args, **options):
        send_message('111')

在这里插入图片描述
启动方式与消费一样,python manage.py my_生产

  1. 通过django程序生产消息

kafka_producer文件中的send_message方法可以在程序中被调用,我们在处理用户请求时,可以通过异步的方式处理send_message。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/761145.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

红黑树底层原理【白话版】

一、红黑树——特殊的平衡二叉搜索树 定义&#xff1a;红黑树是一种特殊的平衡二叉搜索树。我们用它来排列数据&#xff0c;并方便以后快速检索数据。 估计看到这句话&#xff0c;你就崩溃了&#xff0c;因为这话说了等于没说。 先观察这个图。 球要不是黑色&#xff0c;要不…

console的奇妙用法

console的奇妙用法 console.log是调试 JavaScript 代码的最佳方法之一。但是本文将介绍几个与console交互的更好方法。 在vscode或者的其他ide中输入console可以看到里边提供了非常多的方法。 虽然我们通常都是用console.log&#xff0c;但是使用其他可以使调试过程变得更加容…

分布式链路追踪

文章目录 1、背景2、微服务架构下的问题3、链路追踪4、核心概念5、技术选型对比6、zipkin 1、背景 随着互联网业务快速扩展&#xff0c;软件架构也日益变得复杂&#xff0c;为了适应海量用户高并发请求&#xff0c;系统中越来越多的组件开始走向分布式化&#xff0c;如单体架构…

流水灯——FPGA

文章目录 前言一、流水灯介绍二、系统设计1.模块框图2.RTL视图 三、源码四、效果五、总结六、参考资料 前言 环境&#xff1a; 1、Quartus18.0 2、vscode 3、板子型号&#xff1a;EP4CE6F17C8 要求&#xff1a; 每隔0.2s循环亮起LED灯 一、流水灯介绍 从LED0开始亮起到LED3又回…

如何定制自己的应用层协议?|面向字节流|字节流如何解决黏包问题?如何将字节流分成数据报?

前言 那么这里博主先安利一些干货满满的专栏了&#xff01; 首先是博主的高质量博客的汇总&#xff0c;这个专栏里面的博客&#xff0c;都是博主最最用心写的一部分&#xff0c;干货满满&#xff0c;希望对大家有帮助。 高质量干货博客汇总https://blog.csdn.net/yu_cblog/c…

基于ssm的社区生活超市的设计与实现

博主介绍&#xff1a;专注于Java技术领域和毕业项目实战。专注于计算机毕设开发、定制、文档编写指导等&#xff0c;对软件开发具有浓厚的兴趣&#xff0c;工作之余喜欢钻研技术&#xff0c;关注IT技术的发展趋势&#xff0c;感谢大家的关注与支持。 技术交流和部署相关看文章…

SpringBoot拦截器

一、SpringBoot拦截器介绍 Spring Boot中的拦截器是一种用于在处理请求之前或之后执行特定操作的组件。拦截器通常用于实现对请求进行预处理、日志记录、权限验证等功能。 在Spring Boot中&#xff0c;可以使用HandlerInterceptor接口来定义自己的拦截器&#xff0c;并通过配…

流水灯实现

文章目录 一、流水灯二、代码实现三、引脚分配 一、流水灯 流水灯指的是LED像水流一样点亮&#xff0c;即LED依次点亮但不立刻熄灭&#xff0c;等到4个LED都点亮后&#xff0c;再把所有灯一次性熄灭。 二、代码实现 module horse_led(input wire clk,input wire rst_n,output…

记录管理系统

简单的记录管理系统&#xff0c;适用于保存表格数据&#xff0c;可以用来替代Excel软件保存数据&#xff0c;提供可视化拖动组件用于自定义数据列&#xff0c;数据存到数据库&#xff0c;相比于Excel&#xff0c;更易保存&#xff0c;易搜索。 例如创建合同记录数据&#xff0…

【电子学会】2023年05月图形化四级 -- 计算圆的面积和周长

计算圆的面积和周长 编写程序计算圆的面积和周长。输入圆的半径&#xff0c;程序计算出圆的面积和周长&#xff0c;圆的面积等于3.14*半径*半径&#xff1b;圆的周长等于2*3.14*半径。 1. 准备工作 &#xff08;1&#xff09;保留舞台中的小猫角色和白色背景&#xff1b; 2…

MySQL数据表高级操作

一、克隆/复制数据表二、清空表&#xff0c;删除表内的所有数据删除小结 三、创建临时表四、MySQL中6种常见的约束1、外键的定义2、创建外键约束作用3、创建主表test44、创建从表test55、为主表test4添加一个主键约束。主键名建议以"PK_”开头。6、为从表test5表添加外键&…

Html利用Canvas绘制图形

今天接到粉丝私信&#xff0c;询问是否可以通过Canvas绘制一些图形&#xff0c;然后根据粉丝提供的模板图&#xff0c;通过Canvas进行模拟绘制&#xff0c;通过分析发现&#xff0c;图形虽然相对简单&#xff0c;但是如果不借助相应的软件&#xff0c;纯代码绘制还是稍微费些时…

机器学习:self supervised learning- Recent Advances in pre-trained language models

背景 Autoregressive Langeuage Models 不完整的句子&#xff0c;预测剩下的空的词语 sentence completion Transformer-based ALMs Masked language models-MLMs 预训练模型能将输入文本转成hidden feature representation 模型参数最开始是从预训练模型中拿到&#xf…

如何快速制作一个奶茶店小程序商城

如果你是一个奶茶店的老板&#xff0c;你可能会考虑开设一个小程序商城来增加销售渠道和提升品牌形象。那么&#xff0c;如何快速制作一个奶茶店小程序商城呢&#xff1f;下面我们将介绍一个简单的步骤供你参考。 首先&#xff0c;你需要登录乔拓云平台进入商城后台管理页面。在…

数据结构真题

数据结构真题 1. A. Bills of Paradise 线段树并查集四个操作&#xff1a; D x。标记大于等于 x 的第一个未标记的 a i a_i ai​&#xff1b;若没有&#xff0c;则不操作.F x。查询大于等于 x 的第一个未标记的 a i a_i ai​&#xff1b;若没有&#xff0c;则输出 1 0 12…

《UNUX环境高级编程》(9)进程关系

1、前言 2、终端登录 在早期的UNIX系统&#xff0c;用户用哑终端&#xff08;用硬连接到主机&#xff09;进行登录&#xff0c;因为连接到主机上的终端设备数是固定的&#xff0c;所以同时登录数也就有了已知的上限。 随着位映射图像终端的出现&#xff0c;开发出了窗口系统&…

数学分析:对偶映射

这个其实就是我们一致讨论的对偶映射&#xff0c;换了个马甲&#xff0c;差点认不出来了。本来是V->R 要变成U->R&#xff0c;就需要一个反向的V*->U*的映射。 注意这个式子&#xff0c;t属于U&#xff0c;phit转到了V&#xff0c;但是坐标也发生了变化&#xff0c;这…

2023西南赛区ciscn -- do you like read

Attack 打开后一个商城页面 在login as admin那里有个登录页面&#xff0c;账号admin&#xff0c;密码爆破即可得到admin123 也可以在book.php?bookisbn1进行sql注入得到密码&#xff0c;这里发现是没有注入waf的 登录进来是一个Book List的管理页面&#xff0c;同时在审计源…

【C语言】初阶指针(详细版)

&#x1f466;个人主页&#xff1a;Weraphael ✍&#x1f3fb;作者简介&#xff1a;目前正在回炉重造C语言&#xff08;2023暑假&#xff09; ✈️专栏&#xff1a;【C语言航路】 &#x1f40b; 希望大家多多支持&#xff0c;咱一起进步&#xff01;&#x1f601; 如果文章对你…

RSA原理

RSA的历史 RSA加密算法是一种非对称加密算法&#xff0c;在公开密钥加密和电子商业中被广泛使用。RSA是由罗纳德李维斯特&#xff08;Ron Rivest&#xff09;、阿迪萨莫尔&#xff08;Adi Shamir&#xff09;和伦纳德阿德曼&#xff08;Leonard Adleman&#xff09;在1977年一…