kafka实现消息接受和发送

news2024/12/25 23:40:55

1、首先引入依赖

<dependency>
   <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>

2、设置环境

spring:
  kafka:
    # 配置连接到服务端集群的配置项 ip:port,ip:port
    bootstrap-servers: 192.168.211.136:9092
    consumer:
      #      auto-commit-interval: 100
      auto-offset-reset: earliest
      # enable-auto-commit: false  # 进行手动提交 默认是自动提交
      #      enable-auto-commit: true
      group-id: test-consumer-group
      # 默认值即为字符串
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 默认值即为字符串
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    producer:
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 0
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

3、启动类实现

package com.jjw;

import com.jjw.producer.Producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
public class KafkaClientApplicaton {
    public static void main(String[] args) {
        SpringApplication.run(KafkaClientApplicaton.class, args);
    }

    @RestController
    class TestController {
        @Autowired
        private Producer producer;

        /**
         * @return
         * @throws Exception
         */
        @GetMapping("/send/{message}")
        public String sendM1(@PathVariable(name="message") String message) throws Exception {
            producer.send(message);
            return "ok";
        }
    }

    //1,关闭自动,2.设置手动提交模式 3 在消费者端 进行 确认
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        //配置手动提交offset
        factory.getContainerProperties().setAckMode((ContainerProperties.AckMode.MANUAL));
        return factory;
    }
}

4、生产者类实现

package com.jjw.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class Producer {
    @Autowired
    private KafkaTemplate kafkaTemplate;
  

    public void send(String message) throws Exception {
        //设置主题
        //设置消息内容
        kafkaTemplate.send("jjw", message);
    }
}

5、消费者类实现

package com.jjw.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class KafkaConsumer {
 
    @KafkaListener(topics = {"jjw"})
    public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) throws IOException {
        String value = record.value();
        System.out.println("接收到的消息:"+value);
        System.out.println("offset"+record.offset());
        //System.out.println("key"+record.key());
        // 手动提交
        //ack.acknowledge();
    }
}

6、运行启动类并进行测试即在浏览器中输入如下内容

在这里插入图片描述

7、消息接收方收到的内容

接收到的消息:jjwjjjwjjw
offset46

需要注意的是这个是在服务器上首先把kafka搭建好了的方式,可参考如下方式搭建

1、下载镜像

docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka

2、创建容器

docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper

docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.211.136:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.211.136:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/711168.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

第九十五天学习记录:C++核心:类和对象Ⅳ(五星重要)

C对象模型和this指针 成员变量和成员函数分开存储 在C中&#xff0c;类内的成员变量和成员函数分开存储只有非静态成员变量才属于类的对象上 #include<iostream> using namespace std;class Person {int m_A;//非静态成员变量 属于类的对象上static int m_B;//静态成…

Gradio库的Gallery模块介绍与select方法详解

❤️觉得内容不错的话&#xff0c;欢迎点赞收藏加关注&#x1f60a;&#x1f60a;&#x1f60a;&#xff0c;后续会继续输入更多优质内容❤️ &#x1f449;有问题欢迎大家加关注私戳或者评论&#xff08;包括但不限于NLP算法相关&#xff0c;linux学习相关&#xff0c;读研读博…

Flask boostrap实现图片视频上传下载展示

Flask boostrap实现图片视频上传下载展示 1、展示效果2、前端代码3、后端代码 1、展示效果 项目目录结构 2、前端代码 html <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title>&l…

Go 程序是怎样跑起来的

Go 程序是怎样跑起来的 引入 我们从一个 helloworld 的例子开始 package mainimport "fmt"func main() {fmt.Println("hello world") }用 vim 要打开&#xff0c;输入命令&#xff1a; :%!xxd下面是输出 00000000:7061 636b 6167 6520 6d61 696e 0a0a…

tomcat概述,优化,多实例部署

目录 一、概述 二、三个容器 1、Web 容器&#xff1a; 2、Servlet 容器&#xff1a; 3、JSP 容器&#xff1a; 三、Tomcat 功能组件结构 四、优化 1、启动速度优化 2、配置参数优化 五、多实例部署 一、概述 Tomcat 是 Java 语言开发的&#xff0c;Tomcat 服务器是一…

ubuntu下安装docker遇到的问题

如果你还没有安装虚拟机&#xff0c;推荐一篇关于安装Ubuntu的详细教程&#xff1a; VMware虚拟机安装Ubuntu20.04详细图文教程https://blog.csdn.net/weixin_41805734/article/details/120698714首先&#xff0c;安装docker的前提是虚拟机能够联网&#xff0c;如果能看到右上…

官方文档中docker安装php插件xdebug

docker安装php插件 直接上代码常见问题如果查看系统类型如何查看xdebug合适的版本安装异常提示Cannot find autoconf. Please check your autoconf installation and the$PHP_AUTOCONF environment variable. Then, rerun this script.configure: error: in /tmp/pear/temp/pea…

Zookeeper 分布式锁

优质博文&#xff1a;IT-BLOG-CN 一、简介 随着公司业务的发展&#xff0c;单机应用已经无法支撑现有的用户量&#xff0c;之前采用synchronized和Lock锁已经无法满足分布式系统的要求。我们应用程序目前都会运行120台&#xff0c;节假日会扩容至240台&#xff0c;属于多JVM环…

领域事件驱动(二)聚合与聚合根的了解

上一章对值对象以及实体进行了一些简单的讲解&#xff1a; 聚合 聚合&#xff1a;我们把一些关联性极强、生命周期一致的实体、值对象放到一个聚合里。 聚合有一个聚合根和上下文边界&#xff0c;这个边界根据业务单一职责和高内聚原则&#xff0c;定义了聚合内部应该包含哪…

U-Boot移植 - 2_环境搭建和u-boot烧录启动

文章目录 1. 编译环境搭建1.1 交叉编译器下载1.2 交叉编译器安装 2. 编译原厂uboot3. 烧录开发板3.1 烧录到SD卡3.2 启动开发板 1. 编译环境搭建 1.1 交叉编译器下载 嵌入式Linux开发&#xff0c;程序编译通常在电脑端的Linux&#xff08;如虚拟机中的Ubuntu)下进行编译&…

阿里云ECS部署

nginx 安装nginx # 查看dnf版本 dnf --version# 查找是否是否安装 dnf search nginx# 安装nginx dnf install nginx# 启动nginx systemctl start nginx# 查看nginx运行状态 systemctl status nginx# 相当于开机自启&#xff08;重启服务器&#xff0c;nginx自动启动&#xff…

GoLand下载、安装

一、Goland下载 官方最新版本下载地址&#xff1a; Download GoLand: A Go IDE with extended support for JavaScript, TypeScript, and databases 其他版本下载&#xff1a; Other Versions - GoLand 二、安装过程 1.下载好goland-2021.1.1安装包后&#xff0c;双击运行安装包…

【ARIMA-WOA-CNN-LSTM】合差分自回归移动平均方法-鲸鱼优化-卷积神经网络-长短期记忆神经网络研究(Python代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

Redis主从复制模式4

哨兵模式 有个哨兵一直在巡逻&#xff0c;突然发现。老大挂了&#xff0c;小弟们会自动投票&#xff0c;从众小弟中选出新的老大。即自动版的谋权篡位。我们把这个过程称为哨兵模式 设置哨兵模式语法格式&#xff1a; sentinel monitor 被监控主机名/IP Redis服务端口 票数 关闭…

AutoCV第十一课:DL基础

目录 DL基础前言1. BP训练mnist2. 权重初始化理论分析总结 DL基础 前言 手写AI推出的全新保姆级从零手写自动驾驶CV课程&#xff0c;链接。记录下个人学习笔记&#xff0c;仅供自己参考。 本次课程我们来了解下 BP 反向传播和学习权重初始化相关知识 课程大纲可看下面的思维导…

java新特性stream

stream Java 8 是一个非常成功的版本&#xff0c;这个版本新增的Stream&#xff0c;配合同版本出现的 Lambda &#xff0c;给我们操作集合&#xff08;Collection&#xff09;提供了极大的便利。 Stream将要处理的元素集合看作一种流&#xff0c;在流的过程中&#xff0c;借助…

PMP常用英文术语缩写总结(文字版+表格版+图片版)

PMP常用英文术语缩写总结&#xff08;文字版表格版图片版&#xff09; 文字版 PMBOK Project Management Body of Knowledge 项目管理知识体系 PMI Project Management Institute 项目管理协会 PMO Project Management Office 项目管理办公室 PMIS Project Management Inf…

Spring Boot 中的服务注册是什么,原理,如何使用

Spring Boot 中的服务注册是什么&#xff0c;原理&#xff0c;如何使用 Spring Boot 是一个非常流行的 Java 后端框架&#xff0c;它提供了许多便捷的功能和工具&#xff0c;使得开发者可以更加高效地开发微服务应用。其中&#xff0c;服务注册是 Spring Boot 微服务架构中非常…

代码源 线段树模板

线段树1 思路&#xff1a; 我们需要维护的东西是序列的最小值和最小值个数 这道题没有修改操作&#xff0c;因此不考虑修改 然后考虑Pushup 最小值很简单&#xff0c;直接取min 最小值个数怎么维护呢&#xff1f;考虑这个区间需要维护的值如何从左右两个区间获得 如果左右…