Preface
A business requirement had me parse IEC 104 protocol frames and push the parsed content to Kafka, and the program is a Spring Boot project, so in this post I describe how I send that data to Kafka and verify that it gets consumed. The parsing of the 104 frames and the communication itself involve Netty knowledge and are not covered here.
For now I won't cover writing the consumed data to MySQL, because I haven't implemented that part yet; I'll fill it in later. The rough plan is for a consumer to listen on the topic with @KafkaListener, receive the records the producer sent to Kafka, and then save them to the database (see the sketch under step 6 below).
The focus is on how the producer sends to Kafka, and on using commands to confirm that the messages have been consumed.
Steps
1. Install Kafka
Download link
Extract the archive:
tar -zxvf kafka_2.13-3.3.1.tgz -C /opt/module/
Rename the extracted directory:
mv kafka_2.13-3.3.1/ kafka
Edit the server.properties and zookeeper.properties configuration files:
cd config/
server.properties:
Set the address in zookeeper.connect according to your environment.
zookeeper.properties:
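The screenshots of the two files are omitted; as a rough guide, the handful of entries worth checking typically look like the following (all paths and addresses are examples for a single-node setup, adjust to your machine):
#server.properties (typical single-node values)
broker.id=0
listeners=PLAINTEXT://127.0.0.1:9092
log.dirs=/opt/module/kafka/datas
zookeeper.connect=127.0.0.1:2181
#zookeeper.properties (typical single-node values)
dataDir=/opt/module/kafka/zookeeper
clientPort=2181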
Configure the environment variables:
vim /etc/profile
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
Reload the environment variables:
source /etc/profile
Now the key point: start ZooKeeper first, then Kafka (run the commands from the Kafka directory).
#Start the ZooKeeper service
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
#Start the Kafka service
bin/kafka-server-start.sh -daemon config/server.properties
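To confirm both services are up, jps should show a QuorumPeerMain (ZooKeeper) process and a Kafka process:
#Check the Java processes
jps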
2. Add the Kafka dependency
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.14.RELEASE</version>
</dependency>
3. Configure the yml file
server:
  port: 8082
# database datasource
...
kafka:
  producer:
    bootstrap-servers: 127.0.0.1:9092
    retries: 3
    acks: all
    max-block-ms: 6000
    batch-size: 4096
    linger-ms: 1000
    buffer-memory: 33554432
    max-request-size: 1048576
    client-id: your-client-id
    compression-type: gzip
  consumer:
    bootstrap-servers: 127.0.0.1:9092
    enable-auto-commit: true
    auto-commit-interval-ms: 1000
    max-poll-records: 100
    group-id: your-group-id
    session-timeout-ms: 120000
    request-timeout-ms: 120000
    auto-offset-reset: latest
4. Write the producer
package com.axinite.iec104.kafka.producer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

@Service
@Slf4j
public class MessageProducer {

    @Qualifier("kafkaTemplateWithNoTransaction")
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Qualifier("kafkaTemplateWithTransaction")
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplateWithTransaction;

    // Static references so non-Spring-managed code (e.g. Netty handlers) can reach this bean
    public static MessageProducer messageProducer;
    public static KafkaTemplate<String, String> staticKafkaTemplate;

    @PostConstruct
    public void init() {
        messageProducer = this;
        staticKafkaTemplate = this.kafkaTemplate;
    }

    /**
     * Send a message (synchronously)
     * @param topic topic
     * @param key key
     * @param message value
     */
    public void sendMessageSync(String topic, String key, String message)
            throws InterruptedException, ExecutionException, TimeoutException {
        // A maximum wait time can be given; it can also be omitted
        staticKafkaTemplate.send(topic, key, message).get(10, TimeUnit.SECONDS);
        log.info("sendMessageSync => topic: {}, key: {}, message: {}", topic, key, message);
    }

    /**
     * Send a message and inspect the result
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public void sendMessageGetResult(String topic, String key, String message) throws ExecutionException, InterruptedException {
        SendResult<String, String> result = staticKafkaTemplate.send(topic, key, message).get();
        log.info("sendMessageGetResult => topic: {}, key: {}, message: {}", topic, key, message);
        log.info("The partition the message was sent to: " + result.getRecordMetadata().partition());
    }

    /**
     * Send a message (asynchronously)
     * @param topic topic
     * @param message message content
     */
    public void sendMessageAsync(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = staticKafkaTemplate.send(topic, message);
        // Add a callback
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error("sendMessageAsync failure! topic: {}, message: {}", topic, message);
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                log.info("sendMessageAsync success! topic: {}, message: {}", topic, message);
            }
        });
    }

    /**
     * Messages can also be assembled as Message or ProducerRecord objects before sending
     */
    public void testMessageBuilder(String topic, String key, String message) throws InterruptedException, ExecutionException {
        // Assemble the message
        Message<String> msg = MessageBuilder.withPayload(message)
                .setHeader(KafkaHeaders.MESSAGE_KEY, key)
                .setHeader(KafkaHeaders.TOPIC, topic)
                .setHeader(KafkaHeaders.PREFIX, "kafka_")
                .build();
        // Send synchronously
        staticKafkaTemplate.send(msg).get();
    }

    /**
     * Send a message inside a transaction
     */
    public void sendMessageInTransaction(String topic, String key, String message) {
        kafkaTemplateWithTransaction.executeInTransaction(new KafkaOperations.OperationsCallback<String, String, Object>() {
            @Override
            public Object doInOperations(KafkaOperations<String, String> kafkaOperations) {
                kafkaOperations.send(topic, key, message);
                // Throwing here aborts the transaction, so the message is never delivered
                throw new RuntimeException("exception");
            }
        });
    }
}
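One thing not shown above is where the two @Qualifier beans come from. Since the yml uses a custom kafka: prefix rather than Spring Boot's spring.kafka, the templates are presumably assembled by hand in a @Configuration class. Here is a minimal sketch, assuming the bean names match the qualifiers; the address and the transaction-id prefix "tx-" are placeholders of my own:

package com.axinite.iec104.kafka.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
public class KafkaProducerConfig {

    private Map<String, Object> producerProps() {
        Map<String, Object> props = new HashMap<>();
        // Should match the bootstrap-servers in the yml above
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    // Plain template without transaction support
    @Bean("kafkaTemplateWithNoTransaction")
    public KafkaTemplate<String, String> kafkaTemplateWithNoTransaction() {
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProps()));
    }

    // Transactional template: the factory needs a transaction-id prefix
    @Bean("kafkaTemplateWithTransaction")
    public KafkaTemplate<String, String> kafkaTemplateWithTransaction() {
        DefaultKafkaProducerFactory<String, String> factory =
                new DefaultKafkaProducerFactory<>(producerProps());
        factory.setTransactionIdPrefix("tx-");
        return new KafkaTemplate<>(factory);
    }
}

Without the transaction-id prefix on the second factory, executeInTransaction refuses to run, which is why the two templates are kept separate.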
I initialize with @PostConstruct here so that the bean is loaded and its static references are set right after the application starts. Because my project runs the Netty server side for the 104 communication, the following run method is invoked on startup:
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

@Component
public class Iec104TcpServerSlaveTest implements ApplicationRunner {

    /**
     * @Title: test
     * @Description: test the IEC 104 protocol over TCP, with this server acting as the slave
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        System.out.println("Did I start?");
        Iec104Config iec104Config = new Iec104Config();
        iec104Config.setFrameAmountMax((short) 1);
        iec104Config.setTerminnalAddress((short) 1);
        // Start the TCP slave on port 2404 with a custom data handler and config
        Iec104SlaveFactory.createTcpServerSlave(2404).setDataHandler(new SysDataHandler()).setConfig(iec104Config).run();
        Thread.sleep(1000000);
    }
}
Because this class implements ApplicationRunner, its run method executes after the application has started and brings up the server; while it runs, Check104Handler inspects the incoming 104 frames. So how do I use MessageProducer inside Check104Handler to push the parsed frame data to Kafka? It has to be referenced from Check104Handler, but when I tried @Autowired the MessageProducer could not be found. I'm not sure of the exact cause; my guess is that because the run method never completes (the server keeps running), Spring has not injected MessageProducer into Check104Handler by the time the handler is invoked. Since @Autowired wasn't usable, I used @PostConstruct to load the bean early and expose it through a static field. Here is how it is used:
package com.axinite.iec104.server.handler;

import com.axinite.iec104.analysis.protocol104.Analysis;
import com.axinite.iec104.common.Iec104Constant;
import com.axinite.iec104.kafka.producer.MessageProducer;
import com.axinite.iec104.util.ByteUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
import static io.netty.util.internal.StringUtil.NEWLINE;

/**
 * @ClassName: Check104Handler
 * @Description: inspect 104 frames
 */
@Component
public class Check104Handler extends ChannelInboundHandlerAdapter {

    private static final Logger LOGGER = LoggerFactory.getLogger(Check104Handler.class);

    /**
     * Intercept inbound messages
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf result = (ByteBuf) msg;
        // log(result);
        byte[] bytes = new byte[result.readableBytes()];
        result.readBytes(bytes);
        String content = ByteUtil.byteArrayToHexString(bytes);
        LOGGER.info("Received frame: " + content);
        String aa = Analysis.analysis(content);
        LOGGER.info("------------ frame details ------------");
        System.out.println(aa);
        // Push the parsed frame to Kafka through the statically initialized producer
        MessageProducer.messageProducer.sendMessageGetResult("test", "test", aa);
        if (bytes.length < Iec104Constant.APCI_LENGTH || bytes[0] != Iec104Constant.HEAD_DATA) {
            LOGGER.error("Invalid frame");
            ReferenceCountUtil.release(result);
        } else {
            result.writeBytes(bytes);
            ctx.fireChannelRead(msg);
        }
    }

    public static void log(ByteBuf buffer) {
        int length = buffer.readableBytes();
        int rows = length / 16 + (length % 16 == 0 ? 0 : 1) + 4;
        StringBuilder buf = new StringBuilder(rows * 80 * 2)
                .append("read index:").append(buffer.readerIndex())
                .append(" write index:").append(buffer.writerIndex())
                .append(" capacity:").append(buffer.capacity())
                .append(NEWLINE);
        appendPrettyHexDump(buf, buffer);
        System.out.println(buf.toString());
    }
}
The key line:
MessageProducer.messageProducer.sendMessageGetResult("test", "test", aa);
5. Start the application and use commands to check whether the data has been consumed
Note: before starting the application, ZooKeeper must already be running locally, followed by Kafka, as in the commands below (mind the order: ZooKeeper first, Kafka second).
#Run the following from the Kafka root directory
#Start the ZooKeeper service (ZooKeeper first, then Kafka)
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
#Start the Kafka service
bin/kafka-server-start.sh -daemon config/server.properties
Then start a console consumer to watch your topic:
#Start a console consumer
bin/kafka-console-consumer.sh --bootstrap-server ip:port --topic test
I won't paste the consumed data here.
6. Persist the consumed data to the database
To be continued.
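In the meantime, here is a minimal sketch of what this step will probably look like, following the plan from the preface: a listener on the same topic that maps each record and saves it. This assumes the consumer configuration is wired up; the topic test matches the producer above, while the group id and the commented-out mapper are placeholders of my own, not the final implementation.

package com.axinite.iec104.kafka.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MessageConsumer {

    // Hypothetical persistence layer; replace with your own mapper/repository
    // @Autowired
    // private IecDataMapper iecDataMapper;

    @KafkaListener(topics = "test", groupId = "your-group-id")
    public void consume(String message) {
        log.info("consumed message: {}", message);
        // TODO: map the payload to an entity and insert it into MySQL,
        // e.g. iecDataMapper.insert(entity)
    }
}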
7. Some common Kafka commands
#Start a console producer
bin/kafka-console-producer.sh --bootstrap-server ip:port --topic first
#Start the Kafka service
bin/kafka-server-start.sh -daemon config/server.properties
#Start the ZooKeeper service (ZooKeeper first, then Kafka)
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
#Stop the services
bin/kafka-server-stop.sh
bin/zookeeper-server-stop.sh
#Start a console consumer
bin/kafka-console-consumer.sh --bootstrap-server ip:port --topic topic-name
#List the existing topics
bin/kafka-topics.sh --bootstrap-server ip:port --list
#Print all consumed messages from the beginning
bin/kafka-console-consumer.sh --bootstrap-server ip:port --from-beginning --topic topic-name
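Two more that come in handy when setting up (standard kafka-topics.sh options):
#Create a topic
bin/kafka-topics.sh --bootstrap-server ip:port --create --topic topic-name --partitions 1 --replication-factor 1
#Describe a topic (partitions, leaders, replicas)
bin/kafka-topics.sh --bootstrap-server ip:port --describe --topic topic-name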