三十九、大数据技术之Kafka3.x(2)

news2025/1/10 10:19:47

🌻🌻 目录

  • 一、Kafka 生产者
    • 1.1 生产者消息发送流程
      • 1.1.1 发送原理
      • 1.1.2 生产者重要参数列表
    • 1.2 异步发送API
      • 1.2.1 普通异步发送
      • 1.2.2 带回调函数的异步发送
    • 1.3 同步发送 API
    • 1.4 生产者分区
      • 1.4.1 分区好处
      • 1.4.2 生产者发送消息的分区策略
      • 1.4.3 自定义分区器
    • 1.5 生产经验——生产者如何提高吞吐量
    • 1.6 生产经验——数据可靠性
    • 1.7 生产经验——数据去重
      • 1.7.1 数据传递语义
      • 1.7.2 幂等性
      • 1.7.3 生产者事务
    • 1.8 生产经验——数据有序
    • 1.9 生产经验——数据乱序

一、Kafka 生产者

1.1 生产者消息发送流程

1.1.1 发送原理

在消息发送的过程中,涉及到了两个线程——main线程Sender线程。在main线程中创建了一个双端队列RecordAccumulatormain线程将消息发送给RecordAccumulatorSender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker

在这里插入图片描述

1.1.2 生产者重要参数列表

参数名称描述
- -bootstrap-server生产者连接集群所需的broker地址清单。例如hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置1个或者多个,中间用逗号隔开。注意这里并非需要所有的broker地址,因为生产者从给定的broker里查找到其他broker信息。
key.serializer和value.serializer指定发送消息的key和value的序列化类型。一定要写全类名。
buffer.memoryRecordAccumulator缓冲区总大小,默认32m
batch.size缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间。
acks0:生产者发送过来的数据,不需要等数据落盘应答。1:生产者发送过来的数据,Leader收到数据后应答。-1(all):生产者发送过来的数据,Leader+和isr队列里面的所有节点收齐数据后应答。默认值是-1,-1和all是等价的。
max.in.flight.requests.per.connection允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是 1-5的数字。
retries当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
retry.backoff.ms两次重试之间的时间间隔,默认是100ms。
enable.idempotence是否开启幂等性,默认true,开启幂等性。
compression.type生产者发送的所有数据的压缩方式。默认是none,也就是不压缩。 支持压缩类型:none、gzip、snappy、lz4和zstd。

1.2 异步发送API

1.2.1 普通异步发送

1)需求:创建Kafka生产者,采用异步的方式发送到Kafka Broker

在这里插入图片描述

2)代码编写

(1)创建工程kafka

在这里插入图片描述

在这里插入图片描述

(2)导入依赖

在这里插入图片描述

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.0.0</version>
    </dependency>
</dependencies>

(3)创建包名:com.gansu.kafka.producer

(4)编写不带回调函数的API代码 CustomProducer

在这里插入图片描述

package com.gansu.kafka.producter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducter {

    public static void main(String[] args) throws InterruptedException {

        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();
        // 2. 给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.102:9092");
        // key,value序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // 4. 调用send方法,发送消息
        for (int i = 0; i < 3; i++) {
            kafkaProducer.send(new ProducerRecord<String,String>("first","这个问题我找了好久才找到"+i));
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}

问题:idea无法连接含有kafka的服务器linux,导致idea生产者生产了消息,服务器消费者无法接收

在这里插入图片描述

D:\develop\jdk\jdk\bin\java.exe -agentlib:jdwp=transport=dt_socket,address=127.0.0.1:14340,suspend=y,server=n -javaagent:C:\Users\Administrator\.IntelliJIdea2018.2\system\captureAgent\debugger-agent.jar=file:/C:/Users/Administrator/AppData/Local/Temp/capture8410.props -Dfile.encoding=UTF-8 -classpath "D:\develop\jdk\jdk\jre\lib\charsets.jar;D:\develop\jdk\jdk\jre\lib\deploy.jar;D:\develop\jdk\jdk\jre\lib\ext\access-bridge-
……
com.gansu.kafka.producter.CustomProducter
Connected to the target VM, address: '127.0.0.1:14340', transport: 'socket'
07:43:27.919 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: 
	acks = -1
	batch.size = 16384
	bootstrap.servers = [linux-102:9092]
	buffer.memory = 33554432
……
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

07:43:30.396 [main] WARN org.apache.kafka.clients.ClientUtils - Couldn't resolve server linux-102:9092 from ……
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
	at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:88)
	at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:413)
	... 4 more
Disconnected from the target VM, address: '127.0.0.1:14340', transport: 'socket'
Process finished with exit code 1

解决:根据AI查询便知,需要在如下 /usr/local/kafka/config/server.properties 中增加配置

在这里插入图片描述

在这里插入图片描述

测试:

①在 linux-102上开启Kafka消费者。

./kafka-console-consumer.sh --bootstrap-server linux-102:9092 --topic first

②在IDEA中执行代码,观察linux-102控制台中是否接收到消息。

在这里插入图片描述

1.2.2 带回调函数的异步发送

回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果Exceptionnull,说明消息发送成功,如果Exception不为null,说明消息发送失败。

在这里插入图片描述

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

copy上面的CustomProducer修改名为CustomProducerCallback再增加回调代码:

在这里插入图片描述

package com.gansu.kafka.producter;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducterCallbacck{

    public static void main(String[] args) throws InterruptedException {

        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();
        // 2. 给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.102:9092");
        // key,value序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // 4. 调用send方法,发送消息
        for (int i = 0; i < 3; i++) {
            kafkaProducer.send(new ProducerRecord<String,String>("first","aa"+i)
                 // 添加回调
                 , new Callback(){
                 // 该方法在Producer收到ack时调用,为异步调用
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                   if(exception == null){
                       // 没有异常,输出信息到控制台
                       System.out.println("主题是"+metadata.topic()+",分区是"+metadata.partition());
                   }else{
                       // 出现异常打印
                     exception.printStackTrace();
                   }
                }
            });
            // 延迟一会会看到数据发往不同分区
            Thread.sleep(10);
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}

测试:

①在linux-102上开启Kafka消费者。

在这里插入图片描述

②在IDEA中执行代码,观察linux-102控制台中是否接收到消息。

③在IDEA控制台观察回调信息

在这里插入图片描述

1.3 同步发送 API

在这里插入图片描述

只需在异步发送的基础上,再调用一下get()方法即可。copy异步发送代码 CustomProducter,并修改名为 CustomProducterSycn

在这里插入图片描述

package com.gansu.kafka.producter;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducterSycn {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();
        // 2. 给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.102:9092");
        // key,value序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // 4. 调用send方法,发送消息
        for (int i = 0; i < 3; i++) {
                                                                                    //增加get()抛出异常
            kafkaProducer.send(new ProducerRecord<String,String>("first","aa"+i)).get();
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}

测试:

①在linux-102上开启Kafka消费者。

在这里插入图片描述

./kafka-console-consumer.sh --bootstrap-server linux-102:9092 --topic first

②在IDEA中执行代码,观察linux-102控制台中是否接收到消息。

在这里插入图片描述

1.4 生产者分区

1.4.1 分区好处

在这里插入图片描述

1.4.2 生产者发送消息的分区策略

1)默认的分区器 DefaultPartitioner

在IDEA中ctrl +n,全局查找DefaultPartitioner

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

2)案例一

将数据发往指定partition的情况下,例如,将所有数据发往分区1中。

copy上面的回调 api CustomProducterCallback 并且修改名为 CustomProducterCallbackPartitions进行测试(可注掉 线程睡眠):

在这里插入图片描述

测试:

①在linux-102上开启Kafka消费者。

②在IDEA中执行代码,观察linux-102控制台中是否接收到消息。

③在IDEA控制台观察回调信息。

在这里插入图片描述
在这里插入图片描述

3)案例二

没有指明partition值但有key的情况下,将keyhash值与topicpartition数进行取余得到partition值。

在这里插入图片描述

1.4.3 自定义分区器

如果研发人员可以根据企业需求,自己重新实现分区器。

1)需求

例如我们实现一个分区器实现,发送过来的数据中如果包含root,就发往0号分区,不包含root,就发往1号分区。

2)实现步骤

  • (1) 定义分区器类 MyPartitioner 实现Partitioner接口。
  • (2) 重写partition()方法。

在这里插入图片描述

package com.gansu.kafka.producter;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;
/**
 * 1. 实现接口Partitioner
 * 2. 实现3个方法:partition,close,configure
 * 3. 编写partition方法,返回分区号
 */
public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        /**
         * 返回信息对应的分区
         * @param topic         主题
         * @param key           消息的key
         * @param keyBytes      消息的key序列化后的字节数组
         * @param value         消息的value
         * @param valueBytes    消息的value序列化后的字节数组
         * @param cluster       集群元数据可以查看分区信息
         * @return
         */
        // 获取消息
        String msgValue = value.toString();
        // 创建partitio
        int partition;
        // 判断消息是否包含Daniel
        if(msgValue.contains("Daniel")){

            partition = 0;
        }else

        partition = 1;
        // 返回分区号
        return partition;
    }
    // 关闭资源
    @Override
    public void close() {

    }
    // 配置方法
    @Override
    public void configure(Map<String, ?> configs) {

    }
}
  • (3) 使用分区器的方法,在生产者的配置中(CustomProducterCallbackPartitions)添加分区器参数。(关联自定义分区器)

在这里插入图片描述

package com.gansu.kafka.producter;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducterCallbackPartitions {

    public static void main(String[] args) throws InterruptedException {

        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();
        // 2. 给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.102:9092");

        //关联添加自定义分区器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.gansu.kafka.producter.MyPartitioner");
        // key,value序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // 4. 调用send方法,发送消息
        for (int i = 0; i < 3; i++) {
            // 指定数据发送到1号分区,key为空(IDEA中ctrl + p查看参数)
            // 依次指定key值为a,b,f ,数据key的hash值与3个分区求余,分别发往1、2、0
            kafkaProducer.send(new ProducerRecord<String,String>("first","Daniel-xiaojin"+i)
                 // 添加回调
                 , new Callback(){
                 // 该方法在Producer收到ack时调用,为异步调用
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                   if(exception == null){
                       // 没有异常,输出信息到控制台
                       System.out.println("主题是"+metadata.topic()+",分区是"+metadata.partition());
                   }else{
                       // 出现异常打印
                     exception.printStackTrace();
                   }
                }
            });
            // 延迟一会会看到数据发往不同分区
           // Thread.sleep(10);
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}
  • (4) 测试

① 在linux-102上开启Kafka消费者。

./kafka-console-consumer.sh --bootstrap-server linux-102:9092 --topic first

② 在IDEA控制台观察回调信息。

包含Daniel

在这里插入图片描述

不包含Daniel

在这里插入图片描述

1.5 生产经验——生产者如何提高吞吐量

在这里插入图片描述

创建类 CustomProducerParameters

在这里插入图片描述

package com.gansu.kafka.producter;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerParameters {

    public static void main(String[] args) {

        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();
        // 2. 给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.10.102:9092");
        // key,value序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
       // batch.size:批次大小,默认16K
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
        // linger.ms:等待时间,默认0
        properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
        // RecordAccumulator:缓冲区大小,默认32M:buffer.memory
        properties.put(ProducerConfig.RECEIVE_BUFFER_CONFIG,33554432);
        // compression.type:压缩,默认none,可配置值gzip、snappy、lz4和zstd
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");

        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        // 4. 调用send方法,发送消息
        for (int i = 0; i <3 ; i++) {

            kafkaProducer.send(new ProducerRecord<String,String>("first","Daniel-sys"));

        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}

测试:

① 在linux-102上开启Kafka消费者。

② 在IDEA中执行代码,观察linux-102控制台中是否接收到消息。

在这里插入图片描述

1.6 生产经验——数据可靠性

0)回顾发送流程

发送流程

在这里插入图片描述

1)ack应答原理

ack 应答级别

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

2)代码配置(复制前面 API类 CustomProducter 修改名为 CustomProducterAcks即可)

在这里插入图片描述

package com.gansu.kafka.producter;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import javax.rmi.PortableRemoteObject;
import java.util.Properties;

public class CustomProducterAcks {

    public static void main(String[] args) throws InterruptedException {

        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();
        // 2. 给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.102:9092");
        // key,value序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        //设置acks  打开官网 直接CTRL+F 搜索 ack 即可查看,下面配置海量数据下,还是区别很大的
        properties.put(ProducerConfig.ACKS_CONFIG,"all");
        // 重试次数retries,默认是int最大值,2147483647
        properties.put(ProducerConfig.RETRIES_CONFIG,3);

        
        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // 4. 调用send方法,发送消息
        for (int i = 0; i < 3; i++) {
            kafkaProducer.send(new ProducerRecord<String,String>("first","aa"+i));
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}

官网搜索 可以查看

在这里插入图片描述

在这里插入图片描述

1.7 生产经验——数据去重

1.7.1 数据传递语义

在这里插入图片描述

1.7.2 幂等性

1)幂等性原理

在这里插入图片描述

2)如何使用幂等性

开启参数enable.idempotence 默认为truefalse关闭。

1.7.3 生产者事务

1)Kafka事务原理

在这里插入图片描述

2)Kafka的事务一共有如下5个API

// 1初始化事务
void initTransactions();

// 2开启事务
void beginTransaction() throws ProducerFencedException;

// 3在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              String consumerGroupId) throws ProducerFencedException;

// 4提交事务
void commitTransaction() throws ProducerFencedException;

// 5放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

3)单个Producer,使用事务保证消息的仅一次发送(复制前面 API类 CustomProducter 修改名为 CustomProducerTransactions即可)

在这里插入图片描述

在这里插入图片描述

如果执行失败,则不会发送数据:

在这里插入图片描述

1.8 生产经验——数据有序

在这里插入图片描述

1.9 生产经验——数据乱序

在这里插入图片描述

文章源码

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2033135.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用Leaks定位iOS内存泄漏问题并解决

使用Leaks定位iOS内存泄漏问题并解决 前言 内存泄漏问题一直是程序开发中最令人头疼的问题&#xff0c;特别是C/C。虽然C/C在C11之后引入了许多新特性&#xff0c;包括智能指针&#xff0c;自动类型推导等&#xff0c;但C中动态内存的分配和释放仍然需要程序员来显式地进行。…

Linux线程thread详解(线程池)

在我们的进程虚拟地址的代码区&#xff0c;对于代码中的每个函数都有对应的地址&#xff0c;每个函数中的每行代码都有对应的代码&#xff0c;并且每个函数中的每行代码的地址都是连续的。既然代码是连续的&#xff0c;也就意味着我们可以将我们代码分块&#xff0c;分成不同的…

机器学习笔记:序列到序列学习[详细解释]

介绍 本节我们使用两个循环神经网络的编码器和解码器&#xff0c; 并将其应用于序列到序列&#xff08;sequence to sequence&#xff0c;seq2seq&#xff09;类的学习任务。遵循编码器&#xff0d;解码器架构的设计原则&#xff0c; 循环神经网络编码器使用长度可变的序列作为…

Jeecgboot3.6.3的vue3版本的一种flowable动态增加一个用户任务节点的方法(二)前端代码实现

因为这个项目license问题无法开源,更多技术支持与服务请加入我的知识星球。 这部分主要讲前端的功能实现 1、前端选择新增任务类型界面,点击新增节点 /*** 动态新增用户任务节点*/function handleAddTask(record: Recordable) {if (record.finishTime != null) {createMess…

在 .NET 8.0 中使用 xUnit 进行数据驱动测试

1. 前言 xUnit是一个功能强大且易于使用的单元测试框架。在.NET开发中&#xff0c;单元测试是非常重要的一部分&#xff0c;它可以帮助我们确保代码的正确性和可靠性。使用xUnit可以帮助我们编写更高效、更有效的单元测试&#xff0c;并提高代码质量和可维护性。 2. 特性 x…

Git-GitLab-Jenkins结合

目录 1.Git-GitLab-Jenkins结合2. 在pycharm配置git3. 实现提交代码后触发自动化测试&#xff08;1&#xff09;打开gitlab&#xff08;2&#xff09;Jenkins配置Git&#xff08;3&#xff09;选择需要的远程仓库 4.报告存在问题&#xff1a;5.也可以在Jenkins中设置定时触发&a…

OpenCV图像滤波(10)Laplacian函数的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 功能描述 计算图像的拉普拉斯值。 该函数通过使用 Sobel 运算符计算出的 x 和 y 的二阶导数之和来计算源图像的拉普拉斯值&#xff1a; dst Δ src ∂…

Elasticsearch:引入 Serverless 精简索引分片

作者&#xff1a;来自 Elastic Tanguy Leroux 在本文中&#xff0c;我们将介绍 Elasticsearch 的精简索引分片&#xff08;thin indexing shards&#xff09;&#xff0c;这是我们为 Elastic Cloud Serverless 开发的一种新型分片&#xff0c;允许将 Elasticsearch 索引存储在云…

大数据技术现场工程师特色实训室解决方案

一、引言 在大数据时代背景下&#xff0c;数据已成为新的生产要素&#xff0c;驱动着各行各业的创新发展。面对这一趋势&#xff0c;市场对于既掌握大数据理论知识又具备实战能力的大数据技术人才的需求急剧增加。为了应对这一挑战&#xff0c;唯众精心设计了一套全面的大数据…

国产 麒麟 ARM 环境编译 RocketMQ-Client-CPP

1.环境 系统版本&#xff1a;Linux 5.4.18-87.76-generic KYLINOS SMP Thu Aug 31 09:05:44 UTC 2023 aarch64 aarch64 aarch64 GNU/Linux GCC: gcc (Ubuntu 9.3.0-10kylin2) 9.3.0 G: g (Ubuntu 9.3.0-10kylin2) 9.3.0 RocketMQ服务端版本&#xff1a;5.1.1 RocketMQ-cpp …

修改docker的/var/lib/docker/overlay2储存路径

目录 目录 1.准备新的存储位置 1.创建新的存储目录 2.修改目录权限 2. 配置 Docker 使用新的存储位置 1.停止 Docker 服务 2.编辑 Docker 配置文件 3.迁移现有 Docker 数据 1.将现有的 Docker 数据从系统盘移动到新目录 2.启动 Docker 服务 3. 验证更改 4. 清理旧的…

RAGFlow v0.9 重磅升级,支持 GraphRAG,开启下一代 RAG 之旅!

一、引言 前面我们介绍过很多的关于大模型和RAG相关的技术&#xff0c;通过其关注程度足以看到市场上对RAG框架和成熟产品的迫切需求&#xff0c;因为想要个人独立从0开始实现一个RAG产品并非易事&#xff0c;虽然有相当多的RAG或者知识库开源产品&#xff0c;大部分其实很难应…

使用 Elasticsearch RestHighLevelClient 进行查询

Elasticsearch 提供了多种客户端库&#xff0c;以方便不同编程语言的用户进行操作。其中&#xff0c;Java 的 RestHighLevelClient 是 Elasticsearch 官方推荐的客户端之一&#xff0c;用于 Java 应用程序中。本文将介绍如何使用 Java 的 RestHighLevelClient 进行 Elasticsear…

Docker Hub 镜像代理加速

因为未知原因&#xff0c;docker hub 已经不能正常拉取镜像&#xff0c;可以使用以下代理服务来进行&#xff1a; "https://docker.m.daocloud.io", "https://noohub.ru", "https://huecker.io", "https://dockerhub.timeweb.cloud"…

深入浅出消息队列----【顺序消息的实现原理】

深入浅出消息队列----【顺序消息的实现原理】 何为顺序发消息的顺序性存储消息的顺序性消费消息的顺序性顺序消息消费的三把锁第一把锁&#xff1a;分布式锁第二把锁&#xff1a;Synchronized第三把锁&#xff1a;ReentrantLock 本文仅是文章笔记&#xff0c;整理了原文章中重要…

vue3仿飞书头像,根据不同名称生成不同的头像背景色

效果展示&#xff1a; 传递三个参数&#xff1a; name&#xff1a;要显示的名称&#xff1b;size&#xff1a;头像的大小&#xff1b;cutNum&#xff1a;分割当前名称的最后几位数&#xff1b; 代码如下&#xff1a; <template><div:style"{color: #fff,borde…

VMware虚拟机下安装Ubuntu22.04以及汉化配置保姆级教程

目录 一.VMware和Ubuntu下载 二.在VMware中创建Ubuntu 1.点击 创建新的虚拟机 2.选择典型 3.选择Ubuntu镜像包&#xff08;自定义存放的位置&#xff09; 4.创建个人信息&#xff08;密码一定要牢记&#xff09; 5.选择虚拟机的安装位置 6.其他配置项&#xff08;默认下…

在数字浪潮中扬帆远航,软件行业就业前景如何?

随着数字化转型的加速和信息技术的广泛应用&#xff0c;对于软件开发人员的需求持续增长。不仅传统IT企业需要大量的软件开发人才&#xff0c;各行各业的企业也普遍需要自主研发软件以满足其业务需求。对于具备较好的学习能力和适应能力的人来说&#xff0c;这个行业提供了更多…

jenkins一键推送到远程服务器并用docker容器启动

1.安装jenkins 我后端使用的是宝塔面板来安装的容器化jenkins,要选中允许外部访问&#xff0c;安装完之后没有那个选项了&#xff0c;一开始安装的时候要选中不使用域名和后面的允许外部访问。Jenkins 版本为&#xff1a; 2.462.1 2.配置Jenkins 2.1 Git plugin 安装完毕之…

江新安教授受邀引正基因进行《制药行业研发项目管理》培训

近日&#xff0c;科济管线创始人江新安教授应赛柏蓝邀请为北京引正基因科技有限公司&#xff08;简称引正基因&#xff09;进行《研发项目管理》授课。为提高项目管理水平&#xff0c;加强研发项目相关人员的管理能力&#xff0c;掌握研发项目管理技能与工具&#xff0c;江新安…