Apache Kafka is a high-throughput distributed messaging system.
It is:
- Fast (A single Kafka broker can handle hundreds of megabytes of reads and writes per second from thousands of clients);
- Scalable (Kafka is designed to allow a single cluster to serve as the central data backbone for a large organization. It can be elastically and transparently expanded without downtime. Data streams are partitioned and spread over a cluster of machines to allow data streams larger than the capability of any single machine and to allow clusters of co-ordinated consumers);
- Durable (Messages are persisted on disk and replicated within the cluster to prevent data loss. Each broker can handle terabytes of messages without performance impact);
- Distributed by Design (Kafka has a modern cluster-centric design that offers strong durability and fault-tolerance guarantees).[1]
This post explores how to integrate with Kafka from Elixir. It assumes some knowledge of Elixir; if you are new to the language, check out the Elixir Getting Started guide first.
Setting up Kafka
We will set up Kafka and ZooKeeper from Docker images using fig (since superseded by Docker Compose). For production use, consider following http://kafka.apache.org/documentation.html#quickstart instead.
At this point the Kafka and ZooKeeper nodes should be up and running.
Create a Kafka project
Edit mix.exs as follows:
Fetch the kafka_ex dependency with mix deps.get.
Update the application config (config/config.exs) to include the Kafka broker:
Make sure the broker host and port above match the address reported in your Kafka logs.
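The following is one possible mix.exs. It assumes Elixir 1.9 or later and a hypothetical project named kafka_demo. On Elixir 1.4 and later, dependencies such as kafka_ex are started automatically, so the application does not need to list it. The open-ended version requirement is a placeholder; pin the kafka_ex release you actually use.

```elixir
defmodule KafkaDemo.MixProject do
  use Mix.Project

  # Hypothetical project name and version; adjust to your own project.
  def project do
    [
      app: :kafka_demo,
      version: "0.1.0",
      elixir: "~> 1.9",
      deps: deps()
    ]
  end

  # kafka_ex is inferred from deps and started with the application.
  def application do
    [extra_applications: [:logger]]
  end

  defp deps do
    [
      # Placeholder requirement: pin the kafka_ex version you actually depend on.
      {:kafka_ex, ">= 0.0.0"}
    ]
  end
end
```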
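kafka_ex reads its broker list from the brokers key as {host, port} tuples. Here is a minimal config/config.exs. It assumes Elixir 1.9 or later for import Config (older projects use use Mix.Config) and a broker reachable at localhost:9092. Both the host and the port are assumptions; use whatever address your Kafka container advertises.

```elixir
import Config

# Assumed broker address; replace with the host/port shown in your Kafka logs.
config :kafka_ex,
  brokers: [{"localhost", 9092}]
```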
Producer
We will create a producer that runs in an infinite loop, publishing the current time and then sleeping for 500 ms. Create lib/producer.exs and add the following:
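Here is a sketch of such a producer. It assumes kafka_ex's KafkaEx.produce/3 (topic, partition, value) and writes every message to partition 0. The value is the text of :os.timestamp/0 as rendered by inspect/1. The module name Producer is illustrative.

```elixir
defmodule Producer do
  @moduledoc """
  Publishes the current :os.timestamp/0, rendered with inspect/1,
  to partition 0 of the given topic every 500 ms, forever.
  """

  @interval_ms 500

  def loop(topic) do
    # Topic, partition 0, and the timestamp tuple as a binary such as "{1427, 640108, 212625}".
    KafkaEx.produce(topic, 0, inspect(:os.timestamp()))
    :timer.sleep(@interval_ms)
    # Tail call, so the loop runs in constant stack space.
    loop(topic)
  end
end
```

One way to run it is to end lib/producer.exs with a call such as Producer.loop("foo") and then run mix run lib/producer.exs. The topic name "foo" is a placeholder.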
Open a new shell and run:
Metadata
To fetch metadata for the topic we published to above, create lib/metadata.exs and add:
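The sketch below asks the broker for metadata about a single topic and prints the response unchanged. It assumes a kafka_ex release where KafkaEx.metadata/1 accepts a topic: option. The module name Metadata is illustrative.

```elixir
defmodule Metadata do
  # Requests cluster metadata restricted to one topic and prints it.
  # Assumes KafkaEx.metadata/1 accepts the topic: keyword option.
  def print(topic) do
    topic
    |> then(fn t -> KafkaEx.metadata(topic: t) end)
    |> IO.inspect(label: "metadata for #{topic}")
  end
end
```

To run it, end the file with a call such as Metadata.print("foo"), where "foo" is a placeholder topic name.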
Open a new shell and run:
Consumer
To consume the messages published to the Kafka topic and print them to the console, create lib/consumer.exs and add:
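The sketch below fetches messages from partition 0, starting at a given offset, and prints the raw fetch response. That response includes each message's value. It assumes the keyword-option form KafkaEx.fetch(topic, partition, offset: n) found in later kafka_ex releases; early releases took the offset as a positional argument instead. The module name Consumer is illustrative.

```elixir
defmodule Consumer do
  # Fetches from partition 0 of `topic`, starting at `offset`, and prints
  # the whole response so each message's value is visible.
  def print(topic, offset \\ 0) do
    topic
    |> KafkaEx.fetch(0, offset: offset)
    |> IO.inspect()
  end
end
```

To run it, end the file with a call such as Consumer.print("foo"). The topic name "foo" is a placeholder.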
Open a shell and run:
You should see output similar to:
Note that value: "{1427, 640108, 212625}" and value: "{1427, 640109, 250613}" are the output of inspect(:os.timestamp()) in the producer.
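:os.timestamp/0 returns {MegaSecs, Secs, MicroSecs} since the Unix epoch, so each value can be turned back into a wall-clock time. The helper below is a hypothetical sketch that parses the three integers out of the string and builds a UTC DateTime (DateTime.from_unix!/2 requires Elixir 1.3 or later). With this helper, "{1427, 640108, 212625}" decodes to 2015-03-29 14:41:48.212625 UTC.

```elixir
defmodule TimestampValue do
  # Decodes a value such as "{1427, 640108, 212625}" (inspect of :os.timestamp/0).
  def to_datetime(value) do
    [mega, sec, micro] =
      ~r/\d+/
      |> Regex.scan(value)
      |> List.flatten()
      |> Enum.map(&String.to_integer/1)

    # Total microseconds since the epoch: mega * 10^12 + sec * 10^6 + micro.
    DateTime.from_unix!(mega * 1_000_000_000_000 + sec * 1_000_000 + micro, :microsecond)
  end
end
```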
Streaming
To stream messages from the Kafka topic and print each one to the console, create lib/stream.exs and add:
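The sketch below treats the result of KafkaEx.stream/3 as a lazy enumerable and prints each message's value. It assumes the keyword-option form KafkaEx.stream(topic, partition, offset: n) and message structs that carry their payload in a value field. Against a live broker the stream keeps polling, so the function does not return. The module name TopicStream is hypothetical, chosen to avoid clashing with Elixir's own Stream module.

```elixir
defmodule TopicStream do
  # Lazily walks the topic from `offset`, printing each message's payload.
  def print(topic, offset \\ 0) do
    topic
    |> KafkaEx.stream(0, offset: offset)
    |> Stream.each(fn message -> IO.puts(message.value) end)
    # Forces the lazy pipeline; returns :ok once the enumerable is exhausted.
    |> Stream.run()
  end
end
```

To run it, end the file with a call such as TopicStream.print("foo"). The topic name "foo" is a placeholder.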
Open a shell and run:
You should see output similar to:
KafkaEx.stream returns a value that implements the Enumerable protocol, so you can use it with functions from the Enum and Stream modules. This lets us run MapReduce-style operations on messages as they arrive.
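The hypothetical helper below illustrates the MapReduce idea. It works on any enumerable of messages with a value field, so it accepts KafkaEx.stream/3 output and also a plain list in tests. The map step parses each "{mega, sec, micro}" value into whole Unix seconds. The reduce step counts how many messages fall into each second. Stream.take/2 bounds the otherwise endless stream so the final Enum.reduce/3 can terminate.

```elixir
defmodule TimestampStats do
  # Returns %{unix_second => message_count} for the first `limit` messages.
  def per_second(messages, limit) do
    messages
    |> Stream.take(limit)
    |> Stream.map(&to_seconds(&1.value))
    |> Enum.reduce(%{}, fn sec, acc -> Map.update(acc, sec, 1, &(&1 + 1)) end)
  end

  # "{1427, 640108, 212625}" -> 1427 * 1_000_000 + 640108 (microseconds dropped).
  defp to_seconds(value) do
    [mega, sec, _micro] =
      ~r/\d+/
      |> Regex.scan(value)
      |> List.flatten()
      |> Enum.map(&String.to_integer/1)

    mega * 1_000_000 + sec
  end
end
```

Because every step before Enum.reduce/3 is lazy, only the first limit messages are ever pulled from the topic.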
Offsets
To fetch offsets for the Kafka topic, create lib/offset.exs and add:
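The sketch below asks the broker for the earliest and latest offsets of partition 0 and prints both responses as-is. It assumes kafka_ex's KafkaEx.earliest_offset/2 and KafkaEx.latest_offset/2, each taking a topic and a partition. The module name Offsets is illustrative.

```elixir
defmodule Offsets do
  # Prints, then returns, the earliest and latest offset responses for partition 0.
  def print(topic) do
    earliest = KafkaEx.earliest_offset(topic, 0)
    latest = KafkaEx.latest_offset(topic, 0)
    IO.inspect(earliest, label: "earliest")
    IO.inspect(latest, label: "latest")
    {earliest, latest}
  end
end
```

To run it, end the file with a call such as Offsets.print("foo"). The topic name "foo" is a placeholder.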
Open a shell and run:
You should see output similar to:
The examples shown can be found here.