Python——gRPC详解及实战避坑方案(下)

微信扫一扫,分享到朋友圈

Python——gRPC详解及实战避坑方案(下)

上一篇文章讲述了RPC服务的概念和gRPC的基本使用、proto语法的使用教程。然而在我们真正把gRPC服务部署到生产环境上的时候,会遇到很多问题,首选要考虑的就是协议的数据认证问题,其次,gRPC也支持流式通信的模式,本篇文章会做一个介绍说明。

RPC的身份认证

RPC服务一般在服务内网使用,但是也有存在于外网的RPC服务,但是不论是哪一种RPC服务,走http2.0还是其他基于TCP实现socket的协议,在部署生产环境的时候还是需要增加身份加密认证机制的,客户端与服务端需要做一个可信任的认证,以保障数据的安全性。

RPC服务一般的加密认证方法

基于 SSL/TLS 的通道加密通常身份认证机制是通过 TLS/SSL 对传输通道进行加密,以防止请求和响应消息中的敏感数据泄漏。使用的场景主要有三种:

  • 后端微服务直接开放给端侧,例如手机 App、TV、多屏等,没有统一的 API Gateway/SLB 做安全接入和认证;
  • 后端微服务直接开放给 DMZ 部署的管理或者运维类 Portal;
  • 后端微服务直接开放给第三方合作伙伴 / 渠道。 除了上述常用的跨网络场景之外,对于一些安全等级要求比较高的业务场景,即便是内网通信,只要跨主机 /VM/ 容器通信,都强制要求对传输通道进行加密。在该场景下,即便只存在内网各模块的 RPC 调用,仍然需要做 SSL/TLS。

使用 SSL/TLS 的典型场景如下所示:

目前使用最广的 SSL/TLS 工具 / 类库就是 OpenSSL,它是为网络通信提供安全及数据完整性的一种安全协议,囊括了主要的密码算法、常用的密钥和证书封装管理功能以及 SSL 协议。

多数 SSL 加密网站是用名为 OpenSSL 的开源软件包,由于这也是互联网应用最广泛的安全传输方法,被网银、在线支付、电商网站、门户网站、电子邮件等重要网站广泛使用。

针对敏感数据的单独加密有些 RPC 调用并不涉及敏感数据的传输,或者敏感字段占比较低,为了最大程度的提升吞吐量,降低调用时延,通常会采用 HTTP/TCP + 敏感字段单独加密的方式,既保障了敏感信息的传输安全,同时也降低了采用 SSL/TLS 加密通道带来的性能损耗,对于 JDK 原生的 SSL 类库,这种性能提升尤其明显。

它的工作原理如下所示:

通常使用 Handler 拦截机制,对请求和响应消息进行统一拦截,根据注解或者加解密标识对敏感字段进行加解密,这样可以避免侵入业务。

采用该方案的缺点主要有两个:

  • 对敏感信息的识别可能存在偏差,容易遗漏或者过度保护,需要解读数据和隐私保护方面的法律法规,而且不同国家对敏感数据的定义也不同,这会为识别带来很多困难;
  • 接口升级时容易遗漏,例如开发新增字段,忘记识别是否为敏感数据。

gRPC的加密认证方法

对于gRPC,SSL/TLS协议也是基本的身份加密认证方法,SSL/TLS协议采用公钥加密法,客户端向服务器端索要公钥,然后用公钥加密信息,服务器收到密文后,用自己的私钥解密。

SSL/TLSSSL/TLS 分为单向认证和双向认证,在实际业务中,单向认证使用较多,即客户端认证服务端,服务端不认证客户端。认证流程如下:

  • 客户端向服务端传送客户端 SSL 协议的版本号、支持的加密算法种类、产生的随机数,以及其它可选信息;
  • 服务端返回握手应答,向客户端传送确认 SSL 协议的版本号、加密算法的种类、随机数以及其它相关信息;
  • 服务端向客户端发送自己的公钥;
  • 客户端对服务端的证书进行认证,服务端的合法性校验包括:证书是否过期、发行服务器证书的 CA 是否可靠、发行者证书的公钥能否正确解开服务器证书的“发行者的数字签名”、服务器证书上的域名是否和服务器的实际域名相匹配等;
  • 客户端随机产生一个用于后面通讯的“对称密码”,然后用服务端的公钥对其加密,将加密后的“预主密码”传给服务端;
  • 服务端将用自己的私钥解开加密的“预主密码”,然后执行一系列步骤来产生主密码;
  • 客户端向服务端发出信息,指明后面的数据通讯将使用主密码为对称密钥,同时通知服务器客户端的握手过程结束;
  • 服务端向客户端发出信息,指明后面的数据通讯将使用主密码为对称密钥,同时通知客户端服务器端的握手过程结束;
  • SSL 的握手部分结束,SSL 安全通道建立,客户端和服务端开始使用相同的对称密钥对数据进行加密,然后通过 Socket 进行传输。

实践gRPC的TLS认证

我们可以简单通过一个例子来实践一下gRPC的服务端和客户端的TLS认证机制,其中还包括证书的生成、服务端、客户端初始化的编写等等步骤。

生成证书

openssl req -newkey rsa:2048 -nodes -keyout server.key -x509 -days 3650 -out server.crt
复制代码

在执行生成证书的过程中,需要填入Country Name、State or Province Name、Locality Name、Organization Name、Organizational Unit Name、Common Name、Email Address等等,这些按需要填入就可以了,或者都可以留空。 注:其中的Common Name最好是自己定义之后填上,这个在客户端连接的时候可以指定连接的名字,否则留空的话有可能自动获取不到

我们可以定义Common Name为 rpc_service ,全部回车填完就会生成 server.keyserver.crt 文件

部署gRPC服务

接上篇文章的proto文件定义( blog.csdn.net/dream_succe… ) 实现服务端代码server.py:

from concurrent import futures
import time
import grpc
import test_pb2
import test_pb2_grpc
# 实现 proto 文件中定义的 SearchService
class RequestRpc(test_pb2_grpc.SearchService):
# 实现 proto 文件中定义的 rpc 调用
def doRequest(self, request, context):
return test_pb2.Search(query = 'hello {msg}'.format(msg = request.name)) # return的数据是符合定义的SearchResponse格式
def serve():
# 启动 rpc 服务,这里可定义最大接收和发送大小(单位M),默认只有4M
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=[
('grpc.max_send_message_length', 100 * 1024 * 1024),
('grpc.max_receive_message_length', 100 * 1024 * 1024)])
test_pb2_grpc.add_SearchServiceServicer_to_server(RequestRpc(), server)
server.add_insecure_port('[::]:50051')
server.start()
try:
while True:
time.sleep(60*60*24) # one day in seconds
except KeyboardInterrupt:
server.stop(0)
if __name__ == '__main__':
serve()
复制代码

客户端代码client.py:

import grpc
import helloworld_pb2
import helloworld_pb2_grpc
def run():
# 连接 rpc 服务器
channel = grpc.insecure_channel('localhost:50051')
# 调用 rpc 服务
stub = test_pb2_grpc.SearchServiceStub(channel)
response = stub.doRequest(test_pb2.SearchRequest(query='henry'))
print("client received: ", response)
if __name__ == '__main__':
run()
复制代码

加入TLS认证

将生成的 server.keyserver.crt 文件放在server.py的项目目录下,其中去过client.py在不同项目的话, server.crt 文件还需要放置在client.py的项目目录下,我们给服务端加上TLS认证逻辑:

from concurrent import futures
import time
import grpc
import test_pb2
import test_pb2_grpc
# 实现 proto 文件中定义的 SearchService
class RequestRpc(test_pb2_grpc.SearchService):
# 实现 proto 文件中定义的 rpc 调用
def doRequest(self, request, context):
return test_pb2.Search(query = 'hello {msg}'.format(msg = request.name)) # return的数据是符合定义的SearchResponse格式
def serve():
# 启动 rpc 服务,这里可定义最大接收和发送大小(单位M),默认只有4M
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=[
('grpc.max_send_message_length', 100 * 1024 * 1024),
('grpc.max_receive_message_length', 100 * 1024 * 1024)])
test_pb2_grpc.add_SearchServiceServicer_to_server(RequestRpc(), server)
server.add_insecure_port('[::]:50051') # 注释掉这句不安全的启动服务的方法,加入以下写法:
# read in key and certificate
with open('server.key', 'rb') as f:
private_key = f.read()
with open('server.crt', 'rb') as f:
certificate_chain = f.read()
# create server credentials
server_credentials = grpc.ssl_server_credentials(
((private_key, certificate_chain,),))
server.add_secure_port('[::]:50051', server_credentials)
server.start()
try:
while True:
time.sleep(60*60*24) # one day in seconds
except KeyboardInterrupt:
server.stop(0)
if __name__ == '__main__':
serve()
复制代码

对应的客户端逻辑为:

import grpc
import helloworld_pb2
import helloworld_pb2_grpc
def run():
# 读取证书
with open('server.crt', 'rb') as f:
trusted_certs = f.read()
credentials = grpc.ssl_channel_credentials(
root_certificates=trusted_certs)
# 连接 rpc 服务器,这里填入证书的COMMON NAME,我们定义的是rpc_service
channel = grpc.secure_channel("{}:{}".format('localhost', 50051), credentials,
options=(('grpc.ssl_target_name_override', "rpc_service",),
('grpc.max_send_message_length', 100 * 1024 * 1024),
('grpc.max_receive_message_length', 100 * 1024 * 1024)))
# 调用 rpc 服务
stub = test_pb2_grpc.SearchServiceStub(channel)
response = stub.doRequest(test_pb2.SearchRequest(query='henry'))
print("client received: ", response)
if __name__ == '__main__':
run()
复制代码

以上就可以实现带TLS认证的gRPC服务啦

gRPC的流式通信

流式通信的方式

grpc同http通讯一样,也是基于“请求响应”模式的一种通讯。 根据不同业务场景,可以分为:

  1. 客户端单次请求,服务端回应一次:
  2. 客户端一次请求,服务端流式应答(其实相当于返回给客户端多条数据)
  3. 客户端流式请求,服务端回应一次
  4. 客户端流式请求,服务端流式应答 类似于建立socket连接,做成聊天会话一样,服务端也可以给客户端推送很多数据,比如客户端在初始化连接的时候发个空消息或者消息id同步的数据给服务端,服务端可以把要同步给客户端的数据流式地传递给客户端。

流式通信的具体实现

例如我们要实现一个客户端试试从服务端获取位置经纬度的功能,我们可以在通过这种流式处理来完成。 服务端,在server.start()之前add这个流式获取经纬度的方法:

def ListFeatures(self, request, context):
left = min(request.lo.longitude, request.hi.longitude)
right = max(request.lo.longitude, request.hi.longitude)
top = max(request.lo.latitude, request.hi.latitude)
bottom = min(request.lo.latitude, request.hi.latitude)
for feature in self.db:
if (feature.location.longitude >= left and
feature.location.longitude <= right and
feature.location.latitude >= bottom and
feature.location.latitude <= top):
yield feature
复制代码

python里面是通过yield关键字实现一个生成器,流式响应的。 客户端通过for循环或者也可以用生成器逐步获取服务端的流式响应:

for feature in stub.ListFeatures(rectangle):
复制代码

什么时候用Streaming RPC

  1. 大规模数据包
  2. 实时场景

gRPC的异常处理

随着互联网的快速发展,互联网服务早已不是单体应用,而是由若干个模块组成的微服务,每个模块可以进行单独的扩容、缩容,独立上线部署等等;模块与模块之间通过网络进行联通。我们的应用必须对网络错误进行妥善的处理。比如网络出现抖动,正在通信的对端机器正好重新上线等。

gRPC的异常类型

gRPC有自己一套类似HTTP status code的错误码,每个错误码都是个字符串,如 INTERNAL、ABORTED、UNAVAILABLE。 我们经常遇到的就是“StatusCode=Unavailable, Detail=”failed to connect to all addresses”这样的报错 原因分析

  1. RPC的客户端请求没有到达服务端,例如网络解析抖动,云主机ip地址变更等
  2. 服务端初始化的实例有问题,无法处理客户端请求
  3. RPC客户端与服务端的连接断开或者连接未完成

对于以上情况,我们就需要增加重连机制、重发机制来保证服务可靠性。

重连机制

我们可以在连接gRPC服务的时候添加try except机制捕获异常,一旦发现连接异常可以尝试重试连接,如果在flask框架或者tornado框架里面,可以把连接方法写成一个单例,然后在框架封装好的请求异常处理机制里面添加重连,例如flask可以定义:

@app.errorhandler(Exception)
def handle_exception(e):
# 此处添加e的类型判断,如果是RPC连接出错,执行重新连接的代码
复制代码

如果是tornado可以定义:

def log_exception(self, typ, value, tb):
if issubclass(typ, RpcConnectError):
# 此处添加重新连接RPC的代码
复制代码

这样就可以保证在你的系统中RPC的连接异常得到重连尝试,以达到可靠性要求

重试机制

gRPC的channel在初始化的时候可以指定options参数,我们之前指定了最大发送和最大接收的字节数大小,这里也可以指定自动重试的配置,详情参考: github.com/grpc/propos… 透明重试 ,透明重试会在服务端的应用逻辑并没有接收到请求的时候,gRPC 会进行自动的重试。透明重试可以解决了上诉原因分析里面的第1、2点,我们也可以自行配置重试,通过配置service config的retryPolicy参数。

实现案例:

options = [('grpc.max_send_message_length', 100 * 1024 * 1024),
('grpc.max_receive_message_length', 100 * 1024 * 1024),
('grpc.enable_retries', 1),
('grpc.service_config', '{ "retryPolicy":{ "maxAttempts": 4, "initialBackoff": "0.1s", "maxBackoff": "1s", "backoffMutiplier": 2, "retryableStatusCodes": [ "UNAVAILABLE" ] } }')]
channel = grpc.insecure_channel("{}:{}".format('localhost', 50051),
options=options)
复制代码

上面的代码开启了grpc.enable_retries,虽然默认开启,但是设置为0也可以关闭掉透明重试; 另外grpc.service_config是一个配置,我们可以配置重试策略(参数配置可参考: grpc.github.io/grpc/core/g… ):

{
"retryPolicy":{
"maxAttempts": 4,
"initialBackoff": "0.1s",
"maxBackoff": "1s",
"backoffMutiplier": 2,
"retryableStatusCodes": [
"UNAVAILABLE" ]
}
}
复制代码

针对UNAVAILABLE这种错误进行重试,可以指定重试次数等等,具体参数含义可参考官网,简单介绍一下:

  • maxAttempts 必须是大于 1 的整数,对于大于5的值会被视为5
  • initialBackoff 和 maxBackoff 必须指定,并且必须具有大于0
  • backoffMultiplier 必须指定,并且大于零
  • retryableStatusCodes 必须制定为状态码的数据,不能为空,并且没有状态码必须是有效的 gPRC 状态码,可以是整数形式,并且不区分大小写

对冲策略

对冲是指在不等待响应的情况主动发送单次调用的多个请求 如果一个方法使用对冲策略,那么首先会像正常的 RPC 调用一样发送第一次请求,如果配置时间内没有响应,那么直接发送第二次请求,以此类推,直到发送了 maxAttempts 次

当对冲请求接收到 nonFatalStatusCodes后,会立即发送下一个对冲请求,不管 hedgingDelay 如果受到其他的状态码,则所有未完成的对冲请求都将被取消,并且将状态码返回给调用者

实现案例

options = [('grpc.max_send_message_length', 100 * 1024 * 1024),
('grpc.max_receive_message_length', 100 * 1024 * 1024),
('grpc.enable_retries', 1),
('grpc.service_config', '{ "hedgingPolicy":{ "maxAttempts": 4, "hedgingDelay": "0.5s", "nonFatalStatusCodes":[ "UNAVAILABLE", "INTERNAL", "ABORTED" ] } }')]
channel = grpc.insecure_channel("{}:{}".format('localhost', 50051),
options=options)
复制代码

注意: 使用对冲的时候,请求可能会访问到不同的后端(如果设置了负载均衡),那么就要求方法在多次执行下是安全,并且符合预期的

重试限流

当客户端的失败和成功比超过某个阈值时,gRPC 会通过禁用这些重试策略来防止由于重试导致服务器过载,这就是重试限流策略。同样可以在servie config中配置:

"retryThrottling":{
"maxTokens": 10,
"tokenRatio": 0.1
}
复制代码

对于每一个服务器,gRPC 客户端会维护一个 token_count 变量,最初设置为 maxToken , 值的范围是 0 – maxToken

对于每个 RPC 请求都会对 token_count 产生一下效果

  • 每个失败的 RPC 请求都会递减token_count 1
  • 成功 RPC 将会递增 token_count和tokenRatio 如果 token_count <= ( maxTokens / 2), 则关闭重试策略,直到 token_count > (maxTokens/2),恢复重试

对于对冲 RPC,发送第一个RPC请求后,如果 token_count 大于(maxTokens/2),才会发送后续的对冲请求 当 token_count <= ( maxTokens / 2) 时,重试请求会被取消,并且将状态码返回给调用者

实际的限流参数配置,还是需要根据服务器的性能资源来衡量。

参考文献

JavaScript 闭包的深入探索

上一篇

YARN 在快手的应用实践与技术演进之路

下一篇

你也可能喜欢

Python——gRPC详解及实战避坑方案(下)

长按储存图像,分享给朋友