Snowplow 88 Angkor Wat released


We are pleased to announce the release of Snowplow 88 Angkor Wat. This release introduces event de-duplication across different pipeline runs, powered by DynamoDB, along with an important refactoring of the batch pipeline configuration.
Read on for more information on R88 Angkor Wat, named after the largest religious monument in the world:
1. New storage targets configuration
Historically, storage targets for the Snowplow batch pipeline have been configured from a shared set of properties in the EmrEtlRunner and StorageLoader config.yml YAML file.
Using the same YAML properties to configure very different databases, such as Redshift and Elasticsearch, has been difficult and error-prone, especially for new Snowplow users. Continuing to overload these YAML properties as we add additional databases such as Google BigQuery, Snowflake and Azure SQL Data Warehouse is unsustainable.
As of this release, storage targets for the Snowplow batch pipeline are configured through database-specific self-describing JSONs – the same way that our enrichments are configured. This should reduce the scope for errors – not least because Snowplow will validate that these configuration JSONs are correct and complete.
You can find all the supported target configurations in Iglu Central, and sample configs in the Snowplow repo at 4-storage/config.
This change means that the older config.yml format is no longer valid; both EmrEtlRunner and StorageLoader now take a --targets option specifying the directory containing your storage configuration JSONs, plus a --resolver option used to validate those JSONs.
2. Cross-batch natural deduplication
2.1 The deduplication story so far
Event duplicates can prove a challenge in any event pipeline – we have described the problem in this blog post and on our Discourse forum.
As a first step to solving the problem, in R76 Changeable Hawk Eagle we implemented in-batch natural deduplication, which removed duplicates originating from the at-least-once delivery semantics of the Snowplow pipeline. Next, in R86 Petra, we introduced in-batch synthetic deduplication, which once again drastically reduced the number of duplicates in our users’ clusters, removing them completely from each individual load but leaving them across separate loads.
2.2 Cross-batch deduplication using DynamoDB
Today we’re going further, introducing cross-batch deduplication, which handles natural duplicates across many loads and so eliminates the duplicates problem for many users.
To solve this problem across pipeline runs we use Amazon DynamoDB, which lets us keep track of which events we have processed across multiple runs; essentially, we maintain an “event manifest” in DynamoDB, storing just a few key properties of each event:
- Event ID – used to identify the event
- Event fingerprint – used in conjunction with the event ID to identify natural duplicates
- ETL timestamp – used to check whether a previous Hadoop Shred run was aborted and the event is being reprocessed
- Time to live – a timestamp that allows DynamoDB to automatically clean up stale objects (set to etl_tstamp plus 180 days)
The mechanics of the manifest set-and-check are relatively simple – you can find more technical information on the dedicated wiki page.
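To make this concrete, here is a simplified sketch of the set-and-check idea using the AWS CLI: for each event the job attempts a conditional write, and a failed condition signals a natural duplicate. The attribute names and values below are illustrative rather than the exact ones used by Hadoop Shred, and the real logic also consults the ETL timestamp so that an aborted run can be safely retried.

```bash
# Simplified sketch only: attempt to record an event in the manifest. The write
# fails if an item with the same event ID and fingerprint already exists, which
# marks the event as a natural duplicate. Attribute names are illustrative.
aws dynamodb put-item \
  --table-name snowplow-event-manifest \
  --item '{
    "eventId":     {"S": "5e9ff3f1-57c5-4d5e-9eef-1f4a1e2a3b4c"},
    "fingerprint": {"S": "bed9f94a6f9e07e1b8f0e8f1a9c2d3e4"},
    "etlTime":     {"S": "2017-03-06 19:00:00.000"},
    "ttl":         {"N": "1504224000"}
  }' \
  --condition-expression "attribute_not_exists(eventId)"
```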
It’s important to note that, unlike previous in-batch deduplication logic, this new functionality needs to be explicitly enabled.
2.3 How to enable the new deduplication process
To start deduplicating events across batches, you need to provide EmrEtlRunner with a duplicate storage configuration via the new --targets option.
Here is an example configuration:
{ "schema": "iglu:com.snowplowanalytics.snowplow.storage/amazon_dynamodb_config/jsonschema/1-0-0", "data": { "name": "AWS DynamoDB duplicates storage", "accessKeyId": "...", "secretAccessKey": "...", "awsRegion": "eu-west-1", "dynamodbTable": "snowplow-event-manifest", "purpose": "DUPLICATE_TRACKING" } }
If you don’t add a duplicate storage configuration, then your Hadoop Shred job will continue to work as before. Cross-batch deduplication is completely optional – you may decide that the cost-benefit calculus for enabling it is not right for you.
2.4 Cost impact of running this process
This process introduces two additional costs to running your Snowplow batch pipeline:
- Increased EMR jobflow times, which have an associated financial cost in terms of Normalized Instance Hours
- Additional AWS costs associated with the DynamoDB table used for the event manifest
The EMR jobflow time increases because the Hadoop Shred job processes many events in parallel, while DynamoDB relies on a mechanism called provisioned throughput to cap reads and writes. Provisioned throughput throttles the jobflow when DynamoDB writes reach the specified capacity units: no matter how powerful your EMR cluster, the job will proceed only as fast as the throttled writes can make it through to the DynamoDB table.
The default write capacity we use for storing the event manifest is 100 units, which costs roughly 50 USD per month. If this slows down your job considerably, you can increase the DynamoDB write capacity to e.g. 500 units, but this raises the DynamoDB cost to roughly 250 USD per month.
There’s no golden rule for calculating write capacity and cluster configuration – you should experiment with different options to find the best cost-performance profile for your event volumes. The DynamoDB monitoring UI in AWS is helpful here because it shows you the level of throttling on your DynamoDB writes.
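For example, if monitoring shows sustained write throttling, one way to raise the write capacity of the manifest table (named here as in the sample configuration above) is with the AWS CLI:

```bash
# Raise the manifest table's write capacity from the default 100 to 500 units,
# leaving read capacity at 100. Adjust the table name to match your configuration.
aws dynamodb update-table \
  --table-name snowplow-event-manifest \
  --provisioned-throughput ReadCapacityUnits=100,WriteCapacityUnits=500
```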
2.5 Solving the “cold start” problem
The new cross-batch deduplication is powerful, but how do you handle the “cold start” problem where the event manifest table in DynamoDB starts off empty?
To help, we have developed a new Event Manifest Populator Spark job, which lets you pre-load the DynamoDB table from your enriched event archive.
The Event Manifest Populator can be started on EMR with a PyInvoke script provided by us. To run it, you’ll need to download the script itself and install Boto 2:
```bash
$ wget https://raw.githubusercontent.com/snowplow/snowplow/master/5-data-modeling/event-manifest-populator/run.py
$ pip install boto
```
The last step is to run the actual job:
```bash
$ python run.py $ENRICHED_ARCHIVE_S3_PATH $STORAGE_CONFIG_PATH $IGLU_RESOLVER_PATH
```
Here, the run_emr task sent to PyInvoke takes three positional arguments:
- $ENRICHED_ARCHIVE_S3_PATH is the path to the enriched events archive in S3, as found at aws.s3.buckets.enriched.archive in config.yml
- $STORAGE_CONFIG_PATH is the duplicate storage configuration JSON
- $IGLU_RESOLVER_PATH is your Iglu resolver JSON configuration
You can also add one extra argument, --since, which specifies the timespan of events you want to load into duplicate storage. The date is specified in YYYY-MM-dd format.
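For example, to back-fill the manifest with only those events enriched since the start of 2017, the invocation might look like this:

```bash
$ python run.py $ENRICHED_ARCHIVE_S3_PATH $STORAGE_CONFIG_PATH $IGLU_RESOLVER_PATH --since 2017-01-01
```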
Note that the Event Manifest Populator works only with events produced by Snowplow R73 Cuban Macaw or later, as the enriched event TSV format changed in that release.
You can find out more about the Event Manifest Populator and its interface on its dedicated wiki page.
2.6 What’s coming next for deduplication
Firstly, an upcoming release will include a Python script to generate SQL that lets you clean out the historic event duplicates which were loaded into your Redshift cluster by earlier, pre-deduplication versions of Hadoop Shred.
Looking further ahead, we are interested in extracting our DynamoDB-powered deduplication logic into a standalone library, so that it can be used with the loaders for other storage targets, such as S3/Parquet/Avro or Snowflake, and in our real-time pipeline, where at present all natural duplicates are simply erased.
3. Upgrading
3.1 Upgrading EmrEtlRunner and StorageLoader
The latest versions of EmrEtlRunner and StorageLoader are available from our Bintray here.
3.2 Creating new targets configuration
Storage target configuration JSONs can be generated from your existing config.yml using the 3-enrich/emr-etl-runner/config/convert_targets.rb script. These files should be stored in a folder, for example called targets, alongside your existing enrichments folder.
When complete, your folder layout will look something like this:
```
snowplow_config
├── config.yml
├── enrichments
│   ├── campaign_attribution.json
│   ├── ...
│   ├── user_agent_utils_config.json
├── iglu_resolver.json
├── targets
│   ├── duplicate_dynamodb.json
│   ├── enriched_redshift.json
```
3.3 Updating config.yml
- Remove the whole storage.targets section (leaving storage.download.folder) from your config.yml file
- Update the hadoop_shred job version in your configuration YAML like so:
```yaml
versions:
  hadoop_enrich: 1.8.0        # UNCHANGED
  hadoop_shred: 0.11.0        # WAS 0.10.0
  hadoop_elasticsearch: 0.1.0 # UNCHANGED
```
For a complete example, see our sample config.yml template and sample storage target templates.
3.4 Updating EmrEtlRunner and StorageLoader scripts
- Append the option --targets $TARGETS_DIR to both the snowplow-emr-etl-runner and snowplow-storage-loader applications
- Append the option --resolver $IGLU_RESOLVER to the snowplow-storage-loader application. This is required to validate the storage target configurations (see the example invocations below)
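Putting this together, and assuming the snowplow_config folder layout from section 3.2, your updated invocations might look something like the following (paths and any other options should of course match your own setup):

```bash
# EmrEtlRunner: --targets now points at the directory of storage configuration JSONs
./snowplow-emr-etl-runner \
  --config snowplow_config/config.yml \
  --resolver snowplow_config/iglu_resolver.json \
  --enrichments snowplow_config/enrichments \
  --targets snowplow_config/targets

# StorageLoader: also needs --resolver so the target configurations can be validated
./snowplow-storage-loader \
  --config snowplow_config/config.yml \
  --resolver snowplow_config/iglu_resolver.json \
  --targets snowplow_config/targets
```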
3.5 Enabling cross-batch deduplication
Please be aware that enabling this will have a potentially high cost and performance impact on your Snowplow batch pipeline.
If you want to start deduplicating events across batches, you need to add a new dynamodb_config target configuration to your newly created targets directory.
Optionally, before the first run of the Shred job with cross-batch deduplication, you may want to run the Event Manifest Populator to back-fill the DynamoDB table.
When Hadoop Shred runs, if the table doesn’t exist it will be created automatically, with provisioned throughput set by default to 100 write capacity units and 100 read capacity units, and with the schema required to store and deduplicate events.
For relatively low volumes (around 1 million events per run), the default settings will likely “just work”. However, we do strongly recommend monitoring the EMR job, and its AWS billing impact, closely and tweaking the DynamoDB provisioned throughput and your EMR cluster specification accordingly.
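If you want to check the throughput the table was created with, or confirm a later adjustment, a quick way is to inspect it with the AWS CLI (table name as in your duplicate storage configuration):

```bash
# Show the manifest table's current provisioned read and write capacity
aws dynamodb describe-table \
  --table-name snowplow-event-manifest \
  --query 'Table.ProvisionedThroughput'
```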
4. Roadmap
Upcoming Snowplow releases include:
- R89 Plain of Jars, which will port our Hadoop Enrich and Hadoop Shred jobs from Scalding to Apache Spark
- R9x [HAD] 4 webhooks, which will add support for 4 new webhooks (Mailgun, Olark, Unbounce, StatusGator)
- R9x [HAD] GCP support pt. 1, the first phase of our support for running Snowplow real-time on Google Cloud Platform
- R9x [HAD] EmrEtlRunner robustness, continuing our work making EmrEtlRunner more reliable and modular
- R9x [HAD] StorageLoader reboot, which will port our StorageLoader app to Scala
5. Getting help
For more details on this release, as always please check out the release notes on GitHub.
If you have any questions or run into any problems, please raise an issue or get in touch with us through the usual channels.