本文介绍日志服务Aliyun Log Java Producer类库的使用方法。
前提条件
-
已创建RAM用户并完成授权。具体操作,请参见创建RAM用户并完成授权。
-
已配置环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。具体操作,请参见配置环境变量。
重要
-
阿里云账号的AccessKey拥有所有API的访问权限,建议您使用RAM用户的AccessKey进行API访问或日常运维。
-
强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
-
-
已安装Java SDK。具体操作,请参见安装Java SDK。
背景信息
Aliyun Log Java Producer是为运行在大数据、高并发场景下的Java应用量身打造的高性能类库。相对于原始的API或SDK,使用该类库写日志数据能为您带来诸多优势,包括高性能、计算与I/O逻辑分离、资源可控制等。
特点
-
线程安全:Producer接口暴露的所有方法都是线程安全的。
-
异步发送:调用Producer的发送接口通常能够立即返回响应。Producer内部会缓存并合并待发送数据,然后批量发送以提高吞吐量。
-
自动重试:Producer会根据配置的最大重试次数和重试退避时间进行重试。
-
行为追溯:通过Callback或Future能获取当前数据是否发送成功的信息,也可以获得该数据每次被尝试发送的信息,有利于问题追溯和行为决策。
-
上下文还原:同一个Producer实例产生的日志在同一上下文中,在服务端可以查看某条日志前后相关的日志。
-
优雅关闭:保证close方法退出时,Producer缓存的所有数据都能被处理,同时您也能得到相应的通知。
优势
-
高性能
在海量数据、资源有限的前提下,写入端要达到目标吞吐量需要实现复杂的控制逻辑,包括多线程、缓存策略、批量发送等,另外还要充分考虑失败重试的场景。Producer实现了上述功能,在为您带来性能优势的同时简化了程序开发步骤。
-
异步非阻塞
在可用内存充足的前提下,Producer会对发往日志库的数据进行缓存,因此调用send方法时能够立即返回响应且不会阻塞,可达到计算与I/O逻辑分离的目的。之后,您可以通过返回的Future对象或传入的Callback获得数据发送的结果。
-
资源可控制
可以通过参数控制Producer用于缓存待发送数据的内存大小,同时还可以配置用于执行数据发送任务的线程数量。这样可避免Producer无限制地消耗资源,且可以让您根据实际情况平衡资源消耗和写入吞吐量。
安装Aliyun Log Java Producer
在Maven工程中使用日志服务Aliyun Log Java Producer,只需在pom.xml中加入相应依赖。Maven项目管理工具会自动下载相关JAR包。例如,在中加入如下内容:
com.aliyun.openservices
aliyun-log-producer
0.3.10
jar-with-dependency版本,可以解决Producer依赖的版本冲突问题。在中加入如下内容:
com.aliyun.openservices
aliyun-log
0.6.35
jar-with-dependencies
Java代码示例
安装完成后,您就可以使用Producer类库编写Java代码。
说明
-
aliyun-log-producer底层调用PutLogs接口上传日志,每次可以写入的原始日志大小存在限制。更多信息,请参见数据读写和PutLogs。
-
日志服务的基础资源,包括创建Project个数、Logstore个数、Shard个数、LogtaiConfig个数、机器组个数、单个LogItem大小、LogItem(Key)长度和LogItem(Value)长度等均存在限制。更多信息,请参见基础资源。
-
代码首次运行后,请在日志服务控制台开启日志库索引,等待一分钟后,进行查询。
-
在控制台进行日志查询时,当单个字段值长度超过最大长度时,超出部分被截断,不参与分析。更多信息,请参考创建索引。
-
Callback
本示例中,创建一个SampleProducerWithCallback.java文件,将生成的日志数据上传至日志服务。
-
示例代码
import com.aliyun.openservices.aliyun.log.producer.Callback; import com.aliyun.openservices.aliyun.log.producer.LogProducer; import com.aliyun.openservices.aliyun.log.producer.Producer; import com.aliyun.openservices.aliyun.log.producer.ProducerConfig; import com.aliyun.openservices.aliyun.log.producer.ProjectConfig; import com.aliyun.openservices.aliyun.log.producer.Result; import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException; import com.aliyun.openservices.log.common.LogItem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; public class SampleProducerWithCallback { private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithCallback.class); private static final ExecutorService threadPool = Executors.newFixedThreadPool(10); public static void main(String[] args) throws InterruptedException { final String project = "example-project"; final String logstore = "example-logstore"; String endpoint = "example-endpoint"; // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。 String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); ProducerConfig producerConfig = new ProducerConfig(); final Producer producer = new LogProducer(producerConfig); producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret)); int nTask = 100; // The number of logs that have finished (either successfully send, or failed). final AtomicLong completed = new AtomicLong(0); final CountDownLatch latch = new CountDownLatch(nTask); for (int i = 0; i < nTask; ++i) { threadPool.submit( new Runnable() { @Override public void run() { //The maximum size of a LogItem (key) is 128 bytes. The maximum size of a LogItem (value) is 1 MB. LogItem logItem = new LogItem(); logItem.PushBack("key1", "foo"); logItem.PushBack("key2", "bar"); try { producer.send( project, logstore, "your-topic", "your-source", logItem, new SampleCallback(project, logstore, logItem, completed)); } catch (InterruptedException e) { LOGGER.warn("The current thread has been interrupted during send logs."); } catch (Exception e) { LOGGER.error("Failed to send log, logItem={}, e=", logItem, e); } finally { latch.countDown(); } } }); } // 只有进程退出的时候,才需要考虑如下的逻辑。 latch.await(); threadPool.shutdown(); try { producer.close(); } catch (InterruptedException e) { LOGGER.warn("The current thread has been interrupted from close."); } catch (ProducerException e) { LOGGER.info("Failed to close producer, e=", e); } LOGGER.info("All log complete, completed={}", completed.get()); } private static final class SampleCallback implements Callback { private static final Logger LOGGER = LoggerFactory.getLogger(SampleCallback.class); private final String project; private final String logStore; private final LogItem logItem; private final AtomicLong completed; SampleCallback(String project, String logStore, LogItem logItem, AtomicLong completed) { this.project = project; this.logStore = logStore; this.logItem = logItem; this.completed = completed; } @Override public void onCompletion(Result result) { try { if (result.isSuccessful()) { LOGGER.info("Send log successfully."); } else { LOGGER.error( "Failed to send log, project={}, logStore={}, logItem={}, result={}", project, logStore, logItem.ToJsonString(), result); } } finally { completed.getAndIncrement(); } } } }
-
预期结果
{"__source__":"your-source","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658715477","__time__":"1658715473","__topic__":"your-topic","key1":"foo","key2":"bar"} {"__source__":"your-source","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658715477","__time__":"1658715473","__topic__":"your-topic","key1":"foo","key2":"bar"} {"__source__":"your-source","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658715477","__time__":"1658715473","__topic__":"your-topic","key1":"foo","key2":"bar"} {"__source__":"your-source","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658715477","__time__":"1658715473","__topic__":"your-topic","key1":"foo","key2":"bar"} ......
更多信息,请参见Aliyun Log Java Producer。
-
-
Future
本示例中,创建一个SampleProducerWithFuture.java文件,将生成的日志数据上传至日志服务。
-
代码示例
import com.aliyun.openservices.aliyun.log.producer.LogProducer; import com.aliyun.openservices.aliyun.log.producer.Producer; import com.aliyun.openservices.aliyun.log.producer.ProducerConfig; import com.aliyun.openservices.aliyun.log.producer.ProjectConfig; import com.aliyun.openservices.aliyun.log.producer.Result; import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException; import com.aliyun.openservices.aliyun.log.producer.errors.ResultFailedException; import com.aliyun.openservices.log.common.LogItem; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class SampleProducerWithFuture { private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithFuture.class); private static final ExecutorService threadPool = Executors .newFixedThreadPool(Math.max(Runtime.getRuntime().availableProcessors(), 1)); public static void main(String[] args) throws InterruptedException { final String project = "example-project"; final String logstore = "example-logstore"; String endpoint = "example-endpoint"; // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。 String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); ProducerConfig producerConfig = new ProducerConfig(); final Producer producer = new LogProducer(producerConfig); producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret)); int n = 100; // The number of logs that have finished (either successfully send, or failed). final AtomicLong completed = new AtomicLong(0); for (int i = 0; i < n; ++i) { List logItems = new ArrayList(); for (int j = 0; j < 10; ++j) { //The maximum size of a LogItem (key) is 128 bytes. The maximum size of a LogItem (value) is 1 MB. LogItem logItem = new LogItem(); logItem.PushBack("key1", "foo" + j); logItem.PushBack("key2", "bar" + j); logItems.add(logItem); } try { ListenableFuture f = producer.send(project, logstore, logItems); Futures.addCallback( f, new SampleFutureCallback(project, logstore, logItems, completed), threadPool); } catch (InterruptedException e) { LOGGER.warn("The current thread has been interrupted during send logs."); } catch (Exception e) { LOGGER.error("Failed to send logs, e=", e); } } try { producer.close(); } catch (InterruptedException e) { LOGGER.warn("The current thread has been interrupted from close."); } catch (ProducerException e) { LOGGER.info("Failed to close producer, e=", e); } threadPool.shutdown(); while (!threadPool.isTerminated()) { threadPool.awaitTermination(100, TimeUnit.MILLISECONDS); } LOGGER.info("All log complete, completed={}", completed.get()); } private static final class SampleFutureCallback implements FutureCallback { private static final Logger LOGGER = LoggerFactory.getLogger(SampleFutureCallback.class); private final String project; private final String logStore; private final List logItems; private final AtomicLong completed; SampleFutureCallback( String project, String logStore, List logItems, AtomicLong completed) { this.project = project; this.logStore = logStore; this.logItems = logItems; this.completed = completed; } @Override public void onSuccess(@Nullable Result result) { LOGGER.info("Send logs successfully."); completed.getAndIncrement(); } @Override public void onFailure(Throwable t) { if (t instanceof ResultFailedException) { Result result = ((ResultFailedException) t).getResult(); LOGGER.error( "Failed to send logs, project={}, logStore={}, result={}", project, logStore, result); } else { LOGGER.error("Failed to send log, e=", t); } completed.getAndIncrement(); } } }
-
预期结果
{"__source__":"172.16.4.254","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658716454","__time__":"1658716450","__topic__":"","key1":"foo0","key2":"bar0"} {"__source__":"172.16.4.254","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658716454","__time__":"1658716450","__topic__":"","key1":"foo1","key2":"bar1"} {"__source__":"172.16.4.254","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658716454","__time__":"1658716450","__topic__":"","key1":"foo2","key2":"bar2"} {"__source__":"172.16.4.254","__tag__:__client_ip__":"47.100.XX.XX","__tag__:__receive_time__":"1658716454","__time__":"1658716450","__topic__":"","key1":"foo3","key2":"bar3"} ......
-
更多信息,请参见Aliyun Log Java Producer。
此外,日志服务提供基于Aliyun Log Java Producer的样例应用程序,便于您快速上手。更多信息,请参见Aliyun Log Producer Sample Application。
常见问题
写入数据次数是否存在限制?
-
日志服务读写数据的次数和大小均存在限制。更多信息,请参见数据读写。
-
日志服务的基础资源,包括创建Project个数、Logstore个数、Shard个数、LogtaiConfig个数、机器组个数、单个LogItem大小、LogItem(Key)长度和LogItem(Value)长度等均存在限制。更多信息,请参见基础资源。
为什么数据没有写入日志服务?
如果您发现数据没有写入日志服务,可通过如下步骤诊断问题。
-
检查您项目中引入的
aliyun-log-producer
、aliyun-log
、protobuf-java
Jar包的版本是否和文档中安装部分列出的Jar包版本一致,如果不一致请进行升级。 -
Producer接口的send方法异步发送数据,无法及时获取返回的值。请通过Callback接口或返回的Future对象获取数据发送失败的原因。
-
如果您发现并没有回调Callback接口的onCompletion方法,请检查在您的程序退出之前是否有调用
producer.close()
方法。因为数据发送是由后台线程异步完成的,为了防止缓存在内存里的少量数据丢失,请务必在程序退出之前调用producer.close()
方法。 -
Producer接口会把运行过程中的关键行为通过日志框架slf4j进行输出,您可以在程序中配置好相应的日志实现框架并打开DEBUG级别的日志。重点检查是否输出ERROR级别的日志。
-
如果通过上述步骤仍然没有解决,请提工单。
相关文档
- 在调用API接口过程中,若服务端返回结果中包含错误信息,则表示调用API接口失败。您可以参考API错误码对照表查找对应的解决方法。更多信息,请参见API错误处理对照表。
- 阿里云OpenAPI开发者门户提供调试、SDK、示例和配套文档。通过OpenAPI,您无需手动封装请求和签名操作,就可以快速对日志服务API进行调试。更多信息,请参见OpenAPI开发者门户。
- 为满足越来越多的自动化日志服务配置需求,日志服务提供命令行工具CLI(Command Line Interface)。更多信息,请参见日志服务命令行工具CLI。
- 更多示例代码,请参见Aliyun Log Java SDK on GitHub。
内容没看懂? 不太想学习?想快速解决? 有偿解决: 联系专家
阿里云企业补贴进行中: 马上申请
腾讯云限时活动1折起,即将结束: 马上收藏
同尘科技为腾讯云授权服务中心。
购买腾讯云产品享受折上折,更有现金返利:同意关联,立享优惠
转转请注明出处:https://www.yunxiaoer.com/160368.html