使用微服务Spring Cloud集成Kafka实现异步通信

news2025/1/13 9:46:20

在微服务架构中,使用Spring Cloud集成Apache Kafka来实现异步通信是一种常见且高效的做法。Kafka作为一个分布式流处理平台,能够处理高吞吐量的数据,非常适合用于微服务之间的消息传递。

微服务之间的通信方式包括同步通信和异步通信。

1)同步通信:通常通过HTTP RESTful API或RPC(远程过程调用)实现。服务消费者通过发送HTTP请求到服务提供者,服务提供者处理请求后返回响应。这种方式简单直接,但可能会受到网络延迟和并发量的影响。

同步通信的实现代码参见博文:微服务3:微服务间接口远程调用(同步通信方式)-CSDN博客

2)异步通信:通过消息队列(如RabbitMQ、Kafka等)实现。服务消费者将消息发送到队列中,服务提供者从队列中拉取消息并进行处理。这种方式实现了服务之间的解耦,提高了系统的可扩展性和容错性。但也需要考虑消息的顺序性、一致性和可靠性等问题。

1、本文目标

本文的目标是使用微服务Spring Cloud集成Kafka实现异步通信。本文实现了一个简单的Kafka Producer微服务,连接至部署再Ubuntu系统上的Kafka Server,同时在Ubuntu通过命令行终端启动一个监听的消费者,当通过浏览器测试接口想Kafka Producer微服务发送一条消息,Kafka Producer微服务即刻将该消息发送至Ubuntu系统上的Kafka Server,同时在Kafka consumer终端上可收到并显示出该消息。具体系统架构如下图所示。

部署Kafka Server和Kafka consumer,参见博文:Ubuntu下Kafka安装及使用-CSDN博客

Eureka注册中心的实现,参见博文:

微服务1:搭建微服务注册中心(命令行简易版,不使用IDE)-CSDN博客

2、创建Kafka Producer

mvn archetype:generate -DgroupId=com.test -DartifactId=microservice-kafka -DarchetypeArtifactId=maven-archetype-quickstart

完整代码的目录如下:

编辑pom.xml,添加依赖包:

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  
  <groupId>com.test</groupId>
  <artifactId>microservice-kafka</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>microservice-kafka</name>
  <url>http://maven.apache.org</url>
  
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.0.RELEASE</version>
    <relativePath/> 
  </parent>
  
  <dependencies>
  	<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>         
	 
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>


  </dependencies>
  
    <dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Hoxton.SR4</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>               
    </dependencies>
  </dependencyManagement>
  
  <build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
  
</project>

编辑application.yml,配置kafka:

bootstrap-servers: 192.168.23.131:9092其中192.168.23.131是Kafka Server的IP地址。

server:
  port: 8020
spring:
  application:
    name: microservice-kafka
  kafka:
    bootstrap-servers: 192.168.23.131:9092
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all

eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8080/eureka/
  instance:
    prefer-ip-address: true            

App.java的完整代码如下:

package com.test;

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;


@SpringBootApplication
@EnableDiscoveryClient
public class App 
{
    public static void main( String[] args )
    {
        System.out.println( "Hello World!" );
        SpringApplication.run(App.class, args);
    }
}

KafkaController.java的完整代码如下:

package com.test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.kafka.core.*;



@RequestMapping("/kafka")
@RestController
public class KafkaController {
	
	  @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;
 
    @GetMapping("sendMsg")
    public String helloProducer(String msg){
        kafkaTemplate.send("mydemo1",msg);
        return "ok";
    }

}




启动Kafka Producer 和Eureka

mvn spring-boot:run

3、启动Kafka Server及Consumer

bin/kafka-server-start.sh config/server.properties&

创建主题

./bin/kafka-topics.sh --create --bootstrap-server demo1:9092 --replication-factor 1 --partitions 1 --topic mydemo1

在命令行终端启动消费者

bin/kafka-console-consumer.sh --bootstrap-server demo1:9092 --topic mydemo1

4、浏览器测试

在浏览器输入:

http://localhost:8020/kafka/sendMsg?msg=测试消息testmsg

此时在Ubuntu的Consumer终端可以看到从浏览器输入的消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2178740.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

GPU参数指标

以英伟达的A800卡为例&#xff0c;简单聊聊GPU卡的核心参数指标&#xff0c;A800的核心指标主要有5个&#xff0c;为算力、显存大小、显存带宽、功耗情况和卡间互联速率。 性能&#xff1a;则可以理解为货车对不同货物类型的马力大小&#xff0c;决定能“拉动”多少重量的货&…

实用工具推荐---- PDF 转换

直接上链接&#xff1a;爱PDF |面向 PDF 爱好者的在线 PDF 工具 (ilovepdf.com) 主要功能如下&#xff1a; 全免费&#xff01;&#xff01;&#xff01;&#xff01;

什么是 Apache Ingress

Apache Ingress 主要用于管理来自外部的 HTTP 和 HTTPS 流量&#xff0c;并将其路由到合适的 Kubernetes 服务。 容器化与 Kubernetes 是现代云原生应用程序的基础。Kubernetes 的主要职责是管理容器集群&#xff0c;确保它们的高可用性和可扩展性&#xff0c;同时还提供自动化…

httpsok-v1.17.0-SSL通配符证书自动续签

&#x1f525;httpsok-v1.17.0-SSL通配符证书自动续签 介绍 httpsok 是一个便捷的 HTTPS 证书自动续签工具&#xff0c;基于全新的设计理念&#xff0c;专为 Nginx 、OpenResty 服务器设计。已服务众多中小企业&#xff0c;稳定、安全、可靠。 一行命令&#xff0c;一分钟轻…

Java中使用接口实现回调函数的详解与示例

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐&#xff1a;「storm…

【2025】springboot基于微信小程序记账本的设计与实现(源码+文档+调试+答疑)

文章目录 前言一、主要技术&#xff1f;二、项目内容1.整体介绍&#xff08;示范&#xff09;2.运行截图3.系统测试 总结更多项目 前言 时代在飞速进步&#xff0c;每个行业都在努力发展现在先进技术&#xff0c;通过这些先进的技术来提高自己的水平和优势&#xff0c;记账本小…

【游戏分组】

题目来源 from itertools import combinations def get_input(): """获取输入的整数列表。""" return list(map(int, input("请输入10个整数&#xff08;用空格分隔&#xff09;: ").split())) def get_min_difference(arr): &q…

OpenCV C++霍夫圆查找

OpenCV 中的霍夫圆检测基于 霍夫变换 (Hough Transform)&#xff0c;它是一种从边缘图像中识别几何形状的算法。霍夫圆检测是专门用于检测图像中的圆形形状的。它通过将图像中的每个像素映射到可能的圆参数空间&#xff0c;来确定哪些像素符合圆形状。 1. 霍夫变换的原理 霍夫…

【韩顺平Java笔记】第3章:变量

只记录我觉得重点的&#xff0c;自用&#xff0c;如果有漏的请自己看视频 文章目录 33. 内容梳理34. 变量原理34.1 为什么需要变量35. 变量概念35.1 概念35.2 变量使用的基本步骤36. 变量入门36.1 变量使用入门案例 37. 变量细节37.1 变量使用注意事项 38. 加号使用38.1 程序中…

身份证号、定位信息等个人信息敏感性判定解析

关于身份证号号码以及精确定位信息是否是敏感个人信息的疑问一直以来不少合规安全从业者有疑惑&#xff0c;本文来自于《数安标准强基助力计划 》作者为指南和标准的起草者&#xff0c;其观点具有一定的权威性&#xff0c;一下为内容摘要&#xff0c;以供大家学习和解惑&#x…

【sourceTree问题】拉取提交的时候需要频繁输入账号密码

用sourceTree进行代码管理的时候会出现一直让输入账号密码的问题&#xff0c;烦不胜烦&#xff0c;可以点击【设置】 → 【编辑配置文件...】打开配置文件&#xff1a; 在配置文件里找到url&#xff0c;把url里面的网址修改为&#xff1a; http://username:passwordxxxxx/xx…

LeetCode 热题 100 回顾7

干货分享&#xff0c;感谢您的阅读&#xff01;原文见&#xff1a;LeetCode 热题 100 回顾_力code热题100-CSDN博客 一、哈希部分 1.两数之和 &#xff08;简单&#xff09; 题目描述 给定一个整数数组 nums 和一个整数目标值 target&#xff0c;请你在该数组中找出 和为目标…

阿里云 SAE Web:百毫秒高弹性的实时事件中心的架构和挑战

作者&#xff1a;胡志广(独鳌) 背景 Serverless 应用引擎 SAE 事件中心主要面向早期的 SAE 控制台只有针对于应用维度的事件&#xff0c;这个事件是 K8s 原生的事件&#xff0c;其实绝大多数的用户并不会关心&#xff0c;同时也可能看不懂。而事件中心&#xff0c;是希望能够…

实验3 使用Wiresharkl观察ping命令的工作过程

1、实验目的&#xff1a; 了解嗅探器工具Ethereal&#xff08;Wireshark&#xff09;的下载和安装方法&#xff1b; 掌握Ethereal&#xff08;Wireshark&#xff09;的简单使用方法&#xff1b; 了解抓包结果的分析方法&#xff08;最好是把菜单中所有的菜单命令都尝试一下&…

SpringCloud-Alibaba第二代微服务快速入门

1.简介 Spring Cloud Alibaba其实是阿里的微服务解决方案&#xff0c;是阿里巴巴结合自身微服务实践,开源的微服务全家桶&#xff0c;在Spring Cloud项目中孵化成为Spring Cloud的子项目。第一代的Spring Cloud标准中很多组件已经停更,如&#xff1a;Eureak,zuul等。所以Sprin…

复刻谷歌爆火的AI生成播客应用:高效方案分享

随着Google Illuminate等产品的推出,将复杂文档转换为音频播客的需求日益增长。MIT最近开源的PDF2Audio项目为此提供了一个开放且高效的解决方案。本文将详细介绍如何复刻这一功能,并分享一个适用于AI代理和播客创业者的高效方案。 一、背景 近年来,将文本内容转换为音频播…

第一课:什么是易语言?

易语言是一门计算机程序设计语言&#xff0c;也通常代指与之对应的集成开发环境&#xff0c;其特点是通过汉语进行编程。易语言的创始人是吴涛。早期版本的名字为E语言。 简单的说&#xff0c;易语言是一个小型的软件开发系统。(就是写程序用的&#xff0c;够通俗吧&#xff1f…

Vue76 编程式路由导航

笔记 作用&#xff1a;不借助<router-link> 实现路由跳转&#xff0c;让路由跳转更加灵活 具体编码&#xff1a; //$router的两个API this.$router.push({name:xiangqing,params:{id:xxx,title:xxx} })this.$router.replace({name:xiangqing,params:{id:xxx,title:xxx} …

LeetCode 热题 100 回顾19

干货分享&#xff0c;感谢您的阅读&#xff01;原文见&#xff1a;LeetCode 热题 100 回顾_力code热题100-CSDN博客 一、哈希部分 1.两数之和 &#xff08;简单&#xff09; 题目描述 给定一个整数数组 nums 和一个整数目标值 target&#xff0c;请你在该数组中找出 和为目标…

buuctf_藏藏藏

题目&#xff1a;没什么&#xff0c;一张图片&#xff0c;叫“藏藏藏.jpg” winhex&#xff0c;隐写看了&#xff0c;没什么结果 上kali&#xff0c;为了方便&#xff0c;我将图片命名为0.jpg。本文将讲述我遇到的俩个难点 1.kali未配置打不出中文字符 2.kali打不开docx 下面是…