详情页标题前

阿里云对象存储OSS通过SeaTunnel集成平台将数据写入OSS-HDFS服务-云淘科技

详情页1

SeaTunnel是一个开源、易用的超高性能分布式数据集成平台,支持海量数据的实时同步。本文介绍如何通过SeaTunnel集成平台将数据写入OSS-HDFS服务。

背景信息

SeaTunnel可稳定高效地同步百亿级数据,专注于数据集成和数据同步,主要解决数据集成领域的以下问题。

  • 数据源多样

    常用的数据源有数百种,版本不兼容。随着新技术的出现,可能出现更多的数据源。用户很难找到能够全面快速支持这些数据源的工具。

  • 复杂同步场景

    数据同步需要支持离线-全量同步、离线-增量同步、CDC、实时同步、全库同步等多种同步场景。

  • 资源需求高

    现有的数据集成和数据同步工具往往需要大量的计算资源或JDBC连接资源来完成海量小文件的实时同步,这在一定程度上加重了企业的负担。

  • 缺乏数据监控

    数据集成和同步过程经常会丢失或重复数据。同步过程缺乏监控,无法直观了解任务过程中数据的真实情况。

  • 技术栈复杂

    企业使用的技术组件各不相同,您需要针对不同的组件开发相应的同步程序来完成数据集成。

  • 管理和维护困难

    受限于不同的底层技术组件(Flink或者Spark),通常单独开发和管理离线同步和实时同步,增加了管理和维护的难度。

更多信息,请参见SeaTunnel。

前提条件

已开通OSS-HDFS服务。具体步骤,请参见开通并授权访问OSS-HDFS服务。

使用限制

仅允许通过专有网络VPC的方式访问OSS-HDFS服务。创建专有网络VPC时,需确保创建的VPC与待开启OSS-HDFS服务的Bucket位于相同的地域。

步骤一:部署SeaTunnel

本地部署

重要

执行以下步骤前,您需要确保已安装Java 8或者Java 11并设置JAVA_HOME。

  1. 在终端通过以下命令下载并解压SeaTunnel。

    以下以下载并解压2.3.0版本的SeaTunnel为例。

    export version="2.3.0" && wget "https://www.apache.org/dyn/closer.lua/incubator/seatunnel/2.3.0/apache-seatunnel-incubating-2.3.0-bin.tar.gz" && tar -xzvf "apache-seatunnel-incubating-2.3.0-bin.tar.gz"

    关于SeaTunnel的版本信息,请参见Apache SeaTunnel。

  2. 安装connector。

    自2.2.0-beta版本开始,二进制包默认不提供connector依赖。以2.3.0版本为例,您在首次使用SeaTunnel时,需要执行以下命令安装connector。

    sh bin/install_plugin.sh 2.3.0

    说明

    ${SEATUNNEL_HOME}/config/plugin.properties配置文件默认下载全部connector插件,您可以结合业务需求适当增减connector。

    connector插件列表如下:

       --connectors-v2--
       connector-amazondynamodb
       connector-assert
       connector-cassandra
       connector-cdc-mysql
       connector-cdc-sqlserver
       connector-clickhouse
       connector-datahub
       connector-dingtalk
       connector-doris
       connector-elasticsearch
       connector-email
       connector-file-ftp
       connector-file-hadoop
       connector-file-local
       connector-file-oss
       connector-file-oss-jindo
       connector-file-s3
       connector-file-sftp
       connector-google-sheets
       connector-hive
       connector-http-base
       connector-http-feishu
       connector-http-gitlab
       connector-http-github
       connector-http-jira
       connector-http-klaviyo
       connector-http-lemlist
       connector-http-myhours
       connector-http-notion
       connector-http-onesignal
       connector-http-wechat
       connector-hudi
       connector-iceberg
       connector-influxdb
       connector-iotdb
       connector-jdbc
       connector-kafka
       connector-kudu
       connector-maxcompute
       connector-mongodb
       connector-neo4j
       connector-openmldb
       connector-pulsar
       connector-rabbitmq
       connector-redis
       connector-s3-redshift
       connector-sentry
       connector-slack
       connector-socket
       connector-starrocks
       connector-tablestore
       connector-selectdb-cloud
       connector-hbase
       --end--

Kubernetes(Beta)部署

重要

  • 通过Kubernetes(Beta)部署SeaTunnel目前处于试运行阶段。以下以Flink引擎为例,不推荐在生产环境中使用。

  • 确保已在本地安装Docker,Kubernetes以及Helm。

  1. 启动集群。

    以Kubernetes 1.23.3版本为例,您可以使用以下命令启动集群。

    minikube start --kubernetes-version=v1.23.3
  2. 使用SeaTunnel运行镜像。

    ENV SEATUNNEL_VERSION="2.3.0-beta"
    ENV SEATUNNEL_HOME = "/opt/seatunnel"
    
    RUN mkdir -p $SEATUNNEL_HOME
    
    RUN wget https://archive.apache.org/dist/incubator/seatunnel/${SEATUNNEL_VERSION}/apache-seatunnel-incubating-${SEATUNNEL_VERSION}-bin.tar.gz
    RUN tar -xzvf apache-seatunnel-incubating-${SEATUNNEL_VERSION}-bin.tar.gz
    
    RUN cp -r apache-seatunnel-incubating-${SEATUNNEL_VERSION}/* $SEATUNNEL_HOME/
    RUN rm -rf apache-seatunnel-incubating-${SEATUNNEL_VERSION}*
    RUN rm -rf $SEATUNNEL_HOME/connectors/seatunnel
  3. 构建镜像。

    docker build -t seatunnel:2.3.0-beta-flink-1.13 -f Dockerfile .
  4. 将图像加载至minikube。

    minikube image load seatunnel:2.3.0-beta-flink-1.13
  5. 在Kubernetes集群上安装证书管理器。

    在Kubernetes集群上安装证书管理器以启用Webhook组件,每个Kubernetes集群只需要安装一次证书管理器。

    kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.7.1/cert-manager.yaml
  6. 使用Helm图表部署最新的Flink Kubernetes Operator版本。

    1. 下载Flink Kubernetes Operator。

      helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-0.1.0/
    2. 部署Flink Kubernetes Operator。

      helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
  7. 验证kubectl是否已成功安装。

    kubectl get pods

    返回以下结果说明已成功安装kubectl。

    NAME                                                   READY   STATUS    RESTARTS      AGE
    flink-kubernetes-operator-5f466b8549-mgchb             1/1     Running   3 (23h ago)   1

步骤二:设置配置文件

通过添加配置文件,确定SeaTunnel启动后数据输入、处理和输出的方式和逻辑。配置示例如下:

env {                                                                                                                                                                          
  # You can set SeaTunnel environment configuration here                                                                                                                       
  execution.parallelism = 10                                                                                                                                                   
    job.mode = "BATCH"                                                                                                                                                           
    checkpoint.interval = 10000                                                                                                                                                  
    #execution.checkpoint.interval = 10000                                                                                                                                       
    #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"                                                                                                          
    }                                                                                                                                                                              


source {                                                                                                                                                                       
  LocalFile {                                                                                                                                                                    
  			path = "/data/seatunnel-2.3.0/testfile/source"                                                                                                                               
  			type = "csv"                                                                                                                                                                 
			delimiter = "#"                                                                                                                                                                
    		schema {                                                                                                                                                                       
    			fields {                                                                                                                                                                   
      				name = string                                                                                                                                                          
      				age = int                                                                                                                                                              
      				gender = string                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                
						} 
           			}
     		}                                                                                             
}   

      
transform {
    
}
  # In this case we don't need this function. If you would like to get more information about how to configure Seatunnel and see full list of sink plugins,                                                              
  # please go to https://seatunnel.apache.org/docs/category/sink-v2

      
sink {                                                                                                                                                                         
	OssJindoFile {                                                                                                                                                               
      path="/seatunnel/oss03"                                                                                                                                                                                                                                                                                        
      bucket = "oss://examplebucket.cn-hangzhou.oss-dls.aliyuncs.com"                                                                                                       
      access_key = "LTAI5t7h6SgiLSganP2m****"
      access_secret = "KZo149BD9GLPNiDIEmdQ7d****"                                                                                                                                                                                                                                                          
      endpoint = "cn-hangzhou.oss-dls.aliyuncs.com"                                                                                                                              
      }

  # If you would like to get more information about how to configure Seatunnel and see full list of sink plugins,                                                              
  # please go to https://seatunnel.apache.org/docs/category/sink-v2                                                                                                            
} 

以上配置示例包含四个模块,详细说明如下。

模块

是否必选

说明

env

用于配置引擎的环境变量。

关于env的更多信息,请参见env。

source

用于定义SeaTunnel需要获取的数据源,并将获取的数据用于下一个模块transform,支持同时定义多个数据源。

关于支持的数据源列表,请参见source。

transform

用于定于数据处理模块。当定义了数据源后,可能还需要对数据做进一步的处理。如果您不需要做数据处理,可以直接忽略transform模块,数据将直接从source写入sink。

关于transform的更多信息,请参见transform。

sink

用于定义SeaTunnel将数据写入的目标端,本教程以写入OSS-HDFS服务为例。

  • 关于支持的数据写入目标端列表,请参见sink。

  • 关于sink的配置示例,请参见OSS-HDFS配置说明。

步骤三:运行SeaTunnel

通过以下命令运行SeaTunnel。

cd "apache-seatunnel-incubating-${version}" ./bin/seatunnel.sh --config ./config/seatunnel.streaming.conf.template -e local

数据同步结束后,您可以通过SeaTunnel控制台查看输出结果,示例如下。

***********************************************                                                                                                                                
           Job Statistic Information                                                                                                                                           
***********************************************                                                                                                                                
Start Time                : 2023-02-22 17:12:19                                                                                                                                
End Time                  : 2023-02-22 17:12:37                                                                                                                                
Total Time(s)             :                  18                                                                                                                                
Total Read Count          :            10000000                                                                                                                                
Total Write Count         :            10000000                                                                                                                                
Total Failed Count        :                   0                                                                                                                                
***********************************************

OSS-HDFS配置说明

使用OssJindoFile将数据输出到OSS-HDFS文件系统。

配置参数

名称

类型

是否必选

说明

默认值

path

string

文件写入路径。

bucket

string

指定OSS-HDFS服务中的Bucket。

access_key

string

用于访问OSS-HDFS服务的AccessKey ID。

access_secret

string

用于访问OSS-HDFS服务的AccessKey Secret。

endpoint

string

用于访问OSS-HDFS服务的Endpoint。

file_format

string

文件类型。支持parquet,orc, json,csv和text。

“csv”

field_delimiter

string

文件数据分隔符,默认与hive一致,仅在写入类型为text文件时生效。

‘\001’

row_delimiter

string

文件行分隔符。仅在写入类型为text文件时生效。


partition_by

array

分区字段。如果上游数据中具有分区字段,则该配置支持数据根据分区字段目录写入,行为与hive一致。

is_partition_field_write_in_file

boolean

是否将分区字段写入文件中。

false

sink_columns

array

写入文件的字段 。

从Transform中获取的所有列Source

common-options

object

公共参数。更多信息,请参见common-options。

配置示例

以下是text格式的配置示例。如果您需要配置为其他格式的配置文件,仅需相应替换以下示例中file_format的值,例如file_format = "csv"

OssJindoFile {
    path="/seatunnel/sink"
    bucket = "oss://examplebucket.cn-hangzhou.oss-dls.aliyuncs.com"
    access_key = "LTAI5t7h6SgiLSganP2m****"
    access_secret = "KZo149BD9GLPNiDIEmdQ7d****"
    endpoint = "cn-hangzhou.oss-dls.aliyuncs.com"
    file_format = "text"
  }

相关文档

  • OSS-HDFS服务概述

  • OSS-HDFS服务使用前须知

  • 开通并授权访问OSS-HDFS服务

内容没看懂? 不太想学习?想快速解决? 有偿解决: 联系专家

阿里云企业补贴进行中: 马上申请

腾讯云限时活动1折起,即将结束: 马上收藏

同尘科技为腾讯云授权服务中心。

购买腾讯云产品享受折上折,更有现金返利:同意关联,立享优惠

转转请注明出处:https://www.yunxiaoer.com/157685.html

(0)
上一篇 2023年12月10日
下一篇 2023年12月10日
详情页2

相关推荐

  • 阿里云对象存储OSSOSS-HDFS服务使用前须知-云淘科技

    开通OSS-HDFS服务之前,您需要了解OSS-HDFS服务与OSS其他多个功能的关系,避免影响OSS-HDFS服务的正常使用或者引发数据丢失的风险。 警告 当您为某个Bucket开通OSS-HDFS服务后,通过该服务写入的数据将保留在OSS-HDFS的数据存储目录.dlsdata/下。为避免影响OSS-HDFS服务的正常使用或者引发数据丢失的风险,禁止以非…

    阿里云对象存储 2023年12月10日
  • 阿里云对象存储OSS在EMR Hive或Spark中访问OSS-HDFS-云淘科技

    EMR-3.42及后续版本或EMR-5.8.0及后续版本的集群,支持OSS-HDFS(JindoFS服务)作为数据存储,提供缓存加速服务和Ranger鉴权功能,使得在Hive或Spark等大数据ETL场景将获得更好的性能和HDFS平迁能力。本文为您介绍E-MapReduce(简称EMR)Hive或Spark如何操作OSS-HDFS。 前提条件 已创建EMR-…

    阿里云对象存储 2023年12月10日
  • 阿里云对象存储OSSOSS-HDFS服务的冷热分层存储-云淘科技

    并不是所有OSS-HDFS中存储的数据都需要频繁访问,但基于数据合规或者存档等原因,部分数据仍然需要继续保存。针对以上问题,OSS-HDFS服务支持数据的冷热分层存储,对于经常需要访问的数据以标准类型进行存储,对于较少访问的数据以低频、归档以及冷归档类型进行存储,从而降低总存储成本。 前提条件 已在OSS-HDFS服务中写入数据。 华东1(杭州)、华东2(上…

    阿里云对象存储 2023年12月10日
  • 阿里云对象存储OSS数据迁移概述-云淘科技

    本文介绍如何数据迁移至OSS或OSS-HDFS。 将数据迁移至OSS 您可以基于实际业务需求将本地、第三方存储设备或者OSS源存储空间(Bucket)内的数据迁移至OSS目标Bucket,具体如下表所示: 迁移方式 说明 相关文档 在线迁移 使用在线迁移服务,您可以将第三方数据轻松迁移至阿里云对象存储OSS,也可以在对象存储OSS之间进行跨账号、跨地域、以及…

    阿里云对象存储 2023年12月10日
  • 信息流广告,信息流部分建议宽度830px,只针对默认列表样式,顺序随机
  • 阿里云对象存储OSS导出OSS-HDFS服务审计日志-云淘科技

    OSS-HDFS服务端记录了客户端请求的查询、修改、删除文件元数据的操作审计日志。 您可以通过审计日志,了解OSS-HDFS服务操作审计、访问统计以及异常请求等情况。 前提条件 使用4.6.0及以上版本JindoSDK。下载地址,请参见JindoData下载。 配置OSS-HDFS服务下Bucket的访问密钥。 具体操作,请参见通过Jindo CLI命令访问…

    阿里云对象存储 2023年12月10日

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信
本站为广大会员提供阿里云、腾讯云、华为云、百度云等一线大厂的购买,续费优惠,保证底价,买贵退差。