详情页标题前

阿里云云原生大数据计算服务 MaxComputePipeline示例-云淘科技

详情页1

本文为您介绍MapReduce的Pipeline示例。

测试准备

  1. 准备好测试程序的JAR包,假设名字为mapreduce-examples.jar,本地存放路径为data\resources。
  2. 准备好Pipeline的测试表和资源。
    1. 创建测试表。
      create table wc_in (key string, value string);
      create table wc_out(key string, cnt bigint);
    2. 添加测试资源。
      add jar data\resources\mapreduce-examples.jar -f;
  3. 使用Tunnel导入数据。
    tunnel upload data wc_in;

    导入wc_in表的数据文件data的内容。

    hello,odps

测试步骤

在MaxCompute客户端中执行WordCountPipeline。

jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar
com.aliyun.odps.mapred.open.example.WordCountPipeline wc_in wc_out;

预期结果

作业成功结束后,输出表wc_out中的内容如下。

+------------+------------+
| key        | cnt        |
+------------+------------+
| hello      | 1          |
| odps       | 1          |
+------------+------------+

代码示例

package com.aliyun.odps.mapred.open.example;
import java.io.IOException;
import java.util.Iterator;
import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.Job;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.pipeline.Pipeline;
public class WordCountPipelineTest {
    public static class TokenizerMapper extends MapperBase {
        Record word;
        Record one;
        @Override
            public void setup(TaskContext context) throws IOException {
            word = context.createMapOutputKeyRecord();
            one = context.createMapOutputValueRecord();
            one.setBigint(0, 1L);
        }
        @Override
            public void map(long recordNum, Record record, TaskContext context)
            throws IOException {
            for (int i = 0; i < record.getColumnCount(); i++) {
                String[] words = record.get(i).toString().split("\s+");
                for (String w : words) {
                    word.setString(0, w);
                    context.write(word, one);
                }
            }
        }
    }
    public static class SumReducer extends ReducerBase {
        private Record value;
        @Override
            public void setup(TaskContext context) throws IOException {
            value = context.createOutputValueRecord();
        }
        @Override
            public void reduce(Record key, Iterator values, TaskContext context)
            throws IOException {
            long count = 0;
            while (values.hasNext()) {
                Record val = values.next();
                count += (Long) val.get(0);
            }
            value.set(0, count);
            context.write(key, value);
        }
    }
    public static class IdentityReducer extends ReducerBase {
        private Record result;
        @Override
            public void setup(TaskContext context) throws IOException {
            result = context.createOutputRecord();
        }
        @Override
            public void reduce(Record key, Iterator values, TaskContext context)
            throws IOException {
            while (values.hasNext()) {
                result.set(0, key.get(0));
                result.set(1, values.next().get(0));
                context.write(result);
            }
        }
    }
    public static void main(String[] args) throws OdpsException {
        if (args.length != 2) {
            System.err.println("Usage: WordCountPipeline  ");
            System.exit(2);
        }
        Job job = new Job();
        /**构造Pipeline的过程中,如果不指定Mapper的OutputKeySortColumns、PartitionColumns、OutputGroupingColumns,框架会默认使用其OutputKey作为此三者的默认配置。
         */
        Pipeline pipeline = Pipeline.builder()
            .addMapper(TokenizerMapper.class)
            .setOutputKeySchema(
            new Column[] { new Column("word", OdpsType.STRING) })
            .setOutputValueSchema(
            new Column[] { new Column("count", OdpsType.BIGINT) })
            .setOutputKeySortColumns(new String[] { "word" })
            .setPartitionColumns(new String[] { "word" })
            .setOutputGroupingColumns(new String[] { "word" })
            .addReducer(SumReducer.class)
            .setOutputKeySchema(
            new Column[] { new Column("word", OdpsType.STRING) })
            .setOutputValueSchema(
            new Column[] { new Column("count", OdpsType.BIGINT)})
            .addReducer(IdentityReducer.class).createPipeline();
        /**将pipeline的设置到jobconf中,如果需要设置combiner,是通过jobconf来设置。*/
        job.setPipeline(pipeline);
        /**设置输入输出表。*/
        job.addInput(TableInfo.builder().tableName(args[0]).build());
        job.addOutput(TableInfo.builder().tableName(args[1]).build());
        /**作业提交并等待结束。*/
        job.submit();
        job.waitForCompletion();
        System.exit(job.isSuccessful() == true ? 0 : 1);
    }
}

内容没看懂? 不太想学习?想快速解决? 有偿解决: 联系专家

阿里云企业补贴进行中: 马上申请

腾讯云限时活动1折起,即将结束: 马上收藏

同尘科技为腾讯云授权服务中心。

购买腾讯云产品享受折上折,更有现金返利:同意关联,立享优惠

转转请注明出处:https://www.yunxiaoer.com/159434.html

(0)
上一篇 2023年12月10日
下一篇 2023年12月10日
详情页2

相关推荐

  • 阿里云日志服务SLS通过Node.js SDK使用SQL独享版-云淘科技

    本文介绍通过Node.js SDK使用SQL独享版的代码示例。 前提条件 已创建RAM用户并完成授权。具体操作,请参见创建RAM用户并完成授权。 已配置环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。具体操作,请参见配置环境变量。 重要 阿里云账号的AccessKey拥有所有AP…

    阿里云日志服务SLS 2023年12月10日
  • 阿里云对象存储OSS使用Java SDK的SelectObject查询CSV和JSON文件-云淘科技

    本文介绍如何使用Java SDK的SelectObject查询CSV和JSON文件。 说明 本文示例由阿里云用户bin提供,仅供参考。 以下代码用于查询CSV文件和JSON文件: import com.aliyun.oss.ClientException; import com.aliyun.oss.OSS; import com.aliyun.oss.OS…

    阿里云对象存储 2023年12月10日
  • 信息流广告,信息流部分建议宽度830px,只针对默认列表样式,顺序随机
  • 阿里云云原生大数据计算服务 MaxCompute使用限制-云淘科技

    本文为您介绍MaxCompute Graph的使用限制。 单个Job引用的Resource数量不超过256个,Table、Archive按照一个单位计算。 单个Job引用的Resource总计字节数大小不超过512 MB。 单个Job的输入路数不能超过1024(输入表的个数不能超过64)。单个Job的输出路数不能超过256。 多路输出中指定的Label不能为…

  • 阿里云日志服务SLS数据压缩-云淘科技

    lz4是一种无损压缩算法,具有高速解码与压缩能力。日志服务部分API接口支持lz4压缩算法,使用lz4压缩算法可以减少网络传输流量,降低流量费用,提升接口访问速度。 压缩请求数据 日志服务如下API接口支持在HTTP请求体中传输lz4压缩格式的数据。 PutLogs(PutLogStoreLogs) PutWebtracking 其使用方法主要分为如下几个步…

    阿里云日志服务SLS 2023年12月10日
  • 阿里云人工智能平台PAIProphet-云淘科技

    对每一行的MTable数据,进行Prophet时间序列预测,给出下一时间段的预测结果。 使用限制 支持的计算引擎为MaxCompute。 算法原理 Prophet是Facebook开源的时间序列预测算法,适用于具有一定规律的数据。 可视化配置组件参数 输入桩 输入桩(从左到右) 数据类型 建议上游组件 是否必选 数据 无 读数据表 读CSV文件 特征工程 数…

    阿里云人工智能平台PAI 2023年12月10日

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信
本站为广大会员提供阿里云、腾讯云、华为云、百度云等一线大厂的购买,续费优惠,保证底价,买贵退差。