书写上回,上回讲到,Elasticsearch的使用前提即:语法,表结构,使用类型结构等。要学这个必须要看前面这个:GoLang学习之路,对Elasticsearch的使用,一文足以(包括泛型使用思想)(一),因为这篇是基础!!!!!!!
文章目录
- 使用ElasticSearch
- `使用前提`
- 使用API实现对Elasticsearch的增删改查
- 创建客户端
- 创建yaml文件
- 创建客户端
- 将配置文件加载到客户端对象中
- 创建索引结构
- 定义客户端结构体
- 定义创建索引结构的方法
- 写一个测试方法
- 插入一条数据的方法
- 判断是否存在索引,不存在就创建一个
- 批量处理
- 方式一
- 测试
- 方法二
- 方式三
- 查询
使用ElasticSearch
使用前提
- 必须要有一个
ElasticSearch
服务器 - 必须要有一个可视化工具
- 安装API包,
"github.com/elastic/go-elasticsearch/v8"
import "github.com/elastic/go-elasticsearch/v8"
但是这个包下面其实还有一些包,这些包非常的重要。当时我在使用的时候,根本不知道,走了不少的弯路的,找了官网的文档,又找了一些博客,都没有详细的说明情况和要点。要不就少些,要不就只把部分给列出来。但是现在我将这些无私的奉献给各位。
因为这个v8的包非常的多,所以很难将所有的放进去。这里我做一些解释:
- 客户端:
- 调用
NewDefaultClient()
和NewClient(cfg Config)
方法会返回一个普通客户端NewDefaultClient()
不需要去配置链接时的配置参数,默认参数链接,并返回一个普通客户端NewClient(cfg Config)
需要按照总共需要的配置需求去配置参数,并返回一个普通客户端
- 调用
NewTypedClient(cfg Config)
会返回一个属性客户端(相比普通客户端强大,但是有局限,后面再说)
- 调用
- 工具包:
- 这个工具包主要是
普通客户端
进行调用的,使用的范围是对于批量处理数据
的情况
- 这个工具包主要是
- 参数类型包:
- 我们在对
ElasticSearch
进行处理的时候会有很多中情况:- 首先是对于语法的选择,
ElasticSearch
有独属于他自己的一套语法。 - 查询时会有很多选择,比如对于字段是模糊查询,还是精确查询,还是对地图进行查询。这些参数都有,也有对于
AI
进行处理的参数。(建议下一个翻译软件,去看看。那个参数太多了。。。也就是说功能非常齐全)
- 首先是对于语法的选择,
- 我们在对
…很多内容在GoLang学习之路,对Elasticsearch的使用,一文足以(包括泛型使用思想)(一)
接下来正式开始
使用API实现对Elasticsearch的增删改查
为了实现这些CRUD,我总结了几个基本的使用步骤。(可以不按我这个创建客户端)
- 创建客户端
- 生成yaml配置文件
- 读取配置文件信息,并保存到客户端上
- 创建索引结构
- 插入数据
- 然后调用API
创建客户端
根据上面所说,客户端在创建的时候,分为两种,一种为普通客户端
,一种是属性客户端
。而后者的功能更为强大。但是前者的某些功能,属性客户端是没办法的。比如批量处理数据(bulk)
在实际的生产中我们需要创建两个客户端,以便我们在需求变化中获取主动权。
创建yaml文件
文件名: config.yaml
文件中的参数按自己配,千万别一样,你们是连不上 的。
es:
adders:
- http://8.210.237.26:9200
username: elastic
password: +Svn3a*I*b2xxbCe9
yaml 中为何要实现数组结构,其本质是,Elasticsearch为了给以后分布式扩展提供渠道。到时候只要将IP地址,填充到配置文件就可以了
创建客户端
建议可以看看配置方法中的源码。
import (
myElasticSearch "elasticsearch/common/esll"
"github.com/elastic/go-elasticsearch/v8"
"net"
"net/http"
"time"
)
type ESConfig struct {
Adders []string `mapstructure:"adders" json:"adders" yaml:"adders"`
Password string `mapstructure:"password" json:"password" yaml:"password"`
Username string `mapstructure:"username" json:"username" yaml:"username"`
}
func NewES(config *Config) *myElasticSearch.ElasticSearch {
//强化版客户端
clientType, err := elasticsearch.NewTypedClient(elasticsearch.Config{
Addresses: config.ElasticSearch.Adders,
Username: config.ElasticSearch.Username,
Password: config.ElasticSearch.Password,
Transport: &http.Transport{
//每个host的idle状态的最大连接数目
MaxConnsPerHost: 10,
//发送完request后等待serve response的时间
ResponseHeaderTimeout: 3 * time.Second,
//(net.Conn, error) 创建未加密的tcp连接
DialContext: (&net.Dialer{Timeout: time.Second}).DialContext,
//连接保持idle状态的最大时间,超时关闭pconn
// todo 看需求是否使用tls证书链接
/*TLSClientConfig: &tls.Config{
MaxVersion: tls.VersionTLS11,
InsecureSkipVerify: true,
},*/
},
EnableDebugLogger: true,
})
if err != nil {
panic("ElasticSearch clientType connect ping failed:" + err.Error())
}
//一般客户端
client, err := elasticsearch.NewClient(elasticsearch.Config{
Addresses: config.ElasticSearch.Adders,
Username: config.ElasticSearch.Username,
Password: config.ElasticSearch.Password,
Transport: &http.Transport{
//每个host的idle状态的最大连接数目
MaxConnsPerHost: 10,
//发送完request后等待serve response的时间
ResponseHeaderTimeout: 3 * time.Second,
//(net.Conn, error) 创建未加密的tcp连接
DialContext: (&net.Dialer{Timeout: time.Second}).DialContext,
//连接保持idle状态的最大时间,超时关闭pconn
// todo 看需求是否使用tls证书链接
/*TLSClientConfig: &tls.Config{
MaxVersion: tls.VersionTLS11,
InsecureSkipVerify: true,
},*/
},
EnableDebugLogger: true,
})
if err != nil {
panic("ElasticSearch client connect ping failed:" + err.Error())
}
return &myElasticSearch.ElasticSearch{
ClientTyped: clientType,
Client: client,
}
}
将配置文件加载到客户端对象中
viper,这个读取配置文件的工具包:详细请看:文章
import (
"fmt"
"github.com/fsnotify/fsnotify"
"github.com/go-playground/validator/v10"
"github.com/google/wire"
"github.com/spf13/viper"
)
type Config struct {
ElasticSearch *ESConfig `mapstructure:"es" validate:"required"`
}
var Cfg *Config
func ProvideConfig() *Config {
var cfg Config
v := viper.New()
//索引配置文件位置
v.SetConfigName("config.yaml")
v.AddConfigPath("./")
v.SetConfigType("yaml")
err := v.ReadInConfig()
if err != nil {
panic(fmt.Errorf("open error of config file:%s", err))
}
//监视器
v.WatchConfig()
v.OnConfigChange(func(in fsnotify.Event) {
fmt.Println("config file changed:", in.Name)
err := v.Unmarshal(&cfg)
if err != nil {
fmt.Println(err)
}
})
//反序列化
if err := v.Unmarshal(&cfg); err != nil {
panic(fmt.Errorf("fatal error config file : %s", err))
}
vs := validator.New()
//校验结构
err = vs.Struct(&cfg)
if err != nil {
panic(err)
}
Cfg = &cfg
return &cfg
}
创建索引结构
定义索引结构
在 esll.go 文件中写入
const MappingTpl = `{
"mappings":{
"properties":{
"categoryId": {
"type": "long"
},
"productName": {
"type": "keyword"
},
"masterPic": {
"type": "text"
},
"desc": {
"type": "keyword"
},
"price": {
"type": "long"
},
"startProvinceCode": {
"type": "text"
},
"startCityCode": {
"type": "text"
},
"update_time": {
"type": "long"
},
"create_time": {
"type": "long"
}
}
}
}`
定义客户端结构体
包:package esll
type ElasticSearch struct {
ClientTyped *elasticsearchV8.TypedClient
Client *elasticsearchV8.Client
}
这里强调说明一下。这里为什么要用两个客户端?因为对于真正的实际运用中会有各种各样的问题出现,不仅会有一个一个查询,一个一个插入的情况,更会有一批一批的查询,插入的。所以这里的客户端对应的都会各有不同。
ClientTyped
:功能强大,但是不支持批量处理
Client
:调用复杂,但是支持批量处理
定义创建索引结构的方法
包:package esll
// CreateIndex 创建所用的索引结构
func (e *ElasticSearch) CreateIndex(ctx context.Context, indexName string, mappings string) error {
mapping := types.NewTypeMapping()
err := mapping.UnmarshalJSON([]byte(mappings))
if err != nil {
return err
}
_, err = e.ClientTyped.Indices.Exists(indexName).Do(ctx)
if err != nil {
log.Printf("索引已经存在")
return err
}
_, err = e.ClientTyped.Indices.Create(indexName).Mappings(mapping).Do(ctx)
if err != nil {
log.Printf("索引创建失败")
return err
}
return nil
}
写一个测试方法
func TestMepping(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), 50*time.Second)
cfg := config.ProvideConfig()
client := config.NewES(cfg)
mapping := types.NewTypeMapping()
err := mapping.UnmarshalJSON([]byte(esll.MappingTpl))
if err != nil {
return
}
_, err = client.ClientTyped.Indices.Create("test2").Mappings(mapping).Do(ctx)
if err != nil {
fmt.Println(err)
}
}
这里的types文件,是参数的文件,具体可以看看源码详情,根据需求选择
插入一条数据的方法
// IndexDocument 创建一条索引进入文档
func (e *ElasticSearch) IndexDocument(ctx context.Context, indexName string, document interface{}) error {
do, err := e.ClientTyped.Index(indexName).Document(document ).Do(ctx)
result := do.Result
fmt.Println(result)
if err != nil {
log.Printf("创建索引文档失败:%s", err)
return err
}
return nil
}
判断是否存在索引,不存在就创建一个
// IsExists 是否存在索引,不存在就创建一个
func (e *ElasticSearch) IsExists(ctx context.Context, indexName string, mappings string) error {
_, err2 := e.ClientTyped.Indices.Exists(indexName).Do(ctx)
if err2 != nil {
//不存在就重新创建一个索引
err := e.CreateIndex(ctx, indexName, mappings)
if err != nil {
return err
}
}
return nil
}
批量处理
方式一
func (e *ElasticSearch) IndexDocumentList(ctx context.Context, indexName string, anyList any, mapping string) error {
//验证索引是否存在
if err := e.IsExists(ctx, indexName, mapping); err != nil {
return err
}
//RW := &sync.RWMutex{}
slice, err := transitionSlice(anyList)
if err != nil {
return err
}
buf := buffer(ctx, e, indexName, slice, "index")
//获取当前索引下的文档个数
//todo:这里诺是出现超量的索引,可以通过for循环确定索要令牌(技术上限流),或者通过协程处理
//写入缓存中,并绑定索引,
//转换成json格式
//结果我发现这个官方已经实现了。。。。
bulk, err := e.Client.Bulk(
bytes.NewReader(buf.Bytes()),
e.Client.Bulk.WithIndex(indexName),
e.Client.Bulk.WithContext(ctx),
e.Client.Bulk.WithRefresh("true"))
//先关闭缓存
defer bulk.Body.Close()
if err != nil {
log.Fatal("ElasticSearch 批量写入 失败:", err)
return err
}
return nil
}
// 上传的缓存逻辑
func buffer(ctx context.Context, client *ElasticSearch, indexName string, slice []any, CRUD string) bytes.Buffer {
c, _ := client.ClientTyped.Cat.Count().Index(indexName).Do(ctx)
num, _ := strconv.Atoi(*c[0].Count)
//创建缓存
var buf bytes.Buffer
for i := num; i < len(slice)+num; i++ {
index := []byte(fmt.Sprintf(`{ "%s" : { "_id" : "%d" } }%s`, CRUD, i, "\n"))
//这里可以优化通过算法插入
datas, _ := json.Marshal(slice[i-num])
datas = append(datas, "\n"...)
buf.Grow(len(index) + len(datas))
buf.Write(index)
buf.Write(datas)
}
return buf
}
// todo 数量过多的话可以通过,三种方式,一种通过创建协程,一种通过二叉树递归的方式,另一种通过创建协程加递归的方式
func transitionSlice(anyl any) ([]any, error) {
val, ok := isSlice(anyl)
if !ok {
return nil, errors.New("切片转换失败")
}
sliceLen := val.Len()
list := make([]any, sliceLen)
for i := 0; i < sliceLen; i++ {
list[i] = val.Index(i).Interface()
}
return list, nil
}
// 判断是否为切片类型
func isSlice(anySlice any) (val1 reflect.Value, ok bool) {
val := reflect.ValueOf(anySlice)
if val.Kind() == reflect.Slice {
ok = true
}
val1 = val
return val1, ok
}
测试
func TestDuck2(t *testing.T) {
var (
buf bytes.Buffer
res *esapi.Response
err error
)
cfg := config.ProvideConfig()
client := config.NewES(cfg).Client
for j := 1; j <= 1000; j++ {
meta := []byte(fmt.Sprintf(`{ "index" : { "_id" : "%d" } }%s`, j, "\n"))
data := []byte(`{"content":"` + strings.Repeat("ABC", 100) + `"}`)
data = append(data, "\n"...)
buf.Grow(len(meta) + len(data))
buf.Write(meta)
buf.Write(data)
}
res, err = client.Bulk(bytes.NewReader(buf.Bytes()), client.Bulk.WithIndex("test"), client.Bulk.WithRefresh("true"))
if err != nil {
t.Fatalf("Failed to index data: %s", err)
}
res.Body.Close()
if res.IsError() {
t.Fatalf("Failed to index data: %s", res.Status())
}
}
方法二
unc (e *ElasticSearch) UpdateDocumentList(ctx context.Context, indexName *string, anyList any, typeBulk string, mappings string) error {
//验证索引是否存在
if err := e.IsExists(ctx, *indexName, mappings); err != nil {
return err
}
slice, err := transitionSlice(anyList)
if err != nil {
return err
}
start := time.Now().UTC()
//设置批量配置文件
indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Index: *indexName,
Client: e.Client,
NumWorkers: 5,
FlushBytes: 1024000,
FlushInterval: 30 * time.Second,
})
if err != nil {
return err
}
i := 1002
//将数据一条一条塞入缓存中
for _, data := range slice {
marsha, _ := json.Marshal(data)
m := fmt.Sprintf(`%s`, marsha)
i++
doc := esutil.BulkIndexerItem{
Index: *indexName,
Action: typeBulk,
DocumentID: strconv.Itoa(i),
Body: strings.NewReader(m),
OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem) {
fmt.Printf("[%d] %s test/%s", item2.Status, item2.Result, item.DocumentID)
},
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, err error) {
if err != nil {
fmt.Printf(" ERROR: %s \n", err)
} else {
fmt.Printf("ERROR: %s: %s \n", item2.Error.Type, item2.Error.Reason)
}
},
}
err := indexer.Add(ctx, doc)
if err != nil {
fmt.Println(" bulk upsert Add doc fail,", err)
}
}
stats := indexer.Stats()
fmt.Println(strings.Repeat("-", 80))
dur := time.Since(start)
m := int64(1000.0 / float64(dur/time.Millisecond) * float64(stats.NumFlushed))
if stats.NumFailed > 0 {
fmt.Printf("[%s.bulk:%s]总数据[%d]行,其中失败[%d], 耗时 %v (速度:%d docs/秒)\n",
*indexName,
typeBulk,
stats.NumAdded,
stats.NumFailed,
dur.Truncate(time.Millisecond), m)
} else {
fmt.Printf("[%s.bulk:%s]处理数据[%d]行,耗时%v (速度:%d docs/秒)\n",
*indexName,
typeBulk,
stats.NumAdded,
dur.Truncate(time.Millisecond), m)
}
err = indexer.Close(ctx)
//如果没有关闭就需要循环关闭直到彻底关闭
if err != nil {
go func(ctx2 context.Context) {
for {
err = indexer.Close(ctx2)
if err != nil {
return
}
}
}(ctx)
}
return nil
}
方式三
func (e *ElasticSearch) Findne(ctx context.Context, indexName string, queryStr any, size uint, offset uint) error {
if err, _ := e.ClientTyped.Indices.Exists(indexName).Do(ctx); err == false {
return fmt.Errorf("该索引不存在,不能查找:%s", err)
}
typeList := reflect.TypeOf(queryStr)
if typeList.Kind() == reflect.Ptr {
typeList = typeList.Elem()
}
val := reflect.ValueOf(queryStr)
if val.Kind() == reflect.Ptr {
val = val.Elem()
}
var name string
var value string
query := make(map[string]types.MatchQuery, typeList.NumField())
var chouse bool
chouse = true
var dol types.HitsMetadata
for i := 0; i < typeList.NumField(); i++ {
que := &types.MatchQuery{
Lenient: &chouse,
FuzzyTranspositions: &chouse,
}
name = typeList.Field(i).Name
value = val.FieldByName(name).String()
que.Query = value
query[name] = *que
do, _ := e.ClientTyped.Search().
Index(indexName).
Query(&types.Query{
Match: map[string]types.MatchQuery{
"price": {Query: "123456"},
},
},
).From(int(offset)).Size(int(size)).Do(ctx)
dol = do.Hits
}
mapp["*esll.ProductES"] = &ProductES{}
m := mapp
fmt.Println(m)
//获取类型
typeLs := reflect.TypeOf(queryStr)
nam := typeLs.String()
fmt.Println(nam)
//从map中找到对应的结构体
key := mapp[typeLs.String()]
list := make([]any, 0)
//深拷贝
/*
valo := &key
key2 := *valo
key3 := &key2
*/
var co any
for i := 0; i < len(dol.Hits); i++ {
co = deepcopy.Copy(key)
//转换为json字符数组
marshalJSON, _ := dol.Hits[i].Source_.MarshalJSON()
//解码并绑定
_ = json.Unmarshal(marshalJSON, &co)
list = append(list, co)
}
return nil
}
查询
var mapp = make(map[string]any, 0)
func (e *ElasticSearch) FindOne(ctx context.Context, searchStruct *SearchStruct) (map[string]any, error) {
do, err := e.ClientTyped.Search().
Index(searchStruct.IndexName).
Query(&types.Query{
MatchAll: &types.MatchAllQuery{
Boost: &searchStruct.Boost,
QueryName_: &searchStruct.FieldName,
},
},
).From(searchStruct.Form).Size(searchStruct.Size).Do(ctx)
if err != nil {
return nil, err
}
//name := make(map[string]any)
marshalJSON, err := do.Hits.Hits[0].Source_.MarshalJSON()
if err != nil {
return nil, err
}
var p ProductES
_ = json.Unmarshal(marshalJSON, &p)
fmt.Println(p)
//listMap := make(map[string]any)
//for i := 0; i < len(do.Hits.Hits); i++ {
// structName := name["structName"]
// stringE, _ := ToStringE(structName)
// structs := mapp[stringE]
// structl := deepcopy.Copy(structs)
// _ = json.Unmarshal(marshalJSON, &structl)
// toStringE, _ := ToStringE(i)
// listMap[toStringE] = stringE
//}
//
return nil, err
}
// ToStringE 字符串转换工具
func ToStringE(i any) (string, error) {
i = indirectToStringerOrError(i)
switch s := i.(type) {
case string:
return s, nil
case bool:
return strconv.FormatBool(s), nil
case float64:
return strconv.FormatFloat(s, 'f', -1, 64), nil
case float32:
return strconv.FormatFloat(float64(s), 'f', -1, 32), nil
case int:
return strconv.Itoa(s), nil
case int64:
return strconv.FormatInt(s, 10), nil
case int32:
return strconv.Itoa(int(s)), nil
case int16:
return strconv.FormatInt(int64(s), 10), nil
case int8:
return strconv.FormatInt(int64(s), 10), nil
case uint:
return strconv.FormatUint(uint64(s), 10), nil
case uint64:
return strconv.FormatUint(uint64(s), 10), nil
case uint32:
return strconv.FormatUint(uint64(s), 10), nil
case uint16:
return strconv.FormatUint(uint64(s), 10), nil
case uint8:
return strconv.FormatUint(uint64(s), 10), nil
case json.Number:
return s.String(), nil
case []byte:
return string(s), nil
case template.HTML:
return string(s), nil
case template.URL:
return string(s), nil
case template.JS:
return string(s), nil
case template.CSS:
return string(s), nil
case template.HTMLAttr:
return string(s), nil
case nil:
return "", nil
case fmt.Stringer:
return s.String(), nil
case error:
return s.Error(), nil
default:
return "", fmt.Errorf("unable to cast %#v of type %T to string", i, i)
}
}
var (
errorType = reflect.TypeOf((*error)(nil)).Elem()
fmtStringerType = reflect.TypeOf((*fmt.Stringer)(nil)).Elem()
)
// Copied from html/template/content.go.
// indirectToStringerOrError returns the value, after dereferencing as many times
// as necessary to reach the base type (or nil) or an implementation of fmt.Stringer
// or error,
func indirectToStringerOrError(a any) any {
if a == nil {
return nil
}
v := reflect.ValueOf(a)
for !v.Type().Implements(fmtStringerType) && !v.Type().Implements(errorType) && v.Kind() == reflect.Pointer && !v.IsNil() {
v = v.Elem()
}
return v.Interface()
}