详情页标题前

阿里云日志服务SLSStorm消费-云淘科技

详情页1

日志服务LogHub提供了高效、可靠的日志通道功能,您可以通过Logtail、SDK等多种方式来实时采集日志数据。采集日志数据之后,可以通过Storm实时消费写入到日志服务中的数据。为了降低Storm消费的代价,日志服务提供了LogHub Storm Spout来实时读取日志数据。

基本结构和流程

图 1. 基本结构和流程阿里云日志服务SLSStorm消费-云淘科技

  • 上图中红色虚线框中是LogHub Storm Spout,每个Storm Topology会有一组Spout,同组内的Spout共同负责读取Logstore中全部数据。不同Topology中的Spout相互不干扰。

  • 每个Storm Topology需要选择唯一的消费组名称来相互标识,同一Topology内的Spout通过消费组消费日志数据来完成负载均衡和自动Failover,详情请参见通过消费组消费数据。

  • Spout从LogHub中实时读取数据之后,发送至Storm Topology中的Bolt节点,定期将消费完成位置作为Checkpoint保存到LogHub服务端。

说明

  • 为了防止滥用,每个Logstore最多支持10个消费组,对于不再使用的消费组,可以调用DeleteConsumerGroup接口删除。

  • 建议Spout的个数和Shard个数相同,否则可能会导致单个Spout处理数据量过多而影响性能。

  • 如果单个Shard的数据量太大,超过一个Spout处理极限,则可以使用Shard split接口分裂Shard,来降低每个Shard的数据量。

  • 在Loghub Spout中,强制依赖Storm的ACK机制,用于确认Spout将消息正确发送至Bolt,所以在Bolt中一定要调用ACK进行确认。

前提条件

  • 已创建RAM用户并完成授权。具体操作,请参见创建RAM用户并完成授权。

  • 已配置环境变量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET。具体操作,请参见配置环境变量。

    重要

    • 阿里云账号的AccessKey拥有所有API的访问权限,建议您使用RAM用户的AccessKey进行API访问或日常运维。

    • 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。

示例

  • Spout使用示例,用于构建Storm Topology。

         public static void main( String[] args )
        {   // 使用本地测试模式。  
            String mode = "Local";  
           // 每个Topology需要设定唯一的消费组名称。
            String consumer_group_name = "";   
            String project = "";     
            String logstore = "";   
           // 设置日志服务的服务接入点。
            String endpoint = "";  
           // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
            String access_id = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); 
            String access_key = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
            // 构建一个Loghub Storm Spout需要使用的配置。
            LogHubSpoutConfig config = new LogHubSpoutConfig(consumer_group_name,
                    endpoint, project, logstore, access_id,
                    access_key, LogHubCursorPosition.END_CURSOR);
            TopologyBuilder builder = new TopologyBuilder();
            // 构建Loghub Storm Spout。
            LogHubSpout spout = new LogHubSpout(config);
            // 在实际场景中,Spout的个数可以和Logstore Shard个数相同。
            builder.setSpout("spout", spout, 1);
            builder.setBolt("exclaim", new SampleBolt()).shuffleGrouping("spout");
            Config conf = new Config();
            conf.setDebug(false);
            conf.setMaxSpoutPending(1); 
            // 如果使用Kryo进行数据的序列化和反序列化,则需要显示设置LogGroupData的序列化方法LogGroupDataSerializSerializer。
            Config.registerSerialization(conf, LogGroupData.class, LogGroupDataSerializSerializer.class);
            if (mode.equals("Local")) {
                logger.info("Local mode...");
                LocalCluster cluster  = new LocalCluster();
                cluster.submitTopology("test-jstorm-spout", conf, builder.createTopology());
                try {
                    Thread.sleep(6000 * 1000);   
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }  
                cluster.killTopology("test-jstorm-spout");
                cluster.shutdown();  
            } else if (mode.equals("Remote")) {
                logger.info("Remote mode...");
                conf.setNumWorkers(2);
                try {
                    StormSubmitter.submitTopology("stt-jstorm-spout-4", conf, builder.createTopology());
                } catch (AlreadyAliveException e) {
                    e.printStackTrace();
                } catch (InvalidTopologyException e) {
                    e.printStackTrace();
                }
            } else {
                logger.error("invalid mode: " + mode);
            }
        }
  • 消费数据的bolt代码示例,打印每条日志的内容。

    public class SampleBolt extends BaseRichBolt {
        private static final long serialVersionUID = 475265688777440****;
        private static final Logger logger = Logger.getLogger(BaseBasicBolt.class);
        private OutputCollector mCollector;
        @Override
        public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context,
                OutputCollector collector) {
            mCollector = collector;
        }
        @Override
        public void execute(Tuple tuple) {
            String shardId = (String) tuple
                    .getValueByField(LogHubSpout.FIELD_SHARD_ID);
            @SuppressWarnings("unchecked")
            List logGroupDatas = (ArrayList) tuple.getValueByField(LogHubSpout.FIELD_LOGGROUPS);
            for (LogGroupData groupData : logGroupDatas) {
                // 每个logGroup由一条或多条日志组成。
                LogGroup logGroup = groupData.GetLogGroup();
                for (Log log : logGroup.getLogsList()) {
                    StringBuilder sb = new StringBuilder();
                    // 每条日志,有一个时间字段,以及多个Key,Value对。
                    int log_time = log.getTime();
                    sb.append("LogTime:").append(log_time);
                    for (Content content : log.getContentsList()) {
                        sb.append("	").append(content.getKey()).append(":")
                                .append(content.getValue());
                    }
                    logger.info(sb.toString());
                }
            }
            // 在LogHub Spout中,强制依赖Storm的ACK机制,用于确认Spout将消息正确发送至bolt,所以在bolt中一定要调用ACK。
            mCollector.ack(tuple);
        }
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            //do nothing
        }
    }

添加Maven依赖

  • Storm 1.0版本及之前版本(如0.9.6版本),请使用:

    
      com.aliyun.openservices
      loghub-storm-spout
      0.6.6
    
  • storm 1.0版本及之后版本,请使用:

    
      com.aliyun.openservices
      loghub-storm-1.0-spout
      0.1.3
    

内容没看懂? 不太想学习?想快速解决? 有偿解决: 联系专家

阿里云企业补贴进行中: 马上申请

腾讯云限时活动1折起,即将结束: 马上收藏

同尘科技为腾讯云授权服务中心。

购买腾讯云产品享受折上折,更有现金返利:同意关联,立享优惠

转转请注明出处:https://www.yunxiaoer.com/159784.html

(0)
上一篇 2023年12月10日 上午1:53
下一篇 2023年12月10日 上午1:54
详情页2

相关推荐

  • 阿里云云原生大数据计算服务 MaxCompute转义字符-云淘科技

    在MaxCompute中转义字符用来表达字符串中的特殊字符或将其后跟的字符解释为其本身。本文为您介绍MaxCompute中转义字符的使用场景和使用示例。 转义字符使用场景 在编程领域几乎所有的字符串表示都会遇到转义字符的问题,而解决思路也都是类似的: 首先制定一个规则,规定一些有特殊含义的字符,例如’、 “。 然后对这些特殊含义的字符以及不可见字符做特殊处理…

  • 阿里云云原生大数据计算服务 MaxCompute在SQL语句中设置Flag示例-云淘科技

    本文为您介绍如何使用MaxCompute Java SDK设置SQL的Flag。 背景信息 使用DataWorks或MaxCompute客户端提交SQL时,通常需要设置SQL的Flag。例如,Session级别使用MaxCompute 2.0数据类型时,需要在涉及2.0数据类型的SQL前加设置Flag的语句set odps.sql.type.system.o…

  • 阿里云云原生大数据计算服务 MaxComputeNVL-云淘科技

    如果value值为NULL,返回default_value,否则返回value。两个参数的数据类型必须一致。 命令格式 nvl(T , T ) 参数说明 value:必填。输入参数。T指代输入数据类型,可以是MaxCompute支持的所有数据类型。 default_value:必填。替换后的值。必须与value的数据类型保持一致。 使用示例 例如表t_dat…

  • 阿里云RDS数据库空间管理-云淘科技

    自治服务的空间管理功能提供分层的监控与分析,从实例深入到数据库,再从数据库深入到表,帮助用户发现和定位数据库空间相关问题。 空间管理提供您查看实例基本信息、空间总览、空间数据图表信息、空间变化趋势等等,帮助您从各个维度了解实例的空间情况,便于您发现空间异常。 前提条件 实例不能是RDS SQL Server 2008 R2云盘版。 操作步骤 访问RDS实例列…

    阿里云数据库 2023年12月9日
  • 信息流广告,信息流部分建议宽度830px,只针对默认列表样式,顺序随机
  • 阿里云云原生大数据计算服务 MaxCompute创建表-云淘科技

    表是MaxCompute的数据存储单元。您需要基于表进行数据处理。本文为您介绍如何创建表。 前提条件 请确认您已满足如下条件: 已登录MaxCompute客户端。 更多登录MaxCompute客户端操作,请参见登录MaxCompute客户端。 已准备好保存了待导入数据的CSV或TXT文件。 本文提供的数据文件样例如下: 创建非分区表的数据文件:banking…

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信
本站为广大会员提供阿里云、腾讯云、华为云、百度云等一线大厂的购买,续费优惠,保证底价,买贵退差。