gRPC三种流和消息格式
消息格式
RPC流
服务端实现protocol buffer定义的方法,客户端保留一个存根,提供服务端方法的抽象,客户端只需要调用存根中的方法,就可以远程调用服务端方法。
- 调用存根方法
- 存根创建HTTP POST请求(==gRPC中所有请求都是 POST==),设置
content-type
为application/grpc
- 到达服务端,会先检查请求头是不是gRPC请求,否则返回415
长度前缀的消息分帧
在写入消息前,先写入长度消息表明每条消息的大小。
每条消息都有额外的4字节来设置大小,也就是说消息的大小不能超过4GB
帧首中还有单字节无符号整数,用来表明数据是否进行了压缩
为1表示使用 message-encoding
中的编码机制进行了压缩
请求消息
客户端发送,包含3个部分:请求头信息、长度前缀的消息、流结束标记
1、对于gRPC 都是POST
2、协议:Http/Https
3、/服务名/方法名
4、目标URI的主机名
5、对不兼容代理的检测,gRPC下这个值必须为 trailers
6、超时时间
7、媒体类型
8、压缩类型
==当因为没有要发送的数据而需要关闭请求流时,必须发送一个带标记的空数据帧==
响应信息
服务端发送,包含3个部分:响应头信息、长度前缀的信息、trailers
END_STREAM 标记不会随数据帧一起发送,而是作为单独的头信息来发送,名为 trailer
三种流
一元RPC
通信时始终只有一个请求和一个响应
protocol buffer
syntax = "proto3";
package hello;
// 第一个分割参数,输出路径;第二个设置生成类的包路径
option go_package = "./proto/hello";
// 设置服务名称
service Greeter {
// 设置方法
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// 请求信息用户名.
message HelloRequest {
string name = 1;
}
// 响应信息
message HelloReply {
string message = 1;
}
服务端
package main
import (
"context"
"flag"
"fmt"
"log"
"net"
"google.golang.org/grpc"
pb "mygrpc/proto/hello"
)
var (
port = flag.Int("port", 50051, "The server port")
)
type server struct {
pb.UnimplementedGreeterServer
}
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
log.Printf("Received: %v", in.GetName())
return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}
func main() {
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
// 开启rpc
s := grpc.NewServer()
// 注册服务
pb.RegisterGreeterServer(s, &server{})
log.Printf("server listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
客户端
package main
import (
"context"
"flag"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "mygrpc/proto/hello" // 引入编译生成的包
)
const (
defaultName = "world"
)
var (
addr = flag.String("addr", "localhost:50051", "the address to connect to")
name = flag.String("name", defaultName, "Name to greet")
)
func main() {
flag.Parse()
// 与服务建立连接.
conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
// 创建指定服务的客户端
c := pb.NewGreeterClient(conn)
// 连接服务器并打印出其响应。
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// 调用指定方法
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("Greeting: %s", r.GetMessage())
}
服务流RPC
通信时可以是一个请求,服务端多次响应,比如查询业务,服务端模糊匹配找到一次就返回客户端一次响应这样的多次响应。
在protobuf中的 service添加以下代码
rpc searchOrders(google.protobuf.StringValue) returns (stream Order);
服务端代码
package main
import (
"context"
"flag"
"fmt"
"google.golang.org/grpc"
"io"
"log"
pb "mygrpc/proto/hello"
"net"
)
var (
port = flag.Int("port", 50051, "The service port")
)
type server struct {
pb.UnimplementedGreeterServer
}
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
log.Printf("Received: %v", in.GetName())
return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}
func (s *server) SearchOrders(req *pb.HelloRequest, stream pb.Greeter_SearchOrdersServer) error {
log.Printf("Recved %v", req.GetName())
// 具体返回多少个response根据业务逻辑调整
for i := 0; i < 10; i++ {
// 通过 send 方法不断推送数据
err := stream.Send(&pb.HelloReply{})
if err != nil {
log.Fatalf("Send error:%v", err)
return err
}
}
return nil
}
func (s *server) UpdateOrders(stream pb.Greeter_UpdateOrdersServer) error {
for {
log.Println("开始接受客户端的流")
// Recv 对客户端发来的请求接收
order, err := stream.Recv()
if err == io.EOF {
// 流结束,关闭并发送响应给客户端
return stream.Send(&pb.HelloReply{Message: "接受客户流结束"})
}
if err != nil {
return err
}
// 更新数据
log.Printf("Order ID : %s - %s", order.GetName(), "Updated")
}
}
func main() {
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
// 开启rpc
s := grpc.NewServer()
// 注册服务
pb.RegisterGreeterServer(s, &server{})
log.Printf("service listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
客户端代码
package main
import (
"context"
"flag"
"io"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "mygrpc/proto/hello" // 引入编译生成的包
)
const (
defaultName = "world"
)
var (
addr = flag.String("addr", "localhost:50051", "the address to connect to")
name = flag.String("name", defaultName, "Name to greet")
)
func main() {
flag.Parse()
// 与服务建立连接.
conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer func(conn *grpc.ClientConn) {
err := conn.Close()
if err != nil {
}
}(conn)
// 创建指定服务的客户端
c := pb.NewGreeterClient(conn)
// 连接服务器并打印出其响应。
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// 调用指定方法
searchStream, _ := c.SearchOrders(ctx, &pb.HelloRequest{Name: "开始服务端rpc流测试"})
for {
// 客户端 Recv 方法接收服务端发送的流
searchOrder, err := searchStream.Recv()
if err == io.EOF {
log.Print("EOF")
break
}
if err == nil {
log.Print("Search Result : ", searchOrder)
}
}
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("Greeting: %s", r.GetMessage())
}
客户流RPC
客户端多个请求发给服务端,服务端发送一个响应给客户端,比如更新业务,客户端的读个请求发过来,服务端更新完返回一个成功的结果
在protobuf中的 service添加以下代码
rpc updateOrders(stream HelloRequest) returns (stream HelloReply);
服务端代码
func (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {
ordersStr := "Updated Order IDs : "
for {
// Recv 对客户端发来的请求接收
order, err := stream.Recv()
if err == io.EOF {
// 流结束,关闭并发送响应给客户端
return stream.SendAndClose(&wrapper.StringValue{Value: "Orders processed " + ordersStr})
}
if err != nil {
return err
}
// 更新数据
orderMap[order.Id] = *order
log.Printf("Order ID : %s - %s", order.Id, "Updated")
ordersStr += order.Id + ", "
}
}
客户端代码
package main
import (
"context"
"flag"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "mygrpc/proto/hello" // 引入编译生成的包
)
const (
defaultName = "world"
)
var (
addr = flag.String("addr", "localhost:50051", "the address to connect to")
name = flag.String("name", defaultName, "Name to greet")
)
func main() {
flag.Parse()
// 与服务建立连接.
conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer func(conn *grpc.ClientConn) {
err := conn.Close()
if err != nil {
}
}(conn)
// 创建指定服务的客户端
c := pb.NewGreeterClient(conn)
// 连接服务器并打印出其响应。
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// 调用指定方法
updateStream, err := c.UpdateOrders(ctx)
if err != nil {
log.Fatalf("%v.UpdateOrders(_) = _, %v", c, err)
}
// Updating order 1
if err := updateStream.Send(&pb.HelloRequest{Name: "1"}); err != nil {
log.Fatalf("%v.Send(%v) = %v", updateStream, &pb.HelloRequest{Name: "1"}, err)
}
// Updating order 2
if err := updateStream.Send(&pb.HelloRequest{Name: "2"}); err != nil {
log.Fatalf("%v.Send(%v) = %v", updateStream, &pb.HelloRequest{Name: "2"}, err)
}
// 发送关闭信号并接收服务端响应
err = updateStream.CloseSend()
if err != nil {
log.Fatalf("%v.CloseAndRecv() got error %v, want %v", updateStream, err, nil)
}
log.Printf("客户端流传输结束")
}
双工流RPC
对应的业务就比如实时的消息流
protobuf
// 设置双工rpc
rpc processOrders(stream HelloRequest) returns (stream HelloReply);
服务端
package main
import (
"flag"
"fmt"
"google.golang.org/grpc"
"io"
"log"
pb "mygrpc/proto/hello"
"net"
"sync"
)
var (
port = flag.Int("port", 50051, "The service port")
)
type server struct {
pb.UnimplementedGreeterServer
}
func (s *server) ProcessOrders(stream pb.Greeter_ProcessOrdersServer) error {
var (
waitGroup sync.WaitGroup // 一组 goroutine 的结束
// 设置通道
msgCh = make(chan *pb.HelloReply)
)
// 计数器加1
waitGroup.Add(1)
// 消费队列中的内容
go func() {
// 计数器减一
defer waitGroup.Done()
for {
v := <-msgCh
fmt.Println(v)
err := stream.Send(v)
if err != nil {
fmt.Println("Send error:", err)
break
}
}
}()
waitGroup.Add(1)
// 向队列中添加内容
go func() {
defer waitGroup.Done()
for {
req, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("recv error:%v", err)
}
fmt.Printf("Recved :%v \n", req.GetName())
msgCh <- &pb.HelloReply{Message: "服务端传输数据"}
}
close(msgCh)
}()
// 等待 计数器问0 推出
waitGroup.Wait()
// 返回nil表示已经完成响应
return nil
}
func main() {
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
// 开启rpc
s := grpc.NewServer()
// 注册服务
pb.RegisterGreeterServer(s, &server{})
log.Printf("service listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
客户端
package main
import (
"context"
"flag"
"fmt"
"io"
"log"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "mygrpc/proto/hello" // 引入编译生成的包
)
const (
defaultName = "world"
)
var (
addr = flag.String("addr", "localhost:50051", "the address to connect to")
name = flag.String("name", defaultName, "Name to greet")
)
func main() {
flag.Parse()
// 与服务建立连接.
conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer func(conn *grpc.ClientConn) {
err := conn.Close()
if err != nil {
}
}(conn)
// 创建指定服务的客户端
c := pb.NewGreeterClient(conn)
// 连接服务器并打印出其响应。
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// 设置
var wg sync.WaitGroup
// 调用指定方法
stream, _ := c.ProcessOrders(ctx)
if err != nil {
panic(err)
}
// 3.开两个goroutine 分别用于Recv()和Send()
wg.Add(1)
go func() {
defer wg.Done()
for {
req, err := stream.Recv()
if err == io.EOF {
fmt.Println("Server Closed")
break
}
if err != nil {
continue
}
fmt.Printf("Recv Data:%v \n", req.GetMessage())
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 2; i++ {
err := stream.Send(&pb.HelloRequest{Name: "hello world"})
if err != nil {
log.Printf("send error:%v\n", err)
}
}
// 4. 发送完毕关闭stream
err := stream.CloseSend()
if err != nil {
log.Printf("Send error:%v\n", err)
return
}
}()
wg.Wait()
log.Printf("服务端流传输结束")
}
代码仓库
https://github.com/onenewcode/mygrpc.git
也可以直接下载绑定的资源。
- 点赞
- 收藏
- 关注作者
评论(0)