详情页标题前

阿里云云原生大数据计算服务 MaxCompute使用Flink(流式数据传输-新版)-云淘科技

详情页1

当前MaxCompute为您提供了新版的Flink Connector插件,新版插件支持将Flink数据写入至MaxCompute的普通和Transaction Table2.0类型表,提高了Flink数据写入MaxCompute的便捷性。本文为您介绍新版Flink Connector写入MaxCompute的能力支持情况与主要操作流程。

背景信息

  • 支持的写入模式:

    使用新版Flink Connector将数据写入MaxCompute时,支持通过upsert或insert两种写入方式。其中使用upsert时支持以下两种数据写入流程:

    • 按照Primary Key进行分组

    • 按照分区字段进行分组

      说明

      若分区数量过多,您可以按照分区字段进行分组,但使用该流程可能导致数据倾斜。

    您可在配置Flink数据写入MaxCompute时,通过设置Flink Connector参数指定使用哪种写入方式,全量Connector参数介绍请参见下文的附录:新版Flink Connector全量参数。

  • Flink Upsert写入任务的Checkpoint间隔建议设置3分钟以上,设置太小的话,写入效率得不到保障,并且可能引入大量小文件。

  • MaxCompute与实时计算Flink版的字段类型对照关系如下。

    Flink 数据类型

    MaxCompute 数据类型

    CHAR(p)

    CHAR(p)

    VARCHAR(p)

    VARCHAR(p)

    STRING

    STRING

    BOOLEAN

    BOOLEAN

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    INT

    INT

    BIGINT

    LONG

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    DECIMAL(p, s)

    DECIMAL(p, s)

    DATE

    DATE

    TIMESTAMP(9) WITHOUT TIMEZONE

    TIMESTAMP

    TIMESTAMP(3) WITHOUT TIMEZONE

    DATETIME

    BYTES

    BINARY

    ARRAY

    LIST

    MAP

    MAP

    ROW

    STRUCT

Flink数据写入MaxCompute流程:自建开源Flink

  1. 准备工作:创建MaxCompute表。

    您需先创建好MaxCompute表,用于后续Flink数据写入。以下以创建两张表(Transaction Table2.0非分区表和分区表)作为示例,为您演示Flink数据写入MaxCompute的主要流程,其中表属性设置请参考Transaction Table2.0表参数。

    说明

    目前Transaction Table2.0处于邀测阶段,默认不支持直接使用,如需您需要使用此功能,请单击申请开通,在新功能试用申请页面申请开通使用Transaction Table2.0功能。关于Transaction Table2.0详情介绍,请参见Transaction Table2.0概述。

    --创建Transaction Table2.0非分区表
    CREATE TABLE mf_flink_tt (
      id BIGINT not null,
      name STRING,
      age INT,
      status BOOLEAN, primary key (id)
    )
    tblproperties ("transactional"="true", 
                   "write.bucket.num" = "64", 
                   "acid.data.retain.hours"="12") ;
    
    --创建Transaction Table2.0分区表
    CREATE TABLE mf_flink_tt_part (
      id BIGINT not null,
      name STRING,
      age INT,
      status BOOLEAN, 
      primary key (id)
    )
      partitioned by (dd string, hh string) 
      tblproperties ("transactional"="true", 
                     "write.bucket.num" = "64", 
                     "acid.data.retain.hours"="12") ;
    
  2. 搭建开源Flink集群。当前仅支持1.13版本的开源Flink。

    您可单击Flink 1.13版本包下载链接,将包下载至本地环境,下载完成后进行解压。

  3. 下载Flink Connector并添加至Flink集群包中。

    1. 单击Flink Connector下载链接,将Flink Connector Jar包下载至本地环境。

    2. 将Flink Connector Jar包添加至解压后的Flink安装包的lib目录中。

      mv flink-connector-odps-1.13.0-shaded.jar $FLINK_HOME/lib/flink-connector-odps-1.13.0-shaded.jar
  4. 启动Flink实例服务。

    cd $FLINK_HOME/bin
    ./start-cluster.sh
  5. 启动Flink客户端。

    cd $FLINK_HOME/bin
    ./sql-client.sh
  6. 创建Flink表,并配置Flink Connector参数。

    当前支持直接使用Flink SQL创建Flink表并配置参数,也支持使用Flink的DataStream API进行相关操作。两种操作的核心示例如下。

    使用Flink SQL

    1. 进入Flink SQL的编辑界面,执行以下命令完成建表与参数配置。

      -- 在 Flink SQL中注册一张对应的非分区表
      CREATE TABLE mf_flink (
        id BIGINT,
        name STRING,
        age INT,
        status BOOLEAN,
        PRIMARY KEY(id) NOT ENFORCED
      ) WITH (
        'connector' = 'maxcompute',
        'table.name' = 'mf_flink_tt',
        'sink.operation' = 'upsert',
      	'odps.access.id'='LTAI5tRzd4W8cTyLZKT****',
        'odps.access.key'='gJwKaF3hK9MDAQgbO0zs*******',
      	'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
      	'odps.project.name'='mf_mc_bj'
      );
      
      -- 在 Flink SQL 中注册一张对应的分区表
      CREATE TABLE mf_flink_part (
        id BIGINT,
        name STRING,
        age INT,
        status BOOLEAN,
        dd STRING,
        hh STRING,
        PRIMARY KEY(id) NOT ENFORCED
      ) PARTITIONED BY (`dd`,`hh`)
      WITH (
        'connector' = 'maxcompute',
        'table.name' = 'mf_flink_tt_part',
        'sink.operation' = 'upsert',
      	'odps.access.id'='LTAI5tRzd4W8cTyLZKT****',
        'odps.access.key'='gJwKaF3hK9MDAQgbO0zs*******',
      	'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
      	'odps.project.name'='mf_mc_bj'
      );
    2. 向Flink表中写入数据,并在MaxCompute表中查询,验证Flink数据写入MaxCompute的结果。

      --在flink Sql客户端中往非分区表里插入数据
      Insert into mf_flink values (1,'Danny',27, false);
      --在Maxcompute中查询返回
      select * from mf_flink_tt;
      +------------+------+------+--------+
      | id         | name | age  | status |
      +------------+------+------+--------+
      | 1          | Danny | 27   | false  |
      +------------+------+------+--------+
      
      --在flink Sql客户端中往非分区表里插入数据
      Insert into mf_flink values (1,'Danny',28, false);
      --在Maxcompute中查询返回
      select * from mf_flink_tt;
      +------------+------+------+--------+
      | id         | name | age  | status |
      +------------+------+------+--------+
      | 1          | Danny | 28   | false  |
      +------------+------+------+--------+
      
      --在flink Sql客户端中往分区表里插入数据
      Insert into mf_flink_part values (1,'Danny',27, false, '01','01');
      --在Maxcompute中查询返回
      select * from mf_flink_tt_part where dd=01 and hh=01;
      +------------+------+------+--------+----+----+
      | id         | name | age  | status | dd | hh |
      +------------+------+------+--------+----+----+
      | 1          | Danny | 27   | false  | 01 | 01 |
      +------------+------+------+--------+----+----+
      
      --在flink Sql客户端中分区表里插入数据
      Insert into mf_flink_part values (1,'Danny',30, false, '01','01');
      --在Maxcompute中查询返回
      select * from mf_flink_tt_part where dd=01 and hh=01;
      +------------+------+------+--------+----+----+
      | id         | name | age  | status | dd | hh |
      +------------+------+------+--------+----+----+
      | 1          | Danny | 30   | false  | 01 | 01 |
      +------------+------+------+--------+----+----+

    使用DataStream API

    1. 使用DataStream接口时,需先添加以下依赖。

              
                  com.aliyun.odps
                  flink-connector-odps
                  1.13.0
                  system
                  ${project.basedir}/lib/flink-connector-odps-1.13.0-shaded.jar
              
    2. 建表与参数配置的示例代码如下。

      package com.aliyun.odps.flink.examples;
      
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.odps.table.OdpsOptions;
      import org.apache.flink.odps.util.OdpsConf;
      import org.apache.flink.odps.util.OdpsPipeline;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.Table;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
      import org.apache.flink.table.data.RowData;
      
      public class Examples {
      
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.enableCheckpointing(120 * 1000);
      
              StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);
      
              Table source = streamTableEnvironment.sqlQuery("SELECT * FROM source_table");
              DataStream input = streamTableEnvironment.toAppendStream(source, RowData.class);
      
              Configuration config = new Configuration();
              config.set(OdpsOptions.SINK_OPERATION, "upsert");
              config.set(OdpsOptions.UPSERT_COMMIT_THREAD_NUM, 8);
              config.set(OdpsOptions.UPSERT_MAJOR_COMPACT_MIN_COMMITS, 100);
      
              OdpsConf odpsConfig = new OdpsConf("accessid",
                      "accesskey",
                      "endpoint",
                      "project",
                      "tunnel endpoint");
      
              OdpsPipeline.Builder builder = OdpsPipeline.builder();
              builder.projectName("sql2_isolation_2a")
                      .tableName("user_ledger_portfolio")
                      .partition("")
                      .configuration(config)
                      .odpsConf(odpsConfig)
                      .sink(input, false);
              env.execute();
          }
      }

Flink数据写入MaxCompute流程:阿里云全托管Flink

  1. 准备工作:创建MaxCompute表。

    您需先创建好MaxCompute表,用于后续Flink数据写入。以下以创建一张Transaction Table2.0表为例。

    set odps.sql.type.system.odps2=true;
    drop table mf_flink_upsert;
    create table mf_flink_upsert (
      c1 int not null, 
      c2 string, 
      gt timestamp,
      primary key (c1)
    ) 
      partitioned by (ds string)
      tblproperties ("transactional"="true",
                     "write.bucket.num" = "64", 
                     "acid.data.retain.hours"="12") ;
  2. 登录实时计算控制台,添加Flink Connector,需要上传开源版Flink Connector。其中关键参数如下,详细操作流程请参见管理自定义连接器。

    'connector' = 'maxcompute',
  3. 通过Flink SQL作业创建Flink表,并构造Flink实时数据,完成作业开发后进行作业部署。

    重要

    目前仅支持vvr-4.0.16-flink-1.13版本,部署作业时引擎版本需选择为vvr-4.0.16-flink-1.13

    在Flink的作业开发页面,创建并编辑Flink SQL作业,以下示例为新建一张Flink数据源表、一张Flink临时结果表,并自动构建实时数据生成逻辑写入源表,通过计算逻辑将源表数据写入临时结果表。SQL作业开发详细操作请参见SQL作业开发。

    --创建flink数据源表,
    CREATE TEMPORARY TABLE fake_src_table
    (
        c1 int,
        c2 VARCHAR,
        gt as CURRENT_TIMESTAMP
    ) with (
      'connector' = 'faker',
      'fields.c2.expression' = '#{superhero.name}',
      'rows-per-second' = '100',
      'fields.c1.expression' = '#{number.numberBetween ''0'',''1000''}'
    );
    
    --flink创建临时结果表
    CREATE TEMPORARY TABLE test_c_d_g 
    (
        c1 int,
        c2 VARCHAR,
        gt TIMESTAMP,
        ds varchar,
        PRIMARY KEY(c1) NOT ENFORCED
     ) WITH (
        		'connector' = 'maxcompute',
        		'table.name' = 'mf_flink_upsert',
        		'sink.operation' = 'upsert',
        		'odps.access.id'='LTAI5tRzd4W8cTyL****',
        		'odps.access.key'='gJwKaF3hK9MDAQgb**********',
        		'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
        		'odps.project.name'='mf_mc_bj'
    );
    
    --flink 计算逻辑
    insert into test_c_d_g
    select  c1 as c1,
            c2 as c2,
            gt as gt,
            date_format(gt, 'yyyyMMddHH') as ds
    from    fake_src_table;
  4. 在MaxCompute中查询数据,验证Flink数据写入MaxCompute的结果。

    select * from mf_flink_upsert where ds=2023061517;
    
    --返回,由于Flink中的数据为随机生成,实际MaxCompute查询结果与本示例不一定完全一致
    +------+----+------+----+
    | c1   | c2 | gt   | ds |
    +------+----+------+----+
    | 0    | Skaar | 2023-06-16 01:59:41.116 | 2023061517 |
    | 21   | Supah Century | 2023-06-16 01:59:59.117 | 2023061517 |
    | 104  | Dark Gorilla Grodd | 2023-06-16 01:59:57.117 | 2023061517 |
    | 126  | Leader | 2023-06-16 01:59:39.116 | 2023061517 |
    

附录:新版Flink Connector全量参数

  • 通用参数

    参数

    是否必配

    默认值

    说明

    connector

    无默认值

    connector类型,需要设置为MaxCompute

    odps.project.name

    无默认值

    MaxCompute的Project名称。

    odps.access.id

    无默认值

    您的阿里云账号AccessKey ID。您可以在访问凭证页面查看对应信息。

    odps.access.key

    无默认值

    您的阿里云账号AccessKey Secret。您可以在访问凭证页面查看对应信息。

    odps.end.point

    无默认值

    MaxCompute的Endpoint信息。各地域的MaxCompute Endpoint请参见Endpoint。

    table.name

    无默认值

    MaxCompute表名称,格式为[project.][schema.]table

    sink.operation

    insert

    写入类型,取值为insertupsert。只有Transaction Table2.0支持upsert写入。

    sink.parallelism

    无默认值

    写入的并行度,如果不设置,则默认使用上游数据并行度。

    sink.dynamic-partition.limit

    100

    动态分区写入时,单个Checkpoint可同时导入的最大分区数量。

    sink.group-partition.enable

    FALSE

    动态分区写入时,是否按照分区进行分组。

  • insertupsert写入参数

    upsert写入参数

    参数

    是否必配

    默认值

    说明

    upsert.writer.max-retries

    3

    upsert写入时,writer flush重试次数。

    upsert.writer.buffer-size

    64m

    upsert写入时,单个writer的buffer size。

    upsert.writer.bucket.buffer-size

    1m

    upsert写入时,单个bucket的buffer size。

    upsert.write.bucket.num

    写入表的bucket数量,必须与写入表write.bucket.num值一致。

    upsert.writer.slot-num

    1

    upsert写入时,单个session使用Tunnel slot数量。

    upsert.commit.max-retries

    3

    upsert写入时,commit session 重试次数。

    upsert.commit.thread-num

    8

    upsert写入时,commit session 的并行度。

    upsert.major-compact.min-commits

    100

    upsert写入时,发起major compact的最小commit次数。

    insert写入参数

    参数

    是否必配

    默认值

    说明

    insert.commit.thread-num

    32

    insert写入时,commit session 的并行度。

    insert.arrow-writer.enable

    TRUE

    insert写入时,是否使用arrow格式。

    insert.arrow-writer.batch-size

    512

    insert写入时,arrow batch最大行数。

    insert.arrow-writer.flush-interval

    100000

    insert写入时,writer flush间隔,单位ms。

内容没看懂? 不太想学习?想快速解决? 有偿解决: 联系专家

阿里云企业补贴进行中: 马上申请

腾讯云限时活动1折起,即将结束: 马上收藏

同尘科技为腾讯云授权服务中心。

购买腾讯云产品享受折上折,更有现金返利:同意关联,立享优惠

转转请注明出处:http://www.yunxiaoer.com/158578.html

(0)
上一篇 2023年12月10日
下一篇 2023年12月10日
详情页2

相关推荐

  • 阿里云ECS云服务器自定义镜像概述-云淘科技

    自定义镜像是您基于实例或快照创建的镜像,或者从本地导入的镜像,包含已部署的应用、数据等信息。使用自定义镜像可以快速创建更多相同配置的实例,无需每次创建实例时重复配置操作。 自定义镜像使用流程 当您成功创建或成功导入自定义镜像后,镜像的状态为可用。此时,您可以使用该镜像创建实例,可以将其共享给其他阿里云账号使用,或复制该镜像到其他地域使用,或导出该镜像到OSS…

    阿里云服务器 2023年12月9日
  • 阿里云云原生大数据计算服务 MaxCompute近实时增量导入-云淘科技

    数据流入Transactional Table 2.0主要存在近实时增量写入和批量写入两种场景,本文为您介绍高并发近实时增量写入场景的架构设计。 实际业务数据处理场景中,涉及的数据源丰富多样,可能存在数据库、日志系统或者其他消息队列等系统,为了方便用户将数据写入MaxCompute的Transactional Table 2.0, MaxCompute深度定…

    2023年12月10日
  • 阿里云RDS数据库ReleaseInstancePublicConnection – 释放实例的外网连接地址-云淘科技

    该接口用于释放实例的外网连接地址。 接口说明 适用引擎 RDS MySQL RDS PostgreSQL RDS SQL Server RDS MariaDB 相关功能文档 注意 使用该接口前,请仔细阅读功能文档,确保完全了解使用接口的前提条件及使用后造成的影响后,再进行操作。 RDS MySQL释放外网连接地址 RDS PostgreSQL释放外网连接地址…

    阿里云数据库 2023年12月9日
  • 信息流广告,信息流部分建议宽度830px,只针对默认列表样式,顺序随机
  • DataWorks中odps运行数据集成任务,运行日志中stage为0%?-云小二-阿里云

    DataWorks中odps运行数据集成任务,运行日志中stage为0%? 以下为热心网友提供的参考意见 在DataWorks中运行ODPS数据集成任务时,如果运行日志中的stage一直为0%,可能的原因是数据集成资源组的资源不足。当资源不足时,任务可能会长时间处于等待状态,导致stage的进度无法推进。在这种情况下,您可以考虑增加资源组的资源配额,以便任务…

    阿里云 2023年12月24日
  • 阿里云对象存储OSS从HDFS迁移数据到OSS-HDFS-云淘科技

    本文介绍如何使用阿里云Jindo DistCp从HDFS迁移数据到OSS-HDFS。 前提条件 JDK 1.8及以上版本。 如果您使用的是自建ECS集群,需要具备Hadoop2.7+或Hadoop3.x环境以及进行MapReduce作业的能力。 如果您使用的是阿里云E-MapReduce,需使用EMR-5.6.0及后续版本或EMR-3.40.0及后续版本。 …

    阿里云对象存储 2023年12月10日

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信
本站为广大会员提供阿里云、腾讯云、华为云、百度云等一线大厂的购买,续费优惠,保证底价,买贵退差。