Go 高级教程 / GRpc/Pb · 2023年5月1日 0

gRPC负载均衡-Golang

一. 负载均衡三种解决方案

构建高可用、高性能的通信服务,通常采用服务注册与发现、负载均衡和容错处理等机制实现。根据负载均衡实现所在的位置不同,通常可分为以下三种解决方案:

  • 1、集中式LB(Proxy Model)
  • 2、进程内LB(Balancing-aware Client)
  • 3、独立 LB 进程(External Load Balancing Service)

出处在这里,写的很详细: 链接地址

二. gRPC的准备

gRPC 默认使用 protocol buffers,这是 Google 开源的一套成熟的结构数据序列化机制(当然也可以使用其他数据格式如 JSON)。其客户端提供Objective-C、Java接口,服务器侧则有Java、Golang、C++等接口,从而为移动端(iOS/Androi)到服务器端通讯提供了一种解决方案。链接地址

1.安装brew,这个自己百度谷歌.

2.终端 :

brew install autoconf automake libtool
复制代码

3.安装golang protobuf

go get -u github.com/golang/protobuf/proto // golang protobuf 库
go get -u github.com/golang/protobuf/protoc-gen-go //protoc --go_out 工具
复制代码

三. 简单的protobuf

../proto/hello.proto

syntax = "proto3";

package proto;

message SayReq {
    string content = 1;
}

message SayResp {
    string content = 1;
}

service Test{
    rpc Say(SayReq) returns (SayResp) {}
}
复制代码

在proto下 , 输入终端命令 protoc --go_out=plugins=grpc:. hello.proto

生成 hello.pb.go 文件,通过protoc就能生成不同语言需要的.pb.go文件


这里是介绍四种protoc,如果不用到流,可以不看:

1:简单 RPC

2:服务器端流式 RPC

3:客户端流式 RPC

4:双向流式 RPC

4种方式与以下定义四种服务对应:
test.proto文件:
service Test{

rpc LZX1(SayReq) returns (SayResp) {}
rpc LZX2(SayReq) returns (stream SayResp) {}
rpc LZX3(stream SayReq) returns (SayResp) {}
rpc LZX4(stream SayReq) returns (stream SayResp) {}
复制代码

}

生成的test.pb.go文件:

type TestClient interface {

LZX1(ctx context.Context, in *SayReq, opts ...grpc.CallOption) (*SayResp, error)
LZX2(ctx context.Context, in *SayReq, opts ...grpc.CallOption) (Test_LZX2Client, error)
LZX3(ctx context.Context, opts ...grpc.CallOption) (Test_LZX3Client, error)
LZX4(ctx context.Context, opts ...grpc.CallOption) (Test_LZX4Client, error)
复制代码

}

所以无流的函数,也就是第一种简单的RPC,都只是一一对应.只要有流,返回的类型都是 服务结构名_函数名Client;只要客户端是流式,传参将不包含其他参数.


四.6种负载均衡算法

1、轮询法

2、随机法

3、源地址哈希法

4、加权轮询法

5、加权随机法

6、最小连接数法

算法的描述看这里:链接地址

五. gRPC的例子

例子出处链接:链接地址

架构:

以下用的是随机负载均衡:

客户端 etcd/client/random/main.go

package main

import (
    etcd "github.com/coreos/etcd/client"
    grpclb "github.com/liyue201/grpc-lb"
    "github.com/liyue201/grpc-lb/examples/proto"
    registry "github.com/liyue201/grpc-lb/registry/etcd"
    "golang.org/x/net/context"
    "google.golang.org/grpc"
    "log"
    "strconv"
    "time"
)

func main() {
    etcdConfg := etcd.Config{
        Endpoints: []string{"http://120.24.44.201:2379"},
    }
    r := registry.NewResolver("/grpc-lb", "test", etcdConfg) // 加载registry
    b := grpclb.NewBalancer(r, grpclb.NewRandomSelector())   //加载grpclbs
    c, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithBalancer(b))
    if err != nil {
        log.Printf("grpc dial: %s", err)
        return
    }
    defer c.Close()

    client := proto.NewTestClient(c)
    var num int
    for i := 0; i 1000; i++ {
        resp, err := client.Say(context.Background(), &proto.SayReq{Content: "random"})
        if err != nil {
            log.Println(err)
            time.Sleep(time.Second)
            continue
        }
        time.Sleep(time.Second)
        num++
        log.Printf(resp.Content + ",  clientOfnum: " + strconv.Itoa(num))
    }
}
复制代码

服务端 etcd/server/main.go

package main

import (
    "flag"
    "fmt"
    etcd "github.com/coreos/etcd/client"
    "github.com/liyue201/grpc-lb/examples/proto"
    registry "github.com/liyue201/grpc-lb/registry/etcd"
    "golang.org/x/net/context"
    "google.golang.org/grpc"
    "log"
    "net"
    "sync"
    "time"
)

var nodeID = flag.String("node", "node1", "node ID")
var port = flag.Int("port", 8080, "listening port")

type RpcServer struct {
    addr string
    s    *grpc.Server
}

func NewRpcServer(addr string) *RpcServer {
    s := grpc.NewServer()
    rs := &RpcServer{
        addr: addr,
        s:    s,
    }
    return rs
}

func (s *RpcServer) Run() {
    listener, err := net.Listen("tcp", s.addr)
    if err != nil {
        log.Printf("failed to listen: %v", err)
        return
    }
    log.Printf("rpc listening on:%s", s.addr)

    proto.RegisterTestServer(s.s, s)
    s.s.Serve(listener)
}

func (s *RpcServer) Stop() {
    s.s.GracefulStop()
}

var num int

func (s *RpcServer) Say(ctx context.Context, req *proto.SayReq) (*proto.SayResp, error) {
    num++
    text := "Hello " + req.Content + ", I am " + *nodeID + ", serverOfnum: " + strconv.Itoa(num)
    log.Println(text)

    return &proto.SayResp{Content: text}, nil
}

func StartService() {
    etcdConfg := etcd.Config{
        Endpoints: []string{"http://120.24.44.201:2379"},
    }

    registry, err := registry.NewRegistry(
        registry.Option{
            EtcdConfig:  etcdConfg,
            RegistryDir: "/grpc-lb",
            ServiceName: "test",
            NodeID:      *nodeID,
            NData: registry.NodeData{
                Addr: fmt.Sprintf("127.0.0.1:%d", *port),
                //Metadata: map[string]string{"weight": "1"},
            },
            Ttl: 10 * time.Second,
        })
    if err != nil {
        log.Panic(err)
        return
    }
    server := NewRpcServer(fmt.Sprintf("0.0.0.0:%d", *port))
    wg := sync.WaitGroup{}

    wg.Add(1)
    go func() {
        server.Run()
        wg.Done()
    }()

    wg.Add(1)
    go func() {
        registry.Register()
        wg.Done()
    }()

    //stop the server after one minute
    //go func() {
    //   time.Sleep(time.Minute)
    //   server.Stop()
    //   registry.Deregister()
    //}()

    wg.Wait()
}

func main() {
    flag.Parse()
    StartService()
}
复制代码

服务端的代码如下, 使用以下命令运行3个服务进程,再启动客户端。

go run main.go -node node1 -port 28544

go run main.go -node node2 -port 18562

go run main.go -node node3 -port 27772

六.最终效果

(客户端不停的随机访问三个服务端)

客户端:


服务端node1:


服务端node2:


服务端node3:


断开服务端node1,node3,client照样能跑,并只连接到node2服务端:

客户端:

服务器node2:


再次启动node1,node3服务端,client自动连接上,并继续随机访问3台服务端:

客户端:

服务端node1:

服务端node3:


最后,关闭所有服务端,客户端处于阻塞状态.

打赏 赞(0) 分享'
分享到...
微信
支付宝
微信二维码图片

微信扫描二维码打赏

支付宝二维码图片

支付宝扫描二维码打赏

文章目录