Snowplow, the global leader in customer data infrastructure (CDI) for AI, enables every organization to own and unlock the value of its customer behavioral data to fuel AI-driven marketing, digital products and services, customer experiences, and fraud mitigation.
An updated version of this blogpost was posted to Discourse.
We have been thinking about Apache Spark for some time now at Snowplow. This blogpost is the first in a series that will explore data modeling in Spark using Snowplow data. It’s similar to Justine’s write-up and covers the basics: loading events into a Spark DataFrame on a local machine and running simple SQL queries against the data.
Data modeling is a critical step in the Snowplow pipeline: it’s the stage at which business logic gets applied to the data. The event stream describes all that has happened up to a certain point in time. It therefore needs to be transformed before it becomes meaningful to an end user in the business. Because the logic gets applied at a later stage, it remains possible to revisit and iterate on earlier decisions.
Most Snowplow users do their data modeling in SQL using our open source tool SQL Runner or a BI tool such as Looker. We hope Spark will turn out to be a great addition to the data modeling toolkit.
Excited? Let’s get started!
Make sure git, Vagrant and VirtualBox are installed. To get started with Spark, clone the Snowplow repo, switch to the feature/spark-data-modeling branch, vagrant up and vagrant ssh onto the box:
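The commands below are an illustrative version of those steps; the location of the spark-data-modeling project on the box may differ, so adjust the final cd accordingly:

```bash
# Clone the Snowplow repo and switch to the Spark data modeling branch
git clone https://github.com/snowplow/snowplow.git
cd snowplow
git checkout feature/spark-data-modeling

# Bring up the Vagrant box and SSH onto it
vagrant up
vagrant ssh

# On the box: move into the spark data modeling project
# (adjust the path to wherever it lives on the box) and open the Scala console
cd <path-to-spark-data-modeling-project>
sbt console
```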
This last step opens the Scala console, which gives us access to all the libraries included in the spark-data-modeling project. We start with defining a SparkContext:
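Defining one from the console might look like the following minimal sketch; the application name and the local master setting are illustrative choices:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Run Spark locally, using all available cores ("local[*]" is an illustrative choice)
val conf = new SparkConf()
  .setAppName("spark-data-modeling")
  .setMaster("local[*]")

val sc = new SparkContext(conf)
```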
The SparkContext represents the connection to the Spark cluster.
Let’s now load some enriched events from S3 into Spark. I recommend creating a new S3 bucket with some Snowplow data that can serve as a sandbox. A single run should be enough to start with. The path to the enriched events should look something like this:
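For a standard Snowplow pipeline, the enriched events live under an enriched/good prefix, with one folder per run. The bucket name and run timestamp below are placeholders:

```text
s3://your-sandbox-bucket/enriched/good/run=YYYY-MM-DD-hh-mm-ss/
```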
We can now load this data into Spark and create a Resilient Distributed Dataset (RDD):
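A minimal sketch, assuming the sandbox layout above; the s3n scheme with inline credentials is one way to read from S3 via Spark's Hadoop integration, and all values in the path are placeholders:

```scala
// Placeholders: substitute your AWS credentials, bucket and run folder
val inDir = "s3n://ACCESS_KEY:SECRET_KEY@your-sandbox-bucket/enriched/good/run=YYYY-MM-DD-hh-mm-ss/*"

// Each element of the RDD is one enriched event, as a single tab-separated line
val input = sc.textFile(inDir)
```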
Make sure to add in the actual path and AWS credentials. An alternative is to set the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables in the SparkContext and use a normal path instead.
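One way to do that, via the Hadoop configuration exposed by the SparkContext (the property names shown are the s3n ones), is sketched below:

```scala
// Read the credentials from the environment instead of embedding them in the path
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))

val input = sc.textFile("s3n://your-sandbox-bucket/enriched/good/run=YYYY-MM-DD-hh-mm-ss/*")
```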
Let’s take a look at the first line of this RDD:
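Using the input RDD defined above:

```scala
// Returns the first enriched event as one tab-separated string
input.first()
```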
This is what we would expect a TSV to look like.
Loading the data into a Spark DataFrame
We want to load our events into a Spark DataFrame, a distributed collection of data organized into named columns. This concept is similar to a data frame in R or a table in a relational database.
Let’s start with transforming the RDD into a more suitable format using the EventTransformer object:
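A sketch of that transformation is below. The import path and the exact shape of the transform API are assumptions on my part; check the spark-data-modeling project on the branch for the precise signature:

```scala
// NOTE: the import path and the return type are assumptions; see the project source
import com.snowplowanalytics.snowplow.datamodeling.spark.events.EventTransformer

// Transform each TSV line into a JSON string, dropping lines that fail to transform
val jsons = input
  .map(line => EventTransformer.transform(line))
  .filter(_.isSuccess)
  .flatMap(_.toOption)
```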
The events are now in a format that is nicer to work with in Spark.
We can now load this into a Spark DataFrame. First, create a SQL Context:
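On Spark 1.x this is a one-liner:

```scala
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
```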
The SQL Context allows us to create DataFrames and execute SQL queries.
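Converting the RDD of JSON strings into a DataFrame might then look like this, assuming the jsons value from the transformation step (read.json accepts an RDD of strings on Spark 1.4+):

```scala
// Infer the schema from the JSON strings and create a DataFrame
val df = sqlContext.read.json(jsons)
```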
We have now converted the RDD into a DataFrame. To show the top 5 rows and print the schema, run:
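Assuming the DataFrame is bound to df as above:

```scala
df.show(5)        // display the first 5 rows
df.printSchema()  // print the inferred schema
```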
Running SQL queries on Spark DataFrames
Now that our events are in a DataFrame, we can start to model the data. We will limit ourselves to simple SQL queries for now. In the next blogpost, we will start using the actual DataFrame API, which will enable us to build advanced data models.
To run SQL queries against the data, we first need to register a table:
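The table name is a free choice; "events" is used here:

```scala
// Register the DataFrame as a temporary table so it can be queried with SQL
df.registerTempTable("events")
```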
We can now reference this table in subsequent SQL statements. For example:
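The example below counts events per event type; the event column comes from the Snowplow enriched event model and is illustrative, so adjust it to whatever the transformed schema exposes:

```scala
// Count events per event type
sqlContext.sql("SELECT event, COUNT(*) AS event_count FROM events GROUP BY event").show()
```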
To store the output in another DataFrame, we run:
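The result of sqlContext.sql is itself a DataFrame, so it can simply be assigned to a value; the query and column names below are illustrative:

```scala
// Pageviews per user, kept as a DataFrame for further processing
val pageviews = sqlContext.sql("""
  SELECT domain_userid, COUNT(*) AS pageviews
  FROM events
  WHERE event = 'page_view'
  GROUP BY domain_userid
""")
```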
Joins are also supported:
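For instance, joining the per-user pageview counts from above with per-user session counts (again, the column names are illustrative Snowplow enriched event fields):

```scala
// Register both intermediate results as temporary tables
pageviews.registerTempTable("user_pageviews")

val sessions = sqlContext.sql("""
  SELECT domain_userid, COUNT(DISTINCT domain_sessionidx) AS sessions
  FROM events
  GROUP BY domain_userid
""")
sessions.registerTempTable("user_sessions")

// Join the two tables on the user identifier
val combined = sqlContext.sql("""
  SELECT p.domain_userid, p.pageviews, s.sessions
  FROM user_pageviews p
  JOIN user_sessions s ON p.domain_userid = s.domain_userid
""")
combined.show(5)
```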
It’s of course possible to run more complex SQL queries, even though not all functions one would use in Redshift are supported. To take full advantage of Spark, however, we will need to drop down a level and start using the DataFrame API itself. This is what we will explore in the next post.
In a future post, we will also start running Spark on larger datasets in both Databricks and EMR.
In the meantime, let us know if you have any questions or feedback!