本文探究一种轻量级的 pool 实现 ftp 连接。
一、背景
简要介绍:业务中一般使用较多的是各种开源组件,设计有点重,因此本文探究一种轻量级的 pool 池的思想实现。
期望:设置连接池最大连接数为 N 时,批量执行 M 个 FTP 请求,所有请求都可以成功。
关键点: 使用池的思想存储 FTP 链接,同时控制 FTP 连接的数量。
二、池思想及 sync.Pool 重点分析
池思想设计模式
Golang 是有自动垃圾回收机制的编程语言,使用三色并发标记算法标记对象并回收。但是如果想开发一个高性能的应用程序,就必须考虑垃圾回收给性能带来的影响。因为 Go 的垃圾回收机制有一个 STW 时间,而且在堆上大量创建对象,也会影响垃圾回收标记时间。
通常采用对象池的方式,将不用的对象回收起来,避免垃圾回收。另外,像数据库连接、TCP 长连接等,连接的创建是非常耗时操作,如果每次使用都创建新链接,则整个业务很多时间都花在了创建链接上了。因此,若将这些链接保存起来,避免每次使用时都重新创建,则能大大降低业务耗时,提升系统整体性能。
sync.Pool 要点
golang 中提供了 sync.Pool 数据结构,可以使用它来创建池化对象。不过使用时有几个重点是要关注的,避免踩坑:
- sync.Pool 本身是线程安全的,多个 goroutine 可以并发地调用存取对象。
- sync.Pool 不可用在使用之后复制使用。关于这一点 context 包里面有大量使用,不再赘述。
- sync.Pool 用来保存的是一组可独立访问的“临时”对象。注意这里的“临时”,这表明池中的对象可能在未来某个时间被毫无预兆的移除(因为长久不使用被 GC 回收了)。
关于第 3 点非常重要,下面我们实现一个 demo 来详细说明:
package main
import (
"fmt"
"net/http"
"sync"
"time"
)
func main() {
var p sync.Pool // 创建一个对象池
p.New = func() interface{} {
return &http.Client{
Timeout: 5 * time.Second,
}
}
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
client := p.Get().(*http.Client)
defer p.Put(client)
//获取http请求并打印返回码
resp, err := client.Get("https://www.baidu.com")
if err != nil {
fmt.Println("http get error", err)
return
}
resp.Body.Close()
fmt.Println("success", resp.Status)
}()
}
//等待所有请求结束
wg.Wait()
}
这里我们使用 New 定义了创建 http.Client 的方法,然后启动 10 个 goroutine 并发访问网址,使用的 http.Client 对象都是从池中获取的,使用完毕后再放回到池子。
实际上,这个池中可能创建了 10 个 http.Client ,也可能创建了 8 个,还有可能创建了 3 个。取决于每个请求执行时池中是否有空闲的 http.Client ,以及其它的 goroutine 是否及时的放回去。
另外这里要注意的是,我们设置了 New 字段,当没有空闲请求时,Get 方法会调用 New 重新生成一个新的 http.Client。这种方式实现的好处是不必担心没有 http.Client 可用,缺点是数量不可控。你可能会想,不设置 New 字段是否可以?也是可以的,实现如下:
package main
import (
"fmt"
"net/http"
"sync"
"time"
)
func main() {
var p sync.Pool // 创建一个对象池
for i := 0; i < 5; i++ {
p.Put(&http.Client{Timeout: 5 * time.Second}) // 不设置 New 字段,初始化时就放入5个可重用对象
}
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
client, ok := p.Get().(*http.Client)
if !ok {
fmt.Println("get client is nil")
return
}
defer p.Put(client)
resp, err := client.Get("https://www.baidu.com")
if err != nil {
fmt.Println("http get error", err)
return
}
resp.Body.Close()
fmt.Println("success", resp.Status)
}()
}
//等待所有请求结束
wg.Wait()
}
在初始化时直接放入一定数量的可重用对象,从而达到了控制数量的目的。但是不设置 New 字段的风险很大,因为池化的对象如果长时间没有被调用,可能会被回收,而我们是无法预知什么时候池化的对象是会被回收的。因此一般不会使用这种方式,而是通过其它的方式来实现并发数量控制。
至此,也清楚了我们想实现的诉求:既要通过池满足连接复用,也要控制连接数量。(我们已经知道,仅仅依靠 sync.Pool 是实现不了的)
三、FTP 连接池的实现
- 创建 ftp docker 容器
docker run -d --name ftp_server \
-p 2100:21 \
-p 30010-30019:30010-30019 \
-e "FTP_PASSIVE_PORTS=30010:30019" \
-e FTP_USER_HOME=/home/test \
-e FTP_USER_NAME=test \
-e FTP_USER_PASS=123456 \
-e FTP_USER_LIMIT=30 \
-e "PUBLICHOST=localhost" \
stilliard/pure-ftpd
- 使用 golang ftp client 库进行代码开发
package main
import (
"bytes"
"fmt"
"time"
"github.com/jlaffaye/ftp"
)
type FTPConnectionPool struct {
conns chan *ftp.ServerConn
maxConns int
}
func NewFTPConnectionPool(server, username, password string, maxConns int) (*FTPConnectionPool, error) {
pool := &FTPConnectionPool{
conns: make(chan *ftp.ServerConn, maxConns),
maxConns: maxConns,
}
for i := 0; i < maxConns; i++ {
conn, err := ftp.Dial(server, ftp.DialWithTimeout(5*time.Second))
if err != nil {
return nil, err
}
err = conn.Login(username, password)
if err != nil {
return nil, err
}
pool.conns <- conn
}
return pool, nil
}
func (p *FTPConnectionPool) GetConnection() (*ftp.ServerConn, error) {
return <-p.conns, nil
}
func (p *FTPConnectionPool) ReleaseConnection(conn *ftp.ServerConn) {
p.conns <- conn
}
func (p *FTPConnectionPool) Close() {
close(p.conns)
for conn := range p.conns {
_ = conn.Quit()
}
}
func (p *FTPConnectionPool) StoreFileWithPool(remotePath string, buffer []byte) error {
conn, err := p.GetConnection()
if err != nil {
return err
}
defer p.ReleaseConnection(conn)
data := bytes.NewBuffer(buffer)
err = conn.Stor(remotePath, data)
if err != nil {
return fmt.Errorf("failed to upload file: %w", err)
}
return nil
}
func main() {
fmt.Println("hello world")
}
- 性能测试
package main
import (
"fmt"
"log"
"sync"
"testing"
)
func BenchmarkFTPClient_StoreFileWithMaxConnections(b *testing.B) {
// Assume NewFTPConnectionPool has been called elsewhere to initialize the pool
// with a maxConns value of 4. For example:
pool, err := NewFTPConnectionPool("localhost:2100", "jovy", "123456", 5)
if err != nil {
log.Fatalf("Failed to initialize FTP connection pool: %v", err)
}
defer pool.Close()
var wg sync.WaitGroup
buffer := []byte("test data for benchmarking")
b.ResetTimer()
for i := 0; i < 50; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
// Use the connection pool to store the file
err := pool.StoreFileWithPool(fmt.Sprintf("file_%d.txt", i), buffer)
if err != nil {
b.Errorf("Failed to store file: %v", err)
}
}(i)
}
wg.Wait()
}
可以看到,池化后性能这块已经达到了极致。
至此,整个功能也实现差不多了,后续的错误处理及代码抽象可以在此基础上继续优化,感兴趣的同学可以测试看看。
四、参考
- 《深入理解 Go 并发编程》 鸟窝
- 在容器中搭建运行 FTP 服务器 https://www.niwoxuexi.com/blog/hangge/article/903.html
- linux开启ftp服务和golang实现ftp_server_client https://www.liuvv.com/p/d43abcbd.html