Kafka has the interface of a messaging system but the storage of a log system.
Kafka Use Cases
Metrics and Logging
Entities in Kafka
Producer: an entity which produces the data and pushes it to Kafka.
Broker: a Kafka server which stores the data of multiple topics.
Zookeeper: maintains the metadata related to topics.
Consumer: consumes the data from the brokers.
Topic: represents a type of data, like 'order-created' topic, 'order-fulfilled' topic, 'print-invoice' topic.
Each topic is assigned some partitions.
Offset: each message in a partition is assigned an offset, a unique number for that message.
An offset is like a line number, except that it is unique within a partition, not across the whole topic.
Message: the actual data being stored in the Kafka broker.
Producers can also direct messages to a topic, or to a particular partition in the topic (done by using a partitioner).
How producers produce data
The producer produces the message to a topic.
The message key is hashed down to a partition (like hash(key) % numPartitions).
Each partition in the topic on the broker has the offset at which the new message will be written.
Each partition is implemented as multiple segment files of roughly equal size (in terms of number of records or bytes).
Each message may or may not require an acknowledgement after it is written to the leader and 0-n follower replicas of that partition.
After the message is written to the leader and the appropriate number of followers, the acknowledgement is sent.
Messages to be written to a partition are processed and written one at a time.
Messages in each partition are ordered, as if appended to a file.
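The hashing step above can be sketched as follows. This is a simplified stand-in: Kafka's real default partitioner hashes the message key with murmur2; md5 here is only a stable placeholder.

```python
import hashlib

def pick_partition(key: bytes, num_partitions: int) -> int:
    """Map a message key to a partition, like hash(key) % num_partitions.
    md5 is just a deterministic stand-in for Kafka's murmur2 hash."""
    digest = int.from_bytes(hashlib.md5(key).digest()[:4], "big")
    return digest % num_partitions

# The same key always lands on the same partition, which is what
# preserves per-key ordering.
assert pick_partition(b"order-42", 6) == pick_partition(b"order-42", 6)
assert 0 <= pick_partition(b"order-42", 6) < 6
```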
The producer produces the message, giving the topic and message, and optionally a key and partition.
The message goes to the serializer, which serializes the data, then to the partitioner, which works out which partition the message should be sent to.
Then the message is added to the current batch of records to be sent to that particular topic and partition.
A separate thread sends the batches of records to the appropriate Kafka brokers.
If everything succeeds, the broker sends metadata back to the producer: the topic, the partition, and the offset within the partition.
If there is a failure, the producer checks whether the request should be retried; if yes, it retries, and throws an exception once no retries are left.
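The flow above (serialize, partition, batch, background sender, metadata ack) can be sketched with an in-memory stand-in. `MiniProducer` and `hash_ish` are illustrative names, not the real client API:

```python
import queue
import threading

def hash_ish(key: str) -> int:
    # Deterministic stand-in for the producer's key hash.
    return sum(key.encode())

class MiniProducer:
    """Sketch: send() serializes and picks a partition, then batches the
    record; a background sender thread appends to a stand-in broker log
    and records the (partition, offset) metadata ack."""

    def __init__(self, num_partitions: int):
        self.num_partitions = num_partitions
        self.batch_queue: "queue.Queue[tuple[int, bytes]]" = queue.Queue()
        self.broker_log = {p: [] for p in range(num_partitions)}  # stand-in broker
        self.metadata = []  # (partition, offset) acks returned to the producer
        threading.Thread(target=self._sender, daemon=True).start()

    def send(self, key: str, value: str) -> None:
        partition = hash_ish(key) % self.num_partitions    # partitioner step
        self.batch_queue.put((partition, value.encode()))  # serialize + batch

    def _sender(self) -> None:
        while True:
            partition, payload = self.batch_queue.get()
            log = self.broker_log[partition]
            log.append(payload)                              # append to partition log
            self.metadata.append((partition, len(log) - 1))  # offset in the ack
            self.batch_queue.task_done()

p = MiniProducer(num_partitions=3)
p.send("order-1", "created")
p.send("order-1", "fulfilled")
p.batch_queue.join()
# Same key -> same partition, and offsets grow in send order.
assert p.metadata[0][0] == p.metadata[1][0]
assert p.metadata[1][1] == p.metadata[0][1] + 1
```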
Kafka Producer Requirements
Creating a producer requires the following.
bootstrap.servers: the Kafka broker servers.
acks: how many partition replicas must have received the record before the producer considers the write successful.
acks=0 means the producer does not wait at all, acks=1 means the leader must get the message, acks=all means all in-sync replicas must get it.
buffer.memory: how much memory should be used for messages that are waiting to be sent.
compression.type: default no compression. can use snappy(uses less CPU) or gzip(uses more CPU, but compresses more)
retries: number of retries the producer makes for transient errors (like no leader for a partition).
batch.size : max memory size in bytes of a batch that is collected before the messages are sent. Works side by side with linger.ms.
linger.ms : max time to wait for additional messages before sending a batch. A higher linger.ms increases both latency and throughput.
client.id : an identifier for the client who is sending messages.
max.in.flight.requests.per.connection: how many requests can be sent to the broker without receiving the response.
timeout.ms: max time the broker waits for acknowledgement from the replicas and the leader.
request.timeout.ms: max time to wait for a success response to a request.
metadata.fetch.timeout.ms : max waiting time for getting metadata.
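The parameters above could be collected into a config map like the following. The key names follow the Kafka producer configuration; the values are example choices, not recommendations:

```python
# Illustrative producer configuration; values are examples only.
producer_config = {
    "bootstrap.servers": "broker1:9092,broker2:9092",
    "acks": "all",               # wait for all in-sync replicas
    "buffer.memory": 33554432,   # 32 MB for messages awaiting send
    "compression.type": "snappy",
    "retries": 3,
    "batch.size": 16384,         # bytes collected per batch
    "linger.ms": 5,              # wait up to 5 ms for a fuller batch
    "client.id": "invoice-service-producer",
    "max.in.flight.requests.per.connection": 1,  # keeps ordering even with retries
}

assert producer_config["acks"] in ("0", "1", "all")
```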
Messages can be sent to the broker in the following ways
Fire-and-forget => send, and don't worry about the result.
Synchronous send => send, get back a Future, and block on it.
Asynchronous send => send, and get the response in a callback function telling whether the message was processed successfully.
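The three styles can be sketched with a stand-in producer whose send() returns a future. `FakeProducer` is illustrative, not the real client API:

```python
from concurrent.futures import ThreadPoolExecutor, Future

class FakeProducer:
    """Stand-in producer: send() returns a Future of (topic, partition, offset)."""
    def __init__(self):
        self.pool = ThreadPoolExecutor(max_workers=1)
    def send(self, topic: str, value: str) -> Future:
        # Pretend the broker acks with metadata.
        return self.pool.submit(lambda: (topic, 0, 0))

p = FakeProducer()

# Fire-and-forget: send and ignore the result.
p.send("orders", "m1")

# Synchronous send: block on the future for the metadata (or an exception).
meta = p.send("orders", "m2").result()
assert meta[0] == "orders"

# Asynchronous send: attach a callback that runs when the broker responds.
acks = []
p.send("orders", "m3").add_done_callback(lambda f: acks.append(f.result()))
p.pool.shutdown(wait=True)
assert acks and acks[0][0] == "orders"
```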
Retention can be specified at the global level or at the topic level.
retention period (e.g. 7 days) or retention size (e.g. 10 GB)
Once a log segment (log file) is closed, only then does its retention time start counting.
log segment vs log retention
log segment controls how large in size, or how long in time, a log file should grow before being closed.
log retention controls how long after the log segment closes the data should be kept before expiring.
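In broker terms, segment rolling and retention are separate knobs; a server.properties fragment might look like the following (values are illustrative, not recommendations):

```properties
# Close (roll) a segment when it reaches 1 GB or is 7 days old.
log.segment.bytes=1073741824
log.roll.hours=168
# Expire closed segments after 7 days, or when the partition exceeds 10 GB.
log.retention.hours=168
log.retention.bytes=10737418240
```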
A Kafka server normally listens on port 9092.
Each Kafka server is called a Kafka broker.
Brokers combine to become a cluster.
In a cluster, one broker is automatically selected to be the controller.
The controller is responsible for administrative operations, like assigning partitions to brokers and watching for failed brokers.
Each partition is owned by one broker, which is called the leader of the partition.
A single partition can be assigned to multiple brokers in which case, one becomes the leader and rest become the followers.
All consumers and producers must connect to the leader of the partition only.
The consumer subscribes to one or more topics and reads the messages in the order in which they were produced.
A consumer can subscribe to a topic and become part of a consumer group or assign itself partitions but not both.
consumers combine together to form the consumer group.
If the number of consumers N in a consumer group > the number of partitions P, then some consumers will remain idle.
We should create topics with a large number of partitions, because once the load increases, all we have to do is increase the number of consumers.
you create one consumer group for an application that needs to get all the messages from one or more topics.
When a partition is moved from one consumer to another (for example because the first consumer shut down), it is called a rebalance.
When a rebalance happens, the whole consumer group stops processing messages for a moment.
When a rebalance happens, a consumer loses its state and may have to refresh any cache or state it had saved.
consumers in a consumer group maintain their existence and ownership of partitions by sending heartbeats to a kafka broker known as the 'coordinator' for this consumer group.
the 'coordinator' kafka broker will be different for different groups.
'coordinator' triggers a rebalance once it does not receive the heartbeats from a consumer in a consumer group.
How are partitions assigned to consumers in a group
whenever a consumer wants to join a group, it sends a request to the group 'coordinator' kafka broker.
the first consumer to join becomes the group 'leader' consumer.
The 'leader' receives the full list of consumers in the group from the group 'coordinator'.
'leader' finalizes the partitions to be assigned to the consumers in a consumer group.
after finalizing the list, 'leader' sends this list to the group 'coordinator' which sends it to all the consumers.
The leader has the complete list of consumers and their assignments, while each consumer only knows its own assigned partitions.
whenever a rebalance happens, the cycle is repeated.
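The assignment the 'leader' consumer computes can be sketched with the Range strategy (the default): each consumer gets a contiguous chunk of partitions. Consumer names and partition counts below are illustrative:

```python
def range_assign(consumers: list[str], partitions: list[int]) -> dict[str, list[int]]:
    """Sketch of the Range assignment strategy: sort the consumers, give
    each a contiguous chunk; the first consumers absorb the remainder."""
    consumers = sorted(consumers)
    per, extra = divmod(len(partitions), len(consumers))
    assignment, start = {}, 0
    for i, c in enumerate(consumers):
        count = per + (1 if i < extra else 0)
        assignment[c] = partitions[start:start + count]
        start += count
    return assignment

# 5 partitions over 2 consumers: c1 gets 3, c2 gets 2.
print(range_assign(["c2", "c1"], [0, 1, 2, 3, 4]))
# → {'c1': [0, 1, 2], 'c2': [3, 4]}
```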
Creating a Kafka Consumer
bootstrap.servers: kafka brokers
group.id : not mandatory; the consumer group this consumer belongs to.
How consumer works
Simply by polling the server at regular intervals.
Consumers are normally long-running infinite loops which keep polling the server for more data.
poll returns a list of records, each of which contains topic, partition, offset in the partition, key, message.
almost always, you should have one consumer per thread.
Implemented by having each consumer as a Runnable, and using an ExecutorService to create a fixed thread pool of size N = number of consumers.
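The one-consumer-per-thread pattern can be sketched with a stand-in consumer. `FakeConsumer` is illustrative; a real client would call poll() on a Kafka consumer, which is not thread-safe, hence one thread each:

```python
from concurrent.futures import ThreadPoolExecutor
import queue

class FakeConsumer:
    """Stand-in consumer: its run loop pops records from a shared queue."""
    def __init__(self, records: "queue.Queue[str]"):
        self.records = records
        self.seen = []
    def run(self):
        # A real consumer would loop forever calling poll(timeout).
        while True:
            try:
                self.seen.append(self.records.get(timeout=0.1))
            except queue.Empty:
                return

records = queue.Queue()
for i in range(6):
    records.put(f"msg-{i}")

consumers = [FakeConsumer(records) for _ in range(3)]
# One thread per consumer, since a consumer must not be shared by threads.
with ThreadPoolExecutor(max_workers=len(consumers)) as pool:
    for c in consumers:
        pool.submit(c.run)

# Every record was consumed exactly once across the group.
assert sum(len(c.seen) for c in consumers) == 6
```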
Consumer additional configuration params
fetch.min.bytes : the minimum number of bytes the consumer wants in a fetch response; otherwise the broker waits (up to fetch.max.wait.ms) before responding.
Its value should be raised when your consumer is using more CPU than expected and you want to reduce that,
or when the Kafka broker is under high load because a large number of consumers are connected to it.
fetch.max.wait.ms : max time the broker will wait to accumulate fetch.min.bytes before responding with the available data.
session.timeout.ms : the max time a consumer can go without sending heartbeats to the 'coordinator' broker and still be considered alive.
enable.auto.commit : whether the offsets will be auto-committed.
partition.assignment.strategy: Range (default) or RoundRobin.
client.id: identifier to the client.
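As with the producer, these could be collected into a config map. Key names follow the Kafka consumer configuration; values (group name, timeouts) are illustrative:

```python
# Illustrative consumer configuration; values are examples only.
consumer_config = {
    "bootstrap.servers": "broker1:9092",
    "group.id": "invoice-service",
    "fetch.min.bytes": 1024,       # want at least 1 KB per fetch...
    "fetch.max.wait.ms": 500,      # ...but wait at most 500 ms for it
    "session.timeout.ms": 10000,   # dead if no heartbeat for 10 s
    "enable.auto.commit": False,   # commit offsets manually
    "partition.assignment.strategy": "range",
    "client.id": "invoice-consumer-1",
}

# A fetch should never be allowed to outwait the session timeout.
assert consumer_config["fetch.max.wait.ms"] < consumer_config["session.timeout.ms"]
```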
Consumer commits and offsets
For a consumer, updating its current offset in the partition is known as a commit.
Commits are made by publishing a message to the __consumer_offsets topic with the committed offsets for the partition.
whenever a rebalance happens, the fresh consumers read the partition offsets from this topic and start processing from next offset.
Since commits happen at intervals, it is possible that the consumer processes a message and a rebalance happens before the commit;
after the rebalance, the message will be processed again.
enable.auto.commit=true (the default commit interval is 5 secs)
auto.commit.interval.ms=10000 (will make the commit interval as 10 secs)
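The duplicate-processing window can be simulated: commit every 3 messages, stop (as if a rebalance hit) after processing 5, and let the next consumer resume from the last commit. The message names and numbers are illustrative:

```python
log = [f"msg-{i}" for i in range(6)]   # the partition's messages

def consume(start: int, stop_after: int, commit_every: int):
    """Process messages from `start` up to `stop_after`, committing every
    `commit_every` messages; return (processed, last committed offset)."""
    committed = start
    processed = []
    for offset in range(start, stop_after):
        processed.append(log[offset])
        if (offset + 1) % commit_every == 0:
            committed = offset + 1     # a commit records the next offset to read
    return processed, committed

# First consumer processes offsets 0-4, but only offsets < 3 got committed.
first, committed = consume(start=0, stop_after=5, commit_every=3)
# After the rebalance, the new consumer resumes at the committed offset...
second, _ = consume(start=committed, stop_after=6, commit_every=3)
# ...so msg-3 and msg-4 are processed twice: at-least-once delivery.
assert set(first) & set(second) == {"msg-3", "msg-4"}
```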
Commit current offset
consumer.commitSync(); => commits the latest offset returned by poll(). It also retries until it succeeds or hits an exception it can do nothing about.
if you want to commit in between while iterating over the list of records, you can call
consumer.commitAsync(Map of TopicPartition Vs OffsetAndMetadata, null);
consumer.commitAsync(); => it does not retry in case of failure, because a newer commit might have happened in the meantime.
A good pattern is to normally use commitAsync(), but before the consumer shuts down, in the finally block, call commitSync().
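That pattern (commitAsync() on the hot path, commitSync() in finally) can be sketched with a stand-in consumer. The method names mirror the Java client, but the consumer itself is fake:

```python
class FakeConsumer:
    """Stand-in consumer that records which commit calls were made."""
    def __init__(self):
        self.calls = []
    def poll(self):
        return ["record"]
    def commit_async(self):   # fast, does not retry
        self.calls.append("async")
    def commit_sync(self):    # blocks and retries until success or fatal error
        self.calls.append("sync")
    def close(self):
        pass

consumer = FakeConsumer()
try:
    for _ in range(3):            # stand-in for the infinite poll loop
        consumer.poll()
        consumer.commit_async()   # low latency while running
finally:
    # One last blocking commit before shutdown, so nothing is lost.
    consumer.commit_sync()
    consumer.close()

assert consumer.calls == ["async", "async", "async", "sync"]
```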
A rebalance listener can be passed as an argument while subscribing to topics.
It is used for cleaning up, committing offsets, closing database connections etc. when partitions are revoked or assigned.
Each topic partition can have multiple replicas.
all produce/consume is done from the leader so that consistency is maintained.
Followers just stay up to date; a follower can be elected as leader when the leader fails.
Followers make fetch calls to the leader, similar to the calls a consumer makes.
The call fetches the data in order, starting from an offset.
only those followers who are completely in sync can become the leaders once the leader fails.
A preferred leader of a topic partition is the broker that was the leader of that partition when the topic was first created.
This helps keep the partitions balanced overall, so that the load is evenly distributed across the cluster.
For example, after broker1 fails, it is the leader of 0 partitions, and maybe broker2 took over its partitions; now broker2 owns many more partitions and is overloaded.
auto.leader.rebalance.enable=true is normally set.
With it, once broker1 comes back up and gets in sync, it will eventually be made the leader again after the failover.
Kafka has a binary protocol (over TCP) that specifies the format of the requests and how brokers respond to them, both when the request is processed successfully and when the broker encounters errors while processing it.
How kafka works
On the port on which Kafka runs, it creates the following kinds of threads.
Acceptor thread: creates the connection, reads data, and hands it over to the processor threads.
Processor (network) threads: get the request data and push it onto the request queue for processing;
processor threads also pick the response data from the response queue to send back to clients.
IO threads: pick requests from the request queue, process them, and place the responses on the response queue.
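The queue hand-off between network threads and IO threads can be sketched as a toy pipeline; the thread count and the "processing" itself are illustrative:

```python
import queue
import threading

request_queue: "queue.Queue" = queue.Queue()
response_queue: "queue.Queue" = queue.Queue()

def io_thread():
    """IO thread: take a request off the request queue, process it,
    and put the response on the response queue."""
    while True:
        req = request_queue.get()
        if req is None:            # shutdown signal for the sketch
            return
        response_queue.put(f"response-to-{req}")

worker = threading.Thread(target=io_thread)
worker.start()

# A processor (network) thread would push decoded requests here...
request_queue.put("produce-1")
request_queue.put("fetch-2")
# ...and later pick responses off the response queue to write to sockets.
responses = [response_queue.get(), response_queue.get()]

request_queue.put(None)
worker.join()
assert responses == ["response-to-produce-1", "response-to-fetch-2"]
```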
If a request is made to a broker other than the leader of the partition, it returns the error "Not the leader for this partition".
Kafka clients are responsible for directing the data to appropriate brokers.
Types of requests from kafka clients to brokers
Clients use metadata requests to get metadata from brokers.
The client makes the metadata request, passing the list of topics it is interested in.
The server responds with the partitions of those topics, the brokers for each partition, and the leader of each partition.
The clients cache the metadata information.
the metadata info in the client is refreshed periodically or when the clients receive "Not a leader" error.
How does a broker handle produce requests
the broker checks the following
Can this user write to this topic?
Is the 'acks' param valid (valid values are '0', '1', 'all')?
If acks='all', are there enough in-sync replicas to handle the write? If not, an error is returned.
Then the messages are written to the local disk, or to the filesystem cache in the case of Linux.
How does a broker handle get requests
get requests from consumers or follower brokers are of the form "send me 5 messages starting at offset 101 of partition 2 of Topic X and 5 messages starting at offset 51 of partition 5 of Topic X"
Kafka reads data from the file and uses a 'zero copy' method to send the message directly to the consumers. 'Zero copy' means Kafka sends the message from the file (or the Linux filesystem cache) directly to the network without any intermediate buffers, which improves performance.
Consumer offsets are stored in a separate '__consumer_offsets' topic instead of being stored in Zookeeper.
Currently Zookeeper keeps the list of topics; brokers keep track of topics in Zookeeper.
In the future, topic metadata will be stored in an internal topic instead, so that Zookeeper may not be required.
Normally, for upgrading, brokers should be upgraded before the clients.
New brokers can handle old clients, but new clients cannot handle old brokers.
Let's say we create a topic with 10 partitions and replication factor 2, and the Kafka cluster has 5 brokers.
10 * 2 = 20, so we need to allocate 20 partition replicas across the brokers.
Assuming all the brokers are on separate racks
First, all the partition leaders are allocated to brokers in round-robin fashion.
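The allocation for the example above can be sketched as follows. The follower placement rule here (each extra replica on the next broker in the ring) is a simplification of Kafka's rack-aware logic:

```python
def assign_replicas(num_partitions: int, rf: int, brokers: list[int]):
    """Leaders round-robin over the brokers; each additional replica
    goes to the next broker in the ring. Non-rack-aware sketch."""
    n = len(brokers)
    assignment = {}
    for p in range(num_partitions):
        leader_idx = p % n   # round-robin leader placement
        assignment[p] = [brokers[(leader_idx + r) % n] for r in range(rf)]
    return assignment

layout = assign_replicas(num_partitions=10, rf=2, brokers=[0, 1, 2, 3, 4])
# 10 * 2 = 20 replica slots, spread evenly: 4 per broker.
assert sum(len(v) for v in layout.values()) == 20
counts = {b: 0 for b in range(5)}
for replicas in layout.values():
    for b in replicas:
        counts[b] += 1
assert all(c == 4 for c in counts.values())
```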