kafka小实站

news2025/1/3 3:04:41

需要先在前面的文章里面照着下载好kafka,并且启动 先启动zookeeper

项目目录

package kafka; 
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * kafka消费者
 *
 **/
@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = "${kafka.topic.name}")
    public void listen(ConsumerRecord<?, ?> record) {
        log.info("topic={}, offset={}, message={}", record.topic(), record.offset(), record.value());
    }


}

comsumer代码

定义了一个简单的 Kafka 消费者,它使用 Spring 的 Kafka Listener 来监听指定的主题,并处理接收到的消息

  • @Component
    标记这个类为一个 Spring 的组件,使其被 Spring 容器管理并可以被自动扫描和加载。

  • @Slf4j
    这是 Lombok 提供的注解,自动为类生成一个名为 log日志记录器,可以直接用 log.info() 等方法记录日志,而无需手动定义日志实例。

  • @KafkaListener
    这是 Spring Kafka 提供的注解,用于标记一个方法为 Kafka 消息的监听器。

    • topics:指定要监听的 Kafka 主题。这里用占位符 ${kafka.topic.name},说明具体的主题名称是从配置文件(如 application.propertiesapplication.yml)中读取的。
  • 自动监听
    当有新的消息发布到指定的 Kafka 主题时,Spring 会自动调用标注了 @KafkaListener 的方法来处理消息。

  • 参数 ConsumerRecord<?, ?> record
    代表 Kafka 中一条消费记录,包含了消息的元数据(如主题名、分区、偏移量)和消息体。

    • record.topic():获取消息所属的 Kafka 主题名。
    • record.offset():获取消息的偏移量(在 Kafka 中,每条消息都有一个唯一的偏移量)。
    • record.value():获取消息的内容。
  • log.info
    使用日志记录器将接收到的消息详细信息打印到日志中,包括:

    • 消息所在的 主题名
    • 偏移量offset),用于定位消息在分区中的位置。
    • 消息的 内容
package kafka; 

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

import javax.annotation.Resource;

/**
 * kafka生产者
 **/
@Component
@Slf4j
public class KafkaProducer implements CommandLineRunner {

    @Resource
    KafkaTemplate kafkaTemplate;

    @Value("${kafka.topic.name}")
    private String topic;

    /**
     * 发送kafka消息
     *
     * @param string
     */
    public void send(String string) {
        ListenableFuture future = kafkaTemplate.send(topic, JSONObject.toJSONString(string));
        //future.addCallback(o -> log.info("kafka消息发送成功:" + jsonString), throwable -> log.error("kafka消息发送失败:" + jsonString));
    }

    /**
     * 容器启动完成后,生产测试数据
     */
    @Override
    public void run(String... args) {
        for (int i = 0; i < 10; i++) {
            send("hello world" + i);
        }
    }
}

producer代码

  • KafkaTemplate
    这是 Spring Kafka 提供的核心类,用于向 Kafka 发送消息。通过 @Resource 注解实现依赖注入。

  • @Value("${kafka.topic.name}")
    通过 Spring 的配置文件(如 application.propertiesapplication.yml)注入 Kafka 主题名称。

  • kafkaTemplate.send(topic, message)
    通过 KafkaTemplate 将消息发送到指定主题。

  • JSONObject.toJSONString(string)
    使用阿里巴巴的 fastjson 库将消息序列化为 JSON 格式。

  • ListenableFuture
    Kafka 消息发送是异步的,send() 方法返回一个 ListenableFuture 对象,可以用来监听消息的发送结果。

  • 日志记录(被注释)
    使用 addCallback 方法为发送成功和失败分别设置回调逻辑:

    • 发送成功时记录日志:log.info("kafka消息发送成功:" + string)
    • 发送失败时记录错误日志:log.error("kafka消息发送失败:" + string)
    • run(String... args)
      这是 CommandLineRunner 接口的方法,Spring 容器启动完成后会自动调用。

    • 循环发送消息
      启动后向 Kafka 主题发送 10 条测试消息,例如:hello world0hello world1 等。

## kafka ##
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=test


spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.topic.name=mqtt_location_data
#
#spring.data.mongodb.host=localhost
#spring.data.mongodb.port=27017
#spring.data.mongodb.database=your_database_name

配置文件application.properties 

运行结果:

2024-12-26 01:32:30.839  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=2, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:00.850  INFO 22936 --- [t-thread | test] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted.
2024-12-26 01:33:00.855  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 01:33:00.856  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 01:33:00.856  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: false. Rediscovery will be attempted.
2024-12-26 01:33:00.970  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 01:33:00.971  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 01:33:00.980  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=2, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.847  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] SyncGroup failed: The group began another rebalance. Need to re-join the group. Sent generation was Generation{generationId=2, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.847  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 01:33:15.874  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.875  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Finished assignment for group at generation 3: {consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a=Assignment(partitions=[mqtt_location_data-0])}
2024-12-26 01:33:15.879  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Successfully synced group in generation Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.880  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Notifying assignor about the new Assignment(partitions=[mqtt_location_data-0])
2024-12-26 01:33:15.881  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Adding newly assigned partitions: mqtt_location_data-0
2024-12-26 01:33:15.887  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Setting offset for partition mqtt_location_data-0 to the committed offset FetchPosition{offset=10, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2024-12-26 01:33:15.888  INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : test: partitions assigned: [mqtt_location_data-0]
2024-12-26 01:33:15.900  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=10, message="hello world0"
2024-12-26 01:33:15.900  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=11, message="hello world1"
2024-12-26 01:33:15.900  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=12, message="hello world2"
2024-12-26 01:33:15.900  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=13, message="hello world3"
2024-12-26 01:33:15.900  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=14, message="hello world4"
2024-12-26 01:33:15.900  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=15, message="hello world5"
2024-12-26 01:33:15.900  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=16, message="hello world6"
2024-12-26 01:33:15.900  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=17, message="hello world7"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=18, message="hello world8"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=19, message="hello world9"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=20, message="hello world0"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=21, message="hello world1"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=22, message="hello world2"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=23, message="hello world3"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=24, message="hello world4"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=25, message="hello world5"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=26, message="hello world6"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=27, message="hello world7"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=28, message="hello world8"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=29, message="hello world9"
2024-12-26 04:36:36.611  INFO 22936 --- [ntainer#0-0-C-1] o.a.kafka.clients.FetchSessionHandler    : [Consumer clientId=consumer-test-1, groupId=test] Error sending fetch request (sessionId=1770952277, epoch=389) to node 0:

org.apache.kafka.common.errors.DisconnectException: null

2024-12-26 04:36:36.612  INFO 22936 --- [t-thread | test] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery will be attempted.
2024-12-26 04:36:36.727  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 04:36:36.731 ERROR 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Offset commit failed on partition mqtt_location_data-0 at offset 30: The coordinator is not aware of this member.
2024-12-26 04:36:36.731  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] OffsetCommit failed with Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}: The coordinator is not aware of this member.
2024-12-26 04:36:36.732  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2024-12-26 04:36:36.732  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2024-12-26 04:36:36.732  WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Asynchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2024-12-26 04:36:36.732  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Failing OffsetCommit request since the consumer is not part of an active group
2024-12-26 04:36:36.733  WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Synchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
2024-12-26 04:36:36.733  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2024-12-26 04:36:36.733  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Lost previously assigned partitions mqtt_location_data-0
2024-12-26 04:36:36.733  INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : test: partitions lost: [mqtt_location_data-0]
2024-12-26 04:36:36.733  INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : test: partitions revoked: [mqtt_location_data-0]
2024-12-26 04:36:36.733  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 04:36:36.734  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Attempt to heartbeat with stale Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'} and group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, ignoring the error
2024-12-26 04:36:36.735  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: need to re-join with the given member-id
2024-12-26 04:36:36.735  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 04:36:37.543  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'}
2024-12-26 04:36:37.543  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Finished assignment for group at generation 5: {consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61=Assignment(partitions=[mqtt_location_data-0])}
2024-12-26 04:36:37.553  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Successfully synced group in generation Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'}
2024-12-26 04:36:37.554  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Notifying assignor about the new Assignment(partitions=[mqtt_location_data-0])
2024-12-26 04:36:37.554  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Adding newly assigned partitions: mqtt_location_data-0
2024-12-26 04:36:37.557  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Setting offset for partition mqtt_location_data-0 to the committed offset FetchPosition{offset=30, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2024-12-26 04:36:37.589  INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : test: partitions assigned: [mqtt_location_data-0]
2024-12-30 22:44:08.518  INFO 22936 --- [t-thread | test] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery will be attempted.
2024-12-30 22:44:09.620  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-30 22:44:09.629  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Attempt to heartbeat with Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'} and group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, resetting generation
2024-12-30 22:44:09.629  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from HEARTBEAT response
2024-12-30 22:44:09.629  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: encountered UNKNOWN_MEMBER_ID from HEARTBEAT response
2024-12-30 22:44:09.629  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Failing OffsetCommit request since the consumer is not part of an active group
2024-12-30 22:44:09.630  WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Synchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
2024-12-30 22:44:09.630  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2024-12-30 22:44:09.630  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Lost previously assigned partitions mqtt_location_data-0
2024-12-30 22:44:09.630  INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : test: partitions lost: [mqtt_location_data-0]
2024-12-30 22:44:09.630  INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : test: partitions revoked: [mqtt_location_data-0]
2024-12-30 22:44:09.630  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-30 22:44:09.636 ERROR 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Offset commit failed on partition mqtt_location_data-0 at offset 30: The coordinator is not aware of this member.
2024-12-30 22:44:09.636  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] OffsetCommit failed with Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'}: The coordinator is not aware of this member.
2024-12-30 22:44:09.637  WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Asynchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Offset commit cannot be completed since the consumer member's generation is already stale, meaning it has already participated another rebalance and got a new generation. You can try completing the rebalance by calling poll() and then retry commit again
2024-12-30 22:44:09.640  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: need to re-join with the given member-id
2024-12-30 22:44:09.640  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-30 22:44:09.663  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=7, memberId='consumer-test-1-2c8145c0-d35d-4c70-bbe0-3c96144b69fd', protocol='range'}
2024-12-30 22:44:09.663  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Finished assignment for group at generation 7: {consumer-test-1-2c8145c0-d35d-4c70-bbe0-3c96144b69fd=Assignment(partitions=[mqtt_location_data-0])}
2024-12-30 22:44:09.667  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Successfully synced group in generation Generation{generationId=7, memberId='consumer-test-1-2c8145c0-d35d-4c70-bbe0-3c96144b69fd', protocol='range'}
2024-12-30 22:44:09.667  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Notifying assignor about the new Assignment(partitions=[mqtt_location_data-0])
2024-12-30 22:44:09.667  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Adding newly assigned partitions: mqtt_location_data-0
2024-12-30 22:44:09.672  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Setting offset for partition mqtt_location_data-0 to the committed offset FetchPosition{offset=30, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2024-12-30 22:44:10.200  INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : test: partitions assigned: [mqtt_location_data-0]

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2268409.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【从零开始入门unity游戏开发之——C#篇39】C#反射使用——Type 类、Assembly 类、Activator 类操作程序集

文章目录 前言一、前置知识1、编译器2、程序集&#xff08;Assembly&#xff09;3、元数据&#xff08;Metadata&#xff09; 二、反射1、反射的概念2、反射的作用3、反射的核心Type 类3.1 Type 类介绍3.2 不同方法获取 Type3.3 获取type类型所在的程序集的相关信息 4、反射的常…

(桌面运维学习)通过备份C盘,进行Windows系统的软件初始化

通过PE工具备份C盘&#xff0c;进行Windows系统的软件初始化 需求场景&#xff1a;快速初始化一批型号和主板一样的电脑系统型号也要一致&#xff08;Win10专业版就最好全是WIn10专业版&#xff09;&#xff0c;初始化的内容包括已配置好的环境和已安装的软件。主要用于公司桌面…

【ELK】ES单节点升级为集群模式--太细了!

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言准备工作1. 查看现状【单节点】2. 原节点改集群模式3. 改es配置文件&#xff0c;增加集群相关配置项4. *改docker映射的端口* 启动新节点5. docker-compose起一…

Path-of-Thoughts:将“思维链“升级为“思维图“,三阶段框架取代单一推理,提升大模型复杂关系推理准确性至88.2%与效率提升5%

Path-of-Thoughts&#xff1a;将"思维链"升级为"思维图"&#xff0c;三阶段框架取代单一推理&#xff0c;提升大模型复杂关系推理准确性至88.2%与效率提升5% 论文大纲理解通用流程框架 观察和假设观察现象提出假设实验验证解法拆解解法&#xff1a;Path-of…

ThinkPHP 8高效构建Web应用-第一个简单的MVC应用示例

【图书介绍】《ThinkPHP 8高效构建Web应用》-CSDN博客 《2025新书 ThinkPHP 8高效构建Web应用 编程与应用开发丛书 夏磊 清华大学出版社教材书籍 9787302678236 ThinkPHP 8高效构建Web应用》【摘要 书评 试读】- 京东图书 使用VS Code开发ThinkPHP项目-CSDN博客 我们先实现一…

No.3十六届蓝桥杯备战|数据类型长度|sizeof|typedef|练习(C++)

数据类型⻓度 每⼀种数据类型都有⾃⼰的⻓度&#xff0c;使⽤不同的数据类型&#xff0c;能够创建出⻓度不同的变量&#xff0c;变量⻓度的不同&#xff0c;存储的数据范围就有所差异。 sizeof操作符 sizeof 是⼀个关键字&#xff0c;也是操作符&#xff0c;专⻔是⽤来计算特…

大数据组件(一)快速入门调度组件Airflow

大数据组件(一)快速入门调度组件Airflow DolphinScheduler和 Airflow是数据领域很流行的两款开源任务调度系统。DolphinScheduler 致力于用可视化的方式去完成一个 DAG 工作流&#xff0c;而 Airflow 则想的是用类似于编程的方式完成一个 DAG 工作流。 Apache DolphinSchedule…

jpeg学习

相关最全的一篇文章链接&#xff1a;https://www.cnblogs.com/wtysos11/p/14089482.html YUV基础知识 Y表示亮度分量&#xff1a;如果只显示Y的话&#xff0c;图像看起来会是一张黑白照。 U&#xff08;Cb&#xff09;表示色度分量&#xff1a;是照片蓝色部分去掉亮度&#x…

内部类(3)

大家好&#xff0c;今天我们继续来看看内部类&#xff0c;今天我们来学习一下内部类的分类&#xff0c;我们来看看一共有几种&#xff0c;它们有什么作用&#xff0c;那么话不多说&#xff0c;我们直接开始。 9.1 内部类的分类 先来看下,内部类都可以在一个类的哪些位置进行定…

你还在用rand()生成随机数?

1. rand() 的缺陷 伪随机数生成器使用数学算法来产生具有良好统计特性的数字序列&#xff0c;但这些数字并非真正随机。 C 标准库中的 rand() 函数并不保证所生成的随机序列的质量。某些 rand() 实现生成的数字周期较短&#xff0c;且这些数字是可以预测的。对于有强伪随机数…

基于FPGA的2ASK+帧同步系统verilog开发,包含testbench,高斯信道,误码统计,可设置SNR

目录 1.算法仿真效果 2.算法涉及理论知识概要 2.1 2ASK调制解调 2.2 帧同步 3.Verilog核心程序 4.完整算法代码文件获得 1.算法仿真效果 vivado2019.2仿真结果如下&#xff08;完整代码运行后无水印&#xff09;&#xff1a; 设置SNR8db 设置SNR20db 整体波形效果&…

RT-Thread中堆和栈怎么跟单片机内存相联系

现在RT-ThreadMCU的应用方式越来越普遍&#xff0c;RT-Thread需要配置MCU中的RAM到的系统中&#xff0c;进入系统内存管理&#xff0c;才能提供给基于实时系统的应用程序使用&#xff0c;比如给应用程序提供malloc、free等函数调用功能。在嵌入式软件开发中&#xff0c;我们经常…

2、Bert论文笔记

Bert论文 1、解决的问题2、预训练微调2.1预训练微调概念2.2深度双向2.3基于特征和微调&#xff08;预训练下游策略&#xff09; 3、模型架构4、输入/输出1.输入&#xff1a;2.输出&#xff1a;3.Learned Embeddings(学习嵌入)1. **Token Embedding**2. **Position Embedding**3…

TiDB 的MPP架构概述

MPP架构介绍&#xff1a; 如图&#xff0c;TiDB Server 作为协调者&#xff0c;首先 TiDB Server 会把每个TiFlash 拥有的region 会在TiFlash上做交换&#xff0c;让表连接在一个TiFlash上。另外 TiFlash会作为计算节点&#xff0c;每个TiFlash都负责数据交换&#xff0c;表连接…

3、redis的高可用

主从复制 主从复制&#xff1a;这是redis高可用的基础。哨兵模式和集群都是建立在此基础之上。 主从模式和数据库的主从模式是一样的&#xff0c;主负责写入&#xff0c;然后把写入的数据同步到从&#xff0c;从节点只能读不能写。read only。 不能做高可用的切换&#xff…

【架构-38】如何选择通信协议和数据格式

一、通信协议选择 不同的协议适用于不同的应用场景&#xff0c;关键在于数据传输的需求&#xff0c;如&#xff1a;实时性、带宽、可靠性等。下面是几种常见通信协议的适用场景&#xff1a; WebSocket 适用场景&#xff1a;实时、双向数据传输、低延迟、持久连接 特点&#x…

SpringCloudAlibaba 技术栈—Sentinel

1、什么是sentinel? Sentinel是一个用于微服务架构的流量管理和控制系统&#xff0c;它通过限制和控制进入系统的流量&#xff0c;来保护系统免受过载和故障的影响&#xff0c;确保服务的稳定性。简而言之&#xff0c;它就是一个帮助微服务在高负载情况下也能稳定运行的工具。…

初学STM32 ---高级定时器互补输出带死区控制

互补输出&#xff0c;还带死区控制&#xff0c;什么意思&#xff1f; 带死区控制的互补输出应用之H桥 捕获/比较通道的输出部分&#xff08;通道1至3&#xff09; 死区时间计算 举个栗子&#xff08;F1为例&#xff09;&#xff1a;DTG[7:0]250&#xff0c;250即二进制&#x…

RoboMIND:多体现基准 机器人操纵的智能规范数据

我们介绍了 RoboMIND&#xff0c;这是机器人操纵的多体现智能规范数据的基准&#xff0c;包括 4 个实施例、279 个不同任务和 61 个不同对象类别的 55k 真实世界演示轨迹。 工业机器人企业 埃斯顿自动化 | 埃夫特机器人 | 节卡机器人 | 珞石机器人 | 法奥机器人 | 非夕科技 | C…

Hadoop HA安装配置(容器环境),大数据职业技能竞赛模块A平台搭建,jdk+zookeeper+hadoop HA

HA概述 &#xff08;1&#xff09; 所谓HA&#xff08;High Availablity&#xff09;,即高可用&#xff08;7*24小时不中断服务&#xff09;。 &#xff08;2&#xff09; 实现高可用最关键的策略是消除单点故障&#xff0c;HA严格来说应该分为各个组件的HA机制&#xff0c;H…