Intro to Node on AWS Lambda for S3 and Kinesis

AWS Lambda is an on-demand computation service that allows you to write code that responds to events, and can be executed at scale within the AWS ecosystem. It has some unique benefits that make working with it particularly desirable. It's cost-effective, scalable, and presents an alternative to spinning up heavy servers to do straightforward event-based work.

At Localytics, we process billions of data points in real time. At the end of our processing pipeline we output our data to Kinesis streams and S3 buckets. This allows teams to process either live data via the stream or historical data via S3. The format of the data is identical in both. Lambda was an ideal fit for handling both data sources, as we could write the event handling logic as a single Lambda, and make our data-processing code source-agnostic.

Event sources

Lambda responds to events from a variety of sources. For our purposes we were focused on handling Kinesis stream events and S3 PUT events. See the AWS Lambda documentation if you'd like to learn more about the types of events that Lambda supports.

We were tasked with creating a new service that could process historical and live data. As we've made the format identical between S3 and Kinesis data sources, we were able to write a single lambda to handle both event sources. This reduced the surface area of our code that needed to be maintained and clarified the deploy process.

S3 PUT events

Our Lambda will receive an event when invoked from an S3 PUT notification. It looks like this:

{
  "Records":[
    {
      "eventVersion":"2.0",
      "eventSource":"aws:s3",
      "awsRegion":"us-east-1",
      "eventTime":"1970-01-01T00:00:00.000Z",
      "eventName":"ObjectCreated:Put",
      "userIdentity":{
        "principalId":"EXAMPLE"
      },
      "requestParameters":{
        "sourceIPAddress":"127.0.0.1"
      },
      "responseElements":{
        "x-amz-request-id":"EXAMPLE",
        "x-amz-id-2":"EXAMPLE"
      },
      "s3":{
        "s3SchemaVersion":"1.0",
        "configurationId":"testConfigRule",
        "bucket":{
          "name":"sourcebucket",
          "ownerIdentity":{
            "principalId":"EXAMPLE"
          },
          "arn":"arn:aws:s3:::sourcebucket"
        },
        "object":{
          "key":"HappyFace.jpg",
          "size":1024,
          "eTag":"d41d8cd98f00b204e9800998ecf8427e"
        }
      }
    }
  ]
}

It's important to note that we're only given metadata about the object (not the data itself). It's on us to get that object from S3. Also, we store our data gzipped, so we need to decompress it before we can do anything with it.

Here's the functional code that handles this in our lambda (we'll show a complete example later on):

async.waterfall([
  function download(next) {
    s3.getObject({
      Bucket: record.s3.bucket.name,
      Key: record.s3.object.key
    }, function(err, data) {
      next(err, data);
    });
  },
  function gunzip(response, next) {
    var buffer = new Buffer(response.Body);
    zlib.gunzip(buffer, function(err, decoded) {
      next(err, decoded && decoded.toString());
    });
  },
  function doSomething(data, next) {
    // `data` is the decompressed content as a string, ready for use.
    next();
  }
], function(err) {
  if (err) throw err;
});
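
The snippet above passes record.s3.object.key to getObject as-is. S3 delivers object keys in event notifications URL-encoded, with spaces sent as `+`, so keys containing spaces or special characters need decoding first. A minimal sketch of that step follows; `paramsFromRecord` and the sample key are hypothetical and not part of the original code or the AWS SDK.

```javascript
'use strict';

// Hypothetical helper: build the parameters for s3.getObject from one S3
// event record. Keys arrive URL-encoded in notifications, with spaces
// encoded as '+', so decode before requesting the object.
function paramsFromRecord(record) {
  var rawKey = record.s3.object.key;
  return {
    Bucket: record.s3.bucket.name,
    Key: decodeURIComponent(rawKey.replace(/\+/g, ' '))
  };
}

// Trimmed-down record in the same shape as the sample event above.
// The key is made up to show the decoding.
var sampleRecord = {
  s3: {
    bucket: { name: 'sourcebucket' },
    object: { key: 'logs/2015+04+23/part%3D1.gz' }
  }
};

console.log(paramsFromRecord(sampleRecord));
```

The returned object can be handed to s3.getObject in place of the inline parameters. Keys without special characters, such as HappyFace.jpg, pass through unchanged.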

Kinesis events

Our Kinesis stream is always on and channeling data, so our lambda simply listens to the stream and acts upon it.

When Lambda is invoked by a Kinesis stream, the event looks like this:

{
  "Records":[
    {
      "kinesis":{
        "partitionKey":"partitionKey-3",
        "kinesisSchemaVersion":"1.0",
        "data":"TG9jYWx5dGljcyBFbmdpbmVlcmluZyBpcyBoaXJpbmchIGh0dHA6Ly9iaXQubHkvMURqN2N1bA==",
        "sequenceNumber":"EXAMPLE"
      },
      "eventSource":"aws:kinesis",
      "eventID":"shardId-000000000000:EXAMPLE",
      "invokeIdentityArn":"arn:aws:iam::EXAMPLE",
      "eventVersion":"1.0",
      "eventName":"aws:kinesis:record",
      "eventSourceARN":"arn:aws:kinesis:EXAMPLE",
      "awsRegion":"us-east-1"
    }
  ]
}

event.Records[0].kinesis.data is what we want. The beauty of this event source is that each record carries the data itself, base64-encoded, so there is nothing extra to fetch. It's very simple to decode and use in our lambda:

var data = new Buffer(event.Records[0].kinesis.data, 'base64').toString('utf8');
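
For trying the decode step locally, it helps to build a fake event in the same shape. The sketch below is one way to do that, assuming a Node.js version that provides Buffer.from (the current replacement for the `new Buffer(...)` constructor used in this post). `decodeRecord` and the payload are hypothetical.

```javascript
'use strict';

// Decode the payload of a single Kinesis record.
function decodeRecord(record) {
  return Buffer.from(record.kinesis.data, 'base64').toString('utf8');
}

// Build a fake event in the shape shown above; the payload is made up.
var payload = JSON.stringify({ user: 'abc', action: 'login' });
var event = {
  Records: [
    { kinesis: { data: Buffer.from(payload, 'utf8').toString('base64') } }
  ]
};

var decoded = decodeRecord(event.Records[0]);
console.log(decoded === payload); // true: the round trip is lossless
```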

Creating a dual-purpose lambda

Let's walk through creating and deploying a single lambda that can handle both S3 PUT notifications as well as Kinesis stream events. The full codebase for this example can be found on GitHub.

Permissions

First off, there are two specific permissions that you'll need:

  • User permission for iam:PassRole. This policy needs to be applied to the
    user who is creating the lambda:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt1429124462000",
            "Effect": "Allow",
            "Action": [
                "iam:PassRole"
            ],
            "Resource": [
                "arn:aws:iam::<account_id>:role/lambda_basic_execution"
            ]
        }
    ]
}
  • Lambda execution role. You need to create a new role that the Lambda will
    run as. We assume that role is named lambda_basic_execution for the purposes of this project. That role must have (at least) this policy applied:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "logs:*"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    }
  ]
}

Your lambda execution role will also need permissions for whatever services you want to use within your function. If you intend to be working with S3, for example, you need to specifically grant your execution role permissions for whatever you intend to do with S3.
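
As an illustration, a function that only downloads objects, like the S3 handling in this post, needs `s3:GetObject` on the objects in the bucket. The sketch below generates such a policy document; `readOnlyBucketPolicy` is a hypothetical helper, and your own function may need other actions.

```javascript
'use strict';

// Hypothetical helper: build an IAM policy document that lets the execution
// role read objects from a single bucket.
function readOnlyBucketPolicy(bucketName) {
  return {
    Version: '2012-10-17',
    Statement: [
      {
        Effect: 'Allow',
        Action: ['s3:GetObject'],
        // Object-level actions apply to keys, hence the trailing '/*'.
        Resource: ['arn:aws:s3:::' + bucketName + '/*']
      }
    ]
  };
}

// 'sourcebucket' is the bucket name used in the sample S3 event earlier.
console.log(JSON.stringify(readOnlyBucketPolicy('sourcebucket'), null, 2));
```

The printed JSON can be attached to the execution role alongside the logging policy.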

Source code walkthrough for a dual-purpose lambda

Let's create a file named MyLambda.js, and require some things:

var async = require('async');
var AWS = require('aws-sdk');
var fs = require('fs');
var zlib = require('zlib');

We'll use async, as shown earlier, to sequence downloading objects from S3 and decompressing them with zlib.gunzip. aws-sdk is required for working with S3.

Let's initialize the SDK:

var s3 = new AWS.S3();

Since our code is running as a role within the Lambda system, we don't need to provide credentials. The SDK will happily make any requests you ask of it, and the role's permissions will dictate what we can and cannot do.

Let's write some code that will handle Kinesis events:

exports.kinesisHandler = function(records, context) {
  var data = records
    .map(function(record) {
      return new Buffer(record.kinesis.data, 'base64').toString('utf8');
    })
    .join();
  doWork(data);
  context.done();
};

When we get a Kinesis stream event, we could have any number of records to process. Our code expects that: it base64-decodes each record and joins the results into a single string that we can work with. Note that join() with no argument separates the values with commas.
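
The choice of separator matters if the payloads themselves can contain commas. The sketch below shows the default next to an explicit separator. It assumes a Node.js version with Buffer.from, and `decodeAll` and `fakeRecord` are hypothetical helpers with made-up payloads.

```javascript
'use strict';

// Decode every record in a Kinesis batch into an array of strings.
function decodeAll(records) {
  return records.map(function (record) {
    return Buffer.from(record.kinesis.data, 'base64').toString('utf8');
  });
}

// Helper for building fake records; the payloads are made up.
function fakeRecord(text) {
  return { kinesis: { data: Buffer.from(text, 'utf8').toString('base64') } };
}

var records = [fakeRecord('first'), fakeRecord('second')];
var decoded = decodeAll(records);

console.log(decoded.join());     // first,second (default separator is ',')
console.log(decoded.join('\n')); // one payload per line
```

Keeping the decoded array around, rather than joining immediately, also leaves the option of processing records one at a time.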

Then we call doWork(data). In the real world you might be doing asynchronous work on the data (and you may be interested in reading Better Asynchronous JavaScript).

context.done() is how we let Lambda know that we're finished doing work.

That's all we need to do to handle Kinesis event streams, so let's move on to S3 PUT events.

exports.s3Handler = function(record, context) {
  async.waterfall([
    function download(next) {
      s3.getObject({
        Bucket: record.s3.bucket.name,
        Key: record.s3.object.key
      }, function(err, data) {
        next(err, data);
      });
    },
    function gunzip(response, next) {
      var buffer = new Buffer(response.Body);
      zlib.gunzip(buffer, function(err, decoded) {
        next(err, decoded && decoded.toString());
      });
    },
    function doSomething(data, next) {
      doWork(data);
      next();
    }
  ], function(err) {
    // Tell Lambda we're finished, passing along any error from the steps above.
    context.done(err);
  });
};

This should look familiar from earlier in this post. When we get an S3 PUT event, we know that we'll only ever have a single record to work with. So we pass that record to our s3Handler, download the object, unzip the object, and finally doSomething with the data.
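
The gunzip stage can be exercised on its own, without S3, by gzipping a string locally and feeding it through the same zlib call. This is a sketch for local experimentation, assuming a Node.js version with zlib.gzipSync and Buffer.from. The content is made up.

```javascript
'use strict';

var zlib = require('zlib');

// Stand-in for the Body of an S3 getObject response: a gzipped Buffer.
var original = 'line one\nline two\n';
var body = zlib.gzipSync(Buffer.from(original, 'utf8'));

// Same call as the gunzip stage in the waterfall. `body` is already a
// Buffer, so it can be passed to zlib.gunzip directly.
zlib.gunzip(body, function (err, decodedBuffer) {
  if (err) throw err;
  console.log(decodedBuffer.toString('utf8') === original); // true
});
```

If the object is not valid gzip data, zlib.gunzip reports that through `err`, which is why the waterfall forwards the error to `next`.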

Now that we have our two specific handlers for each event type we intend to support, we need to handle the direct event source from Lambda:

exports.handler = function(event, context) {
  var record = event.Records[0];
  if (record.kinesis) {
    exports.kinesisHandler(event.Records, context);
  } else if (record.s3) {
    exports.s3Handler(record, context);
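  } else {
    // Neither S3 nor Kinesis: fail fast instead of running until the timeout.
    context.done(new Error('Unsupported event source'));
  }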
};

Our actual handler is very simple. If the event looks like a Kinesis event, let kinesisHandler do the work. Otherwise, if it looks like an S3 event, let s3Handler do the work.
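
The handler above decides by checking which property is present on the first record. An alternative sketch, not the approach taken in the original code, keys off the `eventSource` field that both sample events carry and treats empty or unrecognized events explicitly. `sourceOf` is a hypothetical name.

```javascript
'use strict';

// Classify an event by the `eventSource` field of its first record, which
// both sample events in this post carry ('aws:s3' and 'aws:kinesis').
function sourceOf(event) {
  var records = (event && event.Records) || [];
  if (records.length === 0) return 'unknown';
  switch (records[0].eventSource) {
    case 'aws:kinesis': return 'kinesis';
    case 'aws:s3': return 's3';
    default: return 'unknown';
  }
}

console.log(sourceOf({ Records: [{ eventSource: 'aws:s3', s3: {} }] }));
console.log(sourceOf({ Records: [{ eventSource: 'aws:kinesis', kinesis: {} }] }));
console.log(sourceOf({ Records: [] }));
```

A function like this is easy to unit test without any AWS dependencies, since it only inspects plain objects.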

This is all of the code that's necessary to write your first lambda that supports both S3 and Kinesis.

Deployment

Now that we have our code that we want to deploy to Lambda, it's time to actually upload it.

A few basic first steps:

  • Install the AWS CLI via pip install awscli
  • Configure your AWS credentials at ~/.aws/credentials:
[default]
aws_access_key_id = ...
aws_secret_access_key = ...
  • Ensure you've given your user permissions for iam:PassRole.
  • Create the lambda_basic_execution role as directed above.

Once those are set, we need to package our module up:

  • npm init
  • npm install async aws-sdk --save
  • npm install
  • zip -r ./MyLambda.zip *

Now we can upload the module:

aws lambda create-function \
	--region us-east-1 \
	--function-name node-lambda-starter \
	--zip-file fileb://MyLambda.zip \
	--handler MyLambda.handler \
	--runtime nodejs \
	--role arn:aws:iam::<account_id>:role/lambda_basic_execution

If your upload was successful, you should receive a response like this:

{
    "FunctionName": "node-lambda-starter",
    "CodeSize": 1158014,
    "MemorySize": 128,
    "FunctionArn": "arn:aws:lambda:us-east-1:<account_id>:function:node-lambda-starter",
    "Handler": "MyLambda.handler",
    "Role": "arn:aws:iam::<account_id>:role/lambda_basic_execution",
    "Timeout": 3,
    "LastModified": "2015-04-23T20:58:17.586+0000",
    "Runtime": "nodejs",
    "Description": ""
}

You can see your uploaded lambda on your dashboard. From there you can also edit/invoke with sample data.

Add event sources

Now that your lambda has been created and uploaded, you can add event sources to it via the dashboard. As mentioned, both S3 PUT events and Kinesis streams will work properly with this lambda we've created.

Starter module

To make working with Lambda a bit easier, we wrote a starter Lambda module. It defines a handful of Make targets for managing a single lambda:

  • make upload -- upload your function for the first time.
  • make update -- upload new function code.
  • make get -- retrieve details of your existing function on Lambda.
  • make invoke -- invoke your function with sample data provided within the repo.
  • make delete -- remove this function from Lambda.

We hope you find it useful! Be sure to drop an issue on GitHub for any questions / bugs.

Conclusion

Lambda presents a new way of programming application logic around events instead of infrastructure. We think this has the potential to bring entirely new types of applications and workflows to market, and it fills a gap in AWS's cloud computing lineup that makes it easier and faster to do real-time work on data within the ecosystem.

Even aside from the affordability and durability of Lambda, being able to direct chunks of logic to process individual events from systems represents an opportunity for data-heavy organizations to drastically streamline their technical infrastructure.