【Kafka】Ubuntu 部署kafka中间件,实现Django生产和消费

news2025/2/4 7:03:49

原文作者:我辈李想
版权声明:文章原创,转载时请务必加上原文超链接、作者信息和本声明。

文章目录

  • 前言
  • 一、Kafka安装
    • 1.下载并安装Java
    • 2.下载和解压 Kafka
    • 3.配置 Kafka
    • 4.启动 Kafka
    • 5.创建主题和生产者/消费者
    • 6.发布和订阅消息
  • 二、Kafka+Django生产和消费
    • 1.Django配置文件
    • 2.通过django命令实现消费
    • 3.通过Django生产


前言

Apache Kafka是一个开源消息系统,由Scala写成。是由Apache软件基金会开发的一个开源消息系统项目。Kafka是一个分布式消息队列:生产者、消费者的功能。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。

一、Kafka安装

1.下载并安装Java

Kafka 是基于 Java 开发的,因此需要先安装 Java 环境。如果你已经安装了 Java 环境,可以跳过这一步。

在命令行中输入以下命令:

sudo apt-get update
# linux命令行下,安装jdk
sudo apt-get install openjdk-8-jdk
# 查看安装结果
java -version

2.下载和解压 Kafka

下载 Kafka 压缩包,并解压到 /opt 目录下。

cd ~
sudo wget https://archive.apache.org/dist/kafka/3.4.0/kafka_2.13-3.4.0.tgz
sudo tar -xzf kafka_2.13-3.4.0.tgz
cd /opt
sudo mv cd ~/kafka_2.13-3.4.0 kafka

3.配置 Kafka

接下来需要更改 Kafka 的配置文件。打开 config/server.properties 文件,并进行以下更改:

sudo vim /opt/kafka/config/server.properties

advertised.listeners=PLAINTEXT://<your-server-IP-address>:9092
listeners=PLAINTEXT://0.0.0.0:9092

确保将 <your-server-IP-address> 替换为实际的服务器 IP 地址。(ifconfig查看本机ip)

4.启动 Kafka

Kafka 启动有两种方式:单机模式和分布式模式。

  • 单机模式

在单机模式下,Kafka 只有一个 broker。在命令行中输入以下命令:

cd /opt/kafka
sudo /opt/kafka/bin/zookeeper-server-start.sh  /opt/kafka/config/zookeeper.properties  # 在一个窗口,或后台运行
sudo /opt/kafka/bin/kafka-server-start.sh  /opt/kafka/config/server.properties  # 另起一个窗口,或后台运行
  • 分布式模式

在分布式模式中,Kafka 包含多个 broker。在命令行中输入以下命令:

首先,编辑 config/server.properties 文件,设置以下属性:

broker.id=0 # 设置当前 broker 的 id,不能重复
listeners=PLAINTEXT://your.server.ip.address:9092 # 设置监听地址和端口
log.dirs=/tmp/kafka-logs # 设置日志目录

复制 broker.id=0 的行,修改 broker.id 的值,设置多个 broker 的 id。

接下来,启动 ZooKeeper:

cd /opt/kafka
sudo /opt/kafka/bin/zookeeper-server-start.sh config/zookeeper.properties

最后,每个 broker 启动 Kafka:

cd /opt/kafka
sudo /opt/kafka/bin/kafka-server-start.sh config/server.properties

5.创建主题和生产者/消费者

可以使用 Kafka 自带的命令行工具创建主题、发送消息和消费消息。

  • 创建主题
cd /opt/kafka
sudo /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server your.server.ip.address:9092 --replication-factor 1 --partitions 1 --topic test-topic
  • 查看
sudo /opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
  • 删除
sudo /opt/kafka/bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic test-topic

6.发布和订阅消息

现在已经成功地部署了 Kafka,并创建了一个 topic。可以使用以下命令在 topic 中发布和订阅消息:

发布消息:# 另起一个窗口

sudo /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic 

订阅消息:# 另起一个窗口

sudo /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning

以上是在 Ubuntu 上部署 Kafka 的基本步骤,你可以根据实际情况进行修改。

二、Kafka+Django生产和消费

confluent-kafka 和kafka-python是python中处理kafka的三方包,这里以confluent-kafka为例。

pip install confluent-kafka

1.Django配置文件

在settings.py中加入配置

KAFKA_SETTINGS = {
    'bootstrap.servers': 'localhost:9092', # localhost替换为kafka服务的ip
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest',
}

2.通过django命令实现消费

  1. kafka消费处理kafka_consumer.py文件
from confluent_kafka import Consumer, KafkaError
from django.conf import settings

def kafka_handler():
    c = Consumer(settings.KAFKA_SETTINGS)
    c.subscribe(['test-topic'])

    while True:
        msg = c.poll(1.0)
		print(111,msg)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print('End of partition reached')
            else:
                print('Error: {}'.format(msg.error()))
        else:
            print('Received message: {}'.format(msg.value()))

  1. 使用django的startapp新增kafka对app

     cd ptoject  #  manage.py的同级目录
     django-admin startapp kafka
     python manage.py startapp kafka
    
  2. 在django配置文件settings中添加kafka至INSTALLED_APPS

     INSTALLED_APPS = [
         'django.contrib.admin',
         'django.contrib.auth',
         'django.contrib.contenttypes',
         'django.contrib.sessions',
         'django.contrib.messages',
         'django.contrib.staticfiles',
         'kafka'
     ]
    
  3. 自定义django命令
    可以参看这个链接:https://www.osgeo.cn/django/howto/custom-management-commands.html

    在kafka下新建 management/commands二级文件夹
    在这里插入图片描述

其中kafka_consumer是第一步创建的消费处理程序,my_消费是我们自定义的django命令程序。my_消费.py的内容如下:

# -*- coding:utf-8 _*-
"""
@author:Administrator
@file: my_消费.py
@time: 2023/7/16 0016  12:32
"""
from django.core.management.base import BaseCommand, CommandError

from kafka_consumer import kafka_handler


class Command(BaseCommand):
    help = 'Closes the specified poll for voting'

    def handle(self, *args, **options):
        kafka_handler()
  1. 启动消费命令

切换至虚拟环境,目录切换至django项目目录,及mnange.py的同级目录。

	python manage.py my_消费

在这里插入图片描述

在这里插入图片描述

3.通过Django生产

在这里插入图片描述

  1. 创建kafka_producer.py生产文件
# -*- coding:utf-8 _*-
"""
@author:Administrator
@file: kafka_producer.py
@time: 2023/7/16 0016  12:28
"""
from confluent_kafka import Producer
from django.conf import settings


def send_message(message):
    p = Producer({'bootstrap.servers': settings.KAFKA_SETTINGS.get('bootstrap.servers')})
    topic = 'test-topic'
    p.produce(topic, message.encode('utf-8'))
    p.flush()


if __name__ == '__main__':
    send_message('测试')
  1. 通过django命令生产消息
    这里还是用的自定义命令,自定义文件为my_消费.py
# -*- coding:utf-8 _*-
"""
@author:Administrator
@file: my_消费.py
@time: 2023/7/16 0016  12:32
"""
from django.core.management.base import BaseCommand, CommandError

from kafka_producer import send_message


class Command(BaseCommand):
    help = 'Closes the specified poll for voting'

    def handle(self, *args, **options):
        send_message('111')

在这里插入图片描述
启动方式与消费一样,python manage.py my_生产

  1. 通过django程序生产消息

kafka_producer文件中的send_message方法可以在程序中被调用,我们在处理用户请求时,可以通过异步的方式处理send_message。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/761955.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AIS报文解析

!AIVDM,1,1,A,13u?etPv2;0n:dDPwUM1U1Cb069D,023* 我们知道消息内容就是13u?etPv2;0n:dDPwUM1U1Cb069D&#xff0c;这是一串ASCII码字符串&#xff0c;我们可以获取其对应的ASCII码数值。 但是在AIS的编码表不需要这么多符号&#xff0c;所以截取了其中一部分&#xff0c;如…

良心推荐!5款支持Linux系统的国产软件,兼容国产操作系统

虽然市面上大多数用户使用的是Windows操作系统&#xff0c;但也有不少使用Linux系统的用户&#xff0c;特别是国产操作系统的崛起&#xff0c;让Linux系统阵营的用户越来越多。Linux不像Windows那样&#xff0c;有着完整的生态环境丰富的软件应用&#xff0c;但也逐渐在完善中&…

探秘Session跨页面传递数据的神奇力量

探秘Session跨页面传递数据的神奇力量 前言一、什么是 Session 会话?二、如何创建 Session 和获取(id 号,是否为新)三、Session 域数据的存取四、Session 生命周期控制五、Session的销毁五、浏览器和 Session 之间关联的技术内幕 前言 本博主将用CSDN记录软件开发求学之路上亲…

自定义类型详解(结构体、枚举、联合)

目录 一、结构体 1.1结构体的认识&#xff1a; 1.2结构体的声明 1.先声明结构体类型&#xff0c;再定义该类型的变量 2.在声明类型的同时定义 1.3结构体的特殊声明 1.4结构体的自引用 1.5结构体的初始化和访问 1.6结构体内存对齐 1.7修改默认对齐数 1.8结构体传参 二…

驱动程序和应用程序

驱动程序和应用程序 一、应用程序和驱动程序如何关联起来的 1、文件描述符fp 与 struct file 应用程序&#xff08;APP&#xff09;在打开文件时&#xff0c;可以得到一个整数&#xff0c;这个整数被称为文件句柄。对于 APP 的每一个文件句柄&#xff0c;在内核里面都有一个…

AI辅助瞄准系统开发与实战(三)-竣工

文章目录 前言GUI功能整合提示框功能整合 总体代码自定义线程池YoloDectect工具类窗口绘制鼠标控制控制器GUI界面 总结 前言 okey&#xff0c;大概经过&#xff0c;两天的开发&#xff0c;我在这里完成了基本的全部开发。 那么我们先来看看大概的效果吧&#xff1a; 在这里的…

Vue3通透教程【十八】TS为组件的props标注类型

文章目录 &#x1f31f; 写在前面&#x1f31f; 回顾defineProps的基础写法&#x1f31f; defineProps的TS写法&#x1f31f; withDefaults方法&#x1f31f; 拓展&#x1f31f; 写在最后 &#x1f31f; 写在前面 专栏介绍&#xff1a; 凉哥作为 Vue 的忠实 粉丝输出过大量的 …

解决 Error:java: Compilation failed: internal java compiler error

编译失败 因为maven工程项目的 多个model 模块的jdk版本不同 改成一样的就可以了

mysql进阶 —— 主从复制和读写分离

前言 在这篇文章中荔枝会梳理MySQL中有关主从复制和读写分离的相关知识点&#xff0c;主要包括基本的概念、配置搭建、命令和模式选择等几个方面。MySQL主从复制和读写分离属于MySQL数据库学习中的高阶内容了&#xff0c;大家要和荔枝一起加油学习噢~~~希望能帮助到需要的小伙伴…

【机密计算标准】GB/T 41388-2022 可信执行环境基础安全规范

1 范围 本文件确立了可信执行环境系统整体技术架构&#xff0c;描述了可信执行环境基础要求、可信虚拟化系统、可信操作系统、可信应用与服务管理、跨平台应用中间件等主要内容及其测试评价方法。 2 规范性引用文件 下列文件中的内容通过文中的规范性引用面构成本文件必不…

MFC 基于数据库的管理系统

文章目录 初始化设置菜单 添加数据库类创建数据库配置数据库 全部代码 初始化 创建文件选择基于CListView 初始化数据 public:CListCtrl& m_list;CSQLView::CSQLView() noexcept:m_list(GetListCtrl()) {// TODO: 在此处添加构造代码}void CSQLView::OnInitialUpdate() {C…

RK3588 CPU GPU NPU 定频和性能模式设置方法以及温度监控

一. CPU定频 1. RK3588的cpu是4个A554个A76&#xff0c;分为3组单独管理&#xff0c;节点分别是&#xff1a; /sys/devices/system/cpu/cpufreq/policy0&#xff08;对应四个A55:CPU0-CPU3&#xff09; /sys/devices/system/cpu/cpufreq/policy4&#xff08;对应2个A76:CPU4-…

【Java从0到1学习】01 Java 概述

1. Java概述 Java 是由 Sun Microsystems 公司于 1995 年 5 月推出的 Java 面向对象程序设计语言和 Java 平台的总称。由 James Gosling和同事们共同研发&#xff0c;并在 1995 年正式推出。 后来 Sun 公司被 Oracle &#xff08;甲骨文&#xff09;公司收购&#xff0c;Java…

微信小程序基于Promise封装发起网络请求

1.创建一个request.js // 相当于域名 const baseURL ***************; // 暴露一个request函数 export function request(parms) {// 路径拼接const url baseURL parms.url;// 请求体&#xff0c;默认为{}const data parms.data || {};// 请求方式&#xff0c;默认为GETco…

<Babel> 前端语言的巴别塔

Babel中文站点&#xff1a;https://www.babeljs.cn/ Babel外文站点&#xff1a;https://babeljs.io/ 什么是Babel Babel取自人类语言最早的传说&#xff0c;Tower of Babel。 上帝摧毁了巴别塔上说着共同语言的我们&#xff0c;又被Babel重新带了回来。 如果说巴别塔是人们对混…

windows命令行运行mysql

1.运行命令&#xff1a;mysql -u用户名 -p密码 2.创建数据库&#xff1a; create 数据库名称 &#xff1b; 3.use 数据库 4.show tables &#xff1b; 5.创建用户&#xff0c;分配权限 6.退出 ctrlc 7.切换用户

海外app在谷歌和苹果商店中该如何设置关键词

主导应用商店搜索结果的方法就是关键词的设置。我们需要寻找关键词&#xff0c;跟踪关键词排名并监控其应用的性能&#xff0c;这样就能大大的提高应用的可见度。 优先考虑可推动Android或ios应用自然下载量的关键词&#xff0c;使用搜索量指标了解某个关键词在应用商店中的搜…

数字孪生很火嘛?是做什么的?

数字孪生是一种新兴的技术概念&#xff0c;将现实世界与数字世界紧密结合&#xff0c;通过数字化的方式模拟、仿真和预测真实世界的物理实体、过程和系统。它是物理实体与其数字化的虚拟模型之间的一种互联关系&#xff0c;旨在实现对现实世界的全面感知和实时监测。 数字孪生的…

骑行健身,生活和工作压力的避风港

在忙碌的现代生活中&#xff0c;每个人都面临着种种生活压力和工作压力。而自行车运动&#xff0c;正是一种理想的压力释放方式。它不仅能帮助我们保持身体健康&#xff0c;更能丰富我们的生活&#xff0c;让我们在自然中寻找宁静。 首先&#xff0c;骑自行车是一种全身性的运动…

BigTable:一个针对结构化数据的分布式存储系统----论文摘要

目录 摘要 1. 介绍 2. 数据模型 行 列族 时间戳 3. API 4. 所需构件 5. 实现 5.1 Tablet的位置 5.2 Tablet分配 5.3 Tablet服务 5.4 压实&#xff08;Compactions&#xff09; 6. 优化 本地化分组 压缩(compression) 通过缓存提高读操作的性能 Bloom过滤器 C…