kafka-6-python单线程操作kafka

news2025/1/14 18:01:35

使用Python操作Kafka:KafkaProducer、KafkaConsumer
Python kafka-python API的帮助文档

1 kafka tools连接

(1)/usr/local/kafka_2.13-3.4.0/config/server.properties
listeners = PLAINTEXT://myubuntu:9092
advertised.listeners=PLAINTEXT://192.168.1.8:29092

(2)/etc/hosts
10.0.2.11 myubuntu

其中192.168.1.8是宿主机外网地址
其中10.0.2.11是虚拟机内网地址
192.168.1.8:29092映射到了10.0.2.11:9092。

在这里插入图片描述

2 单线程生产者

说是单线程,其实并不是,你启动一个生产者其实是2个线程,后台有一个IO线程用于真正发送消息出去,前台有一个线程用于把消息发送到本地缓冲区。

KafkaProducer是发布消息到Kafka集群的客户端,它是线程安全的并且共享单一生产者实例。生产者包含一个带有缓冲区的池,用于保存还没有传送到Kafka集群的消息记录以及一个后台IO线程,该线程将这些留在缓冲区的消息记录发送到Kafka集群中。

2.1 KafkaProducer构造函数参数

(1)acks 
0表示发送不理睬发送是否成功;
1表示需要等待leader成功写入日志才返回;
all表示所有副本都写入日志才返回

(2)buffer_memory 
默认33554432也就是32M,该参数用于设置producer用于缓存消息的缓冲区大小,
如果采用异步发送消息,那么生产者启动后会创建一个内存缓冲区用于存放待发送的消息,
然后由专属线程来把放在缓冲区的消息进行真正发送,
如果要给生产者要给很多分区发消息那么就需要考虑这个参数的大小防止过小降低吞吐量

(3)compression_type 
是否启用压缩,默认是none,可选类型为gzip、lz4、snappy三种。
压缩会降低网络IO但是会增加生产者端的CPU消耗。
另外如果broker端的压缩设置和生产者不同那么也会给broker带来重新解压缩和重新压缩的CPU负担。

(4)retries 
重试次数,当消息发送失败后会尝试几次重发。
默认为0,一般考虑到网络抖动或者分区的leader切换,
而不是服务端真的故障所以可以设置重试3次。

(5)retry_backoff_ms 
每次重试间隔多少毫秒,默认100毫秒。

(6)max_in_flight_requests_per_connection 
生产者会将多个发送请求缓存在内存中,默认是5个,
如果你开启了重试,也就是设置了retries参数,
那么将可能导致针对于同一分区的消息出现顺序错乱。
为了防止这种情况需要把该参数设置为1,来保障同分区的消息顺序。

(7)batch_size 
对于调优生产者吞吐量和延迟性能指标有重要的作用。
buffer_memeory可以看做池子,而这个batch_size可以看做池子里装有消息的小盒子。
这个值默认16384也就是16K,其实不大。

生产者会把发往同一个分区的消息放在一个batch中,
当batch满了就会发送里面的消息,但是也不一定非要等到满了才会发。

这个数值大那么生产者吞吐量高但是性能低,
因为盒子太大占用内存发送的时候这个数据量也就大。

如果你设置成1M,那么显然生产者的吞吐量要比16K高的多。

(8)linger_ms 
上面说batch没有填满也可以发送,那显然有一个时间控制,就是这个参数,
默认是0毫秒,这个参数就是用于控制消息发送延迟多久的。
默认是立即发送,无需关系batch是否填满。
大多数场景我们希望立即发送,但是这也降低了吞吐量。

(9)max_request_size 
最大请求大小,可以理解为一条消息记录的最大大小,默认是1048576字节,1M。

(10)request_timeout_ms  
生产者发送消息后,broker需要在规定时间内将处理结果返回给生产者,
那个这个时间长度就是这个参数控制的,默认30000,也就是30秒。

如果broker在30秒内没有给生产者响应,那么生产者就会认为请求超时,
并在回调函数中进行特殊处理,或者进行重试。

2.2 示例代码

# -*- coding: utf-8 -*-
import time
import random
import sys

from kafka import KafkaProducer
from kafka.errors import KafkaError, KafkaTimeoutError
import json

class Producer(object):
    def __init__(self, KafkaServerList=['127.0.0.1:9092'], ClientId="Procucer01", Topic='Test'):
        self._kwargs = {
            "bootstrap_servers": KafkaServerList,
            "client_id": ClientId,
            "acks": 1,
            "buffer_memory": 33554432,
            'compression_type': None,
            "retries": 3,
            "batch_size": 1048576,
            "linger_ms": 100,
            "key_serializer": lambda m: json.dumps(m).encode('utf-8'),
            "value_serializer": lambda m: json.dumps(m).encode('utf-8'),
        }
        self._topic = Topic
        try:
            self._producer = KafkaProducer(**self._kwargs)
        except Exception as err:
            print(err)

    def _onSendSucess(self, record_metadata):
        """
        异步发送成功回调函数,也就是真正发送到kafka集群且成功才会执行。
        发送到缓冲区不会执行回调方法。
        :param record_metadata:
        :return:
        """
        print("发送成功")
        print("被发往的主题:", record_metadata.topic)
        print("被发往的分区:", record_metadata.partition)
        print("队列位置:", record_metadata.offset)  # 这个偏移量是相对偏移量,也就是相对起止位置,也就是队列偏移量。

    def _onSendFailed(self):
        print("发送失败")

    def sendMessage(self, value=None, partition=None):
        if not value:
            return None

        # 发送的消息必须是序列化后的,或者是字节
        # value = json.dumps(value, encoding='utf-8', ensure_ascii=False)

        # value 必须为字节或者被序列化为字节,
        # 由于之前我们初始化时已经通过value_serializer来做了,所以我上面的语句就注释了

        # key 与value对应的键,可选,也就是把一个键关联到这个消息上,
        # KEY相同就会把消息发送到同一分区上,所以如果有这个要求就可以设置KEY,也需要序列化

        # partition 发送到哪个分区,整型。如果不指定将会自动分配。
        kwargs = {
            "value": value,
            "key": None,
            "partition": partition
        }

        try:
            # 异步发送,发送到缓冲区,同时注册两个回调函数,一个是发送成功的回调,一个是发送失败的回调。
            # send函数是有返回值的是RecordMetadata,也就是记录的元数据,包括主题、分区、偏移量
            future = self._producer.send(self._topic, **kwargs)
            future.add_callback(self._onSendSucess)
            future.add_errback(self._onSendFailed)
            print("发送消息:", value)
        except KafkaTimeoutError as err:
            print(err)
        except Exception as err:
            print(err)

    def closeConnection(self, timeout=None):
        # 关闭生产者,可以指定超时时间,也就是等待关闭成功最多等待多久。
        self._producer.close(timeout=timeout)

    def sendNow(self, timeout=None):
        # 调用flush()函数可以将所有在缓冲区的消息记录立即发送,即使ligner_ms值大于0.
        # 这时候后台发送消息线程就会开始立即发送消息并且阻塞在这里,等待消息发送成功,当然是否阻塞取决于acks的值。
        # 如果不调用flush函数,那么什么时候发送消息取决于ligner_ms或者batch任意一个条件满足就会发送。
        try:
            self._producer.flush(timeout=timeout)
        except KafkaTimeoutError as err:
            print(err)
        except Exception as err:
            print(err)


def main():
    p = Producer(KafkaServerList=["127.0.0.1:29092"], ClientId="Procucer01", Topic="test")
    for i in range(10):
        time.sleep(1)
        closePrice = random.randint(1, 500)
        msg = {
            "Publisher": "Procucer01",
            "股票代码": 60000 + i,
            "昨日收盘价": closePrice,
            "今日开盘价": 0,
        }
        p.sendMessage(value=msg)
    p.closeConnection()


if __name__ == "__main__":
    try:
        main()
    finally:
        sys.exit()

3 单线程消费者

初始化一个消费者实例,消费者不是线程安全的,所以建议一个线程实现一个消费者,而不是一个消费者让多个线程共享

下面这些是可选参数,可以在初始化KafkaConsumer实例的时候传递进去:

enable_auto_commit 是否自动提交,默认是true

auto_commit_interval_ms 自动提交间隔毫秒数

auto_offset_reset="earliest"  
重置偏移量,earliest移到最早的可用消息,latest最新的消息,默认为latest

3.1 手动拉取消息

# -*- coding: utf-8 -*-
import sys
from kafka import KafkaConsumer
import json


class Consumer(object):
    def __init__(self, KafkaServerList=['127.0.0.1:9092'], GroupID='TestGroup11', ClientId="Test", Topics=['Test', ]):
        """
        用于设置消费者配置信息,这些配置项可以从源码中找到,下面为必要参数。
        :param KafkaServerList: kafka服务器IP:PORT 列表
        :param GroupID: 消费者组ID
        :param ClientId: 消费者名称
        :param Topic: 主题 列表
        """
        self._kwargs = {
            "bootstrap_servers": KafkaServerList,
            "client_id": ClientId,
            "group_id": GroupID,
            "enable_auto_commit": False,
            "auto_offset_reset": "earliest",
            "key_deserializer": lambda m: json.loads(m.decode('utf-8')),
            "value_deserializer": lambda m: json.loads(m.decode('utf-8')),
        }

        try:
            self._consumer = KafkaConsumer(**self._kwargs)
            self._consumer.subscribe(topics=(Topics))
        except Exception as err:
            print("Consumer init failed, %s" % err)

    def consumeMsg(self):
        try:
            while True:
                data = self._consumer.poll(timeout_ms=1000, max_records=100)  # 拉取消息,字典类型
                print(data)
                if data:
                    for key in data:
                        for consumerrecord in data.get(key):
                            # 返回的是ConsumerRecord对象,可以通过字典的形式获取内容。
                            if consumerrecord != None:
                                # 消息消费逻辑
                                message = {
                                    "Topic": consumerrecord.topic,
                                    "Partition": consumerrecord.partition,
                                    "Offset": consumerrecord.offset,
                                    "Key": consumerrecord.key,
                                    "Value": consumerrecord.value
                                }
                                print(message)
                                # 消费逻辑执行完毕后在提交偏移量
                                self._consumer.commit()
                            else:
                                print("%s consumerrecord is None." % key)
        except Exception as err:
            print(err)


def main():
    try:
        c = Consumer(KafkaServerList=['127.0.0.1:29092'], Topics=['test'])
        c.consumeMsg()
    except Exception as err:
        print(err)


if __name__ == "__main__":
    try:
        main()
    finally:
        sys.exit()

3.2 非手动拉取消息

# -*- coding: utf-8 -*-

import sys
from kafka import KafkaConsumer
import json


class Consumer(object):
    def __init__(self, KafkaServerList=['172.16.48.171:9092'], GroupID='TestGroup111', ClientId="Test", Topics=['Test', ]):
        """
        用于设置消费者配置信息,这些配置项可以从源码中找到,下面为必要参数。
        :param KafkaServerList: kafka服务器IP:PORT 列表
        :param GroupID: 消费者组ID
        :param ClientId: 消费者名称
        :param Topic: 主题
        """
        self._kwargs = {
            "bootstrap_servers": KafkaServerList,
            "client_id": ClientId,
            "group_id": GroupID,
            "enable_auto_commit": False,
            "auto_offset_reset": "earliest",
            "key_deserializer": lambda m: json.loads(m.decode('utf-8')),
            "value_deserializer": lambda m: json.loads(m.decode('utf-8')),
        }

        try:
            self._consumer = KafkaConsumer(**self._kwargs)
            self._consumer.subscribe(topics=(Topics))
        except Exception as err:
            print("Consumer init failed, %s" % err)

    def consumeMsg(self):
        try:
            while True:
                for consumerrecord in self._consumer:
                    if consumerrecord:
                        message = {
                            "Topic": consumerrecord.topic,
                            "Partition": consumerrecord.partition,
                            "Offset": consumerrecord.offset,
                            "Key": consumerrecord.key,
                            "Value": consumerrecord.value
                        }
                        print(message)
                        # 消费逻辑执行完毕后在提交偏移量
                        self._consumer.commit()
        except Exception as err:
            print(err)


def main():
    try:
        c = Consumer(KafkaServerList=['127.0.0.1:29092'], Topics=['test'])
        c.consumeMsg()
    except Exception as err:
        print(err)


if __name__ == "__main__":
    try:
        main()
    finally:
        sys.exit()

4 消费者

4.1 简单demo

启动后消费者可以从kafka服务器获取数据.

from kafka import KafkaConsumer

# 参数为接收主题和kafka服务器地址
consumer = KafkaConsumer('test',
                         bootstrap_servers=['127.0.0.1:29092'])

# 这是一个永久堵塞的过程,生产者消息会缓存在消息队列中,并且不删除,
# 所以每个消息在消息队列中都有偏移
# consumer是一个消息队列,当后台有消息时,这个消息队列就会自动增加.
# 所以遍历也总是会有数据,当消息队列中没有数据时,就会堵塞等待消息带来
for msg in consumer:
    print("{}:{}:{}: key={} value={}".format(msg.topic, 
                                             msg.partition,
                                             msg.offset, 
                                             msg.key,msg.value))

4.2 消费者组

启动多个消费者,只有其中某一个成员可以消费到,满足要求,消费组可以横向扩展提高处理能力。

from kafka import KafkaConsumer
# 使用group,对于同一个group的成员只有一个消费者实例可以读取数据
consumer = KafkaConsumer('test',
                         group_id='my-group',
                         bootstrap_servers=['127.0.0.1:29092'])
for msg in consumer:
    print("{}:{}:{}: key={} value={}".format(msg.topic,
                                             msg.partition,
                                             msg.offset,
                                             msg.key,msg.value))

4.3 读取目前最早可读的消息

auto_offset_reset:重置偏移量,earliest移到最早的可用消息,latest最新的消息,默认为latest。源码定义:{‘smallest’: ‘earliest’, ‘largest’: ‘latest’}

from kafka import KafkaConsumer
# 使用group,对于同一个group的成员只有一个消费者实例可以读取数据
consumer = KafkaConsumer('test',
                         auto_offset_reset='earliest',
                         bootstrap_servers=['127.0.0.1:29092'])
for msg in consumer:
    print("{}:{}:{}: key={} value={}".format(msg.topic,
                                             msg.partition,
                                             msg.offset,
                                             msg.key,msg.value))

4.4 手动设置偏移量

from kafka import KafkaConsumer
from kafka.structs import TopicPartition
# ==========读取指定位置消息===============
consumer = KafkaConsumer('test',
                         bootstrap_servers=['127.0.0.1:29092'])

print(consumer.partitions_for_topic("test"))  # 获取test主题的分区信息
print(consumer.topics())  # 获取主题列表
print(consumer.subscription())  # 获取当前消费者订阅的主题
print(consumer.assignment())  # 获取当前消费者topic、分区信息
print(consumer.beginning_offsets(consumer.assignment()))  # 获取当前消费者可消费的偏移量
consumer.seek(TopicPartition(topic='test', partition=0), 5)  # 重置偏移量,从第5个偏移量消费

for msg in consumer:
    print("{}:{}:{}: key={} value={}".format(msg.topic,
                                             msg.partition,
                                             msg.offset,
                                             msg.key,msg.value))

4.5 订阅多个主题

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer('test',
                         bootstrap_servers=['127.0.0.1:29092'])

# 订阅要消费的主题
consumer.subscribe(topics=('test','test0'))  
print(consumer.topics())

#获取当前主题的最新偏移量
print(consumer.position(TopicPartition(topic='test', partition=0))) 

for msg in consumer:
    print("{}:{}:{}: key={} value={}".format(msg.topic,
                                             msg.partition,
                                             msg.offset,
                                             msg.key,msg.value))

4.6 手动拉取消息

from kafka import KafkaConsumer
import time

consumer = KafkaConsumer('test',
                         bootstrap_servers=['127.0.0.1:29092'])

# 订阅要消费的主题
consumer.subscribe(topics=('test','test0'))
while True:
    msg = consumer.poll(timeout_ms=5)   # 从kafka获取消息
    print(msg)
    time.sleep(2)

4.7 消息挂起与恢复

# ==============消息恢复和挂起===========

from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:29092'])

consumer.subscribe(topics=('test'))
consumer.topics()

# pause执行后,consumer不能读取,直到调用resume后恢复
consumer.pause(TopicPartition(topic=u'test', partition=0)) 
num = 0
while True:
    print(num)
    print(consumer.paused())   # 获取当前挂起的消费者
    msg = consumer.poll(timeout_ms=5)
    print(msg)
    time.sleep(2)
    num = num + 1
    if num == 10:
        print("resume...")
        consumer.resume(TopicPartition(topic='test', partition=0))
        print("resume......")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/356678.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MotoSimEG-VRC教程:动态输送带创建以及示教编程与仿真运行

目录 任务描述 简易输送带外部设备创建 输送带模型添加与配置 工件安装到输送带 输送带输送工件程序编写与仿真运行 任务描述 在MotoSimEG-VRC中创建1条输送带,并且能够实现将工件从输送带起始点位置处输送到结束点位置处。 简易输送带外部设备创建 在MotoS…

Linux系统之终端管理命令的基本使用

Linux系统之终端管理命令的基本使用一、检查本地系统环境1.检查系统版本2.检查系统内核版本二、终端介绍1.终端简介2.Linux终端简介3.终端的发展三、终端的相关术语1.终端模拟器2.tty终端3.pts终端4.pty终端5.控制台终端四、终端管理命令ps1.直接使用ps命令2.列出登录详细信息五…

RPA落地指南:什么是RPA

什么是RPA RPA在企业中起什么作用并扮演什么角色呢?想要充分了解RPA,我们需要知道RPA的相关概念、特点、功能以及能解决的问题。接下来对这些内容进行详细介绍。 1.1 RPA的3个核心概念 RPA的中文译名是“机器人流程自动化”,顾名思义&…

初始C语言 - 数组(一维数组、二维数组、数组越界、数组传参)

目录 一、一维数组的创建和初始化 1、数组的创建 2、 数组的初始化 3.一维数组的使用 数组通过下标来访问 总结: 1. 数组是使用下标来访问的,下标是从0开始。 2. 数组的大小可以通过计算得到。 4、一维数组在内存中的存储 二、 二维数组的创建和初始化 1.二…

算法导论【分治思想】—大数乘法、矩阵相乘、残缺棋盘

这里写自定义目录标题分治法概述特点大数相乘问题分治算法矩阵相乘分治算法残缺棋盘分治算法分治法概述 在分而治之的方法中,一个问题被划分为较小的问题,然后较小的问题被独立地解决,最后较小问题的解决方案被组合成一个大问题的解决。 通常…

【软件测试】自动化测试工程师必会的单元测试编写(总结),你真的了解吗......

目录:导读前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜)前言 单元测试编写的目的…

supervisor-男程序员的福音

supervisor是什么 supervisor是用python语言编写的,只能用于类Unix系统上的进程管理工具。 supervisor有什么用 举一个常见的场景,比如你的项目已经到了测试联调阶段,QA需要你把程序启动起来,然后进行测试,那么启动…

用Java实现多线程打印奇偶数

用Java实现多线程打印奇偶数1. wait()和 notify() 方法的作用:2. Java实现(1)Thread1.class 奇数线程(2)Thread2.class 偶数线程(3)共享资源类(4)测试1. wait()和 notify…

一篇文章带你玩转 Kubernetes:组件、核心概念和Nginx实战演示

目录一、简介1.1 容器部署时代1.2 Kubernetes有哪些优点二、Kubernetes 组件介绍三、Kubernetes 核心概念3.1 Namespace3.2 Pod3.3 Deployment3.4 Service3.5 Ingress四、Kubernetes 核心概念实战4.1 部署yaml文件4.2 通过Pod IP访问Nginx4.3 通过Service IP访问Nginx4.4 修改i…

[数据结构]:顺序表(C语言实现)

目录 前言 顺序表实现 01-开发环境 02-文件布局 03-代码 01-主函数 02-头文件 03-SeqListCommon.cpp 04-SeqListPositionOperation.cpp 05-SeqListValueOperation.cpp 结语 前言 此专栏包含408考研数据结构全部内容,除其中使用到C引用外,全为…

node+vue微信小程序的社区后勤报修系统

社区后勤报修系统小程序进行总体设计和详细设计。总体设计主要包括小程序功能设计、小程序总体结构设计、小程序数据结构设计和小程序安全设计等:详细设计主要包括社区后勤报修系统小程序数据库访问的实现,主要功能模块的具体实现,模块实现关键代码等。最后对社区后…

目标检测论文阅读:DETR算法笔记

标题:End-to-End Object Detection with Transformers 会议:ECCV2020 论文地址:https://link.springer.com/10.1007/978-3-030-58452-8_13 官方代码:https://github.com/facebookresearch/detr 作者单位:巴黎第九大学、…

【Linux】进程替换

文章目录进程程序替换替换原理替换函数函数返回值函数命名理解在makefile文件中一次生成两个可执行文件总结:程序替换时运行其它语言程序进程程序替换 程序要运行要先加载到内存当中 , 如何做到? 加载器加载进来,然后程序替换 为什么? ->冯诺依曼 因为CPU读取数据的时候只…

【原创】java+swing+sqlserver药品管理系统设计与实现

之前数据库都是用的mysql,今天我们使用sqlserver在配合swing来开发一个药品管理系统。方便医院工作人员进行药品的管理,基础功能基本都是一些增删改查操作。 功能分析: 药品管理系统主要提供给管理员和员工使用,功能如下&#x…

[python]win10安装gym

anconda3里面安装: pip install gym[atari,accept-rom-license]0.26.1 AutoRom 测试结果: import gym envgym.make(Assault-v4,render_modehuman) env.reset() for _ in range(100000): actionenv.action_space.sample() env.step(action) env.c…

数据结构——算法的时间复杂度

🌇个人主页:_麦麦_ 📚今日名言:生命中曾经有过的所有灿烂,都终究需要用寂寞来偿还。——《百年孤独》 目录 一、前言 二、正文 1.算法效率 1.1如何衡量一个算法的好坏 1.2算法的复杂度 2. 时间复杂度 2.1时间复杂度的…

元胞自动机

文章目录前言文献阅读摘要主要贡献方法框架实验结论元胞自动机元胞自动机是什么?构成及规则案例及代码实现总结前言 This week,the paper proposes a Multi-directional Temporal Convolutional Artificial Neural Network (MTCAN) model to impute and forecast P…

部署dapr的辛酸历程

前言dapr大概的了解,个人理解他就是一个分布式服务的管理,把微服务常用的组件(缓存,消息中间件、分布式锁、安全id4等)和监控以及服务注册、发现等等一系列功能以一个很抽象的方式管理起来。可能我们部署微服务用consul、ocelot、polly套件、…

DDD单根 聚合根 实体 值对象

前言2004年Eric Evans 发表Domain-Driven Design –Tackling Complexity in the Heart of Software (领域驱动设计),简称Evans DDD。快二十年的时间,领域驱动设计在不断地发展,后微服务时代强调的东西,在国…