Kafka在企业级应用中的实践

news2025/2/22 20:25:16
alt

前言

前面说了很多Kafka的性能优点,有些童鞋要说了,这Kafka在企业开发或者企业级应用中要怎么用呢?今天咱们就来简单探究一下。

1、 使用 Kafka 进行消息的异步处理

Kafka 提供了一个可靠的消息传递机制,使得企业能够将不同组件之间的通信解耦,实现高效的异步处理。在企业级应用中,可以通过以下步骤来使用 Kafka 进行消息的异步处理:

  1. 创建一个或多个主题(topic)用于存储消息。主题可以按照业务逻辑进行划分,每个主题可以有多个分区(partition)。
  2. 生产者(Producer)将消息发送到指定的主题中。
  3. 消费者(Consumer)从主题订阅消息,并将其处理逻辑与生产者解耦。消费者可以根据需求选择不同的消费模式,如订阅所有消息或只订阅特定分区的消息。
  4. 消费者可以将处理结果发送到其他系统,或者将消息转发到其他 Kafka 主题中进行进一步处理。

通过使用 Kafka 进行消息的异步处理,企业可以实现高效、可伸缩的系统架构,并且降低各个组件之间的耦合程度。

2、 Kafka 的消息转发和备份机制

Kafka 借助其分布式的架构和复制机制,实现了消息的转发和备份,确保数据的可靠性和持久性:

  1. 消息转发:Kafka 通过将消息分发到多个分区来实现消息的转发,每个分区可以由多个消费者订阅。分区之间的消息转发通过消费者群组协调器(Consumer Group Coordinator)来实现,协调器负责将消息均匀地分发给消费者。
  2. 备份机制:Kafka 将每个分区的消息进行副本(Replica)备份,并将副本分布在不同的 Broker 节点上。如果某个 Broker 节点发生故障,可以通过副本在其他节点上进行数据的恢复,确保数据的可靠性和持久性。

通过消息转发和备份机制,Kafka 实现了高可用性和数据冗余,保证了数据流的可靠性和持久性。

3、 Kafka Connect 和 Kafka Streams 的用途和特性

  1. Kafka Connect:是 Kafka 提供的一个工具,用于将外部系统和 Kafka 进行连接。通过 Kafka Connect,企业可以轻松地实现数据的导入和导出,与各种数据源(如数据库、文件系统)进行集成,并且可以自定义开发 Connectors,与特定的数据源进行交互。Kafka Connect 实现了高性能、可伸缩的数据传输,并且提供了故障恢复和数据转换等功能。

使用 Kafka Connect 在 Java 中有两种方式:Standalone 模式和分布式模式。

  1. Standalone 模式:
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.runtime.Connect;
import java.util.Properties;

public class KafkaConnectStandaloneApp {
    public static void main(String[] args) throws InterruptedException {
        // 创建配置
        Properties props = new Properties();
        props.setProperty(StandaloneConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(StandaloneConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
        props.setProperty(StandaloneConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
        
        // 创建 Standalone 模式的 Kafka Connect
        Connect connect = new Connect(new StandaloneConfig(props));
        connect.start(); // 启动 Kafka Connect
        Thread.sleep(5000); // 等待一段时间
        
        // 停止 Kafka Connect
        connect.stop();
    }
}
  1. 分布式模式:
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.Connect;
import java.util.Properties;

public class KafkaConnectDistributedApp {
    public static void main(String[] args) throws InterruptedException {
        // 创建配置
        Properties props = new Properties();
        props.setProperty(DistributedConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        
        // 创建分布式模式的 Kafka Connect
        Connect connect = new Connect(new DistributedConfig(props));
        connect.start(); // 启动 Kafka Connect
        Thread.sleep(5000); // 等待一段时间
        
        // 停止 Kafka Connect
        connect.stop();
    }
}

注意:上述示例代码中的配置项可以根据实际需要进行调整,例如连接到的 Kafka 服务器地址,序列化器等。 2. Kafka Streams:是一个轻量级的流处理库,用于对 Kafka 主题的数据进行实时处理和转换。通过 Kafka Streams,企业可以构建实时的数据处理应用程序,实现数据的实时计算、流合并、按键分组和聚合等功能。Kafka Streams 提供了高性能的流处理和事件驱动的架构,并且与 Kafka 生态系统的其他组件无缝集成,提供了可扩展、容错的流处理解。 引入jar包

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.8.0</version>
    </dependency>
</dependencies>
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

public class KafkaStreamsApp {
    public static void main(String[] args) {
        // 创建配置
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // 创建流构建器
        StreamsBuilder builder = new StreamsBuilder();

        // 从输入主题接收数据
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
                .peek((k, v) -> System.out.println("Received: key=" + k + ", value=" + v))
                .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        // 创建 Kafka Streams 应用程序
        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // 启动应用程序
        streams.start();

        // 添加关闭钩子以优雅地关闭应用程序
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

顶尖架构师栈

关注回复关键字

【C01】超10G后端学习面试资源

【IDEA】最新IDEA激活工具和码及教程

【JetBrains软件名】 最新软件激活工具和码及教程

工具&码&教程

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1068468.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[UUCTF 2022 新生赛]ezpop - 反序列化+字符串逃逸【***】

[UUCTF 2022 新生赛]ezpop 一、解题过程二、其他WP三、总结反思 一、解题过程 题目代码&#xff1a; <?php //flag in flag.php error_reporting(0); class UUCTF{public $name,$key,$basedata,$ob;function __construct($str){$this->name$str;}function __wakeup(){i…

Vue Router(二)

目录 一、嵌套路由 1、路由定义 2、代码例子 3、重定向 二、懒加载 1、缘由 2、代码例子 三、导航守卫 1、全局前置守卫 2、全局后置守卫 3、meta元信息 四、生命周期 1、解释 2、执行顺序 3、例子 五、keep-alive组件缓存&#xff08;保活&#xff09; 1、介…

AT2659S——L1频段卫星导航射频前端低噪声放大器芯片

AT2659S芯片采用2.9 mm 2.8mm 1.1 mm的6 pin SOT23-6封装。 应用领域&#xff1a; 导航天线 集成导航功能的手机 自动导航 定位功能移动设备 个人导航仪 笔记本/PAD AT2659S 是一款具有低功耗、高增益、低噪声系数的低噪声放大器&#xff08;LNA&#xff09;芯片&#x…

32、Flink table api和SQL 之用户自定义 Sources Sinks实现及详细示例

Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…

虚拟机Ubuntu18.04安装对应ROS版本详细教程!(含错误提示解决)

参考链接&#xff1a; Ubuntu18.04安装Ros(最新最详细亲测)_向日葵骑士Faraday的博客-CSDN博客 1.4 ROS的安装与配置_哔哩哔哩_bilibili ROS官网&#xff1a;http://wiki.ros.org/melodic/Installation/Ubuntu 一、检查cmake 安装ROS时会自动安装旧版的Cmake3.10.2。所以在…

Thingsboard二次开发---5.在Thingsboard中增加解决方案管理功能

前言 在使用Thingsboard的过程中发现TB虽然非常灵活&#xff0c;但实际的最终用户更希望是针对特定场景的成熟解决方案&#xff0c;页面都做好&#xff0c;不需要再进行配置&#xff0c;所以在原来的基础上增加了解决方案的功能&#xff0c;此方案比较适合给用户提供SaaS化的解…

【Hugging Face】如何从hub中下载文件

huggingface_hub库提供了从存储在Hub上的仓库中下载文件的功能。您可以独立使用这些函数或将它们集成到您自己的库中&#xff0c;使您的用户更方便地与Hub交互。本指南将向您展示如何&#xff1a; 下载并缓存单个文件。下载并缓存整个代码库。将文件下载到本地文件夹。 下载单…

语义分割,实例分割,全景分割梳理

语义分割&#xff08;semantic segmentation&#xff09; 实例分割&#xff08;instance segmentation&#xff09; 全景分割&#xff08;Panoptic Segmentation&#xff09; 下面基于《Panoptic Segmentation 》这篇论文进行这几个概念的梳理 论文链接&#xff1a;https:/…

基于安卓android微信小程序的垃圾废品回收类软件

运行环境 开发语言&#xff1a;Java 框架&#xff1a;ssm JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09; 数据库工具&#xff1a;Navicat11 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&a…

软件测试的价值

测试人员可以参与到软件的全生命周期中&#xff0c;一切软件质量相关的活动。具体来说主要有&#xff1a; 1、需求评审&#xff0c;帮助产品梳理业务逻辑 测试人员对产品功能有丰富的业务测试经验&#xff0c;有时会比产品还要熟悉平台的整体业务逻辑&#xff0c;所以需求评审…

【PG】Linux系统部署PostgreSQL单机数据库

安装方式 1 安装包方式 &#xff08;Packages and Installers&#xff09; 支持的操作系统包括 liunxMacosWindowsBSDSolaris 2 源码安装 &#xff08;Source code&#xff09; 下载源码包 通过下载地址PostgreSQL: File Browser 可以看到有各个版本的源码目录 选择13.1…

WPF向Avalonia迁移(二、一些可能使用到的库)

可能使用到的一些库 1. UI库 开源项目&#xff1a;https://github.com/irihitech/Semi.Avalonia 如果想引用他的DataGrid样式还需要添加Semi.Avalonia.DataGrid 2. 图表库 LiveChartsCore.SkiaSharpView.Avalonia 3.SVG库 开源项目&#xff1a;https://github.com/wieslaw…

prometheus使用数据源的timestamp而非server的timestamp

关于timestamp指标的解释 prometheus中的指标timestamp有两个&#xff1a; prometheus拉取时刻的timestamp&#xff0c;即服务端的timestamp&#xff1a;time.Now()&#xff1b;exporter的/metrics接口&#xff0c;除了返回metric&#xff0c;value&#xff0c;还返回timesta…

chromium线程模型(1)-普通线程实现(ui和io线程)

通过chromium 官方文档&#xff0c;线程和任务一节我们可以知道 &#xff0c;chromium有两类线程&#xff0c;一类是普通线程&#xff0c;最典型的就是io线程和ui线程。 另一类是 线程池线程。 今天我们先分析普通线程的实现&#xff0c;下一篇文章分析线程池的实现。&#xff…

asp.net电影院选座系统VS开发sqlserver数据库web结构c#编程Microsoft Visual Studio

一、源码特点 asp.net电影院选座系统 是一套完善的web设计管理系统&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为vs2010&#xff0c;数据库为sqlserver2008&#xff0c;使用c#语言开发 asp.net电影院选座系统1 二、功能介…

简述CRM系统软件的作用

销售部门作为企业重要的营收部门&#xff0c;做好企业管理意义重大。如今市场竞争激烈&#xff0c;人工管理很难兼顾。不少企业借助CRM销售管理系统优化改进工作流程中各个环节存在的问题。下面小Z来简单说说CRM系统是做什么的&#xff1f; 一、客户档案管理 通过CRM销售管理…

使用企业订货系统后的效果|软件定制开发|APP小程序搭建

使用企业订货系统后的效果|软件定制开发|APP小程序搭建 企业订货系统是一种高效的采购管理系统&#xff0c;它可以帮助企业更好地管理采购流程&#xff0c;降低采购成本&#xff0c;提高采购效率。 可以帮助企业提高销售效率和降低成本的软件工具。使用该系统后&#xff0c;企业…

押注“AI写小说”!陈天桥加持,宜搜科技二闯港股IPO

大数据产业创新服务媒体 ——聚焦数据 改变商业 9月29日&#xff0c;IPO征程一波三折的宜搜科技又一次递交招股书&#xff0c;向港交所上市发起冲击。 2014年&#xff0c;宜搜科技美股上市失败&#xff0c;三年后挂牌新三板。2019年&#xff0c;宜搜科技终止挂牌&#xff0c;冲…

vue:权限绑定菜单(全局引入,在template内用v-if调用)

登录成功后&#xff0c;将返回的权限保存到缓存 sessionStorage&#xff1a;浏览页面期间保存&#xff0c;关闭浏览器后丢掉数据 在utils内index.js内定义isAuth方法 在main.js内引入&#xff0c;并挂载全局 在vue页面内&#xff0c;在template内用v-if调用

WPF向Avalonia迁移(一、一些通用迁移项目)

通用变更 WPF&#xff1a;Visibility 其他参考文档 WPF&#xff1a; <TextBlock Visibility"Visible"/><TextBlock Visibility"Collapsed"/><TextBlock Visibility"Hidden"/>Avalonia &#xff1a; <TextBlock IsVisib…