Snowplow, the global leader in customer data infrastructure (CDI) for AI, enables every organization to own and unlock the value of its customer behavioral data to fuel AI-driven marketing, digital products and services, customer experiences, and fraud mitigation.
Data modeling, i.e. applying business rules to aggregate event-level data into a format suitable for ingesting into a business intelligence / reporting / OLAP tool
Real-time aggregation of data for real-time dashboards
Running machine-learning algorithms on event-level data
We’re just at the beginning of our journey getting familiar with Apache Spark. I’ve been using Spark for the first time over the past few weeks. In this post I’ll share back with the community what I’ve learnt, and will cover:
Assuming you have git, Vagrant and VirtualBox installed, you can get started by cloning the Snowplow repo, switching to the feature/spark-data-modeling branch, then running vagrant up and vagrant ssh to get onto the box:
This tutorial also assumes you have some Snowplow enriched events files stored locally in /path/to/data. The enriched events are stored in the TSV format documented here.
First, we open up the Scala console or REPL:
This gives us access to all of the libraries loaded as part of the spark-data-modeling project which we will need in a later step. First we define a SparkContext. Paste this into your Scala console:
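As a rough sketch of what such a definition can look like: the snippet below assumes Spark 1.4 or later (for SparkContext.getOrCreate) and a local master, and the application name is a made-up placeholder rather than anything the project prescribes.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Run Spark locally, with one worker thread per available core.
val conf = new SparkConf()
  .setAppName("spark-data-modeling-sketch") // hypothetical application name
  .setMaster("local[*]")

// getOrCreate reuses an already running context instead of failing,
// which is convenient when pasting snippets into the same console session.
val sc = SparkContext.getOrCreate(conf)
```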
We define inDir as the path of the directory with all our data files – Spark supports wildcards. We can now load the data:
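A minimal sketch of the loading step, under the same assumptions (Spark 1.4+, local master). A temporary directory with two tiny files stands in for /path/to/data so that the snippet runs anywhere; the file names and contents are invented for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("load-sketch").setMaster("local[*]"))

// Stand-in for /path/to/data: a temporary directory holding two small TSV files.
val dir = Files.createTempDirectory("enriched")
Files.write(dir.resolve("part-0.tsv"), "app\tweb\n".getBytes("UTF-8"))
Files.write(dir.resolve("part-1.tsv"), "app\tmob\n".getBytes("UTF-8"))

// The wildcard makes Spark read every matching file in the directory.
val inDir = dir.toString + "/*.tsv"
val input = sc.textFile(inDir)

// One RDD element per line, across all matched files.
println(input.count())
```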
If we had wanted to load data directly from S3, we would only have to change the directory path value:
In this case, you must have the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables set to your AWS account credentials; the AWS account needs both “read” and “write” permissions.
Transforming the data for further analysis
Let’s look at what the data looks like at the moment:
Each data element is a long string of TSVs with some of the values being in JSON. We can tidy this up with the EventTransformer object:
Note that the EventTransformer was originally written to convert Snowplow enriched events into a format suitable for ingesting directly into ElasticSearch, as part of our real-time flow. The same transformation makes the data easy to work with in Spark.
The data now looks like:
A JSON string is returned. We will now load this into a Spark DataFrame so that we can easily manipulate data across dimensions.
Loading the JSON formatted data into a Spark DataFrame
We can load the JSON formatted data into a DataFrame two different ways. Just continuing from the work we did above in the console:
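For the first approach, a sketch along these lines should work, assuming Spark 1.4 or later, where sqlContext.read.json accepts an RDD of JSON strings (Spark 1.3 offered sqlContext.jsonRDD for the same purpose). The two JSON documents are invented stand-ins for the transformed events.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("json-to-dataframe").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)

// Stand-in for the transformed events: one JSON document per RDD element.
val jsons = sc.parallelize(Seq(
  """{"event":"page_view","domain_userid":"u1","domain_sessionidx":1}""",
  """{"event":"page_view","domain_userid":"u2","domain_sessionidx":3}"""
))

// Spark SQL scans the documents and infers the column names and types.
val df = sqlContext.read.json(jsons)
df.printSchema()
```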
Alternatively, if we had saved the data to files we could load the data directly into a DataFrame this way:
Let’s look at the data now by returning the schema:
Spark SQL inferred the schema from the JSON files. It should fit this schema.
2. Simple aggregations on Snowplow data
To illustrate simple aggregations with Spark, we will:
Count the number of events per day
Count the number of users per day
Count the number of sessions per day
Count the number of events per day
We will set up a crude function, toDate, to get the date out of the collector_tstamp column containing the date and time in the ISO 8601 format. We then make a UDF from toDate to use it on a Column in the DataFrame. udf() takes function objects so we define toDate as an anonymous function:
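The date extraction itself can be sketched in plain Scala. This is an illustration of the idea rather than the exact function used in the project: it relies on ISO 8601 timestamps starting with the yyyy-MM-dd date and does not guard against null or short inputs.

```scala
// Crude date extraction: the first ten characters of an ISO 8601
// timestamp are the calendar date.
val toDate: String => String = (tstamp: String) => tstamp.substring(0, 10)

println(toDate("2015-05-21T14:03:27.000Z"))
```

In the Spark console, a function object like this is what gets passed to org.apache.spark.sql.functions.udf so that it can be applied to a Column.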
Note: there is a bug affecting UDF registration in certain contexts (such as the SBT console we are using). Workarounds exist for the current Spark release, and the bug has since been fixed upstream. Alternatively, these aggregations can be done using RDDs without UDFs; see how to go from a DataFrame to an RDD here.
We group by the new column and count each event per group:
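Put together, the events-per-day count might look like the sketch below. It assumes Spark 1.4+ and an environment where the UDF can be created without hitting the console issue mentioned earlier; the three-row DataFrame is invented test data, with column names taken from the Snowplow enriched event format.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.udf

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("events-per-day").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Invented sample: two events on 20 May, one on 21 May.
val events = sc.parallelize(Seq(
  ("u1", "2015-05-20T10:00:00.000Z"),
  ("u2", "2015-05-20T18:30:00.000Z"),
  ("u1", "2015-05-21T09:15:00.000Z")
)).toDF("domain_userid", "collector_tstamp")

// Wrap the crude date extraction in a UDF so it can be applied to a Column.
val toDate = udf((tstamp: String) => tstamp.substring(0, 10))

// Add the derived date column, then count the rows in each date group.
val eventsPerDay = events
  .withColumn("date", toDate(events("collector_tstamp")))
  .groupBy("date")
  .count()

eventsPerDay.show()
```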
The show() method on DataFrames is useful to quickly see the results of any operations on the DataFrame. However, by default it displays only the first 20 rows.
Count the number of users per day
First we have to get the distinct users per day (or unique users per day), to get one row per user per day. Then we repeat as above to group by day and count the users in each group:
Count the number of sessions per day
It’s the same principle as counting the number of users per day. A session is defined as a unique combination of domain_userid and domain_sessionidx.
Note: there also exists the countDistinct function which we can use to aggregate over a group, like this:
However, its behaviour is inconsistent with the select(...).distinct.groupBy(...).count approach we took earlier, because countDistinct does not take null values into account.
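The difference is easiest to see on a tiny example. The sketch below assumes Spark 1.4+ and uses invented data: one day with two events from user u1 and one event whose domain_userid is null.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.countDistinct

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("distinct-vs-countDistinct").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// None becomes a null domain_userid in the DataFrame.
val events = sc.parallelize(Seq[(String, Option[String])](
  ("2015-05-20", Some("u1")),
  ("2015-05-20", Some("u1")),
  ("2015-05-20", None)
)).toDF("date", "domain_userid")

// Approach 1: de-duplicate (date, user) rows, then count rows per date.
// The (date, null) row survives distinct and is counted.
val viaDistinct = events
  .select("date", "domain_userid")
  .distinct
  .groupBy("date")
  .count()

// Approach 2: countDistinct skips null values, like COUNT(DISTINCT ...) in SQL.
val viaCountDistinct = events
  .groupBy("date")
  .agg(countDistinct("domain_userid"))

viaDistinct.show()
viaCountDistinct.show()
```

On this data the first approach reports two users for the day (u1 and the null user) while the second reports one, so it is worth deciding up front how events without a domain_userid should be treated.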
When we analyse event-level data we are often interested in understanding the sequence in which events occur. Funnel analysis is one of the simplest examples where we’re sequencing events.
We define a funnel as being made up of three events. In this example, it will be three page view events, where each event is identified via a unique page URL. We want to aggregate all the events corresponding to one session into a single field that summarises the journey in the funnel for that session.
First we define the urls for our funnel:
Next we want to group our events by session and collect the page_urlpath of page_view events. Unfortunately, aggregations in Spark DataFrames only work with a small set of pre-defined functions: count, which we used above, and a few other standard functions. UDAFs are not yet supported in Spark SQL, so we will map the DataFrame to an RDD using the map method:
Note the triple-equals operator (===), which is used to test equality on a Column. The events are ordered by date and time because the order in which the urls were visited is important in constructing the funnel. We use pattern matching to take care of the null domain_sessionidx values, since they are of type Long.
Next we group by session:
It returns a PairRDD where the key is the session (a tuple with the domain_userid and domain_sessionidx) and the value is an Iterable of rows corresponding to that session. For example:
Here we’re looking at five elements in the eventsBySession RDD. In the first four sessions, only one url was visited, whereas six urls were visited in the fifth session displayed here.
Each row still contains the domain_userid and domain_sessionidx fields which were used to group, so we need to remove these obsolete fields and keep or “project” the page_urlpath. Each page_urlpath can be mapped to the corresponding funnel letter we defined in the urls Map and we can join all the funnel letters together in a single String for our summarised funnel journey field.
We do all this in a function so that if our funnel urls change, we can recalculate the funnel field for each session from the eventsBySession RDD by passing the new urls Map as the urlToLetter argument.
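The core of that function can be sketched in plain Scala. To keep the example self-contained it takes the page_urlpath values of one session directly, already in visit order, instead of Spark rows; that signature and the three paths in the urls Map are assumptions made for illustration.

```scala
// Hypothetical funnel: three page paths, each mapped to a funnel letter.
val urls = Map("/pricing" -> "A", "/signup" -> "B", "/thanks" -> "C")

// Map each visited path to its funnel letter, drop paths that are not part
// of the funnel, and join the letters into one journey string.
def reduceToFunnelLetter(paths: Iterable[String],
                         urlToLetter: Map[String, String]): String =
  paths.flatMap(path => urlToLetter.get(path)).mkString

// A session that visited /signup, then an unrelated page, then /pricing.
println(reduceToFunnelLetter(Seq("/signup", "/blog", "/pricing"), urls)) // BA
```

Because the mapping is an argument, changing the funnel only means passing a different Map; a session with no funnel pages simply yields an empty string.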
We apply the reduceToFunnelLetter function and get:
So for example, in the first session shown, there was no visit to any of the urls we are interested in. In the third, there was a visit to the url corresponding to ‘B’ and in the seventh, there was a visit to the ‘B’ url and then to the ‘A’ url in that single session.
We can convert this PairRDD into a DataFrame if needed:
Here Spark inferred the schema using reflection, where the case class defines the schema of the table. With the data in a DataFrame, we can now use very terse declarative code to analyse the data further. For example, here we look at the longest funnel journeys:
Next steps
There are a number of ways we can build on the computations outlined above. For our funnel analysis, for example, we might want to define funnels where the steps in each funnel are not simply page views and identified by page URL paths – we want the flexibility to build funnels out of any event type, and use any combination of fields in our Snowplow data to identify steps in that funnel. Our code above would need to be more flexible and accept a nested eventToLetter mapping of this sort of form:
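One possible shape for such a mapping, purely as an illustration: event type, then field name, then field value, then funnel letter. The helper letterFor, the flat Map representation of an event and the sample values are all hypothetical; only the field names event, page_urlpath and se_action come from the Snowplow event model.

```scala
// Hypothetical nesting: event type -> field name -> field value -> funnel letter.
val eventToLetter: Map[String, Map[String, Map[String, String]]] = Map(
  "page_view" -> Map("page_urlpath" -> Map("/pricing" -> "A")),
  "struct"    -> Map("se_action" -> Map("add-to-basket" -> "B"))
)

// An event is represented here as a flat field-name -> value map.
// Returns the letter of the first configured field whose value matches.
def letterFor(event: Map[String, String],
              mapping: Map[String, Map[String, Map[String, String]]]): Option[String] =
  for {
    eventType <- event.get("event")
    fields    <- mapping.get(eventType)
    letter    <- fields.collectFirst {
      case (field, valueToLetter) if event.get(field).exists(valueToLetter.contains) =>
        valueToLetter(event(field))
    }
  } yield letter

println(letterFor(Map("event" -> "struct", "se_action" -> "add-to-basket"), eventToLetter))
```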
As the next steps in my internship, I will be focusing on marketing attribution data in particular. I’m going to identify, filter and transform that data in Spark, before loading it into DynamoDB and visualising it using D3.js. This stack should give me a lot of flexibility to explore different approaches to visualizing marketing attribution data.
In parallel, another intern at Snowplow is figuring out how to run Spark Streaming with Kinesis so that we can perform these types of computation and visualization in real time. Stay tuned for a blog post on that in due course!