
The Ins and Outs of building the Snowplow Rust Tracker


The Rust tracker was born out of affection for the language from a few of us here at Snowplow, and our bi-annual (semi-annual?) hackathon seemed like the perfect excuse! The goal was simple – to build a very minimal MVP that could send an event to a Snowplow collector, and to learn a bit about Rust in the process.

Throughout this post, we’ll go over the design decisions we made while building the tracker, along with a dive into the main components.

As we already have trackers in Java and C++ (along with a host of others, but they’re a bit less architecturally relevant), it seemed only natural to draw on the design philosophies they’ve followed as they’ve been built up over time. One important design philosophy we try to stick to is programming to interfaces: each component of the tracker should be swappable for a custom implementation to provide as much flexibility as possible, whilst still providing solid implementations of each component by default.

Before we dive into the tracker specifics, a few thoughts on some of the Rust-specific things encountered throughout the build, and some terms to know if you aren’t familiar with Rust.

Terminology

  • struct – A struct is a collection of fields, which can be of different types. They can be thought of as a class in other languages, but one that doesn’t contain any methods. Methods are defined in impl (“implementation”) blocks.
  • trait – A trait is a collection of methods that a type must implement. They can be thought of as an interface in other languages.
  • crate – A crate is a package of Rust code, similar to a library in other languages.
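To make those a bit more concrete, here’s a tiny illustrative example (nothing to do with the tracker itself) showing a struct, an impl block, and a trait together:

// A struct: just data, no methods.
struct Counter {
    count: u32,
}

// Methods are defined in an impl block.
impl Counter {
    fn new() -> Self {
        Counter { count: 0 }
    }
}

// A trait: a set of methods a type promises to implement,
// much like an interface in other languages.
trait Increment {
    fn increment(&mut self);
}

impl Increment for Counter {
    fn increment(&mut self) {
        self.count += 1;
    }
}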

Async Rust

Whilst the Rust async ecosystem has been improving leaps and bounds over the past few years, there are still a few sharp edges that we got caught on, with three things in particular that we had to consider when building the tracker.

Runtimes

The Rust standard library doesn’t come with a default async runtime. This isn’t unusual, but it meant we had to choose one of the runtimes provided by the ecosystem. In practice, this meant deciding between Tokio and async-std (or possibly smol). Being the most widely used runtime by a fair margin, with all the benefits that brings (more real-world usage, higher development velocity, etc.), Tokio seemed like the sensible choice.

Locking In

Along with choosing a specific runtime comes another consideration: depending on one can lock users into that same runtime, as async programs aren’t guaranteed to be portable across different runtimes. Even though we have chosen Tokio and used some of its features, it may well be possible to refactor to just use the stdlib futures for better compatibility, as the features we use from Tokio aren’t overly complex. However, this isn’t an essential feature for the early releases just yet. The way we use Tokio also means we don’t directly expose the user to the runtime.

A blog post on the Rust lang blog outlines the ideal future state of async, but the ecosystem hasn’t quite reached the runtime-independence goal (yet!).

Traits

A final pain point (also mentioned in the linked Rust lang blog post) is to do with traits. Currently, async trait methods aren’t supported in stable Rust (though an MVP has recently been added to Nightly), meaning we have to rely on the async-trait crate to allow our HTTP requests to run asynchronously.
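In general terms, the pattern looks something like this (a generic illustration rather than the tracker’s own trait, which we’ll get to later):

use async_trait::async_trait;

// On stable Rust (at the time of writing), `async fn` in a trait won't compile
// on its own; the #[async_trait] macro rewrites the method to return a boxed
// future so that it does.
#[async_trait]
pub trait Sender {
    async fn send(&self, body: String) -> Result<u16, String>;
}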

Crates

Most of the crates used are pretty standard for core functionality: Reqwest for an HTTP client, the fantastic serde for serialization, and others such as uuid and log. The only crate not strictly required for functionality was the derive-builder crate, which automatically derives builders for structs. This let us develop our structs rapidly without having to worry about writing the associated builder for each one, which was especially useful for the Payload struct, as it has a fair number of optional fields.
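As a rough illustration of what derive-builder gives us (the field names here are made up for the example rather than taken from the tracker’s actual Payload):

use derive_builder::Builder;

// Deriving Builder generates a PayloadBuilder type with a fluent setter API,
// so we never have to write the builder by hand.
#[derive(Builder, Debug)]
#[builder(setter(into, strip_option))]
pub struct Payload {
    pub event_id: String,
    #[builder(default)]
    pub true_timestamp: Option<u64>,
}

fn main() {
    let payload = PayloadBuilder::default()
        .event_id("an-event-id")
        .true_timestamp(1_700_000_000_u64)
        .build()
        .expect("missing required field");
    println!("{payload:?}");
}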

Once we get to a point where the structs won’t see much change, these builders can be implemented manually to let us remove this dependency and reduce build time a bit, but it has been a very handy tool for early development.

Tracker Components

Now we can move onto the more nitty-gritty bits and dive into some of the components of the tracker, how they’re structured, and the decisions behind that.

The diagram below shows the structure of the 0.2.0 tracker, to hopefully give you a good idea of where each component lives for the following sections!

The Emitter

The emitter is the decision maker: it’s responsible for deciding when to send events, utilising the `EventStore` and `HttpClient` modules to do so.

An emitter is defined by a small trait, roughly along these lines (a sketch of its shape rather than the exact released signatures):
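// A sketch of the Emitter trait's shape; the exact method names and signatures
// in the released tracker may differ slightly.
pub trait Emitter {
    /// Queue an event payload for sending
    fn add(&mut self, payload: PayloadBuilder) -> Result<(), Error>;
    /// Send any queued events immediately
    fn flush(&mut self) -> Result<(), Error>;
    /// Flush outstanding events and shut the emitter down
    fn close(&mut self) -> Result<(), Error>;
}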

This trait is intentionally minimal to provide maximum flexibility for custom implementation.

Batch Emitter

Structure

The emitter provided by the tracker is the BatchEmitter. The batch emitter is split into two distinct parts.

1. The main Sync program

2. The Tokio runtime running in its own thread

This all came down to the decision to make HTTP requests non-blocking (aka a red function). That meant we needed to start a Tokio runtime somewhere in the program. The unpleasant reality of doing this is that async methods permeate (or rather, infect) the program all the way from the entry point of the runtime to the eventual call that makes the HTTP request.

This is not a very desirable way to structure a mostly sync program and could prove pretty annoying for users building a sync program, but there is another solution out there – run a Tokio runtime in its own thread, restricting the async spread to a minimal surface.

Whilst this could have also been accomplished in a non-async way, by spawning a thread per job, each job would have spent 99% of its time waiting on network IO, so letting an async executor handle this for us seemed like a pretty desirable solution. With the async runtime not being exposed to the user, it seemed like we might be able to have our cake and eat it too.

The Java tracker uses the nicely provided ExecutorService to do this, and while the rust-executors crate exists, it didn’t seem to be quite the right fit for what we wanted to create.

Communicating between Sync and Async

The next step was working out how to communicate between our async runtime and the sync part of the emitter. Thankfully this has very much been considered by async library developers out there, so channels were a pretty natural method of communication.

A far more in-depth guide to channels can be found in the Tokio docs, but in brief, a channel comprises two parts: a `Sender` and a `Receiver`. By passing the `Receiver` into the thread running the Tokio runtime, and keeping the `Sender` in the sync portion of the emitter, we can send messages from one thread to the other. In our case, we use two channels, one for communication between sync/async, and another for the retry mechanism (more on that shortly). Specifically, we use two MPSC (Multi-Producer, Single-Consumer) channels, with the “message” being an instance of an EventBatch contained in the EmitterMessage enum, which looks like:

pub struct EventBatch {
    pub id: uuid::Uuid,
    // The `Payload` struct is a bit beefy and isn't too important to know, so I've omitted it, but you can find it here:
    // https://github.com/snowplow/snowplow-rust-tracker/blob/main/src/payload.rs#L41
    pub events: Vec<snowplow_tracker::payload::Payload>,
    pub delay: Option<std::time::Duration>,
    pub retry_attempts: u32,
}

pub enum EmitterMessage {
    Send(EventBatch),
    Close,
}

This just holds events in a batch for sending, along with a delay and retry attempts for the retry mechanism.

The diagram shows the path an EventBatch will take when the send condition is triggered.
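In code, the sync-to-async handoff boils down to something like the sketch below. This is a heavily simplified, self-contained illustration of the pattern rather than the emitter’s actual implementation (the real EventBatch and send logic are far more involved):

use tokio::runtime::Runtime;
use tokio::sync::mpsc;

enum EmitterMessage {
    Send(String), // stand-in for an EventBatch
    Close,
}

fn main() {
    // The sync side keeps the Sender; the Receiver moves into the runtime thread.
    let (tx, mut rx) = mpsc::unbounded_channel::<EmitterMessage>();

    let handle = std::thread::spawn(move || {
        // The Tokio runtime lives entirely inside this thread.
        let runtime = Runtime::new().expect("failed to build Tokio runtime");
        runtime.block_on(async move {
            while let Some(message) = rx.recv().await {
                match message {
                    // In the real emitter this is where the async HTTP POST happens.
                    EmitterMessage::Send(batch) => println!("sending batch: {batch}"),
                    EmitterMessage::Close => break,
                }
            }
        });
    });

    // The rest of the program stays synchronous; sending is a plain method call.
    tx.send(EmitterMessage::Send("batch-1".into())).unwrap();
    tx.send(EmitterMessage::Close).unwrap();
    handle.join().unwrap();
}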

Retry

So what happens if an event attempts to send, but fails? Well, we more than likely don’t just want to drop the event (though this is configurable if desired) so we should have another go! The retry mechanism is pretty simple and is implemented using the delay and retry_attempts fields of the EventBatch struct.

To configure the number of retries, a RetryPolicy can be passed to the emitter, which looks like this:

pub enum RetryPolicy {
    /// Retry sending events forever
    RetryForever,
    /// Retry sending events until the maximum number of retries is reached
    MaxRetries(u32),
    /// Do not retry sending events
    NoRetry,
}

When a send attempt happens, the retry_attempts field of the EventBatch is incremented. If the event fails to send, we check whether there are any retry attempts left based on the retry policy. If there are, we set the delay to an initial one second if it is None, or double it if it is Some. It’s worth noting that even with a RetryForever policy, there is a hard maximum of 10 minutes between retries, to avoid events sitting around for an extended time.
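In rough terms, the backoff update amounts to something like this (an illustrative sketch, not the tracker’s exact code):

use std::time::Duration;

// Double the delay on each failed attempt, capped at 10 minutes.
const MAX_DELAY: Duration = Duration::from_secs(60 * 10);

fn next_delay(current: Option<Duration>) -> Duration {
    match current {
        None => Duration::from_secs(1),
        Some(delay) => std::cmp::min(delay * 2, MAX_DELAY),
    }
}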

This is where the second channel comes in, acting as a queue for events that have failed to send. By utilising tokio::select! we check both the retry channel and the main channel for messages, with the retry channel taking priority. tokio::select! returns whichever future resolves first and, by default, picks a random branch to poll first. We want to bias selection towards retries, so we use biased; which makes the branches be checked in the order they are defined.

let message = match tokio::select! {
    biased;
    // readers note: `rx` is commonly used to denote a `Receiver`
    retry = retry_rx.recv() => retry,
    event = rx.recv() => event,
} {
    Some(message) => message,
    None => break,
};

Whilst not critical, as the emitter can generally keep up with large volumes of event throughput, retries should take priority to avoid building up a potentially large backlog of events that need to be retried. Though of course if the Snowplow collector is down for an extended period, this is unavoidable.

There are also several status codes we never retry on. These are in the DONT_RETRY_STATUS_CODES constant, and are:

const DONT_RETRY_STATUS_CODES: [u16; 5] = [400, 401, 403, 410, 422];

The status codes are: 400 Bad Request, 401 Unauthorised, 403 Forbidden, 410 Gone, and 422 Unprocessable Entity.
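Roughly speaking, the check before scheduling a retry looks something like this (an illustrative helper; the real control flow also consults the retry policy and attempt count):

// A 2xx response needs no retry, and codes in DONT_RETRY_STATUS_CODES are
// treated as permanent failures rather than retried.
fn is_retryable(status: u16) -> bool {
    !(200..300).contains(&status) && !DONT_RETRY_STATUS_CODES.contains(&status)
}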

The diagram below shows the potential paths an event will take based on whether it has a delay, and/or fails to send.

The Event Store

The Event Store is responsible for storing events until they are ready to be sent to the collector. It is also responsible for grouping events into batches of a set size. The trait, appropriately named EventStore, looks like this:

/// An EventStore is responsible for storing events until they are sent to the collector.
///
/// Implement this trait to use your own EventStore implementation on an [Emitter](crate::Emitter).
pub trait EventStore {
    /// Add a [PayloadBuilder] to the EventStore
    fn add(&mut self, payload: PayloadBuilder) -> Result<(), Error>;
    /// The number of events currently in the EventStore
    fn len(&self) -> usize;
    /// The set size of the batches that will be sent to the collector
    fn batch_size(&self) -> usize;
    /// The maximum number of events that can be stored in the EventStore
    fn capacity(&self) -> usize;
    /// Removes and returns a batch of events from the event store
    /// The batch size is determined by the `batch_size` field
    fn full_batch(&mut self) -> Result<EventBatch, Error>;
    /// Removes and returns the provided number of events from the EventStore as an [EventBatch]
    fn batch_of(&mut self, size: usize) -> Result<EventBatch, Error>;
    /// A method to be called after attempts to send are finished, either successfully or unsuccessfully
    fn cleanup_after_send_attempt(&mut self, batch_id: Uuid) -> Result<(), Error>;
}

Why most of these methods are required is pretty self-explanatory, with the possible exception of cleanup_after_send_attempt, but first, a quick explanation of the default event store.

InMemoryEventStore

The InMemoryEventStore ships with default batch size and capacity values that will likely be fine for most use cases, but if you have a high event volume, larger batches could be more suitable. Something to watch out for is that larger batches risk creating oversized requests, which can result in 413 Payload Too Large responses from the collector.

Now, back to that cleanup_after_send_attempt method. As InMemoryEventStore is the only event store that comes with the tracker, let’s have a look at how it’s implemented there:

fn cleanup_after_send_attempt(&mut self, batch_id: Uuid) -> Result<(), Error> {
    Ok(drop(batch_id))
}

Well, that seems like it does a whole lot of nothing. But it does so for good reason! A potential problem with the in-memory event store is just that – it lives in memory, so an unexpected end to your program would mean all your events currently in the store will be lost. The cleanup_after_send_attempt method is intended to be used with more permanent event stores, such as a database (similar to the SQLite event store used in our C++ tracker), to remove events from the store after they’ve been sent. In the case of InMemoryEventStore, it’s a no-op, as the events are removed from the store when a batch is created.
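For a sense of what that hook is for, a hypothetical SQLite-backed store (using the rusqlite crate; this isn’t part of the tracker) might only delete events once a send attempt has finished:

use rusqlite::{params, Connection};
use uuid::Uuid;

struct SqliteEventStore {
    conn: Connection,
}

impl SqliteEventStore {
    // Events are tagged with their batch id when handed out for sending, and
    // removed from disk only after the attempt completes. Error types are
    // simplified here for the sake of the example.
    fn cleanup_after_send_attempt(&mut self, batch_id: Uuid) -> Result<(), rusqlite::Error> {
        self.conn.execute(
            "DELETE FROM events WHERE batch_id = ?1",
            params![batch_id.to_string()],
        )?;
        Ok(())
    }
}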

HTTP Client

And finally, the last piece of the tracker is the HTTP Client, which is responsible for actually sending events to the collector. This is also where the async-trait crate comes into play. The trait, HttpClient, looks like this:

#[async_trait]
pub trait HttpClient {
    async fn post(&self, payload: SelfDescribingJson) -> Result<u16, Error>;
    fn clone(&self) -> Box<dyn HttpClient + Send + Sync>;
}

ReqwestClient

The default implementation is the ReqwestClient which simply uses reqwest::Client.post to send events and return a status code (or error if no successful connection is made):

const POST_PATH: &str = "com.snowplowanalytics.snowplow/tp2";

async fn post(&self, payload: SelfDescribingJson) -> Result<u16, Error> {
    let collector_url = format!("{}/{}", self.collector_url, POST_PATH);

    match self.client.post(&collector_url).json(&payload).send().await {
        Ok(resp) => Ok(resp.status().as_u16()),
        Err(e) => Err(Error::EmitterError(format!("POST request failed: {e}"))),
    }
}

The End!

And that’s it! Those are the key (see: interesting) parts of the tracker. Hopefully, this post has helped you understand how each bit of the tracker works. For a more high-level implementation guide, check out the Snowplow docs on the Rust tracker.

As a final note, the tracker is still in development, so if you have any suggestions or find any bugs, please let us know on GitHub. We’d love to hear from you!

As a final, final note, some more good blogs on Rust that I couldn’t quite find an appropriate place to jam into this blog post, but found to be interesting reads:

Greg Leonard

Software Engineer at Snowplow
