springboot整合ELK+kafka采集日志

news2024/12/27 11:31:19

一、背景介绍

在分布式的项目中,各功能模块产生的日志比较分散,同时为满足性能要求,同一个微服务会集群化部署,当某一次业务报错后,如果不能确定产生的节点,那么只能逐个节点去查看日志文件;logback中RollingFileAppender,ConsoleAppender这类同步化记录器也降低系统性能,综上一些问题,可能考虑采用ELK (elasticsearch+logstash+kibana)配合消息中间件去异步采集,统一展示去解决。

这里之所以要加入kafka是因为

  1. 如果直接利用logstash同步日志,则每个节点都需要部署logstash,且logstash会严重消耗性能、浪费资源;
  2. 当访问量特别高时,产生的日志速度也会特别快,kafka可以削峰限流、降低logstash的压力;
  3. 当logstash故障时消息可以存储到kafka中不会丢失。

二、 整体流程图

在这里插入图片描述

三、搭建kafka+zk环境

1、创建文件夹

mkdir /usr/elklog/kafka

2、在创建好的文件夹下创建文件docker-compose.yml

version: "2"

services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.8
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    networks:
      - es_default
  kafka:
    image: docker.io/bitnami/kafka:3.2
    user: root
    ports:
      - "9092:9092"
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.3.22:9092  #这里替换为你宿主机IP或host,在集群下,各节点会把这个地址注册到集群,并把主节点的暴露给客户端,不要注册localhost
#      - KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092

    depends_on:
      - zookeeper
    networks:
      - es_default
networks:
  es_default:
    name: es_default
#    external: true
volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local



3、在docker-compose.yml同级目录中输入启动命令

docker-compose up -d  

这里用的是docker-compose方式安装,安装之前需要先安装好docker和docker-compose

docker安装方式:https://blog.csdn.net/qq_38639813/article/details/129384923

docker-compose安装方式:https://blog.csdn.net/qq_38639813/article/details/129751441

四、搭建elk环境

1、拉取elk所需镜像

docker pull elasticsearch:7.10.1
docker pull kibana:7.10.1
docker pull elastic/metricbeat:7.10.1
docker pull elastic/logstash:7.10.1

2、创建文件夹:

mkdir /usr/elklog/elk
mkdir /usr/elklog/elk/logstash
mkdir /usr/elklog/elk/logstash/pipeline

mkdir /usr/elklog/elk/es
mkdir /usr/elklog/elk/es/data

3、给data文件夹授权

chmod 777 /usr/elklog/elk/es/data

4、在/usr/elklog/elk/logstash/pipeline中创建logstash.conf

logstash.conf文件作用是将kafka中的日志消息获取出来 ,再推送给elasticsearch

input {
  kafka {
    bootstrap_servers => "192.168.3.22:9092"  #kafka的地址,替换为你自己的 
	client_id => "logstash"  
    auto_offset_reset => "latest"  
    consumer_threads => 5
    topics => ["demoCoreKafkaLog","webapiKafkaApp"]  #获取哪些topic,在springboot项目的logback-spring.xml中指定
    type => demo   #自定义
#    codec => "json"
  }
}

output {
  stdout {  }

  elasticsearch {
    hosts => ["http://192.168.3.22:9200"]   #es的地址
	index => "demolog-%{+YYYY.MM.dd}"    #这里将会是创建的索引名,后续 kibana将会用不同索引区别
	#user => "elastic"
    #password => "changeme"
  }
}

5、在/usr/elklog/elk中创建docker-compose.yml

version: "2"

services:
  elasticsearch:
    image: elasticsearch:7.10.1
    restart: always
    privileged: true
    ports:
      - "9200:9200"
      - "9300:9300"
    volumes:
      - /usr/elklog/elk/es/data:/usr/share/elasticsearch/data
    environment:
      - discovery.type=single-node
    networks:
      - es_default
  kibana:
    image: kibana:7.10.1
    restart: always
    privileged: true
    ports:
      - "5601:5601"
    environment:
      - ELASTICSEARCH_URL=http://192.168.3.22:9200
    depends_on:
      - elasticsearch
    networks:
      - es_default
  metricbeat:
    image: elastic/metricbeat:7.10.1
    restart: always
    user: root
    environment:
      - ELASTICSEARCH_HOSTS=http://192.168.3.22:9200
    depends_on:
      - elasticsearch
      - kibana
    command:  -E setup.kibana.host="192.168.3.22:5601" -E setup.dashboards.enabled=true -E setup.template.overwrite=false -E output.elasticsearch.hosts=["192.168.3.22:9200"] -E setup.ilm.overwrite=true
    networks:
      - es_default
  logstash:
    image: elastic/logstash:7.10.1
    restart: always
    user: root
    volumes:
      - /usr/elklog/elk/logstash/pipeline:/usr/share/logstash/pipeline/  
    depends_on:
      - elasticsearch
      - kibana
    networks:
      - es_default
networks:
  es_default:
    driver: bridge
    name: es_default


6、启动服务

docker-compose up -d

检验es是否安装成功:http://192.168.3.22:9200

在这里插入图片描述

检验kibana是否安装成功:192.168.3.22:5601
在这里插入图片描述

7、kibana设置中文

从容器中复制出kibana.yml,修改该文件,再复制回去,重启容器:

docker cp elk-kibana-1:/usr/share/kibana/config/kibana.yml kibana.yml

在这个文件最后加上:     i18n.locale: "zh-CN"

docker cp kibana.yml elk-kibana-1:/usr/share/kibana/config/kibana.yml


重启kibana容器便可

五、springboot代码

1、引入依赖

<!-- Kafka资源的引入 -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
    <groupId>com.github.danielwegener</groupId>
    <artifactId>logback-kafka-appender</artifactId>
    <version>0.2.0-RC1</version>
</dependency>
<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>6.4</version>
</dependency>

2、创建KafkaOutputStream

package com.elk.log;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;

public class KafkaOutputStream extends OutputStream {


    Producer logProducer;
    String topic;

    public KafkaOutputStream(Producer producer, String topic) {
        this.logProducer = producer;
        this.topic = topic;
    }


    @Override
    public void write(int b) throws IOException {
        this.logProducer.send(new ProducerRecord<>(this.topic, b));
    }

    @Override
    public void write(byte[] b) throws IOException {
        this.logProducer.send(new ProducerRecord<String, String>(this.topic, new String(b, Charset.defaultCharset())));
    }

    @Override
    public void flush() throws IOException {
        this.logProducer.flush();
    }
}

3、创建KafkaAppender

package com.elk.log;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Layout;
import ch.qos.logback.core.OutputStreamAppender;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.util.StringUtils;

import java.io.OutputStream;
import java.util.Properties;

public class KafkaAppender<E> extends OutputStreamAppender<E> {

   private Producer logProducer;
   private  String bootstrapServers;
   private 	Layout<E> layout;
   private  String topic;

    public void setLayout(Layout<E> layout) {
        this.layout = layout;
    }

    public void setBootstrapServers(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    @Override
    protected void append(E event) {
        if (event instanceof ILoggingEvent) {
            String msg = layout.doLayout(event);
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, 0,((ILoggingEvent) event).getLevel().toString(), msg);
            logProducer.send(producerRecord);
        }
    }

    @Override
    public void start() {

        if (StringUtils.isEmpty(topic)) {
            topic = "Kafka-app-log";
        }
        if (StringUtils.isEmpty(bootstrapServers)) {
            bootstrapServers = "localhost:9092";
        }
        logProducer = createProducer();

        OutputStream targetStream = new KafkaOutputStream(logProducer, topic);
        super.setOutputStream(targetStream);
        super.start();
    }

    @Override
    public void stop() {
        super.stop();
        if (logProducer != null) {
            logProducer.close();
        }
    }


    //创建生产者
    private Producer createProducer() {
        synchronized (this) {
            if (logProducer != null) {
                return logProducer;
            }

            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServers);
            //判断是否成功,我们指定了“all”将会阻塞消息 0.关闭 1.主broker确认 -1(all).所在节点都确认
            props.put("acks", "0");
            //失败重试次数
            props.put("retries", 0);
            //延迟100ms,100ms内数据会缓存进行发送
            props.put("linger.ms", 100);
            //超时关闭连接
			//props.put("connections.max.idle.ms", 10000);
            props.put("batch.size", 16384);
            props.put("buffer.memory", 33554432);
            //该属性对性能影响非常大,如果吞吐量不够,消息生产过快,超过本地buffer.memory时,将阻塞1000毫秒,等待有空闲容量再继续
            props.put("max.block.ms",1000);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            return new KafkaProducer<String, String>(props);
        }
    }

}


4、创建logback-spring.xml,放到application.yml的同级目录

<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds">
    <!--    <include resource="org/springframework/boot/logging/logback/base.xml"/>-->
    <logger name="com.elk" level="info"/>

    <!-- 定义日志文件 输入位置 -->
    <property name="logPath" value="logs" />
<!--    <property name="logPath" value="D:/logs/truckDispatch" />-->

    <!-- 控制台输出日志 -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger -%msg%n</pattern>
            <charset class="java.nio.charset.Charset">UTF-8</charset>
        </encoder>
    </appender>

    <!-- INFO日志文件 -->
    <appender name="infoAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>INFO</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- 文件名称 -->
            <fileNamePattern>${logPath}\%d{yyyyMMdd}\info.log</fileNamePattern>
            <!-- 文件最大保存历史天数 -->
            <MaxHistory>30</MaxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n</pattern>
            <charset class="java.nio.charset.Charset">UTF-8</charset>
        </encoder>
    </appender>

    <!-- DEBUG日志文件 -->
    <appender name="debugAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>DEBUG</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- 文件名称 -->
            <fileNamePattern>${logPath}\%d{yyyyMMdd}\debug.log</fileNamePattern>
            <!-- 文件最大保存历史天数 -->
            <MaxHistory>30</MaxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n</pattern>
            <charset class="java.nio.charset.Charset">UTF-8</charset>
        </encoder>
    </appender>

    <!-- WARN日志文件 -->
    <appender name="warnAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>WARN</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- 文件名称 -->
            <fileNamePattern>${logPath}\%d{yyyyMMdd}\warn.log</fileNamePattern>
            <!-- 文件最大保存历史天数 -->
            <MaxHistory>30</MaxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n</pattern>
            <charset class="java.nio.charset.Charset">UTF-8</charset>
        </encoder>
    </appender>

    <!-- ERROR日志文件 -->
    <appender name="errorAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>ERROR</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- 文件名称 -->
            <fileNamePattern>${logPath}\%d{yyyyMMdd}\error.log</fileNamePattern>
            <!-- 文件最大保存历史天数 -->
            <MaxHistory>30</MaxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n</pattern>
            <charset class="java.nio.charset.Charset">UTF-8</charset>
        </encoder>
    </appender>


    <!--  往kafka推送日志  -->
    <appender name="kafkaAppender" class="com.elk.log.KafkaAppender">
        <!-- kafka地址 -->
        <bootstrapServers>192.168.3.22:9092</bootstrapServers>
        <!-- 配置topic -->
        <topic>demoCoreKafkaLog</topic>
        <!-- encoder负责两件事,一是将一个event事件转换成一组byte数组,二是将转换后的字节数据输出到文件中 -->
        <encoder>
            <pattern>${HOSTNAME} %date [%thread] %level %logger{36} [%file : %line] %msg%n</pattern>
            <charset>utf8</charset>
        </encoder>
        <!-- layout主要的功能就是:将一个event事件转化为一个String字符串 -->
        <layout class="ch.qos.logback.classic.PatternLayout">
            <pattern>${HOSTNAME} %date [%thread] %level %logger{36} [%file : %line] %msg%n</pattern>
        </layout>
    </appender>




    <!--  指定这个包的日志级别为error  -->
    <logger name="org.springframework" additivity="false">
        <level value="ERROR" />
           <!-- 控制台输出 -->
<!--        <appender-ref ref="STDOUT" />-->
        <appender-ref ref="errorAppender" />
    </logger>

    <!-- 由于启动的时候,以下两个包下打印debug级别日志很多 ,所以调到ERROR-->
    <!--  指定这个包的日志级别为error  -->
    <logger name="org.apache.tomcat.util" additivity="false">
        <level value="ERROR"/>
        <!-- 控制台输出 -->
<!--        <appender-ref ref="STDOUT"/>-->
        <appender-ref ref="errorAppender"/>
    </logger>

    <!-- 默认spring boot导入hibernate很多的依赖包,启动的时候,会有hibernate相关的内容,直接去除 -->
    <!--  指定这个包的日志级别为error  -->
    <logger name="org.hibernate.validator" additivity="false">
        <level value="ERROR"/>
        <!-- 控制台输出 -->
<!--        <appender-ref ref="STDOUT"/>-->
        <appender-ref ref="errorAppender"/>
    </logger>

    <!--  监控所有包,日志输入到以下位置,并设置日志级别  -->
    <root level="WARN"><!--INFO-->
        <!-- 控制台输出 -->
        <appender-ref ref="STDOUT"/>
        <!-- 这里因为已经通过kafka往es中导入日志,所以就没必要再往日志文件中写入日志,可以注释掉下面四个,提高性能 -->
        <appender-ref ref="infoAppender"/>
        <appender-ref ref="debugAppender"/>
        <appender-ref ref="warnAppender"/>
        <appender-ref ref="errorAppender"/>
        <appender-ref ref="kafkaAppender"/>
    </root>
</configuration>

5、配置文件无需任何修改

server:
  tomcat:
    uri-encoding: UTF-8
    max-threads: 1000
    min-spare-threads: 30
  port: 8087
  connection-timeout: 5000ms
  servlet:
    context-path: /

6、编写测试类

package com.elk.log;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


@Slf4j
@RestController
@RequestMapping("/test")
public class TestController {



    @GetMapping("/testLog")
    public String testLog() {
        log.warn("gotest");
        return "ok";
    }


    @GetMapping("/testLog1")
    public Integer testLog1() {
        int i = 1/0;
        return i;
    }
}

六、利用kibana查看日志

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

注意:这里的索引名字就是logstash.conf中创建的索引名,出现这个也意味着整个流程成功

在这里插入图片描述

在这里插入图片描述
此时索引模式创建完毕,我创建的索引模式名字是demo*

在这里插入图片描述
在这里插入图片描述
这时就可以看到日志了,可以进一步调用测试接口去验证,我这里不在展示,至此全部完毕

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/792208.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL体系结构及执行过程

一、MySQL体系结构 1、网络连接层 客户端连接器&#xff08;Client Cnnectors&#xff09;&#xff1a;提供支持与MySQL服务器建立连接。 建立连接命令&#xff1a;mysql -h -u -p -h指定MySQL服务的IP 若本地连接则不需要 每一个连接均会保存用户权限&#xff0c;中途修改权…

关于在虚拟机CentOS7的Docker下安装Oracle

这不三阶段了&#xff0c;要上Oracle了&#xff0c;感觉这个班卷的程度到位。二阶段我就上了ElementUI和MyBatis&#xff0c;项目也是用这些技术写的&#xff0c;整体钻研程度还行。于是布置了两个任务&#xff1a;在windows下安一下Oracle&#xff0c;在windows下安装Oracle那…

从零开始制作CPU

文章目录 简介门与门或门非门异或门 ALU加法半加器全加器8位加法器 减法补码8位补码器8位减法器 存储锁存器8位锁存器带边沿触发的锁存器 内存内存单元16位内存 CPUPC程序计数器CPU连接 自制乘法器移位8位乘法器 自制除法器比较一位比较8位比较 8位除法器 简介 黑马最近出了个…

BOOTMGR丢失无法开机怎么办?

BOOTMGR&#xff08;引导管理器&#xff0c;Boot Manager的简称&#xff09;其实是一个引导操作系统的程序&#xff0c;通常位于系统保留分区或系统分区中。如果系统分区或系统保留分区的根目录中的BOOTMGR存在错误&#xff0c;那么系统将会在计算机启动时提示找不到操作系统。…

一文读懂ChatGPT,ChatGPT百科全书

引言 ChatGPT是什么&#xff1f; ChatGPT是一款先进的自然语言处理&#xff08;NLP&#xff09;模型&#xff0c;由OpenAI开发和维护。它基于OpenAI的第四代生成预训练Transformer&#xff08;GPT-4&#xff09;架构&#xff0c;旨在通过深度学习技术理解和生成人类语言。ChatG…

大一统真的来了:多模态共享参数的 Meta-Transformer

出品人&#xff1a;Towhee 技术团队 作者&#xff1a;张晨 在探索通用人工智能的多种可能发展方向中&#xff0c;多模态大模型&#xff08;MLLM&#xff09;已成为当前备受关注的重要方向。随着 GPT-4 对图文理解的冲击&#xff0c;更多模态的理解成为了学术界的热点话题&#…

优思学院|PDCA循环与精益管理有何相通之处?

PDCA是精益管理和六西格玛管理的基础原则&#xff0c;通常PDCA可以转化为六西格玛的DMAIC&#xff0c;变成一套以数据驱动方式为主来减少过程变异的改善方法&#xff0c;也可以应用于精益管理的原则上&#xff0c;处理优化价值流和减少浪费的改进上。 PDCA循环原则 这是由美国…

centos下安装ftp-读取目录列表失败-

1.下载安装ftp服务器端和客户端 #1.安装yum -y install vsftpdyum -y install ftp #2.修改配置文件vim /etc/vsftpd.conflocal_enablesYESwrite_enableYESanonymous_enableYESanon_mkdir_write_enableYES //允许匿名用户在FTP上创建目录anon_upload_enableYES //允许匿名用户…

【ElementUI组件封装】搭建架子、按钮自行封装、封装element的表格、表单

通过原生 button 封装类 el-button 组件封装 el-form 相关表单通用组件封装 el-table 相关表格通用组件 Vite Vue3 ElementPlus业务组件封装 数字化管理平台 Vue3ViteVueRouterPiniaAxiosElementPlus 个人博客地址 开发环境&#xff1a;Vite3 Vue3 兼容性&#xff1a;Vite…

前端企业微信开发内嵌H5记录 右边侧边栏开发

企业微信内嵌H5&#xff08;侧边栏&#xff09;开发流程 1、如果要想在企业微信和客户聊天的过程中出现右侧侧边栏&#xff0c;需要添加非本企业的人员微信&#xff0c;右边侧边栏就会自动出现&#xff08;可折叠&#xff09;。 示例&#xff1a; 2、创建一个自建应用 a.先登…

VMware horizon 8 建立手动桌面池

准备一台win10的虚拟机&#xff0c;改静态IP,计算机名&#xff0c;加入域&#xff0c;把Agent软件上传到机器中。 2&#xff1a;右键管理员身份安装程序。 一般默认 根据自己实际情况选择 启用桌面远程功能 安装完成 安装完成以后创建一个快照&#xff0c;以后是好知道机…

申请美国J1签证所需材料清单

在申请前往美国进行交流学习、文化交流或实习等项目的J1签证时&#xff0c;申请人需要准备一系列必要的材料。这些材料将有助于确保申请的顺利进行&#xff0c;以下是J1签证材料清单的详细内容&#xff1a; 1. 签证申请表格&#xff1a; 首先&#xff0c;申请人需要填写并签署D…

多网点多品牌精密空调集中监控方案

精密机房空调主要应用于通信数据机房、交换机 房、IDC机房、精密医疗设备室、外科手术室、实验室、精密电子仪器仪表生产车间等环境,这样的环境对空气的温度、湿度、洁净度、气流分布等各项指标有很高的要求&#xff0c;必须由每年365天、每天24小时安全可靠运行的专用机房精密…

DoIP学习笔记系列:导航篇

文章目录 1. 前言2. 导航3. 参考资料 1. 前言 DoIP学习笔记系列是一整套基于网络的诊断协议学习笔记&#xff0c;非常适合对有UDS基础但对DoIP没有实战经验的小伙伴参考&#xff0c;通过源协议讲解&#xff0c;企标讲解&#xff0c;测试需求讲解&#xff0c;测试用例讲解&…

使用 Apache DolphinScheduler 进行 EMR 任务调度

By AWS Team 前言 随着企业规模的扩大&#xff0c;业务数据的激增&#xff0c;我们会使用 Hadoop/Spark 框架来处理大量数据的 ETL/聚合分析作业&#xff0c;⽽这些作业将需要由统一的作业调度平台去定时调度。 在 Amazon EMR 中&#xff0c;可以使用 AWS 提供 Step Functio…

Flutter系列文章-Flutter进阶

在前两篇文章中&#xff0c;我们已经了解了Flutter的基础知识&#xff0c;包括Flutter的设计理念、框架结构、Widget系统、基础Widgets以及布局。在本文中&#xff0c;我们将进一步探讨Flutter的高级主题&#xff0c;包括处理用户交互、创建动画、访问网络数据等等。为了更好地…

黄东旭:The Future of Database,掀开 TiDB Serverless 的引擎盖

在 PingCAP 用户峰会 2023 上&#xff0c; PingCAP 联合创始人兼 CTO 黄东旭 分享了“The Future of Database”为主题的演讲&#xff0c; 介绍了 TiDB Serverless 作为未来一代数据库的核心设计理念。黄东旭 通过分享个人经历和示例&#xff0c;强调了数据库的服务化而非服务化…

【C 程序设计】第 1 章:C 语言简介与思维导图

目录 一、C 语言思维导图 &#xff08;1&#xff09;数据类型 &#xff08;2&#xff09;运算 &#xff08;3&#xff09;控制结构 &#xff08;4&#xff09;过程式&#xff0c;模块化程序设计 &#xff08;5&#xff09;输入输出 &#xff08;6&#xff09;编码规…

AntDB数据库与东方通TongWeb完成兼容互认,共筑数字化底座核心能力

近日&#xff0c;湖南亚信安慧科技有限公司&#xff08;简称&#xff1a;亚信安慧&#xff09;与北京东方通科技股份有限公司&#xff08;简称&#xff1a;东方通&#xff09;完成AntDB数据库与东方通应用服务器TongWeb V7.0的兼容互认。经测试&#xff0c;AntDB数据库能与东方…

C++入门之stl六大组件--Vector库函数的介绍,以及模拟实现一些常用接口

文章目录 前言 一、vector的介绍和使用 1.vector的介绍 2.vector的使用 2.1vector的定义 ​编辑 2.2vector iterator的使用 2.3vector空间增长问题 2.4vector增删查改 2.5vector迭代器失效问题 会引起迭代器失效的操作有&#xff1a; 二、模拟实现一些vector常用接口…