本文介绍如何使用Confluent-Kafka-Python实现Kafka消费。
环境配置
安装confluent-kafka Python客户端。
pip install confluent-kafka
参数说明
参数 |
描述 |
示例 |
bootstrap.servers |
初始连接的集群地址,格式为
|
etl-dev.cn-hangzhou.log.aliyuncs.com:10012 其中,etl-dev为Project名称。 |
security.protocol |
为了保证数据传输的安全性,请使用sasl_ssl。 |
sasl_ssl |
sasl.mechanism |
必须使用PLAIN。 |
PLAIN |
sasl.username |
日志服务Project名称。 |
etl-dev |
sasl.password |
阿里云账号AccessKey。格式为 |
无 |
topic |
日志服务Logstore名称。 |
test |
group.id |
消费组ID,用于指定消费者组的标识符。 通过配置消费组ID,将消费组内的消费者分组,可以实现消费者组内的负载均衡,实现数据的处理和分发。 |
kafka-test |
enable.auto.commit |
是否自动提交消费点位,建议设置为true。 |
true |
auto.commit.interval.ms |
自动提交消费点位的间隔时间,建议设置为30000,单位为ms。 |
30000 |
max.poll.interval.ms |
消费组在消费者发起加入组请求后,等待所有消费者加入的时间间隔。建议设置为130000,保证所有消费者都能加入消费组。单位为ms。 在这个时间间隔内加入组的消费者为消费组的成员,进行分区分配,各个消费者按分配的分区开发消费数据,如果在这个时间内还有消费者没有加入消费组,则会触发消费组再平衡操作,再平衡期间不会消费数据,会导致消费延迟。 重要 使用confluent库时需要保证max.poll.interval.ms值大于session.timeout.ms,否则无法正常消费。 |
130000 |
auto.offset.reset |
消费起始点位,常用的值为latest和earliest。
|
latest |
heartbeat.interval.ms |
指定客户端和服务端之间心跳检测间隔时间。建议设置为5000,单位为ms。 设置的值越小,客户端和服务端之间的心跳检测越频繁,会导致越多的网络流量。 |
5000 |
session.timeout.ms |
心跳最大超时时间。建议设置为120000,单位为ms。 在该时间内如果消费者没有发送心跳请求,则视为该消费者发生异常,触发消费组再平衡操作。 |
120000 |
代码示例
import sys
import os
from confluent_kafka import Consumer, KafkaError, KafkaException
endpoint = "cn-huhehaote.log.aliyuncs.com"
"""
阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
此处以把AccessKey和AccessKeySecret保存在环境变量为例说明。您可以根据业务需要,保存到配置文件里。
强烈建议不要把AccessKey和AccessKeySecret保存到代码里,会存在密钥泄漏风险。
"""
accessKeyId = os.getenv("SLS_ACCESS_KEY_ID")
accessKeySecret = os.getenv("SLS_ACCESS_KEY_SECRET")
project = "etl-dev"
logstore = "test"
port = "10012"
groupId = "kafka-test"
kafkaEndpoint = "{}.{}:{}".format(project, endpoint, port)
groupId = "kafka-test2112"
c = Consumer({
"bootstrap.servers": kafkaEndpoint,
"sasl.mechanism": "PLAIN",
"security.protocol": "sasl_ssl",
"sasl.username": project,
"sasl.password": "%s#%s" % (accessKeyId, accessKeySecret),
"group.id": groupId,
"enable.auto.commit": "true",
"auto.commit.interval.ms": 30000,
"session.timeout.ms": 120000,
"auto.offset.reset": "earliest",
"max.poll.interval.ms": 130000,
"heartbeat.interval.ms": 5000,
})
c.subscribe([logstore])
while True:
msg = c.poll(1.0)
if msg is None:
continue
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue
print('Received message: {}'.format(msg.value().decode('utf-8')))
c.close()
内容没看懂? 不太想学习?想快速解决? 有偿解决: 联系专家
阿里云企业补贴进行中: 马上申请
腾讯云限时活动1折起,即将结束: 马上收藏
同尘科技为腾讯云授权服务中心。
购买腾讯云产品享受折上折,更有现金返利:同意关联,立享优惠
转转请注明出处:https://www.yunxiaoer.com/163055.html