
Snowplow 96 Zeugma released with NSQ support

We are pleased to announce the release of Snowplow 96 Zeugma. This release brings initial support for using NSQ with the Snowplow real-time pipeline, as an alternative to Amazon Kinesis or Apache Kafka.

Read on for more information on R96 Zeugma, named after an ancient city of Commagene located in southeastern Anatolia:

  1. Supporting NSQ
  2. Setting up NSQ
  3. Scala Stream Collector and NSQ
  4. Stream Enrich and NSQ
  5. Other NSQ releases
  6. Upgrading
  7. Roadmap
  8. Getting help

Zeugma

1. Supporting NSQ

NSQ is a realtime distributed messaging platform – think of it as a highly scalable pub/sub system. We plan to migrate Snowplow Mini to use NSQ under the hood, and this new functionality is a stepping stone towards that goal.

At the moment, Snowplow Mini uses named Unix pipes "under the hood" to communicate between the various Snowplow components. This is an opaque and fairly brittle approach, leading to unexpected behaviours such as backpressure issues and race conditions at launch. Switching Snowplow Mini to NSQ is a good compromise: much simpler to set up than Kafka or Kinesis, but much more predictable than named Unix pipes.

Additionally, NSQ's high scalability and relatively low cost may make it an important alternative to Kafka or Kinesis for some large-scale Snowplow roll-outs, particularly in the IoT space.

Adding NSQ support in Snowplow translates to:

  • Adding an NSQ sink to the Scala Stream Collector
  • Adding an NSQ source and an NSQ sink to Stream Enrich

We will detail both of those steps below, but first let's set up NSQ.

2. Setting up NSQ

The easiest way to spin up NSQ is through the NSQ quick start. For our purposes, we only need nsqlookupd and nsqd.

nsqlookupd is a component dedicated to managing who produces and consumes what. nsqd, on the other hand, is in charge of receiving, queueing and delivering messages.
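For a local test setup, the two daemons can be started from two separate terminals, roughly like this (a minimal sketch; 4160 is nsqlookupd's default TCP port, adjust if yours differs):

```shell
# Terminal 1: start the lookup daemon
# (listens on 4160/tcp and 4161/http by default)
nsqlookupd

# Terminal 2: start nsqd and register it with nsqlookupd
nsqd --lookupd-tcp-address=127.0.0.1:4160
```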

After starting both nsqlookupd and nsqd, send the following POST requests to create the NSQ topics that we will use later on:

```
curl -X POST http://127.0.0.1:4161/topic/create?topic=GoodRawEvents
curl -X POST http://127.0.0.1:4161/topic/create?topic=BadRawEvents
curl -X POST http://127.0.0.1:4161/topic/create?topic=GoodEnrichedEvents
curl -X POST http://127.0.0.1:4161/topic/create?topic=BadEnrichedEvents
```

Assuming all these commands run without error, you are ready to continue with the next steps.
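To double-check that the topics were registered, you can query nsqlookupd's HTTP API (a quick sanity check; the exact JSON shape may vary between NSQ versions):

```shell
# List all topics known to nsqlookupd;
# the four topics created above should appear in the response
curl http://127.0.0.1:4161/topics
```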

3. Scala Stream Collector and NSQ

This release brings support for a new sink target for our Scala Stream Collector, in the form of an NSQ topic. This feature maps one-to-one in functionality with the current Kinesis and Kafka offerings.

If you followed the Setting up NSQ section above, you will need to update your Scala Stream Collector configuration as follows:

```
collector {
  ...
  streams {
    # Events which have successfully been collected will be stored in the good stream/topic
    good = "GoodRawEvents"

    # Events that are too big will be stored in the bad stream/topic
    bad = "BadRawEvents"
    ...
    sink {
      enabled = nsq

      # Host name for nsqd
      host = "127.0.0.1"

      # TCP port for nsqd
      port = 4150
    }
    ...
  }
}
```

Launching the collector in this configuration will then start sinking raw events to your configured NSQ topic, allowing them to be picked up and consumed by other applications, including Stream Enrich.
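As a quick smoke test, you can fire an event at the collector's pixel endpoint; this assumes the collector is listening on port 8080 (set by `collector.port` in the same configuration file):

```shell
# Send a minimal page-view event ("e=pv") to the collector's pixel endpoint;
# the collector responds with a 1x1 GIF and sinks the raw event to GoodRawEvents
curl "http://127.0.0.1:8080/i?e=pv"
```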

4. Stream Enrich and NSQ

Stream Enrich has also been updated to support an NSQ topic as a source, and another as a sink. Again, this feature maps one-to-one in functionality with the current Kinesis and Kafka offerings. If you are familiar with our Kinesis or Kafka support, you know the drill!

Following on from the Stream Collector section above, you can then configure your Stream Enrich application like so:

```
enrich {
  source = "nsq"
  sink = "nsq"
  ...
  streams {
    in {
      # Stream/topic where the raw events to be enriched are located
      raw = "GoodRawEvents"
    }
    out {
      # Stream/topic where the events that were successfully enriched will end up
      enriched = "GoodEnrichedEvents"

      # Stream/topic where the events that failed enrichment will be stored
      bad = "BadEnrichedEvents"
      ...
    }
    nsq {
      # Channel name for the NSQ source
      # If more than one application is reading from the same NSQ topic simultaneously,
      # all of them must use the same channel name so that messages are balanced
      # across them rather than duplicated
      rawChannel = "StreamEnrichChannel"

      # Host name for nsqd
      host = "127.0.0.1"

      # TCP port for nsqd
      port = 4150

      # Host name for nsqlookupd
      lookupHost = "127.0.0.1"

      # HTTP port for nsqlookupd
      lookupPort = 4161
    }
    ...
  }
  ...
}
```

Events from the Stream Collector’s raw topic will then start to be picked up and enriched before being written to the out.enriched topic.
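To watch the enriched events flowing through, you can tail the output topic with nsq_tail, a utility that ships with NSQ (a sketch assuming the topic names and ports used above):

```shell
# Tail the enriched topic; each message is an enriched event
# in Snowplow's tab-separated format
nsq_tail --topic=GoodEnrichedEvents --lookupd-http-address=127.0.0.1:4161
```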

5. Other NSQ releases

As we previously mentioned, the primary purpose of the NSQ support is Snowplow Mini's migration. In support of that, we have already added NSQ support to the Elasticsearch Loader and S3 Loader.

You can find more detailed information about these releases in the Elasticsearch Loader 0.10.0 and S3 Loader 0.6.0 blog posts.

6. Upgrading

The real-time applications for R96 Zeugma are available at the following locations:

  • http://dl.bintray.com/snowplow/snowplow-generic/snowplow_scala_stream_collector_0.11.0.zip
  • http://dl.bintray.com/snowplow/snowplow-generic/snowplow_stream_enrich_0.12.0.zip

To use NSQ, update the Scala Stream Collector and Stream Enrich configurations as described in the sections above.

If you are already using Kafka or Kinesis: there are no breaking changes in the R96 configuration for Stream Enrich, but you will need to update your Scala Stream Collector's configuration. This is because only one sink configuration is needed from now on.

For example, if you're using Kinesis, only the Kinesis configuration is needed:

```
collector {
  ...
  # sink = kinesis # REMOVED
  streams {
    ...
    sink { # ADDED
      enabled = kinesis # or kafka or nsq

      region = eu-west-1
      threadPoolSize = 10
      aws {
        accessKey = iam
        secretKey = iam
      }
      backoffPolicy {
        minBackoff =
        maxBackoff =
      }

      # Or Kafka
      #brokers = ""
      ## Number of retries to perform before giving up on sending a record
      #retries = 0

      # Or NSQ
      ## Host name for NSQ tools
      #host = ""
      ## TCP port for nsqd, 4150 by default
      #port =
    }
  }
}
```

Finally, an upcoming release of the Snowplow Docker images will include images for both the Scala Stream Collector and Stream Enrich with NSQ support.

There are no material non-NSQ-related changes in R96.

7. Roadmap

Plenty more Snowplow releases are in the pipeline.

And of course, please stay tuned for the Snowplow Mini 0.4.0 release with NSQ support!

8. Getting help

For more details on this release, please check out the release notes on GitHub.

If you have any questions or run into any problems, please visit our Discourse forum.
