中心网关:gateway
四个微服务:user、message、note、relationship
1 中心网关实现服务发现
1.1 设计EtcdDiscovery类
package entity
import (
"context"
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
"gonote/gateway/internal/option"
messageService "gonote/message/service"
noteService "gonote/note/service"
relationshipService "gonote/relationship/service"
userService "gonote/user/service"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"sync"
"time"
)
// EtcdDiscovery caches one gRPC client per service name, built from the
// service addresses stored in etcd.
type EtcdDiscovery struct {
	// etcdClient reads and watches the per-service keys.
	etcdClient *clientv3.Client

	// mu guards serviceMap.
	mu sync.RWMutex
	// serviceMap maps a service name to its generated gRPC client; the
	// concrete client type differs per service.
	serviceMap map[string]interface{}
}
// NewEtcdDiscovery builds an EtcdDiscovery whose etcd client targets the
// single endpoint ip:port with a 3-second dial timeout. The service map
// starts empty. Any error from clientv3.New is returned unchanged.
func NewEtcdDiscovery(ip string, port int) (*EtcdDiscovery, error) {
	cfg := clientv3.Config{
		Endpoints:   []string{fmt.Sprintf("%v:%v", ip, port)},
		DialTimeout: 3 * time.Second,
	}

	cli, err := clientv3.New(cfg)
	if err != nil {
		return nil, err
	}

	discovery := &EtcdDiscovery{
		etcdClient: cli,
		serviceMap: map[string]interface{}{},
	}
	return discovery, nil
}
// Start builds the initial gRPC client for every name in serviceNames and
// launches one background watcher goroutine per name.
//
// For each name it reads the etcd key of the same name, treats the first
// returned value as the service's listen address, dials it, and stores the
// matching generated client in the service map. A service whose key does not
// exist yet is left unset; its watcher installs the client on the first Put.
//
// Start panics if the etcd read or the gRPC dial returns an error.
func (ed *EtcdDiscovery) Start(serviceNames []string) {
	for _, serviceName := range serviceNames {
		resp, err := ed.etcdClient.Get(context.TODO(), serviceName)
		if err != nil {
			panic(err)
		}

		// The key is absent until the service has registered itself, so
		// resp.Kvs may be empty; indexing it unconditionally would panic.
		if len(resp.Kvs) > 0 {
			// etcd stores each microservice's listen address; the client is
			// created from a connection to that address.
			addr := string(resp.Kvs[0].Value)
			conn, err := grpc.Dial(addr,
				grpc.WithTransportCredentials(insecure.NewCredentials()))
			if err != nil {
				panic(err)
			}

			var grpcCli interface{}
			switch serviceName {
			case option.User:
				grpcCli = userService.NewUserServiceClient(conn)
			case option.Relationship:
				grpcCli = relationshipService.NewRelationshipServiceClient(conn)
			case option.Note:
				grpcCli = noteService.NewNoteServiceClient(conn)
			case option.Message:
				grpcCli = messageService.NewMessageServiceClient(conn)
			}

			if grpcCli == nil {
				// Unrecognised service name: no client wraps conn, so release
				// it. The close error is not actionable here.
				_ = conn.Close()
			} else {
				// Watchers started in earlier iterations write the map under
				// ed.mu, so this write must take the lock as well.
				ed.mu.Lock()
				ed.serviceMap[serviceName] = grpcCli
				ed.mu.Unlock()
			}
		}

		// Watch etcd in the background so the cached client follows later
		// registrations and removals of this service.
		go ed.watch(serviceName)
	}
}
// watch keeps the cached gRPC client for serviceName in sync with the etcd key
// of the same name. It blocks until the watch channel is closed, so it is
// meant to run in its own goroutine.
//
// On a Put event it dials the new address and replaces the cached client; on a
// Delete event it removes the cached client. Connections installed by this
// goroutine are closed once they have been replaced or removed, so repeated
// registrations do not accumulate open connections. A connection stored in the
// map before this goroutine's first Put is not known here and is not closed.
func (ed *EtcdDiscovery) watch(serviceName string) {
	// Connection and address most recently installed by this goroutine.
	var (
		curConn *grpc.ClientConn
		curAddr string
	)

	watchChan := ed.etcdClient.Watch(context.TODO(), serviceName)
	for event := range watchChan {
		for _, e := range event.Events {
			switch e.Type {
			case clientv3.EventTypePut:
				addr := string(e.Kv.Value)
				if curConn != nil && addr == curAddr {
					// Same address registered again: the installed client
					// already targets it, so keep the existing connection.
					continue
				}

				conn, err := grpc.Dial(addr,
					grpc.WithTransportCredentials(insecure.NewCredentials()))
				if err != nil {
					continue
				}

				var grpcCli interface{}
				switch serviceName {
				case option.User:
					grpcCli = userService.NewUserServiceClient(conn)
				case option.Relationship:
					grpcCli = relationshipService.NewRelationshipServiceClient(conn)
				case option.Note:
					grpcCli = noteService.NewNoteServiceClient(conn)
				case option.Message:
					grpcCli = messageService.NewMessageServiceClient(conn)
				}
				if grpcCli == nil {
					// Unrecognised service name: nothing will use conn.
					_ = conn.Close()
					continue
				}

				ed.mu.Lock()
				ed.serviceMap[serviceName] = grpcCli
				ed.mu.Unlock()

				// Release the connection that was just replaced; the close
				// error is not actionable here.
				if curConn != nil {
					_ = curConn.Close()
				}
				curConn, curAddr = conn, addr

			case clientv3.EventTypeDelete:
				ed.mu.Lock()
				delete(ed.serviceMap, serviceName)
				ed.mu.Unlock()

				if curConn != nil {
					_ = curConn.Close()
					curConn, curAddr = nil, ""
				}
			}
		}
	}
}
// GetServiceAddr returns the value cached for serviceName, read under the
// read lock. Despite the name, the stored value is the generated gRPC client
// rather than an address string; callers type-assert it to the client type
// they expect. The result is nil when nothing is cached for serviceName.
func (ed *EtcdDiscovery) GetServiceAddr(serviceName string) interface{} {
	ed.mu.RLock()
	client := ed.serviceMap[serviceName]
	ed.mu.RUnlock()
	return client
}
1.2 在web启动时,初始化EtcdDiscovery
package main
import (
"gonote/gateway/internal"
"gonote/gateway/internal/option"
"gonote/gateway/internal/util"
)
// init sets up the gateway's shared dependencies before main runs, in order:
// logger, WebSocket server, Redis, Kafka, and etcd-based discovery for the
// four backend services. It panics if Redis initialisation returns an error.
func init() {
	util.InitLogger()
	util.InitWebSocketServer(64)

	if err := util.InitRedis(); err != nil {
		panic(err)
	}

	util.InitKafka(option.Topic)

	// Names of the services the gateway needs clients for.
	services := []string{
		option.User,
		option.Relationship,
		option.Note,
		option.Message,
	}
	util.InitEtcdDiscovery(services)
}
// main builds the router and serves it on 0.0.0.0:9090, panicking if the
// server returns an error.
func main() {
	router := internal.Router()

	err := router.Run("0.0.0.0:9090")
	if err != nil {
		panic(err)
	}
}
1.3 调用EtcdDiscovery获取服务实例
举个用户登录的例子:
// UserLogin handles a login form post carrying "email" and "pwd".
//
// It looks up the user-service client from the discovery cache, calls its
// UserLogin RPC, generates a JWT for the email, and stores the email in the
// session. On success it replies with code 0 and the token in "data". Every
// failure is reported as HTTP 200 with code 1 and a message in "msg".
func UserLogin(c *gin.Context) {
	// fail writes the error reply shared by all failure paths.
	fail := func(msg string) {
		c.JSON(200, gin.H{
			"code": 1,
			"msg":  msg,
		})
	}

	em := c.PostForm("email")
	pwd := c.PostForm("pwd")

	// Fetch the service client. The cache has no entry while the service is
	// not registered, so use the comma-ok form instead of letting a failed
	// type assertion panic inside the handler.
	cli, ok := util.EtcdDiscovery.GetServiceAddr(option.User).(service.UserServiceClient)
	if !ok {
		fail("user service unavailable")
		return
	}

	// Call the service.
	_, err := cli.UserLogin(context.TODO(), &service.User{
		Email: em,
		Pwd:   pwd,
	})
	if err != nil {
		fail(err.Error())
		return
	}

	// Generate the JWT.
	token, err := util.GenToken(em)
	if err != nil {
		fail(err.Error())
		return
	}

	// Record the email in the session.
	session := sessions.Default(c)
	session.Set("email", em)
	if err = session.Save(); err != nil {
		fail(err.Error())
		return
	}

	c.JSON(200, gin.H{
		"code": 0,
		"data": token,
	})
}
2 微服务端进行服务注册
user业务对应的微服务:
// init sets up the user service's dependencies before main runs, in order:
// logger, database, Kafka, and the etcd client. It panics if database
// initialisation returns an error.
func init() {
	util.InitLogger()

	if err := util.InitDB(); err != nil {
		panic(err)
	}

	util.InitKafka(option.Topic)
	util.InitEtcdCli()
}
// main listens on option.IP:option.Port, registers that address in etcd under
// the key "user", and then serves the user gRPC service on the listener. It
// panics if listening, registering, or serving fails.
//
// NOTE(review): the "user" key is written without any visible removal on
// exit — confirm whether deregistration or a lease is handled elsewhere.
func main() {
	addr := option.IP + ":" + option.Port

	listener, err := net.Listen("tcp", addr)
	if err != nil {
		panic(err)
	}
	defer listener.Close()
	defer util.EtcdCli.Close()
	defer util.KafkaCli.Close()

	// Service registration: publish the listen address for discovery.
	if _, err := util.EtcdCli.Put(context.TODO(), "user", addr); err != nil {
		panic(err)
	}

	grpcServer := grpc.NewServer()
	service.RegisterUserServiceServer(grpcServer, &service.UserServiceImpl{})
	if err := grpcServer.Serve(listener); err != nil {
		panic(err)
	}
}
通过etcd命令查看相关注册信息,例如执行 etcdctl get user 可以看到user服务注册的监听地址