详情页标题前

阿里云云原生大数据计算服务 MaxComputePyODPS使用第三方包-云淘科技

详情页1

本文为您介绍如何在PyODPS中使用第三方。PyODPS制作第三方包的操作请参见PyODPS制作第三方包。

前提条件

  • 已开通MaxCompute产品。如何开通请参见开通MaxCompute。

  • 已开通DataWorks产品。如何开通请参见开通DataWorks。

上传三方包

使用三方包前,请确保您生成的包已被上传至MaxCompute Archive资源。上传方式如下:

  • 使用代码上传资源。您需要将packages.tar.gz替换成目标包所在的路径和文件名:

    import os
    from odps import ODPS
    
    # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
    # 不建议直接使用 Access Key ID / Access Key Secret 字符串
    o = ODPS(
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
        project='',
        endpoint='',
    )
    o.create_resource("test_packed.tar.gz", "archive", fileobj=open("packages.tar.gz", "rb"))
  • 使用DataWorks上传资源。具体操作请参见创建或上传资源。

在Python UDF中使用三方包

您需要对UDF进行修改以使用上传的三方包。具体如下:

  1. 在UDF类的_init_方法中添加对三方包的引用。

  2. 在UDF代码(例如evaluate或process方法)中调用三方包。

示例

下面以实现scipy中的psi函数为例,为您介绍如何在Python UDF中使用三方包。

  1. 使用以下命令打包scipy。

    pyodps-pack -o scipy-bundle.tar.gz scipy
  2. 编写以下代码,并将其保存为test_psi_udf.py

    import sys
    from odps.udf import annotate
    
    @annotate("double->double")
    class MyPsi(object):
        def __init__(self):
            # 将路径增加到引用路径
            sys.path.insert(0, "work/scipy-bundle.tar.gz/packages")
    
        def evaluate(self, arg0):
            # 将 import 语句保持在 evaluate 函数内部
            from scipy.special import psi
    
            return float(psi(arg0))

    代码解释:__init__函数中将work/scipy-bundle.tar.gz/packages添加至sys.path,因为MaxCompute会将所有UDF引用的Archive资源以资源名称为目录解压至work目录下,而packages则是pyodps-pack生成包的子目录。而将对scipy的import放在evaluate函数体内部的原因是三方包仅在执行时可用,当UDF在MaxCompute服务端被解析时,解析环境不包含三方包,函数体外的三方包import会导致报错。

  3. test_psi_udf.py上传为MaxCompute Python资源,并将scipy-bundle.tar.gz上传为Archive资源。

  4. 创建UDF名为test_psi_udf,引用上述两个资源文件,并指定类名为test_psi_udf.MyPsi

    步骤3~4中,可以使用PyODPS或者MaxCompute客户端的方式执行。

    • 使用PyODPS执行方法:

      import os
      from odps import ODPS
      
      # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
      # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
      # 不建议直接使用 Access Key ID / Access Key Secret 字符串
      o = ODPS(
          os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
          os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
          project='',
          endpoint='',
      )
      
      bundle_res = o.create_resource(
          "scipy-bundle.tar.gz", "archive", fileobj=open("scipy-bundle.tar.gz", "rb")
      )
      udf_res = o.create_resource(
          "test_psi_udf.py", "py", fileobj=open("test_psi_udf.py", "rb")
      )
      o.create_function(
          "test_psi_udf", class_type="test_psi_udf.MyPsi", resources=[bundle_res, udf_res]
      )
    • 使用MaxCompute客户端执行方法:

      add archive scipy-bundle.tar.gz;
      add py test_psi_udf.py;
      create function test_psi_udf as test_psi_udf.MyPsi using test_psi_udf.py,scipy-bundle.tar.gz;
  5. 完成以上操作后,即可使用UDF执行SQL。

    set odps.pypy.enabled=false;
    set odps.isolation.session.enable=true;
    select test_psi_udf(sepal_length) from iris;

在PyODPS DataFrame中使用三方包

PyODPS DataFrame支持在execute或persist时使用libraries参数使用上面的第三方库。 下面以map方法为例,apply或map_reduce方法的过程类似。

  1. 使用以下命令打包scipy。

    pyodps-pack -o scipy-bundle.tar.gz scipy
  2. 假定表名为test_float_col,内容只包含一列FLOAT值:

       col1
    0  3.75
    1  2.51

    计算psi(col1)的值,代码如下:

    import os
    from odps import ODPS, options
    
    def psi(v):
        from scipy.special import psi
    
        return float(psi(v))
    
    # 如果 Project 开启了 Isolation,下面的选项不是必需的
    options.sql.settings = {"odps.isolation.session.enable": True}
    
    # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
    # 不建议直接使用 Access Key ID / Access Key Secret 字符串
    o = ODPS(
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
        project='',
        endpoint='',
    )
    df = o.get_table("test_float_col").to_df()
    # 直接执行并取得结果
    df.col1.map(psi).execute(libraries=["scipy-bundle.tar.gz"])
    # 保存到另一张表
    df.col1.map(psi).persist("result_table", libraries=["scipy-bundle.tar.gz"])
  3. (可选)如果希望在整个执行过程中使用相同的三方包,可以设置全局选项:

    from odps import options
    options.df.libraries = ["scipy-bundle.tar.gz"]

完成以上操作后,即可在DataFrame执行时使用相关的三方包。

在DataWorks中使用三方包

DataWorks PyODPS节点预置了若干三方包,同时提供了load_resource_package方法用以引用其他的包,具体使用方式请参见使用三方包。

手动上传和使用三方包

说明

以下内容仅作为维护旧项目或者旧环境的参考,新项目建议直接使用pyodps-pack打包。

部分旧项目可能采用了之前的方式使用三方包,即手动上传所有依赖的Wheel包并在代码中引用,或者使用了不支持二进制包的旧版MaxCompute环境,本节内容为该场景准备。下面以在map中使用python_dateutil为例为您介绍使用三方包的步骤。

  1. 在Linux Bash中使用pip download命令,下载包及其依赖到某个路径。下载后会出现两个包,即six-1.10.0-py2.py3-none-any.whlpython_dateutil-2.5.3-py2.py3-none-any.whl

    pip download python-dateutil -d /to/path/

    说明

    您需要下载支持Linux环境的包,建议直接在Linux下调用该命令。

  2. 将上述已下载的两个包分别上传至ODPS资源。

    • 方式一:通过代码上传。

      # 这里要确保资源名的后缀是正确的文件类型
      odps.create_resource('six.whl', 'file', file_obj=open('six-1.10.0-py2.py3-none-any.whl', 'rb'))
      odps.create_resource('python_dateutil.whl', 'file', file_obj=open('python_dateutil-2.5.3-py2.py3-none-any.whl', 'rb'))
    • 方式二:通过DataWorks界面上传。

      您可以参考创建或上传资源完成目标资源的上传与提交。

  3. 使用三方包。

    假定DataFrame只有一个STRING类型的字段。

    >>> df
                   datestr
    0  2016-08-26 14:03:29
    1  2015-08-26 14:03:29
    • 全局配置使用到的三方库如下:

      >>> from odps import options
      >>>
      >>> def get_year(t):
      >>>     from dateutil.parser import parse
      >>>     return parse(t).strftime('%Y')
      >>>
      >>> options.df.libraries = ['six.whl', 'python_dateutil.whl']
      >>> df.datestr.map(get_year)
         datestr
      0     2016
      1     2015
    • 通过立即运行方法的libraries参数指定:

      >>> def get_year(t):
      >>>     from dateutil.parser import parse
      >>>     return parse(t).strftime('%Y')
      >>>
      >>> df.datestr.map(get_year).execute(libraries=['six.whl', 'python_dateutil.whl'])
         datestr
      0     2016
      1     2015

PyODPS默认支持执行纯Python且不含文件操作的第三方库。在较新版本的MaxCompute服务下,PyODPS也支持执行带有二进制代码或带有文件操作的Python库。这些库名必须拥有一定的后缀,可根据下表判断:

平台

Python版本

可用的后缀

RHEL 5 x86_64

Python 2.7

cp27-cp27m-manylinux1_x86_64

RHEL 5 x86_64

Python 3.7

cp37-cp37m-manylinux1_x86_64

RHEL 7 x86_64

Python 2.7

cp27-cp27m-manylinux1_x86_64, cp27-cp27m-manylinux2010_x86_64, cp27-cp27m-manylinux2014_x86_64

RHEL 7 x86_64

Python 3.7

cp37-cp37m-manylinux1_x86_64, cp37-cp37m-manylinux2010_x86_64, cp37-cp37m-manylinux2014_x86_64

RHEL 7 Arm64

Python 3.7

cp37-cp37m-manylinux2014_aarch64

所有的Wheel包都需要以Archive格式上传,whl后缀的包需要重命名为zip后缀。同时,作业需要开启odps.isolation.session.enable选项,或者在Project级别开启Isolation。以下示例展示了如何上传并使用scipy中的特殊函数:

# 对于含有二进制代码的包,必须使用 Archive 方式上传资源,whl 后缀需要改为 zip
odps.create_resource('scipy.zip', 'archive', file_obj=open('scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.whl', 'rb'))

# 如果 Project 开启了 Isolation,下面的选项不是必需的
options.sql.settings = { 'odps.isolation.session.enable': True }

def psi(value):
    # 建议在函数内部 import 第三方库,以防止不同操作系统下二进制包结构差异造成执行错误
    from scipy.special import psi
    return float(psi(value))

df.float_col.map(psi).execute(libraries=['scipy.zip'])

对于只提供源码的二进制包,可以在Linux Shell中打包成Wheel再上传,Mac和Windows中生成的Wheel包无法在MaxCompute中使用,Linux Shell中打包命令如下:

python setup.py bdist_wheel

内容没看懂? 不太想学习?想快速解决? 有偿解决: 联系专家

阿里云企业补贴进行中: 马上申请

腾讯云限时活动1折起,即将结束: 马上收藏

同尘科技为腾讯云授权服务中心。

购买腾讯云产品享受折上折,更有现金返利:同意关联,立享优惠

转转请注明出处:https://www.yunxiaoer.com/158203.html

(0)
上一篇 2023年12月10日 上午1:16
下一篇 2023年12月10日
详情页2

相关推荐

  • DataWorks中maxcompute有没有大文本数据类型,支持mysql 的text字段?-云小二-阿里云

    DataWorks中maxcompute有没有大文本数据类型,支持mysql 的text字段? 以下为热心网友提供的参考意见 目前应该是只有string ,此回答整理自钉群“DataWorks交流群(答疑@机器人)” 以下为热心网友提供的参考意见 MaxCompute,作为阿里云的云原生大数据计算服务,支持大量数据类型的处理和存储,包括VARCHAR、CHA…

    阿里云 2023年12月10日
  • 阿里云对象存储OSSPHP分片上传-云淘科技

    OSS提供的分片上传(Multipart Upload)功能,将要上传的较大文件(Object)分成多个分片(Part)来分别上传,上传完成后再调用CompleteMultipartUpload接口将这些Part组合成一个Object来达到断点续传的效果。 分片上传流程 分片上传(Multipart Upload)分为以下三个步骤: 初始化一个分片上传事件。…

    阿里云对象存储 2023年12月10日
  • 阿里云RDS数据库使用流程-云淘科技

    快速入门旨在介绍如何创建RDS SQL Server实例、进行基本设置以及连接实例数据库,使用户能够了解从购买RDS实例到开始使用实例的流程。 使用流程 若您初次使用阿里云RDS,请先了解阿里云RDS使用限制。 通常,从新购实例到可以开始使用实例,您需要完成如下操作。 创建RDS SQL Server实例 创建数据库和账号 设置白名单 申请或释放外网地址 连…

    阿里云数据库 2023年12月9日
  • 阿里云云原生大数据计算服务 MaxCompute读取以分区方式存储的OSS数据-云淘科技

    MaxCompute支持创建OSS外部表为分区表,访问OSS上以分区方式存储的数据,通过该方式可降低读取数据量并提升数据处理效率。本文为您介绍MaxCompute支持的OSS标准分区路径格式和自定义分区路径格式。 背景信息 创建OSS外部表后,MaxCompute会全量扫描OSS目录下的所有数据,包括子目录下的数据文件。但当数据量比较大时,对全目录扫描会产生…

  • 信息流广告,信息流部分建议宽度830px,只针对默认列表样式,顺序随机
  • 阿里云RDS数据库通过客户端、命令行连接RDS MySQL实例-云淘科技

    本文介绍如何设置白名单以及通过客户端、命令行连接RDS MySQL实例。 前提条件 已创建RDS MySQL实例。更多信息,请参见快速创建RDS MySQL实例。 已为实例创建数据库和账号。更多信息,请参见创建数据库和账号。 已为实例设置IP白名单。更多信息,请参见设置IP白名单。 建议 如果您满足内网访问条件,建议使用内网连接地址连接实例,延迟更低,稳定性…

    阿里云数据库 2023年12月9日

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信
本站为广大会员提供阿里云、腾讯云、华为云、百度云等一线大厂的购买,续费优惠,保证底价,买贵退差。