【Kafka】Docker安装kafkajava kafka api

news2025/1/12 20:54:15

内容目录

    • 一、安装zookeeper
      • 1 拉取镜像
      • 2 创建network
      • 3 启动容器
    • 二、安装kafka
      • 1 拉取kafka镜像
      • 2 启动kafka容器
      • 3 创建topic
      • 4 创建生产者
      • 5 创建消费者
    • 三、kafka的java api
      • 1 producer
      • 2 消费者

docker依赖于zookeeper,首先安装zookeeper

一、安装zookeeper

1 拉取镜像

在这里插入图片描述

2 创建network

在启动之前,先指定一个网络

docker network create app-tier --driver bridge

3 启动容器

启动zookeeper容器

docker run -d --name zookeeper-server 
--network app-tier 
-p 2181:2181 
-e ALLOW_ANONYMOUS_LOGIN=yes 
bitnami/zookeeper:latest

测试是否成功
进入zookeeper

docker exec -it zookeeper-server /bin/sh

执行代码

zkCli.sh -server 10.249.53.1

二、安装kafka

1 拉取kafka镜像

在这里插入图片描述

2 启动kafka容器

docker run -d --name kafka \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e ALLOW_PLAINTEXT_LISTENER=yes \
-e KAFKA_ZOOKEEPER_CONNECT=10.249.53.1:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.249.53.1:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
    bitnami/kafka:latest

进入kafka

docker exec -it kafka /bin/sh

3 创建topic

-- 创建topic

./kafka-topics.sh --bootstrap-server 10.249.53.1:9092 --create  --replication-factor 1 --partitions 1 --topic kfk

查看topic
-- 分区topic

./kafka-topics.sh --list --bootstrap-server 10.249.53.1:9092

4 创建生产者

-- 生产者

./kafka-console-producer.sh --broker-list 10.249.53.1:9092 --topic kfk

5 创建消费者

-- 消费者

./kafka-console-consumer.sh --bootstrap-server 10.249.53.1:9092 --topic kfk --from-beginning

三、kafka的java api

1 producer

public class ProducerTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Properties props = new Properties();

        //指定Kafka集群的IP地址和端口号
        props.put("bootstrap.servers",
                "10.249.53.1:9092");

        //指定等待所有副本节点的应答
        props.put("acks","all");

        //指定消息发送最大尝试次数
        props.put("retries",1);

        //指定一批消息处理大小
        props.put("batch.size",16384);

        //指定请求延时
        props.put("linger.ms",1);

        //指定缓存区内存大小
        props.put("buffer.memory",33554432);

        //设置key序列化
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");

        //设置value序列化
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        //生产数据
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(props);

        for (int i =0; i < 50; i++){
            producer.send(new ProducerRecord<String, String>
                    ("kfk",Integer.toString(i),"hello world-" + i)).get();
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        producer.close();

    }
}

2 消费者

public class ConsumerTest {
    public static void main(String[] args) {

        Properties props = new Properties();

        props.put("bootstrap.servers", "10.249.53.1:9092");

        props.put("group.id", "kfk1");

        props.put("enable.auto.commit", "true");

        props.put("auto.commit.interval.ms", "1000");

        props.put("auto.offset.reset","earliest");

        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String,  String> consumer  =  new
                KafkaConsumer<String,  String>(props);

        // 订阅topic
        consumer.subscribe(Arrays.asList("kfk"));



        // 消费数据
        while (true) {

            ConsumerRecords<String,  String> records  =
                    consumer.poll(100);

            for (ConsumerRecord<String, String> record : records)

                System.out.printf("offset = %d, key = %s, value= %s%n", record.offset(), record.key(), record.value());
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/643509.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[PyTorch][chapter 40][数据增强]

前言&#xff1a; 深度学习对数据量要求非常大, 我们通常会遇到图像的数据集比较小,影响Train效果。 这个时候可以通过transformer 方法,增加图像的多样性,达到数据 增强的效果。 transformer 不会单独使用&#xff0c;通常和其它torch 其他类一起使用 transformer 常用方法…

可视管理 数字孪生智慧隧道一体化管控平台

前言 交通是国家发展的关键&#xff0c;四通八达的交通路线&#xff0c;对国家经济、社会等方面的发展起着至关重要的作用。 建设背景 随着社会经济的持续发展与城市化进程的平稳推进&#xff0c;我国公路工程规模逐步扩大&#xff0c;公路工程建设直接影响着城市未来发展与…

Vue 报错 error:0308010C:digital envelope routines::unsupported

症状 Vue 报错error:0308010C:digital envelope routines::unsupported 原因 出现这个错误是因为 node.js V17版本中最近发布的OpenSSL3.0, 而OpenSSL3.0对允许算法和密钥大小增加了严格的限制&#xff0c;可能会对生态系统造成一些影响. 解决方法 方法1 打开终端&#x…

React 应用 Effect Hook 函数式中操作生命周期

React Hook入门小案例 在函数式组件中使用state响应式数据给大家演示了最简单的 Hook操作 那么 我们继续 首先 Hook官方介绍 他没有破坏性是完全可选的 百分比兼容 也就说 我们一起的 类 class的方式也完全可以用 只要 react 16,8以上就可以使用 Hook本身不会影响你的react的理…

ESXi 7.0 U3m Hitachi (日立) 定制版 OEM Custom Installer CD

VMware ESXi 7.0 Update 3m - 领先的裸机 Hypervisor (All OEM Customized Installer CDs) ESXi 7.0 U3m Standard (标准版) ESXi 7.0 U3m Dell (戴尔) 定制版 OEM Custom Installer CD ESXi 7.0 U3m HPE (慧与) 定制版 OEM Custom Installer CD ESXi 7.0 U3m Lenovo (联想) 定…

4.单表查询

SQL句子中语法格式提示&#xff1a; 1.中括号&#xff08;[]&#xff09;中的内容为可选项&#xff1b; 2.[&#xff0c;...]表示&#xff0c;前面的内容可重复&#xff1b; 3.大括号&#xff08;{}&#xff09;和竖线&#xff08;|&#xff09;表示选择项&#xff0c;在选择…

chatgpt赋能python:Python怎么导入第三方库

Python怎么导入第三方库 如果你是Python开发者&#xff0c;你一定会使用各种第三方库来加速你的开发过程。这些库可能是Python标准库之外的代码&#xff0c;或由其他人编写的自定义代码。使用这些库可以让你的开发更高效、更易于管理&#xff0c;并且可以避免重复造轮子。 但…

RabbitMQ虚拟主机无法启动的原因和解决方案

RabbitMQ虚拟主机无法启动的原因和解决方案 摘要&#xff1a; RabbitMQ是一个广泛使用的开源消息代理系统&#xff0c;但在使用过程中可能会遇到虚拟主机无法启动的问题。本文将探讨可能导致该问题的原因&#xff0c;并提供相应的解决方案&#xff0c;以帮助读者解决RabbitMQ虚…

Learning C++ No.31 【线程库实战】

引言&#xff1a; 北京时间&#xff1a;2023/6/11/14:40&#xff0c;实训课中&#xff0c;实训场地有空调&#xff0c;除了凳子坐着不舒服之外&#xff0c;其它条件都挺好&#xff0c;主要是我带上了我自己的小键盘&#xff0c;并且教室可以充电&#xff0c;哈哈哈&#xff0c…

在做自动化测试之前你需要知道的

B站视频教程&#xff1a;Python自动化测试&#xff1a;7天练完这60个实战项目&#xff0c;年薪过35w。 什么是自动化测试&#xff1f; 做测试好几年了&#xff0c;真正学习和实践自动化测试一年&#xff0c;自我感觉这一个年中收获许多。一直想动笔写一篇文章分享自动化测试实践…

信息系统管理工程师-学习笔记1-信息化知识

考点1 信息与信息系统 信息的概念 信息的定义: 是有别与物质与能量的第三种东西,是对事物运动状态或存在方式的不确定行的描述 信息是按特定方式组织在一起的客体属性的集合,具有超出这些客体属性本身之外的价值两层次 1.本体论层次 : 纯客观的层次,只与客体本身的因素有关,与主…

python cv2的一些操作,如膨胀,画线,滤波等

目录 0. cv2简介1. 打开摄像头2. 画图,画线3. 滤波4. 获取角点5. 梯度边缘6. 图形匹配7. 形态学变化-膨胀腐蚀8. 二值化阈值10. 总结 0. cv2简介 在这里先简单介绍一下cv2吧。 cv2 是 OpenCV Python 库的主要模块&#xff0c;提供了许多图像处理和计算机视觉方面的函数和工具。…

vue2组件通信

父传子 传递静态或动态 Prop <!-- 传入静态值 --> <blog-post title"hai hai hai"></blog-post><!-- 传入变量值 --> <blog-post :title"info.title"></blog-post>传入一个对象的所有 property 数据 post: {id: 1…

进程管道:popen函数实例

基础知识 可能最简单的在两个程序之间传递数据的方法就是使用popen和pclose函数了。它们的原型如下所示&#xff1a; #include <stdio.h>FILE *popen(const char *command, const char *type);int pclose(FILE *stream); 1&#xff0e;popen函数 popen函数允许一个程…

因为Json,controller方法单参数 导致脑袋短路

对于单参数方法&#xff0c; 一直喜欢用parameter方式。今天不知道为啥&#xff0c;就想用Json方式&#xff0c;然后无法直接传递。各种自我怀疑&#xff0c;然后尝试。 突然醒悟过来&#xff0c;Json方式是key/value模式&#xff0c;单参数String类型&#xff0c;没有key。必…

TreeMap源码

介绍 如果我们希望Map可以保持key的大小顺序时&#xff0c;就需要利用TreeMap。底层使用了红黑树&#xff0c;左子树总小于root&#xff0c;右子树总大于root&#xff0c;具有很好的平衡性,操作速度达到log(n)。 TreeMap 相比于HashMap多实现了了NavigableMap接口&#xff08…

5. SpringCloudAlibab 集成 gateway

一、什么是 Spring Cloud Gateway 1、网关简介 网关作为流量的入口&#xff0c;常用的功能包括路由转发&#xff0c;权限校验&#xff0c;限流等等。 SpringCloud Gateway是 Spring Cloud 官方推出的第二代网关框架&#xff0c;定位取代 Netflix Zuul。相对Zuul来说&#xf…

【多线程】原子引用ABA问题

目录 一、代码示例二、执行结果截图三、说明四、AtomicStampedReference使用4.1 代码示例4.2 截图 一、代码示例 package com.learning.atomic;import lombok.extern.slf4j.Slf4j; import java.util.concurrent.atomic.AtomicReference; /*** Author wangyouhui* Description …

2023年软考-高级信息系统项目管理工程师考试大纲

高级信息系统项目管理工程师考试大纲 2023年软考高级信息系统项目管理工程师考试大纲已于2023年5月出版。您可以在 中国计算机技术职业资格网 上找到更多关于考试的信息 。 信息系统项目管理师是对从事信息系统项目管理工作的专业技术人员基本理论和实践能力的综合考核,该专业…

新手如何挑选一款合适的功率放大器?

ATA系列功率放大器是&#xff08;AB&#xff09;类功放&#xff0c;相比于甲类功率放大器&#xff0c;它小信号输入时效率更高&#xff0c;随着输出功率的增大&#xff0c;效率也增高&#xff0c;它的效率比以及保真度而言&#xff0c;都优于A类和B类功放。 因为具有这些优势&a…