gRPC框架使用
一、gRPC基础知识
知识点 | 说明 | 重要程度 | 应用场景 |
---|---|---|---|
RPC原理 | 远程过程调用的基本原理 | ⭐⭐⭐⭐⭐ | 分布式系统通信 |
协议设计 | Protocol Buffers的使用 | ⭐⭐⭐⭐⭐ | 接口定义、数据序列化 |
服务定义 | gRPC服务和方法定义 | ⭐⭐⭐⭐ | 服务接口设计 |
性能优化 | 连接池、压缩、流式处理 | ⭐⭐⭐⭐ | 高性能RPC服务 |
让我们通过一个完整的示例来学习gRPC:
syntax = "proto3";
package user;
option go_package = "./pb";
// 用户服务定义
service UserService {
// 创建用户
rpc CreateUser (CreateUserRequest) returns (CreateUserResponse);
// 获取用户
rpc GetUser (GetUserRequest) returns (GetUserResponse);
// 更新用户
rpc UpdateUser (UpdateUserRequest) returns (UpdateUserResponse);
// 删除用户
rpc DeleteUser (DeleteUserRequest) returns (DeleteUserResponse);
// 流式获取用户列表
rpc ListUsers (ListUsersRequest) returns (stream UserResponse);
}
// 用户信息
message User {
int64 id = 1;
string name = 2;
string email = 3;
UserStatus status = 4;
repeated string roles = 5;
map<string, string> metadata = 6;
}
// 用户状态
enum UserStatus {
UNKNOWN = 0;
ACTIVE = 1;
INACTIVE = 2;
SUSPENDED = 3;
}
// 创建用户请求
message CreateUserRequest {
string name = 1;
string email = 2;
repeated string roles = 3;
}
// 创建用户响应
message CreateUserResponse {
User user = 1;
}
// 获取用户请求
message GetUserRequest {
int64 id = 1;
}
// 获取用户响应
message GetUserResponse {
User user = 1;
}
// 更新用户请求
message UpdateUserRequest {
User user = 1;
repeated string update_fields = 2;
}
// 更新用户响应
message UpdateUserResponse {
User user = 1;
}
// 删除用户请求
message DeleteUserRequest {
int64 id = 1;
}
// 删除用户响应
message DeleteUserResponse {
bool success = 1;
}
// 列出用户请求
message ListUsersRequest {
int32 page_size = 1;
string page_token = 2;
}
// 用户响应
message UserResponse {
User user = 1;
}
首先,创建Protocol Buffers定义文件:
package main
import (
"context"
"fmt"
"log"
"net"
"sync"
"time"
pb "your_project/pb"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type userServer struct {
pb.UnimplementedUserServiceServer
mu sync.RWMutex
users map[int64]*pb.User
pb.UserServiceServer
}
func newUserServer() *userServer {
return &userServer{
users: make(map[int64]*pb.User),
}
}
func (s *userServer) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.CreateUserResponse, error) {
// 实现超时控制
if deadline, ok := ctx.Deadline(); ok {
if time.Now().After(deadline) {
return nil, status.Error(codes.DeadlineExceeded, "deadline exceeded")
}
}
s.mu.Lock()
defer s.mu.Unlock()
// 生成用户ID
userID := int64(len(s.users) + 1)
// 创建新用户
user := &pb.User{
Id: userID,
Name: req.Name,
Email: req.Email,
Roles: req.Roles,
Status: pb.UserStatus_ACTIVE,
Metadata: make(map[string, string),
}
// 存储用户
s.users[userID] = user
return &pb.CreateUserResponse{User: user}, nil
}
func (s *userServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
s.mu.RLock()
defer s.mu.RUnlock()
user, exists := s.users[req.Id]
if !exists {
return nil, status.Errorf(codes.NotFound, "user not found: %d", req.Id)
}
return &pb.GetUserResponse{User: user}, nil
}
func (s *userServer) UpdateUser(ctx context.Context, req *pb.UpdateUserRequest) (*pb.UpdateUserResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
user, exists := s.users[req.User.Id]
if !exists {
return nil, status.Errorf(codes.NotFound, "user not found: %d", req.User.Id)
}
// 更新字段
if len(req.UpdateFields) == 0 {
// 更新所有字段
user.Name = req.User.Name
user.Email = req.User.Email
user.Roles = req.User.Roles
user.Status = req.User.Status
user.Metadata = req.User.Metadata
} else {
// 只更新指定字段
for _, field := range req.UpdateFields {
switch field {
case "name":
user.Name = req.User.Name
case "email":
user.Email = req.User.Email
case "roles":
user.Roles = req.User.Roles
case "status":
user.Status = req.User.Status
case "metadata":
user.Metadata = req.User.Metadata
}
}
}
return &pb.UpdateUserResponse{User: user}, nil
}
func (s *userServer) DeleteUser(ctx context.Context, req *pb.DeleteUserRequest) (*pb.DeleteUserResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
if _, exists := s.users[req.Id]; !exists {
return nil, status.Errorf(codes.NotFound, "user not found: %d", req.Id)
}
delete(s.users, req.Id)
return &pb.DeleteUserResponse{Success: true}, nil
}
func (s *userServer) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {
s.mu.RLock()
defer s.mu.RUnlock()
pageSize := int(req.PageSize)
if pageSize <= 0 {
pageSize = 10 // 默认页大小
}
// 模拟分页
start := 0
for _, user := range s.users {
if start >= len(s.users) {
break
}
// 发送用户数据
err := stream.Send(&pb.UserResponse{User: user})
if err != nil {
return err
}
start++
if start%pageSize == 0 {
// 模拟流处理延迟
time.Sleep(100 * time.Millisecond)
}
}
return nil
}
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
// 创建gRPC服务器
s := grpc.NewServer(
grpc.UnaryInterceptor(loggingInterceptor),
)
// 注册服务
pb.RegisterUserServiceServer(s, newUserServer())
fmt.Println("Server starting on :50051...")
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
// 日志拦截器
func loggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
start := time.Now()
// 调用处理器
resp, err := handler(ctx, req)
// 记录请求日志
log.Printf(
"Method: %s, Duration: %v, Error: %v",
info.FullMethod,
time.Since(start),
err,
)
return resp, err
}
接下来,创建服务器实现:
package main
import (
"context"
"fmt"
"log"
"net"
"sync"
"time"
pb "your_project/pb"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type userServer struct {
pb.UnimplementedUserServiceServer
mu sync.RWMutex
users map[int64]*pb.User
pb.UserServiceServer
}
func newUserServer() *userServer {
return &userServer{
users: make(map[int64]*pb.User),
}
}
func (s *userServer) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.CreateUserResponse, error) {
// 实现超时控制
if deadline, ok := ctx.Deadline(); ok {
if time.Now().After(deadline) {
return nil, status.Error(codes.DeadlineExceeded, "deadline exceeded")
}
}
s.mu.Lock()
defer s.mu.Unlock()
// 生成用户ID
userID := int64(len(s.users) + 1)
// 创建新用户
user := &pb.User{
Id: userID,
Name: req.Name,
Email: req.Email,
Roles: req.Roles,
Status: pb.UserStatus_ACTIVE,
Metadata: make(map[string, string),
}
// 存储用户
s.users[userID] = user
return &pb.CreateUserResponse{User: user}, nil
}
func (s *userServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
s.mu.RLock()
defer s.mu.RUnlock()
user, exists := s.users[req.Id]
if !exists {
return nil, status.Errorf(codes.NotFound, "user not found: %d", req.Id)
}
return &pb.GetUserResponse{User: user}, nil
}
func (s *userServer) UpdateUser(ctx context.Context, req *pb.UpdateUserRequest) (*pb.UpdateUserResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
user, exists := s.users[req.User.Id]
if !exists {
return nil, status.Errorf(codes.NotFound, "user not found: %d", req.User.Id)
}
// 更新字段
if len(req.UpdateFields) == 0 {
// 更新所有字段
user.Name = req.User.Name
user.Email = req.User.Email
user.Roles = req.User.Roles
user.Status = req.User.Status
user.Metadata = req.User.Metadata
} else {
// 只更新指定字段
for _, field := range req.UpdateFields {
switch field {
case "name":
user.Name = req.User.Name
case "email":
user.Email = req.User.Email
case "roles":
user.Roles = req.User.Roles
case "status":
user.Status = req.User.Status
case "metadata":
user.Metadata = req.User.Metadata
}
}
}
return &pb.UpdateUserResponse{User: user}, nil
}
func (s *userServer) DeleteUser(ctx context.Context, req *pb.DeleteUserRequest) (*pb.DeleteUserResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
if _, exists := s.users[req.Id]; !exists {
return nil, status.Errorf(codes.NotFound, "user not found: %d", req.Id)
}
delete(s.users, req.Id)
return &pb.DeleteUserResponse{Success: true}, nil
}
func (s *userServer) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {
s.mu.RLock()
defer s.mu.RUnlock()
pageSize := int(req.PageSize)
if pageSize <= 0 {
pageSize = 10 // 默认页大小
}
// 模拟分页
start := 0
for _, user := range s.users {
if start >= len(s.users) {
break
}
// 发送用户数据
err := stream.Send(&pb.UserResponse{User: user})
if err != nil {
return err
}
start++
if start%pageSize == 0 {
// 模拟流处理延迟
time.Sleep(100 * time.Millisecond)
}
}
return nil
}
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
// 创建gRPC服务器
s := grpc.NewServer(
grpc.UnaryInterceptor(loggingInterceptor),
)
// 注册服务
pb.RegisterUserServiceServer(s, newUserServer())
fmt.Println("Server starting on :50051...")
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
// 日志拦截器
func loggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
start := time.Now()
// 调用处理器
resp, err := handler(ctx, req)
// 记录请求日志
log.Printf(
"Method: %s, Duration: %v, Error: %v",
info.FullMethod,
time.Since(start),
err,
)
return resp, err
}
创建客户端实现:
package main
import (
"context"
"fmt"
"io"
"log"
"time"
pb "your_project/pb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func main() {
// 建立连接
conn, err := grpc.Dial(
":50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
// 创建客户端
client := pb.NewUserServiceClient(conn)
// 设置超时上下文
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
// 创建用户
user, err := createUser(ctx, client, "John Doe", "john@example.com", []string{"user"})
if err != nil {
log.Fatalf("Create user failed: %v", err)
}
fmt.Printf("Created user: %v\n", user)
// 获取用户
getResp, err := client.GetUser(ctx, &pb.GetUserRequest{Id: user.Id})
if err != nil {
log.Fatalf("Get user failed: %v", err)
}
fmt.Printf("Got user: %v\n", getResp.User)
// 更新用户
updateReq := &pb.UpdateUserRequest{
User: &pb.User{
Id: user.Id,
Name: "John Smith",
Email: user.Email,
Roles: append(user.Roles, "admin"),
},
UpdateFields: []string{"name", "roles"},
}
updateResp, err := client.UpdateUser(ctx, updateReq)
if err != nil {
log.Fatalf("Update user failed: %v", err)
}
fmt.Printf("Updated user: %v\n", updateResp.User)
// 流式获取用户列表
stream, err := client.ListUsers(ctx, &pb.ListUsersRequest{PageSize: 5})
if err != nil {
log.Fatalf("List users failed: %v", err)
}
fmt.Println("\nUser list:")
for {
user, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("Failed to receive user: %v", err)
}
fmt.Printf("- %v\n", user.User)
}
// 删除用户
deleteResp, err := client.DeleteUser(ctx, &pb.DeleteUserRequest{Id: user.Id})
if err != nil {
log.Fatalf("Delete user failed: %v", err)
}
fmt.Printf("Delete user success: %v\n", deleteResp.Success)
}
func createUser(ctx context.Context, client pb.UserServiceClient, name, email string, roles []string) (*pb.User, error) {
resp, err := client.CreateUser(ctx, &pb.CreateUserRequest{
Name: name,
Email: email,
Roles: roles,
})
if err != nil {
return nil, err
}
return resp.User, nil
}
让我们创建gRPC服务的流程图:
二、gRPC性能优化
1. 连接管理
// 连接池配置
type GrpcClientPool struct {
conns []*grpc.ClientConn
mu sync.Mutex
current int
size int
}
// 创建连接池
func NewGrpcClientPool(target string, size int) (*GrpcClientPool, error) {
pool := &GrpcClientPool{
conns: make([]*grpc.ClientConn, size),
size: size,
}
for i := 0; i < size; i++ {
conn, err := grpc.Dial(
target,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
if err != nil {
return nil, err
}
pool.conns[i] = conn
}
return pool, nil
}
// 获取连接
func (p *GrpcClientPool) GetConn() *grpc.ClientConn {
p.mu.Lock()
defer p.mu.Unlock()
conn := p.conns[p.current]
p.current = (p.current + 1) % p.size
return conn
}
2. 压缩选项
// 服务端压缩配置
s := grpc.NewServer(
grpc.DefaultCompression(grpc.Gzip),
)
// 客户端压缩配置
conn, err := grpc.Dial(
target,
grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")),
)
3. 流式处理优化
优化方向 | 方法 | 效果 |
---|---|---|
批量处理 | 合并多个小请求 | 减少网络开销 |
流控制 | 控制发送速率 | 避免过载 |
缓冲区优化 | 调整缓冲区大小 | 提高吞吐量 |
三、gRPC高级特性
1. 拦截器
2. 负载均衡
// 自定义负载均衡器
type customBalancer struct {
conns []*grpc.ClientConn
mu sync.Mutex
index int
}
func (b *customBalancer) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
b.mu.Lock()
defer b.mu.Unlock()
conn := b.conns[b.index]
b.index = (b.index + 1) % len(b.conns)
return balancer.PickResult{SubConn: conn}, nil
}
3. 健康检查
// 实现健康检查服务
type healthServer struct {
pb.UnimplementedHealthServer
}
func (s *healthServer) Check(ctx context.Context, req *pb.HealthCheckRequest) (*pb.HealthCheckResponse, error) {
return &pb.HealthCheckResponse{
Status: pb.HealthCheckResponse_SERVING,
}, nil
}
func (s *healthServer) Watch(req *pb.HealthCheckRequest, stream pb.Health_WatchServer) error {
return stream.Send(&pb.HealthCheckResponse{
Status: pb.HealthCheckResponse_SERVING,
})
}
四、最佳实践建议
1. 服务设计
-
接口定义
- 清晰的服务边界
- 合理的方法命名
- 版本控制
-
错误处理
- 使用标准错误码
- 详细的错误信息
- 优雅的降级策略
-
安全性
- TLS加密
- 认证授权
- 请求限流
2. 性能优化
-
连接管理
- 使用连接池
- 复用连接
- 及时关闭
-
数据处理
- 批量处理
- 压缩数据
- 流式传输
-
监控告警
- 性能指标
- 错误统计
- 及时报警
怎么样今天的内容还满意吗?再次感谢观众老爷的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!