SpringBoot整合kafka到底怎么使用? (简单案例介绍——存放104协议解析报文信息)

news2025/1/23 13:49:53

前言

由于业务要求,我需要将104协议的报文内容解析后传到kafka里,然后程序也是一个SpringBoot项目,所以本篇文章我想说一说我是如何将那些数据传到kafka中并判断其是否消费,至于104协议的报文内容的解析和通信在此不去介绍,涉及到netty的知识。

这篇文章我暂时不讲怎么将消费后的数据传到mysql中,因为那块我还实现,后面再补充,但我想将消费后的数据传到mysql中的步骤应该是消费者使用@KafkaListene 去监听topic获取到生产者发送到kafka的数据,然后将数据保存到数据库

主要讲生产者怎么发送到kafka,以及用命令来判断消息消费下来了

步骤

1、安装kafka

下载链接

****下载链接****
解压安装包

 tar -zxvf kafka_2.13-3.3.1.tgz -C  /opt/module/

修改解压后的文件名称

mv kafka_2.13-3.3.1/ kafka

修改server.propertieszookeeper.properties 配置文件

cd config/

server.properties
在这里插入图片描述
在这里插入图片描述
其中zookeeper.connect 的地址名字按需求而定

zookeeper.properties:
在这里插入图片描述

配置环境变量

vim etc/profile
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

刷新一下环境变量

source /etc/profile

然后重点,先启动zookeeper再启动kafka (要在kafka目录下输入命令)

#启动zookeeper服务
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
#启动kafka服务
bin/kafka-server-start.sh -daemon config/server.properties

2、导入kafka的依赖

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.5.14.RELEASE</version>
        </dependency>

3、配置yml文件

server:
  port: 8082

# 数据库数据源
。。。

kafka:
  producer:
    bootstrap-servers: 127.0.0.1:9092
    retries: 3
    acks: all
    max-block-ms: 6000
    batch-size: 4096
    linger-ms: 1000
    buffer-memory: 33554432
    max-request-size: 1048576
    client-id: 自定义名字
    compression-type: gzip
  consumer:
    bootstrap-servers: 127.0.0.1:9092
    enable-auto-commit: true
    auto-commit-interval-ms: 1000
    max-poll-records: 100
    group-id: 自定义名字
    session-timeout-ms: 120000
    request-timeout-ms: 120000
    auto-offset-reset: latest

4、编写生产者

package com.axinite.iec104.kafka.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.errors.TimeoutException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;


@Service
@Slf4j
public class MessageProducer {

    @Qualifier("kafkaTemplateWithNoTransaction")
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Qualifier("kafkaTemplateWithTransaction")
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplateWithTransaction;

    public static IotMessageProducer messageProducer;
    public static KafkaTemplate<String, String> staticKafkaTemplate;

    @PostConstruct
    public void init(){

        messageProducer = new IotMessageProducer();
        staticKafkaTemplate = this.kafkaTemplate;
    }

    /**
     * 发送消息(同步)
     * @param topic 主题
     * @param key 键
     * @param message 值
     */
    public void sendMessageSync(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException, java.util.concurrent.TimeoutException {
        //可以指定最长等待时间,也可以不指定
        staticKafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
        log.info("sendMessageSync => topic: {}, key: {}, message: {}", topic, key, message);
    }

    /**
     * 发送消息并获取结果
     * @param topic
     * @param message
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public void sendMessageGetResult(String topic, String key, String message) throws ExecutionException, InterruptedException {
        SendResult<String, String> result = staticKafkaTemplate.send(topic, key, message).get();
        log.info("sendMessageSync => topic: {}, key: {}, message: {}", topic, key, message);
        log.info("The partition the message was sent to: " + result.getRecordMetadata().partition());
    }

    /**
     * 发送消息(异步)
     * @param topic 主题
     * @param message 消息内容
     */
    public void sendMessageAsync(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = staticKafkaTemplate.send(topic, message);
        //添加回调
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error("sendMessageAsync failure! topic : {}, message: {}", topic, message);
            }

            @Override
            public void onSuccess(SendResult<String, String> stringStringSendResult) {
                log.info("sendMessageAsync success! topic: {}, message: {}", topic, message);
            }
        });
    }

    /**
     * 可以将消息组装成 Message 对象和 ProducerRecord 对象发送
     * @param topic
     * @param key
     * @param message
     * @throws InterruptedException
     * @throws ExecutionException
     * @throws TimeoutException
     */
    public void testMessageBuilder(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException {
        // 组装消息
        Message msg = MessageBuilder.withPayload(message)
                .setHeader(KafkaHeaders.MESSAGE_KEY, key)
                .setHeader(KafkaHeaders.TOPIC, topic)
                .setHeader(KafkaHeaders.PREFIX,"kafka_")
                .build();
        //同步发送
        staticKafkaTemplate.send(msg).get();
    }

    /**
     * 以事务方式发送消息
     * @param topic
     * @param key
     * @param message
     */
    public void sendMessageInTransaction(String topic, String key, String message) {
        kafkaTemplateWithTransaction.executeInTransaction(new KafkaOperations.OperationsCallback<String, String, Object>() {
            @Override
            public Object doInOperations(KafkaOperations<String, String> kafkaOperations) {
                kafkaOperations.send(topic, key, message);
                //出现异常将会中断事务,消息不会发送出去
                throw new RuntimeException("exception");
            }
        });
    }

}

我这里使用@PostConstruct初始化是为了启动启动类后,先加载并初始化,由于我项目需要做netty的服务端通信,那么启动后会调用客户端的run方法,如下:

@Component
public class Iec104TcpServerSlaveTest implements ApplicationRunner{

	/**
	 * 
	* @Title: test   
	* @Description: 测试 iec104 协议TCP传输方式服务端做从机服务
	* @param @throws Exception 
	* @return void   
	* @throws
	 */

	@Override
	public void run(ApplicationArguments args) throws Exception {
		System.out.println("我启动了没?");
		Iec104Config iec104Config  = new Iec104Config();
		iec104Config.setFrameAmountMax((short) 1);
		iec104Config.setTerminnalAddress((short) 1);
		Iec104SlaveFactory.createTcpServerSlave(2404).setDataHandler(new SysDataHandler()).setConfig(iec104Config).run();
		Thread.sleep(1000000);
	}

}

因为接口实现了ApplicationRunner接口,那么启动类启动后最后会运行这个run方法去启动服务端,运行后它会执行里面的Check104Handler去检查104报文,那么我如何去使用这个MessageProducer将报文解析后的数据传到kafka? 就需要将在Check104Handler引入它,但是我试了使用@Autowired会找不到这个MessageProducer,不知原因是什么,我认为大概是由于这个run方法没执行完毕,注入不了这个MessageProducer,因为服务端一直在跑,这个run方法执行时调用Check104Handler中还没有注入不这个MessageProducer。所以由于不能使用@Autowired,我使用了@PostConstruct提前加载注入到容器并初始化,以下是它的使用:

package com.axinite.iec104.server.handler;

import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.axinite.iec104.analysis.protocol104.Analysis;
import com.axinite.iec104.common.Iec104Constant;
import com.axinite.iec104.enums.EmsEnergyEnum;
import com.axinite.iec104.enums.MobileEnergyEnum;
import com.axinite.iec104.kafka.producer.MessageProducer;
import com.axinite.iec104.util.ByteUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.LinkedHashMap;

import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
import static io.netty.util.internal.StringUtil.NEWLINE;

/**
 * 
* @ClassName: Check104Handler  
* @Description: 检查104报文 
 */
@Component
public class Check104Handler extends ChannelInboundHandlerAdapter {
	
	private static final Logger LOGGER = LoggerFactory.getLogger(ChannelInboundHandlerAdapter.class);


	/**
	 * 拦截系统消息
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		ByteBuf result = (ByteBuf) msg;
//		log(result);
		byte[] bytes = new byte[result.readableBytes()];
		result.readBytes(bytes);
		LOGGER.info("接收到的报文: " + ByteUtil.byteArrayToHexString(bytes));
		String content = ByteUtil.byteArrayToHexString(bytes);
		String aa = Analysis.analysis(content);
		LOGGER.info("------------报文详细信息------------");
		System.out.println(aa);
		MessageProducer.messageProducer.sendMessageGetResult("test","test", aa);
		}

		if (bytes.length < Iec104Constant.APCI_LENGTH || bytes[0] != Iec104Constant.HEAD_DATA) {
			LOGGER.error("报文无效");
			ReferenceCountUtil.release(result);
		} else {
			result.writeBytes(bytes);
			ctx.fireChannelRead(msg);
		}
	}

	public static void log(ByteBuf buffer) {
		int length = buffer.readableBytes();
		int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
		StringBuilder buf = new StringBuilder(rows * 80 * 2)
				.append("read index:").append(buffer.readerIndex())
				.append(" write index:").append(buffer.writerIndex())
				.append(" capacity:").append(buffer.capacity())
				.append(NEWLINE);
		appendPrettyHexDump(buf, buffer);
		System.out.println(buf.toString());
	}
}

主要代码:

 MessageProducer.messageProducer.sendMessageGetResult("test","test", aa);

5、启动完程序,使用命令去检查数据是否消费下来

注意: 启动程序的前提是,你本地已经先启动了zookeeper,然后再启动了kafka,如下命令(注意它的先后顺序,zookeeper先,kafka后)

在kafka的根目录下输入以下命令

#在kafka的根目录下输入以下命令

#启动zookeeper服务 (先开启zk,再开kafka)
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

#启动kafka服务
bin/kafka-server-start.sh -daemon config/server.properties

然后启动consumer服务去查看你的topic

#启动consumer服务
bin/kafka-console-consumer.sh --bootstrap-server ip:port --topic test

数据内容我就不放了

6、将消费的数据入库

未完待续

7、放些kafka常用的命令

#启动producer服务
bin/kafka-console-producer.sh --bootstrap-server ip:port --topic first

#启动kafka服务
bin/kafka-server-start.sh -daemon config/server.properties

#启动zookeeper服务 (先开启zk,再开kafka)
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

#停止服务
bin/kafka-server-stop.sh
bin/zookeeper-server-stop.sh 

#启动consumer服务
bin/kafka-console-consumer.sh --bootstrap-server ip:port --topic topic名字

#查看当前有哪些topic
bin/kafka-topics.sh --bootstrap-server ip:port --list

#重头打印所有消费后的数据
bin/kafka-console-consumer.sh --bootstrap-server ip:port --from-beginning --topic topic名字

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/77797.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【javassist】javassist 入门案例 生成类

1.概述 前面介绍的ASM入门门槛还是挺高的,需要跟底层的字节码指令打交道,优点是小巧、性能好。Javassist是一个性能比ASM稍差但使用起来简单很多的字节码操作库,不需要使用者掌握字节码指令,由东京工业大学的数学和计算机科学系的教授Shigeru Chiba开发。 本节将分为两个…

2022卡塔尔世界杯。CSDN世界杯勋章来啦

卡塔尔世界杯正在如火如茶的举办着&#xff0c;在比赛场上&#xff0c;我看到了来自世界各地的球队&#xff0c;他们都充满活力&#xff0c;充满激情&#xff0c;每一支球队都在努力的拼搏&#xff0c;无论是为了胜利&#xff0c;还是为了荣誉。我看到了一支支优秀的球队&#…

前端分页处理

页面中实现的分页效果&#xff0c;要么后端提供接口&#xff0c;每次点击下一页就调用接口&#xff0c;若不提供接口&#xff0c;分页得前端自己去截取。 方法一&#xff1a;slice方法 slice(参数1&#xff0c;参数2)方法是返回一个新的数组对象&#xff0c;左开右闭 参数1&…

Mel spectrum梅尔频谱与MFCCs

梅尔频谱和梅尔倒谱MFCCs是使用非常广泛的声音特征形式。 1.Mel-spectrogram梅尔语谱图 机器学习的第一步都是要提取出相应的特征(feature)&#xff0c;如果输入数据是图片&#xff0c;例如28*28的图片&#xff0c;那么只需要把每个像素(pixel)作为特征&#xff0c;对应的像素…

【数据结构】链表定义及其常用的基本操作(C/C++)

目录 ●图示 ●链表类型定义 ●常用的基本操作&#xff08;单链表&#xff09; ●简单案例 ●图示 ●链表类型定义 1.单链表存储结构的定义 typedef struct lnode{elemtype data;struct lnode *next; }lnode,*linklist; 定义链表L&#xff1a;linklist &L&#x…

Windows系统服务器如何架设网站

Windows系统服务器如何架设网站 架设网站我们的需服务器必须要有iis功能&#xff0c;我们随便找个网站素材进行搭建 大家跟着我的步骤操作就可以啦 下面我们以Windows server 2008系统为例 右键我的电脑----管理-----角色-----web服务器&#xff08;iis&#xff09; Internet信…

值得一看!从0编写一份PID控制代码

【推荐阅读】 浅析linux 系统进程冻结&#xff08;freezing of task&#xff09; 深入linux内核架构--进程&线程 纯干货&#xff0c;linux内存管理——内存管理架构&#xff08;建议收藏&#xff09; 轻松学会linux下查看内存频率,内核函数,cpu频率 概述Linux内核驱动之GPI…

Servlet与表单、数据库综合项目实战【学生信息管理】

✅作者简介&#xff1a;热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 &#x1f49e;当前专栏&#xff1a;JAVA开发者…

开关电源环路稳定性分析(07)——电压型补偿网络

大家好&#xff0c;这里是大话硬件。 在前面的文章中&#xff0c;已经分析了控制级和功率级的传递函数&#xff0c;这一节咱们来分析反馈级的传递函数。 在分析反馈网络的传递函数之前&#xff0c;我想&#xff0c;应该有几个问题需要做一下介绍。 1. 功率级和控制级传递函数…

游戏开发41课 unity shader 优化

Shader有专门语言用来编写&#xff0c;常见类型有DirectX的HLSL&#xff0c;OpenGL的GLSL以及NVIDIA的Cg&#xff0c;为了优化shader代码&#xff0c;我们需要知道代码从被编写到被执行的流程&#xff0c;知道什么样的代码是不好的。 注意点&#xff1a; 避免if、switch分支语…

关于前端低代码的一些个人观点

2022&#xff0c;低代码彻底火了&#xff0c;甚至火到没有点相关经验&#xff0c;都不好意思出去面试的程度&#xff0c;堪称lowcode“元年”。在整个互联网大裁员的背景下&#xff0c;无论你是否相信它是降本提效的利器&#xff0c;彷佛都不重要了。因为行业趋势总是这般浩浩荡…

为什么其他地方的智能网联汽车产业都跟着北京模式走?

随着新一轮科技革命和产业变革加速演进&#xff0c;智能网联汽车已成为全球汽车产业发展的战略方向&#xff0c;是全球大国竞争的重要科技和产业领域。在技术路线层面&#xff0c;我国率先提出车路云融合的智能网联汽车“中国方案”&#xff0c;该路线已逐步成为国际共识。在产…

大学生端午节网页作业制作 学生端午节日网页设计模板 传统文化节日端午节静态网页成品代码下载 端午节日网页设计作品

&#x1f389;精彩专栏推荐 &#x1f4ad;文末获取联系 ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业&#xff1a; 【&#x1f4da;毕设项目精品实战案例 (10…

【云原生】二进制k8s集群(下)部署高可用master节点

内容预知 本次部署说明 本次部署的架构组件 1. 新master节点的搭建 1.1 对master02 进行初始化配置 1.2 将master01的配置移植到master02 2.负载均衡的部署 3. k8s的web UI界面的搭建 二进制部署k8s集群部署的步骤总结 &#xff08;1&#xff09;k8s的数据存储中中心的搭建…

Spring项目建立过程

1&#xff0c;导入依赖 导入Spring依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-security</artifactId></dependency> 2&#xff0c;实现部分 2.1 自动给我们请求返回一个登录页面&am…

【计算机网络】物理层

物理层考虑的是在各种传输媒体上传输比特流&#xff0c;而不是指具体的传输媒体。物理层作用是尽可能地屏蔽媒体之间的差异。 物理层的主要任务是描述与传输媒体有关的一些特性&#xff1a; 机械特性、电气特性、功能特性、过程特性。 数据通信系统可分为&#xff1a;源系统、…

攻击类型分析

攻击类型分析 2018 年&#xff0c;主要的攻击类型 1 为 SYN Flood&#xff0c;UDP Flood&#xff0c;ACK Flood&#xff0c;HTTP Flood&#xff0c;HTTPS Flood&#xff0c; 这五大类攻击占了总攻击次数的 96&#xff05;&#xff0c;反射类攻击不足 3%。和 2017 年相比&…

Vue系列之组件化

文章の目录一、组件化开发思想1、现实中的组件化思想体现2、编程中的组件化思想体现3、组件化规范: Web Components二、组件注册1、全局组件注册语法2、组件语法3、组件注册注意事项4、局部组件注册写在最后一、组件化开发思想 1、现实中的组件化思想体现 标准分治重用组合 2…

k8s网络策略

网络策略介绍 网络策略官方文档&#xff1a;https://kubernetes.io/zh-cn/docs/concepts/services-networking/network-policies/ 网络策略是控制Pod之间如何进行通信的规则&#xff0c;它使用标签来筛选Pod&#xff0c;并在该组Pod之上定义规则来定义管控其流量&#xff0c;…

何为Spring Batch?怎么玩?

何为批处理&#xff1f; 何为批处理&#xff0c;大白话讲就是将数据分批次进行处理的过程。比如&#xff1a;银行对账&#xff0c;跨系统数据同步等。这些处理逻辑一般来说都不需要人工参与就能够自动高效地进行复杂的数据处理与分析。 典型批处理特点&#xff1a; 自动执行&…