Decorators

FUNCTION DESCRIPTION
async_batch_processor

Middleware to handle batch event processing

async_process_partial_response

Higher level function to handle batch event processing asynchronously.

batch_processor

Middleware to handle batch event processing

process_partial_response

Higher level function to handle batch event processing.

async_batch_processor

async_batch_processor(
    handler: Callable,
    event: dict,
    context: LambdaContext,
    record_handler: Callable[..., Awaitable[Any]],
    processor: AsyncBatchProcessor,
)

Middleware to handle batch event processing

Notes

Consider using async_process_partial_response function for an easier experience.

PARAMETER DESCRIPTION
handler

Lambda's handler

TYPE: Callable

event

Lambda's Event

TYPE: dict

context

Lambda's Context

TYPE: LambdaContext

record_handler

Callable to process each record from the batch

TYPE: Callable[..., Awaitable[Any]]

processor

Batch Processor to handle partial failure cases

TYPE: AsyncBatchProcessor

Example
from aws_lambda_powertools.utilities.batch import AsyncBatchProcessor, EventType, async_batch_processor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

processor = AsyncBatchProcessor(event_type=EventType.SQS)

async def async_record_handler(record: SQSRecord):
    payload: str = record.body
    return payload

@async_batch_processor(record_handler=async_record_handler, processor=processor)
def lambda_handler(event, context):
    return processor.response()
Limitations
  • Sync batch processors. Use batch_processor instead.
Source code in aws_lambda_powertools/utilities/batch/decorators.py
@lambda_handler_decorator
@deprecated(
    "`async_batch_processor` decorator is deprecated; use `async_process_partial_response` function instead.",
    category=None,
)
def async_batch_processor(
    handler: Callable,
    event: dict,
    context: LambdaContext,
    record_handler: Callable[..., Awaitable[Any]],
    processor: AsyncBatchProcessor,
):
    """
    Middleware to handle batch event processing

    Notes
    -----
    Consider using async_process_partial_response function for an easier experience.

    Parameters
    ----------
    handler: Callable
        Lambda's handler
    event: dict
        Lambda's Event
    context: LambdaContext
        Lambda's Context
    record_handler: Callable[..., Awaitable[Any]]
        Callable to process each record from the batch
    processor: AsyncBatchProcessor
        Batch Processor to handle partial failure cases

    Example
    --------
        >>> from aws_lambda_powertools.utilities.batch import async_batch_processor, AsyncBatchProcessor, EventType
        >>> from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
        >>>
        >>> processor = AsyncBatchProcessor(event_type=EventType.SQS)
        >>>
        >>> async def async_record_handler(record: SQSRecord):
        >>>     payload: str = record.body
        >>>     return payload
        >>>
        >>> @async_batch_processor(record_handler=async_record_handler, processor=processor)
        >>> def lambda_handler(event, context):
        >>>     return processor.response()

    Limitations
    -----------
    * Sync batch processors. Use `batch_processor` instead.
    """

    warnings.warn(
        "The `async_batch_processor` decorator is deprecated in V3 "
        "and will be removed in the next major version. Use `async_process_partial_response` function instead.",
        category=PowertoolsDeprecationWarning,
        stacklevel=2,
    )

    records = event["Records"]

    with processor(records, record_handler, lambda_context=context):
        processor.async_process()

    return handler(event, context)
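
The listing above emits a deprecation warning each time the decorated handler is invoked. As a minimal sketch of how such a warning can be observed in a test, the block below uses only the standard library; `StandInDeprecationWarning` and `legacy_decorator` are hypothetical stand-ins, not Powertools names.

```python
import functools
import warnings


class StandInDeprecationWarning(DeprecationWarning):
    """Hypothetical warning category used only for this illustration."""


def legacy_decorator(func):
    """Hypothetical decorator that warns on every call, then delegates."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        warnings.warn(
            "legacy_decorator is deprecated",
            category=StandInDeprecationWarning,
            stacklevel=2,
        )
        return func(*args, **kwargs)

    return wrapper


@legacy_decorator
def handler(event, context):
    return "done"


# Record warnings instead of printing them, so a test can assert on them.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    result = handler({}, None)

print(result, [str(w.message) for w in caught])
```

Recording warnings this way makes it possible to confirm that a migration to the non-deprecated function removed the warning.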

async_process_partial_response

async_process_partial_response(
    event: dict,
    record_handler: Callable,
    processor: AsyncBatchProcessor,
    context: LambdaContext | None = None,
) -> PartialItemFailureResponse

Higher level function to handle batch event processing asynchronously.

PARAMETER DESCRIPTION
event

Lambda's original event

TYPE: dict

record_handler

Callable to process each record from the batch

TYPE: Callable

processor

Batch Processor to handle partial failure cases

TYPE: AsyncBatchProcessor

context

Lambda's context, used to optionally inject in record handler

TYPE: LambdaContext | None DEFAULT: None

RETURNS DESCRIPTION
result

Lambda Partial Batch Response

TYPE: PartialItemFailureResponse
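
For orientation, the partial batch response that Lambda expects is a dictionary with a `batchItemFailures` list whose entries each carry an `itemIdentifier`. The sketch below builds that shape by hand; `build_partial_response` is a hypothetical helper for illustration and not part of Powertools.

```python
from typing import Dict, List


def build_partial_response(failed_ids: List[str]) -> Dict[str, List[Dict[str, str]]]:
    """Build a partial batch response from the identifiers of failed records."""
    return {"batchItemFailures": [{"itemIdentifier": item_id} for item_id in failed_ids]}


# An empty list reports that every record in the batch succeeded.
print(build_partial_response([]))
# Only the listed records are reported as failed.
print(build_partial_response(["msg-2"]))
```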

Example

Processes Lambda's SQS event

from aws_lambda_powertools.utilities.batch import AsyncBatchProcessor, EventType, async_process_partial_response
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

processor = AsyncBatchProcessor(EventType.SQS)

async def record_handler(record: SQSRecord):
    return record.body

def handler(event, context):
    return async_process_partial_response(
        event=event, record_handler=record_handler, processor=processor, context=context
    )
Limitations
  • Sync batch processors. Use process_partial_response instead.
Source code in aws_lambda_powertools/utilities/batch/decorators.py
def async_process_partial_response(
    event: dict,
    record_handler: Callable,
    processor: AsyncBatchProcessor,
    context: LambdaContext | None = None,
) -> PartialItemFailureResponse:
    """
    Higher level function to handle batch event processing asynchronously.

    Parameters
    ----------
    event: dict
        Lambda's original event
    record_handler: Callable
        Callable to process each record from the batch
    processor: AsyncBatchProcessor
        Batch Processor to handle partial failure cases
    context: LambdaContext
        Lambda's context, used to optionally inject in record handler

    Returns
    -------
    result: PartialItemFailureResponse
        Lambda Partial Batch Response

    Example
    --------
    **Processes Lambda's SQS event**

    ```python
    from aws_lambda_powertools.utilities.batch import AsyncBatchProcessor, EventType, async_process_partial_response
    from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

    processor = AsyncBatchProcessor(EventType.SQS)

    async def record_handler(record: SQSRecord):
        return record.body

    def handler(event, context):
        return async_process_partial_response(
            event=event, record_handler=record_handler, processor=processor, context=context
        )
    ```

    Limitations
    -----------
    * Sync batch processors. Use `process_partial_response` instead.
    """
    try:
        records: list[dict] = event.get("Records", [])
        if not records or not isinstance(records, list):
            raise UnexpectedBatchTypeError(
                "Unexpected batch event type. Possible values are: SQS, KinesisDataStreams, DynamoDBStreams",
            )

    except AttributeError:
        event_types = ", ".join(list(EventType.__members__))
        docs = "https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/#processing-messages-from-sqs"  # noqa: E501 # long-line
        raise ValueError(
            f"Invalid event format. Please ensure batch event is a valid {processor.event_type.value} event. \n"
            f"See sample events in our documentation for either {event_types}: \n {docs}",
        )

    with processor(records, record_handler, context):
        processor.async_process()

    return processor.response()
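
The general idea of processing a batch with an async record handler can be sketched with `asyncio.gather`. This is an illustration under assumptions, not the library's implementation: `process_concurrently` is a hypothetical function, and records are plain dictionaries with `messageId` and `body` keys.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def process_concurrently(
    records: List[Dict[str, Any]],
    record_handler: Callable[[Dict[str, Any]], Awaitable[Any]],
) -> Dict[str, List[Dict[str, str]]]:
    # return_exceptions=True returns each exception as a result
    # instead of raising the first one, so every record gets an outcome.
    results = await asyncio.gather(
        *(record_handler(record) for record in records),
        return_exceptions=True,
    )
    failures = [
        {"itemIdentifier": record["messageId"]}
        for record, result in zip(records, results)
        if isinstance(result, BaseException)
    ]
    return {"batchItemFailures": failures}


async def record_handler(record: Dict[str, Any]) -> str:
    if not record["body"]:
        raise ValueError("empty body")
    return record["body"]


records = [{"messageId": "1", "body": "ok"}, {"messageId": "2", "body": ""}]
response = asyncio.run(process_concurrently(records, record_handler))
print(response)
```

Only the record whose handler raised appears in the response; the successful record is left out.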

batch_processor

batch_processor(
    handler: Callable,
    event: dict,
    context: LambdaContext,
    record_handler: Callable,
    processor: BatchProcessor,
)

Middleware to handle batch event processing

Notes

Consider using process_partial_response function for an easier experience.

PARAMETER DESCRIPTION
handler

Lambda's handler

TYPE: Callable

event

Lambda's Event

TYPE: dict

context

Lambda's Context

TYPE: LambdaContext

record_handler

Callable or coroutine to process each record from the batch

TYPE: Callable

processor

Batch Processor to handle partial failure cases

TYPE: BatchProcessor

Example

Processes Lambda's event with a BatchProcessor

from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

processor = BatchProcessor(EventType.SQS)

def record_handler(record: SQSRecord):
    return record.body

@batch_processor(record_handler=record_handler, processor=processor)
def handler(event, context):
    return processor.response()
Limitations
  • Async batch processors. Use async_batch_processor instead.
Source code in aws_lambda_powertools/utilities/batch/decorators.py
@lambda_handler_decorator
@deprecated(
    "`batch_processor` decorator is deprecated; use `process_partial_response` function instead.",
    category=None,
)
def batch_processor(
    handler: Callable,
    event: dict,
    context: LambdaContext,
    record_handler: Callable,
    processor: BatchProcessor,
):
    """
    Middleware to handle batch event processing

    Notes
    -----
    Consider using process_partial_response function for an easier experience.

    Parameters
    ----------
    handler: Callable
        Lambda's handler
    event: dict
        Lambda's Event
    context: LambdaContext
        Lambda's Context
    record_handler: Callable
        Callable or coroutine to process each record from the batch
    processor: BatchProcessor
        Batch Processor to handle partial failure cases

    Example
    --------
    **Processes Lambda's event with a BatchProcessor**

        >>> from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor, EventType
        >>> from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
        >>>
        >>> processor = BatchProcessor(EventType.SQS)
        >>>
        >>> def record_handler(record):
        >>>     return record["body"]
        >>>
        >>> @batch_processor(record_handler=record_handler, processor=processor)
        >>> def handler(event, context):
        >>>     return processor.response()

    Limitations
    -----------
    * Async batch processors. Use `async_batch_processor` instead.
    """

    warnings.warn(
        "The `batch_processor` decorator is deprecated in V3 "
        "and will be removed in the next major version. Use `process_partial_response` function instead.",
        category=PowertoolsDeprecationWarning,
        stacklevel=2,
    )

    records = event["Records"]

    with processor(records, record_handler, lambda_context=context):
        processor.process()

    return handler(event, context)
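
The ordering in the listing above (process every record first, then call the wrapped handler) can be sketched with the standard library alone. `TinyProcessor` and `tiny_batch_processor` are hypothetical stand-ins written for this illustration; they are far simpler than the real classes and skip features such as the context manager protocol.

```python
import functools
from typing import Any, Callable, Dict, List


class TinyProcessor:
    """Hypothetical stand-in that collects the IDs of records whose handler raised."""

    def __init__(self) -> None:
        self.failures: List[Dict[str, str]] = []

    def process(self, records: List[Dict[str, Any]], record_handler: Callable) -> None:
        self.failures = []
        for record in records:
            try:
                record_handler(record)
            except Exception:
                self.failures.append({"itemIdentifier": record["messageId"]})

    def response(self) -> Dict[str, List[Dict[str, str]]]:
        return {"batchItemFailures": self.failures}


def tiny_batch_processor(record_handler: Callable, processor: TinyProcessor) -> Callable:
    def decorator(handler: Callable) -> Callable:
        @functools.wraps(handler)
        def wrapper(event: Dict[str, Any], context: Any) -> Any:
            # Records are processed first; the wrapped handler runs afterwards.
            processor.process(event["Records"], record_handler)
            return handler(event, context)

        return wrapper

    return decorator


processor = TinyProcessor()


def record_handler(record: Dict[str, Any]) -> None:
    if record["body"] == "bad":
        raise ValueError("cannot process")


@tiny_batch_processor(record_handler=record_handler, processor=processor)
def handler(event, context):
    return processor.response()


event = {"Records": [{"messageId": "a", "body": "ok"}, {"messageId": "b", "body": "bad"}]}
result = handler(event, None)
print(result)
```

Because the handler body runs only after processing, returning `processor.response()` from it reflects the outcome of the whole batch, provided the same processor instance is passed to the decorator.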

process_partial_response

process_partial_response(
    event: dict,
    record_handler: Callable,
    processor: BasePartialBatchProcessor,
    context: LambdaContext | None = None,
) -> PartialItemFailureResponse

Higher level function to handle batch event processing.

PARAMETER DESCRIPTION
event

Lambda's original event

TYPE: dict

record_handler

Callable to process each record from the batch

TYPE: Callable

processor

Batch Processor to handle partial failure cases

TYPE: BasePartialBatchProcessor

context

Lambda's context, used to optionally inject in record handler

TYPE: LambdaContext | None DEFAULT: None

RETURNS DESCRIPTION
result

Lambda Partial Batch Response

TYPE: PartialItemFailureResponse

Example

Processes Lambda's SQS event

from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

processor = BatchProcessor(EventType.SQS)

def record_handler(record: SQSRecord):
    return record.body

def handler(event, context):
    return process_partial_response(
        event=event, record_handler=record_handler, processor=processor, context=context
    )
Limitations
  • Async batch processors. Use async_process_partial_response instead.
Source code in aws_lambda_powertools/utilities/batch/decorators.py
def process_partial_response(
    event: dict,
    record_handler: Callable,
    processor: BasePartialBatchProcessor,
    context: LambdaContext | None = None,
) -> PartialItemFailureResponse:
    """
    Higher level function to handle batch event processing.

    Parameters
    ----------
    event: dict
        Lambda's original event
    record_handler: Callable
        Callable to process each record from the batch
    processor: BasePartialBatchProcessor
        Batch Processor to handle partial failure cases
    context: LambdaContext
        Lambda's context, used to optionally inject in record handler

    Returns
    -------
    result: PartialItemFailureResponse
        Lambda Partial Batch Response

    Example
    --------
    **Processes Lambda's SQS event**

    ```python
    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
    from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

    processor = BatchProcessor(EventType.SQS)

    def record_handler(record: SQSRecord):
        return record.body

    def handler(event, context):
        return process_partial_response(
            event=event, record_handler=record_handler, processor=processor, context=context
        )
    ```

    Limitations
    -----------
    * Async batch processors. Use `async_process_partial_response` instead.
    """
    try:
        records: list[dict] = event.get("Records", [])
        if not records or not isinstance(records, list):
            raise UnexpectedBatchTypeError(
                "Unexpected batch event type. Possible values are: SQS, KinesisDataStreams, DynamoDBStreams",
            )

    except AttributeError:
        event_types = ", ".join(list(EventType.__members__))
        docs = "https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/#processing-messages-from-sqs"  # noqa: E501 # long-line
        raise ValueError(
            f"Invalid event format. Please ensure batch event is a valid {processor.event_type.value} event. \n"
            f"See sample events in our documentation for either {event_types}: \n {docs}",
        )

    with processor(records, record_handler, context):
        processor.process()

    return processor.response()
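
The validation at the top of the listing above distinguishes two failure modes: an event that is not dictionary-like, and an event whose `Records` entry is missing, empty, or not a list. A simplified sketch of that branching follows; `extract_records` and `outcome` are hypothetical helpers, and the exception class is a local stand-in for the Powertools exception of the same name.

```python
from typing import Any, List


class UnexpectedBatchTypeError(Exception):
    """Local stand-in for the Powertools exception of the same name."""


def extract_records(event: Any) -> List[dict]:
    try:
        records = event.get("Records", [])
    except AttributeError:
        # The event has no .get method, for example a string or a list.
        raise ValueError("Invalid event format") from None
    if not records or not isinstance(records, list):
        raise UnexpectedBatchTypeError("Unexpected batch event type")
    return records


def outcome(event: Any) -> Any:
    """Return the records, or the name of the exception that was raised."""
    try:
        return extract_records(event)
    except (ValueError, UnexpectedBatchTypeError) as exc:
        return type(exc).__name__


print(outcome({"Records": [{"messageId": "1"}]}))
print(outcome({}))
print(outcome("not a dict"))
```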
CLASS DESCRIPTION
SqsFifoPartialProcessor

Process native partial responses from SQS FIFO queues.

SqsFifoPartialProcessor

SqsFifoPartialProcessor(
    model: BatchSqsTypeModel | None = None,
    skip_group_on_error: bool = False,
)

Bases: BatchProcessor

Process native partial responses from SQS FIFO queues.

Stops processing records when the first record fails. The remaining records are reported as failed items.
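
The stop-at-first-failure behavior described above can be sketched as follows. This is a simplified illustration rather than the class's implementation; `process_fifo` is a hypothetical function operating on plain dictionaries with `messageId` and `body` keys.

```python
from typing import Any, Callable, Dict, List


def process_fifo(
    records: List[Dict[str, Any]],
    record_handler: Callable[[Dict[str, Any]], Any],
) -> Dict[str, List[Dict[str, str]]]:
    failures: List[Dict[str, str]] = []
    for index, record in enumerate(records):
        try:
            record_handler(record)
        except Exception:
            # First failure: report this record and every later one, then stop.
            failures = [{"itemIdentifier": r["messageId"]} for r in records[index:]]
            break
    return {"batchItemFailures": failures}


def record_handler(record: Dict[str, Any]) -> None:
    if record["body"] == "bad":
        raise ValueError("cannot process")


records = [
    {"messageId": "1", "body": "ok"},
    {"messageId": "2", "body": "bad"},
    {"messageId": "3", "body": "ok"},
]
response = process_fifo(records, record_handler)
print(response)
```

Record 3 is reported as failed even though its handler never ran, which keeps it queued behind record 2 and preserves FIFO ordering on retry.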

Example

Process batch triggered by a FIFO SQS

import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import SqsFifoPartialProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext


processor = SqsFifoPartialProcessor()
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: SQSRecord):
    payload: str = record.body
    if payload:
        item: dict = json.loads(payload)
    ...

@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()
PARAMETER DESCRIPTION
model

An optional model for batch processing.

TYPE: BatchSqsTypeModel | None DEFAULT: None

skip_group_on_error

Whether to skip only the messages that share a MessageGroupID with a record that failed processing, instead of all remaining messages. Default is False.

TYPE: bool DEFAULT: False
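
A rough sketch of group-scoped skipping, assuming each record is a plain dictionary that carries its group under `attributes["MessageGroupId"]` as in an SQS FIFO event. `process_fifo_by_group` and `make_record` are hypothetical helpers, not Powertools functions.

```python
from typing import Any, Callable, Dict, List, Set


def process_fifo_by_group(
    records: List[Dict[str, Any]],
    record_handler: Callable[[Dict[str, Any]], Any],
) -> Dict[str, List[Dict[str, str]]]:
    failed_groups: Set[str] = set()
    failures: List[Dict[str, str]] = []
    for record in records:
        group_id = record["attributes"]["MessageGroupId"]
        if group_id in failed_groups:
            # Same group as an earlier failure: report as failed without calling the handler.
            failures.append({"itemIdentifier": record["messageId"]})
            continue
        try:
            record_handler(record)
        except Exception:
            failed_groups.add(group_id)
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}


handled: List[str] = []


def record_handler(record: Dict[str, Any]) -> None:
    handled.append(record["messageId"])
    if record["body"] == "bad":
        raise ValueError("cannot process")


def make_record(message_id: str, group_id: str, body: str) -> Dict[str, Any]:
    return {"messageId": message_id, "body": body, "attributes": {"MessageGroupId": group_id}}


records = [
    make_record("a1", "A", "ok"),
    make_record("a2", "A", "bad"),
    make_record("b1", "B", "ok"),
    make_record("a3", "A", "ok"),
]
response = process_fifo_by_group(records, record_handler)
print(response)
print(handled)
```

In this sketch, group B keeps being processed after group A fails, while the later group A record is reported as failed without its handler being called.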

Source code in aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py
def __init__(self, model: BatchSqsTypeModel | None = None, skip_group_on_error: bool = False):
    """
    Initialize the SqsFifoPartialProcessor.

    Parameters
    ----------
    model: BatchSqsTypeModel | None
        An optional model for batch processing.
    skip_group_on_error: bool
        Determines whether to exclusively skip messages from the MessageGroupID that encountered processing failures
        Default is False.

    """
    self._skip_group_on_error: bool = skip_group_on_error
    self._current_group_id = None
    self._failed_group_ids: set[str] = set()
    super().__init__(EventType.SQS, model)