详情页标题前

阿里云日志服务SLS导入Kafka数据-云淘科技

详情页1

本文介绍如何将Kafka数据导入到日志服务,实现数据的查询分析、加工等操作。

前提条件

  • 已有可用的Kafka集群。
  • 已创建Project和Logstore。具体操作,请参见创建Project和创建Logstore。

版本说明

目前,只支持Kafka 2.2.0及以上版本。

创建数据导入配置

  1. 登录日志服务控制台。
  2. 在接入数据区域的数据导入页签中,单击Kafka-数据导入。
  3. 选择目标Project和Logstore,单击下一步。
  4. 设置导入配置。
    1. 在数据源设置步骤中,配置如下参数。
      参数 说明
      配置名称 设置配置的名称。
      服务地址 Kafka bootstrap Servers地址。多个服务地址之间使用半角逗号(,)分隔。

      • 如果是阿里云云消息队列 Kafka 版,需输入接入点的IP地址或域名。
      • 如果是阿里云ECS上自建的Kafka集群,需输入ECS实例的IP地址。
      • 如果是其他的Kafka集群,需输入Kafka Broker的公网IP地址或域名。
      Topic列表 Kafka主题。多个主题之间使用半角逗号(,)分隔。
      消费组 如果您使用的是阿里云云消息队列 Kafka 版,且未开启自由使用Group功能,则需要选择对应的消费组。创建消费组的具体操作,请参见创建消费组。
      起始位置 开始导入数据的位置。

      • 最早:从现有的第一条Kafka数据开始导入。
      • 最晚:从最新生成的Kafka数据开始导入。
      数据格式 待导入数据的格式。

      • 极简模式:如果待导入的数据为单行格式,您可以选择极简模式。
      • JSON字符串:如果待导入的数据为JSON,您可以选择JSON字符串。导入任务会将数据解析为键值对格式,只解析到第一层。
      解析数组元素 打开解析数组元素开关后,对于JSON数组格式的数据,系统会按其数组元素拆分为多条数据后进行导入。
      编码格式 待导入数据的编码格式(即字符集),目前支持UTF-8和GBK。
      VPC实例ID 如果Kafka集群是VPC环境下的阿里云云消息队列 Kafka 版或阿里云ECS上自建的Kafka集群,您可以通过设置VPC实例ID,实现日志服务通过阿里云内网读取Kafka集群的数据。

      通过阿里云内网读取数据,具备更好的安全性和网络稳定性。

      重要 Kafka集群需允许被IP网段100.104.0.0/16访问。

      时间配置
      时间字段设置为Kafka数据中代表时间的列名,用于指定数据导入日志服务时的时间。
      提取时间正则如果您选择的数据格式为极简模式,您需要设置正则表达式提取Kafka数据中的时间。

      例如,数据内容为message with time 2022-08-08 14:20:20,则您可以设置提取时间正则为\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d

      时间字段格式 指定时间格式,用于解析时间字段的值。

      • 支持Java SimpleDateFormat语法的时间格式,例如yyyy-MM-dd HH:mm:ss。时间格式的语法详情请参见Class SimpleDateFormat。常见的时间格式请参见时间格式。
      • 支持epoch格式,可选值为epoch、epochMillis、epochMacro或epochNano。
      时间字段时区 选择时间字段对应的时区。

      当时间格式是epoch时,不需要设置时区。

      默认时间来源 当没有提供时间提取信息或者时间提取失败时,使用您所设置的时间来源,包括系统当前时间和kafka消息时间戳。
      高级配置
      日志上下文 打开日志上下文开关后,支持日志服务的上下文查询功能。您可以查看目标数据在原始Kafka partition中的前若干条(上文)或后若干条(下文)数据。
      通信协议 通过公网导入时,建议通过加密连接、用户认证的方式进行导入,即在此处定义连接Kafka集群的通信协议信息,配置示例如下所示。

      protocol字段的可选值为plaintext、ssl、sasl_plaintext或sasl_ssl。建议设置为sasl_ssl,此协议需要连接加密和用户认证。

      设置protocol为sasl_plaintext或sasl_ssl时,需设置sasl节点。其中,mechanism字段可以为PLAIN、SCRAM-SHA-256或SCRAM-SHA-512,表示用户名/密码身份验证机制。

      {
          "protocol":"sasl_plaintext",
           "sasl":{
              "mechanism":"PLAIN",
              "username":"xxx",
              "password":"yyy"
          }
      }
      私网域名解析 部署在阿里云ECS上的Kafka Broker之间采用内部域名通信时,您需要在此处指定每个Broker对应的ECS域名和IP地址。配置示例如下所示。

      {
          "hostname#1":"192.168.XX.XX",
          "hostname#2":"192.168.XX.XX",
          "hostname#3":"192.168.XX.XX"
      }
    2. 单击预览,预览导入结果。
    3. 确认无误后,单击下一步。
  5. 预览数据及创建索引,然后单击下一步。日志服务默认开启全文索引。您也可以根据采集到的日志,手动创建字段索引,或者单击自动生成索引,日志服务将自动生成字段索引。更多信息,请参见开启并配置索引。 重要 如果您要查询和分析日志,那么全文索引和字段索引必须至少启用一种。同时启用时,以字段索引为准。
  6. 单击查询日志,进入查询和分析页面,确认是否成功导入Kafka数据。等待1分钟左右,如果有目标Kafka数据导入,则说明导入成功。

查看导入配置

创建导入配置成功后,您可以在控制台中查看已创建的导入配置及生成的统计报表。

  1. 单击目标Project。
  2. 选择目标日志库下的数据接入 > 数据导入,单击配置名称。
  3. 在导入配置概览页面,查看导入配置的基本信息和统计报表。阿里云日志服务SLS导入Kafka数据-云淘科技

相关操作

在导入配置概览页面,您还可以进行如下操作。

  • 修改配置

    单击修改配置,修改导入配置的相关配置。具体配置,请参见创建数据导入配置。

  • 删除配置 单击删除配置,删除该导入配置。 警告 删除后不可恢复,请谨慎操作。
  • 停止任务

    单击停止,停止该导入任务。

常见问题

问题 可能原因 解决方法
预览时出现Kafka Broker连接错误(Broker transport failure)。
  • 设置了错误的Kafka服务地址。
  • 没有在白名单中添加导入服务对应的IP地址,导致导入服务无法访问Kafka集群。
  • 导入部署在阿里云上的Kafka集群数据时,没有设置VPC实例ID。
  • 确保设置了正确的Kafka服务地址。
  • 在白名单中添加IP地址,允许导入服务连接Kafka集群。更多信息,请参见IP地址白名单。
  • 采用阿里云内网导入部署在阿里云上的Kafka集群数据时,确保设置了VPC实例ID。
预览时出现超时错误(preview request timed out)。 待导入的Kafka Topic中没有数据。 如果待导入的Kafka Topic中没有数据,请在写入数据后,再重试预览。
数据存在乱码。 编码格式配置不符合预期。 根据Kafka真实的编码格式更新导入配置。

如果需要修复已有的乱码数据,请创建新的Logstore和导入配置。

日志服务中显示的数据时间和数据本身的时间不一致。 设置导入配置时,没有指定日志时间字段或者设置时间格式、时区有误。 设置指定的日志时间字段以及正确的时间格式和时区。更多信息,请参见创建数据导入配置。
导入数据后,无法查询和分析数据。
  • 数据不在查询范围内。
  • 未配置索引。
  • 索引未生效。
  • 检查待查询数据的时间是否在查询时间范围内。

    如果不在查询范围内,请调整查询范围并重新查询。

  • 检查是否已为Logstore设置索引。

    如果未设置,请先设置索引。具操操作,请参见开启并配置索引、重建索引 。

  • 如果已设置索引,且数据处理流量观测仪表盘中显示的成功导入数据量符合预期,则可能原因是索引未生效,请尝试重建索引。具体操作,请参见重建索引。
导入的数据条目数量少于预期。 存在大于3 MB的Kafka数据,您可以通过数据处理流量观测仪表盘确认。 缩小单条Kafka消息的大小。
数据导入时存在明显的延迟
  • Kafka集群带宽达到限制。
  • 通过公网导入数据时,网络不稳定。
  • Kafka Topic的Partition数量过少。
  • Logstore Shard数量过少。
  • 更多原因,请参见性能限制。
  • 检查Kafka集群流量是否达到带宽限制(特别是部署在阿里云上的Kafka集群),如果达到或接近带宽限制,则需要进行带宽扩容。
  • Kafka Topic的Partition数量较少时,请尝试增加Partition数量,并观察延迟情况。
  • Logstore Shard数量较少时,请尝试增加Shard的个数,并观察延迟情况。具体操作,请参见管理Shard。

错误处理机制

限制项 说明
网络连接错误 导入任务会定期重试,即网络连接恢复后,导入任务会自动从之前中断的Offset位置继续消费数据。
Kafka Topic不存在 当目标Kafka Topic不存在时,导入任务会跳过该Topic,且不影响其他正常Topic的数据导入。

当不存在的Topic被重建后,导入任务会正常消费该Topic中的数据(存在10分钟左右的延迟)。

SLS Logstore不存在 导入任务会定期重试,即重建Logstore后,导入任务会自动恢复导入,并从上一次成功处理消息的Offset位置继续拉取消息。如果之前没有成功处理过消息,则按照导入配置中的起始位置的设置来决定要使用的Offset。

内容没看懂? 不太想学习?想快速解决? 有偿解决: 联系专家

阿里云企业补贴进行中: 马上申请

腾讯云限时活动1折起,即将结束: 马上收藏

同尘科技为腾讯云授权服务中心。

购买腾讯云产品享受折上折,更有现金返利:同意关联,立享优惠

转转请注明出处:https://www.yunxiaoer.com/163228.html

(0)
上一篇 2023年12月10日
下一篇 2023年12月10日
详情页2

相关推荐

  • 阿里云大数据开发治理平台 DataWorks概述-云淘科技

    DataWorks为您提供了多个实验教程,帮助您从前期的环境准备、数据采集,到后期的数据开发、结果展示,端到端了解DataWorks使用的全流程,更加熟悉产品的核心功能。 DataWorks目前提供的产品教程如下: 简单开发应用教程 简单用户画像分析(MaxCompute版) 简单用户画像分析(EMR版) 对接使用CDH与CDP 内容没看懂? 不太想学习?想…

  • 阿里云对象存储OSSJava请求者付费模式-云淘科技

    请求者付费模式是指由请求者支付读取存储空间(Bucket)内数据时产生的流量费用和请求费用,而Bucket拥有者仅支付存储费用。当您希望共享数据,但又不希望产生流量费用和请求费用时,您可以开启此功能。 注意事项 本文以华东1(杭州)外网Endpoint为例。如果您希望通过与OSS同地域的其他阿里云产品访问OSS,请使用内网Endpoint。关于OSS支持的R…

    阿里云对象存储 2023年12月10日
  • 信息流广告,信息流部分建议宽度830px,只针对默认列表样式,顺序随机
  • 阿里云日志服务SLS事件操作函数-云淘科技

    本文介绍事件操作函数的语法规则,包括参数解释、函数示例等。 函数列表 类型 函数 说明 事件操作 e_drop 根据条件判断是否丢弃日志。 支持和其他函数组合使用。相关示例,请参见复制和分发数据。 e_keep 根据条件判断是否保留日志。e_keep函数和e_drop函数都会丢弃日志。e_keep函数在不满足条件时丢弃,而e_drop函数则是在满足条件时丢弃…

    2023年12月10日
  • 阿里云负载均衡CLB计费FAQ-云淘科技

    文本介绍传统型负载均衡CLB(Classic Load Balancer)计费相关的常见问题。 CLB实例如何计费? CLB是否对入流量计费? 健康检查产生的流量是否会被计费? ECS加入CLB的后端服务器资源池是否影响CLB计费? 攻击流量是否会被计费? CLB实例的所有后端ECS都停止,或者没有挂载ECS时,是否会收取公网网络费用? CLB实例停机后是否…

    2023年12月10日
  • 大数据计算MaxCompute整库可以是 只读库吗?-云小二-阿里云

    大数据计算MaxCompute整库可以是 只读库吗? 我们的数据库是 drds(rds) 得行不 监控数据源这的binlog 这个是啥弄的 比如datahub kafka ? 以下为热心网友提供的参考意见 监控binlog,只读。 ,此回答整理自钉群“MaxCompute开发者社区2群” 以下为热心网友提供的参考意见 大数据计算服务MaxCompute是一种…

    阿里云 2024年1月9日

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信
本站为广大会员提供阿里云、腾讯云、华为云、百度云等一线大厂的购买,续费优惠,保证底价,买贵退差。