WSL下的Kafka开发容器:Docker搭建、API、整合

news2025/2/28 23:19:41

背景介绍

Kafka是一个分布式流处理平台,可以处理大规模数据流并支持实时数据流的处理。

本文介绍了如何在WSL下使用Docker搭建Kafka容器,并使用Python的kafka-python库和FastAPI框架实现了一个简单的API。同时,还将该服务整合到一个整体的docker-compose中。文章详细介绍了Docker网络、Kafka环境变量配置、Python连接Kafka的方法以及API的开发。

实验环境

WSL2 Ubuntu18.04 | Docker

⚙️容器配置与搭建

镜像选择

bitnami/kafka镜像

在这里插入图片描述

Bitnami是一个提供开发、部署和管理应用程序的软件公司。Bitnami提供了Kafka的Docker镜像,并有非常详细的文档。我们将使用这个镜像来搭建Kafka容器。

由于Kafka需要Zookeeper支持,我们可以通过docker-compose来快速组合多个容器。

按照官方的文档,我们可以通过如下的配置快速搭建一个Kafka+Zookeeper的中间件:

# docker-compose.yml

version: "3"

networks:
  app-tier:
    driver: bridge

services:
  zookeeper:
    restart: always
    image: 'bitnami/zookeeper:latest'
    networks:
      - app-tier
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

  kafka:
    restart: always
    image: 'bitnami/kafka:latest'
    networks:
      - app-tier
    ports:
      - '9092:9092'
      - '9093:9093'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper

在network部分,我们一个定义了一个 Docker 网络,名称为 “app-tier”,驱动程序为 “bridge”。

在 Docker 中,网络是一种虚拟网络,使容器之间可以进行通信。 “bridge” 驱动程序是 Docker 网络的默认驱动程序,它在单个 Docker 主机内创建一个内部网络,允许容器使用其 IP 地址相互通信。通过定义名称为 “app-tier”,驱动程序为 “bridge” 的网络,连接到该网络的任何容器都将能够使用其在网络内的 IP 地址相互通信。这可以用于创建微服务架构或其他分布式系统,其中多个容器需要彼此通信。

在Kafka的部分,我们设置了 Kafka 的环境变量:包括 Zookeeper 地址、允许明文监听器、监听器安全协议映射、监听器和广告监听器等。

注意,当前的配置将允许明文监听Kafka,这在实际生产环境中是不被允许的,我们在此为了便于开发环境的测试,允许直接监听。

剩余的环境配置可以在官网详细查询

在这里插入图片描述

容器使用

使用如下命令启动容器

docker-compose up

使用如下命令进入容器

sudo docker ps
sudo docker exec -it xx bash

注:其中xx是ps命令得到的Kafka容器的id前两位

进入容器后我们可以通过如下命令开启生产者和消费者

// 确保你在/opt/bitnami/kafka目录下

// 创建topic
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

// 启动生产者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

// 启动消费者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

你可以打开两个终端,分别开启一个生产者和消费者。如果环境运行正常,就可以在消费者的端口同步查看到生产的输入。

基于此,我们快速地搭建了一个Kafka的开发环境,并且成功地启动了生产者和消费者。在实际的开发中,我们可以使用这个环境来进行Kafka相关的开发和测试工作。同时,在生产环境中,我们需要根据实际情况来进行更加严格和安全的配置,以确保Kafka的安全和可靠性。


🌐API构建与测试

除了命令行,我们还可以使用第三方框架实现对Kafka的连接和操作。下面用Python连接kafka并搭建一个简单的API供调试。

安装Python依赖

kafka-python
fastapi
uvicorn
pydantic

使用懒汉式的单例模式构建一个用于连接Kafka的上下文对象

# kafkaContext.py

import random
from kafka import KafkaProducer, KafkaClient
import time

def _get_kafka_producer_connection(host, port) -> KafkaProducer:
    while True:
        try:
            producer = KafkaProducer(bootstrap_servers=f'{host}:{port}')
            break
        except Exception as e:
            print('producer failed to connect, retrying', e)
            time.sleep(5)

    print('producer connected', producer)
    return producer

def _get_kafka_client_connection(host, port) -> KafkaClient:
    while True:
        try:
            client = KafkaClient(bootstrap_servers=f'{host}:{port}')
            break
        except Exception as e:
            print('client failed to connect, retrying', e)
            time.sleep(5)

    print('client connected', client)
    return client

class KafkaContext:
    # 构建一个单例模式的producer

    def __init__(self, host='kafka', port=9092):
        self.__host = host
        self.__port = port
        self.__client = None
        self.__producer = None

    def __connect_client(self):
        self.__client = _get_kafka_client_connection(self.__host, self.__port)

    def __connect_producer(self):
        self.__producer = _get_kafka_producer_connection(self.__host, self.__port)

    def is_client_connected(self) -> bool:
        if self.__client is None:
            return False
        return self.__client.bootstrap_connected()

    def is_producer_connected(self) -> bool:
        return self.__producer is not None

    def add_topic(self, topic: str):
        if self.__client is None:
            self.__connect_client()
        self.__client.add_topic(topic)

    def send_msg(self, topic: str, msg: str) -> str:
        if self.__producer is None:
            self.__connect_producer()
        # convert msg to bytes
        msg_bytes = bytes(msg, encoding='utf-8')
        self.__producer.send(topic, msg_bytes)

        return msg

kafkaContext = KafkaContext()

注意到我们使用了kafka 的解析去连接Kafka,是因为我们后续要将该服务放在一个Docker容器内,并且接入到和上文提到的Kafka模块的网络中

使用FastAPI构建一个API,提供基本的状态检测、增加topic和生产消息的接口

# api.py

from datetime import datetime
from pydantic import BaseModel

from KafkaContext import kafkaContext
from fastapi import FastAPI

app = FastAPI(title="kafka-server", description="kafka-server", version="0.1.0")

class Message(BaseModel):  # 继承了BaseModel,定义了People的数据格式
    topic: str
    msg: str

@app.get("/")
def read_root():
    return {"time": datetime.now(), "status": "ok"}

@app.get("/health/client")
def health_client():
    return {
        "data": kafkaContext.is_client_connected()
    }

@app.get("/health/producer")
def health_producer():
    return {
        "data": kafkaContext.is_producer_connected()
    }

@app.get("/producer/add_topic/{topic}")
def add_topic(topic: str):
    kafkaContext.add_topic(topic)
    return {
        "status": "ok",
        "data": topic
    }

@app.post("/producer/send_msg")
def send_msg(message: Message):
    return {
        "status": "ok",
        "data": kafkaContext.send_msg(message.topic, message.msg)
    }

如果希望开发更多的接口,可以阅读官方的文档 kafka-python · PyPI

在命令行输入命令启动服务

uvicorn api:app --host 0.0.0.0 --port 8000 --reload

打开浏览器访问http://<your_wsl_ip>:8000/docs即可看到接口文档,如果Kafka的消费者还在运行,则可以尝试接口是否运行正常。

在这里插入图片描述

至此,我们已经实现了用Docker搭建一个Kafka模块并使用FastAPI结合python-kafka实现了接口的开发。


📦容器整合

最后,我们将该服务整合到一个整体的docker-compose

首先我们先构建后端API的镜像,该镜像使用了Python的环境。

FROM python
LABEL author="chene2000"
ENV PYTHONIOENCODING=utf-8

RUN mkdir -p /app
WORKDIR /app
COPY requirements.txt /app
RUN pip3 install -r requirements.txt -i https://pypi.doubanio.com/simple --trusted-host pypi.doubanio.com

COPY . /app

CMD bash start-server.sh

在docker-compose.yml中追加一个server的服务

server:
    restart: always
    # image: 'kafka-server'
    container_name: 'kafka-server'
    build:
      dockerfile: Dockerfile
      context: ./server/
    networks:
      - app-tier
    ports:
      - '8002:8000'
    depends_on:
      - zookeeper
      - kafka

注意,我们采用了如下的项目结构

├── docker-compose.yml
└── server
    ├── api.py
    ├── Dockerfile
    ├── KafkaContext.py
    ├── requirements.txt
    └── start-server.sh

在docker-compose.yml的build处我们配置了Docker容器构建时的目录位置和构建所用的Dockerfile配置。

回到根目录,运行如下命令

# 构建容器
sudo docker compose build

# 启动容器
sudo docker compose up
# 启动容器(后台运行)
sudo docker compose up -d

构建完毕


🗒️ 小结

在实际生产环境中,应注意Kafka的安全性和可靠性。在使用第三方框架连接Kafka时,需要使用Kafka的解析进行连接。最后,将Kafka模块整合到docker-compose中,方便进行部署和使用。

项目代码
https://github.com/ChenE2000/thesis-kafka-server

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/426556.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ssm异常处理

ssm异常处理 类上和方法上都要有注解&#xff1a; 类上的注解&#xff1a; 异常处理用到的注解&#xff0c;里面包含了其他的一些必须的注解&#xff0c;详解看下图 方法上的注解&#xff1a; 上面的要懂打配合 现在创建一个处理异常的工具类&#xff0c;加上前面提到的注…

CSRF与SSRF比较

CSRF与SSRF比较 参考&#xff1a;简述CSRF、SSRF的区别 CSRF CSRF&#xff0c;全名 Cross-site requestforgery&#xff0c;也就是 跨站请求伪造。XSS是跨站脚本攻击。与XSS比较&#xff0c;XSS攻击是跨站脚本攻击&#xff0c;CSRF是跨站请求伪造&#xff0c;也就是说CSRF攻…

【Redis】入门篇之相关概念与Redis的安装

目录 一、关系型数据库与非关系型数据库 1、非关系数据库的种类 2、关系型数据库与非关系型数据库的区别 二、认识Redis 1、概念 2、特点 1.键值型 2.单线程 3.低延迟、速度快 4.支持数据持久化 5.支持主从集群、分片集群 6.支持多语言客户端 三、Redis的安装 1、…

IO-IO基础

简介 IO流&#xff0c;以计算机内存为主体&#xff0c;从内存到网络/磁盘等其他地方叫输出流(内存往外出)&#xff1b;网络/磁盘等其他地方写到内存叫输入流&#xff08;往内存输入&#xff09;。 Java中的IO流 4个抽象基类 InputStream/Reader(读到内存里) 所有的输入流的基…

环境变量详解

目录 环境变量是什么&#xff1f; 常见环境变量 查看环境变量 指令查看 代码查看 系统调用查看 本地变量 环境变量全局性 环境变量是什么&#xff1f; 我们要执行一个我们所写的c/c程序时&#xff0c;需要./可执行文件&#xff0c;告诉操作系统你在哪里&#xff0c…

PC Cleaner Pro(电脑清理工具)图文安装教程

OneSafe PC Cleaner 会查找并删除垃圾文件和快捷方式&#xff0c;这些文件和快捷方式会随着时间的推移在您的 PC 上堆积&#xff0c;从而占用您的硬盘空间。该软件会搜索并删除已卸载程序留下的无效快捷方式和文件。OneSafe PC Cleaner 还会识别并删除注册表中不必要的条目。 W…

YOLOv8详解代码实战,附有效果图

YOLOv8架构 YOLOv8 是 ultralytics 公司在 2023 年 1月 10 号开源的 YOLOv5 的下一个重大更新版本&#xff0c;目前支持图像分类、物体检测和实例分割任务&#xff0c;鉴于Yolov5的良好表现&#xff0c;Yolov8在还没有开源时就收到了用户的广泛关注。yolov8的整体架构如下&…

vue3计算属性与监视及watchEffect函数

computed计算属性 首先看一下页面的结构 在v3中可以用v2的方式来写计算属性&#xff0c;但是不建议这么写 而想要在v3中使用计算属性&#xff0c;需要先引入它 它不想之前在v2中的函数了&#xff0c;而是要写在computed()里面 当然这个计算属性是简写&#xff08;没有考虑计算…

Python opencv 先腐蚀后膨胀 消除图像噪声

cv2.getStructuringElement()介绍 在进行图像形态学操作时&#xff0c;首先需要构造一个特定的核&#xff0c;该核可以自定义生成&#xff0c;也可以通过cv2.getStructuringElement()函数构造。 cv2.getStructuringElement(shape, ksize)参数&#xff1a; shape ---- 代表形状…

常见在线AI绘画平台

系列文章目录 Midjourney AI绘画工具使用保姆级教程 本地部署Stable Diffusion教程&#xff0c;亲测可以安装成功 Stable Diffusion界面参数及模型使用 文章目录系列文章目录前言一、Midjourney二、DreamStudio三、Lexica四、STOCKIMG.AI五、Dream by WOMBO六、PicSo七、百…

ArcGIS、ENVI、InVEST、FRAGSTATS等多技术融合提升环境、生态、水文、土地、土壤、农业、大气等领域的数据分析

查看原文>>>ArcGIS、ENVI、InVEST、FRAGSTATS等多技术融合提升环境、生态、水文、土地、土壤、农业、大气等领域的数据分析 目录 专题一、空间数据获取与制图 专题二、ArcGIS专题地图制作 专题三、空间数据采集与处理 专题四、遥感数据处理与应用 专题五、DEM数据…

macOS Big Sur 11.7.6 (20G1231) Boot ISO 原版可引导镜像

本站下载的 macOS 软件包&#xff0c;既可以拖拽到 Applications&#xff08;应用程序&#xff09;下直接安装&#xff0c;也可以制作启动 U 盘安装&#xff0c;或者在虚拟机中启动安装。另外也支持在 Windows 和 Linux 中创建可引导介质。 2023 年 4 月 10 日&#xff08;北京…

压力测试工具JMeter的下载安装与基础使用(一)

JMeter的下载安装与基础使用1.环境准备2.下载与配置2.1下载并解压2.2 配置系统变量 JMETER_HOME2.3配置系统变量 CLASSPATH3.测试JMeter是否配置成功4. 语言修永久修改为中文&#xff08;可选&#xff09;1.环境准备 JMeter是用java开发的&#xff0c;示例Apache JMeter5.5版本…

css动画及背景设置

css属性 clip-path 属性使用裁剪方式创建元素的可显示区域。区域内的部分显示&#xff0c;区域外的隐藏。可以指定一些特定形状。 可以创建多边形内容显示区域polygon clip-path: polygon(0 0, 100% 0, 100% 75vh 0 100%); 顺序 1初始x y 为0 0 1向2位置 x轴移动100% y移…

SpringBoot集成elasticsearch使用(增删改查)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 SpringBoot集成elasticsearch使用&#xff08;增删改查&#xff09;一、es是什么&#xff1f;二、使用步骤1.搭配环境springboot集成es1、新建springboot项目&#xff0c;引入…

19_I.MX6ULL_SPI实验

目录 SPI简介 I.MX6U ECSPI简介 相关寄存器 ICM-20608简介 实验源码 SPI简介 同I2C一样,SPI是很常用的通信接口,也可以通过SPI来连接众多的传感器。相比I2C接口, SPI接口的通信速度很快, I2C最多400KHz,但是SPI可以到达几十MHz。I.MX6U也有4个SPI接口,可以通过这4个SPI接…

电感为什么会有饱和电流

电感有一个重要的参数那就是饱和电流&#xff0c;饱和电流的定义是当电感感值下降30%时流过电感的电流。 那么电感为什么会有饱和电流呢&#xff1f; 这个是电感磁芯的磁化曲线&#xff0c;横坐标是磁场强度H&#xff0c;纵坐标是磁感应强度B 在磁场强度较小的时候&#xff0c;…

大数据需要学哪些内容

大数据技术是当今互联网时代的热点之一&#xff0c;目前已经成为了各行各业中的最佳选择。随着物联网、人工智能、云计算等技术的发展&#xff0c;数据的规模不断增大&#xff0c;数据分析、数据挖掘、人工智能等应用也随之蓬勃发展&#xff0c;对大数据开发的需求越来越多。因…

智能网卡-提升网络性能的新选择

一、智能网口介绍 智能网卡&#xff08;Smart NIC&#xff09;是一种专用于网络数据处理的高性能网卡&#xff0c;采用了定制芯片、高速网络接口和强大的软件支持&#xff0c;可以为数据中心和企业网络提供更快、更安全、更可靠的网络连接和数据传输服务。 智能网卡的出现受到…

教你使用Apache搭建Http

Apache2默认采用的是80端口号&#xff0c;因此直接通过公网ip或域名就能访问。现实中&#xff0c;很多服务器本身就部署了许多其它服务&#xff0c;80端口号往往被占用&#xff0c;因此就需要将Apache2改成其它访问端口。 修改端口&#xff0c;首先需要修改/etc/apache2/ports…