详情页标题前

腾讯云对象存储使用 Hadoop Filesystem API 代码访问 COS 元数据加速存储桶

详情页1

操作场景

如果对象存储(Cloud Object Storage,COS)存储桶开启了元数据加速,除了可以使用 Hadoop 命令行、大数据组件等方式操作外,还可以通过 Hadoop Filesystem API,使用 Java 代码来访问元数据加速桶。本文指导您如何通过 Java 代码访问元数据加速桶。

前提条件

确保已经开通元数据加速,并且进行了正确的环境部署和 HDFS 协议配置。具体部署和配置,详情请参见 使用 HDFS 协议访问已开启元数据加速的存储桶如果有 Hadoop 环境,可以验证下 Hadoop 命令行是否能正确访问。

操作步骤

1. 新建 maven 工程,并在 maven 的 pom.xml 中添加以下依赖项(请根据自己实际 Hadoop 版本及环境设置 hadoop-common 包、hadoop-cos 包和 cos_api-bundle 包的版本)。

              org.apache.hadoop         hadoop-common         2.8.5         provided                    com.qcloud.cos         hadoop-cos         xxx                   com.qcloud         cos_api-bundle         xxx     

2. 参考如下 hadoop 的代码进行修改。其中的配置项可参见 配置项说明 文档进行修改。以及重点关注其中数据持久化和可见性相关的说明。
以下只列出了部分常见的文件系统的操作,其他的接口可参见
Hadoop FileSystem 接口文档

package com.qcloud.cos.demo;
import org.apache.commons.io.IOUtils;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileChecksum;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;
import java.io.IOException;import java.net.URI;import java.nio.ByteBuffer;
public class Demo { private static FileSystem initFS() throws IOException { Configuration conf = new Configuration(); // 配置项可参见 https://cloud.tencent.com/document/product/436/6884#.E4.B8.8B.E8.BD.BD.E4.B8.8E.E5.AE.89.E8.A3.85 // 以下配置是必填项
conf.set("fs.cosn.impl", "org.apache.hadoop.fs.CosFileSystem"); conf.set("fs.AbstractFileSystem.cosn.impl", "org.apache.hadoop.fs.CosN"); conf.set("fs.cosn.userinfo.secretId", "xxxxxx"); conf.set("fs.cosn.userinfo.secretKey", "xxxxxx"); conf.set("fs.cosn.bucket.region", "xxxxxx"); conf.set("fs.cosn.tmp.dir", "/data/chdfs_tmp_cache");
// 配置项可参考 https://cloud.tencent.com/document/product/436/71550 // 通过 POSIX 访问方式必填配置项(推荐方式) conf.set("fs.cosn.trsf.fs.AbstractFileSystem.ofs.impl", "com.qcloud.chdfs.fs.CHDFSDelegateFSAdapter"); conf.set("fs.cosn.trsf.fs.ofs.impl", "com.qcloud.chdfs.fs.CHDFSHadoopFileSystemAdapter"); conf.set("fs.cosn.trsf.fs.ofs.tmp.cache.dir", "/data/chdfs_tmp_cache");
// appid 根据实际 appid 进行替换 conf.set("fs.cosn.trsf.fs.ofs.user.appid", "1250000000"); // region 根据实际地域进行替换 conf.set("fs.cosn.trsf.fs.ofs.bucket.region", "ap-beijing"); // 其他可选配置参考官网文档 https://cloud.tencent.com/document/product/436/6884#.E4.B8.8B.E8.BD.BD.E4.B8.8E.E5.AE.89.E8.A3.85 // 是否开启 CRC64 校验,默认不开启,此时无法使用 hadoop fs -checksum 命令获取文件的 CRC64 校验值 conf.set("fs.cosn.crc64.checksum.enabled", "true"); String cosHadoopFSUrl = "cosn://examplebucket-12500000000/"; return FileSystem.get(URI.create(cosHadoopFSUrl), conf); }
private static void mkdir(FileSystem fs, Path filePath) throws IOException { fs.mkdirs(filePath); }
private static void createFile(FileSystem fs, Path filePath) throws IOException { // 创建一个文件(如果存在则将其覆盖) // if the parent dir does not exist, fs will create it! FSDataOutputStream out = fs.create(filePath, true); try { // 写入一个文件 String content = "test write file"; out.write(content.getBytes()); } finally { // close 返回成功, 表示数据写入成功, 若抛出异常, 表示数据写入失败 out.close(); } }
private static void readFile(FileSystem fs, Path filePath) throws IOException { FSDataInputStream in = fs.open(filePath); try { byte[] buf = new byte[4096]; int readLen = -1; do { readLen = in.read(buf); } while (readLen >= 0); } finally { IOUtils.closeQuietly(in); } }

private static void queryFileOrDirStatus(FileSystem fs, Path path) throws IOException { FileStatus fileStatus = fs.getFileStatus(path); if (fileStatus.isDirectory()) { System.out.printf("path %s is dir\n", path); return; }
long fileLen = fileStatus.getLen(); long accessTime = fileStatus.getAccessTime(); long modifyTime = fileStatus.getModificationTime(); String owner = fileStatus.getOwner(); String group = fileStatus.getGroup();

System.out.printf("path %s is file, fileLen: %d, accessTime: %d, modifyTime: %d, owner: %s, group: %s\n", path, fileLen, accessTime, modifyTime, owner, group); }

// 默认的校验类型为 COMPOSITE-CRC32C private static void getFileCheckSum(FileSystem fs, Path path) throws IOException { FileChecksum checksum = fs.getFileChecksum(path); System.out.printf("path %s, checkSumType: %s, checkSumCrcVal: %d\n", path, checksum.getAlgorithmName(), ByteBuffer.wrap(checksum.getBytes()).getInt()); }

private static void copyFileFromLocal(FileSystem fs, Path chdfsPath, Path localPath) throws IOException { fs.copyFromLocalFile(localPath, chdfsPath); }

private static void copyFileToLocal(FileSystem fs, Path chdfsPath, Path localPath) throws IOException { fs.copyToLocalFile(chdfsPath, localPath); }

private static void renamePath(FileSystem fs, Path oldPath, Path newPath) throws IOException { fs.rename(oldPath, newPath); }

private static void listDirPath(FileSystem fs, Path dirPath) throws IOException { FileStatus[] dirMemberArray = fs.listStatus(dirPath);

for (FileStatus dirMember : dirMemberArray) { System.out.printf("dirMember path %s, fileLen: %d\n", dirMember.getPath(), dirMember.getLen()); } }

// 递归删除标志用于删除目录 // 如果递归为 false 并且 dir 不为空,则操作将失败 private static void deleteFileOrDir(FileSystem fs, Path path, boolean recursive) throws IOException { fs.delete(path, recursive); }

private static void closeFileSystem(FileSystem fs) throws IOException { fs.close(); }

public static void main(String[] args) throws IOException { // 初始化文件系统 FileSystem fs = initFS();

// 创建文件 Path chdfsFilePath = new Path("/folder/exampleobject.txt"); createFile(fs, chdfsFilePath);

// 读取文件 readFile(fs, chdfsFilePath);

// 查询文件或目录 queryFileOrDirStatus(fs, chdfsFilePath);

// 获取文件校验和 getFileCheckSum(fs, chdfsFilePath);

// 从本地复制文件 Path localFilePath = new Path("file:///home/hadoop/cosn_demo/data/exampleobject.txt"); copyFileFromLocal(fs, chdfsFilePath, localFilePath);

// 获取文件到本地 Path localDownFilePath = new Path("file:///home/hadoop/cosn_demo/data/exampleobject.txt"); copyFileToLocal(fs, chdfsFilePath, localDownFilePath);

// 重命名 Path newPath = new Path("/doc/example.txt"); renamePath(fs, chdfsFilePath, newPath);

// 删除文件 deleteFileOrDir(fs, newPath, false);

// 创建目录 Path dirPath = new Path("/folder"); mkdir(fs, dirPath);

// 在目录中创建文件 Path subFilePath = new Path("/folder/exampleobject.txt"); createFile(fs, subFilePath);

// 列出目录 listDirPath(fs, dirPath);

// 删除目录 deleteFileOrDir(fs, dirPath, true);

// 关闭文件系统 closeFileSystem(fs); }}

3. 编译和运行。说明运行前,请确保已正确设置 classpath。classpath 需包含 Hadoop common 包以及元数据加速桶依赖的 Jar 包的路径。对于 EMR 环境,如果您按照 使用 HDFS 协议访问已开启元数据加速的存储桶 逐步操作,那么 Hadoop common 包通常在 /usr/local/service/hadoop/share/hadoop/common/ 目录下,元数据加速桶依赖的 Jar 包通常在 /usr/local/service/hadoop/share/hadoop/common/lib/ 目录下。

对象存储官网1折活动,限时活动,即将结束,速速收藏
同尘科技腾讯云授权服务中心。
购买腾讯云产品享受折上折,更有现金返利。同意关联立享优惠

转转请注明出处:https://www.yunxiaoer.com/144847.html

(0)
上一篇 2023年12月9日 上午1:52
下一篇 2023年12月9日 上午1:52
详情页2

相关推荐

  • 腾讯云容器服务Ingress 使用 TkeServiceConfig 配置 CLB同尘科技

    TkeServiceConfig TkeServiceConfig 是腾讯云容器服务 TKE 提供的自定义资源 CRD,通过 TkeServiceConfig 能够帮助您更灵活的进行 Ingress 管理负载均衡的各种配置。 使用场景 Ingress YAML 的语义无法定义的负载均衡参数和功能,可以通过 TkeServiceConfig 来配置。 配置说明…

    腾讯云 2023年12月9日
  • 腾讯云对象存储删除存储桶

    简介 本文档提供关于删除存储桶的 API 概览以及 SDK 示例代码。 API 操作名 操作描述 DELETE Bucket 删除存储桶 删除指定账号下的空存储桶 SDK API 参考 SDK 所有接口的具体参数与方法说明,请参考 SDK API。 删除存储桶 功能说明 删除指定的存储桶(DELETE Bucket)。注意 删除存储桶前,请确保存储桶内的数据…

    腾讯云 2023年12月9日
  • 信息流广告,信息流部分建议宽度830px,只针对默认列表样式,顺序随机
  • 阿里云ECS云服务器通过API允许不同阿里云账号下的ECS实例内网通信-云淘科技

    若您需要实现同一地域下不同阿里云账号的ECS实例内网通信,可以参考本文描述授权安全组间互访。 前提条件 请确保您已经为ECS实例安装了阿里云CLI,在不同操作系统中安装CLI的方式请参见: 在Windows上安装阿里云CLI 在Linux上安装阿里云CLI 在macOS上安装阿里云CLI 背景信息 目前授权安全组内网通信有以下两种,请根据您的实际需求选择方式…

    阿里云服务器 2023年12月9日
  • 腾讯云云直播切换至新版控制台

    云直播新版控制台新增播放鉴权、直播流管理和自定义模板配置等功能,新版控制台与直播 API3.0 协同使用,提供更全面更便捷的配置和管理。云直播旧版控制台将持续维护,但不再上线新功能,您可切换新版控制台以使用持续升级的云直播功能和服务。切换新版控制台后,旧版录制和回调配置将从全局切换到域名维度,后续新增的推流域名,可以分别绑定不同的录制和回调模板。同时,截图和…

    2023年12月9日
  • 腾讯云负载均衡传统型负载均衡管理后端云服务器

    传统型负载均衡将请求路由至运行正常的后端云服务器实例,首次使用传统型负载均衡或根据业务需求,需要增加或删除后端服务器数量时,可按照本文指引进行操作。 前提条件 需已创建传统型负载均衡实例并配置监听器,详情请参见 传统型负载均衡快速入门。 操作步骤 添加传统型负载均衡后端服务器 说明如果传统型负载均衡实例与某个弹性伸缩组关联,则该组中的云服务器会自动添加至传统…

    2023年12月9日

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信
本站为广大会员提供阿里云、腾讯云、华为云、百度云等一线大厂的购买,续费优惠,保证底价,买贵退差。