Optimising the Snowplow pipeline
From its inception, the Snowplow pipeline was designed to handle massive amounts of data: up to hundreds of gigabytes per day, which was quite a lot back in 2012. And if one wanted to handle that much data 10 years ago, the Apache Hadoop ecosystem was always the answer.
The Apache Hadoop ecosystem (what we now often refer to as “big data frameworks”) and batch architecture were at the heart of all Snowplow components for a long time. Snowplow Hadoop ETL, Snowplow Hive Storage, EmrEtlRunner, Snowplow Spark Enrich and many others followed this principle: a big batch of data produced by one component is handled by the next component downstream, one big batch at a time. To do that, we would start a compute cluster, submit a set of steps and wait for the result.
It was a great approach, and we were able to handle the desired hundreds of gigabytes. Until we were not.
When a cluster was not able to process the data, or when a transient cluster or S3 outage occurred, we faced all the despair of this architecture:
- It’s difficult to get to the bottom of the reason a cluster is failing. EMR and Hadoop produce tens of megabytes of logs and not everyone dares to traverse all of them to find a cryptic IO exception;
- If a cluster idles for some time, then in order to catch up the next one has to be bigger. If the next one is not big enough, it will fail again and an even bigger one will be needed, until we run out of resources;
- Despite EMR having auto-scaling capabilities for several years, we were never able to make it work reliably enough for our data pipeline workloads. Any sudden traffic spike would necessitate human intervention;
- To avoid the problems above, the most widely used strategy is to overprovision.
Furthermore, the batch architecture did not favor users who value low latency rather than high throughput. Some Snowplow users process merely hundreds of megabytes per day, rather than hundreds of gigabytes, but it is critical that their highly valuable data enters the warehouse as quickly as possible. In such cases, it is expensive and unwelcome to wait for a multi-node EMR cluster to warm up for 15 minutes before processing any event.
The above architecture still has some influence on our AWS version of the pipeline, but things changed drastically in 2017 when we started to design the GCP version, where we made a conscious decision to design everything streaming-first.
After a successful GCP launch, it was an eye-opener that a streaming architecture could achieve everything we had previously done with the batch architecture while being more reliable, less expensive and less obscure for pipeline operators. When the GCP version came out of beta, we were still using Apache Beam as the core of the GCP enrich and loader components. But not long after, we started to question whether we needed big data frameworks at all.
Big data frameworks are great tools. They solve very complex tasks. They allow users to aggregate and join big volumes of data, write jobs in SQL-like DSLs, and plug in machine learning and probabilistic data structures. Snowplow pipelines, on the other hand, don’t actually need any of those complex solutions.
At a very high level, every step of a Snowplow pipeline is some combination of the following three actions:
- Transformation, or in terms of programming languages a map function, from A to B. For example, Collector transforms an HTTP request into a so-called collector payload; enrich transforms the collector payload into enriched TSV, and so on. A map function cannot drop elements, which means the pipeline is lossless.
- Partition, or a function from A to either B or C. This is mostly the case for bad rows, i.e. rows that are not valid against some predefined schema, but perhaps can be fixed later. Still, neither B nor C is dropped; it just means that one of them has to be written out of the main workflow, into a so-called “bad stream”.
- IO. The most complex and dangerous part, but we still need to read and write data somewhere.
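To make the three actions concrete, here is a minimal Python sketch. It is an illustration only, not Snowplow code: `transform`, `partition`, `run` and the empty-payload rule are hypothetical names and logic chosen for the example, and the "IO" is just two in-memory lists.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Union


@dataclass(frozen=True)
class Good:
    value: dict


@dataclass(frozen=True)
class Bad:
    raw: str
    reason: str


def transform(raw: str) -> dict:
    """Map step: every input yields exactly one output, nothing is dropped."""
    return {"payload": raw.strip()}


def partition(event: dict) -> Union[Good, Bad]:
    """Partition step: route each event to the good or the bad stream."""
    if event["payload"]:
        return Good(event)
    return Bad(raw=repr(event), reason="empty payload")


def run(source: Iterable[str],
        write_good: Callable[[Good], None],
        write_bad: Callable[[Bad], None]) -> None:
    """IO step: read from the source and write each result to one of two sinks."""
    for raw in source:
        result = partition(transform(raw))
        if isinstance(result, Good):
            write_good(result)
        else:
            write_bad(result)


# In-memory stand-ins for the enriched stream and the bad stream.
good, bad = [], []
run(["page_view", "   ", "add_to_cart"], good.append, bad.append)
```

Every input ends up in exactly one of the two lists, which is the lossless property described above.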
From the above overview, we can make several conclusions:
- None of the most admired features from the land of big data frameworks are needed here. There’s no JOIN, GROUP BY, HyperLogLog or similar; we just transform and write data.
- Applications that just transform and write data are super easy to scale horizontally, especially if components don’t need exactly-once semantics and the data source (such as Kinesis or PubSub) has a reliable checkpointing/acknowledging mechanism. In 2021, systems like Kubernetes or ECS can automatically scale stateless apps to more or fewer workers, based on a variety of real-time metrics such as CPU usage or event latency.
- Whenever we need IO – we want to have full control over it. We want to know exactly what component will be retrying failed writes, how many times, and in what circumstances. With big data frameworks we often stumbled upon very simple tasks that required a deep knowledge of that framework’s internals in order for the simple task to be implemented.
- Last, but not least, our transformation and split logic is usually IO-free: it’s all about validating data and implementing our protocols. It means that all this logic can and should be reused across different platforms and the IO layer should be no more than a thin wrapper around pure business logic.
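The last two points can be sketched in a few lines of Python. This is a hypothetical illustration rather than how any Snowplow component is written: `validate`, `with_retries`, `FlakySink` and the `app_id` rule are invented for the example. The point is only that the business logic stays pure while the retry policy (how many times, on which errors, with what delay) is stated explicitly in the thin IO wrapper.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def validate(event: dict) -> bool:
    """Pure, IO-free business logic: reusable on any platform."""
    return isinstance(event.get("app_id"), str) and bool(event["app_id"])


def with_retries(action: Callable[[], T],
                 max_attempts: int = 3,
                 base_delay: float = 0.0,
                 retry_on: Tuple[Type[BaseException], ...] = (OSError,)) -> T:
    """Thin IO wrapper: we decide what is retried, how often, and how long to wait."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return action()
        except retry_on:
            if attempt == max_attempts:
                raise  # retries exhausted: surface the original error
            time.sleep(base_delay * 2 ** (attempt - 1))  # exponential backoff
    raise AssertionError("unreachable")


class FlakySink:
    """Hypothetical sink that fails a fixed number of times before succeeding."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.written = []

    def write(self, event: dict) -> None:
        if self.failures > 0:
            self.failures -= 1
            raise OSError("transient write failure")
        self.written.append(event)


sink = FlakySink(failures=2)
event = {"app_id": "shop"}
if validate(event):
    with_retries(lambda: sink.write(event), max_attempts=3)
```

With two transient failures and three attempts the write succeeds on the last try; with three failures the `OSError` would propagate to the caller instead of being hidden inside a framework.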
All of the above made us think we can strip away a lot of cost and complexity by reducing the role of big data frameworks in the Snowplow pipeline and relying on more lightweight solutions. However, what we didn’t want to give up on was a common interface that the frameworks provided.
Functional Streams for Snowplow
One great feature provided by frameworks like Apache Beam is a unified API for very different kinds of sources, sinks and transformation steps. A developer writes a job that reads data from GCS and writes to PubSub, then decides to swap PubSub with BigQuery and very few lines of code have to be changed in order to do that. Low-level details like backpressure and checkpointing are handled by the framework.
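The idea of swapping a sink without touching the job can be shown with a small Python sketch. It does not use the Beam API; `Sink`, `InMemoryTopic`, `InMemoryTable` and `run_job` are hypothetical names, the two sinks are in-memory stand-ins, and details such as backpressure and checkpointing are deliberately not modelled.

```python
from typing import Callable, Iterable, List, Protocol


class Sink(Protocol):
    """Anything that can accept one record at a time (hypothetical interface)."""

    def write(self, record: str) -> None: ...


class InMemoryTopic:
    """Stand-in for a message topic such as PubSub."""

    def __init__(self) -> None:
        self.messages: List[str] = []

    def write(self, record: str) -> None:
        self.messages.append(record)


class InMemoryTable:
    """Stand-in for a warehouse table such as BigQuery."""

    def __init__(self) -> None:
        self.rows: List[dict] = []

    def write(self, record: str) -> None:
        self.rows.append({"line": record})


def run_job(source: Iterable[str], step: Callable[[str], str], sink: Sink) -> None:
    """The job only knows the Sink interface, not the concrete destination."""
    for record in source:
        sink.write(step(record))


lines = ["a", "b"]

topic = InMemoryTopic()
run_job(lines, str.upper, topic)

table = InMemoryTable()
run_job(lines, str.upper, table)  # same job, only the sink argument changed
```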
There are many libraries out there that provide similar APIs, and one has shown particularly impressive results while providing an incredibly user-friendly API: Functional Streams for Scala (or FS2 for short). FS2 is a well-known library in the Scala community. Together with its companion library Cats Effect, it provides a set of high- and low-level entities for working with streaming data, where streaming can broadly be defined as any data that can be processed without loading all of it into memory at once.
Being a generic stream-processing library, it also offers many primitives for working with parallel and async computations in a very safe and predictable way. More importantly, it has a blooming ecosystem of connectors for all the technologies we use or are planning to use: Kafka, Kinesis, S3 and GCS, JDBC, PubSub and many more, and adding new ones is a breeze.
Another big benefit of FS2-based apps is that they’re plain JVM assets that can run anywhere from a low-end laptop to a huge Kubernetes cluster and don’t have any requirements for the environment, such as specific network topology, leader node or pre-installed resource manager. This makes them much easier to test and debug and doesn’t require knowledge of very complicated technologies such as Hadoop.
Our first attempt with FS2 was Snowplow Postgres Loader, which allows us to load enriched data into PostgreSQL straight from an enriched stream. One interesting consequence of using FS2 there was that, after the initial implementation that supported only Google PubSub, we realized that adding Amazon Kinesis was just a matter of a few lines of code. As a result, the Postgres Loader is the first Snowplow multi-cloud app that one can launch even on a dev machine.
Our next major milestone was Enrich PubSub, an FS2-based Enrich application (named Enrich FS2 until the 2.0.0 release), which is intended to replace Beam Enrich on GCP pipelines. This is where we had a chance to do the first benchmarking against Beam Enrich of the same version, and things started to look really interesting. According to our estimates, the same single n1-standard-1 worker located in Iowa would cost on average $3.38 per day running a Dataflow job and just $1.27 per day running Enrich PubSub. The difference should be especially noticeable for small pipelines that can run on tiny nodes like e2-small: Dataflow simply cannot operate on those, which significantly raises its minimum cost.
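To put the two daily figures side by side, the arithmetic can be worked through as follows. Only the $3.38 and $1.27 values come from the estimate above; the 30-day month is an assumption made for illustration.

```python
# Daily cost estimates quoted above for one n1-standard-1 worker in Iowa.
DATAFLOW_PER_DAY = 3.38
ENRICH_PUBSUB_PER_DAY = 1.27

# Assumption for illustration only: a 30-day month.
DAYS_PER_MONTH = 30

daily_saving = DATAFLOW_PER_DAY - ENRICH_PUBSUB_PER_DAY
cost_ratio = DATAFLOW_PER_DAY / ENRICH_PUBSUB_PER_DAY
saving_pct = daily_saving / DATAFLOW_PER_DAY * 100
monthly_saving = daily_saving * DAYS_PER_MONTH

print(f"Saving per day:   ${daily_saving:.2f}")
print(f"Saving per month: ${monthly_saving:.2f}")
print(f"Cost ratio:       {cost_ratio:.2f}x")
print(f"Relative saving:  {saving_pct:.1f}%")
```

In other words, the Dataflow job costs roughly 2.7 times as much per worker, and moving to Enrich PubSub saves about 62% of that worker's cost.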
Another small benefit of a simpler architecture is that we have much better control over everything happening on the worker, and hence can perform tasks that were previously overcomplicated because framework overhead obscured access to the node state. A good example is the enrich assets reload: a long-running Enrich PubSub application will periodically re-download the latest versions of the reference databases it needs for processing events. On Beam Enrich it was close to impossible to orchestrate this periodic maintenance, but in Enrich PubSub it is straightforward and predictable.
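As a rough illustration of why this is easy in a standalone application, here is a minimal Python sketch of a periodically refreshed asset. It is not the Enrich PubSub implementation: `ReloadingAsset`, the 60-second period and the fake clock are assumptions for the example, and the refresh happens lazily on access rather than in a background task.

```python
import time
from typing import Callable


class ReloadingAsset:
    """Holds a downloaded asset and re-downloads it once it is older than `period_seconds`."""

    def __init__(self,
                 download: Callable[[], bytes],
                 period_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._download = download
        self._period = period_seconds
        self._clock = clock
        self._value = download()      # initial download at startup
        self._loaded_at = clock()

    def get(self) -> bytes:
        """Return the current asset, refreshing it first if the period has elapsed."""
        if self._clock() - self._loaded_at >= self._period:
            self._value = self._download()
            self._loaded_at = self._clock()
        return self._value


# Fake clock and fake downloads so the behaviour is deterministic.
now = [0.0]
versions = iter([b"db-v1", b"db-v2", b"db-v3"])
asset = ReloadingAsset(lambda: next(versions), period_seconds=60, clock=lambda: now[0])

first = asset.get()    # still within the first period
now[0] = 61.0
second = asset.get()   # period elapsed, so the asset is re-downloaded
```

Because the application owns the worker, the reload is an ordinary function call with a visible schedule, rather than something negotiated with a framework's execution model.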
Another big step in reducing the role of big data frameworks is RDB Loader 1.x.x. It’s a small app that, unlike Enrich or Shredder, doesn’t have any throughput requirements, but in 1.0.0 we decided to move the Loader step out of the EMR cluster, significantly reducing the amount of work the cluster does. As a result, we see a huge reduction in data latency for big managed pipelines: some clients who were loading ~40GB of data every 10-15 minutes can now load data every 5 minutes, and the bottleneck is typically their Redshift clusters.
At this point our engineering team is convinced that the best design choice for our pipeline applications is to have horizontally scalable standalone applications with no dependency on big data frameworks. And we find that FS2 is an excellent library for implementing this design choice on the JVM. We’re actively working on versions of BigQuery Loader and Enrich Kinesis that use a very consistent API and configuration, and we expect to see the same benefits there. Another big project we’re planning to work on is the streaming RDB Shredder, which reads data directly from a Kinesis stream and therefore reduces the latency of events landing in the data warehouse.
However, this doesn’t mean that we’re getting rid of big data frameworks entirely! Over all those years we developed a lot of expertise in using Amazon EMR, Apache Spark and Apache Beam. They’re extremely powerful tools. But with great power comes great complexity, and we want to use this power only when we’re completely sure there’s no easier way. One great example where we think these frameworks are entirely justified, and where we encourage their use, is data analysis. If your team feels more comfortable with Apache Spark than with SQL-based tools like Redshift and you want to aggregate or join big volumes of data, then Spark might be an ideal solution for you.