
Batch Processing

Warning

This page refers to an unreleased utility that has yet to be published on the npm registry. Any version of the package built from source, as well as all future versions tagged with the -alpha suffix, should be treated as experimental. Follow the Beta release milestone for updates on the progress of this utility.

The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams.

Key features

  • Reports batch item failures to reduce the number of retries for a record upon errors
  • Simple interface to process each batch record
  • Build your own batch processor by extending primitives

Background

When using SQS, Kinesis Data Streams, or DynamoDB Streams as a Lambda event source, your Lambda functions are triggered with a batch of messages.

If your function fails to process any message from the batch, the entire batch returns to your queue or stream. This same batch is then retried until one of the following happens first: a) your Lambda function returns a successful response, b) the record reaches its maximum retry attempts, or c) the record expires.

With this utility, batch records are processed individually – only messages that failed to be processed return to the queue or stream for a further retry. This works when two mechanisms are in place:

  1. ReportBatchItemFailures is set in your SQS, Kinesis, or DynamoDB event source properties
  2. A specific response is returned so Lambda knows which records should not be deleted during partial responses
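That response lists the identifiers of the records that failed processing, so only those records are retried. For example:

{
  "batchItemFailures": [
    {
      "itemIdentifier": "<messageId or sequence number of the failed record>"
    }
  ]
}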
Warning: This utility lowers the chance of processing records more than once; it does not guarantee exactly-once processing

We recommend implementing processing logic in an idempotent manner wherever possible.
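For example, a record handler can use the SQS messageId as an idempotency key, so a record delivered more than once is only applied once. A minimal sketch, paired with the asynchronous processing described later on this page; the processed-items table and its key schema are hypothetical:

Idempotent record handler (sketch)

import {
  ConditionalCheckFailedException,
  DynamoDBClient,
  PutItemCommand,
} from '@aws-sdk/client-dynamodb';
import type { SQSRecord } from 'aws-lambda';

const client = new DynamoDBClient({});

const recordHandler = async (record: SQSRecord): Promise<void> => {
  try {
    await client.send(
      new PutItemCommand({
        TableName: 'processed-items', // hypothetical table
        Item: {
          pk: { S: record.messageId }, // messageId acts as the idempotency key
          body: { S: record.body },
        },
        // Reject the write if this messageId was already processed
        ConditionExpression: 'attribute_not_exists(pk)',
      })
    );
  } catch (error) {
    if (error instanceof ConditionalCheckFailedException) {
      return; // duplicate delivery: the record was already processed
    }
    throw error; // any other error is reported as a batch item failure
  }
};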

You can find more details on how Lambda works with either SQS, Kinesis, or DynamoDB in the AWS Documentation.

Getting started

Regardless of whether you're using SQS, Kinesis Data Streams, or DynamoDB Streams, you must configure your Lambda function event source to use ReportBatchItemFailures.

You do not need any additional IAM permissions to use this utility, except for what each event source requires.

Required resources

The remaining sections of the documentation rely on these samples. For completeness, they demonstrate the required IAM permissions and a Dead Letter Queue (DLQ) where batch records are sent after two retries.

template.yaml (SQS)
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: partial batch response sample

Globals:
  Function:
    Timeout: 5
    MemorySize: 256
    Runtime: nodejs18.x
    Tracing: Active
    Environment:
      Variables:
        LOG_LEVEL: INFO
        POWERTOOLS_SERVICE_NAME: hello

Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: index.handler
      CodeUri: hello_world
      Policies:
        - SQSPollerPolicy:
            QueueName: !GetAtt SampleQueue.QueueName
      Events:
        Batch:
          Type: SQS
          Properties:
            Queue: !GetAtt SampleQueue.Arn
            FunctionResponseTypes:
              - ReportBatchItemFailures

  SampleDLQ:
    Type: AWS::SQS::Queue

  SampleQueue:
    Type: AWS::SQS::Queue
    Properties:
      VisibilityTimeout: 30 # Fn timeout * 6
      SqsManagedSseEnabled: true
      RedrivePolicy:
        maxReceiveCount: 2
        deadLetterTargetArn: !GetAtt SampleDLQ.Arn
template.yaml (Kinesis Data Streams)
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: partial batch response sample

Globals:
  Function:
    Timeout: 5
    MemorySize: 256
    Runtime: nodejs18.x
    Tracing: Active
    Environment:
      Variables:
        LOG_LEVEL: INFO
        POWERTOOLS_SERVICE_NAME: hello

Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: index.handler
      CodeUri: hello_world
      Policies:
        # Lambda Destinations require additional permissions
        # to send failure records to DLQ from Kinesis/DynamoDB
        - Version: '2012-10-17'
          Statement:
            Effect: 'Allow'
            Action:
              - sqs:GetQueueAttributes
              - sqs:GetQueueUrl
              - sqs:SendMessage
            Resource: !GetAtt SampleDLQ.Arn
      Events:
        KinesisStream:
          Type: Kinesis
          Properties:
            Stream: !GetAtt SampleStream.Arn
            BatchSize: 100
            StartingPosition: LATEST
            MaximumRetryAttempts: 2
            DestinationConfig:
              OnFailure:
                Destination: !GetAtt SampleDLQ.Arn
            FunctionResponseTypes:
              - ReportBatchItemFailures

  SampleDLQ:
    Type: AWS::SQS::Queue

  SampleStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
      StreamEncryption:
        EncryptionType: KMS
        KeyId: alias/aws/kinesis
template.yaml (DynamoDB Streams)
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: partial batch response sample

Globals:
  Function:
    Timeout: 5
    MemorySize: 256
    Runtime: nodejs18.x
    Tracing: Active
    Environment:
      Variables:
        LOG_LEVEL: INFO
        POWERTOOLS_SERVICE_NAME: hello

Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: index.handler
      CodeUri: hello_world
      Policies:
        # Lambda Destinations require additional permissions
        # to send failure records from Kinesis/DynamoDB
        - Version: '2012-10-17'
          Statement:
            Effect: 'Allow'
            Action:
              - sqs:GetQueueAttributes
              - sqs:GetQueueUrl
              - sqs:SendMessage
            Resource: !GetAtt SampleDLQ.Arn
      Events:
        DynamoDBStream:
          Type: DynamoDB
          Properties:
            Stream: !GetAtt SampleTable.StreamArn
            StartingPosition: LATEST
            MaximumRetryAttempts: 2
            DestinationConfig:
              OnFailure:
                Destination: !GetAtt SampleDLQ.Arn
            FunctionResponseTypes:
              - ReportBatchItemFailures

  SampleDLQ:
    Type: AWS::SQS::Queue

  SampleTable:
    Type: AWS::DynamoDB::Table
    Properties:
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: pk
          AttributeType: S
        - AttributeName: sk
          AttributeType: S
      KeySchema:
        - AttributeName: pk
          KeyType: HASH
        - AttributeName: sk
          KeyType: RANGE
      SSESpecification:
        SSEEnabled: true
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES

Processing messages from SQS

Processing batches from SQS works in three stages:

  1. Instantiate BatchProcessor and choose EventType.SQS for the event type
  2. Define your function to handle each batch record, and use the SQSRecord type annotation for autocompletion
  3. Use processPartialResponse to kick off processing
Info

This code example optionally uses Logger for completeness.

import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type {
  SQSEvent,
  SQSRecord,
  Context,
  SQSBatchResponse,
} from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);
const logger = new Logger();

const recordHandler = (record: SQSRecord): void => {
  const payload = record.body;
  if (payload) {
    const item = JSON.parse(payload);
    logger.info('Processed item', { item });
  }
};

export const handler = async (
  event: SQSEvent,
  context: Context
): Promise<SQSBatchResponse> => {
  return processPartialResponse(event, recordHandler, processor, {
    context,
  });
};

The second record failed to be processed; therefore, the processor added its message ID to the response.

Sample response
{
  "batchItemFailures": [
    {
      "itemIdentifier": "244fc6b4-87a3-44ab-83d2-361172410c3a"
    }
  ]
}
Sample event
{
  "Records": [
    {
      "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
      "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
      "body": "{\"Message\": \"success\"}",
      "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1545082649183",
        "SenderId": "AIDAIENQZJOLO23YVJ4VO",
        "ApproximateFirstReceiveTimestamp": "1545082649185"
      },
      "messageAttributes": {},
      "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:us-east-2: 123456789012:my-queue",
      "awsRegion": "us-east-1"
    },
    {
      "messageId": "244fc6b4-87a3-44ab-83d2-361172410c3a",
      "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
      "body": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
      "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1545082649183",
        "SenderId": "AIDAIENQZJOLO23YVJ4VO",
        "ApproximateFirstReceiveTimestamp": "1545082649185"
      },
      "messageAttributes": {},
      "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:us-east-2: 123456789012:my-queue",
      "awsRegion": "us-east-1"
    }
  ]
}

FIFO queues

When using SQS FIFO queues, we will stop processing messages after the first failure, and return all failed and unprocessed messages in batchItemFailures. This helps preserve the ordering of messages in your queue.

import {
  SqsFifoPartialProcessor,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type {
  SQSEvent,
  SQSRecord,
  Context,
  SQSBatchResponse,
} from 'aws-lambda';

const processor = new SqsFifoPartialProcessor();
const logger = new Logger();

const recordHandler = (record: SQSRecord): void => {
  const payload = record.body;
  if (payload) {
    const item = JSON.parse(payload);
    logger.info('Processed item', { item });
  }
};

export const handler = async (
  event: SQSEvent,
  context: Context
): Promise<SQSBatchResponse> => {
  return processPartialResponse(event, recordHandler, processor, {
    context,
  });
};

Processing messages from Kinesis

Processing batches from Kinesis works in three stages:

  1. Instantiate BatchProcessor and choose EventType.KinesisDataStreams for the event type
  2. Define your function to handle each batch record, and use the KinesisStreamRecord type annotation for autocompletion
  3. Use processPartialResponse to kick off processing
Info

This code example optionally uses Logger for completeness.

import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type {
  KinesisStreamEvent,
  KinesisStreamRecord,
  Context,
  KinesisStreamBatchResponse,
} from 'aws-lambda';

const processor = new BatchProcessor(EventType.KinesisDataStreams);
const logger = new Logger();

const recordHandler = (record: KinesisStreamRecord): void => {
  logger.info('Processing record', { record: record.kinesis.data });
  // Kinesis record payloads are base64-encoded, so decode before parsing
  const payload = JSON.parse(
    Buffer.from(record.kinesis.data, 'base64').toString()
  );
  logger.info('Processed item', { item: payload });
};

export const handler = async (
  event: KinesisStreamEvent,
  context: Context
): Promise<KinesisStreamBatchResponse> => {
  return processPartialResponse(event, recordHandler, processor, {
    context,
  });
};

The second record failed to be processed; therefore, the processor added its sequence number to the response.

Sample event
{
  "Records": [
    {
      "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "4107859083838847772757075850904226111829882106684065",
        "data": "eyJNZXNzYWdlIjogInN1Y2Nlc3MifQ==",
        "approximateArrivalTimestamp": 1545084650.987
      },
      "eventSource": "aws:kinesis",
      "eventVersion": "1.0",
      "eventID": "shardId-000000000006:4107859083838847772757075850904226111829882106684065",
      "eventName": "aws:kinesis:record",
      "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
      "awsRegion": "us-east-2",
      "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
    },
    {
      "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "6006958808509702859251049540584488075644979031228738",
        "data": "c3VjY2Vzcw==",
        "approximateArrivalTimestamp": 1545084650.987
      },
      "eventSource": "aws:kinesis",
      "eventVersion": "1.0",
      "eventID": "shardId-000000000006:6006958808509702859251049540584488075644979031228738",
      "eventName": "aws:kinesis:record",
      "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
      "awsRegion": "us-east-2",
      "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
    }
  ]
}
Sample response
{
  "batchItemFailures": [
    {
      "itemIdentifier": "6006958808509702859251049540584488075644979031228738"
    }
  ]
}

Processing messages from DynamoDB

Processing batches from DynamoDB Streams works in three stages:

  1. Instantiate BatchProcessor and choose EventType.DynamoDBStreams for the event type
  2. Define your function to handle each batch record, and use the DynamoDBRecord type annotation for autocompletion
  3. Use processPartialResponse to kick off processing
Info

This code example optionally uses Logger for completeness.

import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type {
  DynamoDBStreamEvent,
  DynamoDBRecord,
  Context,
  DynamoDBBatchResponse,
} from 'aws-lambda';

const processor = new BatchProcessor(EventType.DynamoDBStreams);
const logger = new Logger();

const recordHandler = (record: DynamoDBRecord): void => {
  if (record.dynamodb && record.dynamodb.NewImage) {
    logger.info('Processing record', { record: record.dynamodb.NewImage });
    const message = record.dynamodb.NewImage.Message.S;
    if (message) {
      const payload = JSON.parse(message);
      logger.info('Processed item', { item: payload });
    }
  }
};

export const handler = async (
  event: DynamoDBStreamEvent,
  context: Context
): Promise<DynamoDBBatchResponse> => {
  return processPartialResponse(event, recordHandler, processor, {
    context,
  });
};

The second record failed to be processed; therefore, the processor added its sequence number to the response.

Sample response
{
  "batchItemFailures": [
    {
      "itemIdentifier": "8640712661"
    }
  ]
}
Sample event
{
  "Records": [
    {
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "NewImage": {
          "Message": {
            "S": "failure"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "SequenceNumber": "3275880929",
        "SizeBytes": 26
      },
      "awsRegion": "us-west-2",
      "eventName": "INSERT",
      "eventSourceARN": "eventsource_arn",
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "NewImage": {
          "SomethingElse": {
            "S": "success"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "SequenceNumber": "8640712661",
        "SizeBytes": 26
      },
      "awsRegion": "us-west-2",
      "eventName": "INSERT",
      "eventSourceARN": "eventsource_arn",
      "eventSource": "aws:dynamodb"
    }
  ]
}

Partial failure mechanics

All records in the batch will be passed to your record handler for processing, even if exceptions are thrown. Here's the behaviour after completing the batch:

  • All records successfully processed. We will return an empty list of item failures {"batchItemFailures": []}
  • Partial success with some exceptions. We will return a list of all the item IDs/sequence numbers that failed processing
  • All records failed to be processed. We will throw a BatchProcessingError with a list of all the errors raised while processing (see the sketch below)
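For example, you can catch that error in your handler to log the failure before Lambda retries the entire batch. A minimal sketch, assuming BatchProcessingError is exported from @aws-lambda-powertools/batch as the list above suggests:

Handling a full batch failure

import {
  BatchProcessingError,
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type {
  SQSEvent,
  SQSRecord,
  Context,
  SQSBatchResponse,
} from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);
const logger = new Logger();

const recordHandler = (record: SQSRecord): void => {
  if (record.body) {
    logger.info('Processed item', { item: JSON.parse(record.body) });
  }
};

export const handler = async (
  event: SQSEvent,
  context: Context
): Promise<SQSBatchResponse> => {
  try {
    return await processPartialResponse(event, recordHandler, processor, {
      context,
    });
  } catch (error) {
    if (error instanceof BatchProcessingError) {
      // Every record in the batch failed; log before the whole batch is retried
      logger.error('Entire batch failed processing', error as Error);
    }
    throw error;
  }
};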

Processing messages asynchronously

You can use the AsyncBatchProcessor class and the asyncProcessPartialResponse function to process messages concurrently.

When is this useful?

Your use case might be able to process multiple records at the same time without conflicting with one another.

For example, imagine you need to process multiple loyalty point updates and save them incrementally to a database. While you wait for the database to confirm that a record was saved, you could start processing the next record concurrently.

The reason this is not the default behaviour is that not all use cases can handle concurrency safely (e.g., loyalty points must be updated in order).

High-concurrency with AsyncBatchProcessor
import {
  AsyncBatchProcessor,
  EventType,
  asyncProcessPartialResponse,
} from '@aws-lambda-powertools/batch';
import axios from 'axios'; // axios is an external dependency
import type {
  SQSEvent,
  SQSRecord,
  Context,
  SQSBatchResponse,
} from 'aws-lambda';

const processor = new AsyncBatchProcessor(EventType.SQS);

const recordHandler = async (record: SQSRecord): Promise<number> => {
  const res = await axios.post('https://httpbin.org/anything', {
    message: record.body,
  });

  return res.status;
};

export const handler = async (
  event: SQSEvent,
  context: Context
): Promise<SQSBatchResponse> => {
  return await asyncProcessPartialResponse(event, recordHandler, processor, {
    context,
  });
};

Advanced

Accessing processed messages

Use the BatchProcessor directly in your function to access a list of all returned values from your recordHandler function.

  • When successful. We will include a tuple with success, the result of recordHandler, and the batch record
  • When failed. We will include a tuple with fail, exception as a string, and the batch record
Accessing processed messages
import { BatchProcessor, EventType } from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type {
  SQSEvent,
  SQSRecord,
  Context,
  SQSBatchResponse,
} from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);
const logger = new Logger();

const recordHandler = (record: SQSRecord): void => {
  const payload = record.body;
  if (payload) {
    const item = JSON.parse(payload);
    logger.info('Processed item', { item });
  }
};

export const handler = async (
  event: SQSEvent,
  context: Context
): Promise<SQSBatchResponse> => {
  const batch = event.Records;

  processor.register(batch, recordHandler, { context });
  const processedMessages = processor.process();

  for (const message of processedMessages) {
    const status: 'success' | 'fail' = message[0];
    const record = message[2];

    logger.info('Processed record', { status, record });
  }

  return processor.response();
};

Accessing Lambda Context

Within your recordHandler function, you might need access to the Lambda context to determine how much time you have left before your function times out.

We can automatically inject the Lambda context into your recordHandler as an optional second argument if you register it when using BatchProcessor or the processPartialResponse function.

import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type {
  SQSEvent,
  SQSRecord,
  Context,
  SQSBatchResponse,
} from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);
const logger = new Logger();

const recordHandler = (record: SQSRecord, lambdaContext?: Context): void => {
  const payload = record.body;
  if (payload) {
    const item = JSON.parse(payload);
    logger.info('Processed item', { item });
  }
  if (lambdaContext) {
    logger.info('Remaining time', {
      time: lambdaContext.getRemainingTimeInMillis(),
    });
  }
};

export const handler = async (
  event: SQSEvent,
  context: Context
): Promise<SQSBatchResponse> => {
  return processPartialResponse(event, recordHandler, processor, {
    context,
  });
};

Extending BatchProcessor

You might want to bring custom logic to the existing BatchProcessor to slightly override how we handle successes and failures.

For these scenarios, you can subclass BatchProcessor and quickly override successHandler and failureHandler methods:

  • successHandler() – Keeps track of successful batch records
  • failureHandler() – Keeps track of failed batch records
Example

Let's suppose you'd like to add a metric named BatchRecordFailures for each batch record that failed processing.

Extending failure handling mechanism in BatchProcessor
import { Metrics, MetricUnits } from '@aws-lambda-powertools/metrics';
import {
  BatchProcessor,
  EventType,
  FailureResponse,
  EventSourceType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type {
  SQSEvent,
  SQSRecord,
  Context,
  SQSBatchResponse,
} from 'aws-lambda';

class MyProcessor extends BatchProcessor {
  #metrics: Metrics;

  public constructor(eventType: keyof typeof EventType) {
    super(eventType);
    this.#metrics = new Metrics({ namespace: 'test' });
  }

  public failureHandler(
    record: EventSourceType,
    error: Error
  ): FailureResponse {
    this.#metrics.addMetric('BatchRecordFailures', MetricUnits.Count, 1);

    return super.failureHandler(record, error);
  }
}

const processor = new MyProcessor(EventType.SQS);
const logger = new Logger();

const recordHandler = (record: SQSRecord): void => {
  const payload = record.body;
  if (payload) {
    const item = JSON.parse(payload);
    logger.info('Processed item', { item });
  }
};

export const handler = async (
  event: SQSEvent,
  context: Context
): Promise<SQSBatchResponse> => {
  return processPartialResponse(event, recordHandler, processor, {
    context,
  });
};

Create your own partial processor

You can create your own partial batch processor from scratch by extending the BasePartialProcessor class and implementing the prepare(), clean(), processRecord(), and asyncProcessRecord() abstract methods.

  • processRecord() – handles all processing logic for each individual message of a batch, including calling the recordHandler (this.handler)
  • prepare() – called once as part of the processor initialization
  • clean() – teardown logic called once after processRecord completes
  • asyncProcessRecord() – if you need to implement asynchronous logic, use this method; otherwise define it in your class with empty logic

You can then pass this class to processPartialResponse to process the records in your Lambda handler function.

Creating a custom batch processor
import { randomInt } from 'node:crypto';
import {
  DynamoDBClient,
  BatchWriteItemCommand,
} from '@aws-sdk/client-dynamodb';
import { marshall } from '@aws-sdk/util-dynamodb';
import {
  BasePartialProcessor,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import type {
  SuccessResponse,
  FailureResponse,
  EventSourceType,
} from '@aws-lambda-powertools/batch';
import type { SQSEvent, Context, SQSBatchResponse } from 'aws-lambda';

const tableName = process.env.TABLE_NAME || 'table-not-found';

class MyPartialProcessor extends BasePartialProcessor {
  #tableName: string;
  #client?: DynamoDBClient;

  public constructor(tableName: string) {
    super();
    this.#tableName = tableName;
  }

  public async asyncProcessRecord(
    _record: EventSourceType
  ): Promise<SuccessResponse | FailureResponse> {
    throw new Error('Not implemented');
  }

  /**
   * It's called once, **after** processing the batch.
   *
   * Here we are writing all the processed messages to DynamoDB.
   */
  public clean(): void {
    // We know that the client is defined because clean() is called after prepare()
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.#client!.send(
      new BatchWriteItemCommand({
        RequestItems: {
          [this.#tableName]: this.successMessages.map((message) => ({
            PutRequest: {
              Item: marshall(message),
            },
          })),
        },
      })
    );
  }

  /**
   * It's called once, **before** processing the batch.
   *
   * It initializes a new client and cleans up any existing data.
   */
  public prepare(): void {
    this.#client = new DynamoDBClient({});
    this.successMessages = [];
  }

  /**
   * It handles how your record is processed.
   *
   * Here we are keeping the status of each run, `this.handler` is
   * the function that is passed when calling `processor.register()`.
   */
  public processRecord(
    record: EventSourceType
  ): SuccessResponse | FailureResponse {
    try {
      const result = this.handler(record);

      return this.successHandler(record, result);
    } catch (error) {
      return this.failureHandler(record, error as Error);
    }
  }
}

const processor = new MyPartialProcessor(tableName);

const recordHandler = (): number => {
  return Math.floor(randomInt(1, 10));
};

export const handler = async (
  event: SQSEvent,
  context: Context
): Promise<SQSBatchResponse> => {
  return processPartialResponse(event, recordHandler, processor, {
    context,
  });
};

Testing your code

As there are no external calls, you can unit test your code with BatchProcessor quite easily.

Example:

Given an SQS batch where the first record succeeds and the second fails processing, we should have a single item reported in the function response.

Unit test
import { ContextExamples as dummyContext } from '@aws-lambda-powertools/commons';
import { handler, processor } from './gettingStartedSQS';
import sqsEvent from './samples/sampleSQSEvent.json';

describe('Function tests', () => {
  beforeEach(() => {
    jest.clearAllMocks();
  });

  test('should return one failed message', async () => {
    // Prepare
    const context = dummyContext.helloworldContext;
    const processorResult = processor; // access processor for additional assertions
    const successfulRecord = sqsEvent.Records[0];
    const failedRecord = sqsEvent.Records[1];
    const expectedResponse = {
      batchItemFailures: [
        {
          itemIdentifier: failedRecord.messageId,
        },
      ],
    };

    // Act
    const response = await handler(sqsEvent, context);

    // Assess
    expect(response).toEqual(expectedResponse);
    expect(processorResult.failureMessages).toHaveLength(1);
    expect(processorResult.successMessages[0]).toEqual(successfulRecord);
  });
});
gettingStartedSQS.ts
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type {
  SQSEvent,
  SQSRecord,
  Context,
  SQSBatchResponse,
} from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);
const logger = new Logger();

const recordHandler = (record: SQSRecord): void => {
  const payload = record.body;
  if (payload) {
    const item = JSON.parse(payload);
    logger.info('Processed item', { item });
  }
};

export const handler = async (
  event: SQSEvent,
  context: Context
): Promise<SQSBatchResponse> => {
  return processPartialResponse(event, recordHandler, processor, {
    context,
  });
};
export { processor };
events/sqs_event.json
{
  "Records": [
    {
      "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
      "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
      "body": "{\"Message\": \"success\"}",
      "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1545082649183",
        "SenderId": "AIDAIENQZJOLO23YVJ4VO",
        "ApproximateFirstReceiveTimestamp": "1545082649185"
      },
      "messageAttributes": {},
      "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:us-east-2: 123456789012:my-queue",
      "awsRegion": "us-east-1"
    },
    {
      "messageId": "244fc6b4-87a3-44ab-83d2-361172410c3a",
      "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
      "body": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
      "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1545082649183",
        "SenderId": "AIDAIENQZJOLO23YVJ4VO",
        "ApproximateFirstReceiveTimestamp": "1545082649185"
      },
      "messageAttributes": {},
      "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:us-east-2: 123456789012:my-queue",
      "awsRegion": "us-east-1"
    }
  ]
}

Last update: 2023-07-25