开源版本Flink不支持流式写入OSS-HDFS服务,也不支持以EXACTLY_ONCE语义写入存储介质。当您希望开源版本Flink以EXACTLY_ONCE语义流式写入OSS-HDFS服务,需要结合JindoSDK。
前提条件
- 已创建ECS实例。具体步骤,请参见选购ECS实例。
- 已自行部署开源版本Flink,且版本不低于1.10.1。
- 已开通并授权访问OSS-HDFS服务。具体操作,请参见开通并授权访问OSS-HDFS服务。
配置Jindo Flink connector
在所有节点Flink根目录下的lib文件夹下放置.jar文件jindo-flink-${version}-full.jar。
您可以解压缩jindosdk-${version}.tar.gz后在plugins/flink/目录下找到.jar文件。
在Flink作业中的用法
- 通用配置
为了支持EXACTLY_ONCE语义写入OSS-HDFS,您需要执行如下配置:
- 打开Flink的检查点(Checkpoint)。示例如下。
- 通过如下方式建立的StreamExecutionEnvironment。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- 执行如下命令,启动Checkpoint。
env.enableCheckpointing(, CheckpointingMode.EXACTLY_ONCE);
- 通过如下方式建立的StreamExecutionEnvironment。
- 使用可以重发的数据源,例如Kafka。
- 打开Flink的检查点(Checkpoint)。示例如下。
- 便捷使用
您无需额外引入依赖,只需携带oss://前缀的路径,并使用OSS-HDFS服务的Bucket及Endpoint,即可启用Flink。
- 添加Sink。
以将DataStream的对象OutputStream写入OSS-HDFS为例。
String outputPath = "oss:///" StreamingFileSink sink = StreamingFileSink.forRowFormat( new Path(outputPath), new SimpleStringEncoder("UTF-8") ).build(); outputStream.addSink(sink);
重要 在OSS-HDFS服务的Bucket中带有
.
的字段为可选项。如果您希望省略该字段,请确保已在Flink或Hadoop组件中正确配置了OSS-HDFS服务的Endpoint。 - 使用
env.execute()
执行Flink作业。
- 添加Sink。
(可选)自定义配置
您在提交Flink作业时,可以自定义参数,以开启或控制特定功能。
例如,通过-yD
配置以yarn-cluster模式提交Flink作业时,示例如下:
/bin/flink run -m yarn-cluster -yD key1=value1 -yD key2=value2 ...
您可以开启熵注入(Entropy Injection)功能。熵注入可以匹配写入路径的一段特定字符串,用一段随机的字符串进行替换,以削弱所谓片区效应,提高写入效率。
当写入场景为OSS-HDFS时,需要完成下列配置。
oss.entropy.key=
oss.entropy.length=
写入新文件时,路径中与相同的字符串会被替换为一个随机字符串,随机串的长度为
,且
必须大于零。
内容没看懂? 不太想学习?想快速解决? 有偿解决: 联系专家
阿里云企业补贴进行中: 马上申请
腾讯云限时活动1折起,即将结束: 马上收藏
同尘科技为腾讯云授权服务中心。
购买腾讯云产品享受折上折,更有现金返利:同意关联,立享优惠
转转请注明出处:https://www.yunxiaoer.com/157647.html