Amazon Kinesis tutorial – a getting started guide


Of all the developments on the Snowplow roadmap, the one that we are most excited about is porting the Snowplow data pipeline to Amazon Kinesis to deliver real-time data processing. We will publish a separate post outlining why we are so excited about this. (Hint: it’s about a lot more than simply real-time analytics on Snowplow data.) This blog post is intended to provide a starting point for developers who are interested in diving into Kinesis.
In this tutorial, we will walk through the process of getting up and running with Amazon Kinesis using two very simple Kinesis apps:
- The kinesis-example-scala-producer: this will create a Kinesis stream and write records to it
- The kinesis-example-scala-consumer: this will consume the Kinesis stream created by the producer
The source code for both is available on the Snowplow repo.
Setting up the environment to run the apps
In general, Kinesis apps should run on EC2. However, for this simple example, the apps can be run locally. They require Java 1.7 and SBT 0.13.0 to run. If you use Vagrant, you can run them in the dev-environment VM by setting it up as follows:
First, clone the dev-environment repo (make sure to include the --recursive flag):
$ git clone --recursive https://github.com/snowplow/dev-environment.git
$ cd dev-environment
Now build the VM:
$ vagrant up
Once the build is complete, SSH in:
$ vagrant ssh
And now install both Java 1.7 and Scala/SBT by running the following two Ansible playbooks (from within the VM):
$ ansible-playbook /vagrant/ansible-playbooks/java-7.yml --inventory-file=/home/vagrant/ansible_hosts --connection=local
$ ansible-playbook /vagrant/ansible-playbooks/scala-sbt.yml --inventory-file=/home/vagrant/ansible_hosts --connection=local
We’re now ready to install the example apps and start writing to and reading from Kinesis streams!
Creating a stream and writing records to it
We’re going to use the kinesis-example-scala-producer to create our stream and write records to it.
First clone the repo, then compile the app:
$ git clone git://github.com/snowplow/kinesis-example-scala-producer.git
$ cd kinesis-example-scala-producer
$ sbt compile
Now we need to create a config file (e.g. by copying the template config file to a new file in the project root):
$ cp src/main/resources/default.conf my.conf
Use your favorite text editor to update the AWS credentials in the file with your own access key and secret access key. If you are creating a new user in IAM for the purpose of this tutorial, make sure that user has permissions to create and write to Kinesis streams, and to create, write to, and delete DynamoDB tables (DynamoDB is used to keep track of the consumer’s position in the stream).
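If you’re curious how the app reads this file: judging by the .conf extension, it is parsed with Typesafe Config (HOCON). Below is a minimal sketch of the general pattern, with hypothetical key paths – check default.conf for the key names the example apps actually use:

import java.io.File
import com.typesafe.config.ConfigFactory
import com.amazonaws.auth.BasicAWSCredentials

object ConfigSketch {
  def main(args: Array[String]) {
    // Parse the HOCON file (the real app takes the path via --config)
    val config = ConfigFactory.parseFile(new File("my.conf"))

    // NB: these key paths are hypothetical illustrations --
    // see default.conf for the keys the example apps really use
    val credentials = new BasicAWSCredentials(
      config.getString("aws.access-key"),
      config.getString("aws.secret-key"))

    println("Loaded credentials for access key " + credentials.getAWSAccessKeyId)
  }
}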
You’re now ready to run the app! Enter the following at the command line – this runs it from SBT, passing in the new config file as an argument:
$ sbt "run --config ./my.conf"
Once the app has started, it will create a new stream (if one does not already exist) with the name specified in the config file (this is kinesis_example as standard). You should be able to view the stream in the AWS management console.
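Under the hood, creating a stream comes down to a couple of calls against the AWS SDK for Java. Here is a minimal hand-rolled sketch of the pattern (not the example app’s actual code): create the stream if it is missing, then wait for it to become ACTIVE before writing:

import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.ResourceNotFoundException

object CreateStreamSketch {
  def main(args: Array[String]) {
    // Placeholder credentials -- use the values from my.conf
    val client = new AmazonKinesisClient(
      new BasicAWSCredentials("ACCESS-KEY-ID", "SECRET-ACCESS-KEY"))
    val streamName = "kinesis_example"

    // Create the stream only if it does not already exist
    try {
      client.describeStream(streamName)
    } catch {
      case _: ResourceNotFoundException =>
        client.createStream(streamName, 1) // 1 shard is plenty for a demo
    }

    // Poll until the stream leaves CREATING and becomes ACTIVE
    var status = ""
    while (status != "ACTIVE") {
      Thread.sleep(2000)
      status = client.describeStream(streamName)
        .getStreamDescription.getStreamStatus
      println("Stream status: " + status)
    }
  }
}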
If you click on the stream in the management console, you should be able to see the number of Put Requests increase once the app has started; note that it may take a few minutes for this to become visible. The Put Requests graph should climb steadily as the app writes new records to the stream.
You are now writing records to your first Kinesis stream!
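For the curious, each write is a single PutRecord call against the SDK; the partition key you supply determines which shard the record is routed to. A minimal sketch of the pattern (again, not the example app’s actual code):

import java.nio.ByteBuffer
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.PutRecordRequest

object PutRecordSketch {
  def main(args: Array[String]) {
    val client = new AmazonKinesisClient(
      new BasicAWSCredentials("ACCESS-KEY-ID", "SECRET-ACCESS-KEY"))

    // Write ten records, pausing briefly between each
    for (i <- 1 to 10) {
      val request = new PutRecordRequest()
        .withStreamName("kinesis_example")
        .withPartitionKey("partition-key-" + i)
        .withData(ByteBuffer.wrap(("example-record-" + i).getBytes("UTF-8")))
      val result = client.putRecord(request)
      println("Wrote record to shard " + result.getShardId)
      Thread.sleep(1000)
    }
  }
}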
Consuming records from the stream
We’re going to use the kinesis-example-scala-consumer to read records from our stream.
First clone the repo, then compile the app:
$ cd ..
$ git clone git://github.com/snowplow/kinesis-example-scala-consumer.git
$ cd kinesis-example-scala-consumer
$ sbt compile
As before, we create a config file (e.g. by copying the template config file to a new file in the project root):
$ cp src/main/resources/default.conf my.conf
Now edit the my.conf file in your favorite text editor to add your AWS credentials. The rest of the parameters should be fine as they are, although if you changed the stream name in the producer’s config, you will need to make the same change in the consumer’s config so that it reads from the stream that the producer writes to.
Now run the consumer:
$ sbt "run --config ./my.conf"
You should see the consumer log each record as it iterates through the stream.
You’re now successfully reading records off the Kinesis stream!
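As an aside, those DynamoDB permissions from earlier are a hint that the consumer builds on Amazon’s Kinesis Client Library, which checkpoints its position in the stream to a DynamoDB table. For illustration, though, here is a minimal sketch of reading a stream with the lower-level SDK calls – grab a shard iterator, then poll GetRecords:

import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.{GetRecordsRequest, GetShardIteratorRequest}
import scala.collection.JavaConverters._

object GetRecordsSketch {
  def main(args: Array[String]) {
    val client = new AmazonKinesisClient(
      new BasicAWSCredentials("ACCESS-KEY-ID", "SECRET-ACCESS-KEY"))
    val streamName = "kinesis_example"

    // For simplicity, read from the first shard only
    val shardId = client.describeStream(streamName)
      .getStreamDescription.getShards.get(0).getShardId

    // TRIM_HORIZON starts from the oldest record still in the stream
    val iteratorResult = client.getShardIterator(new GetShardIteratorRequest()
      .withStreamName(streamName)
      .withShardId(shardId)
      .withShardIteratorType("TRIM_HORIZON"))
    var iterator = iteratorResult.getShardIterator

    // Poll the shard, printing each batch of records as it arrives
    while (iterator != null) {
      val batch = client.getRecords(
        new GetRecordsRequest().withShardIterator(iterator).withLimit(25))
      for (record <- batch.getRecords.asScala) {
        println("Read record: " + new String(record.getData.array, "UTF-8"))
      }
      iterator = batch.getNextShardIterator
      Thread.sleep(1000) // stay well within the GetRecords rate limits
    }
  }
}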
Thanks
These two Kinesis apps were written in collaboration with our wintern Brandon Amos, who has been working exclusively on Kinesis development at Snowplow over his winternship. This is just the start – we hope to release Kinesis-enabled modules for the core Snowplow stack, also developed with Brandon, in the next couple of weeks. Stay tuned!