Go操作Kafka之kafka-go

news2025/1/10 11:13:04

Kafka是一种高吞吐量的分布式发布订阅消息系统,本文介绍了如何使用kafka-go这个库实现Go语言与kafka的交互。

Go社区中目前有三个比较常用的kafka客户端库 , 它们各有特点。

首先是IBM/sarama(这个库已经由Shopify转给了IBM),之前我写过一篇使用sarama操作Kafka的教程,相较于sarama, kafka-go 更简单、更易用。

segmentio/kafka-go 是纯Go实现,提供了与kafka交互的低级别和高级别两套API,同时也支持Context。

此外社区中另一个比较常用的confluentinc/confluent-kafka-go,它是一个基于cgo的librdkafka包装,在项目中使用它会引入对C库的依赖。

准备Kafka环境

这里推荐使用Docker Compose快速搭建一套本地开发环境。

以下docker-compose.yml文件用来搭建一套单节点zookeeper和单节点kafka环境,并且在8080端口提供kafka-ui管理界面。

version: '2.1'

services:
  zoo1:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zoo1
    container_name: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888

  kafka1:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
      - "9999:9999"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1
  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8080:8080
    depends_on:
      - kafka1
    environment:
      DYNAMIC_CONFIG_ENABLED: "TRUE"

将上述docker-compose.yml文件在本地保存,在同一目录下执行以下命令启动容器。

docker-compose up -d

容器启动后,使用浏览器打开127.0.0.1:8080 即可看到如下kafka-ui界面。

在这里插入图片描述

点击页面右侧的“Configure new cluster”按钮,配置kafka服务连接信息。

在这里插入图片描述
填写完信息后,点击页面下方的“Submit”按钮提交即可。

在这里插入图片描述

安装kafka-go

执行以下命令下载 kafka-go依赖。

go get github.com/segmentio/kafka-go

注意:kafka-go 需要 Go 1.15或更高版本。

kafka-go使用指南

kafka-go 提供了两套与Kafka交互的API。

  • 低级别( low-level):基于与 Kafka 服务器的原始网络连接实现。
  • 高级别(high-level):对于常用读写操作封装了一套更易用的API。

通常建议直接使用高级别的交互API。

Connection

Conn 类型是 kafka-go 包的核心。它代表与 Kafka broker之间的连接。基于它实现了一套与Kafka交互的低级别 API。

发送消息

下面是连接至Kafka之后,使用Conn发送消息的代码示例。

// writeByConn 基于Conn发送消息
func writeByConn() {
	topic := "my-topic"
	partition := 0

	// 连接至Kafka集群的Leader节点
	conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
	if err != nil {
		log.Fatal("failed to dial leader:", err)
	}

	// 设置发送消息的超时时间
	conn.SetWriteDeadline(time.Now().Add(10 * time.Second))

	// 发送消息
	_, err = conn.WriteMessages(
		kafka.Message{Value: []byte("one!")},
		kafka.Message{Value: []byte("two!")},
		kafka.Message{Value: []byte("three!")},
	)
	if err != nil {
		log.Fatal("failed to write messages:", err)
	}

	// 关闭连接
	if err := conn.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}
}

消费消息
// readByConn 连接至kafka后接收消息
func readByConn() {
	// 指定要连接的topic和partition
	topic := "my-topic"
	partition := 0

	// 连接至Kafka的leader节点
	conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
	if err != nil {
		log.Fatal("failed to dial leader:", err)
	}

	// 设置读取超时时间
	conn.SetReadDeadline(time.Now().Add(10 * time.Second))
	// 读取一批消息,得到的batch是一系列消息的迭代器
	batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

	// 遍历读取消息
	b := make([]byte, 10e3) // 10KB max per message
	for {
		n, err := batch.Read(b)
		if err != nil {
			break
		}
		fmt.Println(string(b[:n]))
	}

	// 关闭batch
	if err := batch.Close(); err != nil {
		log.Fatal("failed to close batch:", err)
	}

	// 关闭连接
	if err := conn.Close(); err != nil {
		log.Fatal("failed to close connection:", err)
	}
}

使用batch.Read更高效一些,但是需要根据消息长度选择合适的buffer(上述代码中的b),如果传入的buffer太小(消息装不下)就会返回io.ErrShortBuffer错误。

如果不考虑内存分配的效率问题,也可以按以下代码使用batch.ReadMessage读取消息。

for {
  msg, err := batch.ReadMessage()
  if err != nil {
    break
  }
  fmt.Println(string(msg.Value))
}

创建topic

当Kafka关闭自动创建topic的设置时,可按如下方式创建topic。

// createTopicByConn 创建topic
func createTopicByConn() {
	// 指定要创建的topic名称
	topic := "my-topic"

	// 连接至任意kafka节点
	conn, err := kafka.Dial("tcp", "localhost:9092")
	if err != nil {
		panic(err.Error())
	}
	defer conn.Close()

	// 获取当前控制节点信息
	controller, err := conn.Controller()
	if err != nil {
		panic(err.Error())
	}
	var controllerConn *kafka.Conn
	// 连接至leader节点
	controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		panic(err.Error())
	}
	defer controllerConn.Close()

	topicConfigs := []kafka.TopicConfig{
		{
			Topic:             topic,
			NumPartitions:     1,
			ReplicationFactor: 1,
		},
	}

	// 创建topic
	err = controllerConn.CreateTopics(topicConfigs...)
	if err != nil {
		panic(err.Error())
	}
}

通过非leader节点连接leader节点

下面的示例代码演示了如何通过已有的非leader节点的Conn,连接至 leader节点。

conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
    panic(err.Error())
}
defer conn.Close()
// 获取当前控制节点信息
controller, err := conn.Controller()
if err != nil {
    panic(err.Error())
}
var connLeader *kafka.Conn
connLeader, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
    panic(err.Error())
}
defer connLeader.Close()

获取topic列表
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
    panic(err.Error())
}
defer conn.Close()

partitions, err := conn.ReadPartitions()
if err != nil {
    panic(err.Error())
}

m := map[string]struct{}{}
// 遍历所有分区取topic
for _, p := range partitions {
    m[p.Topic] = struct{}{}
}
for k := range m {
    fmt.Println(k)
}

Reader

Reader是由 kafka-go 包提供的另一个概念,对于从单个主题-分区(topic-partition)消费消息这种典型场景,使用它能够简化代码。Reader 还实现了自动重连和偏移量管理,并支持使用 Context 支持异步取消和超时的 API。

注意: 当进程退出时,必须在 Reader 上调用 Close() 。Kafka服务器需要一个优雅的断开连接来阻止它继续尝试向已连接的客户端发送消息。如果进程使用 SIGINT (shell 中的 Ctrl-C)或 SIGTERM (如 docker stop 或 kubernetes start)终止,那么下面给出的示例不会调用 Close()。当同一topic上有新Reader连接时,可能导致延迟(例如,新进程启动或新容器运行)。在这种场景下应使用signal.Notify处理程序在进程关闭时关闭Reader。

消费消息

下面的代码演示了如何使用Reader连接至Kafka消费消息。

// readByReader 通过Reader接收消息
func readByReader() {
	// 创建Reader
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{"localhost:9092", "localhost:9093", "localhost:9094"},
		Topic:     "topic-A",
		Partition: 0,
		MaxBytes:  10e6, // 10MB
	})
	r.SetOffset(42) // 设置Offset

	// 接收消息
	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			break
		}
		fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
	}

	// 程序退出前关闭Reader
	if err := r.Close(); err != nil {
		log.Fatal("failed to close reader:", err)
	}
}

消费者组

kafka-go支持消费者组,包括broker管理的offset。要启用消费者组,只需在 ReaderConfig 中指定 GroupID。

使用消费者组时,ReadMessage 会自动提交偏移量。

// 创建一个reader,指定GroupID,从 topic-A 消费消息
r := kafka.NewReader(kafka.ReaderConfig{
	Brokers:  []string{"localhost:9092", "localhost:9093", "localhost:9094"},
	GroupID:  "consumer-group-id", // 指定消费者组id
	Topic:    "topic-A",
	MaxBytes: 10e6, // 10MB
})

// 接收消息
for {
	m, err := r.ReadMessage(context.Background())
	if err != nil {
		break
	}
	fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}

// 程序退出前关闭Reader
if err := r.Close(); err != nil {
	log.Fatal("failed to close reader:", err)
}

在使用消费者组时会有以下限制:

(*Reader).SetOffset 当设置了GroupID时会返回错误
(*Reader).Offset 当设置了GroupID时会永远返回 -1
(*Reader).Lag 当设置了GroupID时会永远返回 -1
(*Reader).ReadLag 当设置了GroupID时会返回错误
(*Reader).Stats 当设置了GroupID时会返回一个-1的分区

显式提交

kafka-go 也支持显式提交。当需要显式提交时不要调用 ReadMessage,而是调用 FetchMessage获取消息,然后调用 CommitMessages 显式提交。

ctx := context.Background()
for {
    // 获取消息
    m, err := r.FetchMessage(ctx)
    if err != nil {
        break
    }
    // 处理消息
    fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
    // 显式提交
    if err := r.CommitMessages(ctx, m); err != nil {
        log.Fatal("failed to commit messages:", err)
    }
}

在消费者组中提交消息时,具有给定主题/分区的最大偏移量的消息确定该分区的提交偏移量的值。例如,如果通过调用 FetchMessage 获取了单个分区的偏移量为 1、2 和 3 的消息,则使用偏移量为3的消息调用 CommitMessages 也将导致该分区的偏移量为 1 和 2 的消息被提交。

管理提交间隔

默认情况下,调用CommitMessages将同步向Kafka提交偏移量。为了提高性能,可以在ReaderConfig中设置CommitInterval来定期向Kafka提交偏移。

// 创建一个reader从 topic-A 消费消息
r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:        []string{"localhost:9092", "localhost:9093", "localhost:9094"},
    GroupID:        "consumer-group-id",
    Topic:          "topic-A",
    MaxBytes:       10e6, // 10MB
    CommitInterval: time.Second, // 每秒刷新一次提交给 Kafka
})

Writer

向Kafka发送消息,除了使用基于Conn的低级API,kafka-go包还提供了更高级别的 Writer 类型。大多数情况下使用Writer即可满足条件,它支持以下特性。

  • 对错误进行自动重试和重新连接。
  • 在可用分区之间可配置的消息分布。
  • 向Kafka同步或异步写入消息。
  • 使用Context的异步取消。
  • 关闭时清除挂起的消息以支持正常关闭。
  • 在发布消息之前自动创建不存在的topic。
发送消息
// 创建一个writer 向topic-A发送消息
w := &kafka.Writer{
	Addr:         kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
	Topic:        "topic-A",
	Balancer:     &kafka.LeastBytes{}, // 指定分区的balancer模式为最小字节分布
	RequiredAcks: kafka.RequireAll,    // ack模式
	Async:        true,                // 异步
}

err := w.WriteMessages(context.Background(),
	kafka.Message{
		Key:   []byte("Key-A"),
		Value: []byte("Hello World!"),
	},
	kafka.Message{
		Key:   []byte("Key-B"),
		Value: []byte("One!"),
	},
	kafka.Message{
		Key:   []byte("Key-C"),
		Value: []byte("Two!"),
	},
)
if err != nil {
    log.Fatal("failed to write messages:", err)
}

if err := w.Close(); err != nil {
    log.Fatal("failed to close writer:", err)
}

创建不存在的topic

如果给Writer配置了AllowAutoTopicCreation:true,那么当发送消息至某个不存在的topic时,则会自动创建topic。


// 创建不存在的topic
// 如果给Writer配置了AllowAutoTopicCreation:true,那么当发送消息至某个不存在的topic时,则会自动创建topic。
func writeByWriter2() {
	writer := kafka.Writer{
		Addr:                   kafka.TCP("192.168.2.204:9092"),
		Topic:                  "kafka-test-topic",
		AllowAutoTopicCreation: true, //自动创建topic
	}

	messages := []kafka.Message{
		{
			Key:   []byte("Key-A"),
			Value: []byte("Hello World!"),
		},
		{
			Key:   []byte("Key-B"),
			Value: []byte("One!"),
		},
		{
			Key:   []byte("Key-C"),
			Value: []byte("Tow!"),
		},
	}

	const retries = 3
	//重试3次
	for i := 0; i < retries; i++ {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()

		err := writer.WriteMessages(ctx, messages...)
		if errors.Is(err, kafka.LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
			time.Sleep(time.Millisecond * 250)
			continue
		}

		if err != nil {
			log.Fatal("unexpected error %v", err)
		}
		break
	}

	//关闭Writer
	if err := writer.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}
}
写入多个topic

通常,WriterConfig.Topic用于初始化单个topic的Writer。通过去掉WriterConfig中的Topic配置,分别设置每条消息的message.topic,可以实现将消息发送至多个topic。

w := &kafka.Writer{
	Addr:     kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
    // 注意: 当此处不设置Topic时,后续的每条消息都需要指定Topic
	Balancer: &kafka.LeastBytes{},
}

err := w.WriteMessages(context.Background(),
    // 注意: 每条消息都需要指定一个 Topic, 否则就会报错
	kafka.Message{
        Topic: "topic-A",
		Key:   []byte("Key-A"),
		Value: []byte("Hello World!"),
	},
	kafka.Message{
        Topic: "topic-B",
		Key:   []byte("Key-B"),
		Value: []byte("One!"),
	},
	kafka.Message{
        Topic: "topic-C",
		Key:   []byte("Key-C"),
		Value: []byte("Two!"),
	},
)
if err != nil {
    log.Fatal("failed to write messages:", err)
}

if err := w.Close(); err != nil {
    log.Fatal("failed to close writer:", err)
}

注意:Writer中的Topic和Message中的Topic是互斥的,同一时刻有且只能设置一处。

其他配置

TLS

对于基本的 Conn 类型或在 Reader/Writer 配置中,可以在Dialer中设置TLS选项。如果 TLS 字段为空,则它将不启用TLS 连接。

注意:不在Conn/Reder/Writer上配置TLS,连接到启用TLS的Kafka集群,可能会出现io.ErrUnexpectedEOF错误。

Connection
dialer := &kafka.Dialer{
    Timeout:   10 * time.Second,
    DualStack: true,
    TLS:       &tls.Config{...tls config...},  // 指定TLS配置
}

conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")

Reader
dialer := &kafka.Dialer{
    Timeout:   10 * time.Second,
    DualStack: true,
    TLS:       &tls.Config{...tls config...},  // 指定TLS配置
}

r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:        []string{"localhost:9092", "localhost:9093", "localhost:9094"},
    GroupID:        "consumer-group-id",
    Topic:          "topic-A",
    Dialer:         dialer,
})

Writer

创建Writer时可以按如下方式指定TLS配置。

w := kafka.Writer{
    Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), 
    Topic:   "topic-A",
    Balancer: &kafka.Hash{},
    Transport: &kafka.Transport{
        TLS: &tls.Config{},  // 指定TLS配置
      },
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1578944.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

TCP/IP协议、HTTP协议和FTP协议等网络协议简介

文章目录 一、常见的网络协议二、TCP/IP协议1、TCP/IP协议模型被划分为四个层次2、TCP/IP五层模型3、TCP/IP七层模型 三、FTP网络协议四、Http网络协议1、Http网络协议简介2、Http网络协议的内容3、HTTP请求协议包组成4、HTTP响应协议包组成 一、常见的网络协议 常见的网络协议…

uniapp如何配置后使用uni.chooseLocation等地图位置api

在uniapp中想要使用uni.getLocation、uni.chooseLocation ……api的时候我们需要在小程序就开启配置&#xff0c;不然无法使用。 第一步&#xff1a;首先找到manifest.json 第二步&#xff1a;点击源码视图 第三步&#xff1a;在 mp-weixin 加入下面代码 "permission&…

DC-2靶机知识点

知识点总结 1.IP访问与域名访问 2.端口扫描 3.目录扫描 4.cewl密码生成器 5.指纹探测 6.爆破ssh 7.msf的使用 8.rbash逃逸 9.git提权 靶机&#xff0c;攻击机就不多说了&#xff0c;给个靶机地址 https://download.vulnhub.com/dc/DC-2.zip 环境配置 因为访问该靶机…

Python数学建模学习-莱斯利(Leslie)种群模型

Leslie模型是一种用于离散时间的生物种群增长模型&#xff0c;经常用于描述年龄结构对种群增长的影响。在1945年&#xff0c;人口生态学家Patrick H. Leslie&#xff08;莱斯利&#xff09;为了研究具有离散年龄结构的种群&#xff0c;特别是对于有不同年龄阶段的生物&#xff…

鸿蒙OS开发实战:【自动化测试框架】使用指南

概述 为支撑HarmonyOS操作系统的自动化测试活动开展&#xff0c;我们提供了支持JS/TS语言的单元及UI测试框架&#xff0c;支持开发者针对应用接口进行单元测试&#xff0c;并且可基于UI操作进行UI自动化脚本的编写。 本指南重点介绍自动化测试框架的主要功能&#xff0c;同时…

三行命令解决Ubuntu Linux联网问题

一开始我找到官方文档描述可以通过命令行连接到 WiFi 网络&#xff1a;https://cn.linux-console.net/?p10334#google_vignette 但是我的 $ ls /sys/class/net 命令无法显示无线网络接口&#xff0c;那么没有无线网卡&#xff1f; 所以我参照其他的博客在终端超级用户的权…

JUC:ScheduledThreadPoolExecutor 延迟任务线程池的使用

文章目录 ScheduledThreadPoolExecutortimer&#xff08;不建议用&#xff09;ScheduledThreadPoolExecutor处理异常应用 ScheduledThreadPoolExecutor timer&#xff08;不建议用&#xff09; timer也可以进行延迟运行&#xff0c;但是会有很多问题。 比如task1运行时间超过…

FOR循环

oracle从入门到总裁:​​​​​​https://blog.csdn.net/weixin_67859959/article/details/135209645 前面两种循环都要根据条件是否成立而确定循环体的执行&#xff0c;具体循环体执行多少次事先并不知道。 FOR 循环可以控制循环执行的次数&#xff0c;由循环变量控制循环体的…

学透Spring Boot — 005. 深入理解 Spring Boot Starter 依赖管理

前面的文章直观的展示了&#xff0c;使用Spring Boot 集成 Hibernate 和手动集成 Hibernate 之间差距。 一个比喻 工作中使用过Spring Boot一段时间后&#xff0c;我越来越感觉到Spring Boot Starter带来的便利性。 使用手动集成 Hibernate&#xff0c; 就像去电脑城配电脑&…

二叉树的遍历的递归与非递归算法

一.二叉树的遍历&#xff1a; 按照一定规律对二叉树的每个结点进行访问且仅访问一次&#xff1b; 这里的访问&#xff1a;可以是计算二叉树中的结点数据&#xff0c;打印该结点的信息&#xff0c;也可以是对结点进行的任何其它操作&#xff01; 为什么需要遍历二叉树&#x…

EditPlus来啦(免费使用!)

hello&#xff0c;我是小索奇 今天推荐一款编辑器&#xff0c;是索奇学习JavaSE时入手滴&#xff0c;非常好用哈&#xff0c;小索奇还是通过老杜-杜老师入手滴&#xff0c;相信很多人也是通过老杜认识嘞&#xff0c;来寻找破解版或者准备入手这个间接使用的编辑器~ EditPlus是…

在Ubuntu上搭建Prometheus + Grafana监控系统

1.Prometheus 部署 从官网下载页面找到最新的二进制文件下载 cd ~ curl -LO https://github.com/prometheus/prometheus/releases/download/v2.51.1/prometheus-2.51.1.linux-amd64.tar.gz将文件解压到指定目录 tar xf prometheus-2.51.1.linux-amd64.tar.gz -C /usr/local为…

⼿机客户端画K线图流程

优质博文&#xff1a;IT-BLOG-CN 一、什么是K线流程 K线图是一种用于展示金融市场价格走势的图表。它通常由四个关键价格点组成&#xff0c;即开盘价、收盘价、最高价和最低价。K线图的流程可以简单概括为以下几个步骤&#xff1a; 【1】收集数据&#xff1a; 首先&#xff0c…

Oracle 正则表达式

一、Oracle 正则表达式相关函数 (1) regexp_like &#xff1a;同 like 功能相似&#xff08;模糊 匹配&#xff09; (2) regexp_instr &#xff1a;同 instr 功能相似&#xff08;返回字符所在 下标&#xff09; (3) regexp_substr &#xff1a; 同 substr 功能相似&…

虚拟网络设备与Linux网络协议栈

在现代计算环境中&#xff0c;虚拟网络设备在实现灵活的网络配置和隔离方面发挥了至关重要的作用&#x1f527;&#xff0c;特别是在容器化和虚拟化技术广泛应用的今天&#x1f310;。而Linux网络协议栈则是操作系统处理网络通信的核心&#x1f4bb;&#xff0c;它支持广泛的协…

点击上传文件

一、页面样式&#xff1a; &#xff08;1&#xff09;点击前&#xff1a; &#xff08;2&#xff09;点击后&#xff1a; 设计&#xff1a;①自定义elementPlus图标&#xff1b;②使用Tooltip实现鼠标悬浮按钮上出现文字提示&#xff1b;③上传与更换的切换样式&#xff1b;…

全栈开发医疗小程序 SpringBoot2.X + Vue + UniAPP 带源码

看到好多坛友都在求SpringBoot2.X Vue UniAPP&#xff0c;全栈开发医疗小程序 – 带源码课件&#xff0c;我看了一下&#xff0c;要么链接过期&#xff0c;要么课件有压缩密码。特意整理了一份分享给大家&#xff0c;个人认为还是比较全面的。希望对大家有所帮助&#xff01;…

云计算(五)—— OpenStack基础环境配置与API使用

OpenStack基础环境配置与API使用 项目实训一 【实训题目】 使用cURL命令获取实例列表 【实训目的】 理解OpenStack的身份认证和API请求流程。 【实训准备】 &#xff08;1&#xff09;复习OpenStack的认证与API请求流程的相关内容。 &#xff08;2&#xff09;熟悉cURL…

openGauss学习笔记-258 openGauss性能调优-使用Plan Hint进行调优-指定子查询不展开的Hint

文章目录 openGauss学习笔记-258 openGauss性能调优-使用Plan Hint进行调优-指定子查询不展开的Hint258.1 功能描述258.2 语法格式258.3 示例 openGauss学习笔记-258 openGauss性能调优-使用Plan Hint进行调优-指定子查询不展开的Hint 258.1 功能描述 数据库在对查询进行逻辑…

vulhub中Apache Solr Velocity 注入远程命令执行漏洞复现 (CVE-2019-17558)

Apache Solr 是一个开源的搜索服务器。 在其 5.0.0 到 8.3.1版本中&#xff0c;用户可以注入自定义模板&#xff0c;通过Velocity模板语言执行任意命令。 访问http://your-ip:8983即可查看到一个无需权限的Apache Solr服务。 1.默认情况下params.resource.loader.enabled配置…