SpringBoot-集成Kafka详解

news2024/7/6 13:25:16

SpringBoot集成Kafka

1、构建项目

1.1、引入依赖
<parent>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-parent</artifactId>
   <version>2.2.5.RELEASE</version>
</parent>
<dependencies>
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
   </dependency>
   <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka</artifactId>
   </dependency>
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-test</artifactId>
   </dependency>
   <dependency>
       <groupId>org.projectlombok</groupId>
       <artifactId>lombok</artifactId>
       <optional>true</optional>
   </dependency>
   <dependency>
       <groupId>com.alibaba</groupId>
       <artifactId>fastjson</artifactId>
       <version>1.2.28</version>
   </dependency>
   <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
   </dependency>
</dependencies>
1.2、application.yml配置
spring:
  application:
    name: application-kafka
  kafka:
    bootstrap-servers: localhost:9092 #这个是kafka的地址,对应你server.properties中配置的
    producer:
      batch-size: 16384 #批量大小
      acks: -1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
      retries: 10 # 消息发送重试次数
      #transaction-id-prefix: transaction
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        linger:
          ms: 2000 #提交延迟
        #partitioner: #指定分区器
          #class: pers.zhang.config.CustomerPartitionHandler
    consumer:
      group-id: testGroup #默认的消费组ID
      enable-auto-commit: true #是否自动提交offset
      auto-commit-interval: 2000 #提交offset延时
      # 当kafka中没有初始offset或offset超出范围时将自动重置offset
      # earliest:重置为分区中最小的offset;
      # latest:重置为分区中最新的offset(消费分区中新产生的数据);
      # none:只要有一个分区不存在已提交的offset,就抛出异常;
      auto-offset-reset: latest
      max-poll-records: 500 #单次拉取消息的最大条数
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        session:
          timeout:
            ms: 120000 # 消费会话超时时间(超过这个时间 consumer 没有发送心跳,就会触发 rebalance 操作)
        request:
          timeout:
            ms: 18000 # 消费请求的超时时间
    listener:
      missing-topics-fatal: false # consumer listener topics 不存在时,启动项目就会报错
#      type: batch


1.3、简单生产
@RestController
public class kafkaProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;


    @GetMapping("/kafka/normal/{message}")
    public void sendNormalMessage(@PathVariable("message") String message) {
        kafkaTemplate.send("sb_topic", message);
    }
}
1.4、简单消费
@Component
public class KafkaConsumer {

    //监听消费
    @KafkaListener(topics = {"sb_topic"})
    public void onNormalMessage(ConsumerRecord<String, Object> record) {
        System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" +
                record.value());
    }

}
简单消费:sb_topic-0=111
简单消费:sb_topic-0=222
简单消费:sb_topic-0=333

2、生产者

2.1、带回调的生产者

kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理,有两种写法,

/**
 * 回调的第一种写法
 * @param message
 */
@GetMapping("/kafka/callbackOne/{message}")
public void sendCallbackOneMessage(@PathVariable("message") String message) {
    kafkaTemplate.send("sb_topic", message).addCallback(new SuccessCallback<SendResult<String, Object>>() {
        //成功的回调
        @Override
        public void onSuccess(SendResult<String, Object> success) {
            // 消息发送到的topic
            String topic = success.getRecordMetadata().topic();
            // 消息发送到的分区
            int partition = success.getRecordMetadata().partition();
            // 消息在分区内的offset
            long offset = success.getRecordMetadata().offset();
            System.out.println("发送消息成功1:" + topic + "-" + partition + "-" + offset);
        }
    }, new FailureCallback() {
        //失败的回调
        @Override
        public void onFailure(Throwable throwable) {
            System.out.println("发送消息失败1:" + throwable.getMessage());
        }
    });
}

 

发送消息成功1:sb_topic-0-3
简单消费:sb_topic-0=one
/**
 * 回调的第二种写法
 * @param message
 */
@GetMapping("/kafka/callbackTwo/{message}")
public void sendCallbackTwoMessage(@PathVariable("message") String message) {
    kafkaTemplate.send("sb_topic", message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
        @Override
        public void onFailure(Throwable throwable) {
            System.out.println("发送消息失败2:"+throwable.getMessage());
        }

        @Override
        public void onSuccess(SendResult<String, Object> result) {
            System.out.println("发送消息成功2:" + result.getRecordMetadata().topic() + "-"
                    + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
        }
    });
}

发送消息成功2:sb_topic-0-4
简单消费:sb_topic-0=two
2.2、监听器

Kafka提供了ProducerListener 监听器来异步监听生产者消息是否发送成功,我们可以自定义一个kafkaTemplate添加ProducerListener,当消息发送失败我们可以拿到消息进行重试或者把失败消息记录到数据库定时重试。

@Configuration
public class KafkaConfig {
    
    @Autowired
    ProducerFactory producerFactory;
    
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<String, Object>();
        kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {
            @Override
            public void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {
                System.out.println("发送成功 " + producerRecord.toString());
            }

            @Override
            public void onSuccess(String topic, Integer partition, String key, Object value, RecordMetadata recordMetadata) {
                System.out.println("发送成功 topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value);
            }

            @Override
            public void onError(ProducerRecord<String, Object> producerRecord, Exception exception) {
                System.out.println("发送失败" + producerRecord.toString());
                System.out.println(exception.getMessage());
            }

            @Override
            public void onError(String topic, Integer partition, String key, Object value, Exception exception) {
                System.out.println("发送失败" + "topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value);
                System.out.println(exception.getMessage());
            }
        });
        return kafkaTemplate;
    }
}

注意:当我们发送一条消息,既会走 ListenableFutureCallback 回调,也会走ProducerListener回调。

2.3、自定义分区器

我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:

1、若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区;
2、若发送消息时未指定 patition,但指定了 key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区,这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区;
3、patition 和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 patition;
我们来自定义一个分区策略,将消息发送到我们指定的partition,首先新建一个分区器类实现Partitioner接口,重写方法,其中partition方法的返回值就表示将消息发送到几号分区
 

public class CustomizePartitioner implements Partitioner {
    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        //自定义分区规则,默认全部发送到0号分区
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

在application.properties中配置自定义分区器,配置的值就是分区器类的全路径名

# 自定义分区器
spring.kafka.producer.properties.partitioner.class=pers.zhang.config.CustomizePartitioner
2.4、事务提交

如果在发送消息时需要创建事务,可以使用 KafkaTemplate 的 executeInTransaction 方法来声明事务:

@GetMapping("/kafka/transaction/{message}")
public void sendTransactionMessage(@PathVariable("message") String message) {
    //声明事务:后面报错消息不会发出去
    kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<String, Object, Object>() {

        @Override
        public Object doInOperations(KafkaOperations<String, Object> operations) {
            operations.send("sb_topic", message + " test executeInTransaction");
            throw new RuntimeException("fail");
        }
    });
    // //不声明事务:后面报错但前面消息已经发送成功了
    // kafkaTemplate.send("sb_topic", message + " test executeInNoTransaction");
    // throw new RuntimeException("fail");
}

注意:如果声明了事务,需要在application.yml中指定:

spring:
	kafka:
		producer:
			transaction-id-prefix: tx_ #事务id前缀

3、消费者

3.1、指定topic、partition、offset消费

前面我们在监听消费topic1的时候,监听的是topic1上所有的消息,如果我们想指定topic、指定partition、指定offset来消费呢?也很简单,@KafkaListener注解已全部为我们提供

spring:
	kafka:
		listener:
			type: batch #设置批量消费
		consumer:
			max-poll-records: 50 #每次最多消费多少条消息

属性解释:

  • id:消费者ID
  • groupId:消费组ID
  • topics:监听的topic,可监听多个
  • topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听,手动分区。
//批量消费
@KafkaListener(id = "consumer2", topics = {"sb_topic"}, groupId = "sb_group")
public void onBatchMessage(List<ConsumerRecord<String, Object>> records) {
    System.out.println(">>> 批量消费一次,recoreds.size()=" + records.size());
    for (ConsumerRecord<String, Object> record : records) {
        System.out.println(record.value());
    }
}
>>> 批量消费一次,recoreds.size()=4
hello
hello
hello
hello
>>> 批量消费一次,recoreds.size()=2
hello
hello
3.2、异常处理

ConsumerAwareListenerErrorHandler 异常处理器,新建一个 ConsumerAwareListenerErrorHandler 类型的异常处理方法,用@Bean注入,BeanName默认就是方法名,然后我们将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面,当监听抛出异常的时候,则会自动调用异常处理器。
 

//异常处理器
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
    return new ConsumerAwareListenerErrorHandler() {
        @Override
        public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
            System.out.println("消费异常:" + message.getPayload());
            return null;
        }
    };
}

// 将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面
@KafkaListener(topics = {"sb_topic"},errorHandler = "consumerAwareErrorHandler")
public void onMessage4(ConsumerRecord<?, ?> record) throws Exception {
    throw new Exception("简单消费-模拟异常");
}

// 批量消费也一样,异常处理器的message.getPayload()也可以拿到各条消息的信息
@KafkaListener(topics = "sb_topic",errorHandler="consumerAwareErrorHandler")
public void onMessage5(List<ConsumerRecord<?, ?>> records) throws Exception {
    System.out.println("批量消费一次...");
    throw new Exception("批量消费-模拟异常");
}
批量消费一次...
消费异常:[ConsumerRecord(topic = sb_topic, partition = 0, leaderEpoch = 0, offset = 19, CreateTime = 1692604586558, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello), ConsumerRecord(topic = sb_topic, partition = 0, leaderEpoch = 0, offset = 20, CreateTime = 1692604587164, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello), ConsumerRecord(topic = sb_topic, partition = 0, leaderEpoch = 0, offset = 21, CreateTime = 1692604587790, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello)]
3.3、消息过滤器

消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。

配置消息过滤只需要为 监听器工厂 配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。
 

@Autowired
ConsumerFactory consumerFactory;

//消息过滤器
@Bean
public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory);
    //被过滤的消息将被丢弃
    factory.setAckDiscarded(true);
    //消息过滤策略
    factory.setRecordFilterStrategy(new RecordFilterStrategy() {
        @Override
        public boolean filter(ConsumerRecord consumerRecord) {
            if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
                return false;
            }
            return true;
        }
    });
    return factory;
}

//消息过滤监听
@KafkaListener(topics = {"sb_topic"},containerFactory = "filterContainerFactory")
public void onMessage6(ConsumerRecord<?, ?> record) {
    System.out.println(record.value());
}

上面实现了一个"过滤奇数、接收偶数"的过滤策略,我们向topic发送0-9总共10条消息,看一下监听器的消费情况,可以看到监听器只消费了偶数:

3.4、消息转发

在实际开发中,我们可能有这样的需求,应用A从TopicA获取到消息,经过处理后转发到TopicB,再由应用B监听处理消息,即一个应用处理完成后将该消息转发至其他应用,完成消息的转发。

在SpringBoot集成Kafka实现消息的转发也很简单,只需要通过一个@SendTo注解,被注解方法的return值即转发的消息内容,如下:
 

//消息转发 从sb_topic转发到sb_topic2
@KafkaListener(topics = {"sb_topic"})
@SendTo("sb_topic2")
public String onMessage7(ConsumerRecord<?, ?> record) {
    return record.value()+"-forward message";
}

@KafkaListener(topics = {"sb_topic2"})
public void onMessage8(ConsumerRecord<?, ?> record) {
    System.out.println("收到sb_topic转发过来的消息:" + record.value());
}
收到sb_topic转发过来的消息:hello-forward message
收到sb_topic转发过来的消息:hello-forward message
收到sb_topic转发过来的消息:hello-forward message
收到sb_topic转发过来的消息:hello-forward message
3.5、定时启动、停止

默认情况下,当消费者项目启动的时候,监听器就开始工作,监听消费发送到指定topic的消息,那如果我们不想让监听器立即工作,想让它在我们指定的时间点开始工作,或者在我们指定的时间点停止工作,该怎么处理呢——使用KafkaListenerEndpointRegistry,下面我们就来实现:

1、禁止监听器自启动;
2、创建两个定时任务,一个用来在指定时间点启动定时器,另一个在指定时间点停止定时器;
新建一个定时任务类,用注解@EnableScheduling声明,KafkaListenerEndpointRegistry 在Spring中已经被注册为Bean,直接注入,设置禁止KafkaListener自启动
 

@EnableScheduling
@Component
public class CronTimer {

    /**
     * @KafkaListener注解所标注的方法并不会在IOC容器中被注册为Bean,
     * 而是会被注册在KafkaListenerEndpointRegistry中,
     * 而KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean
     **/
    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @Autowired
    private ConsumerFactory consumerFactory;
​
    // 监听器容器工厂(设置禁止KafkaListener自启动)
    @Bean
    public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
        ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
        container.setConsumerFactory(consumerFactory);
        //禁止KafkaListener自启动
        container.setAutoStartup(false);
        return container;
    }

    // 监听器
    @KafkaListener(id="timingConsumer",topics = "sb_topic",containerFactory = "delayContainerFactory")
    public void onMessage1(ConsumerRecord<?, ?> record){
        System.out.println("消费成功:"+record.topic()+"-"+record.partition()+"-"+record.value());
    }

    // 定时启动监听器
    @Scheduled(cron = "0 42 11 * * ? ")
    public void startListener() {
        System.out.println("启动监听器...");
        // "timingConsumer"是@KafkaListener注解后面设置的监听器ID,标识这个监听器
        if (!registry.getListenerContainer("timingConsumer").isRunning()) {
            registry.getListenerContainer("timingConsumer").start();
        }
        //registry.getListenerContainer("timingConsumer").resume();
    }
​
    // 定时停止监听器
    @Scheduled(cron = "0 45 11 * * ? ")
    public void shutDownListener() {
        System.out.println("关闭监听器...");
        registry.getListenerContainer("timingConsumer").pause();
    }
}

启动项目,触发生产者向topic1发送消息,可以看到consumer没有消费,因为这时监听器还没有开始工作,

11:42分监听器启动开始工作,消费消息

11:45分监听器停止工作:

3.6、手动确认消息

默认情况下Kafka的消费者是自动确认消息的,通常情况下我们需要在业务处理成功之后手动触发消息的签收,否则可能会出现:消息消费到一半消费者异常,消息并未消费成功但是消息已经自动被确认,也不会再投递给消费者,也就导致消息丢失了。

当 auto.commit.enable 设置为false时,表示kafak的offset由customer手动维护,spring-kafka提供了通过ackMode的值表示不同的手动提交方式;
 

public enum AckMode {
    // 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
    RECORD,
    // 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
    BATCH,
    // 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
    TIME,
    // 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
    COUNT,
    // TIME | COUNT 有一个条件满足时提交
    COUNT_TIME,
    // 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
    MANUAL,
    // 手动调用Acknowledgment.acknowledge()后立即提交
    MANUAL_IMMEDIATE,
}

如果设置AckMode模式为MANUAL或者MANUAL_IMMEDIATE,则需要对监听消息的方法中,引入Acknowledgment对象参数,并调用acknowledge()方法进行手动提交;

第一步:添加kafka配置,把 spring.kafka.listener.ack-mode = manual 设置为手动
 

spring:
	kafka:
		listener:
			ack-mode: manual 
		consumer:
			enable-auto-commit: false

第二步;消费消息的时候,给方法添加Acknowledgment参数签收消息:

@KafkaListener(topics = {"sb_topic"})
public void onMessage9(ConsumerRecord<String, Object> record, Acknowledgment ack) {
    System.out.println("收到消息:" + record.value());
    //确认消息
    ack.acknowledge();
}

4、配置详解

4.1、生产者yml方式
server:
  port: 8081
spring:
  kafka:
    producer:
      # Kafka服务器
      bootstrap-servers: 175.24.228.202:9092
      # 开启事务,必须在开启了事务的方法中发送,否则报错
      transaction-id-prefix: kafkaTx-
      # 发生错误后,消息重发的次数,开启事务必须设置大于0。
      retries: 3
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      # 开启事务时,必须设置为all
      acks: all
      # 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
      batch-size: 16384
      # 生产者内存缓冲区的大小。
      buffer-memory: 1024000
      # 键的序列化方式
      key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # 值的序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类)
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
4.2、生产者Config方式
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.transaction.KafkaTransactionManager;

import java.util.HashMap;
import java.util.Map;

/**
 * kafka配置,也可以写在yml,这个文件会覆盖yml
 */
@SpringBootConfiguration
public class KafkaProviderConfig {

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.producer.transaction-id-prefix}")
    private String transactionIdPrefix;
    @Value("${spring.kafka.producer.acks}")
    private String acks;
    @Value("${spring.kafka.producer.retries}")
    private String retries;
    @Value("${spring.kafka.producer.batch-size}")
    private String batchSize;
    @Value("${spring.kafka.producer.buffer-memory}")
    private String bufferMemory;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>(16);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        //acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
        //acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
        //acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
        //开启事务必须设为all
        props.put(ProducerConfig.ACKS_CONFIG, acks);
        //发生错误后,消息重发的次数,开启事务必须大于0
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        //当多个消息发送到相同分区时,生产者会将消息打包到一起,以减少请求交互. 而不是一条条发送
        //批次的大小可以通过batch.size 参数设置.默认是16KB
        //较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。
        //比如说,kafka里的消息5秒钟Batch才凑满了16KB,才能发送出去。那这些消息的延迟就是5秒钟
        //实测batchSize这个参数没有用
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        //有的时刻消息比较少,过了很久,比如5min也没有凑够16KB,这样延时就很大,所以需要一个参数. 再设置一个时间,到了这个时间,
        //即使数据没达到16KB,也将这个批次发送出去
        props.put(ProducerConfig.LINGER_MS_CONFIG, "5000");
        //生产者内存缓冲区的大小
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        //反序列化,和生产者的序列化方式对应
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<Object, Object> producerFactory() {
        DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
        //开启事务,会导致 LINGER_MS_CONFIG 配置失效
        factory.setTransactionIdPrefix(transactionIdPrefix);
        return factory;
    }

    @Bean
    public KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    public KafkaTemplate<Object, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
4.3、消费者yml方式
server:
  port: 8082
spring:
  kafka:
    consumer:
      # Kafka服务器
      bootstrap-servers: 175.24.228.202:9092
      group-id: firstGroup
      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      #auto-commit-interval: 2s
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录
      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)
      # none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: latest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      #key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # 值的反序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类)
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # 配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要
      properties:
        spring:
          json:
            trusted:
              packages: "*"
      # 这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。
      # 这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,
      # 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,
      # 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。
      # 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数
      # 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况
      max-poll-records: 3
    properties:
      # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
      max:
        poll:
          interval:
            ms: 600000
      # 当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s
      session:
        timeout:
          ms: 10000
    listener:
      # 在侦听器容器中运行的线程数,一般设置为 机器数*分区数
      concurrency: 4
      # 自动提交关闭,需要设置手动消息确认
      ack-mode: manual_immediate
      # 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误
      missing-topics-fatal: false
      # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
      poll-timeout: 600000
4.4、消费者Config方式
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

/**
 * kafka配置,也可以写在yml,这个文件会覆盖yml
 */
@SpringBootConfiguration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Value("${spring.kafka.properties.session.timeout.ms}")
    private String sessionTimeout;
    @Value("${spring.kafka.properties.max.poll.interval.ms}")
    private String maxPollIntervalTime;
    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.listener.concurrency}")
    private Integer concurrency;
    @Value("${spring.kafka.listener.missing-topics-fatal}")
    private boolean missingTopicsFatal;
    @Value("${spring.kafka.listener.poll-timeout}")
    private long pollTimeout;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>(16);
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        //自动提交的时间间隔,自动提交开启时生效
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");
        //该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
        //earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录
        //latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)
        //none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        //两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime);
        //这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。
        //这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,
        //如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,
        //然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。
        //要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数
        //注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        //当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        //序列化(建议使用Json,这种序列化方式可以无需额外配置传输实体类)
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return propsMap;
    }

    @Bean
    public ConsumerFactory<Object, Object> consumerFactory() {
        //配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要
        try(JsonDeserializer<Object> deserializer = new JsonDeserializer<>()) {
            deserializer.trustedPackages("*");
            return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new JsonDeserializer<>(), deserializer);
        }
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        //在侦听器容器中运行的线程数,一般设置为 机器数*分区数
        factory.setConcurrency(concurrency);
        //消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误
        factory.setMissingTopicsFatal(missingTopicsFatal);
        //自动提交关闭,需要设置手动消息确认
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        //设置为批量监听,需要用List接收
        //factory.setBatchListener(true);
        return factory;
    }
}

5、注解消费示例

5.1、简单消费
    /**
     * 指定一个消费者组,一个主题主题。
     * @param record
     */
    @KafkaListener(topics = IPHONE_TOPIC,groupId = APPLE_GROUP)
    public void simpleConsumer(ConsumerRecord<String, String> record) {
        System.out.println("进入simpleConsumer方法");
        System.out.printf(
                "分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
                record.partition(),
                record.offset(),
                record.key(),
                record.value(),
                record.timestamp()
        );
    }
5.2、监听多个主题
    /**
     * 指定多个主题。
     *
     * @param record
     */
    @KafkaListener(topics = {IPHONE_TOPIC,IPAD_TOPIC},groupId = APPLE_GROUP)
    public void topics(ConsumerRecord<String, String> record) {
        System.out.println("进入topics方法");
        System.out.printf(
                "主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
                record.topic(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value(),
                record.timestamp()
        );
    }
5.3、监听一个主题,指定分区消费
    /**
     * 监听一个主题,且指定消费主题的哪些分区。
     * 参数详解:消费者组=apple_group;监听主题=iphoneTopic;只消费的分区=1,2;消费者数量=2
     * @param record
     */
    @KafkaListener(
            groupId = APPLE_GROUP,
            topicPartitions = {
                    @TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"})
            },
            concurrency = "2"
    )
    public void consumeByPattern(ConsumerRecord<String, String> record) {
        System.out.println("consumeByPattern");
        System.out.printf(
                "主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
                record.topic(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value(),
                record.timestamp()
        );
    }
5.4、指定多个分区,指定起始偏移量,多线程消费
    /**
     * 指定多个分区从哪个偏移量开始消费。
     * 10个线程,也就是10个消费者
     */
    @KafkaListener(
            groupId = APPLE_GROUP,
            topicPartitions = {
                    @TopicPartition(
                            topic = IPAD_TOPIC,
                            partitions = {"0","1"},
                            partitionOffsets = {
                                    @PartitionOffset(partition = "2", initialOffset = "10"),
                                    @PartitionOffset(partition = "3", initialOffset = "0"),
                            }
                    )
            },
            concurrency = "10"
    )
    public void consumeByPartitionOffsets(ConsumerRecord<String, String> record) {
        System.out.println("consumeByPartitionOffsets");
        System.out.printf(
                "主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
                record.topic(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value(),
                record.timestamp()
        );
    }
5.5、监听多个主题,指定多个分区,指定起始偏移量
    /**
     * 指定多个主题。参数详解如上面的方法。
     * @param record
     */
    @KafkaListener(
            groupId = APPLE_GROUP,
            topicPartitions = {
                    @TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"}),
                    @TopicPartition(topic = IPAD_TOPIC, partitions = "1",
                            partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))
            },
            concurrency = "4"
    )
    public void topics2(ConsumerRecord<String, String> record) {
        System.out.println("topics2");
        System.out.printf(
                "主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
                record.topic(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value(),
                record.timestamp()
        );
    }
5.6、指定多个监听器
    /**
     * 指定多个消费者组。参数详解如上面的方法。
     *
     * @param record
     */
    @KafkaListeners({
            @KafkaListener(
                    groupId = APPLE_GROUP,
                    topicPartitions = {
                            @TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"}),
                            @TopicPartition(topic = IPAD_TOPIC, partitions = "1",
                                    partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))
                    },
                    concurrency = "3"
            ),
            @KafkaListener(
                    groupId = XM_GROUP,
                    topicPartitions = {
                            @TopicPartition(topic = XMPHONE_TOPIC, partitions = {"1", "2"}),
                            @TopicPartition(topic = XMPAD_TOPIC, partitions = "1",
                                    partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))
                    },
                    concurrency = "3"
            )
    }
    )
    public void groupIds(ConsumerRecord<String, String> record) {
        System.out.println("groupIds");
        System.out.println("内容:" + record.value());
        System.out.println("分区:" + record.partition());
        System.out.println("偏移量:" + record.offset());
        System.out.println("创建消息的时间戳:" + record.timestamp());
    }
5.7、手动提交偏移量
    /**
     * 设置手动提交偏移量
     *
     * @param record
     */
    @KafkaListener(
            topics = IPHONE_TOPIC,
            groupId = APPLE_GROUP,
            //3个消费者
            concurrency = "3"
    )
    public void setCommitType(ConsumerRecord<String, String> record, Acknowledgment ack) {
        System.out.println("setCommitType");
        System.out.println("内容:" + record.value());
        System.out.println("分区:" + record.partition());
        System.out.println("偏移量:" + record.offset());
        System.out.println("创建消息的时间戳:" + record.timestamp());
        ack.acknowledge();
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1222077.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DevExpress中文教程 - 如何在macOS和Linux (CTP)上创建、修改报表(上)

DevExpress Reporting是.NET Framework下功能完善的报表平台&#xff0c;它附带了易于使用的Visual Studio报表设计器和丰富的报表控件集&#xff0c;包括数据透视表、图表&#xff0c;因此您可以构建无与伦比、信息清晰的报表。 DevExpress Reports — 跨平台报表组件&#x…

AnimateDiff搭配Stable diffution制作AI视频

话不多说&#xff0c;先看视频 1. AnimateDiff的技术原理 AnimateDiff可以搭配扩散模型算法&#xff08;Stable Diffusion&#xff09;来生成高质量的动态视频&#xff0c;其中动态模型&#xff08;Motion Models&#xff09;用来实时跟踪人物的动作以及画面的改变。我们使用 …

[ 云计算 | AWS ] AI 编程助手新势力 Amazon CodeWhisperer:优势功能及实用技巧

文章目录 一、Amazon CodeWhisperer 简介1.1 CodeWhisperer 是什么1.2 Amazon CodeWhisperer 是如何工作的 二、Amazon CodeWhisperer 的优势和功能2.1 Amazon CodeWhisperer 的优势2.2 Amazon CodeWhisperer 的代码功能 三、Amazon CodeWhisperer 安装3.1 安装到 IntelliJ IDE…

〖大前端 - 基础入门三大核心之JS篇㊱〗- JavaScript 的DOM节点操作

说明&#xff1a;该文属于 大前端全栈架构白宝书专栏&#xff0c;目前阶段免费&#xff0c;如需要项目实战或者是体系化资源&#xff0c;文末名片加V&#xff01;作者&#xff1a;不渴望力量的哈士奇(哈哥)&#xff0c;十余年工作经验, 从事过全栈研发、产品经理等工作&#xf…

股票魔法师第二阶段趋势模板选股公式,寻找上涨趋势

对于股票运行的阶段&#xff0c;不同的股票分析方法有着不同的划分方式。从传统的主力运作分析&#xff0c;可以分为吸筹、洗盘、试盘、拉升、出货五个阶段。在波浪理论中&#xff0c;一个完整的上升或下降周期包含8浪&#xff08;其中5浪是主浪、3浪是调整浪&#xff09;。在缠…

Gdevops北京站 2023年全球敏捷运维峰会-核心PPT资料下载

一、峰会简介 2023 Gdevops全球敏捷运维峰会-北京站成功举办&#xff0c;一众产学研界技术大佬与新锐专家&#xff0c;以智能为主线&#xff0c;就数据库、运维、架构、金融科技等领域进行了前沿技术与实践经验交流&#xff0c;一同畅聊AIGC、云原生、数智化转型下的新机遇。 …

【LeetCode刷题-滑动窗口】--159.至多包含两个不同字符的最长子串

159.至多包含两个不同字符的最长子串 方法&#xff1a;滑动窗口 定义两个指针left和right作为窗口的边界&#xff0c;将两个指针都设定在位置0&#xff0c;然后向右移动right指针&#xff0c;直到窗口内不超过两个不同的字符&#xff0c;如果某一点我们得到了3个不同的字符&am…

市县镇一体化视频会议系统

随着网络技术的飞速发展&#xff0c;县市各部门建成了业务专用通信网络。利用专用通信网络&#xff0c;省一市-县基本上都开通了局域网视频会议系统。我们在市局各科室和各县局间建成了专网跨网段的视频会议系统。连通宝视频会议系统建设方案软硬一体&#xff0c;可实现多点间语…

基于STM32的蓝牙低功耗(BLE)通信方案设计与实现

蓝牙低功耗&#xff08;Bluetooth Low Energy&#xff0c;简称BLE&#xff09;是一种能够在低功耗环境下实现无线通信的技术。在物联网应用中&#xff0c;BLE被广泛应用于传感器数据采集、健康监测设备、智能家居等领域。本文将基于STM32微控制器&#xff0c;设计并实现一个简单…

⑧【MySQL】数据库查询:内连接、外连接、自连接、子查询、多表查询

个人简介&#xff1a;Java领域新星创作者&#xff1b;阿里云技术博主、星级博主、专家博主&#xff1b;正在Java学习的路上摸爬滚打&#xff0c;记录学习的过程~ 个人主页&#xff1a;.29.的博客 学习社区&#xff1a;进去逛一逛~ 内连接、外连接、自连接、子查询、多表查询 ⑧…

汽车ECU的虚拟化技术初探(三)--U2A虚拟化辅助功能分析1

目录 1.基本概述 1.1 U2A虚拟化辅助功能 1.2 U2A虚拟化使能和资源分配 2. U2A架构概述 3. CPU运行模式 3.1 虚拟化模式 3.2 限制运行模式 3.3 权限运行模式 3.4 CPU运行模式小结 4.小结 1.基本概述 1.1 U2A虚拟化辅助功能 在汽车ECU的虚拟化技术初探(二)-CSDN博客中…

MyBatis #{} 和 ${} 的区别

前言&#xff1a; #{} 和 ${} 的区别是 MyBatis 中一个常见的面试题&#xff0c;#{} 和 ${} 是MyBatis 中获取参数的两种方式&#xff0c;但我们在项目中大多数使用的都是 #{} 来获取参数&#xff0c;那么它们两个有什么区别呢&#xff1f; 区别 一. #{} 采用预编译 SQL&…

鸿蒙4.0开发笔记之DevEco Studio如何使用Previewer窗口预览器(一)

一、预览器作用 DevEco Studio预览器概况在HarmonyOS应用开发过程中&#xff0c;通过使用预览器&#xff0c;可以查看应用的UI效果&#xff0c;方便开发者实时查看应用的运行效果&#xff0c;随时调整代码。 二、打开Previewer预览器 1、正常启动 打开预览器的位置在DevEco…

Docker Swarm: 容器编排的力量和优势深度解析

文章目录 Docker Swarm的核心概念1. 节点&#xff08;Node&#xff09;2. 服务&#xff08;Service&#xff09;3. 栈&#xff08;Stack&#xff09; 使用Docker Swarm1. 初始化Swarm2. 加入节点3. 创建服务4. 扩展和缩减服务5. 管理栈6. 管理服务更新 Docker Swarm的优势深度解…

03.webpack中hash,chunkhash和contenthash 的区别

hash、contenthash 和 chunkhash 是通过散列函数处理之后&#xff0c;生成的一串字符&#xff0c;可用于区分文件。 作用&#xff1a;善用文件的哈希值&#xff0c;解决浏览器缓存导致的资源未及时更新的问题 1.文件名不带哈希值 const path require(path) const HtmlWebpac…

DMA原理和应用

目录 1.什么是DMA 2.DMA的意义 3.DMA搬运的数据和方式 4.DMA 控制器和通道 5.DMA通道的优先级 6.DMA传输方式 7.DMA应用 实验一: 内存到内存搬运 CubeMX配置&#xff1a; ​编辑用到的库函数&#xff1a; 代码实现思路&#xff1a; 实验二: 内存到外设搬运 CubeMX…

html-网站菜单-点击显示导航栏

一、效果图 1.点击显示菜单栏&#xff0c;点击x号关闭&#xff1b; 2.点击一级菜单&#xff0c;展开显示二级&#xff0c;并且加号变为减号&#xff1b; 3.点击其他一级导航&#xff0c;自动收起展开的导航。 二、代码实现 <!DOCTYPE html> <html><head>&…

Apache Doris (五十四): Doris Join类型 - Bucket Shuffle Join

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频 目录

场景交互与场景漫游-osgGA库(5)

osgGA库 osgGA库是OSG的一个附加的工具库&#xff0c;它为用户提供各种事件处理及操作处理。通过osgGA库读者可以像控制Windows窗口一样来处理各种事件 osgGA的事件处理器主要由两大部分组成&#xff0c;即事件适配器和动作适配器。osgGA:GUIEventHandler类主要提供了窗口系统的…

swin unetr的3D语义分割

基于monai库。其实我不是很喜欢这种&#xff0c;可扩展性太差了&#xff0c;除非说你想快速在自己的数据集上出结果。但是它的transform可以对3d医学图像增强操作&#xff0c;比torch的transform强一点&#xff0c;因为它的数据增强输入是&#xff08;x,y,z&#xff09;h,w,d格…