详情页标题前

阿里云日志服务SLS通过消费组消费数据-云淘科技

详情页1

通过消费组(ConsumerGroup)消费数据有显著优点,您无需关注日志服务的实现细节及消费者之间的负载均衡、故障转移(Failover)等,只需要专注于业务逻辑。

前提条件

  • 已创建RAM用户并完成授权。具体操作,请参见创建RAM用户并完成授权。

  • 已配置环境变量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET。具体操作,请参见配置环境变量。

    重要

    • 阿里云账号的AccessKey拥有所有API的访问权限,建议您使用RAM用户的AccessKey进行API访问或日常运维。

    • 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。

  • 已安装SDK开发环境。具体操作,请参见SDK参考。

基本概念

概念

说明

消费组

日志服务支持通过消费组消费数据。一个消费组由多个消费者构成,同一个消费组下面的消费者共同消费一个Logstore中的数据,各个消费者不会重复消费数据。

重要

每个Logstore中,最多创建30个消费组。

消费者

消费组的构成单元,实际承担消费任务。

重要

同一个消费组中的消费者名称必须不同。

Logstore

数据采集、存储和查询单元。更多信息,请参见日志库(Logstore)。

Shard

用于控制Logstore的读写能力,数据必定保存在某一个Shard中。更多信息,请参见分区(Shard)。

Checkpoint

消费位点,是程序消费到的最新位置。程序重启后,可以通过Checkpoint恢复消费进度。

说明

通过消费组消费,可以保存Checkpoint。在程序故障恢复时,能够从断点处继续消费,从而保证数据不会被重复消费。

分配原则

一个Logstore中包含多个Shard,通过消费组消费数据就是将Shard分配给一个消费组下面的消费者,分配方式遵循以下原则。

  • 一个Shard只会分配到一个消费者。

  • 一个消费者可以被分配多个Shard。

新的消费者加入消费组后,这个消费组下面的Shard从属关系会调整,以实现消费的负载均衡,但是仍遵循上述分配原则。

步骤一:消费数据

您可以通过Java、C++、Python及Go SDK实现消费组消费数据。此处,以Java SDK为例。

  1. 添加Maven依赖。

    在Java项目的根目录下,打开pom.xml文件,添加以下代码:

    
      com.google.protobuf
      protobuf-java
      2.5.0
    
    
      com.aliyun.openservices
      loghub-client-lib
      0.6.33
    
  2. 创建Main.java文件。

    import com.aliyun.openservices.loghub.client.ClientWorker;
    import com.aliyun.openservices.loghub.client.config.LogHubConfig;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
    
    public class Main {
        // 日志服务的服务接入点,请您根据实际情况填写。
        private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
        // 日志服务项目名称,请您根据实际情况填写。请从已创建项目中获取项目名称。
        private static String Project = "ali-cn-hangzhou-sls-admin";
        // 日志库名称,请您根据实际情况填写。请从已创建日志库中获取日志库名称。
        private static String Logstore = "sls_operation_log";
        // 消费组名称,请您根据实际情况填写。您无需提前创建,该程序运行时会自动创建该消费组。
        private static String ConsumerGroup = "consumerGroupX";
        // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。。
        private static String AccessKeyId= System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    
        public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
            // consumer_1是消费者名称,同一个消费组下面的消费者名称必须不同。不同消费者在多台机器上启动多个进程,均衡消费一个Logstore时,消费者名称可以使用机器IP地址来区分。
            // maxFetchLogGroupSize用于设置每次从服务端获取的LogGroup最大数目,使用默认值即可。您可以使用config.setMaxFetchLogGroupSize(100);调整,取值范围为(0,1000]。
            LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR,1000);
            ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
            Thread thread = new Thread(worker);
            // Thread运行之后,ClientWorker会自动运行,ClientWorker扩展了Runnable接口。
            thread.start();
            Thread.sleep(60 * 60 * 1000);
            // 调用Worker的Shutdown函数,退出消费实例,关联的线程也会自动停止。
            worker.shutdown();
            // ClientWorker运行过程中会生成多个异步的任务。Shutdown完成后,请等待还在执行的任务安全退出。建议设置sleep为30秒。
            Thread.sleep(30 * 1000);
        }
    }
  3. 创建SampleLogHubProcessor.java文件。

    import com.aliyun.openservices.log.common.FastLog;
    import com.aliyun.openservices.log.common.FastLogContent;
    import com.aliyun.openservices.log.common.FastLogGroup;
    import com.aliyun.openservices.log.common.FastLogTag;
    import com.aliyun.openservices.log.common.LogGroupData;
    import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
    
    import java.util.List;
    
    public class SampleLogHubProcessor implements ILogHubProcessor {
        private int shardId;
        // 记录上次持久化Checkpoint的时间。
        private long mLastCheckTime = 0;
    
        public void initialize(int shardId) {
            this.shardId = shardId;
        }
    
        // 消费数据的主逻辑,消费时的所有异常都需要处理,不能直接抛出。
        public String process(List logGroups,
                              ILogHubCheckPointTracker checkPointTracker) {
            // 打印已获取的数据。
            for (LogGroupData logGroup : logGroups) {
                FastLogGroup flg = logGroup.GetFastLogGroup();
                System.out.println("Tags");
                for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
                    FastLogTag logtag = flg.getLogTags(tagIdx);
                    System.out.println(String.format("	%s	:	%s", logtag.getKey(), logtag.getValue()));
                }
                for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
                    FastLog log = flg.getLogs(lIdx);
                    System.out.println("--------
    Log: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                    for (int cIdx = 0; cIdx  30 * 1000) {
                try {
                    //参数为true表示立即将Checkpoint更新到服务端;false表示将Checkpoint缓存在本地。默认间隔60秒会将Checkpoint更新到服务端。
                    checkPointTracker.saveCheckPoint(true);
                } catch (LogHubCheckPointException e) {
                    e.printStackTrace();
                }
                mLastCheckTime = curTime;
            }
            return null;
        }
    
        // 当Worker退出时,会调用该函数,您可以在此处执行清理工作。
        public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
            // 将Checkpoint立即保存到服务端。
            try {
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
        }
    }
    
    class SampleLogHubProcessorFactory implements ILogHubProcessorFactory {
        public ILogHubProcessor generatorProcessor() {
            // 生成一个消费实例。
            return new SampleLogHubProcessor();
        }
    }

    更多信息,请参见Java、C++、Python及Go。

  4. 运行Main.java。

    以模拟消费Nginx日志为例,打印日志如下:

    :    GET
    request_uri    :    /request/path-3/file-7
    status    :    200
    body_bytes_sent    :    3820
    host    :    www.example.com
    request_time    :    43
    request_length    :    1987
    http_user_agent    :    Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
    http_referer    :    www.example.com
    http_x_forwarded_for    :    192.168.10.196
    upstream_response_time    :    0.02
    --------
    Log: 158, time: 1635629778, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629815
    --------
    Log: 0, time: 1635629788, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629877
    --------
    ......

步骤二:查看消费组状态

控制台方式

  1. 登录日志服务控制台。

  2. 在Project列表区域,单击目标Project。

  3. 日志存储 > 日志库页签中,单击目标Logstore左侧的阿里云日志服务SLS通过消费组消费数据-云淘科技图标,然后单击数据消费左侧的阿里云日志服务SLS通过消费组消费数据-云淘科技图标。

  4. 在消费组列表中,单击目标消费组。

  5. Consumer Group状态页面,查看每个Shard消费数据的进度。

SDK方式

此处以Java SDK为例。运行ConsumerGroupTest.java,查看每个Shard消费数据的进度。

import java.util.List;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts.CursorMode;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
import com.aliyun.openservices.log.exception.LogException;
public class ConsumerGroupTest {
    static String endpoint = "";
    static String project = "";
    static String logstore = "";
    static String accesskeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    static String accesskey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    public static void main(String[] args) throws LogException {
        Client client = new Client(endpoint, accesskeyId, accesskey);
        // 获取Logstore下的所有消费组。如果消费组不存在,则长度为0。
        List consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
        for(ConsumerGroup c: consumerGroups){
            // 打印消费组的属性,包括名称、心跳超时时间、是否按序消费。
            System.out.println("名称: " + c.getConsumerGroupName());
            System.out.println("心跳超时时间: " + c.getTimeout());
            System.out.println("按序消费: " + c.isInOrder());
            for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()){
                System.out.println("shard: " + cp.getShard());
                // 该时间精确到微秒,类型为长整型。
                System.out.println("最后一次更新消费进度的时间: " + cp.getUpdateTime());
                System.out.println("消费者名称: " + cp.getConsumer());
                String consumerPrg = "";
                if(cp.getCheckPoint().isEmpty())
                    consumerPrg = "尚未开始消费";
                else{
                    // Unix时间戳,单位是秒,输出时请注意格式化。
                    try{
                        int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
                        consumerPrg = "" + prg;
                    }
                    catch(LogException e){
                        if(e.GetErrorCode() == "InvalidCursor")
                            consumerPrg = "非法,前一次消费时刻已经超出了Logstore中数据的生命周期";
                        else{
                            // internal server error
                            throw e;
                        }
                    }
                }
                System.out.println("消费进度: " + consumerPrg);
                String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
                int endPrg = 0;
                try{
                    endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
                }
                catch(LogException e){
                    // do nothing
                }
                //Unix时间戳,单位:秒。输出时,请注意格式化。
                System.out.println("最后一条数据到达时刻: " + endPrg);
            }
        }
    }
}

返回以下结果:

名称: etl-6cac01c571d5a4b933649c04a7ba215b
心跳超时时间: 60
按序消费: false
shard: 0
最后一次更新消费进度的时间: 1639555453575211
消费者名称: etl-356464787983a3d17086a9797e3d5f0e6959b066-256521
消费进度: 1639555453
最后一条数据到达时刻: 1639555453
shard: 1
最后一次更新消费进度的时间: 1639555392071328
消费者名称: etl-356464787983a3d17086a9797e3d5f0e6959b066-256521
消费进度: 1639555391
最后一条数据到达时刻: 1639555391
名称: etl-2bd3fdfdd63595d56b1ac24393bf5991
心跳超时时间: 60
按序消费: false
shard: 0
最后一次更新消费进度的时间: 1639555453256773
消费者名称: etl-532719dd2f198b3878ee3c6dfc80aeffb39ee48b-061390
消费进度: 1639555453
最后一条数据到达时刻: 1639555453
shard: 1
最后一次更新消费进度的时间: 1639555392066234
消费者名称: etl-532719dd2f198b3878ee3c6dfc80aeffb39ee48b-061390
消费进度: 1639555391
最后一条数据到达时刻: 1639555391
名称: consumerGroupX
心跳超时时间: 60
按序消费: false
shard: 0
最后一次更新消费进度的时间: 1639555434142879
消费者名称: consumer_1
消费进度: 1635615029
最后一条数据到达时刻: 1639555453
shard: 1
最后一次更新消费进度的时间: 1639555437976929
消费者名称: consumer_1
消费进度: 1635616802
最后一条数据到达时刻: 1639555391

RAM用户授权

使用RAM用户操作时,需授予RAM用户操作消费组的相关权限。具体操作,请参见步骤二:为RAM用户授权。

授权的Action如下表所示。

动作(Action)

说明

授权策略中的资源描述方式(Resource)

log:GetCursorOrData(GetCursor,PullLogs)

根据时间获取游标(cursor)。

acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}

log:CreateConsumerGroup

在指定的Logstore上创建一个消费组。

acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}

log:ListConsumerGroup

查询指定Logstore的所有消费组。

acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*

log:ConsumerGroupUpdateCheckPoint

更新指定消费组的某个Shard的Checkpoint。

acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}

log:ConsumerGroupHeartBeat

为指定消费者发送心跳到服务端。

acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}

log:UpdateConsumerGroup

修改指定消费组属性。

acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}

log:GetConsumerGroupCheckPoint

获取指定消费组消费的某个或者所有Shard的Checkpoint。

acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}

例如,消费组的相关资源信息如下所示,您要通过RAM用户操作该消费组,则需为RAM用户授予以下权限。

  • Project所属的阿里云账号:174649****602745。

  • Project所在地域ID:cn-hangzhou。

  • Project名称:project-test。

  • Logstore名称:logstore-test。

  • 消费组名称:consumergroup-test。

{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "log:GetCursorOrData"
      ],
      "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test"
    },
    {
      "Effect": "Allow",
      "Action": [
        "log:CreateConsumerGroup",
        "log:ListConsumerGroup"
      ],
      "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "log:ConsumerGroupUpdateCheckPoint",
        "log:ConsumerGroupHeartBeat",
        "log:UpdateConsumerGroup",
        "log:GetConsumerGroupCheckPoint"
      ],
      "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/consumergroup-test"
    }
  ]
}

相关操作

  • 异常诊断

    建议您为消费者程序配置Log4j,将消费组内部遇到的异常信息打印出来,便于定位。log4j.properties典型配置:

    log4j.rootLogger = info,stdout
    log4j.appender.stdout = org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.Target = System.out
    log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

    配置Log4j后,执行消费者程序可以看到类似如下异常信息:

    [WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(LogHubConsumer.java:159)
    com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]
  • 通过消费组消费从某个时间开始的数据

    // consumerStartTimeInSeconds表示消费这个时间点之后的数据。
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          int consumerStartTimeInSeconds);
    
    // position是个枚举变量,LogHubConfig.ConsumePosition.BEGIN_CURSOR表示从最老的数据开始消费,LogHubConfig.ConsumePosition.END_CURSOR表示从最新的数据开始消费。
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          ConsumePosition position);

    说明

    • 按照消费需求,请您使用不同的构造方法。

    • 当服务端已保存Checkpoint,则开始消费位置以服务端保存的Checkpoint为准。

    • 日志服务消费数据时,默认优先使用Checkpoint作为消费点。当您指定从固定时间点开始消费数据时,必须保证consumerStartTimeInSeconds时间点落到TTL周期内,否则会造成消费不生效。

  • 重置Checkpoint

    public static void updateCheckpoint() throws Exception {
            Client client = new Client(host, accessId, accessKey);
            long timestamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000;
            ListShardResponse response = client.ListShard(new ListShardRequest(project, logStore));
            for (Shard shard : response.GetShards()) {
                int shardId = shard.GetShardId();
                String cursor = client.GetCursor(project, logStore, shardId, timestamp).GetCursor();
                client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor);
            }
        }

相关链接

  • API

    操作

    API接口

    创建消费组

    CreateConsumerGroup

    查询消费组

    ListConsumerGroup

    删除消费组

    DeleteConsumerGroup

    更新消费组

    UpdateConsumerGroup

    发送消费者心跳

    ConsumerGroupHeartBeat

    查询消费组的Checkpoint

    GetCheckPoint

    更新消费组的Checkpoint

    ConsumerGroupUpdateCheckPoint

  • SDK

    语言

    文档链接

    Java

    • 使用Java SDK管理消费组

    • 通过消费组消费数据

    Python

    • 使用Python SDK管理消费组

    • 通过消费组消费数据

  • CLI

    操作

    命令行接口

    创建消费组

    create_consumer_group

    查询消费组

    list_consumer_group

    更新消费组

    update_consumer_group

    删除消费组

    delete_consumer_group

    查询消费组的Checkpoint

    get_check_point

    更新消费组的Checkpoint

    update_check_point

内容没看懂? 不太想学习?想快速解决? 有偿解决: 联系专家

阿里云企业补贴进行中: 马上申请

腾讯云限时活动1折起,即将结束: 马上收藏

同尘科技为腾讯云授权服务中心。

购买腾讯云产品享受折上折,更有现金返利:同意关联,立享优惠

转转请注明出处:https://www.yunxiaoer.com/160367.html

(0)
上一篇 2023年12月10日
下一篇 2023年12月10日
详情页2

相关推荐

  • 腾讯云对象存储数据导出至 CKafka

    简介 数据导出至 CKafka 是腾讯云对象存储(Cloud Object Storage,COS)基于 云函数(Serverless Cloud Function,SCF) 为用户提供的数据出湖方案,可以帮助用户将 CSV、JSON 等文件格式的数据导出至同地域的腾讯云 CKafka 服务,用于海量消息、日志数据的聚合分析。 注意事项 数据导出至 CKaf…

    2023年12月9日
  • 腾讯云对象存储查询文档预览开通状态

    简介 本文档提供关于查询文档预览开通状态的 API 概览和 SDK 示例代码。 API 操作描述 查询文档预览开通状态 用于查询已经开通文档预览功能的 Bucket 查询文档预览开通状态 功能说明 用于查询已经开通文档预览功能的 Bucket。 方法原型 public Guzzle\Service\Resource\Model describeDocProc…

    腾讯云 2023年12月9日
  • 信息流广告,信息流部分建议宽度830px,只针对默认列表样式,顺序随机
  • 腾讯云对象存储删除对象

    简介 本文档提供关于对象的删除操作相关的 API 概览以及 SDK 示例代码。 API 操作名 操作描述 DELETE Object 删除单个对象 在存储桶中删除指定对象 删除单个对象 功能说明 删除指定的对象(DELETE Object)。 示例代码 // 存储桶名称,由 bucketname-appid 组成,appid 必须填入,可以在 COS 控制台…

    腾讯云 2023年12月9日
  • 腾讯云内容分发网络CDN高级回源配置

    功能介绍 腾讯云 CDN 支持更细粒度的回源配置,根据不同规则回源到不同的源站地址。例如:分路径回源(指定文件类型、文件夹、全路径文件(如:/test/1.jpg)、首页回源),根据 Client IP 所在区域回源等。 注意事项 1. 仅加速类型为 CDN 网页小文件、CDN 下载大文件、CDN 音视频点播的域名支持高级回源配置。2. 回源协议、回源 HO…

    2023年12月9日
  • 腾讯云云点播关于腾讯云视立方·播放器 SDK 视频播放能力升级及新增授权校验的公告

    腾讯云视立方·播放器 SDK 移动端(Android & iOS & Flutter)即将发布10.1版本,新版本 SDK 采用“腾讯视频”同款播放内核打造,视频播放能力获得全面优化升级,详见 视频播放能力升级特性。同时从该版本开始将增加对视频播放功能模块的授权校验,具体如下:如果您的 App 已经拥有直播 License (原直播推流 Li…

    腾讯云 2023年12月9日

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信
本站为广大会员提供阿里云、腾讯云、华为云、百度云等一线大厂的购买,续费优惠,保证底价,买贵退差。