PAI Python SDK提供了易用的API(即HighLevel API),支持用户将模型部署到PAI创建推理服务。本文档介绍了如何使用SDK在PAI部署推理服务。
概要介绍
SDK提供了HighLevel API,即pai.model.Model
和pai.predictor.Predictor
,支持用户将模型部署到EAS,以及进行调用测试。
通过SDK创建推理服务的基本流程包括:
-
通过
pai.model.InferenceSpec
对象描述模型的推理服务配置,包括使用的 Processor 或是镜像的信息。 -
使用
InferenceSpec
对象和待部署的模型文件创建一个pai.model.Model
对象。 -
通过
pai.model.Model.deploy()
方法,指定服务使用的资源,服务名称等信息,在PAI创建一个推理服务。 -
通过
deploy
方法返回pai.predictor.Predictor
对象,提供了predict
方法向推理服务发送预测请求。
示例代码如下:
from pai.model import InferenceSpec, Model, container_serving_spec
from pai.image import retrieve, ImageScope
# 1. 使用 PAI 提供的PyTorch 推理镜像
torch_image = retrieve("PyTorch", framework_version="latest",
image_scope=ImageScope.INFERENCE)
# 2. 使用InferenceSpec描述模型的推理配置信息
inference_spec = container_serving_spec(
# 推理服务的启动命令
command="python app.py",
source_dir="./src/"
# 使用的推理镜像
image_uri=torch_image.image_uri,
)
# 3. 构建Model对象,用于模型部署
model = Model(
# 使用OSS Bucket上的模型文件
model_data="oss:///path-to-model-data",
inference_spec=inference_spec,
)
# 4. 部署模型到PAI-EAS,创建在线推理服务,返回Predictor对象
predictor = m.deploy(
service_name="example_torch_service",
instance_type="ecs.c6.xlarge",
)
# 5. 测试推理服务
res = predictor.predict(data=data)
配置模型的InferenceSpec
您可以通过Processor或镜像的方式部署推理服务,pai.model.InferenceSpec
对象用于描述模型的推理的配置,例如使用Processor或是镜像部署、运行服务的存储配置、模型服务预热配置、服务的RPC Batch功能配置等,构建的InferenceSpec
对象最终会用于推理服务的创建。
使用预置 Processor
Processor是PAI对于推理服务程序包的抽象描述,它能够基于用户提供的模型,直接构建一个推理服务。PAI提供了预置的Processor,支持一系列常见的机器学习模型格式,包括Tensorflow SavedModel、PyTorch TorchScript、XGBoost、LightGBM、PMML等,完整的介绍请参考文档:预置Processor使用说明。
对于使用Processor方式部署模型,可以参考以下示例配置InferenceSpec
。
# 使用预置的TensorFlow Processor
tf_infer_spec = InferenceSpec(processor="tensorflow_cpu_2.3")
# 使用预置的PyTorch Processor
tf_infer_spec = InferenceSpec(processor="pytorch_cpu_1.10")
# 使用预置的XGBoost Processor
xgb_infer_spec = InferenceSpec(processor="xgboost")
您可以在InferenceSpec
实例上配置推理服务的更多功能,例如配置服务预热文件、服务的RPC配置等,完整的服务参数信息可以见文档:服务模型所有相关参数说明。
# 直接配置InferenceSpec的属性
tf_infer_spec.warm_up_data_path = "oss:///path/to/warmup-data" # 配置服务预热文件路径
tf_infer_spec.metadata.rpc.keepalive = 1000 # 配置请求链接的keepalive时长
print(tf_infer_spec.warm_up_data_path)
print(tf_infer_spec.metadata.rpc.keepalive)
使用镜像部署
使用Processor部署模型提供了易用性,但是无法支持用户灵活自定义的诉求,例如模型或是推理服务程序有较为复杂的依赖。对于类似的场景,PAI提供了镜像部署的方式,支持用户以更加灵活自定义的方式部署模型。
您可以将模型服务的代码和相关的依赖打包构建成一个Docker镜像,然后推送到阿里云ACR镜像仓库服务,然后基于以上的Docker镜像构建InferenceSpec ,用于模型的部署。
from pai.model import InferenceSpec, container_serving_spec
# 通过 `container_serving_spec` 方法,用户可以构建一个使用镜像服务模型的InferenceSpec.
container_infer_spec = container_serving_spec(
# 推理服务运行使用的镜像
image_uri="",
# 运行在容器内的推理服务需要监听的端口, 用户发送的预测请求会被PAI转发到服务容器的该端口
port=8000,
environment_variables=environment_variables,
# 推理服务的启动命令
command=command,
# 推理服务依赖的Python包。
requirements=[
"scikit-learn",
"fastapi==0.87.0",
],
)
print(container_infer_spec.to_dict())
m = Model(
model_data="oss:///path-to-tensorflow-saved-model",
inference_spec=custom_container_infer_spec,
)
p = m.deploy(
instance_type="ecs.c6.xlarge"
)
当通过自定义镜像部署的方式部署模型时,需要将推理服务运行所需的代码准备到运行容器、构建镜像并推送到镜像仓库。SDK提供便利方法,支持您将本地的代码以及基础镜像的方式构建推理服务,而无需构建镜像。pai.model.container_serving_spec()
支持通过source_dir
指定一个本地代码文件目录(参数source_dir
),SDK会将代码目录打包上传到OSS Bucket,然后将OSS Bucket的路径挂载到运行容器中。用户指定的启动命令可以拉起推理服务。
from pai.model import InferenceSpec
inference_spec = container_serving_spec(
# 用户推理程序所在的本地目录路径,会被上传到OSS Bucket,然后挂载到运行容器,默认为 /ml/usercode/
source_dir="./src",
# 服务启动命令。当用户指定了 source_dir,则默认使用 /ml/usercode 作为工作目录执行command。
command="python run.py",
image_uri="",
requirements=[
"fastapi",
"uvicorn",
]
)
print(inference_spec.to_dict())
当用户有还有更多的数据、代码或是模型准备到推理服务的容器内时,可以使用pai.model.InferenceSpec.mount()
方法,将一个本地目录数据或是OSS上的数据路径挂载到在线服务容器中。
# 将本地的数据上传到OSS,然后挂载到容器的 `/ml/tokenizers` 目录下
inference_spec.mount("./bert_tokenizers/", "/ml/tokenizers/")
# 直接挂载用户存储在 OSS 上的数据到容器的 `/ml/data` 目录下
inference_spec.mount("oss:///path/to/data/", "/ml/data/")
获取PAI提供的公共镜像
PAI提供了一些常见的框架推理镜像,包括TensorFlow
、PyTorch
、XGBoost
等,支持用户快速创建推理服务。用户可以通过pai.image.list_images
,pai.image.retrieve
方法传递image_scope=ImageScope.INFERENCE
信息从而获取到相应的推理镜像,然后使用镜像部署的方式部署模型。
from pai.image import retrieve, ImageScope, list_images
# 获取PAI提供的所有 PyTorch 推理镜像
for image_info in list_images(framework_name="PyTorch", image_scope=ImageScope.INFERENCE):
print(image_info)
# 获取PAI提供的PyTorch 1.12版本的CPU推理镜像
retrieve(framework_name="PyTorch", framework_version="1.12", image_scope=ImageScope.INFERENCE)
# 获取PAI提供的PyTorch 1.12版本的GPU推理镜像
retrieve(framework_name="PyTorch", framework_version="1.12", accelerator_type="GPU", image_scope=ImageScope.INFERENCE)
# 获取PAI提供的PyTorch 最新版本的GPU推理镜像
retrieve(framework_name="PyTorch", framework_version="latest", accelerator_type="GPU", image_scope=ImageScope.INFERENCE)
部署在线推理服务
用户使用pai.model.InferenceSpec
和模型数据地址model_data
构建一个模型对象pai.model.Model
,然后通过调用.deploy
方法部署模型。model_data
可以是一个OSS URI,也可以是本地路径,对于本地路径的模型,相应的模型文件会被上传到OSS Bucket上,然后准备到推理服务中,供对应的服务程序加载使用。
当调用.deploy
方法部署模型时,用户需要指定服务所需的资源配置,服务实例个数,服务名称等服务相关参数。更多高阶参数说明,请参见服务模型所有相关参数说明。
from pai.model import Model
model = Model(
# model_data 模型所在的路径,可以是OSS URI,或是是本地路径。对于本地路径的模型,默认会被上传到OSS Bucket上。
model_data="oss:///path-to-model-data",
inference_spec=inference_spec,
)
# 部署到PAI-EAS
predictor = m.deploy(
# 推理服务的名称
service_name="example_xgb_service",
# 服务使用的机器类型
instance_type="ecs.c6.xlarge",
# 机器实例/服务的个数
instance_count=2,
# 用户的专有资源组,可选. 默认使用公共资源组
# resource_id="",
options={
"metadata.rpc.batching": True,
"metadata.rpc.keepalive": 50000,
"metadata.rpc.max_batch_size": 16,
"warm_up_data_path": "oss:///path-to-warmup-data",
},
)
当用户需要根据服务使用的资源数量(例如CPU、Memory)配置服务时,可以通过resource_config
参数配置每个服务实例申请的资源,示例如下:
from pai.model import ResourceConfig
predictor = m.deploy(
service_name="dedicated_rg_service",
# 指定单个服务实例使用的CPU和Memory资源
# 当前示例中,每一个服务使用2个核的CPU,以及4000Mb的内存
resource_config=ResourceConfig(
cpu=2,
memory=4000,
),
)
调用推理服务
pai.model.Model.deploy
方法通过调用EAS的API创建一个新的推理服务,并返回一个pai.predictor.Predictor
对象,指向新创建的推理服务。Predictor对象提供了predict
和raw_predict
方法,支持向推理服务发送预测请求。
说明
pai.predictor.Predictor.raw_predict
的输入和输出不会使用Serializer进行处理.
from pai.predictor import Predictor, EndpointType
# 创建一个新的推理服务
predictor = model.deploy(
instance_type="ecs.c6.xlarge",
service_name="example_xgb_service",
)
# 使用已有的推理服务
predictor = Predictor(
service_name="example_xgb_service",
# 默认使用 INTERNET 公网网络访问,用户可以配置使用 VPC 的网络(需要客户端代码运行在VPC环境下).
# endpoint_type=EndpointType.INTRANET,
)
# .predict 向对应服务发送数据请求,拿到相应结果。输入数据和响应结果会经过serializer处理。
res = predictor.predict(data_in_nested_list)
# .raw_predict 支持更为灵活的方式发送请求给到推理服务
response: RawResponse = predictor.raw_predict(
# 如果输入数据是bytes,或是file-like object,请求数据直接在HTTP请求体内传递。
# 否则则会经过一次JSON序列化,然后放在HTTP请求体内传递。
data=data_in_nested_list
# path="predict" # 自定义HTTP请求路径,默认将请求发送到"/"路径。
# headers=dict(), # 自定义请求Header
# method="POST" # 自定义请求的HTTP Method
# timeout=30, # 自定义请求的timeout
)
# 获取返回的body, headers
print(response.content, response.headers)
# 将返回结果JSON反序列化为Python对象
print(response.json())
# 停止推理服务
predictor.stop_service()
# 开始推理服务
predictor.start_service()
# 删除推理服务
predictor.delete_service()
使用Serializer处理推理服务的输入和输出
当使用SDK的Predictor对象请求推理服务,需要将输入的Python的数据结构序列化,将其转换为服务能够支持的数据格式进行传输。服务响应返回的数据也同样需要做一次反序列化转为可读,或是可以操作的Python对象。Predictor使用serializer
参数的处理预测数据序列化,以及预测响应结果的反序列化:
-
当调用
predict(data=)
方法时,data
参数会通过serilizer.serialize
方法序列化请求数据,获得的bytes
类型的结果,然后通过HTTP请求体传递给预测服务。 -
当推理服务返回HTTP响应结果之后,
Predictor
对象通过serializer.deserialize
方法反序列化HTTP响应的结果,作为predict
方法的返回。
SDK提供了一些预置的Serializer,支持常见数据的序列化处理,以及PAI内置的深度学习Processor的输入输出数据处理。
-
JsonSerializer
JsonSerializer
对象支持JSON
数据的序列化和反序列化。用户通过predict
方法传递的data
,可以是numpy.ndarray
,或是一个List
,JsonSerializer.serialize
负责将对应的数组序列化为JSON
字符串,JsonSerializer.deserialize
则负责将返回的JSON字符串反序列化为一个Python对象。
PAI提供的预置的XGBoost Processor、PMML Processor等使用JSON格式接收数据和响应结果,Predictor默认使用JsonSerializer处理这些processor创建的服务的输入输出数据。
from pai.serializers import JsonSerializer
# 在`.deploy`方法指定返回的predictor使用的serializer
p = Model(
inference_spec=InferenceSpec(processor="xgboost"),
model_data="oss:///path-to-xgboost-model"
).deploy(
instance_type="ecs.c6.xlarge",
# 可选: 使用 XGBoost processor 的 service 默认使用 JsonSerializer
serializer=JsonSerializer()
)
# 或是直接创建Predictor时指定对应的 serializer
p = Predictor(
service_name="example_xgb_service"
serializer=JsonSerializer(),
)
# 预测的返回结果也是一个list
res = p.predict([[2,3,4], [4,5,6]])
-
TensorFlowSerializer
PAI提供了预置的Tensorflow Processor,支持用户将TensorFlow的SavedModel直接部署到PAI创建推理服务。对应的服务的输入输出消息格式是Protocol Buffers
,具体文件格式请参见tf_predict.proto。
SDK提供了预置的 TensorFlowSerializer
,支持用户通过传递numpy.ndarray
数据类型的参数发送预测请求,Serializer负责使用对应的numpy.ndarray
生成对应的Protocol Buffers
消息,并将接收的Protocol Buffers
消息反序列化为numpy.ndarray
数据类型。
# 创建一个TensorFlow processor 服务.
tf_predictor = Model(
inference_spec=InferenceSpec(processor="tensorflow_cpu_2.7"),
model_data="oss:///path-to-tensorflow-saved-model"
).deploy(
instance_type="ecs.c6.xlarge",
# 可选: 使用 TensorFlow processor 的 service 默认使用 TensorFlowSerializer
# serializer=TensorFlowSerializer(),
)
# 使用TensorFlow processor启动的服务,支持用户通过API获取模型的服务签名
print(tf_predictor.inspect_signature_def())
# TensorFlow processor的输入要求一个Dict,Key是模型输入签名的名称,Value是具体的输入数据。
tf_result = tf_predictor.predict(data={
"flatten_input": numpy.zeros(28*28*2).reshape((-1, 28, 28))
})
assert result["dense_1"].shape == (2, 10)
-
PyTorchSerializer
PAI提供了预置的Pytorch Processor,支持用户使用TorchScript 格式的模型部署推理服务。使用PyTorch Processor启动的推理服务的输入输出使用Protocol Buffers
格式传递数据,具体文件格式请参见pytorch_predict_proto。
SDK提供了预置的PyTorchSerializer
,支持用户使用numpy.ndarray
发送请求,并将预测结果转换为numpy.ndarray
,由PyTorchSerializer
负责Protocol Buffers
消息和numpy.ndarray
的转换。
# 创建一个使用 PyTorch processor 服务.
torch_predictor = Model(
inference_spec=InferenceSpec(processor="pytorch_cpu_1.10"),
model_data="oss:///path-to-torch_script-model"
).deploy(
instance_type="ecs.c6.xlarge",
# 可选: 使用 PyTorch processor 的 service 默认使用 PyTorchSerializer
# serializer=PyTorchSerializer(),
)
# 1. 用户需要注意将对应的输入数据 reshape 成模型支持的形状。
# 2. 如果有多个输入数据,则需要使用List/Tuple传递,列表中的每一项是numpy.ndarray
torch_result = torch_predictor.predict(data=numpy.zeros(28 * 28 * 2).reshape((-1, 28, 28)))
assert torch_result.shape == (2, 10)
-
自定义Serializer
用户可以根据推理服务支持的数据格式自定义Serializer
类:自定义Serializer
类需继承pai.serializers.SerializerBase
,实现serialize
和deserialize
方法。
以下示例是一个自定义的NumpySerializer
,当predict
被调用时,整体的链路如下:
-
客户端: 用户传递
numpy.ndarray
, 或是pandas.DataFrame
,作为predict
的输入,调用NumpySerializer.serializer
序列化为npy format
,发送给到服务端。 -
服务端: 推理服务接收的
npy
格式数据,反序列化数据,获得推理结果,然后接输出的结果,序列化为npy
格式返回。 -
客户端: 接收到
npy
格式返回,通过NumpySerializer.deserialize
反序列化为numpy.ndarray
。
import pandas as pd
import numpy as np
import io
class NumpySerializer(SerializerBase):
def serialize(self, data: Union[np.ndarray, pd.DataFrame, bytes]) -> bytes:
"""Serialize input python object to npy format"""
if isinstance(data, bytes):
return data
elif isinstance(data, str):
return data.encode()
elif isinstance(data, pd.DataFrame):
data = data.to_numpy()
res = io.BytesIO()
np.save(res, data)
res.seek(0)
return res.read()
def deserialize(self, data: bytes) -> np.ndarray:
"""Deserialize prediction response to numpy.ndarray"""
f = io.BytesIO(data)
return np.load(f)
# 创建一个使用输入输出使用npy格式的推理服务.
predictor = Model(
inference_spec=infer_spec,
model_data="oss:///path-to-model"
).deploy(
instance_type="ecs.c6.xlarge",
# 使用自定义的serializer
serializer=NumpySerializer(),
)
res: predictor.predict(data=input_data)
assert isinstance(input_data, numpy.ndarray)
assert isinstance(res, numpy.ndarray)
本地部署和测试推理服务
对于自定义镜像部署,SDK提供了本地执行模式(当前不支持使用 Processor 部署的服务),通过在model.deploy
中,传递instance_type="local"
参数,指定在本地运行推理服务。 SDK通过docker
在本地拉起一个模型服务,依赖的模型会从OSS下载到本地,然后挂载到本地运行的容器镜像中。
from pai.predictor import LocalPredictor
p: LocalPredictor = model.deploy(
# 指定运行在本地
instance_type="local",
serializer=JsonSerializer()
)
p.predict(data)
# 删除对应的docker容器.
p.delete_service()
内容没看懂? 不太想学习?想快速解决? 有偿解决: 联系专家
阿里云企业补贴进行中: 马上申请
腾讯云限时活动1折起,即将结束: 马上收藏
同尘科技为腾讯云授权服务中心。
购买腾讯云产品享受折上折,更有现金返利:同意关联,立享优惠
转转请注明出处:https://www.yunxiaoer.com/161961.html