详情页标题前

阿里云大数据开发治理平台 DataWorks在DataWorks上运行PySpark作业的最佳实践-云淘科技

详情页1

PySpark可直接调用Python的API运行Spark作业,PySpark作业需在特定Python环境中运行。EMR默认支持使用Python,若EMR支持的Python版本无法运行PySpark作业,则您可参考本实践配置可用的Python环境并在DataWorks上运行PySpark作业。

前提条件

执行本实践所使用的DataWorks及E-MapReduce(简称EMR)需部署在相同地域。产品各自需执行的前提条件如下:

  • DataWorks侧

    在DataWorks运行PySpark作业时,需创建EMR Spark节点,并使用spark-submit命令提交作业。

  • EMR侧需准备如下EMR环境:
    • 准备EMR实例。本实践示例使用EMR on ECS实例。
    • 可选:本实践需使用一个Python包进行示例验证,您可在本地或ECS进行自主打包;也可直接下载本实践的示例包(Python3.7)。使用自主打包时,本地或ECS需安装Docker运行环境及Python运行环境。说明 本实践仅以Python3.7演示相关操作,实际使用中可选择所需Python版本。EMR支持的Python版本可能和您使用的Python版本存在差异,建议使用本实践的Python3.7版本。

操作步骤

  1. 可选:准备运行Python程序需要的虚拟环境。您可选择直接下载本实践的示例包python3.7使用;或通过如下步骤自主打包Python环境。
    1. 制作Docker镜像。

      您可选择直接下载本实践的示例Dockerfile文件至本地或ECS;或在安装了Docker环境的宿主机上新建一个Dockerfile文件。Dockerfile文件的内容如下。

      FROM centos:centos7.9.2009
      RUN set -ex \
      # 预安装所需组件。
      && yum install -y wget tar libffi-devel zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gcc make initscripts zip\
      && wget https://www.python.org/ftp/python/3.7.0/Python-3.7.0.tgz \
      && tar -zxvf Python-3.7.0.tgz \
      && cd Python-3.7.0 \
      && ./configure prefix=/usr/local/python3 \
      && make \
      && make install \
      && make clean \
      && rm -rf /Python-3.7.0* \
      && yum install -y epel-release \
      && yum install -y python-pip
      # 设置默认为python3。
      RUN set -ex \
      # 备份旧版本python。
      && mv /usr/bin/python /usr/bin/python27 \
      && mv /usr/bin/pip /usr/bin/pip-python27 \
      # 配置默认为python3。
      && ln -s /usr/local/python3/bin/python3.7 /usr/bin/python \
      && ln -s /usr/local/python3/bin/pip3 /usr/bin/pip
      # 修复因修改python版本导致yum失效问题。
      RUN set -ex \
      && sed -i "s#/usr/bin/python#/usr/bin/python27#" /usr/bin/yum \
      && sed -i "s#/usr/bin/python#/usr/bin/python27#" /usr/libexec/urlgrabber-ext-down \
      && yum install -y deltarpm
      # 更新pip版本。
      RUN pip install --upgrade pip
    2. 构建镜像并运行容器。在Dockerfile文件所在路径下,执行如下命令。
      sudo docker build -t python-centos:3.7 .
      sudo docker run -itd --name python3.7 python-centos:3.7
    3. 进入安装容器所需的Python依赖库并打包Python环境。
      sudo docker exec -it  python3.7 bash
      
      pip install [所需依赖库]
      # vi requirements.txt
      # pip install -r requirements.txt
      # numpy
      # pandas
      
      cd /usr/local/
      zip -r python3.7.zip python3/
    4. 拷贝容器中的Python环境到宿主机。
      # 在宿主机运行命令将虚拟环境拷贝到宿主机。
      sudo docker cp python3.7:/usr/local/python3.7.zip .                         
  2. 上传虚拟环境。您可根据需要,选择上传Python虚拟环境至OSS或HDFS。说明 本实践以上传至HDFS示例。如果您选择上传至OSS,操作详情请参见上传文件。上传Python环境至HDFS命令如下。
    # 上传至HDFS中。
    hdfs dfs -copyFromLocal python3.7.zip /tmp/pyspark                          
  3. 测试并上传Python代码。
    1. 您可在本地或ECS中创建一个py文件,按照下述方法测试Python代码是否正确。本实践示例使用pyspark_test.py文件测试。
      # -*- coding: utf-8 -*-
      
      import os
      from pyspark.sql import SparkSession
      
      def noop(x):
          import socket
          import sys
          host = socket.gethostname() + ' '.join(sys.path) + ' '.join(os.environ)
          print('host: ' + host)
          print('PYTHONPATH: ' + os.environ['PYTHONPATH'])
          print('PWD: ' + os.environ['PWD'])
          print(os.listdir('.'))
          return host
      
      
      if __name__ == '__main__':
      
          spark = SparkSession \
              .builder \
              .appName("test_pyspark") \
              .enableHiveSupport() \
              .getOrCreate()
      
          sc = spark.sparkContext
          # 验证系统当前环境变量。
          rdd = sc.parallelize(range(10), 2)
          hosts = rdd.map(noop).distinct().collect()
          print(hosts)
      
          # 验证UDF。
          # https://docs.databricks.com/spark/latest/spark-sql/udf-python.html#
          # spark.udf.register("udf_squared", udf_squared)
          # spark.udf.register("udf_numpy", udf_numpy)
      
          tableName = "store"
          df = spark.sql("""select count(*) from %s """ % tableName)
          print("rdf count, %s
      " % df.count())
          df.show()
    2. 上传Python代码至HDFS中。参考如下命令,在EMR实例中上传Python代码至HDFS。说明 本实践以上传至HDFS示例。如果您选择上传至OSS,操作详情请参见上传文件。
      hdfs dfs -copyFromLocal pyspark_test.py /tmp/pyspark
  4. 在DataWorks中通过spark-submit命令提交作业。在创建的EMR Spark节点中,使用如下命令提交作业。说明 如果您选择上传Python代码至OSS,则需替换为实际使用的OSS路径。
    spark-submit --master yarn \
    --deploy-mode cluster \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./PYTHONENV/python3/bin/python3.7 \
    --conf spark.executorEnv.PYTHONPATH=. \
    --conf spark.yarn.appMasterEnv.PYTHONPATH=. \
    --conf spark.yarn.appMasterEnv.JOBOWNER=LiuYuQuan \
    --archives hdfs://hdfs-cluster/tmp/pyspark/python3.7.zip#PYTHONENV \
    ## --py-files hdfs://hdfs-cluster/tmp/pyspark/mc_pyspark-0.1.0-py3-none-any.zip \
    --driver-memory 4g \
    --driver-cores 1 \
    --executor-memory 4g \
    --executor-cores 1 \
    --num-executors 3 \
    --name TestPySpark \
    hdfs://hdfs-cluster/tmp/pyspark/pyspark_test.py

内容没看懂? 不太想学习?想快速解决? 有偿解决: 联系专家

阿里云企业补贴进行中: 马上申请

腾讯云限时活动1折起,即将结束: 马上收藏

同尘科技为腾讯云授权服务中心。

购买腾讯云产品享受折上折,更有现金返利:同意关联,立享优惠

转转请注明出处:https://www.yunxiaoer.com/171963.html

(0)
上一篇 2023年12月10日 下午3:04
下一篇 2023年12月10日
详情页2

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信
本站为广大会员提供阿里云、腾讯云、华为云、百度云等一线大厂的购买,续费优惠,保证底价,买贵退差。