Skip to content

AppSync Events

Event Handler for AWS AppSync real-time events.

stateDiagram-v2
    direction LR
    EventSource: AppSync Events
    EventHandlerResolvers: Publish & Subscribe events
    LambdaInit: Lambda invocation
    EventHandler: Event Handler
    EventHandlerResolver: Route event based on namespace/channel
    YourLogic: Run your registered handler function
    EventHandlerResolverBuilder: Adapts response to AppSync contract
    LambdaResponse: Lambda response

    state EventSource {
        EventHandlerResolvers
    }

    EventHandlerResolvers --> LambdaInit

    LambdaInit --> EventHandler
    EventHandler --> EventHandlerResolver

    state EventHandler {
        [*] --> EventHandlerResolver: app.resolve(event, context)
        EventHandlerResolver --> YourLogic
        YourLogic --> EventHandlerResolverBuilder
    }

    EventHandler --> LambdaResponse

Key Features

  • Easily handle publish and subscribe events with dedicated handler methods
  • Automatic routing based on namespace and channel patterns
  • Support for wildcard patterns to create catch-all handlers
  • Process events in parallel corontrol aggregation for batch processing
  • Graceful error handling for individual events

Terminology

AWS AppSync Events. A service that enables you to quickly build secure, scalable real-time WebSocket APIs without managing infrastructure or writing API code.

It handles connection management, message broadcasting, authentication, and monitoring, reducing time to market and operational costs.

Getting started

Tip: New to AppSync Real-time API?

Visit AWS AppSync Real-time documentation to understand how to set up subscriptions and pub/sub messaging.

Required resources

You must have an existing AppSync Events API with real-time capabilities enabled and IAM permissions to invoke your Lambda function. That said, there are no additional permissions required to use Event Handler as routing requires no dependency (standard library).

AppSync request and response format

AppSync Events uses a specific event format for Lambda requests and responses. In most scenarios, Powertools for AWS simplifies this interaction by automatically formatting resolver returns to match the expected AppSync response structure.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
{
   "identity":"None",
   "result":"None",
   "request":{
      "headers": {
      "x-forwarded-for": "1.1.1.1, 2.2.2.2",
      "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.83 Safari/537.36",
   },
      "domainName":"None"
   },
   "info":{
      "channel":{
         "path":"/default/channel",
         "segments":[
            "default",
            "channel"
         ]
      },
      "channelNamespace":{
         "name":"default"
      },
      "operation":"PUBLISH"
   },
   "error":"None",
   "prev":"None",
   "stash":{

   },
   "outErrors":[

   ],
   "events":[
      {
         "payload":{
            "data":"data_1"
         },
         "id":"1"
      },
      {
         "payload":{
            "data":"data_2"
         },
         "id":"2"
      }
   ]
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
{
   "events":[
      {
         "payload":{
            "data":"data_1"
         },
         "id":"1"
      },
      {
         "payload":{
            "data":"data_2"
         },
         "id":"2"
      }
   ]
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
{
   "events":[
      {
         "error": "Error message",
         "id":"1"
      },
      {
         "payload":{
            "data":"data_2"
         },
         "id":"2"
      }
   ]
}
1
2
3
{
  "error": "Exception - An exception occurred"
}

Events response with error

When processing events with Lambda, you can return errors to AppSync in three ways:

  • Item specific error: Return an error key within each individual item's response. AppSync Events expects this format for item-specific errors.
  • Fail entire request: Return a JSON object with a top-level error key. This signals a general failure, and AppSync treats the entire request as unsuccessful.
  • Unauthorized exception: Raise the UnauthorizedException exception to reject a subscribe or publish request with HTTP 403.

Resolver decorator

Important

The event handler automatically parses the incoming event data and invokes the appropriate handler based on the namespace/channel pattern you register.

You can define your handlers for different event types using the app.on_publish(), app.async_on_publish(), and app.on_subscribe() methods.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from aws_lambda_powertools.event_handler import AppSyncEventsResolver

if TYPE_CHECKING:
    from aws_lambda_powertools.utilities.typing import LambdaContext

app = AppSyncEventsResolver()


@app.on_publish("/default/channel")
def handle_channel1_publish(payload: dict[str, Any]):
    # Process the payload for this specific channel
    return {
        "processed": True,
        "original_payload": payload,
    }


def lambda_handler(event: dict, context: LambdaContext):
    return app.resolve(event, context)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from __future__ import annotations

from typing import TYPE_CHECKING

from aws_lambda_powertools import Metrics
from aws_lambda_powertools.event_handler import AppSyncEventsResolver
from aws_lambda_powertools.event_handler.events_appsync.exceptions import UnauthorizedException
from aws_lambda_powertools.metrics import MetricUnit

if TYPE_CHECKING:
    from aws_lambda_powertools.utilities.typing import LambdaContext

app = AppSyncEventsResolver()
metrics = Metrics(namespace="AppSyncEvents", service="GettingStartedWithSubscribeEvents")


@app.on_subscribe("/*")
def handle_all_subscriptions():
    path = app.current_event.info.channel_path

    # Perform access control checks
    if not is_authorized(path):
        raise UnauthorizedException("You are not authorized to subscribe to this channel")

    metrics.add_dimension(name="channel", value=path)
    metrics.add_metric(name="subscription", unit=MetricUnit.Count, value=1)

    return True


def is_authorized(path: str):
    # Your authorization logic here
    return path != "not_allowed_path_here"


@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event: dict, context: LambdaContext):
    return app.resolve(event, context)

Advanced

Wildcard patterns and handler precedence

You can use wildcard patterns to create catch-all handlers for multiple channels or namespaces. This is particularly useful for centralizing logic that applies to multiple channels.

When an event matches with multiple handlers, the most specific pattern takes precedence.

Supported wildcard patterns

Only the following patterns are supported:

  • /namespace/* - Matches all channels in the specified namespace
  • /* - Matches all channels in all namespaces

Patterns like /namespace/channel* or /namespace/*/subpath are not supported.

More specific routes will always take precedence over less specific ones. For example, /default/channel1 will take precedence over /default/*, which will take precedence over /*.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from aws_lambda_powertools.event_handler import AppSyncEventsResolver

if TYPE_CHECKING:
    from aws_lambda_powertools.utilities.typing import LambdaContext

app = AppSyncEventsResolver()


@app.on_publish("/default/channel1")
def handle_specific_channel(payload: dict[str, Any]):
    # This handler will be called for events on /default/channel1
    return {"source": "specific_handler", "data": payload}


@app.on_publish("/default/*")
def handle_default_namespace(payload: dict[str, Any]):
    # This handler will be called for all channels in the default namespace
    # EXCEPT for /default/channel1 which has a more specific handler
    return {"source": "namespace_handler", "data": payload}


@app.on_publish("/*")
def handle_all_channels(payload: dict[str, Any]):
    # This handler will be called for all channels in all namespaces
    # EXCEPT for those that have more specific handlers
    return {"source": "catch_all_handler", "data": payload}


def lambda_handler(event: dict, context: LambdaContext):
    return app.resolve(event, context)

If the event doesn't match any registered handler, the Event Handler will log a warning and skip processing the event.

Aggregated processing

Aggregate Processing

When aggregate=True, your handler receives a list of all events, requiring you to manage the response format. Ensure your response includes results for each event in the expected AppSync Request and Response Format.

In some scenarios, you might want to process all events for a channel as a batch rather than individually. This is useful when you need to:

  • Optimize database operations by making a single batch query
  • Ensure all events are processed together or not at all
  • Apply custom error handling logic for the entire batch

You can enable this with the aggregate parameter:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from __future__ import annotations

from typing import TYPE_CHECKING, Any

import boto3
from boto3.dynamodb.types import TypeSerializer

from aws_lambda_powertools.event_handler import AppSyncEventsResolver

if TYPE_CHECKING:
    from aws_lambda_powertools.utilities.typing import LambdaContext

dynamodb = boto3.client("dynamodb")
serializer = TypeSerializer()
app = AppSyncEventsResolver()


def marshall(item: dict[str, Any]) -> dict[str, Any]:
    return {k: serializer.serialize(v) for k, v in item.items()}


@app.on_publish("/default/foo/*", aggregate=True)
async def handle_default_namespace_batch(payload: list[dict[str, Any]]):
    write_operations: list = []

    write_operations.extend({"PutRequest": {"Item": marshall(item)}} for item in payload)

    # Executar operação de lote no DynamoDB
    if write_operations:
        dynamodb.batch_write_item(
            RequestItems={
                "your-table-name": write_operations,
            },
        )

    return payload


def lambda_handler(event: dict, context: LambdaContext):
    return app.resolve(event, context)

Handling errors

You can filter or reject events by raising exceptions in your resolvers or by formatting the payload according to the expected response structure. This instructs AppSync not to propagate that specific message, so subscribers will not receive it.

Handling errors with individual items

When processing items individually with aggregate=False, you can raise an exception to fail a specific message. When this happens, the Event Handler will catch it and include the exception name and message in the response.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from aws_lambda_powertools.event_handler import AppSyncEventsResolver

if TYPE_CHECKING:
    from aws_lambda_powertools.utilities.typing import LambdaContext

app = AppSyncEventsResolver()


class ValidationError(Exception):
    pass


@app.on_publish("/default/channel")
def handle_channel1_publish(payload: dict[str, Any]):
    if not is_valid_payload(payload):
        raise ValidationError("Invalid payload format")

    return {"processed": payload["data"]}


def is_valid_payload(payload: dict[str, Any]):
    return "data" in payload


def lambda_handler(event: dict, context: LambdaContext):
    return app.resolve(event, context)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
{
    "events":[
       {
          "error": "Error message",
          "id":"1"
       },
       {
          "payload":{
             "data":"data_2"
          },
          "id":"2"
       }
    ]
  }

Handling errors with batch of items

When processing batch of items with aggregate=True, you must format the payload according the expected response.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from aws_lambda_powertools.event_handler import AppSyncEventsResolver

if TYPE_CHECKING:
    from aws_lambda_powertools.utilities.typing import LambdaContext

app = AppSyncEventsResolver()


@app.on_publish("/default/*", aggregate=True)
def handle_default_namespace_batch(payload: list[dict[str, Any]]):
    results: list = []

    # Process all events in the batch together
    for event in payload:
        try:
            # Process each event
            results.append({"id": event.get("id"), "payload": {"processed": True, "originalEvent": event}})
        except Exception as e:
            # Handle errors for individual events
            results.append(
                {
                    "error": str(e),
                    "id": event.get("id"),
                },
            )

    return results


def lambda_handler(event: dict, context: LambdaContext):
    return app.resolve(event, context)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
{
    "events":[
       {
          "error": "Error message",
          "id":"1"
       },
       {
          "payload":{
             "data":"data_2"
          },
          "id":"2"
       }
    ]
  }

If instead you want to fail the entire batch, you can throw an exception. This will cause the Event Handler to return an error response to AppSync and fail the entire batch.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from aws_lambda_powertools.event_handler import AppSyncEventsResolver

if TYPE_CHECKING:
    from aws_lambda_powertools.utilities.typing import LambdaContext

app = AppSyncEventsResolver()


@app.on_publish("/default/*", aggregate=True)
def handle_default_namespace_batch(payload: list[dict[str, Any]]):
    results: list = []

    # Process all events in the batch together
    for event in payload:
        try:
            # Process each event
            results.append({"id": event.get("id"), "payload": {"processed": True, "originalEvent": event}})
        except Exception as e:
            # Handle errors for individual events
            results.append(
                {
                    "error": str(e),
                    "id": event.get("id"),
                },
            )

    return results


def lambda_handler(event: dict, context: LambdaContext):
    return app.resolve(event, context)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
{
    "events":[
       {
          "error": "Error message",
          "id":"1"
       },
       {
          "payload":{
             "data":"data_2"
          },
          "id":"2"
       }
    ]
  }

Authorization control

Raising UnauthorizedException will cause the Lambda invocation to fail.

You can also do content based authorization for channel by raising the UnauthorizedException exception. This can cause two situations:

  • When working with publish events Powertools for AWS stop processing messages and subscribers will not receive any message.
  • When working with subscribe events the subscription won't be established.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from aws_lambda_powertools.event_handler import AppSyncEventsResolver

if TYPE_CHECKING:
    from aws_lambda_powertools.utilities.typing import LambdaContext

app = AppSyncEventsResolver()


class ValidationError(Exception):
    pass


@app.on_publish("/default/channel")
def handle_channel1_publish(payload: dict[str, Any]):
    if not is_valid_payload(payload):
        raise ValidationError("Invalid payload format")

    return {"processed": payload["data"]}


def is_valid_payload(payload: dict[str, Any]):
    return "data" in payload


def lambda_handler(event: dict, context: LambdaContext):
    return app.resolve(event, context)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
{
    "events":[
       {
          "error": "Error message",
          "id":"1"
       },
       {
          "payload":{
             "data":"data_2"
          },
          "id":"2"
       }
    ]
  }

Processing events with async resolvers

Use the @app.async_on_publish() decorator to process events asynchronously.

We use asyncio module to support async functions, and we ensure reliable execution by managing the event loop.

Events order and AppSync Events

AppSync does not rely on event order. As long as each event includes the original id, AppSync processes them correctly regardless of the order in which they are received.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING, Any

from aws_lambda_powertools.event_handler import AppSyncEventsResolver  # type: ignore[attr-defined]

if TYPE_CHECKING:
    from aws_lambda_powertools.utilities.typing import LambdaContext

app = AppSyncEventsResolver()


@app.async_on_publish("/default/channel1")
async def handle_channel1_publish(payload: dict[str, Any]):
    result = await async_process_data(payload)
    return result


async def async_process_data(payload: dict[str, Any]):
    await asyncio.sleep(0.1)
    return {"processed": payload, "async": True}


def lambda_handler(event: dict, context: LambdaContext):
    return app.resolve(event, context)

Accessing Lambda context and event

You can access to the original Lambda event or context for additional information. These are accessible via the app instance:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from aws_lambda_powertools.event_handler import AppSyncEventsResolver
from aws_lambda_powertools.utilities.data_classes import AppSyncResolverEventsEvent

if TYPE_CHECKING:
    from aws_lambda_powertools.utilities.typing import LambdaContext

app = AppSyncEventsResolver()


class ValidationError(Exception):
    pass


@app.on_publish("/default/channel1")
def handle_channel1_publish(payload: dict[str, Any]):
    # Access the full event and context
    lambda_event: AppSyncResolverEventsEvent = app.current_event

    # Access request headers
    header_user_agent = lambda_event.request_headers["user-agent"]

    return {
        "originalMessage": payload,
        "userAgent": header_user_agent,
    }


def lambda_handler(event: dict, context: LambdaContext):
    return app.resolve(event, context)

Event Handler workflow

Working with single items

sequenceDiagram
    participant Client
    participant AppSync
    participant Lambda
    participant EventHandler
    note over Client,EventHandler: Individual Event Processing (aggregate=False)
    Client->>+AppSync: Send multiple events to channel
    AppSync->>+Lambda: Invoke Lambda with batch of events
    Lambda->>+EventHandler: Process events with aggregate=False
    loop For each event in batch
        EventHandler->>EventHandler: Process individual event
    end
    EventHandler-->>-Lambda: Return array of processed events
    Lambda-->>-AppSync: Return event-by-event responses
    AppSync-->>-Client: Report individual event statuses

Working with aggregated items

sequenceDiagram
    participant Client
    participant AppSync
    participant Lambda
    participant EventHandler
    note over Client,EventHandler: Aggregate Processing Workflow
    Client->>+AppSync: Send multiple events to channel
    AppSync->>+Lambda: Invoke Lambda with batch of events
    Lambda->>+EventHandler: Process events with aggregate=True
    EventHandler->>EventHandler: Batch of events
    EventHandler->>EventHandler: Process entire batch at once
    EventHandler->>EventHandler: Format response for each event
    EventHandler-->>-Lambda: Return aggregated results
    Lambda-->>-AppSync: Return success responses
    AppSync-->>-Client: Confirm all events processed

Authorization fails for publish

sequenceDiagram
    participant Client
    participant AppSync
    participant Lambda
    participant EventHandler
    note over Client,EventHandler: Publish Event Authorization Flow
    Client->>AppSync: Publish message to channel
    AppSync->>Lambda: Invoke Lambda with publish event
    Lambda->>EventHandler: Process publish event
    alt Authorization Failed
        EventHandler->>EventHandler: Authorization check fails
        EventHandler->>Lambda: Raise UnauthorizedException
        Lambda->>AppSync: Return error response
        AppSync--xClient: Message not delivered
        AppSync--xAppSync: No distribution to subscribers
    else Authorization Passed
        EventHandler->>Lambda: Return successful response
        Lambda->>AppSync: Return processed event
        AppSync->>Client: Acknowledge message
        AppSync->>AppSync: Distribute to subscribers
    end

Authorization fails for subscribe

sequenceDiagram
    participant Client
    participant AppSync
    participant Lambda
    participant EventHandler
    note over Client,EventHandler: Subscribe Event Authorization Flow
    Client->>AppSync: Request subscription to channel
    AppSync->>Lambda: Invoke Lambda with subscribe event
    Lambda->>EventHandler: Process subscribe event
    alt Authorization Failed
        EventHandler->>EventHandler: Authorization check fails
        EventHandler->>Lambda: Raise UnauthorizedException
        Lambda->>AppSync: Return error response
        AppSync--xClient: Subscription denied (HTTP 403)
    else Authorization Passed
        EventHandler->>Lambda: Return successful response
        Lambda->>AppSync: Return authorization success
        AppSync->>Client: Subscription established
    end

Testing your code

You can test your event handlers by passing a mocked or actual AppSync Events Lambda event.

Testing publish events

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import json
from pathlib import Path

from aws_lambda_powertools.event_handler import AppSyncEventsResolver


class LambdaContext:
    def __init__(self):
        self.function_name = "test-func"
        self.memory_limit_in_mb = 128
        self.invoked_function_arn = "arn:aws:lambda:eu-west-1:809313241234:function:test-func"
        self.aws_request_id = "52fdfc07-2182-154f-163f-5f0f9a621d72"

    def get_remaining_time_in_millis(self) -> int:
        return 1000


def test_publish_event_with_synchronous_resolver():
    """Test handling a publish event with a synchronous resolver."""
    # GIVEN a sample publish event
    with Path.open("getting_started_with_testing_publish_event.json", "r") as f:
        event = json.load(f)

    lambda_context = LambdaContext()

    # GIVEN an AppSyncEventsResolver with a synchronous resolver
    app = AppSyncEventsResolver()

    @app.on_publish(path="/default/*")
    def test_handler(payload):
        return {"processed": True, "data": payload["data"]}

    # WHEN we resolve the event
    result = app.resolve(event, lambda_context)

    # THEN we should get the correct response
    expected_result = {
        "events": [
            {"id": "123", "payload": {"processed": True, "data": "test data"}},
        ],
    }
    assert result == expected_result
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
{
    "identity":"None",
    "result":"None",
    "request":{
       "headers": {
        "x-forwarded-for": "1.1.1.1, 2.2.2.2",
        "cloudfront-viewer-country": "US",
        "cloudfront-is-tablet-viewer": "false",
        "via": "2.0 xxxxxxxxxxxxxxxx.cloudfront.net (CloudFront)",
        "cloudfront-forwarded-proto": "https",
        "origin": "https://us-west-1.console.aws.amazon.com",
        "content-length": "217",
        "accept-language": "en-US,en;q=0.9",
        "host": "xxxxxxxxxxxxxxxx.appsync-api.us-west-1.amazonaws.com",
        "x-forwarded-proto": "https",
        "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.83 Safari/537.36",
        "accept": "*/*",
        "cloudfront-is-mobile-viewer": "false",
        "cloudfront-is-smarttv-viewer": "false",
        "accept-encoding": "gzip, deflate, br",
        "referer": "https://us-west-1.console.aws.amazon.com/appsync/home?region=us-west-1",
        "content-type": "application/json",
        "sec-fetch-mode": "cors",
        "x-amz-cf-id": "3aykhqlUwQeANU-HGY7E_guV5EkNeMMtwyOgiA==",
        "x-amzn-trace-id": "Root=1-5f512f51-fac632066c5e848ae714",
        "authorization": "eyJraWQiOiJScWFCSlJqYVJlM0hrSnBTUFpIcVRXazNOW...",
        "sec-fetch-dest": "empty",
        "x-amz-user-agent": "AWS-Console-AppSync/",
        "cloudfront-is-desktop-viewer": "true",
        "sec-fetch-site": "cross-site",
        "x-forwarded-port": "443"
      },
       "domainName":"None"
    },
    "info":{
       "channel":{
          "path":"/default/channel",
          "segments":[
             "default",
             "channel"
          ]
       },
       "channelNamespace":{
          "name":"default"
       },
       "operation":"PUBLISH"
    },
    "error":"None",
    "prev":"None",
    "stash":{

    },
    "outErrors":[

    ],
    "events":[
       {
          "payload":{
            "data": "test data"
          },
          "id":"123"
       }
    ]
  }

Testing subscribe events

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import json
from pathlib import Path

from aws_lambda_powertools.event_handler import AppSyncEventsResolver


class LambdaContext:
    def __init__(self):
        self.function_name = "test-func"
        self.memory_limit_in_mb = 128
        self.invoked_function_arn = "arn:aws:lambda:eu-west-1:809313241234:function:test-func"
        self.aws_request_id = "52fdfc07-2182-154f-163f-5f0f9a621d72"

    def get_remaining_time_in_millis(self) -> int:
        return 1000


def test_subscribe_event_with_valid_return():
    """Test error handling during publish event processing."""
    # GIVEN a sample publish event
    with Path.open("getting_started_with_testing_publish_event.json", "r") as f:
        event = json.load(f)

    lambda_context = LambdaContext()

    # GIVEN an AppSyncEventsResolver with a resolver that returns ok
    app = AppSyncEventsResolver()

    @app.on_subscribe(path="/default/*")
    def test_handler():
        pass

    # WHEN we resolve the event
    result = app.resolve(event, lambda_context)

    # THEN we should return None because subscribe always must return None
    assert result is None
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
{
    "identity":"None",
    "result":"None",
    "request":{
       "headers": {
        "x-forwarded-for": "1.1.1.1, 2.2.2.2",
        "cloudfront-viewer-country": "US",
        "cloudfront-is-tablet-viewer": "false",
        "via": "2.0 xxxxxxxxxxxxxxxx.cloudfront.net (CloudFront)",
        "cloudfront-forwarded-proto": "https",
        "origin": "https://us-west-1.console.aws.amazon.com",
        "content-length": "217",
        "accept-language": "en-US,en;q=0.9",
        "host": "xxxxxxxxxxxxxxxx.appsync-api.us-west-1.amazonaws.com",
        "x-forwarded-proto": "https",
        "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.83 Safari/537.36",
        "accept": "*/*",
        "cloudfront-is-mobile-viewer": "false",
        "cloudfront-is-smarttv-viewer": "false",
        "accept-encoding": "gzip, deflate, br",
        "referer": "https://us-west-1.console.aws.amazon.com/appsync/home?region=us-west-1",
        "content-type": "application/json",
        "sec-fetch-mode": "cors",
        "x-amz-cf-id": "3aykhqlUwQeANU-HGY7E_guV5EkNeMMtwyOgiA==",
        "x-amzn-trace-id": "Root=1-5f512f51-fac632066c5e848ae714",
        "authorization": "eyJraWQiOiJScWFCSlJqYVJlM0hrSnBTUFpIcVRXazNOW...",
        "sec-fetch-dest": "empty",
        "x-amz-user-agent": "AWS-Console-AppSync/",
        "cloudfront-is-desktop-viewer": "true",
        "sec-fetch-site": "cross-site",
        "x-forwarded-port": "443"
      },
       "domainName":"None"
    },
    "info":{
       "channel":{
          "path":"/default/channel",
          "segments":[
             "default",
             "channel"
          ]
       },
       "channelNamespace":{
          "name":"default"
       },
       "operation":"SUBSCRIBE"
    },
    "error":"None",
    "prev":"None",
    "stash":{

    },
    "outErrors":[

    ],
    "events":[]
  }