郑文峰的博客 郑文峰的博客
首页
  • python之路
  • go之路
  • 其他
  • redis
  • mysql
  • docker
  • k8s
读书破万卷
周刊
关于
  • 导航 (opens new window)
  • 代码片段 (opens new window)
  • 收藏
  • 友链
  • 外部页面

    • 开往 (opens new window)
  • 索引

    • 分类
    • 标签
    • 归档
GitHub (opens new window)

zhengwenfeng

穷则变,变则通,通则久
首页
  • python之路
  • go之路
  • 其他
  • redis
  • mysql
  • docker
  • k8s
读书破万卷
周刊
关于
  • 导航 (opens new window)
  • 代码片段 (opens new window)
  • 收藏
  • 友链
  • 外部页面

    • 开往 (opens new window)
  • 索引

    • 分类
    • 标签
    • 归档
GitHub (opens new window)
  • python

    • 基础

    • 第三方库

    • django

    • flask

    • tornado

    • 其他

      • python简单使用grpc
        • 0. 相关链接
        • 1. 创建protobuf文件
        • 2. 编译proto文件
        • 3. 简单测试protobuf数据结构的序列化与反序列化
        • 4. 创建grpc服务端
        • 5. 创建grpc客户端
      • pyspark streaming简介 和 消费 kafka示例
  • go

  • 其他

  • 编程
  • python
  • 其他
zhengwenfeng
2022-09-06
目录

python简单使用grpc

# 0. 相关链接

源码案例:https://github.com/tenqaz/python-examples (opens new window)

官方文档:https://grpc.io/docs/languages/python/quickstart (opens new window)

# 1. 创建protobuf文件

在目录proto目录下创建user.proto文件,创建User的rpc服务定义,该服务中包含AddUser和GetUser两个调用,并使用下面创建的对应的结构体作为请求体和响应体。 注意:需要添加package proto,否则下面编译生成的python文件引用路径则不正确。

syntax = "proto3";

// 包名
package proto;

// 定义User rpc服务
service User {
  // 定义rpc服务的方法
  rpc AddUser (UserRequest) returns (UserResponse);
  rpc GetUser (GetUserRequest) returns (GetUserResponse);
}

// 请求的结构体
message UserRequest {
  string name = 1;
  uint32 age = 2;
}

// 响应的结构体
message UserResponse {
  string msg = 1;
  int32 code = 2;
}

message GetUserRequest {
  string name = 1;
}

message GetUserResponse {
  string name = 1;
  string age = 2;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 2. 编译proto文件

首选需要安装grpc的库和工具

python -m pip install grpcio #安装grpc
python -m pip install grpcio-tools #安装grpc tools
1
2

然后,运行命令对proto文件进行编译,会根据上面的proto文件生成对应的python文件,你会发现在proto目录下创建了user_pb2.py和user_pb2_grpc.py两个文件

python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. ./proto/user.proto
1
  • --python_out=.,protobuf相关代码文件生成在这里
  • --grpc_python_out=.,grpc相关代码生成在这里
  • -I. ./proto/user.proto,proto文件路径

编译后:

  • user_pb2.py,用来和 protobuf 数据进行交互,这个就是根据proto文件定义好的数据结构类型生成的python化的数据结构文件
  • user_pb2_grpc.py: 用来和 grpc 进行交互,这个就是定义了rpc方法的类,包含了类的请求参数和响应等等,可用python直接实例化调用

# 3. 简单测试protobuf数据结构的序列化与反序列化

我们创建proto_test.py文件,创建User对象,填充值,并将该对象序列化成字符串输出

from proto import user_pb2

# 创建Student对象,将该对象序列化成字符串
s = user_pb2.UserRequest()
s.name = "zhangsan"
s.age = 12
req_str = s.SerializeToString()
print(req_str)
1
2
3
4
5
6
7
8

输出:

b'\n\x08zhangsan\x10\x0c'
1

然后我们再创建User对象将将上面的输出的序列化字符串反序列化进来。

# 将上面的输出的序列化字符串反序列化成对象
s2 = user_pb2.UserRequest()
s2.ParseFromString(req_str)
print(s2.name)
print(s2.age)
1
2
3
4
5

输出:

zhangsan
12
1
2

# 4. 创建grpc服务端

下面是使用之前创建的protobuf和grpc文件来构建grpc服务端代码。

import logging
from concurrent import futures

import grpc

from proto import user_pb2, user_pb2_grpc


class UserService(user_pb2_grpc.UserServicer):

    # 实现proto文件中rpc的调用
    def AddUser(self, request: user_pb2.UserRequest, context):
        return user_pb2.UserResponse(msg='add user(name={},age={}) success'.format(request.name, request.age), code=0)

    def GetUser(self, request: user_pb2.GetUserRequest, context):
        return user_pb2.GetUserResponse(name=request.name, age="1888")


def serve():
    # 使用线程池来完成grpc的请求
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=5))
    user_pb2_grpc.add_UserServicer_to_server(UserService(), server)
    server.add_insecure_port('[::]:50051')  # 绑定端口
    server.start()
    server.wait_for_termination()


if __name__ == '__main__':
    logging.basicConfig()
    serve()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

运行该服务端,会阻塞等待客户端的请求。

# 5. 创建grpc客户端

import logging

import grpc

from proto import user_pb2, user_pb2_grpc


def run():
    # 连接rpc服务
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = user_pb2_grpc.UserStub(channel)

        # 调用rpc服务的AddUser方法
        response: user_pb2.UserResponse = stub.AddUser(user_pb2.UserRequest(name="zhangsan", age=18))
        print("add user, response is 'msg={}, code={}'".format(response.msg, response.code))

        # 调用rpc服务的GetUser方法
        response: user_pb2.GetUserResponse = stub.GetUser(user_pb2.GetUserRequest(name="lisi"))
        print("get user[name={}, age={}]".format(response.name, response.age))


if __name__ == '__main__':
    logging.basicConfig()
    run()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

运行客户端,调用rpc服务,输出:

add user, response is 'msg=add user(name=zhangsan,age=18) success, code=0'
get user[name=lisi, age=1888]
1
2
#python
上次更新: 2023/01/15, 15:47:48
tornado 使用peewee-async 完成异步orm数据库操作
pyspark streaming简介 和 消费 kafka示例

← tornado 使用peewee-async 完成异步orm数据库操作 pyspark streaming简介 和 消费 kafka示例→

最近更新
01
django rest_framework 分页
03-20
02
学习周刊-第03期-第09周
03-03
03
学习周刊-第02期-第08周
02-24
更多文章>
Theme by Vdoing | Copyright © 2022-2023 zhengwenfeng | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式