Express to AWS Lambda: Part 5

Workers and Background Tasks

UPDATE: Serverless 1.0 was released earlier this year. There are a lot of improvements in v1.x. I suggest you go and read the docs for the most up to date information.

In Part 1, Part 2, Part 3 and Part 4 of this series, we have learnt how to use API Gateway and AWS Lambda to build a web application. Web applications are often supported by other worker systems. These workers perform tasks that cannot be completed during a request. These workers might run tasks on a schedule or accept tasks produced by an external source. To learn more about how we can perform these tasks in a serverless environment, we’re going to convert an Express app and a worker to the Serverless Framework.

Example Express App and Worker

I’ve simplified the Express app that we have been using in the earlier parts in this series. Our new Express app has a single endpoint that accepts jobs via HTTP. These jobs are then passed to our worker instances using RabbitMQ.

I have updated the index.js file of our Express project as shown below. I am using jackrabbit to publish messages to a queue.

var express = require('express');
var bodyParser = require('body-parser');
var jackrabbit = require('jackrabbit');

var TASK_QUEUE_KEY = 'task_queue';
var RABBIT_URL = process.env.RABBIT_URL || 'amqp://localhost';

var rabbit = jackrabbit(RABBIT_URL);
var exchange = rabbit.default();

var app = express();

...

app.post('/jobs', function(req, res) {
  exchange.publish(req.body.name, { key: TASK_QUEUE_KEY });
  res.status(201).send();
});

I have added a new file for our worker, worker.js. The worker connects to our RabbitMQ server and listens for messages. When a message is received we print the message key and acknowledge the message.

var jackrabbit = require('jackrabbit');

var TASK_QUEUE_KEY = 'task_queue';
var RABBIT_URL = process.env.RABBIT_URL || 'amqp://localhost';

var rabbit = jackrabbit(RABBIT_URL);
var exchange = rabbit.default();
var taskQueue = exchange.queue({ name: TASK_QUEUE_KEY, durable: true });

taskQueue.consume(function(key, ack) {
  console.log('Got job: ', key);
  ack();
});

Events in Serverless

One of the most powerful features of AWS Lambda is the ability to invoke Lambda functions when events are generated by other AWS services. This allows developers to build complex event-driven processes in a very simple way. A good example is the video transcoding service A Cloud Guru built using S3, Elastic Transcoder and Lambda. A full list of Lambda event sources can be found in the AWS docs.

To convert our Express app to Lambda, we need to accept jobs via API Gateway and send a message to a worker to perform the job. Instead of using RabbitMQ, we will be using the AWS Simple Notification Service (SNS) to push jobs to our worker. SNS is a scalable pub-sub service that integrates with AWS Lambda.

The first step is to create a function with an endpoint that publishes messages to an SNS topic. I covered API Gateway endpoint configuration in Part 2 and Part 3 of this series, so we’ll skip over that part. Our /jobs endpoint will accept a JSON object with one property, name. We then use the AWS SDK for Node to publish a message to our SNS topic.

var AWS = require('aws-sdk');
var sns = new AWS.SNS();

module.exports.handler = function(event, context, cb) {
  // The job payload that the worker will receive
  var message = {
    name: event.body.name
  };

  // SNS messages are strings, so the payload is serialised as JSON
  var params = {
    Message: JSON.stringify(message),
    TopicArn: process.env.WORKER_SNS_TOPIC_ARN
  };

  sns.publish(params, function(err, data) {
    if (err) {
      console.log(err, err.stack); // an error occurred
      return context.fail('Unexpected Error');
    }

    console.log(data); // successful response
    return context.succeed({
      job: event.body.name
    });
  });
};

Once we’re publishing messages to SNS successfully, we need to create a new function that will receive SNS events. We use the same Serverless command as we have used before, but select Create Event instead of Create Endpoint.

Once we have done this, we should see the following files added to our project.

worker
  |__event.json
  |__handler.js
  |__s-function.json

The next step is to update s-function.json to respond to SNS events. For a list of different Serverless event source configurations, please read the docs.

{
 "name": "worker",
 "runtime": "nodejs4.3",
 "handler": "handler.handler",
 ...
 "endpoints": [],
 "events": [{
   "name": "workerEvent",
   "type": "sns",
   "config": {
     "topicName": "${workerSnsTopicName}"
   }
 }],
 ...
}

Once we have configured the event source, we can update handler.js to process the events. Our Lambda function has the usual event, context and callback parameters. The event object contains an array of Records that represent the SNS messages pushed to our Lambda function. We need to update our handler to loop over these Records and process the jobs.

var util = require('util');

module.exports.handler = function(event, context, cb) {
  console.log(
    'Received lambda event: ',
    util.inspect(event, { depth: 5 })
  );

  event.Records.forEach(function(evt) {
    if (evt.EventSource !== 'aws:sns') {
      console.warn('Received non-SNS event: ', evt);
      return;
    }

    var message = JSON.parse(evt.Sns.Message);
    console.log('Got job: ', message.name);
  });

  return cb(null, {
    message: 'success'
  });
};
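
To make the shape of the event more concrete, here is a minimal sketch of a hand-built SNS event and a helper that pulls the job names out of it. The helper name extractJobNames and the sample event are illustrative and not part of the project. The sample only includes the fields the worker reads; a real SNS event carries more.

```javascript
// Hypothetical helper (not in the original project): collect job names
// from the SNS records of a Lambda event, skipping anything else.
function extractJobNames(event) {
  var names = [];

  (event.Records || []).forEach(function(record) {
    if (record.EventSource !== 'aws:sns') {
      return; // not an SNS record, ignore it
    }

    // The jobs function publishes JSON.stringify({ name: ... }),
    // so the message body has to be parsed back into an object.
    var message = JSON.parse(record.Sns.Message);
    names.push(message.name);
  });

  return names;
}

// Hand-built sample event, trimmed to the fields used above.
var sampleEvent = {
  Records: [
    {
      EventSource: 'aws:sns',
      Sns: { Message: JSON.stringify({ name: 'resize-images' }) }
    },
    // A record from another source is skipped by the check above.
    { eventSource: 'aws:kinesis', kinesis: { data: '' } }
  ]
};

console.log(extractJobNames(sampleEvent));
```

Keeping the parsing in a small function like this makes it possible to check the record handling without deploying anything.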

You may have noticed that the worker SNS topic name is not hard coded into the jobs handler or the worker's s-function.json configuration file. Serverless includes a CloudFormation template in a file called s-resources-cf.json. When the CloudFormation template is deployed, the outputs are set as Serverless variables. This allows you to configure environment variables and functions without hard coding the resource names. If you want to learn more, you can read the Serverless docs or check the GitHub repo.

SNS is a Push event source for Lambda. Push event sources invoke Lambda functions directly. Other event sources, such as AWS Kinesis Streams, are Pull event sources. Lambda must poll these event sources and invoke Lambda functions when the events occur. If you want to dive deeper into AWS Lambda events, understanding this concept is important. You can read the docs for more information.
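
One practical consequence of this distinction is that records arrive in different shapes. As a rough sketch, assuming the usual Kinesis event layout (an eventSource of aws:kinesis and a base64-encoded kinesis.data field), a worker fed by a Pull source might decode its jobs like this. The decodeKinesisJobs helper and the sample event are hypothetical and only illustrate the difference from the SNS handler.

```javascript
// Hypothetical helper: decode JSON jobs from the Kinesis records of a
// Lambda event. Assumes each record's data is base64-encoded JSON.
function decodeKinesisJobs(event) {
  return (event.Records || [])
    .filter(function(record) {
      return record.eventSource === 'aws:kinesis';
    })
    .map(function(record) {
      // Buffer.from needs a newer Node runtime than nodejs4.3.
      var json = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
      return JSON.parse(json);
    });
}

// Hand-built sample event, trimmed to the fields used above.
var sampleKinesisEvent = {
  Records: [{
    eventSource: 'aws:kinesis',
    kinesis: {
      data: Buffer.from(JSON.stringify({ name: 'send-email' })).toString('base64')
    }
  }]
};

console.log(decodeKinesisJobs(sampleKinesisEvent));
```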

Testing our Workers

To test our Express app we need to run both index.js and worker.js.

Figure: Express app during testing

Figure: Node worker during testing

We cannot simulate SNS locally so we need to test our Serverless project in AWS. To verify that the Lambda function has run correctly, we can check the CloudWatch logs for our Lambda function.
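
The SNS delivery itself has to be tested in AWS, but the handler logic can still be smoke-tested locally. Here is a minimal sketch, assuming the jobs handler is restructured to receive its publish function as an argument. The createJobsHandler factory, the fakePublish function and the topic ARN below are hypothetical and not part of the original project.

```javascript
// Hypothetical factory: builds a jobs handler around any function with
// the same (params, callback) shape as sns.publish.
function createJobsHandler(publish, topicArn) {
  return function(event, context, cb) {
    var params = {
      Message: JSON.stringify({ name: event.body.name }),
      TopicArn: topicArn
    };

    publish(params, function(err) {
      if (err) {
        return cb(new Error('Unexpected Error'));
      }
      return cb(null, { job: event.body.name });
    });
  };
}

// Fake publisher that records what would have been sent to SNS.
var published = [];
function fakePublish(params, callback) {
  published.push(params);
  callback(null, { MessageId: 'fake-message-id' });
}

// Made-up ARN, used only for this local run.
var handler = createJobsHandler(
  fakePublish,
  'arn:aws:sns:us-east-1:123456789012:example-topic'
);

var result;
handler({ body: { name: 'resize-images' } }, {}, function(err, data) {
  result = data;
});

console.log(published[0].Message); // the JSON string that would go to SNS
console.log(result);
```

In the deployed function, the same factory could be given sns.publish.bind(sns) and process.env.WORKER_SNS_TOPIC_ARN. This only checks the handler's own logic; whether SNS actually invokes the worker still has to be verified in AWS.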

Figure: CloudWatch logs from Jobs Lambda

Figure: CloudWatch logs from Worker Lambda

Figure: Console during testing

What’s next?

This is the last post in this series and I hope it has helped you get started with the Serverless Framework. If you want to explore this example and the previous examples, please check out the repo on GitHub.

I plan to keep writing about the Serverless Framework and AWS Lambda. If you want to see more articles like this, follow me on Medium or Twitter.