Kafka消息中间件
同时市场上也发展处ActiveMq、RabbitMQ、Kafka、RocketMQ、Pulsar等众多优秀的框架;在大数据领域中Kafka目前是使用较多的框架。Kafka作为内部消息通知的框架,可以适应项目中大数据量的高吞吐、实时流计算等功能实现。
分布式消息中间件
虚拟机配置环境
下载完,tar -zxvf 解压
注意:要使用IP访问kafka需要开放host port(本机的ip),在server.properties下面配置:
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://your.host.name:9092
开启:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
jps
bin/kafka-server-start.sh config/server.properties
开启zookeeper可以输入jps看下开启状态,有QuorumPeerMain即可
依赖
<!-- kafkfa -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<exclusions>
<exclusion>
<artifactId>connect-json</artifactId
<groupId>org.apache.kafka</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<!-- reflections实现包扫描 -->
<reflections.version>0.9.11</reflections.version>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>${reflections.version}</version>
</dependency>
<!--json处理-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>provided</scope>
</dependency>
巧用Reflections库实现包扫描(扫描某个包中某个接口实现、注解等)
作用是扫描没个工程中某一个相同特性的类,一起处理。
// MyInterface是这些接口的公共特性
@Test
public void testReflections() {
Reflections reflections = new Reflections("要扫描的接口所在的包");
Set<Class<? extends MyInterface>> classes = reflections.getSubTypesOf(MyInterface.class);
// 业务操作
for(Class clazz : classes) {
//logger.info(clazz.getName());
System.out.println("Found: " + clazz.getName());
}
}
配置
公共模块的配置kafka.properties
# kafka config
kafka.hosts=192.168.169.135:9092
# 内容现在默认接收模块中的多环境配置,也可以自己写内容
# 接收组,这里是项目包名
kafka.group=com.lead.news.${profiles.name}.${spring.application.name}
# 单消息通道,需要以sigle结尾,测试名称
# 多消息通道,自己区别
kafka.topic.admin-test=${kafka.topic.admin-test}
引用模块的配置,发送跟接收都要配置
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@Configuration
@ComponentScan("配置处的包名")
public class KafkaConfig {
}
工具配置类
- KafkaProducerConfig自动配置Kafka消费者
- KafkaConsumerConfig自动配置Kafka消费者
- RetryErrorHandler实现消费者处理消息失败后重新发送到消息队列
- KafkaMessage实现对发送的消息包装,提供重试次数、分类等信息
- KafkaSender实现消息的统一发送入口功能
- KafkaTopicConfig自动装载topic名称信息
- KafkaListener提供自动注册消息消费监听接口类
- KafkaListenerFactory提供启动时自动注册实现了KafkaListener的消息消费者
测试
import com.lead.news.configuration.kafka.KafkaListener;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.stereotype.Component;
/**
* Title:消息消费者
* Description:
* @author WZQ
* @version 1.0.0
* @date 2020/2/8
*/
@Component
public class TestKafkaListener implements KafkaListener<String,String> {
/**
* 消息名称,必须保持一致
* @return
*/
@Override
public String topic () {
return "topic.test";
}
/**
* 接收消息方法
* @param data
* @param consumer
*/
@Override
public void onMessage (ConsumerRecord< String, String > data, Consumer< ?, ?> consumer){
System.out.println("===========receive test message接受消息:" + data);
}
}
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
/**
* Title:消息生产者
* Description:
* @author WZQ
* @version 1.0.0
* @date 2020/2/8
*/
@SpringBootTest
@RunWith(SpringRunner.class)
public class KafkaTest {
@Resource
KafkaTemplate<String, String> kafkaTemplate;
@Test
public void test(){
try {
// 消息名称,必须保持一致;key可以随便;消息内容
this.kafkaTemplate.send("topic.test", "123key","123value");
System.out.println("=================消息发送了!!!================");
Thread.sleep(5000);// 休眠等待消费者接收消息
} catch (Exception e) {
e.printStackTrace();
}
}
}
使用例子
配置模块
配置类kafka.properties中加入新增的消息名称
# article消息名称
kafka.topic.submit-article-auth=${kafka.topic.submit-article-auth}
在工具类中的定义消息名称的KafkaTopicConfig类中加入新增的消息名称变量,获取配置中的名称
//文章提交的kafka消息审核名称
String submitArticleAuth;
消息内容的dto,根据业务
import lombok.Data;
/**
* Title:封装传递的消息,并且区分消息的类型
* Description:对应数据库的字段
* @author WZQ
* @version 1.0.0
* @date 2020/2/9
*/
@Data
public class SubmitArticleAuto {
// 文章类型
private ArticleType type;
// 文章ID
private Integer articleId;
// 自媒体或者爬虫
public enum ArticleType{
WEMEDIA,CRAWLER;
}
}
一般在配置模块中新建kafka.messages包存放发送消息的实体类,消息内容的再次抽象,包含kafka变量。
package configuration.kafka.messages;
import com.lead.news.configuration.kafka.KafkaMessage;
import com.lead.news.model.mess.admin.SubmitArticleAuto;
/**
* Title:文章提交的kafka消息审核实体类
* Description:针对单通道和多通道进行区分
* @author WZQ
* @version 1.0.0
* @date 2020/2/9
*/
public class SubmitArticleAuthMessage extends KafkaMessage<SubmitArticleAuto> {
public SubmitArticleAuthMessage(){}
public SubmitArticleAuthMessage(SubmitArticleAuto data){
super(data);
}
// 消息区分,最好跟配置的名字一样
@Override
public String getType() {
return "submit-article-auth";
}
}
在工具类中的发送消息KafkaSender类加入新增的消息方法
/**
* 发送审核文章的消息
* 消息名称,id,消息内容
* @param message
*/
public void sendSubmitArticleAuthMessage(SubmitArticleAuthMessage message) {
this.sendMesssage(kafkaTopicConfig.getSubmitArticleAuth(), UUID.randomUUID().toString(), message);
}
发送消息模块
配置类application.yml加入,对应配置模块的变量名,内容是这次模块的消息名称,被加载
# 文章提交的kafka消息审核名称
kafka:
topic:
submit-article-auth: wzq.topic.submit.article.auth.sigle.test
发送消息模块的kafka发送方法组件
package com.lead.news.media.kafka;
import com.lead.news.configuration.kafka.KafkaSender;
import com.lead.news.configuration.kafka.messages.SubmitArticleAuthMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* Title:提供者
* Description:发送消息模块引用类
* @author WZQ
* @version 1.0.0
* @date 2020/2/10
*/
@Component
public class AdminMessageSender {
@Autowired
KafkaSender kafkaSender;
/**
* 只发送审核文章行为消息
* @param message
*/
@Async
public void sendMessage(SubmitArticleAuthMessage message){
kafkaSender.sendSubmitArticleAuthMessage(message);
}
}
加入service业务中
@Autowired
private AdminMessageSender adminMessageSender;
// 业务结束后发送消息
SubmitArticleAuthMessage message = new SubmitArticleAuthMessage();
SubmitArticleAuto submitArticleAuto = new SubmitArticleAuto();
// id带过去给消费者,这样就可以获取数据库数据进行操作
submitArticleAuto.setArticleId(wmNews.getId());
submitArticleAuto.setType(SubmitArticleAuto.ArticleType.WEMEDIA);
message.setData(submitArticleAuto);
adminMessageSender.sendMessage(message);
接收消息模块
appication.yml也得加入消息名称变量,内容保持一致
# 文章提交的kafka消息审核
kafka:
topic:
submit-article-auth: wzq.topic.submit.article.auth.sigle.test
定义接受消息后处理消息业务的监听器
package com.lead.news.admin.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lead.news.admin.service.ReviewMediaArticleService;
import com.lead.news.configuration.kafka.KafkaListener;
import com.lead.news.configuration.kafka.KafkaTopicConfig;
import com.lead.news.configuration.kafka.messages.SubmitArticleAuthMessage;
import com.lead.news.model.mess.admin.SubmitArticleAuto;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* Title:消费者
* Description:负责接收kafka消息
* @author WZQ
* @version 1.0.0
* @date 2020/2/9
*/
@Component
@Log4j2
public class AutoReviewArticleListener implements KafkaListener<String,String> {
@Autowired
private KafkaTopicConfig kafkaTopicConfig;
@Autowired
private ObjectMapper mapper;
// 处理消息业务的service,参数是发送消息模块发过来的,根据dto自己定义
@Autowired
private ReviewMediaArticleService reviewMediaArticleService;
/*@Autowired
private ReviewCrawlerArticleService reviewCrawlerArticleService;*/
// 拿到消息名称
@Override
public String topic() {
return kafkaTopicConfig.getSubmitArticleAuth();
}
@Override
public void onMessage(ConsumerRecord<String, String> consumerRecord, Consumer<?, ?> consumer) {
String value = consumerRecord.value();
log.info("接收到的消息为:{}"+value);
try {
// 拿到json数据
SubmitArticleAuthMessage message = mapper.readValue(value, SubmitArticleAuthMessage.class);
if(message!=null){
SubmitArticleAuto.ArticleType type = message.getData().getType();
if(type==SubmitArticleAuto.ArticleType.WEMEDIA){
Integer articleId = message.getData().getArticleId();
if(articleId!=null){
//审核文章信息
reviewMediaArticleService.autoReviewArticleByMedia(articleId);
}
}/*else if(type==SubmitArticleAuto.ArticleType.CRAWLER){
Integer articleId = message.getData().getArticleId();
if(articleId!=null){
//审核爬虫文章信息
try {
reviewCrawlerArticleService.autoReivewArticleByCrawler(articleId);
} catch (Exception e) {
e.printStackTrace();
}
}
}*/
}
} catch (IOException e) {
e.printStackTrace();
log.error("处理自动审核文章错误:[{}],{}",value,e);
throw new RuntimeException("WS消息处理错误",e);
}
}
}