kafka用java收发消息

news2025/1/11 7:14:09

用java客户端代码来对kafka收发消息
具体代码如下

package com.cool.interesting.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Future;

public class KafkaTest {

    private static final String BOOTSTRAP_SERVERS = "192.168.47.145:9092";
    private static final String TOPIC_NAME = "test";

    public static void main(String[] args) {
        // 生产者示例
        produceMessage();

        // 消费者示例
        consumeMessage();

        //从指定偏移量消费消息
        consumeOffsetMessage();
    }
    //生产者代码
    private static void produceMessage() {
    	Properties props = new Properties();
    	//acks是保证消息的发送机制,有以下几个值
    	//acks = 0:表示生产端发送消息后立即返回,不等待broker端的响应结果。通常此时生产端吞吐量最高,消息发送的可靠性最低。
    	//acks = 1: 表示leader副本成功写入就会响应Producer,而无需等待ISR(同步副本)集合中的其他副本写入成功。这种方案提供了适当的持久性,保证了一定的吞吐量。默认值即是1。
    	//acks = all或-1: 表示不仅要等leader副本成功写入,还要求ISR中的其他副本成功写入,才会响应Producer。这种方案提供了最高的持久性,但也提供了最差的吞吐量。
    	//调优建议:建议根据实际情况设置,如果要严格保证消息不丢失,请设置为all或-1;如果允许存在丢失,建议设置为1;一般不建议设为0,除非无所谓消息丢不丢失。props.put(ProducerConfig.ACKS_CONFIG,1);props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        //key和value序列化
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //其他配置参数详见org.apache.kafka.clients.producer.ProducerConfig类

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                //异步发送
                Future<RecordMetadata> send = producer.send(new ProducerRecord<>(TOPIC_NAME,  message));
                System.out.println("Sent message: " + message);
            }
        }
    }
    //正常消费者代码
     private static void consumeMessage() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        //将订阅的topic绑定到一个消费者(这个group_id 是自己定义的)
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test99");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //订阅一个topic
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        while (true) {
            //设置kafak从broker拉取消息的超时时间
            // (这意味着 poll() 方法将在等待最多 2秒的时间内尝试从 Kafka 集群拉取消息,如果在超时时间内没有拉取到消息,将返回一个空的 ConsumerRecords 对象)
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received_message: " + record.value());
            }
        }

    }
    //指定偏移量开始消费
    private static void consumeOffsetMessage() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        //将订阅的topic绑定到一个消费者(这个group_id 是自己定义的)
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //订阅一个topic
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));
        //如果要指定偏移量,必须先poll一次,不然代码报错
        ConsumerRecords<String, String> poll = consumer.poll(0);
        System.out.println("poll:"+poll.isEmpty());
        //创建一个分区(参数为topic_name,和分区序号)
        TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0);
        // 指定要消费的偏移量
        long offset = 3;
        //从指定偏移量开始消息消息
        consumer.seek(topicPartition, offset);
        while (true) {
            //设置kafka从broker拉取消息的超时时间
            // (这意味着 poll() 方法将在等待最多 2秒的时间内尝试从 Kafka 集群拉取消息,如果在超时时间内没有拉取到消息,将返回一个空的 ConsumerRecords 对象)
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received_message: " + record.value());
            }
        }
    }
 }

安装kafka的可视化工具:offset explorer
offset explorer 是一个用于查看和管理 Kafka 消费者组的工具,它允许你检查消费者组的偏移量(offset),并且可以查看每个消费者组在每个分区上的偏移量情况。这对于监控和调试 Kafka 消费者组非常有用。
下载地址为:https://www.kafkatool.com/download.html
如下图所示:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1679664.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2024042001-计算机网络 - 物理层

计算机网络 - 物理层 计算机网络 - 物理层 通信方式带通调制 通信方式 根据信息在传输线上的传送方向&#xff0c;分为以下三种通信方式&#xff1a; 单工通信&#xff1a;单向传输半双工通信&#xff1a;双向交替传输全双工通信&#xff1a;双向同时传输 带通调制 模拟信号…

程序验证之Dafny--证明霍尔逻辑的半自动化利器

一、What is Dafny?【来自官网介绍 Dafny 】 1)介绍 Dafny 是一种支持验证的编程语言&#xff0c;配备了一个静态程序验证器。 通过将复杂的自动推理与熟悉的编程习语和工具相结合&#xff0c;使开发者能够编写可证明正确的代码&#xff08;相对于 {P}&#xff33;{Q} 这种…

数据结构(C):树的概念和二叉树初见

目录 &#x1f37a;0.前言 1.树概念及结构 2.认识一棵树 3.树的表示 3.1树在实际中的运用&#xff08;表示文件系统的目录树结构&#xff09; 4.二叉树 4.1特殊的二叉树 4.2二叉树的性质 &#x1f48e;5.结束语 &#x1f37a;0.前言 言C之言&#xff0c;聊C之识&…

OpenAI 震撼发布:GPT-4o免费,实时语音视频交互开启新纪元

OpenAI 震撼发布&#xff1a;GPT-4o免费&#xff0c;实时语音视频交互开启新纪元 在仅仅问世17个月后&#xff0c;OpenAI 研制出了仿佛科幻片中登场的超级人工智能——GPT-4o&#xff0c;而且所有人都可以完全免费使用&#xff0c;让这个科技界的巨浪让人震撼无比&#xff01;…

JSP技术

前言&#xff1a;虽然现在Vue盛行&#xff0c;但是对于初学者和一些项目我们还是采用jsp技术来编写前端代码&#xff0c;一些老的项目也需要jsp去维护。就像老师说的法国的银行系统还是采用COBOL这种古老语言。本篇文章主要介绍jsp技术。 目录 一、概述 &#xff08;1&#…

Rust构造JSON和解析JSON

目录 一、Rust构造JSON和解析JSON 二、知识点 serde_json JSON 一、Rust构造JSON和解析JSON 添加依赖项 cargo add serde-json 代码&#xff1a; use serde_json::{Result, Value};fn main() -> Result<()>{//构造json结构 cpu_loadlet data r#"{"…

docker安装minio附带图片

1.拉镜像 docker pull minio/minio 2.创建挂载点目录 mkdir -p /usr/local/minio/config mkdir -p /usr/local/minio/data 3.创建minio容器 docker run \ -p 19000:9000 \ -p 9090:9090 \ --nethost \ --name minio \ -d --restartalways \ -e "MINIO_ACCESS_KEYmini…

【前端】CSS基础(4)

文章目录 前言1、CSS常用属性1.1 文本属性1.1.1 文本对齐1.1.2 文本装饰1.1.3 文本缩进1.1.5 行高 前言 这篇博客仅仅是对CSS的基本结构进行了一些说明&#xff0c;关于CSS的更多讲解以及HTML、Javascript部分的讲解可以关注一下下面的专栏&#xff0c;会持续更新的。 链接&…

第 397 场 LeetCode 周赛题解

A 两个字符串的排列差 模拟&#xff1a;遍历 s s s 记录各字符出现的位置&#xff0c;然后遍历 t t t 计算排列差 class Solution {public:int findPermutationDifference(string s, string t) {int n s.size();vector<int> loc(26);for (int i 0; i < n; i)loc[s…

一种基于电场连续性的高压MOSFET紧凑模型,用于精确表征电容特性

来源&#xff1a;A Compact Model of High-Voltage MOSFET Based on Electric Field Continuity for Accurate Characterization of Capacitance&#xff08;TED 24年&#xff09; 摘要 本文提出了一种新的高压MOSFET&#xff08;HV MOS&#xff09;紧凑模型&#xff0c;以消…

每天Get一个小技巧:用DolphinScheduler实现隔几天调度

转载自tuoluzhe8521 这篇小短文将教会你如何使用Apache DolphinScheduler实现隔几天调度&#xff0c;有此需求的小伙伴学起来&#xff01; 1 场景分析 DolphinScheduler定时器模块-定时调度时每3秒|每3分钟|每3天这种定时&#xff0c;不能够跨分钟&#xff0c;跨小时&#x…

在做题中学习(59):除自身以为数组的乘积

238. 除自身以外数组的乘积 - 力扣&#xff08;LeetCode&#xff09; 解法&#xff1a;前缀积和后缀积 思路&#xff1a;answer中的每一个元素都是除自己以外所有元素的和。那就处理一个前缀积数组和后缀积数组。 而前缀积(f[i])是&#xff1a;[0,i-1]所有元素的乘积 后缀…

6. 网络编程-网络io与select、poll,epoll

https://0voice.com/uiwebsite/html/courses/v13.7.html 首先看看这个学习计划 网络、网络编程、网络原理基础组件&#xff0c;20个。中间件 Redis ,MySQL&#xff0c;Kafka&#xff0c;RPC&#xff0c;Nginx开源框架&#xff08;解决方案&#xff09;业务开发(工程师开发&am…

【C语言】5.C语言函数(2)

文章目录 7.嵌套调⽤和链式访问7.1 嵌套调⽤7.2 链式访问 8.函数的声明和定义8.1 单个⽂件8.2 多个⽂件8.3 static 和 extern8.3.1 static 修饰局部变量8.3.2 static 修饰全局变量8.3.3 static 修饰函数 7.嵌套调⽤和链式访问 7.1 嵌套调⽤ 嵌套调用就是函数之间的互相调用。…

震撼发布!GPT-4o 上线!

5 月 14日凌晨一点&#xff0c;OpenAI 发布了 GPT-4o&#xff01; 新模型的功能简单概括就是&#xff1a;更快、更智能、更像人类。 秉承着持续更新的态度&#xff0c;Hulu AI 快速接入 GPT-4o 啦&#xff01; 继 5 月份上线 Suno 之后&#xff0c;这次是 Hulu AI 的又一重大…

Android 应用开发-实现将公共存储空间内的文件复制到应用的私用存储空间中

一、前言 几个月前&#xff0c;我用Android Studio给公司销售部门的同事开发了一款手机app&#xff0c;让同事们用自己的手机就能进行商品的扫码盘点操作&#xff0c;帮他们提高了工作效率&#xff0c;他们用了一段时间&#xff0c;反映还不错。不过前几天&#xff0c;销售部门…

Mysql 事务隔离级别

前言 在数据库管理系统中&#xff0c;事务&#xff08;Transaction&#xff09;是保证数据一致性和完整性的重要机制。在并发环境下&#xff0c;多个事务同时操作相同的数据可能会引发各种问题&#xff0c;如脏读、不可重复读、幻读等。为了解决这些问题&#xff0c;MySQL提供…

腾讯开源混元DiT文生图模型,消费级单卡可推理

节前&#xff0c;我们组织了一场算法岗技术&面试讨论会&#xff0c;邀请了一些互联网大厂朋友、今年参加社招和校招面试的同学。 针对大模型技术趋势、大模型落地项目经验分享、新手如何入门算法岗、该如何准备面试攻略、面试常考点等热门话题进行了深入的讨论。 总结链接…

script标签以及defer和async属性

1. <script>标签 将JavaScript代码嵌入到HTML中主要方式是使用<script>元素。 使用<script>的方式有两种&#xff1a; &#xff08;1&#xff09;直接在网页中嵌入JavaScript代码&#xff1a; <script>function sayHi() {console.log("Hi"…

基于springboot+vue+Mysql的大学生社团活动平台

开发语言&#xff1a;Java框架&#xff1a;springbootJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包&#xff1a;…