
Real-Time Reporting using AWS Lambda and DynamoDB in Snowplow

By Snowplow Team, September 25, 2024

Real-time reporting is crucial for applications that require immediate insights, such as monitoring active players in a game level. In this post, we walk through implementing a real-time reporting system using AWS Lambda, DynamoDB, and Snowplow’s enriched event stream.

Q: Why use AWS Lambda and DynamoDB for real-time reporting in Snowplow?

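AWS Lambda and DynamoDB provide serverless, low-latency infrastructure for processing Snowplow events as they arrive. Key benefits include:

  • Scalability: Lambda processing scales with the number of shards in the Kinesis stream, and DynamoDB on-demand capacity absorbs fluctuating event volumes.

  • Cost Efficiency: You pay for the invocations, requests, and storage you actually use rather than for idle servers.

  • Speed: DynamoDB reads and writes typically complete in single-digit milliseconds, so level counts can reflect new events within seconds of their arrival on the stream.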

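Q: How do I connect the Snowplow event stream to AWS Lambda?

  1. Configure the Lambda Function:

    • Create a new Lambda function in the AWS Console.
    • Add the Kinesis stream that carries Snowplow’s enriched events as the function’s event source (trigger), so the function is invoked with batches of events.
    • Give the function’s execution role permission to read from the stream and to write to the DynamoDB tables used below.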

  2. Create the PlayerState Table in DynamoDB:

    • The PlayerState table, keyed by playerId, stores each player’s current level and the timestamp of their latest update:

import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('PlayerState')  # partition key: playerId

def update_player_level(player_id, level_id, timestamp):
    """Record the player's current level; timestamp is in epoch milliseconds."""
    table.update_item(
        Key={'playerId': player_id},
        UpdateExpression="SET levelId = :level, lastUpdated = :timestamp",
        ExpressionAttributeValues={":level": level_id, ":timestamp": timestamp}
    )
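The update code assumes the PlayerState table already exists with playerId as its partition key. Here is a minimal sketch of the arguments for boto3's `create_table`, assuming string IDs and on-demand (pay-per-request) billing; both are assumptions rather than requirements, and `table_spec` is a hypothetical helper. The LevelState table in the next section can be created the same way with levelId as its key.

```python
def table_spec(table_name, partition_key):
    """Build create_table keyword arguments for a table with a single string partition key."""
    return {
        "TableName": table_name,
        "KeySchema": [{"AttributeName": partition_key, "KeyType": "HASH"}],
        "AttributeDefinitions": [{"AttributeName": partition_key, "AttributeType": "S"}],
        # On-demand capacity: billed per request, no read/write throughput to provision
        "BillingMode": "PAY_PER_REQUEST",
    }


# Usage (requires AWS credentials):
# import boto3
# client = boto3.client("dynamodb")
# client.create_table(**table_spec("PlayerState", "playerId"))
# client.create_table(**table_spec("LevelState", "levelId"))
# client.get_waiter("table_exists").wait(TableName="PlayerState")
```

Table creation is asynchronous, which is why the usage lines wait for the table to become available before writing to it.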

Q: How do I update the LevelState Table in DynamoDB?

  • When a player enters a new level, increment the player count for that level:

level_table = dynamodb.Table('LevelState')  # partition key: levelId

def increment_level(level_id):
    # ADD treats a missing item or playerCount attribute as 0, so the first increment yields 1
    level_table.update_item(
        Key={'levelId': level_id},
        UpdateExpression="ADD playerCount :inc",
        ExpressionAttributeValues={":inc": 1}
    )

  • When a player exits a level, decrement the player count:

def decrement_level(level_id):
    level_table.update_item(
        Key={'levelId': level_id},
        UpdateExpression="ADD playerCount :dec",
        ExpressionAttributeValues={":dec": -1}
    )
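A single "player changed level" event touches both tables: the old level loses a player and the new level gains one. Here is a minimal sketch, assuming the player's level is only ever written through this function. `move_player` is a hypothetical name, and the tables are passed in as boto3 `Table` resources so the logic can be exercised without AWS. It relies on `ReturnValues="UPDATED_OLD"`, which makes `update_item` return the updated attributes as they were before the write.

```python
def move_player(player_table, level_table, player_id, new_level_id, timestamp_ms):
    """Record a player's new level and keep LevelState counts consistent.

    player_table and level_table are boto3 DynamoDB Table resources, or any
    object with a compatible update_item method.
    """
    response = player_table.update_item(
        Key={"playerId": player_id},
        UpdateExpression="SET levelId = :level, lastUpdated = :ts",
        ExpressionAttributeValues={":level": new_level_id, ":ts": timestamp_ms},
        # Return the attribute values as they were before this update
        ReturnValues="UPDATED_OLD",
    )
    # A brand-new player has no previous levelId, so no Attributes come back
    old_level_id = response.get("Attributes", {}).get("levelId")
    if old_level_id == new_level_id:
        return  # same level: only lastUpdated changed, counts stay as they are
    if old_level_id is not None:
        level_table.update_item(
            Key={"levelId": old_level_id},
            UpdateExpression="ADD playerCount :dec",
            ExpressionAttributeValues={":dec": -1},
        )
    level_table.update_item(
        Key={"levelId": new_level_id},
        UpdateExpression="ADD playerCount :inc",
        ExpressionAttributeValues={":inc": 1},
    )
```

These writes are not atomic, and Lambda retries a Kinesis batch that fails, so a retry can apply the same event twice and skew the counts. Treat this as a starting point rather than an exactly-once design.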

Q: How can I handle player inactivity?

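Some players become inactive without explicitly exiting a level, for example by closing the game. To handle them, run a separate pruning Lambda function on a schedule (for example, from an Amazon EventBridge rule). The function removes stale player records and decrements the counts for their levels: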

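import time
from boto3.dynamodb.conditions import Attr

def prune_inactive_players(timeout_mins):
    # lastUpdated is stored in epoch milliseconds
    cutoff = int(time.time() * 1000) - timeout_mins * 60 * 1000
    scan_kwargs = {'FilterExpression': Attr('lastUpdated').lt(cutoff)}
    while True:
        response = table.scan(**scan_kwargs)  # each page reads at most 1 MB
        for item in response['Items']:
            try:  # delete only if the player is still inactive, then decrement once
                table.delete_item(Key={'playerId': item['playerId']},
                                  ConditionExpression=Attr('lastUpdated').lt(cutoff))
            except table.meta.client.exceptions.ConditionalCheckFailedException:
                continue  # the player became active again after the scan
            decrement_level(item['levelId'])
        if 'LastEvaluatedKey' not in response:
            break
        scan_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']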
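The PlayerState and LevelState updates run inside the function attached to the enriched stream during setup; the pruning function is a separate, scheduled function. Lambda delivers Kinesis records base64-encoded, and each Snowplow enriched event is a single tab-separated line. Here is a minimal sketch of decoding a batch. `decode_enriched_events` and the routing hook are hypothetical, and which fields hold the player and level IDs depends on your own event schema. Snowplow's Analytics SDKs can convert enriched events to JSON as an alternative to indexing TSV columns by hand.

```python
import base64


def decode_enriched_events(event):
    """Yield each Snowplow enriched event in a Lambda Kinesis batch as a list of TSV fields."""
    for record in event["Records"]:
        # Lambda delivers each Kinesis payload base64-encoded under record["kinesis"]["data"]
        payload = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
        yield payload.split("\t")


def lambda_handler(event, context):
    processed = 0
    for fields in decode_enriched_events(event):
        # Hypothetical hook: extract playerId/levelId from `fields` (positions depend
        # on your event schema) and call the DynamoDB update functions from here.
        processed += 1
    return {"processed": processed}
```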

Final Thoughts

By integrating AWS Lambda, DynamoDB, and Snowplow’s enriched event stream, we can build a robust, real-time reporting system that tracks player activity with minimal latency. Stay tuned for Part 2, where we cover more advanced use cases and optimizations for real-time reporting using this architecture.
