Blog

Accessing Snowplow data in real time in AWS

By
Trent T
&
October 30, 2020
Share this post

Taking action on event data in real time is a popular feature of Snowplow. We have customers using this to power use cases including:

  • Retail: Fueling dynamic pricing, product and content recommendations to improve sales and revenue.
  • Customer support: Providing support staff with informaiton on where a user is stuck and what help articles they have recently viewed at the time they call the support line.
  • Machine Learning: Feeding algorithms with data in real time for decision making
  • Security: Enabling fraud detection and anomaly detection in financial transactions

First a quick Snowplow recap. Snowplow provides a fully managed data pipeline in your own cloud account. It collects user's behavioural data from your products into a unified stream, and passes it into your data warehouse.

Once you have snowplow set up, it only takes a couple steps to start reading data from the real time stream. This guide will show you how to quickly achieve this in AWS using a Python Lambda function. It's important to note that this can also be achieved on GCP and with our SDKs in other languages.

Tutorial

This is a really simple tutorial of reading from the real time stream. What we're going to do is set up a lambda function to trigger when data is written to the good enriched Kinesis events stream, transform the data into JSON with the Snowplow Python Analytics SDK and log the output to CloudWatch. Data that is written to the event stream has passed through the validation and enrichment steps in the diagram below.

1. Prerequisites

If you haven't got a snowplow pipeline set up in AWS using Kinesis streams this next part is not going to make much sense. If you're unsure check out here to get started on open source or here to contact us about our managed solution

2. Set up IAM in AWS

For this tutorial your lambda function needs a role with permission to do the following:

"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:DescribeStream",
"kinesis:ListStreams",
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"

3. Create the lambda function

Create a new Lambda function by clicking Create Function and give it the following properties:

  • Name: lambda_function_payload
  • Runtime: Python 3
  • Permissions: Use existing role, and select the role you made in the previous step

4. Create the Python script

Copy this Python script to a file locally. In my case, it's called lambda_function_payload.py and is inside a subdirectory called lambda_function_payload. This subdirectory is important for the next step.

import snowplow_analytics_sdk.event_transformer
import snowplow_analytics_sdk.snowplow_event_transformation_exception
import json
import base64

# Decode batch of lambda records from Base64 to TSV
def decode_records(update):
data = []
if "Records" in update:
for record in update["Records"]:
if "kinesis" in record and "data" in record['kinesis']:
# decode base64 data in each record and append to the list
decoded_data = base64.b64decode(record['kinesis']['data']).decode('utf-8')
data.append(decoded_data)
return data

# Entry point for the lambda function
# Output from the print statments can be found in cloudwatch
def lambda_handler(event, context):
# Uncomment this line to show the raw batch of events
# print("Received event: " + json.dumps(event, indent=2))

# Decode received batch of records from Base64 to Snowplow TSV
records = decode_records(event)

for record in records:
try:
# Transform record from Snowplow TSV to JSON
print(snowplow_analytics_sdk.event_transformer.transform(record))
print("Succesfully retrieved and transformed event")
except snowplow_analytics_sdk.snowplow_event_transformation_exception.SnowplowEventTransformationException as e:
for error_message in e.error_messages:
print(error_message)
continue

return

5. Package and upload the Python script

Package a zip file for lambda with the following script. Copy it to a new file and place it in the directory above the python script. Note that this is being run on a subdirectry with the Python app called lambda_function_payload.py. The Python fuction above is written in Python 3 the script below will install the Snowplow Python Analytics SDK in the directory so that it is included in the zip file. You could also consider creating a virtual environment using pipenv or virtualenv rather than installing the Analytics SDK globally.

#!/bin/bash
cd lambda_function_payload
pip3 install snowplow_analytics_sdk -t . --upgrade
chmod -R 755 .
zip -r ../lambda_function_payload.zip .
cd ..

Upload the zip file to lambda in the AWS Console. Since the dependency is a couple of mb, we won't be able to see and edit the code in AWS.

Set up the basic settings to point to the handler in the Python script. The handler is the entry point into the code where execution will begin. The handler is formated asname_of_python_file.name_of_handler_function. The .py is not included.

6. Add a trigger to connect it to the enriched kinesis stream

Set up your trigger based on the screenshot below. The default values will be fine but note that your Kinesis stream will be named differently. For your production workloads, you may wish to use different batch sizes and utilise retries but this will vary depending on your use case.

7. Test and confirm events are being processed

Send some events into your Snowplow pipeline and give them a couple seconds to process. Click on monitoring on the lambda fuction page and scroll down to CloudWatch logs. You should see some entries appearing.

Looking at the logs, you should see something like this. What you see here is the entire event's data in JSON format. You have now succesfully read from the real time stream and are ready to process your events!

Okay, what next?

Now that you have control over data coming out of the real time stream. Here are some articles with ideas on what you can do with it:

If you want to learn more about what you can achieve with Snowplow, get in touch today!

Subscribe to our newsletter

Get the latest blog posts to your inbox every week.

Get Started

Unlock the value of your behavioral data with customer data infrastructure for AI, advanced analytics, and personalized experiences