Table of Contents
- 1. Configuration file
- 2. Consumer
  - 1. Annotation approach
  - 2. KafkaConsumer
- 3. Dependencies
  - 1. Annotation dependency
  - 2. KafkaConsumer dependency
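This document is kept only as a record for future operations work and for sharing with colleagues. The content is fairly rough and the commands are not complete, so it is not suited to beginners. If anything is unclear, send me a private message; I am online during working hours.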
1. Configuration file
- YAML configuration
spring:
  kafka:
    bootstrap-servers:
    consumer:
      group-id: iot-testaaaaaaaaaa11aaaaaaaaa
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    properties:
      security:
        protocol: SASL_SSL
      sasl:
        mechanism: PLAIN
        jaas:
          config: org.apache.kafka.common.security.plain.PlainLoginModule required
            username=""
            password="";
      ssl:
        truststore:
          type: JKS
          location: src/main/resources/client.truststore.jks
          password:
        endpoint.identification.algorithm:
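The plain KafkaConsumer in section 2 reads a flat properties file rather than this YAML, so it can help to see the same settings as standard Kafka client keys. The following is a minimal sketch, not the author's code: the class name `MqsConsumerProps`, the method `build`, and its parameters are hypothetical, and the empty `ssl.endpoint.identification.algorithm` mirrors the empty value in the YAML (an empty string disables hostname verification in the Kafka client).

```java
import java.util.Properties;

/** Hypothetical helper: builds the flat Kafka client properties matching the YAML above. */
final class MqsConsumerProps {
    private static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    static Properties build(String bootstrapServers, String groupId,
                            String username, String password,
                            String truststorePath, String truststorePassword) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("group.id", groupId);
        p.setProperty("auto.offset.reset", "earliest");
        p.setProperty("key.deserializer", STRING_DESERIALIZER);
        p.setProperty("value.deserializer", STRING_DESERIALIZER);
        // SASL/PLAIN authentication over TLS
        p.setProperty("security.protocol", "SASL_SSL");
        p.setProperty("sasl.mechanism", "PLAIN");
        p.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"" + username + "\" password=\"" + password + "\";");
        // Truststore used to verify the broker certificate
        p.setProperty("ssl.truststore.type", "JKS");
        p.setProperty("ssl.truststore.location", truststorePath);
        p.setProperty("ssl.truststore.password", truststorePassword);
        // Left empty as in the YAML: hostname verification is disabled
        p.setProperty("ssl.endpoint.identification.algorithm", "");
        return p;
    }
}
```

The resulting object can be passed to `new KafkaConsumer<>(props)` directly, or its key/value pairs can be written into `mqs.sdk.consumer.properties`.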
2. Consumer
1. Annotation approach
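import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

// Declare this method inside a Spring-managed bean (for example a @Component class).
@KafkaListener(topics = {"abcd"})
public void listen(ConsumerRecord<?, ?> record) {
    // record.value() may be null, so wrap it before use
    Optional<?> kafkaMessage = Optional.ofNullable(record.value());
    if (kafkaMessage.isPresent()) {
        Object message = kafkaMessage.get();
        System.out.println("---->" + record);
        System.out.println("---->" + message);
    }
}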
2. KafkaConsumer
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.Enumeration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * @author XHao
 */
public class MqsConsumer {
    public static final String CONFIG_CONSUMER_FILE_NAME = "mqs.sdk.consumer.properties";

    private KafkaConsumer<Object, Object> consumer;

    /** Creates a consumer from a properties file on disk. */
    MqsConsumer(String path) {
        Properties props = new Properties();
        // try-with-resources closes the stream even if load() fails
        try (InputStream in = new BufferedInputStream(new FileInputStream(path))) {
            props.load(in);
        } catch (IOException e) {
            e.printStackTrace();
            return; // consumer stays null if the configuration cannot be read
        }
        consumer = new KafkaConsumer<>(props);
    }

    /** Creates a consumer from mqs.sdk.consumer.properties on the classpath. */
    public MqsConsumer() {
        Properties props;
        try {
            props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME);
        } catch (IOException e) {
            e.printStackTrace();
            return; // consumer stays null if the configuration cannot be read
        }
        consumer = new KafkaConsumer<>(props);
    }

    /** Subscribes to the given topics. */
    public void consume(List<String> topics) {
        consumer.subscribe(topics);
    }

    /** Polls for records; timeout is in milliseconds (kafka-clients 1.1.0 API). */
    public ConsumerRecords<Object, Object> poll(long timeout) {
        return consumer.poll(timeout);
    }

    public void close() {
        consumer.close();
    }

    /**
     * Gets the class loader from the thread context; if none is found there,
     * returns the class loader that loaded this class.
     *
     * @return classloader
     */
    public static ClassLoader getCurrentClassLoader() {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        if (classLoader == null) {
            classLoader = MqsConsumer.class.getClassLoader();
        }
        return classLoader;
    }

    /**
     * Loads configuration from the classpath.
     *
     * @param configFileName name of the configuration file
     * @return the loaded configuration
     * @throws IOException if a matching resource cannot be read
     */
    public static Properties loadFromClasspath(String configFileName) throws IOException {
        ClassLoader classLoader = getCurrentClassLoader();
        Properties config = new Properties();
        // Every classpath resource with this name is loaded into the same Properties object
        Enumeration<URL> propertyResources = classLoader.getResources(configFileName);
        while (propertyResources.hasMoreElements()) {
            URL url = propertyResources.nextElement();
            try (InputStream is = url.openStream()) {
                config.load(is);
            }
        }
        return config;
    }
}
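One detail of `loadFromClasspath` is easy to miss: it loads every classpath resource named `mqs.sdk.consumer.properties` into one `Properties` object, so if the same key appears in more than one copy (for example in two jars), the copy loaded last wins, and the order is determined by the class loader. The sketch below illustrates that merge behavior with in-memory strings standing in for the classpath resources; the class `PropertiesMergeDemo` and method `loadAll` are hypothetical names used only for this illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical demo of loading several sources into one Properties object. */
final class PropertiesMergeDemo {
    /** Loads each source in order; keys from later sources overwrite earlier ones. */
    static Properties loadAll(String... sources) {
        Properties config = new Properties();
        for (String source : sources) {
            try (StringReader reader = new StringReader(source)) {
                config.load(reader);
            } catch (IOException e) {
                // Not expected for in-memory readers; rethrow unchecked to keep the demo simple
                throw new UncheckedIOException(e);
            }
        }
        return config;
    }
}
```

For example, `PropertiesMergeDemo.loadAll("group.id=first\n", "group.id=second\n")` yields a single `group.id` entry with the value `second`. Keeping exactly one copy of the file on the classpath avoids this kind of surprise.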
3. Dependencies
1. Annotation dependency
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2. KafkaConsumer dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>
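Data-visualization dashboard projects often need message conversion, real-time status, and similar features, so I am recording this here.
If this post gets enough likes and comments I will follow up with a more detailed tutorial. To be continued.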