详情页标题前

阿里云云原生大数据计算服务 MaxComputePython 3 UDTF读取MaxCompute资源示例-云淘科技

详情页1

本文为您介绍基于MaxCompute客户端通过Python 3 UDTF读取MaxCompute资源的使用示例。

前提条件

已安装MaxCompute客户端。更多安装MaxCompute客户端操作,请参见安装并配置MaxCompute客户端。

UDTF的动态参数说明

Python UDTF函数签名格式请参见函数签名及数据类型

  • 您可以在参数列表中使用*,表示接受任意长度、任意类型的输入参数。例如@annotate('double,*->string')表示接受第一个参数是DOUBLE类型,后接任意长度、任意类型的参数列表。此时,您需要自己编写代码判断输入的个数和参数类型,然后对它们进行相应的操作(您可以对比C语言里面的printf函数来理解此操作)。


    说明 *用在返回值列表中时,表示的是不同的含义。

  • UDTF的返回值可以使用*,表示返回任意个STRING类型。返回值的个数与调用函数时设置的别名个数有关。例如@annotate("bigint,string->double,*"),调用方式是UDTF(x, y) as (a, b, c),此处as后面设置了三个别名,即abc。编辑器会认定a为DOUBLE类型(Annotation中返回值第一列的类型是给定的),bc为STRING类型。因为这里给出了三个返回值,所以UDTF在调用forward时,forward必须是长度为3的数组,否则会出现运行时报错。


    说明 这种错误无法在编译时报出,因此UDTF的调用者在SQL中设置alias个数时,必须遵循该UDTF定义的规则。由于聚合函数的返回值个数固定是1,所以这个功能对UDAF来说并无意义。

UDTF代码示例

  • 读取MaxCompute资源代码示例。
    from odps.udf import annotate
    from odps.udf import BaseUDTF
    from odps.distcache import get_cache_file
    from odps.distcache import get_cache_table
    @annotate('string -> string, bigint')
    class UDTFExample(BaseUDTF):
        """读取资源文件和资源表里的pageid、adid_list,生成dict
        """
        def __init__(self):
            import json
            cache_file = get_cache_file('test_json.txt')
            self.my_dict = json.load(cache_file)
            cache_file.close()
            records = list(get_cache_table('table_resource1'))
            for record in records:
                self.my_dict[record[0]] = record[1]
        """输入pageid,输出pageid以及它对应的所有adid
        """
        def process(self, pageid):
            for adid in self.my_dict[pageid]:
                self.forward(pageid, adid)
  • 动态参数代码示例。
    from odps.udf import annotate
    from odps.udf import BaseUDTF
    import json
    @annotate('string,*->string,*')
    class JsonTuple(BaseUDTF):
        def process(self, *args):
            length = len(args)
            result = [None] * length
            try:
                obj = json.loads(args[0])
                for i in range(1, length):
                    result[i] = str(obj.get(args[i]))
            except Exception as err:
                result[0] = str(err)
                for i in range(1, length):
                    result[i] = None
            self.forward(*result)

    以上UDTF示例中,返回值个数会根据输入参数的个数来决定。输出参数中的第一个参数是一个JSON文本,后面的参数需要从JSON中根据Key进行解析。返回值中的第一个返回值是解析JSON过程中的出错信息,如果没有出错,则会根据输入的Key依次输出从JSON中解析出来的内容,使用示例如下。

    -- 根据输入参数的个数定制输出别名个数。
    SELECT my_json_tuple(json, 'a', 'b') as (exceptions, a, b) FROM jsons;
    
    -- 变长部分可以一列都没有。
    SELECT my_json_tuple(json) as exceptions FROM jsons;
    
    -- 下面这个SQL会出现运行时错误,因为别名个数与实际输出个数不符。
    -- 注意编译时无法发现此错误。
    SELECT my_json_tuple(json, 'a', 'b') as (exceptions, a, b, c) FROM jsons;

操作步骤

  1. 将UDTF代码示例保存为py_udtf_example.py文件,放置于MaxCompute客户端的bin目录中。
  2. 登录MaxCompute客户端创建资源表table_resource1和内部表tmp1(后续执行DML操作写入的目标表)并插入数据,准备资源文件test_json.txt并放置于MaxCompute客户端的bin目录中。更多登录MaxCompute客户端操作,请参见登录MaxCompute客户端。建表、插入数据命令及资源文件内容示例如下:
    • 创建资源表table_resource1,并插入数据。
      create table if not exists table_resource1 (pageid string, adid_list array);
      insert into table table_resource1 values("contact_page2",array(2,3,4)),("contact_page3",array(5,6,7));


      说明 由于table_resource1中adid_list字段数据类型为ARRAY,读取表资源时需要在Session级别执行set odps.sql.python.version=cp37;命令开启Python 3来支持读取ARRAY类型数据。

    • 创建内部表tmp1,并插入数据。
      create table if not exists tmp1 (pageid string);
      insert into table tmp1 values ("front_page"),("contact_page1"),("contact_page3");
    • 资源文件test_json.txt的内容如下。
      {"front_page":[1, 2, 3], "contact_page1":[3, 4, 5]}
  3. 通过MaxCompute客户端,将py_udtf_example.py文件、test_json.txt和表table_resource1添加为MaxCompute的资源。更多添加资源信息,请参见添加资源。命令示例如下。
    add py py_udtf_example.py;
    add file test_json.txt;
    add table table_resource1 as table_resource1;
  4. 在MaxCompute客户端上创建UDTF函数my_udtf。更多创建函数信息,请参见注册函数。命令示例如下。
    create function my_udtf as 'py_udtf_example.UDTFExample' using 'py_udtf_example.py, test_json.txt, table_resource1';
  5. 在MaxCompute客户端上执行SQL命令调用新创建的UDTF。命令示例如下:
    • 示例1:单纯使用UDTF函数运行SQL。
      select my_udtf(pageid) as (pageid, adid) from tmp1;

      返回结果如下。

      +------------+------------+
      | pageid     | adid       |
      +------------+------------+
      | front_page | 1          |
      | front_page | 2          |
      | front_page | 3          |
      | contact_page1 | 3          |
      | contact_page1 | 4          |
      | contact_page1 | 5          |
      | contact_page3 | 5          |
      | contact_page3 | 6          |
      | contact_page3 | 7          |
      +------------+------------+
    • 示例2:对示例1中的命令改写,结合Lateral View运行SQL。
      select pageid, adid from tmp1 lateral view my_udtf(pageid) adTable as udtf_pageid, adid;

      返回结果如下。

      +--------+------------+
      | pageid | adid       |
      +--------+------------+
      | front_page | 1          |
      | front_page | 2          |
      | front_page | 3          |
      | contact_page1 | 3          |
      | contact_page1 | 4          |
      | contact_page1 | 5          |
      | contact_page3 | 5          |
      | contact_page3 | 6          |
      | contact_page3 | 7          |
      +--------+------------+
    • 示例3:结合聚合函数和Lateral View运行SQL。
      select adid, count(1) as cnt
          from tmp1 lateral view my_udtf(pageid) adTable as udtf_pageid, adid
      group by adid;

      返回结果如下。

      +------------+------------+
      | adid       | cnt        |
      +------------+------------+
      | 1          | 1          |
      | 2          | 1          |
      | 3          | 2          |
      | 4          | 1          |
      | 5          | 2          |
      | 6          | 1          |
      | 7          | 1          |
      +------------+------------+

内容没看懂? 不太想学习?想快速解决? 有偿解决: 联系专家

阿里云企业补贴进行中: 马上申请

腾讯云限时活动1折起,即将结束: 马上收藏

同尘科技为腾讯云授权服务中心。

购买腾讯云产品享受折上折,更有现金返利:同意关联,立享优惠

转转请注明出处:https://www.yunxiaoer.com/157416.html

(0)
上一篇 2023年12月10日
下一篇 2023年12月10日
详情页2

相关推荐

  • 阿里云日志服务SLS公共响应头-云淘科技

    本文列举了日志服务的API的公共响应头信息。 参数列表 Log Service API是基于HTTP协议的Rest风格接口。所有的Log Service API响应都提供一组公共响应头,其详细定义如下: Header名称 类型 说明 Content-Length 数值 RFC 2616中定义的HTTP响应内容长度。 Content-MD5 字符串 RFC 2…

    阿里云日志服务SLS 2023年12月10日
  • 阿里云日志服务SLS窗口漏斗函数-云淘科技

    日志服务提供窗口漏斗函数,可用于分析用户行为、APP流量、产品目标转化等数据。本文介绍窗口漏斗函数的基本语法和示例。 日志服务支持如下窗口漏斗函数。 注意 在日志服务分析语句中,表示字符串的字符必须使用单引号(”)包裹,无符号包裹或被双引号(””)包裹的字符表示字段名或列名。例如:’status’表…

    2023年12月10日
  • 信息流广告,信息流部分建议宽度830px,只针对默认列表样式,顺序随机
  • 阿里云RDS数据库调试存储过程(pldebugger)-云淘科技

    RDS PostgreSQL提供pldebugger插件,用于调试存储过程。 背景信息 RDS PostgreSQL支持多种存储过程语言,例如plpgsql、plpython、plperl、pltcl等等。用户可以使用这些存储过程语言,创建对应的函数或存储过程。 前提条件 RDS PostgreSQL实例版本需要满足以下条件: 实例大版本:10、11、12或…

    2023年12月9日
  • 阿里云云原生大数据计算服务 MaxComputeSQL其他常见问题-云淘科技

    本文为您介绍在MaxCompute中执行SQL时,与数据类型、SQL限制等相关的常见问题。 问题类别 常见问题 数据类型 MaxCompute的时间类型字段是否可以不带时分秒? 在执行MaxCompute SQL过程中,对DOUBLE类型的数据进行等值比较,为什么结果不符合预期? 如何解决DECIMAL数据类型精度溢出问题? 新创建的MaxCompute项目…

  • 阿里云RDS数据库CheckInstanceExist – 查询实例是否存在-云淘科技

    该接口用于查询目标RDS实例是否存在。 接口说明 适用引擎 RDS MySQL RDS PostgreSQL RDS SQL Server RDS MariaDB 调试 您可以在OpenAPI Explorer中直接运行该接口,免去您计算签名的困扰。运行成功后,OpenAPI Explorer可以自动生成SDK代码示例。 调试调试授权信息下表是API对应的授…

    阿里云数据库 2023年12月9日

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信
本站为广大会员提供阿里云、腾讯云、华为云、百度云等一线大厂的购买,续费优惠,保证底价,买贵退差。