Snowplow 96 Zeugma released with NSQ support

We are pleased to announce the release of Snowplow 96 Zeugma. This release brings initial support for using NSQ with the Snowplow real-time pipeline, as an alternative to Amazon Kinesis or Apache Kafka.
Read on for more information on R96 Zeugma, named after an ancient city of Commagene located in southeastern Anatolia:
- Supporting NSQ
- Setting up NSQ
- Scala Stream Collector and NSQ
- Stream Enrich and NSQ
- Other NSQ releases
- Upgrading
- Roadmap
- Getting help
1. Supporting NSQ
NSQ is a real-time distributed messaging platform; think of it as a highly scalable pub/sub system. We are planning to migrate Snowplow Mini to use NSQ under the hood, and this new functionality is a stepping stone towards that goal.
At the moment, Snowplow Mini uses named Unix pipes to communicate between the various Snowplow components. This approach is opaque and fairly brittle, leading to unexpected behaviours such as backpressure issues and race conditions at launch. Switching Snowplow Mini to NSQ is a good compromise: it is much simpler to set up than Kafka or Kinesis, but much more predictable than named Unix pipes.
Additionally, because NSQ is highly scalable and relatively low-cost, it may become an important alternative to Kafka or Kinesis for some large-scale Snowplow roll-outs, particularly in the IoT space.
Adding NSQ support in Snowplow translates to:
- Adding an NSQ sink to the Scala Stream Collector
- Adding an NSQ source and an NSQ sink to Stream Enrich
We will detail both of those steps below, but first let’s set up NSQ.
2. Setting up NSQ
The easiest way to spin up NSQ is through the NSQ quick start. For our purposes, we only need nsqlookupd and nsqd.
nsqlookupd is a component dedicated to managing who produces and consumes what. nsqd, on the other hand, is in charge of receiving, queueing and delivering messages.
After starting both nsqlookupd and nsqd, you can send the following POST requests in order to create the NSQ topics that we will use later on.
curl -X POST "http://127.0.0.1:4161/topic/create?topic=GoodRawEvents"
curl -X POST "http://127.0.0.1:4161/topic/create?topic=BadRawEvents"
curl -X POST "http://127.0.0.1:4161/topic/create?topic=GoodEnrichedEvents"
curl -X POST "http://127.0.0.1:4161/topic/create?topic=BadEnrichedEvents"
Assuming all these commands run without error, you are ready to continue with the next steps.
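If you would rather script the topic creation, the curl commands above translate directly to Python's standard library. This is a minimal sketch, assuming nsqlookupd is listening on its default HTTP port 4161 on localhost; the helper names topic_create_url and create_topics are hypothetical, and only the /topic/create endpoint and topic names come from this post.

```python
from urllib.parse import urlencode
from urllib.request import Request, urlopen

# Default nsqlookupd HTTP address, matching the curl commands above.
LOOKUPD_HTTP = "http://127.0.0.1:4161"

# The four topics used in the rest of this post.
TOPICS = ["GoodRawEvents", "BadRawEvents", "GoodEnrichedEvents", "BadEnrichedEvents"]


def topic_create_url(base: str, topic: str) -> str:
    """Build the nsqlookupd /topic/create URL for a single topic."""
    return f"{base}/topic/create?{urlencode({'topic': topic})}"


def create_topics(base: str = LOOKUPD_HTTP, topics=TOPICS) -> None:
    """POST to /topic/create once per topic, like `curl -X POST`.

    urlopen raises urllib.error.HTTPError on an error status, so a
    failing topic stops the loop instead of being silently skipped.
    """
    for topic in topics:
        # An empty body plus an explicit method gives a plain POST request.
        req = Request(topic_create_url(base, topic), data=b"", method="POST")
        with urlopen(req, timeout=5) as resp:
            print(f"{topic}: HTTP {resp.status}")
```

With nsqlookupd running locally, calling create_topics() should print one status line per topic.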
3. Scala Stream Collector and NSQ
This release brings support for a new sink target for our Scala Stream Collector, in the form of an NSQ topic. This feature maps one-to-one in functionality with the current Kinesis and Kafka offerings.
If you have followed the Setting up NSQ section above, you will need to update your Scala Stream Collector configuration as follows:
collector {
  ...
  streams {
    # Events which have successfully been collected will be stored in the good stream/topic
    good = "GoodRawEvents"

    # Events that are too big will be stored in the bad stream/topic
    bad = "BadRawEvents"
    ...
    sink {
      enabled = nsq

      # Host name for nsqd
      host = "127.0.0.1"

      # TCP port for nsqd
      port = 4150
    }
    ...
  }
}
Launching the collector in this configuration will then start sinking raw events to your configured NSQ topics, allowing them to be picked up and consumed by other applications, including Stream Enrich.
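The host and port settings point the sink at nsqd's TCP interface. To make that concrete, here is a minimal sketch of publishing one message with NSQ's documented TCP protocol: the client sends the four-byte magic "  V2", then a PUB command carrying a 4-byte big-endian body size, and nsqd replies with a frame whose data is "OK". This is an illustration of the wire protocol only, not how the collector itself is implemented; the function names are hypothetical.

```python
import socket
import struct
from typing import Tuple

MAGIC_V2 = b"  V2"  # protocol magic sent once per connection: two spaces, then "V2"
FRAME_TYPE_RESPONSE = 0
FRAME_TYPE_ERROR = 1
FRAME_TYPE_MESSAGE = 2


def pub_command(topic: str, body: bytes) -> bytes:
    """Encode PUB: 'PUB <topic>\\n', a 4-byte big-endian body size, then the body."""
    return b"PUB " + topic.encode("ascii") + b"\n" + struct.pack(">i", len(body)) + body


def recv_exact(sock: socket.socket, n: int) -> bytes:
    """Read exactly n bytes, failing if the peer closes the connection early."""
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("connection closed by nsqd")
        buf += chunk
    return buf


def read_frame(sock: socket.socket) -> Tuple[int, bytes]:
    """Read one frame: a 4-byte size (covering type + data), a 4-byte type, then data."""
    (size,) = struct.unpack(">i", recv_exact(sock, 4))
    payload = recv_exact(sock, size)
    (frame_type,) = struct.unpack(">i", payload[:4])
    return frame_type, payload[4:]


def publish(topic: str, body: bytes, host: str = "127.0.0.1", port: int = 4150) -> None:
    """Publish a single message to nsqd over TCP and wait for its OK."""
    with socket.create_connection((host, port), timeout=5) as sock:
        sock.sendall(MAGIC_V2 + pub_command(topic, body))
        frame_type, data = read_frame(sock)
        # nsqd may send heartbeats at any time; they must be answered with NOP.
        while frame_type == FRAME_TYPE_RESPONSE and data == b"_heartbeat_":
            sock.sendall(b"NOP\n")
            frame_type, data = read_frame(sock)
        if frame_type != FRAME_TYPE_RESPONSE or data != b"OK":
            raise RuntimeError(f"PUB failed (frame type {frame_type}): {data!r}")
```

Against a local nsqd, publish("TestTopic", b"hello") would create TestTopic (a hypothetical scratch topic) on first use. Avoid publishing arbitrary bytes to GoodRawEvents, since Stream Enrich expects the collector's own payload format there.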
4. Stream Enrich and NSQ
Stream Enrich has also been updated to support an NSQ topic as a source, and another one as a sink. Again, this feature maps one-to-one in functionality with the current Kinesis and Kafka offerings. If you are familiar with our Kinesis or Kafka support, you know the drill!
Following on from the Stream Collector section above, you can then configure your Stream Enrich application like so:
enrich {
  source = "nsq"
  sink = "nsq"
  ...
  streams {
    in {
      # Stream/topic where the raw events to be enriched are located
      raw = "GoodRawEvents"
    }
    out {
      # Stream/topic where the events that were successfully enriched will end up
      enriched = "GoodEnrichedEvents"
      # Stream/topic where the events that failed enrichment will be stored
      bad = "BadEnrichedEvents"
      ...
    }
    nsq {
      # Channel name for the NSQ source
      # If more than one application is reading from the same NSQ topic simultaneously,
      # all of them must have the same channel name to get all the data from that topic
      rawChannel = "StreamEnrichChannel"
      # Host name for nsqd
      host = "127.0.0.1"
      # TCP port for nsqd
      port = 4150
      # Host name for nsqlookupd
      lookupHost = "127.0.0.1"
      # HTTP port for nsqlookupd
      lookupPort = 4161
    }
    ...
  }
  ...
}
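The rawChannel setting relies on NSQ's topic/channel semantics: every channel on a topic receives its own copy of each message, while consumers sharing one channel split that channel's messages between them. The toy in-memory model below sketches that behaviour. It is not an NSQ client; the class name, the ArchiveChannel channel and the consumer IDs are all hypothetical, and round-robin stands in for nsqd's real RDY-based distribution.

```python
from collections import defaultdict
from itertools import cycle
from typing import Dict, List


class ToyTopic:
    """In-memory model of NSQ's topic/channel fan-out; not a real NSQ client."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.channels: Dict[str, List[str]] = defaultdict(list)  # channel -> consumer ids

    def subscribe(self, channel: str, consumer: str) -> None:
        self.channels[channel].append(consumer)

    def deliver(self, messages: List[str]) -> Dict[str, List[str]]:
        """Every channel gets a full copy of the messages; within a channel,
        each message goes to exactly one of that channel's consumers."""
        received: Dict[str, List[str]] = defaultdict(list)
        for consumers in self.channels.values():
            # Round-robin is a simplification: nsqd actually distributes
            # messages according to each consumer's RDY count.
            turn = cycle(consumers)
            for msg in messages:
                received[next(turn)].append(msg)
        return dict(received)
```

Subscribing two Stream Enrich instances to StreamEnrichChannel and one archiver to ArchiveChannel, then delivering four events, gives each enricher half of the events and the archiver all four.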
Events from the Stream Collector’s raw topic will then start to be picked up and enriched before being written to the topic configured as out.enriched (GoodEnrichedEvents in the example above).
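On the consuming side, NSQ's TCP protocol has a client send the four-byte magic "  V2", then SUB to a topic and channel, then RDY to say how many in-flight messages it will accept. nsqd then pushes message frames (frame type 2), and the client acknowledges each one with FIN. The sketch below encodes those commands and decodes a message frame's data per the protocol spec. It illustrates the wire format only and is not Stream Enrich's implementation; all names are hypothetical.

```python
import struct
from typing import NamedTuple


class Message(NamedTuple):
    timestamp_ns: int  # nanoseconds since the epoch when nsqd received the message
    attempts: int      # how many delivery attempts nsqd has made
    message_id: bytes  # 16-byte ID, echoed back in FIN (or REQ to requeue)
    body: bytes        # the payload the producer published


def sub_command(topic: str, channel: str) -> bytes:
    """SUB attaches this connection to one channel of one topic."""
    return f"SUB {topic} {channel}\n".encode("ascii")


def rdy_command(count: int) -> bytes:
    """RDY tells nsqd how many in-flight messages this consumer will accept."""
    return f"RDY {count}\n".encode("ascii")


def fin_command(message_id: bytes) -> bytes:
    """FIN acknowledges a message so nsqd will not redeliver it."""
    return b"FIN " + message_id + b"\n"


def parse_message(data: bytes) -> Message:
    """Decode the data of a message frame (frame type 2)."""
    timestamp_ns, attempts = struct.unpack(">qH", data[:10])  # int64 + uint16, big-endian
    return Message(timestamp_ns, attempts, data[10:26], data[26:])
```

A consumer that does not send FIN before the message timeout will see the message redelivered with a higher attempts count.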
5. Other NSQ releases
As we previously mentioned, the primary purpose of the NSQ support is Snowplow Mini’s migration. In support of that, we have already added NSQ support to the Elasticsearch Loader and the S3 Loader.
You can find more detailed information about these versions in the Elasticsearch Loader 0.10.0 and S3 Loader 0.6.0 blog posts.
6. Upgrading
The real-time applications for R96 Zeugma are available at the following locations:
http://dl.bintray.com/snowplow/snowplow-generic/snowplow_scala_stream_collector_0.11.0.zip
http://dl.bintray.com/snowplow/snowplow-generic/snowplow_stream_enrich_0.12.0.zip
To use NSQ, you will need to update your Stream Collector and Stream Enrich configurations as described in the sections above.
If you are already using Kafka or Kinesis, there are no breaking changes in the R96 configuration for Stream Enrich, but you will need to update your Scala Stream Collector’s configuration. This is because, from now on, the collector needs only one sink configuration: the one for the sink you have enabled.
For example, if you’re using Kinesis, only the Kinesis configuration will be needed:
collector {
  ...
  # sink = kinesis # REMOVED
  streams {
    ...
    sink { # ADDED
      enabled = kinesis # or kafka or nsq

      region = eu-west-1
      threadPoolSize = 10
      aws {
        accessKey = iam
        secretKey = iam
      }
      backoffPolicy {
        minBackoff =
        maxBackoff =
      }

      # Or Kafka
      #brokers = ""
      ## Number of retries to perform before giving up on sending a record
      #retries = 0

      # Or NSQ
      ## Host name for NSQ tools
      #host = ""
      ## TCP port for nsqd, 4150 by default
      #port =
    }
  }
}
Finally, an upcoming release of the Snowplow Docker images will include images for both the Scala Stream Collector and Stream Enrich with NSQ support.
There are no material non-NSQ-related changes in R96.
7. Roadmap
Upcoming Snowplow releases will include:
- R97 [BAT] 4 webhooks, which will add support for 4 new webhooks (Mailgun, Olark, Unbounce, StatusGator)
- R9x [STR] Priority fixes, removing the potential for data loss in the stream processing pipeline
And of course, please stay tuned for the Snowplow Mini 0.4.0 release with NSQ support!
8. Getting help
For more details on this release, please check out the release notes on GitHub.
If you have any questions or run into any problems, please visit our Discourse forum.