docker kafka go demo

news2024/11/26 20:33:37

配置

创建网桥

docker network create app-tier --driver bridge

拉取并启动镜像

docker run -d --name kafka-server --hostname kafka-server \
    --network app-tier \
    -p 9092:9092 \
    -e ALLOW_PLAINTEXT_LISTENER=yes \
    -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.66.1:9092 \
    -e KAFKA_CFG_NODE_ID=0 \
    -e KAFKA_CFG_PROCESS_ROLES=controller,broker \
    -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
    -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
    -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
    -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
    bitnami/kafka:latest

KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.66.1:9092,高亮位置为自己的服务器ip

创建一个first分区

docker run -it --rm --network app-tier bitnami/kafka:latest kafka-topics.sh --topic first --create --bootstrap-server kafka-server:9092

查看一下分区

docker run -it --rm --network app-tier bitnami/kafka:latest kafka-topics.sh --list --bootstrap-server kafka-server:9092

Go生产与消费kafka中的消息

package main

import (
	"context"
	"fmt"
	"github.com/segmentio/kafka-go"
	"log"
	"os"
	"os/signal"
)

func prod() {
	// 设置 Kafka 代理地址
	brokerList := []string{"192.168.66.1:9092"}

	// 创建一个 Kafka 生产者
	producer := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  brokerList,
		Topic:    "first",
		Balancer: &kafka.LeastBytes{},
	})

	// 待发送的消息
	message := kafka.Message{
		Key:   []byte("key"),
		Value: []byte("Hello, Kafka!"),
	}

	// 发送消息
	err := producer.WriteMessages(context.Background(), message)
	if err != nil {
		log.Fatal("failed to write messages:", err)
	}

	// 关闭 Kafka 生产者
	err = producer.Close()
	if err != nil {
		log.Fatal("failed to close writer:", err)
	}

	fmt.Println("Message sent successfully!")
}

func main() {

	go prod()
	// 设置 Kafka broker 地址和主题名称
	brokerAddress := "192.168.66.1:9092"
	topic := "first"

	// 创建 Kafka 连接
	conn, err := kafka.DialLeader(context.Background(), "tcp", brokerAddress, topic, 0)
	if err != nil {
		log.Fatalf("Failed to connect to Kafka broker: %s", err)
	}
	defer conn.Close()

	// 设置消费者起始偏移量为最新
	//conn.ResetOffsets()
	
	// 创建消费者
	consumer := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{brokerAddress},
		Topic:     topic,
		Partition: 0,
		MinBytes:  10e3, // 最小字节数
		MaxBytes:  10e6, // 最大字节数
	})

	// 创建一个信号通道,用于捕获中断信号
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// 开始消费消息
	for {
		select {
		case <-signals:
			log.Println("Received interrupt signal, shutting down...")
			return
		default:
			// 从 Kafka 获取一条消息
			msg, err := consumer.ReadMessage(context.Background())
			if err != nil {
				log.Printf("Failed to read message: %s", err)
				continue
			}

			// 处理消息
			fmt.Printf("Received message: %s\n", string(msg.Value))
		}
	}
}

上图
在这里插入图片描述

Reference
https://hub.docker.com/r/bitnami/kafka

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1357442.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

链表--206. 反转链表/easy

206. 反转链表 1、题目2、题目分析3、解题步骤4、复杂度最优解代码示例5、抽象与扩展 1、题目 给你单链表的头节点 head &#xff0c;请你反转链表&#xff0c;并返回反转后的链表。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5] 输出&#xff1a;[5,4,3,2,1]示例 2…

【大数据进阶第二阶段之Hadoop学习笔记】Hadoop 运行环境搭建

【大数据进阶第二阶段之Hadoop学习笔记】Hadoop 概述-CSDN博客 【大数据进阶第二阶段之Hadoop学习笔记】Hadoop 运行环境搭建-CSDN博客 【大数据进阶第二阶段之Hadoop学习笔记】Hadoop 运行模式-CSDN博客 1、模板虚拟机环境准备 1.1、 hadoop100 虚拟机配置要求如下 &…

如何通过Python将各种数据写入到Excel工作表

在数据处理和报告生成等工作中&#xff0c;Excel表格是一种常见且广泛使用的工具。然而&#xff0c;手动将大量数据输入到Excel表格中既费时又容易出错。为了提高效率并减少错误&#xff0c;使用Python编程语言来自动化数据写入Excel表格是一个明智的选择。Python作为一种简单易…

【Spring进阶系列丨第六篇】Spring的Bean管理(基于注解)

文章目录 一、说明二、用于创建对象的2.1、Component注解2.1.1、定义Bean2.1.2、主配置文件配置扫描注解2.1.3、测试2.1.4、Component注解总结 2.2、Controller注解2.3、Service注解2.4、Repository注解 三、用于注入数据的3.1、Autowired注解3.1.1、定义Bean3.1.2、主配置文件…

《PCI Express体系结构导读》随记 —— 第I篇 第2章 PCI总线的桥与配置(6)

接前一篇文章&#xff1a;《PCI Express体系结构导读》随记 —— 第I篇 第2章 PCI总线的桥与配置&#xff08;5&#xff09; 2.2 HOST主桥 本节以MPC8548处理器为例&#xff0c;说明HOST主桥在PowerPC处理器中的实现机制&#xff0c;并简要介绍x86处理器系统使用的HOST主桥。 …

【springboot+vue项目(零)】开发项目经验积累(处理问题)

一、VUEElement UI &#xff08;一&#xff09;elementui下拉框默认值不是对应中文问题 v-model绑定的值必须是字符串&#xff0c;才会显示默认选中对应中文&#xff0c;如果是数字&#xff0c;则显示数字&#xff0c;修改为&#xff1a; handleOpenAddDialog() {this.dialogT…

【Emgu.CV教程】第24篇 、色彩处理之LUT()查找表转换颜色

LUT (Look-Up Table)查找表转换&#xff0c;是对原始图像的像素数值进行快速转换&#xff0c;以实现图像的像素压缩目的。LUT()函数的官方定义如下&#xff1a; public static void LUT(IInputArray src, // 输入图像IInputArray lut, // 查找表IOutputArray dst, // 输出图像…

2023春季李宏毅机器学习笔记 05 :机器如何生成图像

资料 课程主页&#xff1a;https://speech.ee.ntu.edu.tw/~hylee/ml/2023-spring.phpGithub&#xff1a;https://github.com/Fafa-DL/Lhy_Machine_LearningB站课程&#xff1a;https://space.bilibili.com/253734135/channel/collectiondetail?sid2014800 一、图像生成常见模型…

提示循环引用 一个循环引用但无法列出导致循环的引用且文件打不开无法修改

目录 设备环境&#xff1a; 提示内容&#xff1a; 具体错误问题描述&#xff1a; 图示&#xff1a; Office 报错 WPS 报错 问题分析&#xff1a; 问题解决&#xff1a; 关注我的 GitHub&#xff08;魔法网络访问&#xff09;&#xff1a; 设备环境&#xff1a; Window…

(湖科大教书匠)计算机网络微课堂(下)

第四章、网络层 网络层概述 网络层主要任务是实习网络互连&#xff0c;进而实现数据包在各网络之间的传输 因特网使用TCP/IP协议栈 由于TCP/IP协议栈的网络层使用网际协议IP&#xff0c;是整个协议栈的核心协议&#xff0c;因此TCP/IP协议栈的网络层常称为网际层 网络层提供…

1.1 理解大数据(2)

小肥柴的Hadoop之旅 1.1 理解大数据&#xff08;2&#xff09; 目录1.1 理解大数据1.1.3 大数据概述1.1.4 更多思考 参考文献和资料 目录 1.1 理解大数据 1.1.3 大数据概述 step_0 大数据定义 【《大数据算法设计分析》】&#xff1a; 通常来讲大数据&#xff08;Big Data&am…

【Linux操作系统】探秘Linux奥秘:Linux 操作系统的解密与实战

&#x1f308;个人主页&#xff1a;Sarapines Programmer&#x1f525; 系列专栏&#xff1a;《操作系统实验室》&#x1f516;诗赋清音&#xff1a;柳垂轻絮拂人衣&#xff0c;心随风舞梦飞。 山川湖海皆可涉&#xff0c;勇者征途逐星辉。 目录 &#x1fa90;1 初识Linux OS …

目标跟踪算法中的卡尔曼滤波学习

在使用多目标跟踪算法时&#xff0c;接触到卡尔曼滤波&#xff0c;一直没时间总结下&#xff0c;现在来填坑。 1. 背景知识 在理解卡尔曼滤波前&#xff0c;有几个概念值得考虑下&#xff1a;时序序列模型&#xff0c;滤波&#xff0c;线性动态系统 1. 时间序列模型 时间序…

Elasticsearch:Serarch tutorial - 使用 Python 进行搜索 (一)

本实践教程将教你如何使用 Elasticsearch 构建完整的搜索解决方案。 在本教程中你将学习&#xff1a; 如何对数据集执行全文关键字搜索&#xff08;可选使用过滤器&#xff09;如何使用机器学习模型生成、存储和搜索密集向量嵌入如何使用 ELSER 模型生成和搜索稀疏向量如何使用…

【KingbaseES】实现MySql函数Field

CREATE OR REPLACE FUNCTION field(value TEXT, VARIADIC arr TEXT[]) RETURNS INT AS $$ DECLAREi INT; BEGINFOR i IN 1 .. array_length(arr, 1) LOOPIF arr[i] value THENRETURN i;END IF;END LOOP;RETURN 0; END; $$ LANGUAGE plpgsql IMMUTABLE;

【Mac】Mac新机配置前端环境教程

1、先安装谷歌浏览器&#xff0c;稳定版 作为一名前端程序员&#xff0c;和浏览器打交道肯定是必不可少的。Chrome&#xff0c;Edge 都有着丰富的扩展资源。你可以下载插件帮助你更好的在工作中调试代码。 React Developer Tools Vue.js devtools GraphQL developer tools 如…

信号量原理及其应用

一、信号量定义 信号量是一种用于保护临界资源的同步机制。它可以用来控制对共享资源的访问&#xff0c;以避免并发访问导致的数据不一致或竞争条件。信号量的PV操作是原子操作&#xff0c;即不可被中断的操作。 在信号量的操作中&#xff0c;P操作&#xff08;也称为wait操作…

imgaug库指南(四):从入门到精通的【图像增强】之旅

引言 在深度学习和计算机视觉的世界里&#xff0c;数据是模型训练的基石&#xff0c;其质量与数量直接影响着模型的性能。然而&#xff0c;获取大量高质量的标注数据往往需要耗费大量的时间和资源。正因如此&#xff0c;数据增强技术应运而生&#xff0c;成为了解决这一问题的…

AIGC实战——自回归模型(Autoregressive Model)

AIGC实战——自回归模型 0. 前言1. 长短期记忆网络基本原理2. Recipes 数据集3. 处理文本数据3.1 文本与图像数据处理的差异3.2 文本数据处理步骤 4. 构建 LSTM 模型4.1 模型架构4.2 LSTM 计算流程4.3 训练 LSTM 5. LSTM 模型分析小结系列链接 0. 前言 自回归模型 (Autoregres…