java 面试

news2024/11/16 15:50:51

面试

  • 目录
    • 概述
      • 需求:
    • 设计思路
    • 实现思路分析
      • 1.面试概要
  • 参考资料和推荐阅读

Survive by day and develop by night.
talk for import biz , show your perfect code,full busy,skip hardness,make a better result,wait for change,challenge Survive.
happy for hardess to solve denpendies.

目录

在这里插入图片描述

概述

需求:

java 面试

设计思路

实现思路分析

1.面试概要

面试官: 大概谈了一下:
1.自我介绍
2.项目大概介绍了一下
Spring clound 的使用
3.redis 分布式锁的实现点
4.kafka 如何实现延迟队列的?
这个好像有点模糊?

如下补充一下kafka下如何实现延迟队列的?
如下补充一下kafka下如何实现延迟队列的?
二、kafka实践

项目中采用的消息中间件是kafka,那如何在kafka上实现类似延迟队列的功能。
kafka本身是不支持延迟队列功能,我们可以通过消息延时转发新主题,曲线完成该功能。

主要实践原理是通过定阅原始主题,并判断是否满足延迟时间要求,满足要求后转发新主题,不满足则阻塞等待,同时外置一个定时器,每1秒进行唤醒锁协作。

为了避免消息长时间得不到消费使用导致kafka的rebalance,使用kafka自提供的api
consumer.pause(Collections.singletonList(topicPartition));

package com.zte.sdn.oscp.kafka;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;

@SpringBootTest(classes = TestMainApplication.class)
public class DelayQueueTest {

    private KafkaConsumer<String, String> consumer;
    private KafkaProducer<String, String> producer;
    private volatile Boolean exit = false;
    private final Object lock = new Object();
    private final String servers = "127.0.0.1:4532";

    @BeforeEach
    void initConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "d");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "5000");
        consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
    }

    @BeforeEach
    void initProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producer = new KafkaProducer<>(props);
    }

    @Test
    void testDelayQueue() throws IOException, InterruptedException {
        //主题
        String topic = "delay-minutes-1";
        List<String> topics = Collections.singletonList(topic);
        consumer.subscribe(topics);
        //定时器,实时1s解锁
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                synchronized (lock) {
                    consumer.resume(consumer.paused());
                    lock.notify();
                }
            }
        }, 0, 1000);

        do {
            synchronized (lock) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(200));
                //消息为空,则阻塞,等待定时器来唤醒
                if (consumerRecords.isEmpty()) {
                    lock.wait();
                    continue;
                }
                boolean timed = false;
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    //消息体固定为{"topic": "target","key": "key1","value": "value1"}
                    long timestamp = consumerRecord.timestamp();
                    TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
                    //判断是否满足延迟要求,这里为1min,当然也可以设计更多延迟定义的主题
                    if (timestamp + 60 * 1000 < System.currentTimeMillis()) {
                        String value = consumerRecord.value();
                        ObjectMapper objectMapper = new ObjectMapper();
                        JsonNode jsonNode = objectMapper.readTree(value);
                        JsonNode jsonNodeTopic = jsonNode.get("topic");

                        String appTopic = null, appKey = null, appValue = null;

                        if (jsonNodeTopic != null) {
                            appTopic = jsonNodeTopic.asText();
                        }
                        if (appTopic == null) {
                            continue;
                        }
                        JsonNode jsonNodeKey = jsonNode.get("key");
                        if (jsonNodeKey != null) {
                            appKey = jsonNode.asText();
                        }

                        JsonNode jsonNodeValue = jsonNode.get("value");
                        if (jsonNodeValue != null) {
                            appValue = jsonNodeValue.asText();
                        }
                        // send to application topic
                        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(appTopic, appKey, appValue);
                        try {
                            producer.send(producerRecord).get();
                            // success. commit message
                            OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(consumerRecord.offset() + 1);
                            HashMap<TopicPartition, OffsetAndMetadata> metadataHashMap = new HashMap<>();
                            metadataHashMap.put(topicPartition, offsetAndMetadata);
                            consumer.commitSync(metadataHashMap);
                        } catch (ExecutionException e) {
                            //异步停止,并重置offset
                            consumer.pause(Collections.singletonList(topicPartition));
                            consumer.seek(topicPartition, consumerRecord.offset());
                            timed = true;
                            break;
                        }
                    } else {
                        //不满足延迟要求,并重置offset
                        consumer.pause(Collections.singletonList(topicPartition));
                        consumer.seek(topicPartition, consumerRecord.offset());
                        timed = true;
                        break;
                    }
                }

                if (timed) {
                    lock.wait();
                }
            }
        } while (!exit);
    }
}

上面的实践存在什么样的问题,考虑一个场景,有一个延迟一小时的队列,这样消息发出后,实际上一个小时后在该主题上的消息拉取才有意义(之前即使拉取下来也发送不出去),但上面的实现仍然会不停阻塞唤醒,相当于在做无用功。如何避免该问题。
这边的原理是通过定阅原始主题,并判断是否满足延迟时间要求,满足要求后转发新主题,不满足则停止消费并等待。

package com.zte.sdn.oscp.kafka;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

@SpringBootTest(classes = TestMainApplication.class)
public class DelayQueueSeniorTest {

    private KafkaConsumer<String, String> consumer;
    private KafkaProducer<String, String> producer;
    private volatile Boolean exit = false;
    private final String servers = "127.0.0.1:4532";

    @BeforeEach
    void initConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "d");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "5000");
        consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
    }

    @BeforeEach
    void initProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producer = new KafkaProducer<>(props);
    }

    @Test
    void testDelayQueue() throws IOException, InterruptedException {
        String topic = "delay-minutes-1";
        List<String> topics = Collections.singletonList(topic);
        consumer.subscribe(topics);
        do {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(200));
            if (consumerRecords.isEmpty()) {
                continue;
            }
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                long timestamp = consumerRecord.timestamp();
                TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
                //超时一分钟
                long span = timestamp + 60 * 1000 - System.currentTimeMillis();
                if (span <= 0) {
                    String value = consumerRecord.value();
                    ObjectMapper objectMapper = new ObjectMapper();
                    JsonNode jsonNode = objectMapper.readTree(value);
                    JsonNode jsonNodeTopic = jsonNode.get("topic");

                    String appTopic = null, appKey = null, appValue = null;

                    if (jsonNodeTopic != null) {
                        appTopic = jsonNodeTopic.asText();
                    }
                    if (appTopic == null) {
                        continue;
                    }
                    JsonNode jsonNodeKey = jsonNode.get("key");
                    if (jsonNodeKey != null) {
                        appKey = jsonNode.asText();
                    }

                    JsonNode jsonNodeValue = jsonNode.get("value");
                    if (jsonNodeValue != null) {
                        appValue = jsonNodeValue.asText();
                    }
                    // send to application topic
                    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(appTopic, appKey, appValue);
                    try {
                        producer.send(producerRecord).get();
                        // success. commit message
                        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(consumerRecord.offset() + 1);
                        HashMap<TopicPartition, OffsetAndMetadata> metadataHashMap = new HashMap<>();
                        metadataHashMap.put(topicPartition, offsetAndMetadata);
                        consumer.commitSync(metadataHashMap);
                    } catch (ExecutionException e) {
                        consumer.pause(Collections.singletonList(topicPartition));
                        consumer.seek(topicPartition, consumerRecord.offset());
                        Thread.sleep(span);
                        consumer.resume(consumer.paused());
                        break;
                    }
                } else {
                    consumer.pause(Collections.singletonList(topicPartition));
                    consumer.seek(topicPartition, consumerRecord.offset());
                    //通过计算延迟时间差值,然后等待,避免空转
                    Thread.sleep(span);
                    consumer.resume(consumer.paused());
                    break;
                }
            }
        } while (!exit);

    }
}

参考资料和推荐阅读

  1. 暂无

欢迎阅读,各位老铁,如果对你有帮助,点个赞加个关注呗!~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/381313.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JSTL核心库的简单使用

JSTL核心库的简单使用 7.1考试重点 7.1.1c:out输出数据 考试重点就是c的相关的 jar包下载地址:Apache Tomcat - Apache Taglibs Downloads 看会典型应用就可以<% page contentType"text/html;charsetUTF-8" language"java" %> <% taglib uri"…

DolphinDB 通过 Telegraf + Grafana 实现设备指标的采集监控和展示

基于原始数据采集的可视化监控是企业确保设备正常运行和安全生产的重要措施。本文详细介绍了如何从DolphinDB 出发&#xff0c;借助 Telegraf 对设备进行原始数据采集&#xff0c;并通过 Grafana 实现数据的可视化&#xff0c;从而实现设备指标的实时监控。1. 概览Telegraf 是 …

Mybatis-plus逻辑删除更新字段

MybatisPlus版本 <dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.4.2</version> </dependency> <dependency><groupId>com.baomidou</groupId&g…

优思学院|DFMEA是全球制造业的必修课!

DFMEA&#xff08;Design Failure Mode and Effects Analysis&#xff09;是一种分析技术&#xff0c;在产品设计的早期阶段识别和解决潜在的失效问题。它通过分析设计的各个方面&#xff0c;识别潜在的失效模式和影响&#xff0c;并提出相应的改进措施&#xff0c;以减少失效的…

服装企业 采购系统

技术&#xff1a;Java、JSP等摘要&#xff1a;随着我国市场经济的不断发展,企业之间的竞争越来越激烈,只有对企业库存物资资源全面掌握,充分发挥闲置资源的利用,对资源进行优化配置,才能使企业效益达到最大化。只有通过规范科学的物资管理手段,才能节省物资采购成本,提高工作效…

Java——面向对象

目录 前言 一、什么是面向对象&#xff1f; 面向过程 & 面向对象 面向对象 二、回顾方法的定义和调用 方法的定义 方法的调用 三、类与对象的创建 类和对象的关系 创建与初始化对象 四、构造器详解 五、创建对象内存分析 六、封装详解 七、什么是继承&#x…

Unity TextMeshPro

Unity TextMeshPro 简介 TextMeshPro(也简称为TMP)号称是Unity的终极文本解决方案&#xff0c;它是Unity 的 UI 文本和旧版文本网格体的完美替代品。 功能强大且易于使用&#xff0c;使用高级文本渲染技术以及一组自定义着色器;提供实质性的视觉质量改进&#xff0c;同时在文…

Python基础教程(入门教程),初学者学Python编程如何快速入门?

【导语】Python是一种跨平台的计算机程序设计语言&#xff0c;通过Python编程&#xff0c;我们能够解决现实生活中的很多困难&#xff0c;现如今&#xff0c;我们工作中的许多工作都需要通过编写计算机软件来完成&#xff0c;那么初学者学Python编程如何快速入门呢?下面就来给…

【用Group整理目录结构 Objective-C语言】

一、接下来,我们看另外一个知识点,怎么用Group把这一堆乱七八糟的文件给它整理一下,也算是封装一下吧, 1.这一堆杂乱无章的文件: 那么,哪些类是属于模型呢,哪些类是属于视图呢,哪些类是属于控制器呢, 我们接下来通过Group的方式,来给它们分一下类, 这样看起来就好…

虚拟机安装ubuntu窗口自适应问题以及软件窗口显示不全解决方法

这部分查了很多博客&#xff0c;首先感谢前人栽树。 直接上我在安装过程中的有效解决步骤&#xff0c; 文后会描述遇到的非有效解决步骤&#xff0c;以供遇到相同问题的同学参考。 打开终端窗口 (ctrlaltt),当然肯定是一条一条的执行。 sudo apt-get update sudo apt-get upg…

第三章-OpenCV基础-7-形态学

前置 形态学主要是从图像中提取分量信息&#xff0c;该分量信息通常是图像理解时所使用的最本质的形状特征,对于表达和描绘图像的形状有重要意义。 大体就是通过一系列操作让图像信息中的关键信息更加凸出。同时&#xff0c;形态学的操作都是基于灰度图进行。 相关操作最主要…

Filebeat处理多行换行的问题

问题&#xff1a;在使用filebeatelabscience或者filebeatelk 又或者其他桥接器的时候&#xff0c;因为filbeat默认使用单行显示的原因&#xff0c;但日志出现堆栈错误或其他多行日志时会出现如下错误处理办法&#xff1a;1.固定日志格式 这里不展开说明2.匹配日志 找到你的file…

【Flutter入门到进阶】Flutter基础篇---布局

1 GridView网格布局组件 1.1 说明 1.1.1 图例 1.1.2 说明 GridView网格布局在实际项目中用的也是非常多的&#xff0c;当我们想让可以滚动的元素使用矩阵方式排列的时 候。此时我们可以用网格列表组件GridView实现布局 GridView创建网格列表主要有下面三种方式 1、可以通过Gr…

纳睿雷达在科创板上市:总市值达93亿元,2022年营收约2亿元

3月1日&#xff0c;广东纳睿雷达科技股份有限公司&#xff08;下称“纳睿雷达”&#xff0c;SH:688522&#xff09;在科创板上市。本次上市&#xff0c;纳睿雷达的发行价为46.68元/股&#xff0c;发行数量为3866.68万股&#xff0c;募资总额约为18.05亿元。 上市首日&#xff…

关于“腺样体面容”的两大认知误区,你需要了解一下

仅供医学专业人士阅读参考看完不要再中招了&#xff01;随着父母越来越重视孩子的外表和健康成长&#xff0c;“腺样脸”几乎成为聚会上不可避免的热门话题。在各种交流和讨论中&#xff0c;你经常听到朋友焦虑有点高兴地说&#xff1a;“虽然我的孩子总是张嘴睡觉&#xff0c;…

pandas: 三种算法实现递归分析Excel中各列相关性

目录 前言 目的 思路 代码实现 1. 循环遍历整个SDGs列&#xff0c;两两拿到数据 2. 调用pandas库函数直接进行分析 完整源码 运行效果 总结 前言 博主之前刚刚被学弟邀请参与了2023美赛&#xff0c;这也是第一次正式接触数学建模竞赛&#xff0c;现在已经提交等待结果…

【自动化测试】一位自动化测试工程师居然不会封装框架?神秘自动化测试框架......

目录&#xff1a;导读前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09;前言 自动化测试框架 自…

02 Android基础--service

02 Android基础--service什么是service&#xff1f;service的demo使用Service的种类前台service的使用背景什么是service&#xff1f; Service(服务)是一个一种可以在后台执行长时间运行操作而没有用户界面的应用组件。 服务分为两种形式&#xff1a;非绑定状态与绑定状态。 非…

深入Linux内核理解NIO与Epoll

目录 深入Linux内核理解NIO与Epoll IO模型 BIO(Blocking IO) 代码演示&#xff1a; 缺点&#xff1a; BIO总结&#xff1a; NIO(Non Blocking IO) NIO非阻塞代码示例&#xff1a; 使用telnet客户端Debug代码演示&#xff1a; 总结: NIO引入多路复用器Selector的代码演…

Python - 模块、包

模块 什么是模块&#xff08;module&#xff09; 是一个Python文件模块包含&#xff1a;函数、类、变量、可执行的代码模块分类&#xff1a; 内置标准模块&#xff08;又称标准库&#xff09;第三方开源模块自定义模块 导入模块的方式 几种方式&#xff1a; import [模块名…