本文为您介绍如何使用Flume同步EMR Kafka集群的数据至阿里云OSS-HDFS服务。
前提条件
- 已开通并授权访问OSS-HDFS服务。具体操作,请参见开通并授权访问OSS-HDFS服务。
- 已创建DataLake集群,并选择了Flume服务。具体操作,请参见创建集群。
- 已创建DataFlow集群,并选择了Kafka服务。具体操作,请参见创建集群。
操作步骤
- 配置Flume。
- 进入Flume的配置页面。
- 登录EMR on ECS控制台。
- 在顶部菜单栏处,根据实际情况选择地域和资源组。
- 在集群管理页面,单击目标集群操作列的集群服务。
- 在集群服务页面,单击FLUME服务区域的配置。
- 设置JVM最大可用内存(Xmx)。Flume向OSS-HDFS写入数据时需要占用较大的JVM内存,建议增加Flume Agent的Xmx。具体步骤如下:
- 单击flume-env.sh页签。
本文采用了全局配置方式。如果您希望按照节点配置,可以在FLUME服务配置页面的下拉列表中选择独立节点配置。
- 修改JAVA_OPTS的参数值。
例如,JVM最大可用内存设置为1 GB,则参数值修改为-Xmx1g。
- 单击保存。
- 单击flume-env.sh页签。
- 修改flume-conf.properties配置。
- 单击flume-conf.properties页签。
本文采用了全局配置方式。如果您希望按照节点配置,可以在FLUME服务配置页面的下拉列表中选择独立节点配置。
- 在flume-conf.properties右侧,输入以下配置项。
说明 以下示例中的default-agent的值需与FLUME服务配置页面的agent_name参数值保持一致。default-agent.sources = source1 default-agent.sinks = k1 default-agent.channels = c1 default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource default-agent.sources.source1.channels = c1 default-agent.sources.source1.kafka.bootstrap.servers = default-agent.sources.source1.kafka.topics = flume-test default-agent.sources.source1.kafka.consumer.group.id = flume-test-group default-agent.sinks.k1.type = hdfs default-agent.sinks.k1.hdfs.path = oss://..oss-dls.aliyuncs.com/ default-agent.sinks.k1.hdfs.fileType=DataStream # Use a channel which buffers events in memory default-agent.channels.c1.type = memory default-agent.channels.c1.capacity = 100 default-agent.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel default-agent.sources.source1.channels = c1 default-agent.sinks.k1.channel = c1
参数 描述 default-agent.sources.source1.kafka.bootstrap.servers Kafka集群Broker的Host和端口号。 default-agent.sinks.k1.hdfs.path OSS-HDFS的路径。填写格式为 oss://..oss-dls.aliyuncs.com/
。示例值为oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result。各参数说明如下:
- :填写已开启OSS-HDFS服务的Bucket名称。
- :填写Bucket所在的地域ID。
- :填写OSS-HDFS服务的目录名称。
default-agent.channels.c1.capacity 通道中存储的最大事件数。请根据实际环境修改该参数值。 default-agent.channels.c1.transactionCapacity 每个事务通道将从源接收或提供给接收器的最大事件数。请根据实际环境修改该参数值。 - 单击保存。
- 单击flume-conf.properties页签。
- 进入Flume的配置页面。
- 测试数据同步情况。
- 通过SSH方式连接DataFlow集群,详情请参见登录集群。
- 创建名称为flume-test的Topic。
kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper master-1-1:2181/emr-kafka --topic flume-test --create
- 生成测试数据。
kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092
例如,输入
abc
并回车。在oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result路径下会以当前时间的时间戳(毫秒)为后缀生成格式为FlumeData.xxxx的文件。
内容没看懂? 不太想学习?想快速解决? 有偿解决: 联系专家
阿里云企业补贴进行中: 马上申请
腾讯云限时活动1折起,即将结束: 马上收藏
同尘科技为腾讯云授权服务中心。
购买腾讯云产品享受折上折,更有现金返利:同意关联,立享优惠
转转请注明出处:https://www.yunxiaoer.com/158929.html