PipelineEventMonitor

Bases: Construct

CDK construct for monitoring AWS CodePipeline events and updating DynamoDB records.

This construct creates an event-driven monitoring system that listens to CodePipeline state changes via EventBridge and processes them through a Lambda function. It's designed to track pipeline execution status and update corresponding records in a DynamoDB table, providing real-time visibility into deployment pipeline activities.

The construct handles:
  • EventBridge rule configuration for CodePipeline events
  • Lambda function creation with DynamoDB access permissions
  • Automatic IAM policy management for table and index operations
  • Integration with MARE foundation infrastructure patterns
  • Support for multiple pipeline monitoring through environment mapping

Key Features:
  • Real-time CodePipeline event processing
  • DynamoDB integration for status tracking
  • Support for Global Secondary Index operations
  • Configurable pipeline filtering based on environment mapping
  • Automatic IAM permission management
  • 30-second Lambda timeout for reliable event processing

Source code in mare_aws_common_lib/constructs/pipeline_event_monitor.py
class PipelineEventMonitor(Construct):
    """
    CDK construct for monitoring AWS CodePipeline events and updating DynamoDB records.

    This construct creates an event-driven monitoring system that listens to CodePipeline
    state changes via EventBridge and processes them through a Lambda function. It's designed
    to track pipeline execution status and update corresponding records in a DynamoDB table,
    providing real-time visibility into deployment pipeline activities.

    The construct handles:
    - EventBridge rule configuration for CodePipeline events
    - Lambda function creation with DynamoDB access permissions
    - Automatic IAM policy management for table and index operations
    - Integration with MARE foundation infrastructure patterns
    - Support for multiple pipeline monitoring through environment mapping

    Key Features:
    - Real-time CodePipeline event processing
    - DynamoDB integration for status tracking
    - Support for Global Secondary Index operations
    - Configurable pipeline filtering based on environment mapping
    - Automatic IAM permission management
    - 30-second Lambda timeout for reliable event processing
    """
    def __init__(self, scope: Construct, id: str, *,
        app_helper: ApplicationHelper, table_base_name: str, table_indexes: List[str], lambda_function_name: str,
        lambda_handler: str, lambda_code_path: str, env_vars: Dict[str, str], 
        env_pipeline_map: Dict[str, str], is_devops: bool = True) -> None:
        """
        Initialize the CodePipeline event monitoring construct.

        Creates a complete pipeline monitoring system that captures CodePipeline events
        through EventBridge and processes them via Lambda to update DynamoDB records.
        The system automatically configures IAM permissions and event filtering.

        Args:
            scope (Construct): CDK construct scope (typically a Stack)
            id (str): Unique identifier for this construct within the scope
            app_helper (ApplicationHelper): Helper containing application configuration
                and utilities for resource naming and parameter access
            table_base_name (str): Base name of the DynamoDB table where pipeline
                status will be stored. Used with ResourceNaming to generate full table name
            table_indexes (List[str]): List of Global Secondary Index names that the
                Lambda function needs to query or update. Used for IAM permission setup
            lambda_function_name (str): Name for the Lambda function that will process
                CodePipeline events. Should be descriptive of the monitoring purpose
            lambda_handler (str): Python handler function in the format "module.function"
                (e.g., "app.handler", "pipeline_monitor.process_event")
            lambda_code_path (str): Local path to the Lambda function code directory
                or ZIP file containing the event processing logic
            env_vars (Dict[str, str]): Environment variables for the Lambda function.
                Should include configuration needed for pipeline event processing such as
                table names, notification settings, or external service endpoints
            env_pipeline_map (Dict[str, str]): Mapping of environment names to pipeline names
                that should be monitored. Only events from these pipelines will trigger
                the Lambda function. Format: {"dev": "my-app-dev-pipeline", "prod": "my-app-prod-pipeline"}
            is_devops (bool, optional): Whether to look up SSM parameters in the devops
                environment. Defaults to True for cross-account foundation resources.

        Example:
            ```python
            monitor = PipelineEventMonitor(
                scope=stack,
                id="deployment-monitor",
                app_helper=helper,
                table_base_name="deployment-status",
                table_indexes=["pipeline-status-index", "environment-index"],
                lambda_function_name="pipeline-status-monitor",
                lambda_handler="monitor.process_pipeline_event",
                lambda_code_path="./lambda/pipeline-monitor",
                env_vars={
                    "TABLE_NAME": "deployment-status-table",
                    "SLACK_WEBHOOK": "https://hooks.slack.com/...",
                    "LOG_LEVEL": "INFO"
                },
                env_pipeline_map={
                    "development": "myapp-dev-pipeline",
                    "staging": "myapp-staging-pipeline", 
                    "production": "myapp-prod-pipeline"
                }
            )
            ```

        Infrastructure Created:
            - Lambda function (Python 3.12 runtime, 30-second timeout)
            - EventBridge rule filtering CodePipeline events for specified pipelines
            - IAM policies for Lambda function:
                 * dynamodb:Query and dynamodb:UpdateItem on table and all specified indexes
                 * Basic Lambda execution permissions
            - Event target linking EventBridge rule to Lambda function

        EventBridge Rule Details:
            The rule monitors two types of CodePipeline events:

            - "CodePipeline Action Execution State Change": Individual action status
            - "CodePipeline Pipeline Execution State Change": Overall pipeline status

            Events are filtered to only include pipelines specified in env_pipeline_map,
            ensuring the Lambda function only processes relevant pipeline events.

        Notes:
            - The DynamoDB table must already exist, and its ARN must be published in the SSM parameter:
                 * dynamodb_table_{table_name}: Table ARN (hyphens in the generated table name
                   are replaced with underscores)
            - Lambda function uses Python 3.12 runtime with 30-second timeout
            - All specified table indexes must exist on the DynamoDB table
            - The Lambda function will receive EventBridge events in standard format
            - IAM permissions are configured automatically and are limited to dynamodb:Query
              and dynamodb:UpdateItem on the table and the listed indexes

        Lambda Event Format:
            The Lambda function will receive CodePipeline events in this structure:
            ```json
            {
                "source": "aws.codepipeline",
                "detail-type": "CodePipeline Pipeline Execution State Change",
                "detail": {
                    "pipeline": "pipeline-name",
                    "execution-id": "execution-uuid",
                    "state": "SUCCEEDED|FAILED|STARTED|...",
                    "version": 1
                }
            }
            ```

        Raises:
            ValueError: If required SSM parameters are not found
            TypeError: If table_indexes contains invalid index names
        """

        super().__init__(scope, id)

        table_name, _ = ResourceNaming.get_name_for_resource(app_helper, AWSResourceType.DYNAMODB, 
                                                             resource_name=table_base_name, 
                                                             max_length=AWSResourceNameLength.DYN_TABLE.value)

        # Get SSM parameter values (table ARN)
        resources_arn = []
        # Single quotes inside the f-string keep this line valid on Python versions before 3.12
        resources_arn.append(app_helper.get_parameter_value(self, f"dynamodb_table_{table_name.replace('-', '_')}", is_devops))
        for item in table_indexes:
            resources_arn.append(
                ArnUriHelper.build_dynamodb_table_index_arn(
                    app_helper.get_from_common("region"), 
                    app_helper.get_from_env("account_id"), 
                    table_name, item
                )
            )

        monitor_lambda = _lambda.Function(
            self,
            f"{id.replace('-', '_')}_lambda",
            runtime=_lambda.Runtime.PYTHON_3_12,
            handler=lambda_handler,
            code=_lambda.Code.from_asset(lambda_code_path),
            function_name=lambda_function_name,
            timeout=Duration.seconds(30),
            environment=env_vars
        )

        # Add policies to query and update DynamoDB
        monitor_lambda.add_to_role_policy(
            iam.PolicyStatement(
                actions=[
                    "dynamodb:UpdateItem",
                    "dynamodb:Query"
                ],
                resources=resources_arn,
            )
        )

        # Event types to monitor
        detail_types = [
            "CodePipeline Action Execution State Change",
            "CodePipeline Pipeline Execution State Change"
        ]

        # Create EventBridge rule
        rule = events.Rule(
            self,
            f"{id.replace('-', '_')}_event_rule",
            rule_name=f"{lambda_function_name}-event-rule",
            event_pattern=events.EventPattern(
                source=["aws.codepipeline"],
                detail_type=detail_types,
                detail={
                    "pipeline": list(env_pipeline_map.values()),
                }
            ),
        )

        rule.add_target(targets.LambdaFunction(monitor_lambda))

Functions

__init__(scope, id, *, app_helper, table_base_name, table_indexes, lambda_function_name, lambda_handler, lambda_code_path, env_vars, env_pipeline_map, is_devops=True)

Initialize the CodePipeline event monitoring construct.

Creates a complete pipeline monitoring system that captures CodePipeline events through EventBridge and processes them via Lambda to update DynamoDB records. The system automatically configures IAM permissions and event filtering.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| scope | Construct | CDK construct scope (typically a Stack) | required |
| id | str | Unique identifier for this construct within the scope | required |
| app_helper | ApplicationHelper | Helper containing application configuration and utilities for resource naming and parameter access | required |
| table_base_name | str | Base name of the DynamoDB table where pipeline status will be stored. Used with ResourceNaming to generate full table name | required |
| table_indexes | List[str] | List of Global Secondary Index names that the Lambda function needs to query or update. Used for IAM permission setup | required |
| lambda_function_name | str | Name for the Lambda function that will process CodePipeline events. Should be descriptive of the monitoring purpose | required |
| lambda_handler | str | Python handler function in the format "module.function" (e.g., "app.handler", "pipeline_monitor.process_event") | required |
| lambda_code_path | str | Local path to the Lambda function code directory or ZIP file containing the event processing logic | required |
| env_vars | Dict[str, str] | Environment variables for the Lambda function. Should include configuration needed for pipeline event processing such as table names, notification settings, or external service endpoints | required |
| env_pipeline_map | Dict[str, str] | Mapping of environment names to pipeline names that should be monitored. Only events from these pipelines will trigger the Lambda function. Format: {"dev": "my-app-dev-pipeline", "prod": "my-app-prod-pipeline"} | required |
| is_devops | bool | Whether to look up SSM parameters in the devops environment. Defaults to True for cross-account foundation resources. | True |

Example
monitor = PipelineEventMonitor(
    scope=stack,
    id="deployment-monitor",
    app_helper=helper,
    table_base_name="deployment-status",
    table_indexes=["pipeline-status-index", "environment-index"],
    lambda_function_name="pipeline-status-monitor",
    lambda_handler="monitor.process_pipeline_event",
    lambda_code_path="./lambda/pipeline-monitor",
    env_vars={
        "TABLE_NAME": "deployment-status-table",
        "SLACK_WEBHOOK": "https://hooks.slack.com/...",
        "LOG_LEVEL": "INFO"
    },
    env_pipeline_map={
        "development": "myapp-dev-pipeline",
        "staging": "myapp-staging-pipeline", 
        "production": "myapp-prod-pipeline"
    }
)
Infrastructure Created
  • Lambda function (Python 3.12 runtime, 30-second timeout)
  • EventBridge rule filtering CodePipeline events for specified pipelines
  • IAM policies for Lambda function:
    • dynamodb:Query and dynamodb:UpdateItem on table and all specified indexes
    • Basic Lambda execution permissions
  • Event target linking EventBridge rule to Lambda function
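
To make the DynamoDB permissions concrete, the following is a rough sketch of the IAM statement that the list above describes. It uses plain Python rather than CDK, and the account ID, region, and table name are placeholder assumptions. In the construct itself the table ARN is read from SSM and the index ARNs are produced by ArnUriHelper, so the helper `build_policy_statement` here is purely illustrative.

```python
import json
from typing import Dict, List


def build_policy_statement(table_arn: str, index_names: List[str]) -> Dict[str, object]:
    """Return an IAM statement granting Query/UpdateItem on a table and its indexes."""
    # DynamoDB index ARNs have the form <table-arn>/index/<index-name>.
    resources = [table_arn] + [f"{table_arn}/index/{name}" for name in index_names]
    return {
        "Effect": "Allow",
        "Action": ["dynamodb:UpdateItem", "dynamodb:Query"],
        "Resource": resources,
    }


statement = build_policy_statement(
    # Hypothetical ARN; the real value comes from the SSM parameter.
    "arn:aws:dynamodb:us-east-1:123456789012:table/deployment-status",
    ["pipeline-status-index", "environment-index"],
)
print(json.dumps(statement, indent=2))
```

Because only these two actions are granted, a handler that calls other operations such as GetItem or PutItem would presumably need an additional policy statement.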
EventBridge Rule Details

The rule monitors two types of CodePipeline events:

  • "CodePipeline Action Execution State Change": Individual action status
  • "CodePipeline Pipeline Execution State Change": Overall pipeline status

Events are filtered to only include pipelines specified in env_pipeline_map, ensuring the Lambda function only processes relevant pipeline events.
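
The filtering described above is performed by EventBridge, not by the Lambda code. For local unit tests it can still be handy to approximate the rule in plain Python. The sketch below is one such approximation; the function name `matches_rule` and the sample pipeline names are assumptions made for illustration.

```python
from typing import Any, Dict, List

# The two detail types the rule listens for.
DETAIL_TYPES = [
    "CodePipeline Action Execution State Change",
    "CodePipeline Pipeline Execution State Change",
]


def matches_rule(event: Dict[str, Any], pipelines: List[str]) -> bool:
    """Approximate the rule: source, detail-type, and pipeline must all match."""
    return (
        event.get("source") == "aws.codepipeline"
        and event.get("detail-type") in DETAIL_TYPES
        and event.get("detail", {}).get("pipeline") in pipelines
    )


# Hypothetical mapping, in the same shape as env_pipeline_map.
env_pipeline_map = {"dev": "myapp-dev-pipeline", "prod": "myapp-prod-pipeline"}
monitored = list(env_pipeline_map.values())

event = {
    "source": "aws.codepipeline",
    "detail-type": "CodePipeline Pipeline Execution State Change",
    "detail": {"pipeline": "myapp-dev-pipeline", "state": "STARTED"},
}
print(matches_rule(event, monitored))
```

Only the values of env_pipeline_map take part in the match; the environment names (the keys) are not visible to EventBridge.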

Notes
  • The DynamoDB table must already exist, and its ARN must be published in the SSM parameter:
    • dynamodb_table_{table_name}: Table ARN (hyphens in the generated table name are replaced with underscores)
  • Lambda function uses Python 3.12 runtime with 30-second timeout
  • All specified table indexes must exist on the DynamoDB table
  • The Lambda function will receive EventBridge events in standard format
  • IAM permissions are configured automatically and are limited to dynamodb:Query and dynamodb:UpdateItem on the table and the listed indexes
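
The names involved here are derived with simple string substitutions, which the sketch below reproduces in isolation. The helper functions are hypothetical and exist only for illustration, and the sample table name is an assumption, since the actual full name is produced by ResourceNaming.

```python
from typing import Tuple


def ssm_parameter_key(table_name: str) -> str:
    """SSM parameter key under which the table ARN is expected."""
    return f"dynamodb_table_{table_name.replace('-', '_')}"


def logical_ids(construct_id: str) -> Tuple[str, str]:
    """Logical IDs used for the Lambda function and the EventBridge rule."""
    base = construct_id.replace("-", "_")
    return f"{base}_lambda", f"{base}_event_rule"


def rule_name(lambda_function_name: str) -> str:
    """Physical name of the EventBridge rule."""
    return f"{lambda_function_name}-event-rule"


# Hypothetical inputs, for illustration only.
print(ssm_parameter_key("myapp-deployment-status"))
print(logical_ids("deployment-monitor"))
print(rule_name("pipeline-status-monitor"))
```

Checking the parameter key this way before deployment can help confirm that the expected SSM parameter exists in the target environment.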
Lambda Event Format

The Lambda function will receive CodePipeline events in this structure:

{
    "source": "aws.codepipeline",
    "detail-type": "CodePipeline Pipeline Execution State Change",
    "detail": {
        "pipeline": "pipeline-name",
        "execution-id": "execution-uuid",
        "state": "SUCCEEDED|FAILED|STARTED|...",
        "version": 1
    }
}
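
As an illustration of how a handler might read this structure, here is a minimal sketch that extracts the fields shown above and maps the pipeline name back to its environment. The function `summarize_event` and the sample mapping are assumptions. The DynamoDB write is left out because the table's key schema is not documented here.

```python
from typing import Any, Dict, Optional


def summarize_event(
    event: Dict[str, Any], env_pipeline_map: Dict[str, str]
) -> Dict[str, Optional[str]]:
    """Pull the fields a status update would likely need out of a CodePipeline event."""
    detail = event.get("detail", {})
    pipeline = detail.get("pipeline")
    # Reverse lookup: find the environment whose pipeline name matches, if any.
    environment = next(
        (env for env, name in env_pipeline_map.items() if name == pipeline), None
    )
    # Distinguish action-level events from pipeline-level events.
    scope = "action" if "Action" in event.get("detail-type", "") else "pipeline"
    return {
        "pipeline": pipeline,
        "environment": environment,
        "execution_id": detail.get("execution-id"),
        "state": detail.get("state"),
        "scope": scope,
    }


sample_event = {
    "source": "aws.codepipeline",
    "detail-type": "CodePipeline Pipeline Execution State Change",
    "detail": {
        "pipeline": "myapp-prod-pipeline",
        "execution-id": "execution-uuid",
        "state": "SUCCEEDED",
        "version": 1,
    },
}
summary = summarize_event(sample_event, {"production": "myapp-prod-pipeline"})
print(summary)
```

A real handler would pass a summary like this to an UpdateItem call, which is one of the two DynamoDB actions the construct grants.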

Raises:

| Type | Description |
| --- | --- |
| ValueError | If required SSM parameters are not found |
| TypeError | If table_indexes contains invalid index names |
