DynamoDBStreamTrigger

Bases: Construct

CDK construct for creating DynamoDB stream-triggered Lambda functions with EventBridge Pipes.

This construct creates a complete event-driven architecture that processes DynamoDB stream events through EventBridge Pipes and triggers Lambda functions. It supports two downstream trigger types: CodePipeline executions and SNS topic publishing.

The construct handles:

  • DynamoDB table reference from existing resources via SSM parameters
  • Lambda function creation with appropriate IAM permissions
  • EventBridge Pipes configuration for stream processing
  • Stream filtering based on provided criteria
  • Automatic IAM role and policy management
  • Support for table index operations when specified

Key Features:

  • Event-driven processing of DynamoDB changes
  • Configurable stream filtering with JSON patterns
  • Multiple trigger types (CodePipeline, SNS)
  • Automatic IAM permission management
  • Support for DynamoDB Global Secondary Index operations
  • Integration with MARE foundation infrastructure patterns
Source code in mare_aws_common_lib/constructs/dynamodb_stream_trigger.py
class DynamoDBStreamTrigger(Construct):
    """
    CDK construct for creating DynamoDB stream-triggered Lambda functions with EventBridge Pipes.

    This construct creates a complete event-driven architecture that processes DynamoDB stream
    events through EventBridge Pipes and triggers Lambda functions. It supports different
    downstream trigger types including CodePipeline executions and SNS topic publishing.

    The construct handles:

    - DynamoDB table reference from existing resources via SSM parameters
    - Lambda function creation with appropriate IAM permissions
    - EventBridge Pipes configuration for stream processing
    - Stream filtering based on provided criteria
    - Automatic IAM role and policy management
    - Support for table index operations when specified

    Key Features:

    - Event-driven processing of DynamoDB changes
    - Configurable stream filtering with JSON patterns
    - Multiple trigger types (CodePipeline, SNS)
    - Automatic IAM permission management
    - Support for DynamoDB Global Secondary Index operations
    - Integration with MARE foundation infrastructure patterns
    """
    def __init__(self, scope: Construct, id: str, *, 
                 app_helper: ApplicationHelper, trigger_type: StreamTriggerType,
                 table_base_name: str, resource_base_name: str,
                 lambda_handler: str, lambda_code_path: str, env_vars: Dict[str, str], 
                 filter_pattern: dict, table_index_name: str = None, is_devops: bool = True) -> None:
        """
        Initialize the DynamoDB stream trigger construct.

        Creates a complete stream processing pipeline that connects a DynamoDB table's stream
        to a Lambda function via EventBridge Pipes, with automatic IAM configuration and
        support for various downstream trigger types.

        Args:
            scope (Construct): CDK construct scope (typically a Stack)
            id (str): Unique identifier for this construct within the scope
            app_helper (ApplicationHelper): Helper containing application configuration
                and utilities for resource naming and parameter access
            trigger_type (StreamTriggerType): Type of downstream service to trigger
                (PIPELINE for CodePipeline, SNS_TOPIC for SNS publishing)
            table_base_name (str): Base name of the DynamoDB table. Used with ResourceNaming
                to generate the full table name and locate SSM parameters
            resource_base_name (str): Base name for the Lambda function and EventBridge Pipe.
                Should be descriptive of the processing purpose
            lambda_handler (str): Python handler function in the format "module.function"
                (e.g., "app.handler", "main.process_stream_event")
            lambda_code_path (str): Local path to the Lambda function code directory
                or ZIP file containing the function implementation
            env_vars (Dict[str, str]): Environment variables for the Lambda function.
                Should include ARNs for downstream services based on trigger_type:
                - For PIPELINE: CodePipeline ARNs as values
                - For SNS_TOPIC: SNS topic ARNs as values
            filter_pattern (dict): JSON filter pattern for EventBridge Pipes to determine
                which DynamoDB stream events should trigger the Lambda function.
                Uses EventBridge Pipes filter syntax
            table_index_name (str, optional): Name of a Global Secondary Index if the
                Lambda needs to perform queries/updates on the index. Defaults to None.
            is_devops (bool, optional): Whether to look up SSM parameters in the devops
                environment. Defaults to True for cross-account foundation resources.

        Example:
            ```python
            trigger = DynamoDBStreamTrigger(
                scope=stack,
                id="deployment-trigger",
                app_helper=helper,
                trigger_type=StreamTriggerType.PIPELINE,
                table_base_name="deployment-requests",
                resource_base_name="deployment-processor",
                lambda_handler="handlers.process_deployment",
                lambda_code_path="./lambda/deployment",
                env_vars={
                    "PIPELINE_ARN": "arn:aws:codepipeline:us-east-1:123456789012:my-pipeline"
                },
                filter_pattern={
                    "eventName": ["INSERT"],
                    "dynamodb": {
                        "NewImage": {
                            "status": {"S": ["PENDING"]}
                        }
                    }
                },
                table_index_name="status-index"
            )
            ```

        Infrastructure Created:
            - Lambda function with specified runtime and code
            - EventBridge Pipe connecting DynamoDB stream to Lambda
            - IAM role for the EventBridge Pipe with stream read permissions
            - IAM policies for Lambda based on trigger_type:
                 * PIPELINE: codepipeline:StartPipelineExecution, codepipeline:ListPipelineExecutions
                 * SNS_TOPIC: sns:Publish
            - Additional DynamoDB permissions if table_index_name is provided:
                 * dynamodb:Query, dynamodb:UpdateItem on table and index

        Notes:
            - The DynamoDB table must already exist and have SSM parameters stored:
                 * dynamodb_table_{table_name}: Table ARN
                 * dynamodb_table_stream_{table_name}: Stream ARN
            - Lambda function uses Python 3.12 runtime
            - EventBridge Pipe starts reading from LATEST position in the stream
            - Input template transforms DynamoDB stream records for Lambda processing
            - All IAM permissions are automatically configured based on trigger type

        Raises:
            ValueError: If required SSM parameters are not found
            TypeError: If env_vars contains ARNs that don't match the specified trigger_type
        """

        super().__init__(scope, id)

        table_name, _ = ResourceNaming.get_name_for_resource(app_helper, AWSResourceType.DYNAMODB, 
                                                             resource_name=table_base_name, 
                                                             max_length=AWSResourceNameLength.DYN_TABLE.value)

        # Get SSM parameter values (table ARN and stream ARN)
        table_arn = app_helper.get_parameter_value(self, f"dynamodb_table_{table_name.replace('-', '_')}", is_devops)
        stream_arn = app_helper.get_parameter_value(self, f"dynamodb_table_stream_{table_name.replace('-', '_')}", is_devops)

        index_arn: str = None
        if table_index_name is not None:
            index_arn = ArnUriHelper.build_dynamodb_table_index_arn(
                app_helper.get_from_common("region"), 
                app_helper.get_from_env("account_id"), 
                table_name, table_index_name
            )

        table = dynamodb.Table.from_table_attributes(
            self, f"{id.replace('-', '_')}_table",
            table_arn=table_arn,
            table_stream_arn=stream_arn
        )

        trigger_lambda = lambda_.Function(
            self,
            f"{id.replace('-', '_')}_lambda",
            runtime=lambda_.Runtime.PYTHON_3_12,
            handler=lambda_handler,
            code=lambda_.Code.from_asset(lambda_code_path),
            function_name=resource_base_name,
            environment=env_vars
        )

        if trigger_type == StreamTriggerType.PIPELINE:
            # Add policies so that the lambda can start the execution of a pipeline
            pipeline_arns = [value for value in env_vars.values() if isinstance(value, str) and value.startswith("arn:aws:codepipeline:")]
            trigger_lambda.add_to_role_policy(
                iam.PolicyStatement(
                    actions=[
                        "codepipeline:ListPipelineExecutions",
                        "codepipeline:StartPipelineExecution"
                    ],
                    resources=pipeline_arns
                )
            )

        if trigger_type == StreamTriggerType.SNS_TOPIC:
            topic_arns = [value for value in env_vars.values() if isinstance(value, str) and value.startswith("arn:aws:sns:")]
            trigger_lambda.add_to_role_policy(
                iam.PolicyStatement(
                    actions=["sns:Publish"],
                    resources=topic_arns
                )
            )

        table.grant_stream_read(trigger_lambda)

        if index_arn is not None:
            # Add policies so that the lambda can query and update records in dynamodb
            trigger_lambda.add_to_role_policy(
                iam.PolicyStatement(
                    actions=[
                        "dynamodb:UpdateItem",
                        "dynamodb:Query"
                    ],
                    resources=[table_arn, index_arn]
                )
            )

        trigger_lambda.add_permission(
            "AllowPipeInvoke",
            principal=iam.ServicePrincipal("pipes.amazonaws.com"),
            action="lambda:InvokeFunction"
        )

        pipe_role = iam.Role(
            self, f"{id.replace('-', '_')}_pipe_role",
            assumed_by=iam.ServicePrincipal("pipes.amazonaws.com")
        )
        table.grant_stream_read(pipe_role)
        trigger_lambda.grant_invoke(pipe_role)

        pipes.CfnPipe(
            self, f"{id.replace('-', '_')}_pipe",
            name=resource_base_name,
            role_arn=pipe_role.role_arn,
            source=table.table_stream_arn,
            source_parameters=pipes.CfnPipe.PipeSourceParametersProperty(
                dynamo_db_stream_parameters=pipes.CfnPipe.PipeSourceDynamoDBStreamParametersProperty(
                    starting_position="LATEST"
                ),
                filter_criteria=pipes.CfnPipe.FilterCriteriaProperty(
                    filters=[pipes.CfnPipe.FilterProperty(pattern=json.dumps(filter_pattern))]
                )
            ),
            target=trigger_lambda.function_arn,
            target_parameters=pipes.CfnPipe.PipeTargetParametersProperty(
                input_template="{\"Records\": [{\"dynamodb\": {\"NewImage\": <$.dynamodb.NewImage>}}]}"
            )
        )

Functions

__init__(scope, id, *, app_helper, trigger_type, table_base_name, resource_base_name, lambda_handler, lambda_code_path, env_vars, filter_pattern, table_index_name=None, is_devops=True)

Initialize the DynamoDB stream trigger construct.

Creates a complete stream processing pipeline that connects a DynamoDB table's stream to a Lambda function via EventBridge Pipes, with automatic IAM configuration and support for various downstream trigger types.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| scope | Construct | CDK construct scope (typically a Stack) | required |
| id | str | Unique identifier for this construct within the scope | required |
| app_helper | ApplicationHelper | Helper containing application configuration and utilities for resource naming and parameter access | required |
| trigger_type | StreamTriggerType | Type of downstream service to trigger (PIPELINE for CodePipeline, SNS_TOPIC for SNS publishing) | required |
| table_base_name | str | Base name of the DynamoDB table. Used with ResourceNaming to generate the full table name and locate SSM parameters | required |
| resource_base_name | str | Base name for the Lambda function and EventBridge Pipe. Should be descriptive of the processing purpose | required |
| lambda_handler | str | Python handler function in the format "module.function" (e.g., "app.handler", "main.process_stream_event") | required |
| lambda_code_path | str | Local path to the Lambda function code directory or ZIP file containing the function implementation | required |
| env_vars | Dict[str, str] | Environment variables for the Lambda function. Should include ARNs for downstream services based on trigger_type: CodePipeline ARNs as values for PIPELINE, SNS topic ARNs as values for SNS_TOPIC | required |
| filter_pattern | dict | JSON filter pattern for EventBridge Pipes to determine which DynamoDB stream events should trigger the Lambda function. Uses EventBridge Pipes filter syntax | required |
| table_index_name | str | Name of a Global Secondary Index if the Lambda needs to perform queries/updates on the index. Defaults to None. | None |
| is_devops | bool | Whether to look up SSM parameters in the devops environment. Defaults to True for cross-account foundation resources. | True |

Example
trigger = DynamoDBStreamTrigger(
    scope=stack,
    id="deployment-trigger",
    app_helper=helper,
    trigger_type=StreamTriggerType.PIPELINE,
    table_base_name="deployment-requests",
    resource_base_name="deployment-processor",
    lambda_handler="handlers.process_deployment",
    lambda_code_path="./lambda/deployment",
    env_vars={
        "PIPELINE_ARN": "arn:aws:codepipeline:us-east-1:123456789012:my-pipeline"
    },
    filter_pattern={
        "eventName": ["INSERT"],
        "dynamodb": {
            "NewImage": {
                "status": {"S": ["PENDING"]}
            }
        }
    },
    table_index_name="status-index"
)
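
To make the effect of `filter_pattern` in the example more concrete, the following is a deliberately simplified sketch of exact-value matching against a stream record. It is not the EventBridge Pipes matching engine and ignores operators such as `prefix`, `anything-but` or `exists`. The `matches` helper and both sample records are hypothetical and exist only for illustration.

```python
def matches(pattern: dict, event: dict) -> bool:
    """Simplified exact-value matching; NOT the real EventBridge Pipes engine."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            # Nested pattern: recurse into the corresponding event object.
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        else:
            # Leaf: a list of allowed values; the event value must equal one.
            if actual not in expected:
                return False
    return True


# Same pattern as in the example above.
filter_pattern = {
    "eventName": ["INSERT"],
    "dynamodb": {"NewImage": {"status": {"S": ["PENDING"]}}},
}

# Hypothetical stream records, trimmed to the fields the pattern inspects.
pending_insert = {
    "eventName": "INSERT",
    "dynamodb": {"NewImage": {"status": {"S": "PENDING"}, "id": {"S": "42"}}},
}
completed_update = {
    "eventName": "MODIFY",
    "dynamodb": {"NewImage": {"status": {"S": "DONE"}}},
}

print(matches(filter_pattern, pending_insert))    # True
print(matches(filter_pattern, completed_update))  # False
```

Only the first record would reach the Lambda function under this pattern, since the second fails on `eventName` before the status is even inspected.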
Infrastructure Created
  • Lambda function with the Python 3.12 runtime and the supplied code
  • EventBridge Pipe connecting DynamoDB stream to Lambda
  • IAM role for the EventBridge Pipe with stream read and Lambda invoke permissions
  • IAM policies for Lambda based on trigger_type:
    • PIPELINE: codepipeline:StartPipelineExecution, codepipeline:ListPipelineExecutions
    • SNS_TOPIC: sns:Publish
  • Additional DynamoDB permissions if table_index_name is provided:
    • dynamodb:Query, dynamodb:UpdateItem on table and index
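
In the source, the resources for the trigger-type policies are the `env_vars` values that start with the matching ARN prefix. The sketch below reproduces that selection in plain Python so it can be checked before synthesis. `select_target_arns` and `ARN_PREFIXES` are hypothetical names, and the guard against an empty result is an added assumption, not something the construct does.

```python
from typing import Dict, List

# ARN prefixes mirror the string checks in the construct's source.
ARN_PREFIXES = {
    "pipeline": "arn:aws:codepipeline:",
    "sns_topic": "arn:aws:sns:",
}


def select_target_arns(trigger_type: str, env_vars: Dict[str, str]) -> List[str]:
    """Return the env var values that look like ARNs for the given trigger type."""
    prefix = ARN_PREFIXES[trigger_type]
    arns = [v for v in env_vars.values() if isinstance(v, str) and v.startswith(prefix)]
    if not arns:
        # Hypothetical guard: fail early rather than build a policy with no resources.
        raise ValueError(f"env_vars has no value starting with {prefix!r}")
    return arns


env_vars = {
    "PIPELINE_ARN": "arn:aws:codepipeline:us-east-1:123456789012:my-pipeline",
    "LOG_LEVEL": "INFO",  # Non-ARN values are simply ignored by the selection.
}
print(select_target_arns("pipeline", env_vars))
# ['arn:aws:codepipeline:us-east-1:123456789012:my-pipeline']
```

Running a check like this in a unit test is one way to catch an `env_vars` dictionary that does not match the chosen `trigger_type`.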
Notes
  • The DynamoDB table must already exist and have SSM parameters stored:
    • dynamodb_table_{table_name}: Table ARN
    • dynamodb_table_stream_{table_name}: Stream ARN
  • Lambda function uses Python 3.12 runtime
  • EventBridge Pipe starts reading from LATEST position in the stream
  • Input template transforms DynamoDB stream records for Lambda processing
  • All IAM permissions are automatically configured based on trigger type
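
The input template in the source wraps each record as `{"Records": [{"dynamodb": {"NewImage": ...}}]}`. A handler consuming that shape might look like the sketch below. It assumes the payload arrives either as one such object or as a list of them, and that items carry a `status` string attribute as in the example. `extract_new_images` and `handler` are hypothetical names.

```python
from typing import Any, Dict, List, Union


def extract_new_images(event: Union[dict, list]) -> List[Dict[str, Any]]:
    """Collect NewImage maps from payloads shaped like the input template."""
    # Assumption: the payload is one templated object or a list of them.
    payloads = event if isinstance(event, list) else [event]
    images = []
    for payload in payloads:
        for record in payload.get("Records", []):
            images.append(record["dynamodb"]["NewImage"])
    return images


def handler(event, context):
    # Hypothetical handler; NewImage values stay in DynamoDB JSON ({"S": ...}).
    statuses = [img["status"]["S"] for img in extract_new_images(event)]
    return {"statuses": statuses}


sample = {"Records": [{"dynamodb": {"NewImage": {"status": {"S": "PENDING"}}}}]}
print(handler(sample, None))    # {'statuses': ['PENDING']}
print(handler([sample], None))  # {'statuses': ['PENDING']}
```

Because the template forwards only `NewImage`, fields such as `eventName` or `OldImage` are not available to the handler in this shape.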

Raises:

| Type | Description |
| --- | --- |
| ValueError | If required SSM parameters are not found |
| TypeError | If env_vars contains ARNs that don't match the specified trigger_type |
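
When checking which SSM parameters must exist, the two names can be derived the same way the source does: hyphens in the full table name are replaced with underscores and the result is appended to a fixed prefix. The helper name `ssm_parameter_names` and the sample table name below are hypothetical; the real full name comes from `ResourceNaming`, which is not shown here.

```python
def ssm_parameter_names(table_name: str) -> dict:
    """Derive the two parameter names the construct looks up for a table."""
    suffix = table_name.replace("-", "_")
    return {
        "table_arn": f"dynamodb_table_{suffix}",
        "stream_arn": f"dynamodb_table_stream_{suffix}",
    }


# Hypothetical full table name, for illustration only.
names = ssm_parameter_names("myapp-dev-deployment-requests")
print(names["table_arn"])   # dynamodb_table_myapp_dev_deployment_requests
print(names["stream_arn"])  # dynamodb_table_stream_myapp_dev_deployment_requests
```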

StreamTriggerType

Bases: Enum

Enumeration of supported trigger types for DynamoDB stream processing.

Defines the available downstream services that can be triggered when DynamoDB stream events are processed by the Lambda function.

Source code in mare_aws_common_lib/constructs/dynamodb_stream_trigger.py
class StreamTriggerType(Enum):
    """
    Enumeration of supported trigger types for DynamoDB stream processing.

    Defines the available downstream services that can be triggered when
    DynamoDB stream events are processed by the Lambda function.
    """
    PIPELINE = "pipeline"
    SNS_TOPIC = "sns_topic"

PIPELINE = 'pipeline' class-attribute instance-attribute

SNS_TOPIC = 'sns_topic' class-attribute instance-attribute
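
Because the members carry string values, a trigger type read from configuration can be mapped to the enum by value. The sketch below redeclares the enum so it runs on its own; `parse_trigger_type` is a hypothetical helper and not part of the library.

```python
from enum import Enum


class StreamTriggerType(Enum):
    # Redeclared here so the snippet is self-contained; mirrors the source above.
    PIPELINE = "pipeline"
    SNS_TOPIC = "sns_topic"


def parse_trigger_type(raw: str) -> StreamTriggerType:
    """Hypothetical helper: map a config string to an enum member."""
    # Enum lookup by value raises ValueError for unknown strings.
    return StreamTriggerType(raw.strip().lower())


print(parse_trigger_type("SNS_Topic"))  # StreamTriggerType.SNS_TOPIC
```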