Storm is used for real-time processing of continuous streams of data.
Hadoop is a large-scale batch processing engine, whereas Storm is a real-time data processing engine.
Hadoop is used for processing large amounts of data at rest, while Storm processes data as it flows through the system.
Alternatives to Storm
We could use custom-written queues and workers to do real-time processing: processes put the data to be processed into queues, and workers consume and process it.
But this approach has several drawbacks:
No fault tolerance: if a worker stops, its data goes unprocessed until the worker is restarted manually.
Difficult to scale: as the workers increase, we need to configure how to partition the data, messages, and queues, i.e., make sure no worker is overburdened and all messages are evenly distributed.
Difficult to manage: a lot of code has to be written for sending and receiving messages and configuring queues.
Advantages of using Storm
Highly scalable: even an initial Storm setup can process around 100,000 messages per second per cluster.
Guarantees no data loss: Storm guarantees that all data will be processed.
Robust: easy to work with and rarely goes down; Storm uses ZooKeeper to keep its cluster up and running.
Fault tolerant: if a component goes down, Storm restarts it or passes its data to other components.
Language agnostic: the code that processes the data can be written in any language, though Java and Python are the most common.
Can be used for a large number of use cases.
Storm Use Cases
Storm processes a feed of data coming into the system, for example:
Parsing a Twitter feed to find the most popular club, sportsperson, or trending topic.
Parsing a news feed to calculate the top trending stories.
Nodes in a Storm cluster
There are two types of nodes in a Storm cluster: a master node and worker nodes. They are coordinated using ZooKeeper. The state of the master and worker nodes is always kept in ZooKeeper, so if any node is restarted, it can recover its previous state and continue working.
The master node runs a daemon called 'Nimbus'.
Nimbus assigns tasks to worker nodes, resubmits any failed tasks, and tracks failed nodes.
It is similar to Hadoop's JobTracker.
Each worker node runs a daemon called 'Supervisor'.
The Supervisor gets tasks from Nimbus and starts/stops worker processes as necessary.
A Task or Job in Storm is known as a 'Topology'
A topology describes the flow of data through the application.
A topology is a graph of computation and contains nodes.
Each node contains the processing logic for the data that reaches it.
As tuples pass through the application, each node can emit new tuples, so the computation forms a directed acyclic graph (DAG).
Components of a Storm topology
Stream => represents flowing data; it is just a concept.
Spout => the source of data or a stream; e.g., a spout may read data from a Twitter feed and emit it as a stream.
Bolt => consumes one or more streams, processes the data, and emits new streams.
A stream is an infinite, continuous flow of data that is fed into Storm.
Examples of streams include a Twitter feed, a Google News feed, etc.
A spout is the source of a stream in a Storm topology.
Normally, spouts read data from external sources, such as Twitter feeds or Google feeds, and emit tuples (named lists of values).
E.g., a spout may read data from Twitter and, for each tweet, emit the football club name it mentions along with a count of 1.
Data passes from a spout to one or more bolts.
If you set the spout to be reliable, it will re-emit any data that could not be processed by the downstream bolts (because of a failure or timeout).
Spouts receive ack() and fail() events for each tuple, which indicate whether the processing of the tuple was successful or not.
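To make this concrete, here is a minimal sketch of a reliable spout, assuming the classic Java API (the package prefix is backtype.storm on old versions and org.apache.storm on 1.x and later; TweetSpout and readNextTweet() are illustrative names, not real classes):

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class TweetSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    // Remember emitted tuples so fail() can replay them.
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        String tweet = readNextTweet();            // placeholder for a real source (Twitter, a queue, ...)
        if (tweet == null) return;                 // nothing to emit right now
        String msgId = UUID.randomUUID().toString();
        pending.put(msgId, tweet);
        collector.emit(new Values(tweet), msgId);  // attaching a message id makes the tuple tracked
    }

    @Override
    public void ack(Object msgId) {
        pending.remove(msgId);                     // fully processed, forget it
    }

    @Override
    public void fail(Object msgId) {
        String tweet = pending.get(msgId);
        if (tweet != null) collector.emit(new Values(tweet), msgId); // replay on failure or timeout
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("tweet"));
    }

    private String readNextTweet() { return null; } // stub; wire up a real feed here
}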
Bolts process the data from the spout.
As in the example above, a bolt may maintain a map of football clubs along with the number of tweets mentioning each club on a given day.
A bolt processes the data and emits tuples (named lists of values) again, which can be processed by the next bolt.
By default, only 1 worker per bolt is started, but this can be increased when declaring the bolt (if the bolt has more data to process, it needs more workers).
If multiple workers are started for a bolt, the stream grouping defines which tuples will reach which worker.
E.g., if a bolt B has workers w1, w2, and w3, and a tuple with field name "team" and value "Madrid" needs to be processed by some worker of bolt B, which worker should it go to?
In some cases, you would want it to go to any random worker (shuffle grouping), but in other cases to one specific worker (fields grouping).
E.g., if you are maintaining a count of how many tweets contained "Madrid", "Barcelona", etc., then you would want all "Madrid" tuples to go to only one worker, so that the count can be calculated there.
In the above case, the stream should be fields-grouped by "team", as sketched below.
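A hypothetical wiring for this example (TeamCountBolt and the "tweets" spout name are illustrative, not real classes):

// All tuples with the same "team" value go to the same TeamCountBolt worker.
builder.setBolt("team-count", new TeamCountBolt(), 3)
       .fieldsGrouping("tweets", new Fields("team"));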
Types of Stream Grouping
Shuffle grouping: tuples are randomly distributed so that the load on the workers is evenly spread.
Fields grouping: tuples with a particular value of a field will always go to the same worker.
The input to a bolt is routed to a worker depending on the field on which the stream was grouped.
This is useful if we have to run some aggregation or grouping by that field, like a sum or count.
Other Groupings (less common)
Partial Key Grouping
// Start creating a topology
TopologyBuilder builder = new TopologyBuilder();
// Create a spout named "sentences" backed by the RandomSentenceSpout class,
// and start 5 workers for it.
builder.setSpout("sentences", new RandomSentenceSpout(), 5);
// Set a bolt named "split" using the SplitSentence class; it gets data from the "sentences" spout
// via shuffle grouping, and starts 8 workers.
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("sentences");
// Set a bolt named "count" using the WordCount class; it gets data from the "split" bolt,
// grouped by the "word" field, and starts 12 workers.
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
What is a Tuple in Storm
A tuple is the main data type in Storm.
It is just a named list of values.
E.g., a tuple can contain two fields, "tweet" and "location";
its values could be "Hello" (tweet) and "India" (location).
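In code, the field names are declared once by the emitting component, and downstream bolts read values by name. A sketch (the variables declarer, collector, and tuple come from a component's declareOutputFields(), open()/prepare(), and execute() methods respectively):

// Declare the field names of the tuples this component emits:
declarer.declare(new Fields("tweet", "location"));
// Emit values positionally, matching the declared fields:
collector.emit(new Values("Hello", "India"));
// A downstream bolt can then read the values by field name:
String tweet = tuple.getStringByField("tweet");       // "Hello"
String location = tuple.getStringByField("location"); // "India"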
Lifecycle of a tuple
Storm requests the next tuple from the spout by calling its nextTuple() method.
The spout emits a tuple to its output stream using the SpoutOutputCollector.
The spout also creates a message id and attaches it to the tuple, so that the tuple can be identified later.
After that, the tuple passes through the bolts, which may create child tuples from it.
The originating tuple either reaches its destination successfully, fails (e.g., a bolt throws an exception), or times out.
If the tuple fails, it is re-emitted by the originating spout.
If it succeeds, it is removed from the queue.
A Storm job keeps running and processing data until you kill it.
Storm guarantees that each message coming from a spout will be fully processed.
'Acker' tasks in the Storm topology track each individual tuple and its graph of child tuples.
When a tuple is created in a spout or in a bolt, it is given a 64-bit id.
When this tuple leads to other tuples, the new tuples are anchored to it, forming a tuple tree that the ackers track.
If a large number of tuples are emitted by the bolts, tracking each one individually could fill the acker task's memory, so Storm takes a different strategy:
it keeps only a fixed amount of state (a 64-bit XOR checksum) per spout tuple, so the memory occupied stays constant no matter how many bolt tuples are created.
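A toy illustration of that XOR trick (this is not Storm's actual code, just the arithmetic behind it): each tuple id is XORed into the checksum once at emit time and once at ack time, and since x ^ x == 0, the checksum returns to 0 exactly when every tuple in the tree has been acked.

import java.util.Random;

public class AckerXorDemo {
    public static void main(String[] args) {
        Random rng = new Random();
        long idA = rng.nextLong();  // 64-bit id of child tuple A
        long idB = rng.nextLong();  // 64-bit id of child tuple B

        long checksum = 0;
        checksum ^= idA;            // A emitted
        checksum ^= idB;            // B emitted
        checksum ^= idA;            // A acked
        checksum ^= idB;            // B acked

        System.out.println(checksum == 0); // true: the spout tuple is complete
    }
}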
Storm Reliability: developer to-dos
Storm keeps all messages in memory until they are acked or failed, so it is important to call ack() or fail() on every message to avoid memory issues.
Also, when you create a new child message in an intermediate bolt, you have to 'anchor' it to the input message, so that if the original message fails, its child messages can be failed (and replayed) as well.
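Here is a sketch of how the SplitSentence bolt from the earlier example might anchor and ack, assuming the same classic API as above (the field names are illustrative):

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class SplitSentence extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        for (String word : input.getStringByField("sentence").split(" ")) {
            // Passing 'input' as the first argument anchors the new tuple to it,
            // so a downstream failure replays the original spout tuple.
            collector.emit(input, new Values(word));
        }
        collector.ack(input); // always ack (or fail) every input tuple
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}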
Removing message reliability
If we don't care whether a message was successfully processed or not, we can turn off message reliability in the following three ways:
By setting Config.TOPOLOGY_ACKERS to 0. In this case, ack() is called immediately when the message is emitted, and the message is not tracked.
By omitting the message id in the SpoutOutputCollector.emit() method, so the message is not tracked.
By creating all new tuples as unanchored tuples, so that there is no parent-child relationship and parent and child tuples are unaffected by each other.
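Sketches of the three options (Config.setNumAckers() is the helper that sets the acker count; on newer Storm versions the underlying config key is named TOPOLOGY_ACKER_EXECUTORS):

// 1. Run zero acker tasks for the whole topology:
Config conf = new Config();
conf.setNumAckers(0);

// 2. In a spout: emit without a message id, so the tuple is never tracked:
collector.emit(new Values(tweet));

// 3. In a bolt: emit without passing the input tuple as an anchor:
collector.emit(new Values(word)); // unanchored tuple, no parent-child link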