您可以使用Kafka Producer SDK、Beats系列软件、Collectd、Fluentd、Logstash、Telegraf、Vector等采集工具采集日志,并通过Kafka协议上传到日志服务。本文介绍通过采集工具采集到日志后,利用Kafka协议上传日志到日志服务的操作步骤。
相关限制
-
支持的Kafka协议版本为:0.8.0到2.1.1V2。
-
为保证日志传输安全性,必须使用SASL_SSL连接协议。
数据解析说明
使用Kafka协议上传的日志,其日志内容存储在content字段中。如果日志为JSON类型,您可以为content字段设置JSON类型的索引。更多信息,请参见JSON类型。
现在,通过Kafka生产者(produce)或Beats系列软件上传日志时,您可以在采集配置中设置topic或headers,实现JSON格式的日志自动展示。即日志服务会自动展开content字段,您无需为content字段配置JSON类型的索引。具体说明,请参见配置方式。
配置方式
使用Kafka协议上传日志时,您需要配置以下参数。
说明
各个工具的配置参数名称略有不同,请根据实际参数配置。
参数 |
说明 |
连接类型 |
为保证日志传输安全性,连接协议必须为SASL_SSL。 |
hosts |
初始连接的集群地址,格式为
|
topic |
配置为日志服务Logstore名称。 使用Kafka生产者(produce)或Beats系列软件上传日志且指定输出格式为JSON时,您可以将topic的值设置为 |
headers |
使用Kafka生产者(produce)或Beats系列软件上传日志且指定输出格式为JSON时,您可以将headers配置为如下内容后,实现JSON日志自动展开。
更多信息,请参见示例一:通过Beats系列软件上传日志。 |
username |
配置为日志服务Project名称。 |
password |
配置为阿里云AK,格式为${access-key-id}#${access-key-secret}。请根据实际情况,将${access-key-id}替换为您的AccessKey ID,将${access-key-secret}替换为您的AccessKey Secret。建议使用RAM用户的AK。更多信息,请参见授权。 |
证书文件 |
日志服务的域名均具备可信任证书,您只需使用服务器自带的根证书即可,例如:/etc/ssl/certs/ca-bundle.crt。 |
说明
如果您要通过Kafka消费组实时消费日志服务的数据,请提交工单咨询阿里云技术支持工程师。
示例一:通过Beats系列软件上传日志
Beats系列软件(MetricBeat、PacketBeat、Winlogbeat、Auditbeat、Filebeat、Heartbeat等)采集到日志后,支持通过Kafka协议将日志上传到日志服务。更多信息,请参见Beats-Kafka-Output。
-
示例1
-
配置示例
output.kafka: # initial brokers for reading cluster metadata hosts: ["test-project-1.cn-hangzhou.log.aliyuncs.com:10012"] username: "yourusername" password: "yourpassword" ssl.certificate_authorities: # message topic selection + partitioning topic: 'test-logstore-1' partition.round_robin: reachable_only: false required_acks: 1 compression: gzip max_message_bytes: 1000000
-
日志样例
Beats系列软件默认输出的日志为JSON类型,您可以为content字段创建JSON类型的索引。更多信息,请参见JSON类型。
-
-
示例2
-
配置示例
output.kafka: enabled: true hosts: ["cn-hangzhou-intranet.log.aliyuncs.com:10011"] username: "test-project-1" password: "access-key-id#access-key-secret" ssl.certificate_authorities: topic: 'test-logstore-1' headers: - key: "data-parse-format" value: "json" partition.hash: reachable_only: false
-
日志样例
通过headers配置,实现JSON日志自动展开。
-
示例二:通过Collectd上传日志
Collectd是一个守护(daemon)进程,用于定期采集系统和应用程序的性能指标,并支持通过Kafka协议上传到日志服务。更多信息,请参见Write Kafka Plugin。
将Collectd采集到日志上传到日志服务时,还需安装Kafka插件以及相关依赖。例如:在linux Centos中,可以使用yum安装Kafka插件,命令为sudo yum install collectd-write_kafka
,安装RPM请参见Collectd-write_kafka。
-
配置示例
示例中将日志输出格式(Format)设置为JSON,除此之外,还支持Command、Graphite类型。更多信息,请参见Collectd配置文档。
Property "metadata.broker.list" "test-project-1.cn-hangzhou.log.aliyuncs.com:10012" Property "security.protocol" "sasl_ssl" Property "sasl.mechanism" "PLAIN" Property "sasl.username" "yourusername" Property "sasl.password" "yourpassword" Property "broker.address.family" "v4" Format JSON Key "content"
-
日志样例
使用JSON模式输出日志后,您可以给content字段创建JSON类型的索引。更多信息,请参见JSON类型。
示例三:使用Telegraf上传日志
Telegraf是由Go语言编写的代理程序,内存占用小,用于收集、处理、汇总数据指标。Telegraf具有丰富的插件及具备集成功能,可从其运行的系统中获取各种指标、从第三方API中获取指标以及通过statsd和Kafka消费者服务监听指标。
将Telegraf采集到的日志通过kafka协议上传到日志服务前,您需要先修改配置文件。
-
配置示例
示例中将日志输出格式(Format)设置为JSON,除此之外还支持Graphite、Carbon2等类型。更多信息,请参见Telegraf输出格式。
说明
Telegraf必须配置一个合法的tls_ca路径,使用服务器自带的根证书的路径即可。Linux环境中,根证书CA路径一般为/etc/ssl/certs/ca-bundle.crt。
[[outputs.kafka]] ## URLs of kafka brokers brokers = ["test-project-1.cn-hangzhou.log.aliyuncs.com:10012"] ## Kafka topic for producer messages topic = "test-logstore-1" routing_key = "content" ## CompressionCodec represents the various compression codecs recognized by ## Kafka in messages. ## 0 : No compression ## 1 : Gzip compression ## 2 : Snappy compression ## 3 : LZ4 compression compression_codec = 1 ## Optional TLS Config tls_ca = "/etc/ssl/certs/ca-bundle.crt" # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" ## Use TLS but skip chain & host verification # insecure_skip_verify = false ## Optional SASL Config sasl_username = "yourusername" sasl_password = "yourpassword" ## Data format to output. ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md data_format = "json"
-
日志样例
使用JSON模式输出日志后,您可以给content字段创建JSON类型的索引。更多信息,请参见JSON类型。
示例四:使用Fluentd上传日志
Fluentd是一个开源的日志收集器,是云端原生计算基金会(CNCF)的成员项目之一,遵循Apache 2 License协议。
Fluentd支持众多输入、处理、输出插件,支持通过Kafka插件将日志上传到日志服务,您只需安装并配置Kafka插件即可。更多信息,请参见fluent-plugin-kafka。
-
配置示例
示例中将日志输出格式(Format)设置为JSON,除此之外还支持数十种Format类型。更多信息,请参见Fluentd Formatter。
@type kafka # Brokers: you can choose either brokers or zookeeper. brokers test-project-1.cn-hangzhou.log.aliyuncs.com:10012 default_topic test-logstore-1 default_message_key content output_data_type json output_include_tag true output_include_time true sasl_over_ssl true username yourusername //用户名,请根据真实值进行替换。 password "yourpassword" //用户密码,请根据真实值进行替换。 ssl_ca_certs_from_system true # ruby-kafka producer options max_send_retries 10000 required_acks 1 compression_codec gzip
-
日志样例
使用JSON模式输出日志后,您可以给content字段创建JSON类型的索引。更多信息,请参见JSON类型。
示例五:使用Logstash上传日志
Logstash是一个具备实时处理能力、开源的日志采集引擎,可以动态采集不同来源的日志。
Logstash内置Kafka输出插件,您可以配置Logstash实现日志通过kafka协议上传到日志服务。由于日志服务使用SASL_SSL连接协议,因此还需要配置SSL证书和jaas文件。
-
配置示例
-
创建jaas文件,并保存到任意路径(例如/etc/kafka/kafka_client_jaas.conf)。
将如下内容添加到jaas文件中。
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="yourusername" password="yourpassword"; };
-
配置SSL信任证书,保存到任意路径(例如:/etc/kafka/client-root.truststore.jks)。
下载根证书,保存到任意路径(例如:/etc/kafka/root.pem),然后通过keytool命令生成.jks格式的文件(首次生成时,需要配置密码)。
keytool -keystore client-root.truststore.jks -alias root -import -file /etc/kafka/root.pem
-
配置Logstash。
示例中将日志输出格式(Format)设置为JSON,除此之外还支持数十种Format类型。更多信息,请参见Logstash Codec。
说明
本示例为连通性测试的配置,您的生产环境中建议删除stdout的输出配置。
input { stdin { } } output { stdout { codec => rubydebug } kafka { topic_id => "test-logstore-1" bootstrap_servers => "test-project-1.cn-hangzhou.log.aliyuncs.com:10012" security_protocol => "SASL_SSL" ssl_truststore_location => "/etc/client-root.truststore.jks" ssl_truststore_password => "123456" jaas_path => "/etc/kafka_client_jaas.conf" sasl_mechanism => "PLAIN" codec => "json" client_id => "kafka-logstash" } }
-
-
日志样例
使用JSON模式输出日志后,您可以给content字段创建JSON类型的索引。更多信息,请参见JSON类型。
示例六:通过Kafka生产者(produce)上传日志
通过Kafka生产者(produce)上传日志到日志服务时,您可以在采集配置中设置topic或headers,实现JSON日志自动展开。
-
配置示例
public static void produce(){ //配置信息。 Properties props2 = new Properties(); String project = "etl-test-tlj"; String topic = "test3.json"; props2.put("bootstrap.servers", "kafka.log.aliyuncs.com:9093"); props2.put("security.protocol", "sasl_ssl"); props2.put("sasl.mechanism", "PLAIN"); props2.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+ project +"\" password=\"access-key-id#access-key-secret\";"); //设置数据key和value的序列化处理类。 props2.put("key.serializer", StringSerializer.class); props2.put("value.serializer", StringSerializer.class); //创建生产者实例。 KafkaProducer producer = new KafkaProducer(props2); //发送记录。 for(int i=0;i<1;i++){ ProducerRecord record = new ProducerRecord(topic, "{\"logName\": \"error4\"}"); record.headers().add(new RecordHeader("data-parse-format","json".getBytes())); producer.send(record); } producer.close(); }
-
日志样例
示例七:通过Fluent-bit上传日志
Fluent-bit是一个轻量级、高可扩展的日志与指标的处理器、转发器,支持众多输入、处理和输出插件,支持通过Kafka插件将日志上传到日志服务。更多信息,请参见Kafka output plugin。
-
配置示例
相关的配置信息,请参见配置方式。
[Output] Name kafka Match * Brokers etl-shanghai.cn-shanghai.log.aliyuncs.com:10012 Topics etl-out Format json rdkafka.sasl.username yourusername rdkafka.sasl.password yourpassword rdkafka.security.protocol SASL_SSL rdkafka.sasl.mechanism PLAIN
-
日志样例
示例八 :Vector配置Kafka协议上传
Vector是一款轻量级、高性能的日志处理软件,它支持Kafka协议的方式上报日志。下面是Vector通过Kafka兼容模式写入SLS的配置方法。
-
配置示例
[sinks.aliyun_sls] type = "kafka" inputs = ["file_logs"] # 定义source,这里是监控一个日志文件 bootstrap_servers = "etl-dev.cn-huhehaote.log.aliyuncs.com:10012" compression = "gzip" healthcheck = true topic = "dst-kafka.json" # dst-kafka是logstore名字,加.json后缀是表示Kafka兼容写入的时候尝试json展开 encoding.codec = "json" sasl.enabled = true sasl.mechanism = "PLAIN" sasl.username = "etl-dev" # etl-dev是sls project名称 sasl.password = "{{RAM用户的AKSK,格式{AK#SK}}}" tls.enabled = true
-
日志样例
错误信息
使用Kafka协议上传日志失败时,会按照Kafka的错误信息返回对应的错误信息,如下表所示,Kafka协议错误信息详情,请参见error list。
错误信息 |
说明 |
推荐解决方式 |
NetworkException |
出现网络错误时返回该错误信息。 |
一般等待1秒后重试即可。 |
TopicAuthorizationException |
鉴权失败时返回该错误信息。 |
一般是您提供的AccessKey错误或没有写入对应Project、Logstore的权限。请填写正确的且具备写入权限的AccessKey。 |
UnknownTopicOrPartitionException |
出现该错误可能有两种原因:
|
请确保已创建对应的Project和Logstore。如果已创建还是提示该错误,请检查Project所在地域是否和填入的Endpoint一致。 |
KafkaStorageException |
服务端出现异常时返回该错误信息。 |
一般等待1秒后重试即可。 |
内容没看懂? 不太想学习?想快速解决? 有偿解决: 联系专家
阿里云企业补贴进行中: 马上申请
腾讯云限时活动1折起,即将结束: 马上收藏
同尘科技为腾讯云授权服务中心。
购买腾讯云产品享受折上折,更有现金返利:同意关联,立享优惠
转转请注明出处:https://www.yunxiaoer.com/161266.html