API Reference
    Preparing search index...
    • Wrap a handler function to automatically deserialize and validate Kafka records from an MSK event.

      The returned function will:

      • Deserialize the key and value of each record using the provided schema config.
      • Validate the deserialized key and value using Zod schemas if provided.
      • Replace the records property in the event with an array of deserialized and validated records.
      • Call the original handler with the modified event and original context/arguments.

      Type Parameters

      • K

        Optional type of the deserialized key - defaults to unknown.

      • V

        Optional type of the deserialized value - defaults to unknown.

      Parameters

      • handler: AsyncHandler<Handler<ConsumerRecords<K, V>>>

        The original handler function to wrap. It should accept the deserialized event as its first argument.

      • Optionalconfig: SchemaConfig

        The schema configuration for deserializing and validating record keys and values.

      Returns (event: MSKEvent, context: Context) => Promise<unknown>

      import { kafkaConsumer } from '@aws-lambda-powertools/kafka';
      import { z } from 'zod';

      const keySchema = z.string();
      const valueSchema = z.object({
      id: z.number(),
      });

      export const handler = kafkaConsumer<z.infer<keySchema>, z.infer<valueSchema>>(async (event, context) => {
      // event.records is now an array of deserialized and validated records
      for (const record of event.records) {
      console.log(record.key, record.value);
      }
      }, {
      key: { type: 'json', parserSchema: keySchema },
      value: { type: 'json', parserSchema: valueSchema },
      });