Near Realtime AWS CloudWatch Log Stream for Java

Most small- to medium-scale applications in the cloud still use file-based logs, and most of these logs are destroyed when the instance is terminated.
I came across a situation where I needed to send an application's access and error logs to the AWS CloudWatch service in near real time.
To begin with, the AWS documentation didn't offer simple examples (https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogStream.html), and it was my first time using the AWS SDK for Java, so it took me a while to read through the documentation and design the solution.
Sample Java Code https://pastebin.com/JSiRi2C2
package au.com.itelasoft.iot;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.logs.AWSLogs;
import com.amazonaws.services.logs.AWSLogsClientBuilder;
import com.amazonaws.services.logs.model.DescribeLogStreamsRequest;
import com.amazonaws.services.logs.model.DescribeLogStreamsResult;
import com.amazonaws.services.logs.model.InputLogEvent;
import com.amazonaws.services.logs.model.PutLogEventsRequest;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

// CloudLogger is the author's own interface (not shown); it declares write(String).
public class AwsCloudWatch implements CloudLogger {

    // Pending events. Every access is guarded by synchronized (logEvents), and the
    // list also serves as the monitor the writer thread waits on.
    private static final List<InputLogEvent> logEvents = new ArrayList<>();
    private static AwsCloudWatch instance;
    private static volatile boolean inflight = false;
    private Thread awsWriteThread;
    private final BasicAWSCredentials awsCreds;
    private final AWSLogs cwClient;
    private final String logStreamName = "access-log";
    private final String logStreamGroup = "/test";

    public static AwsCloudWatch getInstance() {
        synchronized (AwsCloudWatch.class) {
            if (AwsCloudWatch.instance == null) {
                AwsCloudWatch.instance = new AwsCloudWatch();
            }
            return AwsCloudWatch.instance;
        }
    }

    private AwsCloudWatch() {
        // Hard-coded keys for demonstration only (see the note on credentials below).
        this.awsCreds = new BasicAWSCredentials("xxxxxx", "xxxxxxxx");
        this.cwClient = AWSLogsClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(this.awsCreds))
                .withRegion("ap-southeast-1").build();
        this.initThread();
    }

    private void initThread() {
        this.awsWriteThread = new Thread(() -> {
            while (true) {
                try {
                    writeToCloud();
                } catch (InterruptedException ex) {
                    Logger.getLogger(AwsCloudWatch.class.getName()).log(Level.SEVERE, null, ex);
                    return; // interrupted: stop this writer thread
                } catch (Exception e) {
                    // Caught inside the loop so one failed API call does not end the thread.
                    System.err.println("Error logging " + e);
                }
            }
        });
        this.awsWriteThread.setName("AWS Log writer daemon");
        // Daemon: does not keep the JVM alive; events still queued at exit are lost.
        this.awsWriteThread.setDaemon(true);
        this.awsWriteThread.start();
    }

    @Override
    public void write(String message) {
        synchronized (logEvents) {
            // Timestamp taken under the lock so the list stays in chronological order.
            logEvents.add(new InputLogEvent()
                    .withMessage(message)
                    .withTimestamp(System.currentTimeMillis()));
            logEvents.notify(); // wake the writer thread if it is waiting
        }
        synchronized (this) {
            // A terminated Thread cannot be start()ed again, so create a new one.
            if (!this.awsWriteThread.isAlive()) {
                this.initThread();
            }
        }
    }

    private void writeToCloud() throws InterruptedException {
        List<InputLogEvent> batch;
        synchronized (logEvents) {
            if (logEvents.isEmpty() || inflight) {
                // Checked under the same lock write() uses, so a notify() cannot be missed.
                logEvents.wait();
                return; // run loop calls writeToCloud() again and re-checks
            }
            inflight = true;
            // Move at most 10,000 events (the PutLogEvents per-call limit) into the batch.
            List<InputLogEvent> head = logEvents.subList(0, Math.min(logEvents.size(), 10_000));
            batch = new ArrayList<>(head);
            head.clear();
        }
        try {
            // The network calls run outside the lock, so write() never waits on them.
            PutLogEventsRequest putReq = new PutLogEventsRequest(this.logStreamGroup, this.logStreamName, batch)
                    .withSequenceToken(this.getUploadSequenceToken());
            this.cwClient.putLogEvents(putReq);
        } finally {
            inflight = false; // if the call threw, this batch is dropped, not retried
        }
    }

    private String getUploadSequenceToken() {
        DescribeLogStreamsRequest req = new DescribeLogStreamsRequest()
                .withLogGroupName(logStreamGroup)
                .withLogStreamNamePrefix(logStreamName);
        DescribeLogStreamsResult res = this.cwClient.describeLogStreams(req);
        // Assumes the log group and stream already exist; get(0) throws otherwise.
        return res.getLogStreams().get(0).getUploadSequenceToken();
    }
}
I would like to highlight the thinking behind my code decisions.
The AwsCloudWatch class uses the Singleton design pattern, which ensures that only one AwsCloudWatch object exists in each running application instance.
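For reference, a common alternative to taking a lock on every getInstance() call is the initialization-on-demand holder idiom. The class name LazySingleton below is hypothetical, and this is a minimal illustration of the idiom rather than a drop-in replacement for AwsCloudWatch:

```java
// Sketch of the initialization-on-demand holder idiom.
// "LazySingleton" is a hypothetical class used only for illustration.
final class LazySingleton {

    private LazySingleton() {
        // expensive setup (e.g. building an SDK client) would go here
    }

    // The JVM initializes Holder only when Holder.INSTANCE is first accessed,
    // and class initialization is guaranteed to be thread-safe.
    private static final class Holder {
        private static final LazySingleton INSTANCE = new LazySingleton();
    }

    static LazySingleton getInstance() {
        return Holder.INSTANCE;
    }
}
```

Because the JVM performs the lazy, thread-safe initialization of Holder itself, getInstance() needs no synchronized block.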
private static final List<InputLogEvent> logEvents = new ArrayList<>();
private static AwsCloudWatch instance;
private static volatile boolean inflight = false;
private Thread awsWriteThread;
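The logEvents ArrayList acts as a small in-memory queue within the class, and the inflight flag records whether an API call is currently in progress. I'm using a separate thread to send the log data to AWS asynchronously, so the API communication does not block the main application's execution. Because application threads add to logEvents while the writer thread reads from it, every access to the list must be synchronized: a plain ArrayList is not thread-safe, and marking the field volatile does not make its contents safe.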
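If you are free to depart from the hand-rolled queue, java.util.concurrent already provides the pieces: LinkedBlockingQueue is thread-safe, take() parks the worker until a message arrives, and drainTo() collects a batch. The sketch below is a hypothetical QueuedLogWriter in which the sender callback stands in for the putLogEvents call; it is one possible design, not the code used in this article:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Sketch: background sender built on a BlockingQueue instead of
// ArrayList + volatile + wait/notify. "QueuedLogWriter" and "sender" are
// hypothetical; in the article's class, sender would wrap cwClient.putLogEvents(...).
final class QueuedLogWriter {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Thread worker;

    QueuedLogWriter(Consumer<List<String>> sender, int maxBatch) {
        worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                List<String> batch = new ArrayList<>();
                try {
                    // take() blocks without using CPU until a message arrives.
                    batch.add(queue.take());
                    // Collect whatever else is already queued, up to maxBatch in total.
                    queue.drainTo(batch, maxBatch - 1);
                    sender.accept(batch);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // ends the loop
                } catch (RuntimeException e) {
                    System.err.println("Error logging " + e); // keep the worker alive
                }
            }
        }, "log-writer");
        worker.setDaemon(true);
        worker.start();
    }

    // Called by application threads; never blocks on the network.
    void write(String message) {
        queue.add(message);
    }

    // Stops the worker; messages still in the queue are discarded.
    void shutdown() throws InterruptedException {
        worker.interrupt();
        worker.join();
    }
}
```

The queue handles all locking internally, so there is no shared flag to keep consistent and no wake-up that can be missed.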
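private AwsCloudWatch() {
    // Hard-coded keys for demonstration only (see the note on credentials below).
    this.awsCreds = new BasicAWSCredentials("xxxxxx", "xxxxxxxxx");
    this.cwClient = AWSLogsClientBuilder.standard()
            .withCredentials(new AWSStaticCredentialsProvider(this.awsCreds))
            .withRegion("ap-southeast-1").build();
    this.initThread();
}
private void initThread() {
    this.awsWriteThread = new Thread(() -> {
        while (true) {
            try {
                writeToCloud();
            } catch (InterruptedException ex) {
                Logger.getLogger(AwsCloudWatch.class.getName()).log(Level.SEVERE, null, ex);
                return; // interrupted: stop this writer thread
            } catch (Exception e) {
                // Caught inside the loop so one failed API call does not end the thread.
                System.err.println("Error logging " + e);
            }
        }
    });
    this.awsWriteThread.setName("AWS Log writer daemon");
    // Daemon: does not keep the JVM alive; events still queued at exit are lost.
    this.awsWriteThread.setDaemon(true);
    this.awsWriteThread.start();
}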
The private constructor authenticates with AWS. For demonstration purposes I'm using BasicAWSCredentials; keeping keys in the code itself is an anti-pattern (refer here for other options: https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/Install-CloudWatch-Agent.html). The constructor also starts the thread that writes to AWS. That thread calls the writeToCloud method in an endless loop, which on its own would spin continuously and cause high CPU usage. To avoid this, the thread must be put into a waiting state whenever there's nothing to send to the AWS log stream.
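That waiting state is usually implemented as a guarded wait: the writer checks the condition and calls wait() while holding the same lock that producers hold when they add an item and call notify. The hypothetical PendingLogs helper below is a minimal sketch of the pattern using only the JDK:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Sketch of the guarded-wait pattern that stops the writer loop from spinning.
// "PendingLogs" is a hypothetical helper, not part of the article's class.
final class PendingLogs {
    private final Deque<String> pending = new ArrayDeque<>();

    // Producer side (application threads): add, then wake the waiting writer.
    void add(String message) {
        synchronized (pending) {
            pending.addLast(message);
            pending.notifyAll();
        }
    }

    // Consumer side (writer thread): block until something is queued,
    // then take everything queued so far in one go.
    List<String> awaitBatch() throws InterruptedException {
        synchronized (pending) {
            // A loop, not an if: wait() may return spuriously, so the
            // condition is re-checked while holding the lock add() uses.
            while (pending.isEmpty()) {
                pending.wait();
            }
            List<String> batch = new ArrayList<>(pending);
            pending.clear();
            return batch;
        }
    }
}
```

Doing the check and the wait inside one synchronized block is what prevents a lost wake-up, where a message arrives between the emptiness check and the call to wait() and the writer then sleeps with work pending.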
public void write(String message) {
    synchronized (logEvents) {
        // Timestamp taken under the lock so the list stays in chronological order.
        logEvents.add(new InputLogEvent()
                .withMessage(message)
                .withTimestamp(System.currentTimeMillis()));
        logEvents.notify(); // wake the writer thread if it is waiting
    }
    synchronized (this) {
        // A terminated Thread cannot be start()ed again, so create a new one.
        if (!this.awsWriteThread.isAlive()) {
            this.initThread();
        }
    }
}
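When the application writes a log message, the message is first added to the logEvents list, and then the writer thread is notified (or started, if it is not running). This is what gives near real time logging. Note that a Java Thread object can be started only once, so a writer thread that has died has to be replaced with a new Thread rather than started again.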
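The sketch below illustrates why a dead writer thread cannot simply be started again, and one way to replace it. ThreadRestart and ensureRunning are hypothetical names used only for this illustration:

```java
// Sketch: a Thread object can be started only once.
// "ThreadRestart" and "ensureRunning" are hypothetical names.
final class ThreadRestart {

    // Returns false: start() on an already-started thread throws.
    static boolean canRestartSameThread() throws InterruptedException {
        Thread t = new Thread(() -> { });
        t.start();
        t.join(); // t has terminated, so t.isAlive() is now false
        try {
            t.start(); // throws IllegalThreadStateException
            return true;
        } catch (IllegalThreadStateException e) {
            return false;
        }
    }

    // Safer pattern: keep a live worker, otherwise build and start a fresh one.
    static Thread ensureRunning(Thread current, Runnable task) {
        if (current != null && current.isAlive()) {
            return current;
        }
        Thread fresh = new Thread(task, "AWS Log writer daemon");
        fresh.setDaemon(true);
        fresh.start();
        return fresh;
    }
}
```

canRestartSameThread() returns false because Thread.start() throws IllegalThreadStateException for any thread that has already been started, even after it has finished running.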
private void writeToCloud() throws InterruptedException {
    List<InputLogEvent> batch;
    synchronized (logEvents) {
        if (logEvents.isEmpty() || inflight) {
            // Checked under the same lock write() uses, so a notify() cannot be missed.
            logEvents.wait();
            return; // run loop calls writeToCloud() again and re-checks
        }
        inflight = true;
        // Move at most 10,000 events (the PutLogEvents per-call limit) into the batch.
        List<InputLogEvent> head = logEvents.subList(0, Math.min(logEvents.size(), 10_000));
        batch = new ArrayList<>(head);
        head.clear();
    }
    try {
        // The network calls run outside the lock, so write() never waits on them.
        PutLogEventsRequest putReq = new PutLogEventsRequest(this.logStreamGroup, this.logStreamName, batch)
                .withSequenceToken(this.getUploadSequenceToken());
        this.cwClient.putLogEvents(putReq);
    } finally {
        inflight = false; // if the call threw, this batch is dropped, not retried
    }
}
writeToCloud is invoked repeatedly by the writer thread. It checks whether the logEvents list (our queue) is empty and whether a request is already in flight. If the queue has events and no request is in flight, it builds a PutLogEvents request from the queued events, removes those events from logEvents, and sends the request. Because the events are removed before the call is made, a batch whose call fails is lost rather than retried. If the queue is empty, it puts the thread into the waiting state until write() notifies it.
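PutLogEvents also limits the size of each request. According to the API reference, a batch can contain at most 10,000 events and 1,048,576 bytes, where the size is the sum of the UTF-8 message bytes plus 26 bytes per event. The hypothetical LogBatcher below is a sketch of splitting queued messages so that each request respects both limits; verify the numbers against the current documentation before relying on them:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Sketch: split queued messages into PutLogEvents-sized batches.
// "LogBatcher" is a hypothetical helper; the constants mirror the documented
// per-request quotas at the time of writing.
final class LogBatcher {
    static final int MAX_EVENTS = 10_000;
    static final int MAX_BYTES = 1_048_576;
    static final int PER_EVENT_OVERHEAD = 26;

    static List<List<String>> split(List<String> messages, int maxEvents, int maxBytes) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentBytes = 0;
        for (String m : messages) {
            int size = m.getBytes(StandardCharsets.UTF_8).length + PER_EVENT_OVERHEAD;
            // Close the current batch if adding this message would break either limit.
            if (!current.isEmpty()
                    && (current.size() == maxEvents || currentBytes + size > maxBytes)) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(m);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

LogBatcher.split(messages, LogBatcher.MAX_EVENTS, LogBatcher.MAX_BYTES) keeps the messages in their original order. A single message larger than the byte limit still ends up alone in its own batch, which the service would reject.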
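According to the API documentation for PutLogEvents (https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html), a successful response includes a nextSequenceToken. Instead of reusing it, the code fetches the token before every call with the getUploadSequenceToken() method; this allows different instances of the application to write to the same log stream. Note that AWS has since relaxed this requirement: the current PutLogEvents documentation states that the sequenceToken parameter is ignored, so on the current service the DescribeLogStreams lookup is optional.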
Feel free to build your own improvements on top of this; it is just a small guide to get you started with AWS log streams.
Refer here for the full Java class: https://pastebin.com/JSiRi2C2