Kafka应用Demo:按主题订阅消费消息

news2024/11/25 4:26:39

安装环境

  Kafka安装可参考官方网站的指导(https://kafka.apache.org/quickstart), 按步骤解压压缩包,修改配置。然后再启动zookeeper和kafka-server即可。

  需要注意的一点:如果是在VMware虚拟机上启动的kafka, 需要修改一下server.properties配置文件,增加如下配置:
在这里插入图片描述
  advertised.listener指定访问kafka的IP和端口,IP设置为虚拟机暴露给外部访问的IP。通过本地代码连接kafka,需要使用该配置

生产者代码样例

public class KafkaProducerService {
    private static final String NEO_TOPIC = "elon-topic";
    private KafkaProducer<String, String> producer = null;

    public KafkaProducerService() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.5.128:9092");
        props.put("acks", "0");
        props.put("group.id", "1111");
        props.put("retries", "2");
        //设置key和value序列化方式
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);

        //生产者实例
        producer = new KafkaProducer<>(props);
    }

    /**
     * 外部调用的发消息接口
     */
    public void sendMessage() {
        for (int i = 0; i < 10; ++i) {
            int p = i % 2;
            ProducerRecord<String, String> record = new ProducerRecord(NEO_TOPIC, p, "neo", JSON.toJSONString(i));
            producer.send(record);
        }
    }
}

 发送消息时,将10个数据分别发送到0分区和1分区。

消费者代码样例

public class KafkaConsumerService {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerService.class);
    private static final String NEO_TOPIC = "elon-topic";

    Properties properties = new Properties();
    private KafkaConsumer consumer = null;

    public KafkaConsumerService() {
        properties.put("bootstrap.servers","192.168.5.128:9092");  // 指定 Broker
        properties.put("group.id", "neo1");              // 指定消费组群 ID
        properties.put("max.poll.records", "5");
        properties.put("enable.auto.commit", "false");
        properties.put("key.deserializer", StringDeserializer.class); // 将 key 的字节数组转成 Java 对象
        properties.put("value.deserializer", StringDeserializer.class);  // 将 value 的字节数组转成 Java 对象

        consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Collections.singletonList(NEO_TOPIC));  // 订阅主题 order-events
        new Thread(this::receiveMessage).start();
    }

    public void receiveMessage() {
        try {
            while (true) {
                synchronized (this) {
                    ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                    LOGGER.info("Fetch record num:{}", records.count());
                    for (ConsumerRecord<String,String> record: records) {
                        String info = String.format("[Topic: %s][Partition:%d][Offset:%d][Key:%s][Message:%s]",
                                record.topic(), record.partition(), record.offset(), record.key(), record.value());
                        LOGGER.info("Received:" + info);
                        Thread.sleep(100);
                    }
                    consumer.commitSync();
                }
            }
        } catch (Exception e){

        } finally {
            consumer.close();
        }
    }

 消费者按主题订阅。从打印的结果可以看到,消费者循环从topic下取出各个分区的消息依次消费。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1649084.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JavaEE技术之MySql高级-搭建主从复制(主从同步原理、一主多从配置)

文章目录 MySQL主从同步1、MySQL主从同步原理2、一主多从配置2.1、准备主服务器2.2、准备从服务器2.3、启动主从同步2.4、实现主从同步2.5、停止和重置2.6、常见问题问题1问题2 MySQL主从同步 1、MySQL主从同步原理 基本原理&#xff1a; slave会从master读取binlog来进行数据…

python中如何遍历字典

1. 遍历字典的键key ① >>> d{list:[1, 2, 3],1:123,111:python3,tuple:(4, 5, 6)} >>> for key in d:print(str(key):str(d[key])) list:[1, 2, 3] 1:123 111:python3 tuple:(4, 5, 6) ② >>> d{list:[1, 2, 3],1:123,111:python3,tuple:(4, 5, 6…

书生·浦语大模型实战营之手把手带你评测 Llama 3 能力(OpenCompass 版)

书生浦语大模型实战营之手把手带你评测 Llama 3 能力&#xff08;OpenCompass 版&#xff09; 环境配置 conda create -n llama3 python3.10 pytorch torchvision pytorch-cuda -c nvidia -c pytorch -y conda activate llama3conda install git git-lfs install✨下载 Llama3…

HP Z620 服务器打开VTx虚拟技术

在使用Virtual Box的时候&#xff0c;虚拟主机启动报错&#xff1a;提示需要VTx。于是到bios里面去设置VTx。 这里有个小坑&#xff0c;就是HP 的bios配置里面&#xff0c;VTx不在常规的“System Configuration”、“Advanced”等地方&#xff0c;而是在“Security”菜单里&…

nvcc: command not found

nvcc: command not found nvcc命令是 NVIDIA CUDA 编译器&#xff0c;就类似于gcc是c语言的编译器&#xff0c;用于编译 CUDA 代码并生成 GPU 可执行文件。由于程序是要经过编译器编程成可执行的二进制文件&#xff0c;而cuda程序有两种代码&#xff0c;一种是运行在CPU上的ho…

营销5.0时代,企业的痛如何解?

进入营销5.0阶段之后&#xff0c;许多企业都需解决连接客户效能低下的问题。针对这个问题&#xff0c;产品经理、软件开发公司包括个人开发者&#xff0c;要怎么找到有效的“解药”&#xff1f; 营销不仅每年都在变化&#xff0c;甚至每天都在变化。 ——现代营销学之父&…

js实现json数据可编辑

背景 项目中有低代码平台&#xff0c;由于历史脏数据和非同步编辑的问题&#xff0c;偶尔会出现数据错乱的问题&#xff0c;希望有一个快捷的方式修改数据 之前在用Formily的时候有注意到designable/react 里面的json数据编辑功能非常不错如果能应用到项目里就完美了 design…

《架构风清扬-Java面试系列第29讲》聊聊DelayQueue的使用场景

DelayQueue是BlockingQueue接口的一个实现类之一 这个属于基础性问题&#xff0c;老规矩&#xff0c;我们将从使用场景和代码示例来进行讲解 来&#xff0c;思考片刻&#xff0c;给出你的答案 1&#xff0c;使用场景 实现&#xff1a;延迟队列&#xff0c;其中元素只有在其预定…

图片编辑工具-Gimp

一、前言 GIMP&#xff08;GNU Image Manipulation Program&#xff09;是一款免费开源的图像编辑软件&#xff0c;具有功能强大和跨平台的特性。 GIMP作为一个图像编辑器&#xff0c;它提供了广泛的图像处理功能&#xff0c;包括但不限于照片修饰、图像合成以及创建艺术作品…

Day28:ElasticSearch入门、Spring整合ES、开发社区搜索功能

ElasticSearch入门 Elasticsearch简介 一个分布式的、Restful风格的搜索引擎。支持对各种类型的数据的检索&#xff08;非结构化的也可以&#xff09;。搜索速度快&#xff0c;可以提供实时的搜索服务。便于水平扩展&#xff08;集群式部署&#xff09;&#xff0c;每秒可以处…

练习项目后端代码解析注解篇(annotation)

前言 本来想从接口处入手的&#xff0c;但是一下看到接口里几十个方法&#xff0c;眼睛有点抗拒&#xff0c;想想还是先看作者写的自定义注解吧。 项目里有三个自定义注解&#xff1a; 分别是AccessLimit注解、OperationLogger注解、VisitLogger注解 AccessLimit注解 这是一…

为什么下载卡在idealTree:NodeJS: sill idealTree buildDeps

可能使用的是npm config set registry https://registry.npm.taobao.org而这个镜像文件已经过期了 解决方法如下&#xff1a; 先使用 npm cache clean --force 清除缓存 再切换镜像源 再使用npm config get registry 进行查看是否换源成功 再使用 npm install -g vue/cli 就…

数据结构算法题day01

积累基本的写代码方式&#xff0c;其实都大同小异 王道、天勤、算法1800题、中选择 【day01】在带头节点的链表中&#xff0c;删除所有值为X的结点&#xff0c;并释放空间。 假设值为X的结点不唯一&#xff0c;试编写算法一实现上述操作。算法思路&#xff1a; a.首先找到x结点…

加速乐 js解混淆 __jsl_clearance_s生成

提示&#xff01;本文章仅供学习交流&#xff0c;严禁用于任何商业和非法用途&#xff0c;未经许可禁止转载&#xff0c;禁止任何修改后二次传播&#xff0c;如有侵权&#xff0c;可联系本文作者删除&#xff01; 目标网站 aHR0cHM6Ly9jcmVkaXQuaGVmZWkuZ292LmNuL2NyZWRpdC13…

nginx 启动,查看,停止

nginx 启动&#xff0c;查看&#xff0c;停止 启动 start nginx 查看是否启动成功 tasklist | findstr nginx 停止 nginx -s stop 测试配置文件的语法是否有误 nginx -t 重启nginx nginx-s reload

分享天某云对象存储开发的体验

最近体验了天某云对象存储的功能&#xff0c;作为一名资深开发者&#xff0c;开发体验差强人意&#xff0c;与阿里云存在一定的差距。 首先在开发文档上居然没有基于nodejs的代码示例&#xff0c;只有java,c#,go等的代码示例&#xff0c;虽然有javascript的&#xff0c;但那也只…

面试经验分享 | 蓝队面试经验

关于蓝队面试经验 1.自我介绍能力 重要性 为什么将自我介绍能力放在第一位&#xff0c;实际上自我介绍才是面试中最重要的一点&#xff0c;因为护网面试并没有确定的题目&#xff0c;让面试官去提问 更多是的和面试官的一种 “交谈” &#xff0c;面试的难易程度也自然就取决…

Kubernetes 教程:在 Containerd 容器中使用 GPU

原文链接:Kubernetes 教程:在 Containerd 容器中使用 GPU 云原生实验室本文介绍了如何在使用 Containerd 作为运行时的 Kubernetes 集群中使用 GPU 资源。https://fuckcloudnative.io/posts/add-nvidia-gpu-support-to-k8s-with-containerd/ 前两天闹得沸沸扬扬的事件不知道…

C++ | Leetcode C++题解之第71题简化路径

题目&#xff1a; 题解&#xff1a; class Solution { public:string simplifyPath(string path) {auto split [](const string& s, char delim) -> vector<string> {vector<string> ans;string cur;for (char ch: s) {if (ch delim) {ans.push_back(mov…