详情页标题前

阿里云日志服务SLSSpark Streaming消费-云淘科技

详情页1

日志服务采集到日志数据后,您可以通过运行Spark Streaming任务消费日志数据。

日志服务提供的Spark SDK实现了Receiver模式和Direct模式两种消费模式。Maven依赖如下:


  com.aliyun.emr
  emr-logservice_2.11
  1.7.2

Receiver模式

Receiver模式通过消费组消费日志数据并暂存在Spark Executor,Spark Streaming任务启动之后从Executor读取并处理数据。每条数据以JSON字符串的形式返回。消费组自动定时保存Checkpoint到服务端,无需手动更新Checkpoint。更多信息,请参见通过消费组消费日志数据。

  • 参数说明
    参数 类型 说明
    project String 日志服务Project名称。
    logstore String 日志服务Logstore名称。
    consumerGroup String 消费组名称。
    endpoint String 日志服务Project所在地域的服务入口。更多信息,服务入口。
    accessKeyId String 访问日志服务的AccessKey ID。
    accessKeySecret String 访问日志服务的AccessKey Secret。
  • 示例


    说明 默认配置下,Receiver模式在异常情况下可能导致数据丢失。为了避免此类情况发生,建议开启Write-Ahead Logs开关(Spark 1.2以上版本支持)。更多关于Write-Ahead
    Logs的细节请参见Spark。

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.SparkConf
    
    object TestLoghub {
      def main(args: Array[String]): Unit = {
        if (args.length < 7) {
          System.err.println(
            """Usage: TestLoghub    
              |           
            """.stripMargin)
          System.exit(1)
        }
    
        val project = args(0)
        val logstore = args(1)
        val consumerGroup = args(2)
        val endpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
    
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Loghub")
          val ssc = new StreamingContext(conf, batchInterval)
          val loghubStream = LoghubUtils.createStream(
            ssc,
            project,
            logstore,
            consumerGroup,
            endpoint,
            accessKeyId,
            accessKeySecret,
            StorageLevel.MEMORY_AND_DISK)
    
          loghubStream.checkpoint(batchInterval * 2).foreachRDD(rdd =>
            rdd.map(bytes => new String(bytes)).top(10).foreach(println)
          )
          ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
          ssc
        }
    
        val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
    
        ssc.start()
        ssc.awaitTermination()
      }
    }

Direct模式

Direct模式不需要消费组,使用API在任务运行时直接从服务端请求数据。Direct模式具有如下优势:

  • 简化并行:Spark partition个数与Logstore Shard总数一致。只需分裂Shard即可提高任务的并行度。
  • 高效:不需要开启Write-Ahead Logs来保证数据不丢失。
  • Exactly-once语义:直接从服务端按需获取数据,任务成功之后再提交Checkpoint。

    由于Spark异常退出或者其他原因导致任务未正常结束,可能会导致部分数据被重复消费。

Direct模式需要依赖ZooKeeper环境,用于临时保存中间状态。同时,必须设置Checkpoint目录。中间状态数据保存在ZooKeeper内对应的Checkpoint目录内。如果任务重启之后希望重新消费,需要在ZooKeeper内删除该目录,并更改消费组名称。

  • 参数说明
    参数 类型 说明
    project String 日志服务Project名称。
    logstore String 日志服务Logstore名称。
    consumerGroup String 消费组名称,仅用于保存消费位置。
    endpoint String 日志服务Project所在地域的服务入口。更多信息,请参见服务入口。
    accessKeyId String 访问日志服务的Access Key ID。
    accessKeySecret String 访问日志服务的Access Key Secret。
    zkAddress String ZooKeeper的连接地址。
  • 限流配置

    Spark Streaming流式消费是将数据分成微小的批次进行处理,因此日志服务开始消费时,需预知每个批次的边界,即每个批次需要获取的数据条数。

    日志服务底层的存储模型以LogGroup为单位,正常情况下每个LogGroup对应一次写入请求,例如一次写入请求可能包含数千条日志,这些日志作为一个LogGroup进行存储和消费。而通过WebTracking方式写入日志时,每次写入请求仅包含一条日志,即一个LogGroup只有一条日志。为了能够应对不同写入场景的消费需求,SDK提供如下两个参数进行限流配置。

    参数 说明 默认值
    spark.loghub.batchGet.step 限制单次消费请求获取的最大LogGroup个数。 100
    spark.streaming.loghub.maxRatePerShard 限制单批次内每个Shard消费的日志条数。 10000

    通过spark.streaming.loghub.maxRatePerShard可指定每个批次每个Shard期望消费的最大日志条数。Spark SDK的实现原理是每次从服务端获取spark.loghub.batchGet.step中的LogGroup个数并累计其中的日志条数,直到达到或超过spark.streaming.loghub.maxRatePerShard。因此spark.streaming.loghub.maxRatePerShard并非一个精确控制单批次消费日志条数的参数,实际上每个批次消费的日志条数与spark.loghub.batchGet.step以及每个LogGroup中包含的日志条数相关。

  • 示例
    import com.aliyun.openservices.loghub.client.config.LogHubCursorPosition
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.aliyun.logservice.{CanCommitOffsets, LoghubUtils}
    
    object TestDirectLoghub {
      def main(args: Array[String]): Unit = {
        if (args.length < 7) {
          System.err.println(
            """Usage: TestDirectLoghub    
              |            
            """.stripMargin)
          System.exit(1)
        }
    
        val project = args(0)
        val logstore = args(1)
        val consumerGroup = args(2)
        val endpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
        val zkAddress = if (args.length >= 8) args(7) else "localhost:2181"
    
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Direct Loghub")
          val ssc = new StreamingContext(conf, batchInterval)
          val zkParas = Map("zookeeper.connect" -> zkAddress,
            "enable.auto.commit" -> "false")
          val loghubStream = LoghubUtils.createDirectStream(
            ssc,
            project,
            logstore,
            consumerGroup,
            accessKeyId,
            accessKeySecret,
            endpoint,
            zkParas,
            LogHubCursorPosition.END_CURSOR)
    
          loghubStream.checkpoint(batchInterval).foreachRDD(rdd => {
            println(s"count by key: ${rdd.map(s => {
              s.sorted
              (s.length, s)
            }).countByKey().size}")
            loghubStream.asInstanceOf[CanCommitOffsets].commitAsync()
          })
          ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
          ssc
        }
    
        val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
        ssc.start()
        ssc.awaitTermination()
      }
    }

更多信息,请参见GitHub。

内容没看懂? 不太想学习?想快速解决? 有偿解决: 联系专家

阿里云企业补贴进行中: 马上申请

腾讯云限时活动1折起,即将结束: 马上收藏

同尘科技为腾讯云授权服务中心。

购买腾讯云产品享受折上折,更有现金返利:同意关联,立享优惠

转转请注明出处:https://www.yunxiaoer.com/161968.html

(0)
上一篇 2023年12月10日
下一篇 2023年12月10日
详情页2

相关推荐

  • 阿里云云原生大数据计算服务 MaxCompute数仓性能优化-云淘科技

    针对数仓的性能优化,主要是针对表和数据分布的优化。表设计的最佳实践请参见表设计最佳实践。 Hash Clustering Hash Clustering表的优势在于可以实现Bucket Pruning优化、Aggregation优化以及存储优化。在创建表时,使用clustered by指定Hash Key后,MaxCompute将对指定列进行Hash运算,按…

  • 阿里云日志服务SLS分析概述-云淘科技

    日志服务提供分析功能,该功能结合了查询功能和SQL计算功能。本文介绍分析功能的基本语法、使用限制和SQL函数等信息。 重要 如果您要分析日志,则必须将日志采集到Standard Logstore中,且在配置索引时打开对应字段的开启统计开关。更多信息,请参见Logstore类型对比、创建索引。 日志服务默认存在保留字段。如果您要分析保留字段,请参见保留字段。 …

    阿里云日志服务SLS 2023年12月10日
  • 信息流广告,信息流部分建议宽度830px,只针对默认列表样式,顺序随机
  • 阿里云大数据开发治理平台 DataWorksMariaDB数据源-云淘科技

    MariaDB数据源为您提供读取和写入MariaDB的双向通道,本文为您介绍DataWorks的MariaDB数据同步能力支持情况。 支持的MariaDB版本 离线读写 支持MariaDB 5.5.x、MariaDB 10.0.x、MariaDB 10.1.x、MariaDB 10.2.x、MariaDB 10.3.x版本,且离线同步支持读取视图表。 支持的…

  • 阿里云日志服务SLS查看全局数据-云淘科技

    本文介绍如何在日志审计服务中查看从云产品接入的全局数据。 查看日志审计全局数据视图 登录日志服务控制台。 在日志应用区域的审计与安全页签下,单击日志审计服务。 单击审计配置 > 云产品接入 > 全局数据,查看日志审计全局数据视图。 查看云产品全局数据视图 登录日志服务控制台。 在日志应用区域的审计与安全页签下,单击日志审计服务。 单击审计报表 &…

    阿里云日志服务SLS 2023年12月10日
  • 阿里云大数据开发治理平台 DataWorksHybridDB for MySQL数据源-云淘科技

    HybridDB for MySQL数据源为您提供读取和写入HybridDB for MySQL的双向功能,本文为您介绍DataWorks的HybridDB for MySQL数据同步能力支持情况。 使用限制 离线同步支持读取视图表。 HybridDB for MySQL Reader插件支持读取表和视图。表字段可以依序指定全部列、部分列、调整列顺序、指定常…

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信
本站为广大会员提供阿里云、腾讯云、华为云、百度云等一线大厂的购买,续费优惠,保证底价,买贵退差。