Using AWS Lambda with Elasticsearch Service

posted in: aws, lambda, serverless

Here I demonstrate how to set up a serverless alternative to the ELK stack (Elasticsearch, Logstash and Kibana) using the AWS Elasticsearch Service (ES) and Lambda with Python. The classic ELK stack is great, but it comes with the overhead of managing and maintaining infrastructure. Using AWS Lambda and ES instead makes the entire stack serverless and eliminates that overhead.

Elasticsearch Service

First, let's create an AWS ES domain. This can be done either from the AWS Console or with the AWS command line tools. The process is fairly intuitive: sign in to the AWS ES console, create a new ES domain, and choose the instance type, instance count, storage options and access policy. The steps are well documented here.
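
If you prefer to script the domain creation, here is a minimal sketch using boto3; the domain name, Elasticsearch version, instance type and volume size below are placeholders I picked for illustration, so adjust them to your needs:

import boto3

es_client = boto3.client('es', region_name='us-east-1')

# Create a small, single-node domain; tweak the instance type/count and storage to taste.
response = es_client.create_elasticsearch_domain(
    DomainName='my-es',                         # placeholder domain name
    ElasticsearchVersion='5.1',                 # pick a version supported in your region
    ElasticsearchClusterConfig={
        'InstanceType': 't2.small.elasticsearch',
        'InstanceCount': 1,
    },
    EBSOptions={
        'EBSEnabled': True,
        'VolumeType': 'gp2',
        'VolumeSize': 10,                       # GiB
    },
)
print(response['DomainStatus']['ARN'])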

Now, creating a document in Elasticsearch is quite straightforward; a simple POST request can be used to index a document. But to access the ES domain or Kibana, we need to provide proper access policies. There are two ways to create a policy that grants or denies access to ES endpoints.

  • Resource based policies
  • Identity based policies

I will get to each of these in the next section, where we need to give the Lambda function access to the ES domain.
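
As a quick illustration of the resource-based flavour, here is a sketch that attaches an access policy directly to the domain with boto3; the domain name and the role ARN (reused from the deploy script later in this post) are placeholders:

import json
import boto3

es_client = boto3.client('es', region_name='us-east-1')

# A resource-based policy lives on the domain itself; here we allow a single
# IAM role (placeholder ARN) to call any es:* action on the domain.
policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"AWS": "arn:aws:iam::111111111111:role/lambda-s3-role"},
        "Action": "es:*",
        "Resource": "arn:aws:es:us-east-1:111111111111:domain/my-es/*"
    }]
}

es_client.update_elasticsearch_domain_config(
    DomainName='my-es',
    AccessPolicies=json.dumps(policy)
)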

AWS Lambda function

This Lambda function will replace Logstash. It will run in response to events generated by various triggers. Lambda functions can be triggered by Kinesis, DynamoDB Streams, S3, SNS, CloudWatch Logs, etc. These triggers are analogous to the Logstash input plugins.

Now, before we get into creating the Lambda function, we should make sure the function has access to the ES domain. Lambda doesn't use a fixed set of IP addresses, so we can't create an access policy for the ES domain that whitelists a particular set of IPs. In fairness, Amazon does publish the list of IP ranges it uses, but that list is expected to change several times a week. ES domains also don't yet support access from within a VPC, so we cannot use a VPC to give Lambda access to the domain either. Instead, we can use a resource-based policy on the ES domain without IP whitelisting and issue Signature Version 4 signed requests to the domain. I use the elasticsearch and requests_aws4auth modules in this Lambda function to make signed requests to the ES domain.

import json      # used later in the Lambda handler
import urllib    # used later to decode the S3 object key

import boto3
from elasticsearch import Elasticsearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

Now, let's get the ES domain endpoint.

host = 'my-es-endpoint-goes-here.amazonaws.com'   #ES domain endpoint
 
s3 = boto3.client('s3')
sns = boto3.client('sns')
 
awsauth = AWS4Auth('KEY', 'SECRET-KEY', 'region', 'es')   # access key, secret key, region, service name
es = Elasticsearch(hosts=[{'host': host, 'port': 443}],
                   http_auth=awsauth,
                   use_ssl=True,
                   verify_certs=True,
                   connection_class=RequestsHttpConnection)
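
Hard-coding the access key and secret works for a quick test, but inside Lambda you would normally pick up the temporary credentials of the function's execution role instead. A sketch of that variant, reusing the host placeholder above and assuming a us-east-1 domain:

import boto3
from elasticsearch import Elasticsearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

# Temporary credentials from the Lambda execution role (or your local AWS profile)
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, 'us-east-1', 'es',
                   session_token=credentials.token)   # token is required for role credentials

es = Elasticsearch(hosts=[{'host': host, 'port': 443}],   # host as defined above
                   http_auth=awsauth,
                   use_ssl=True,
                   verify_certs=True,
                   connection_class=RequestsHttpConnection)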

Here’s the function that will create a new document.

def post_to_es(doc):
    _index = "lambda-v1"    # Append date here to get a daily index
    _type = "mydocuments"       
    es.index(index=_index, doc_type=_type, body=doc)
    #print("success!")

And to create the document in ES, all you have to do is:

doc = {"foo": "2016-11-21T08:17:25Z" ,"bar": "test!"}   #sample document
post_to_es(doc)   
# If the post is successful, you will see a response like this,
#{u'_type': u'mydocuments', u'created': True, u'_shards': {u'successful': 1, u'failed': 0, u'total': 2}, u'_version': 1, u'_index': u'lambda-v1', u'_id': u'AViJiKxABCDpp4IIN2XDC'}

Now, here is how it looks inside the Lambda handler. I will consider an S3 trigger that retrieves the metadata of an object that has been updated. Any other Lambda trigger can be used; the Amazon documentation provides boilerplate snippets for all of them.

def lambda_handler(event, context):
    #print("Received event: " + json.dumps(event, indent=2))
 
    # Get the object from the event and show its content type
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'].encode('utf8'))
    try:
        response = s3.get_object(Bucket=bucket, Key=key)
        print("CONTENT TYPE: " + response['ContentType'])
 
        #### Here you do your thing ####
        doc = {"foo": "2016-11-21T08:17:25Z" ,"bar": "test!"}   #sample document
 
        #create document in ES
        post_to_es(doc)                                            
 
        return response['ContentType']
    except Exception as e:
        print(e)        
        raise e
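
To sanity-check the handler outside Lambda, you can feed it a hand-built S3 event; a sketch, assuming a bucket and key that actually exist in your account and that your credentials can read:

# Minimal S3 put event, trimmed down to the fields the handler reads
test_event = {
    "Records": [{
        "s3": {
            "bucket": {"name": "my-bucket"},
            "object": {"key": "key1/some-object.txt"}
        }
    }]
}

print(lambda_handler(test_event, None))   # prints and returns the object's content type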

We can print out debug messages and check the CloudWatch Logs of the Lambda function.

Deploy Lambda Function

I prefer deploying Lambda code via an S3 bucket. I generally use a bash script to quickly create/update and deploy the Lambda function.

#!/bin/bash
rm my-lambda-function.zip
rm -r venv
 
#create virtual environment
virtualenv -p /usr/bin/python2.7 venv
source venv/bin/activate
 
#python modules
pip install elasticsearch
pip install requests_aws4auth
 
zip -9 my-lambda-function.zip handler.py
cd venv/lib/python2.7/site-packages/
zip -r9 ../../../../my-lambda-function.zip *
cd ../../../../
 
#copy the zip file to s3
aws s3 cp my-lambda-function.zip s3://my-bucket/key1/my-lambda-function.zip
 
#create the function (uncomment on the first deploy)
#aws lambda create-function \
#      --region us-east-1 \
#      --runtime python2.7 \
#      --timeout 30 \
#      --description my-lambda-function \
#      --role arn:aws:iam::111111111111:role/lambda-s3-role \
#      --handler handler.lambda_handler \
#      --function-name my-lambda-function \
#      --code S3Bucket=my-bucket,S3Key=key1/my-lambda-function.zip
 
#update the code of an existing function (uncomment on subsequent deploys)
#aws lambda update-function-code \
#      --region us-east-1 \
#      --function-name my-lambda-function \
#      --s3-bucket my-bucket \
#      --s3-key key1/my-lambda-function.zip

Kibana

We need to make sure we have access to Kibana, which you grant in the access policy of the ES domain. For instance, this statement allows access only from IP address A.B.C.D:

    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": "es:*",
      "Resource": "arn:aws:es:us-east-1:111111111111:domain/my-es/*",
      "Condition": {
        "IpAddress": {
          "aws:SourceIp": [
            "A.B.C.D"
          ]
        }
      }
    }

In Kibana, configure the index pattern and the data should be available in real time.
