黑龙江新闻联播:Go gRPC“教程”-服务端流式RPC({三})

admin 1个月前 (08-19) 科技 59 2

〖前言〗

“上一篇”先容‘了’简朴模式RPC,『当数(据)量大』或者需要一「直」传 输数(据)[时刻,〖我们应该使用流式〗RPC,它允许我们边处置边传 输数(据)[。「本篇先先容」服务端流式RPC

《『服务端』流式》RPC:【〖客户〗端发送请】求到服务<器>,拿到一个流去读取返回{的}《新闻》【序列】。 〖客户〗端读取返回{的}流,直到内里没有任何《新闻》。

【情景模】拟:实时‘获取股票走势’。

1.【〖客户〗端要获取某原油股】{的}实时走势,{〖客户〗端发送}一个请求

2.服务端实时返回该股票{的}走势

【新建】proto<文>件

【新建】server_stream.proto<文>件

1.【界说发送信息】

// (界说发送请求信息)
message SimpleRequest{
    // 界说发送{的}参数,(接纳)驼峰命名< 〖「<{ ‘方式’[}>」〗[>,{小写加}下划线,“如”:student_name
    // 请求参数
    string data = 1;
}

2.界说吸收信息

// (界说流)式响应信息
message StreamResponse{
    // {流}式响应数(据)
    string stream_value = 1;
}

3.界说服务< 〖「<{ ‘方式’[}>」〗[>ListValue

《『服务端』流式》rpc,【只】要{『在』响应}数(据)前添加stream《即可》

// 界说我们{的}服务(“可界说多个服务”,每个服务可界说多个接口)
service StreamServer{
    // 《『服务端』流式》rpc,{『在』响应}数(据)前添加stream
    rpc ListValue(SimpleRequest)returns(stream StreamResponse){};
}

4.「编译」proto<文>件

进入[server_stream.proto所『在』目录,{运行指令}:

protoc --go_out=plugins=grpc:./ ./simple.proto

《确》立Server端

1.界说我们{的}服务,并「<实现>」ListValue< 〖「<{ ‘方式’[}>」〗[>

// SimpleService 界说我们{的}服务
type StreamService struct{}
// ListValue 「<实现>」ListValue< 〖「<{ ‘方式’[}>」〗[>
func (s *StreamService) ListValue(req *pb.SimpleRequest, srv pb.StreamServer_ListValueServer) error {
	for n := 0; n < 5; n++ {
		// 向流中发送《新闻》, <『《【默认每次】》』>send送《新闻》最大『长度《为》』`math.MaxInt32`bytes
		err := srv.Send(&pb.StreamResponse{
			StreamValue: req.Data + strconv.Itoa(n),
		})
		if err != nil {
			return err
		}
	}
	return nil
}

初学者可能以《为》对照疑《惑》,ListValue{的}参数和返回值是怎样确定{的}。实『在』这些都是「编译」proto时天生{的}.pb.go<文>件中有界说,我们只需要「<实现>」就可以‘了’。

2.{启动}gRPC服务<器>

const (
	// Address 监听地址
	Address string = ":8000"
	// Network  网络通信协议[
	Network string = "tcp"
)

func main() {
	// 《监听内陆端口》
	listener, err := net.Listen(Network, Address)
	if err != nil {
		log.Fatalf("net.Listen err: %v", err)
	}
	log.Println(Address + " net.Listing...")
	// 【新建】gRPC服务<器>实例
	// 默认单次吸收最大《新闻》『长度《为》』`1024*1024*4`bytes(4M),单次发送《新闻》最大『长度《为》』`math.MaxInt32`bytes
	// grpcServer := grpc.NewServer(grpc.MaxRecvMsgSize(1024*1024*4), grpc.MaxSendMsgSize(math.MaxInt32))
	grpcServer := grpc.NewServer()
	// 『在』gRPC服务<器>注册我们{的}服务
	pb.RegisterStreamServerServer(grpcServer, &StreamService{})

	//用服务<器> Serve() < 〖「<{ ‘方式’[}>」〗[>以及我们{的}端口信息区「<实现>」壅闭守候,(直到历程被杀死或)者 Stop() 被<挪用>
	err = grpcServer.Serve(listener)
	if err != nil {
		log.Fatalf("grpcServer.Serve err: %v", err)
	}
}

运行服务端

go run server.go
:8000 net.Listing...

《确》立Client端

1.《确》立<挪用>服务端ListValue< 〖「<{ ‘方式’[}>」〗[>

// listValue <挪用>服务端{的}ListValue< 〖「<{ ‘方式’[}>」〗[>
func listValue() {
	// 《确》立发送结构体
	req := pb.SimpleRequest{
		Data: "stream server grpc ",
	}
	// <挪用>我们{的}服务(ListValue< 〖「<{ ‘方式’[}>」〗[>)
	stream, err := grpcClient.ListValue(context.Background(), &req)
	if err != nil {
		log.Fatalf("Call ListStr err: %v", err)
	}
	for {
		//Recv() < 〖「<{ ‘方式’[}>」〗[>吸收服务端《新闻》,<『《【默认每次】》』>Recv()最大《新闻》『长度《为》』`1024*1024*4`bytes(4M)
		res, err := stream.Recv()
		// 判断《新闻》流是否已经竣{<事>}
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("ListStr get stream err: %v", err)
		}
		// {打}印返回值
		log.Println(res.StreamValue)
	}
}

2.{启动}gRPC〖客户〗端

// Address 「『毗邻』地址」
const Address string = ":8000"

var grpcClient pb.StreamServerClient

func main() {
	// 『毗邻』服务<器>
	conn, err := grpc.Dial(Address, grpc.WithInsecure())
	if err != nil {
		log.Fatalf("net.Connect err: %v", err)
	}
	defer conn.Close()

	// 《确》立gRPC『毗邻』
	grpcClient = pb.NewStreamServerClient(conn)
	route()
	listValue()
}

运行〖客户〗端

go run client.go
stream server grpc 0
stream server grpc 1
stream server grpc 2
stream server grpc 3
stream server grpc 4

〖客户〗端一直从服务端获「取数(据)」

思索

“如”果服务端一直发送数(据),类似‘获取股票走势’实时数(据),〖客户〗端能自己住手获「取数(据)」吗?

「谜底」:可以{的}

1.我们把服务端{的}ListValue< 〖「<{ ‘方式’[}>」〗[>稍微修改

// ListValue 「<实现>」ListValue< 〖「<{ ‘方式’[}>」〗[>
func (s *StreamService) ListValue(req *pb.SimpleRequest, srv pb.StreamServer_ListValueServer) error {
	for n := 0; n < 15; n++ {
		// 向流中发送《新闻》, <『《【默认每次】》』>send送《新闻》最大『长度《为》』`math.MaxInt32`bytes
		err := srv.Send(&pb.StreamResponse{
			StreamValue: req.Data + strconv.Itoa(n),
		})
		if err != nil {
			return err
		}
		log.Println(n)
		time.Sleep(1 * time.Second)
	}
	return nil
}

2.再把〖客户〗端<挪用>ListValue< 〖「<{ ‘方式’[}>」〗[>{的}「<实现>」稍作修改,就可以获得效果‘了’

// listValue <挪用>服务端{的}ListValue< 〖「<{ ‘方式’[}>」〗[>
func listValue() {
	// 《确》立发送结构体
	req := pb.SimpleRequest{
		Data: "stream server grpc ",
	}
	// <挪用>我们{的}服务(Route< 〖「<{ ‘方式’[}>」〗[>)
	// 同时传入‘了’一个 context.Context ,『在』有需要时可以让我们〖改〗变RPC{的}行《为》,好比超时/作废一个正『在』运行{的}RPC
	stream, err := grpcClient.ListValue(context.Background(), &req)
	if err != nil {
		log.Fatalf("Call ListStr err: %v", err)
	}
	for {
		//Recv() < 〖「<{ ‘方式’[}>」〗[>吸收服务端《新闻》,<『《【默认每次】》』>Recv()最大《新闻》『长度《为》』`1024*1024*4`bytes(4M)
		res, err := stream.Recv()
		// 判断《新闻》流是否已经竣{<事>}
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("ListStr get stream err: %v", err)
		}
		// {打}印返回值
		log.Println(res.StreamValue)
		break
	}
	//‘可以使用’CloseSend()关闭stream,‘这样服务端就’不会继续发生流《新闻》
	//<挪用>CloseSend()『后』,若继续<挪用>Recv(),《<会重新>激活》stream,接着之前效果获取《新闻》
	stream.CloseSend()
}

只需要<挪用>CloseSend()< 〖「<{ ‘方式’[}>」〗[>,就可以关闭服务端{的}stream,让它住手发送数(据)。值得注意{的}是,<挪用>CloseSend()『后』,若继续<挪用>Recv(),《<会重新>激活》stream,接着当前{的}效果继续获取《新闻》。

这能完善解决〖客户〗端‘暂’停->继续获「取数(据)」{的}操作。

〖总结〗

本篇先容‘了’《『服务端』流式》RPC{的}简朴适用,〖客户〗端提议一个请求,服务端一直返回数(据),直到服务端住手发送数(据)或〖客户〗端自动住手吸收数(据)《为》止。下篇将先容〖客户〗端流式RPC

教程源码地址:https://github.com/Bingjian-Zhu/go-grpc-example
{参考}:gRPC『官方文档中文版』

,

诚信『在』线

诚信『在』线(www.dongfangculture.com )现已开放诚信『在』线手机《版下载》。『游戏公平』、“公开”、公正,“用实力赢取信誉”。

欧博开户声明:该文看法仅代表作者自己,与本平台无关。转载请注明:黑龙江新闻联播:Go gRPC“教程”-服务端流式RPC({三})

网友评论

  • (*)

最新评论

  • 联博开奖 2020-05-30 00:01:14 回复

    联博统计www.weqvip.com采用以太坊区块链高度哈希值作为统计数据,联博以太坊统计数据开源、公平、无任何作弊可能性。联博统计免费提供API接口,支持多语言接入。拉你进夸夸群

    1
    • Allbet注册 2020-06-21 02:40:24 回复

      @联博开奖 欧博注册欢迎进入欧博注册(Allbet Game):www.aLLbetgame.us,欧博官网是欧博集团的官方网站。欧博官网开放Allbet注册、Allbe代理、Allbet电脑客户端、Allbet手机版下载等业务。贼美丽的文~

文章归档

站点信息

  • 文章总数:444
  • 页面总数:0
  • 分类总数:8
  • 标签总数:980
  • 评论总数:105
  • 浏览总数:2752