使用微服务Spring Cloud集成Kafka实现异步通信(消费者)

news2024/11/6 3:01:00

1、本文架构

本文目标是使用微服务Spring Cloud集成Kafka实现异步通信。其中Kafka Server部署在Ubuntu虚拟机上,微服务部署在Windows 11系统上,Kafka Producer微服务和Kafka Consumer微服务分别注册到Eureka注册中心。Kafka Producer和Kafka Consumer之间通过Kafka Server实现异步通信。

出于便于测试的目的,我通过浏览器触发Kafka Producer发送消息,观察Kafka Consumer的后台是否打印出接收到的消息内容。

Ubuntu 上部署Kafka Server,详见博文:Ubuntu下Kafka安装及使用-CSDN博客

Eureka注册中心的搭建过程和完整代码,详见博文:微服务1:搭建微服务注册中心(命令行简易版,不使用IDE)-CSDN博客

Kafka Producer微服务的完整代码,详见博文:使用微服务Spring Cloud集成Kafka实现异步通信-CSDN博客

本文的重点是实现下图中的深蓝色部分:Kafka Consumer微服务。

2、创建Spring boot项目(Kafka Consumer微服务项目):

mvn archetype:generate -DgroupId=com.test -DartifactId=microservice-kafka-consumer -DarchetypeArtifactId=maven-archetype-quickstart

项目代码的完整目录如下图所示:

编辑pom.xml,添加依赖包:

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  
  <groupId>com.test</groupId>
  <artifactId>microservice-kafka-consumer</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>microservice-kafka-consumer</name>
  <url>http://maven.apache.org</url>
  
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.0.RELEASE</version>
    <relativePath/> 
  </parent>
  
  <dependencies>
  	<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>         
	 
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>


  </dependencies>
  
    <dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Hoxton.SR4</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>               
    </dependencies>
  </dependencyManagement>
  
  <build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
  
</project>

编辑application.yml,配置kafka消费者:

consumer:
      #消费的主题
      topic: test-topic
      #消费者组id
      group-id: test-group
      #是否自动提交偏移量
      enable-auto-commit: true
      #提交偏移量的间隔-毫秒
      auto-commit-ms: 1000
      #客户端消费的会话超时时间-毫秒
      session-timeout-ms: 10000
      #实现DeSerializer接口的反序列化类键
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #实现DeSerializer接口的反序列化类值
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

server:
  port: 8030
spring:
  application:
    name: microservice-kafka-consumer
  kafka:
    bootstrap-servers: 192.168.23.131:9092
    consumer:
      group-id: test-group
      enable-auto-commit: true
      auto-commit-ms: 1000
      session-timeout-ms: 10000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8080/eureka/
  instance:
    prefer-ip-address: true            

App.java的完整代码如下:

package com.test;

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.kafka.annotation.KafkaListener;


@SpringBootApplication
@EnableDiscoveryClient
public class App 
{
	
	  @KafkaListener(topics = "mydemo1")
    public void listen(String msg) throws Exception {
        System.out.println( "-----> Recv a msg: " + msg );
    }
    
    public static void main( String[] args ){
        System.out.println( "Hello World!" );
        SpringApplication.run(App.class, args);
    }
}

3、测试

在浏览器输入,触发Kafka Producer向Kafka Server发送消息:

http://localhost:8020/kafka/sendMsg?msg=测试消息testmsg

在Kafka Consumer的后台打印出收到的消息:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2181846.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Ajax ( 是什么、URL、axios、HTTP、快速收集表单 )Day01

AJAX 一、Ajax是什么1.1名词解释1.1.1 服务器1.1.2 同步与异步1. 同步&#xff08;Synchronous&#xff09;2. 异步&#xff08;Asynchronous&#xff09;3. 异步 vs 同步 场景4. 异步在 Web 开发中的常见应用&#xff1a; 1.2 URL 统一资源定位符1.2.1 URL - 查询参数1.2.2 ax…

经典RCU锁原理及Linux内核实现

经典RCU锁原理及Linux内核实现 RCU锁原理 RCU锁第一个特点就是适用于读很多写很少的场景&#xff0c;那它和读写锁有什么区别呢&#xff1f;区别就是RCU锁读者完全不用加锁&#xff08;多个写者之间仍需要竞争锁&#xff09;&#xff0c;而读写锁&#xff08;不管是读优先、写…

https://www.aitoolpath.com/ 一个工具数据库,目前储存了有2000+各种工具。每日更新

AI 工具爆炸&#xff1f;别怕&#xff0c;这个网站帮你整理好了&#xff01; 哇塞&#xff0c;兄弟们&#xff01;AI 时代真的来了&#xff01;现在各种 AI 工具跟雨后春笋似的&#xff0c;噌噌噌地往外冒。AI 写作、AI 绘画、AI 代码生成……简直是要逆天啊&#xff01; 可是…

XSS | XSS 常用语句以及绕过思路

关注这个漏洞的其他相关笔记&#xff1a;XSS 漏洞 - 学习手册-CSDN博客 0x01&#xff1a;干货 - XSS 测试常用标签语句 0x0101&#xff1a;<a> 标签 <!-- 点击链接触发 - JavaScript 伪协议 --><a hrefjavascript:console.log(1)>XSS1</a> <!-- 字…

智能制造--EAP设备自动化程序

EAP是设备自动化程序&#xff08;Equipment Automation Program&#xff09;的缩写&#xff0c;他是一种用于控制制造设备进行自动化生产的系统。EAP系统与MES系统整合&#xff0c;校验产品信息&#xff0c;自动做账&#xff0c;同时收集产品生产过程中的制程数据和设备参数数据…

Spring MVC__入门

目录 一、SpringMVC简介1、什么是MVC2、什么是SpringMVC 二、Spring MVC实现原理2.1核心组件2.2工作流程 三、helloworld1、开发环境2、创建maven工程3、配置web.xml4、创建请求控制器5、创建springMVC的配置文件6、测试HelloWorld7、总结 一、SpringMVC简介 1、什么是MVC MV…

git 报错git: ‘remote-https‘ is not a git command. See ‘git --help‘.

报错内容 原因与解决方案 第一种情况&#xff1a;git路径错误 第一种很好解决&#xff0c;在环境变量中配置正确的git路径即可&#xff1b; 第二种情况 git缺少依赖 这个情况&#xff0c;网上提供了多种解决方案。但如果比较懒&#xff0c;可以直接把仓库地址的https改成ht…

【Kotlin基于selenium实现自动化测试】初识selenium以及搭建项目基本骨架(1)

导读大纲 1.1 Java: Selenium 首选语言1.2 配置一个强大的开发环境 1.1 Java: Selenium 首选语言 Java 是开发人员和测试人员进行自动化 Web 测试的首选 Java 和 Selenium 之间的协同作用受到各种因素的驱动,从而提高它们的有效性 为什么Java经常被认为是Selenium的首选语言 广…

记录一次出现循环依赖问题

具体的结构设计&#xff1a; 在上面的图片中&#xff1a; UnboundBlackVerifyChain类中继承了UnboundChain类。但是UnboundChain类中注入了下面三个类。 Scope(“prototype”) UnboundLinkFlowCheck类 Scope(“prototype”) UnboundUserNameCheck类 Scope(“prototype”) Un…

[linux 驱动]input输入子系统详解与实战

目录 1 描述 2 结构体 2.1 input_class 2.2 input_dev 2.4 input_event 2.4 input_dev_type 3 input接口 3.1 input_allocate_device 3.2 input_free_device 3.3 input_register_device 3.4 input_unregister_device 3.5 input_event 3.6 input_sync 3.7 input_se…

用网络分析仪测试功分器驻波的5个步骤

在射频系统中&#xff0c;功分器的驻波比直接关系到信号的稳定性和传输效率。本文将带您深入了解驻波比的测试方法和影响其结果的因素。 一、功分器驻波比 驻波(Voltage Standing Wave Ratio)&#xff0c;简称SWR或VSWR&#xff0c;是指频率相同、传输方向相反的两种波&#xf…

.NET Core 高性能并发编程

一、高性能大并发架构设计 .NET Core 是一个高性能、可扩展的开发框架&#xff0c;可以用于构建各种类型的应用程序&#xff0c;包括高性能大并发应用程序。为了设计和开发高性能大并发 .NET Core 应用程序&#xff0c;需要考虑以下几个方面&#xff1a; 1. 异步编程 异步编程…

最大正方形 Python题解

最大正方形 题目描述 在一个 n m n\times m nm 的只包含 0 0 0 和 1 1 1 的矩阵里找出一个不包含 0 0 0 的最大正方形&#xff0c;输出边长。 输入格式 输入文件第一行为两个整数 n , m ( 1 ≤ n , m ≤ 100 ) n,m(1\leq n,m\leq 100) n,m(1≤n,m≤100)&#xff0c;接…

养猪场饲料加工机械设备有哪些

养猪场饲料加工机械设备主要包括以下几类&#xff1a;1‌、粉碎机‌&#xff1a;主要用于将原料进行粉碎&#xff0c;以便与其他饲料原料混合均匀。常见的粉碎机有水滴式粉碎机和立式粉碎机两种&#xff0c;用户可以根据原料的特性选择适合的机型。2‌、搅拌机‌&#xff1a;用…

ONVIF、GB28181技术特点和使用场景分析

技术背景 好多开发者希望搞明白ONVIF和GB28181的区别和各自适合的场景&#xff0c;为什么大牛直播SDK只做了GB28181接入端&#xff0c;没有做ONVIF&#xff1f;本文就二者差别&#xff0c;做个大概的介绍。 ONVIF ONVIF&#xff08;Open Network Video Interface Forum&…

【Linux 23】线程池

文章目录 &#x1f308; 一、线程池的概念&#x1f308; 二、线程池的应用场景&#x1f308; 三、线程池的实现 &#x1f308; 一、线程池的概念 线程池 (thread pool) 是一种利用池化技术的线程使用模式。 虽然创建线程的代价比创建进程的要小很多&#xff0c;但小并不意味着…

Mysql高级篇(下)——日志

日志 一、日志概述二、日志弊端二、日志分类三、 各日志详情介绍1、慢查询日志&#xff08;Slow Query Log&#xff09;2、通用查询日志&#xff08;General Query Log&#xff09;3、错误日志&#xff08;Error Log&#xff09;4、二进制日志&#xff08;Binary Log&#xff0…

初识Linux · 进程等待

目录 前言&#xff1a; 进程等待是什么 为什么需要进程等待 进程等待都在做什么 前言&#xff1a; 通过上文的学习&#xff0c;我们了解了进程终止&#xff0c;知道终止是在干什么&#xff0c;终止的三种情况&#xff0c;以及有了退出码&#xff0c;错误码的概念&#xff…

基于大数据的学生体质健康信息系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏&#xff1a;…

图像数据增强albumentations之自然景色

一 背景 最近在做关于图像数据增强方面&#xff0c;发现albumentations这个包比较好用&#xff0c;在此学习一下如何使用API二 albumentations 安装 注意&#xff0c;注意&#xff0c;注意 python版本3.8 pip install -U albumentations三 API学习 1 模拟雨水 import os i…