当前MaxCompute为您提供了新版的Flink Connector插件,新版插件支持将Flink数据写入至MaxCompute的普通表和Transaction Table2.0类型表,提高了Flink数据写入MaxCompute的便捷性。本文为您介绍新版Flink Connector写入MaxCompute的能力支持情况与主要操作流程。
背景信息
-
支持的写入模式:
使用新版Flink Connector将数据写入MaxCompute时,支持通过upsert或insert两种写入方式。其中使用upsert时支持以下两种数据写入流程:
-
按照Primary Key进行分组
-
按照分区字段进行分组
说明
若分区数量过多,您可以按照分区字段进行分组,但使用该流程可能导致数据倾斜。
您可在配置Flink数据写入MaxCompute时,通过设置Flink Connector参数指定使用哪种写入方式,全量Connector参数介绍请参见下文的附录:新版Flink Connector全量参数。
-
-
Flink Upsert写入任务的Checkpoint间隔建议设置3分钟以上,设置太小的话,写入效率得不到保障,并且可能引入大量小文件。
-
MaxCompute与实时计算Flink版的字段类型对照关系如下。
Flink 数据类型
MaxCompute 数据类型
CHAR(p)
CHAR(p)
VARCHAR(p)
VARCHAR(p)
STRING
STRING
BOOLEAN
BOOLEAN
TINYINT
TINYINT
SMALLINT
SMALLINT
INT
INT
BIGINT
LONG
FLOAT
FLOAT
DOUBLE
DOUBLE
DECIMAL(p, s)
DECIMAL(p, s)
DATE
DATE
TIMESTAMP(9) WITHOUT TIMEZONE
TIMESTAMP
TIMESTAMP(3) WITHOUT TIMEZONE
DATETIME
BYTES
BINARY
ARRAY
LIST
MAP
MAP
ROW
STRUCT
Flink数据写入MaxCompute流程:自建开源Flink
-
准备工作:创建MaxCompute表。
您需先创建好MaxCompute表,用于后续Flink数据写入。以下以创建两张表(Transaction Table2.0非分区表和分区表)作为示例,为您演示Flink数据写入MaxCompute的主要流程,其中表属性设置请参考Transaction Table2.0表参数。
说明
目前Transaction Table2.0处于邀测阶段,默认不支持直接使用,如需您需要使用此功能,请单击申请开通,在新功能试用申请页面申请开通使用Transaction Table2.0功能。关于Transaction Table2.0详情介绍,请参见Transaction Table2.0概述。
--创建Transaction Table2.0非分区表 CREATE TABLE mf_flink_tt ( id BIGINT not null, name STRING, age INT, status BOOLEAN, primary key (id) ) tblproperties ("transactional"="true", "write.bucket.num" = "64", "acid.data.retain.hours"="12") ; --创建Transaction Table2.0分区表 CREATE TABLE mf_flink_tt_part ( id BIGINT not null, name STRING, age INT, status BOOLEAN, primary key (id) ) partitioned by (dd string, hh string) tblproperties ("transactional"="true", "write.bucket.num" = "64", "acid.data.retain.hours"="12") ;
-
搭建开源Flink集群。当前仅支持1.13版本的开源Flink。
您可单击Flink 1.13版本包下载链接,将包下载至本地环境,下载完成后进行解压。
-
下载Flink Connector并添加至Flink集群包中。
-
单击Flink Connector下载链接,将Flink Connector Jar包下载至本地环境。
-
将Flink Connector Jar包添加至解压后的Flink安装包的lib目录中。
mv flink-connector-odps-1.13.0-shaded.jar $FLINK_HOME/lib/flink-connector-odps-1.13.0-shaded.jar
-
-
启动Flink实例服务。
cd $FLINK_HOME/bin ./start-cluster.sh
-
启动Flink客户端。
cd $FLINK_HOME/bin ./sql-client.sh
-
创建Flink表,并配置Flink Connector参数。
当前支持直接使用Flink SQL创建Flink表并配置参数,也支持使用Flink的DataStream API进行相关操作。两种操作的核心示例如下。
使用Flink SQL
-
进入Flink SQL的编辑界面,执行以下命令完成建表与参数配置。
-- 在 Flink SQL中注册一张对应的非分区表 CREATE TABLE mf_flink ( id BIGINT, name STRING, age INT, status BOOLEAN, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'maxcompute', 'table.name' = 'mf_flink_tt', 'sink.operation' = 'upsert', 'odps.access.id'='LTAI5tRzd4W8cTyLZKT****', 'odps.access.key'='gJwKaF3hK9MDAQgbO0zs*******', 'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api', 'odps.project.name'='mf_mc_bj' ); -- 在 Flink SQL 中注册一张对应的分区表 CREATE TABLE mf_flink_part ( id BIGINT, name STRING, age INT, status BOOLEAN, dd STRING, hh STRING, PRIMARY KEY(id) NOT ENFORCED ) PARTITIONED BY (`dd`,`hh`) WITH ( 'connector' = 'maxcompute', 'table.name' = 'mf_flink_tt_part', 'sink.operation' = 'upsert', 'odps.access.id'='LTAI5tRzd4W8cTyLZKT****', 'odps.access.key'='gJwKaF3hK9MDAQgbO0zs*******', 'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api', 'odps.project.name'='mf_mc_bj' );
-
向Flink表中写入数据,并在MaxCompute表中查询,验证Flink数据写入MaxCompute的结果。
--在flink Sql客户端中往非分区表里插入数据 Insert into mf_flink values (1,'Danny',27, false); --在Maxcompute中查询返回 select * from mf_flink_tt; +------------+------+------+--------+ | id | name | age | status | +------------+------+------+--------+ | 1 | Danny | 27 | false | +------------+------+------+--------+ --在flink Sql客户端中往非分区表里插入数据 Insert into mf_flink values (1,'Danny',28, false); --在Maxcompute中查询返回 select * from mf_flink_tt; +------------+------+------+--------+ | id | name | age | status | +------------+------+------+--------+ | 1 | Danny | 28 | false | +------------+------+------+--------+ --在flink Sql客户端中往分区表里插入数据 Insert into mf_flink_part values (1,'Danny',27, false, '01','01'); --在Maxcompute中查询返回 select * from mf_flink_tt_part where dd=01 and hh=01; +------------+------+------+--------+----+----+ | id | name | age | status | dd | hh | +------------+------+------+--------+----+----+ | 1 | Danny | 27 | false | 01 | 01 | +------------+------+------+--------+----+----+ --在flink Sql客户端中分区表里插入数据 Insert into mf_flink_part values (1,'Danny',30, false, '01','01'); --在Maxcompute中查询返回 select * from mf_flink_tt_part where dd=01 and hh=01; +------------+------+------+--------+----+----+ | id | name | age | status | dd | hh | +------------+------+------+--------+----+----+ | 1 | Danny | 30 | false | 01 | 01 | +------------+------+------+--------+----+----+
使用DataStream API
-
使用DataStream接口时,需先添加以下依赖。
com.aliyun.odps flink-connector-odps 1.13.0 system ${project.basedir}/lib/flink-connector-odps-1.13.0-shaded.jar
-
建表与参数配置的示例代码如下。
package com.aliyun.odps.flink.examples; import org.apache.flink.configuration.Configuration; import org.apache.flink.odps.table.OdpsOptions; import org.apache.flink.odps.util.OdpsConf; import org.apache.flink.odps.util.OdpsPipeline; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.data.RowData; public class Examples { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(120 * 1000); StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env); Table source = streamTableEnvironment.sqlQuery("SELECT * FROM source_table"); DataStream input = streamTableEnvironment.toAppendStream(source, RowData.class); Configuration config = new Configuration(); config.set(OdpsOptions.SINK_OPERATION, "upsert"); config.set(OdpsOptions.UPSERT_COMMIT_THREAD_NUM, 8); config.set(OdpsOptions.UPSERT_MAJOR_COMPACT_MIN_COMMITS, 100); OdpsConf odpsConfig = new OdpsConf("accessid", "accesskey", "endpoint", "project", "tunnel endpoint"); OdpsPipeline.Builder builder = OdpsPipeline.builder(); builder.projectName("sql2_isolation_2a") .tableName("user_ledger_portfolio") .partition("") .configuration(config) .odpsConf(odpsConfig) .sink(input, false); env.execute(); } }
-
Flink数据写入MaxCompute流程:阿里云全托管Flink
-
准备工作:创建MaxCompute表。
您需先创建好MaxCompute表,用于后续Flink数据写入。以下以创建一张Transaction Table2.0表为例。
set odps.sql.type.system.odps2=true; drop table mf_flink_upsert; create table mf_flink_upsert ( c1 int not null, c2 string, gt timestamp, primary key (c1) ) partitioned by (ds string) tblproperties ("transactional"="true", "write.bucket.num" = "64", "acid.data.retain.hours"="12") ;
-
登录实时计算控制台,添加Flink Connector,需要上传开源版Flink Connector。其中关键参数如下,详细操作流程请参见管理自定义连接器。
'connector' = 'maxcompute',
-
通过Flink SQL作业创建Flink表,并构造Flink实时数据,完成作业开发后进行作业部署。
重要
目前仅支持vvr-4.0.16-flink-1.13版本,部署作业时引擎版本需选择为vvr-4.0.16-flink-1.13。
在Flink的作业开发页面,创建并编辑Flink SQL作业,以下示例为新建一张Flink数据源表、一张Flink临时结果表,并自动构建实时数据生成逻辑写入源表,通过计算逻辑将源表数据写入临时结果表。SQL作业开发详细操作请参见SQL作业开发。
--创建flink数据源表, CREATE TEMPORARY TABLE fake_src_table ( c1 int, c2 VARCHAR, gt as CURRENT_TIMESTAMP ) with ( 'connector' = 'faker', 'fields.c2.expression' = '#{superhero.name}', 'rows-per-second' = '100', 'fields.c1.expression' = '#{number.numberBetween ''0'',''1000''}' ); --flink创建临时结果表 CREATE TEMPORARY TABLE test_c_d_g ( c1 int, c2 VARCHAR, gt TIMESTAMP, ds varchar, PRIMARY KEY(c1) NOT ENFORCED ) WITH ( 'connector' = 'maxcompute', 'table.name' = 'mf_flink_upsert', 'sink.operation' = 'upsert', 'odps.access.id'='LTAI5tRzd4W8cTyL****', 'odps.access.key'='gJwKaF3hK9MDAQgb**********', 'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api', 'odps.project.name'='mf_mc_bj' ); --flink 计算逻辑 insert into test_c_d_g select c1 as c1, c2 as c2, gt as gt, date_format(gt, 'yyyyMMddHH') as ds from fake_src_table;
-
在MaxCompute中查询数据,验证Flink数据写入MaxCompute的结果。
select * from mf_flink_upsert where ds=2023061517; --返回,由于Flink中的数据为随机生成,实际MaxCompute查询结果与本示例不一定完全一致 +------+----+------+----+ | c1 | c2 | gt | ds | +------+----+------+----+ | 0 | Skaar | 2023-06-16 01:59:41.116 | 2023061517 | | 21 | Supah Century | 2023-06-16 01:59:59.117 | 2023061517 | | 104 | Dark Gorilla Grodd | 2023-06-16 01:59:57.117 | 2023061517 | | 126 | Leader | 2023-06-16 01:59:39.116 | 2023061517 |
附录:新版Flink Connector全量参数
-
通用参数
参数
是否必配
默认值
说明
connector
是
无默认值
connector类型,需要设置为
MaxCompute
。odps.project.name
是
无默认值
MaxCompute的Project名称。
odps.access.id
是
无默认值
您的阿里云账号AccessKey ID。您可以在访问凭证页面查看对应信息。
odps.access.key
是
无默认值
您的阿里云账号AccessKey Secret。您可以在访问凭证页面查看对应信息。
odps.end.point
是
无默认值
MaxCompute的Endpoint信息。各地域的MaxCompute Endpoint请参见Endpoint。
table.name
是
无默认值
MaxCompute表名称,格式为
[project.][schema.]table
。sink.operation
是
insert
写入类型,取值为
insert
或upsert
。只有Transaction Table2.0支持upsert写入。sink.parallelism
否
无默认值
写入的并行度,如果不设置,则默认使用上游数据并行度。
sink.dynamic-partition.limit
否
100
动态分区写入时,单个Checkpoint可同时导入的最大分区数量。
sink.group-partition.enable
否
FALSE
动态分区写入时,是否按照分区进行分组。
-
insert
或upsert
写入参数upsert写入参数
参数
是否必配
默认值
说明
upsert.writer.max-retries
否
3
upsert写入时,writer flush重试次数。
upsert.writer.buffer-size
否
64m
upsert写入时,单个writer的buffer size。
upsert.writer.bucket.buffer-size
否
1m
upsert写入时,单个bucket的buffer size。
upsert.write.bucket.num
是
无
写入表的bucket数量,必须与写入表
write.bucket.num
值一致。upsert.writer.slot-num
否
1
upsert写入时,单个session使用Tunnel slot数量。
upsert.commit.max-retries
否
3
upsert写入时,commit session 重试次数。
upsert.commit.thread-num
否
8
upsert写入时,commit session 的并行度。
upsert.major-compact.min-commits
否
100
upsert写入时,发起major compact的最小commit次数。
insert写入参数
参数
是否必配
默认值
说明
insert.commit.thread-num
否
32
insert写入时,commit session 的并行度。
insert.arrow-writer.enable
否
TRUE
insert写入时,是否使用arrow格式。
insert.arrow-writer.batch-size
否
512
insert写入时,arrow batch最大行数。
insert.arrow-writer.flush-interval
否
100000
insert写入时,writer flush间隔,单位ms。
内容没看懂? 不太想学习?想快速解决? 有偿解决: 联系专家
阿里云企业补贴进行中: 马上申请
腾讯云限时活动1折起,即将结束: 马上收藏
同尘科技为腾讯云授权服务中心。
购买腾讯云产品享受折上折,更有现金返利:同意关联,立享优惠
转转请注明出处:http://www.yunxiaoer.com/158578.html