Hi All,
Below are some snapshots from Nathan Marz's video from the launch of Storm, along with other content available on the internet.
Please have a look and let me know if you have any questions.
Use Cases:
Streams
The core abstraction in Storm is the "stream". A stream is an unbounded sequence of tuples. All the tuples must have the same schema. Storm provides the primitives for transforming a stream into a new stream in a distributed and reliable way. For example, you may transform a stream of tweets into a stream of trending topics.
The basic primitives Storm provides for doing stream transformations are "spouts" and "bolts". Spouts and bolts have interfaces that you implement to run your application-specific logic.
A spout is a source of streams. For example, a spout may read tuples off of a Kestrel queue and emit them as a stream. Or a spout may connect to the Twitter API and emit a stream of tweets.
A bolt consumes any number of input streams, does some processing, and possibly emits new streams.
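To make the spout/bolt split concrete, here is a minimal plain-Java sketch under stated assumptions. SimpleSpout, SimpleBolt, ListSentenceSpout and SplitSentenceBolt are hypothetical stand-ins, not Storm's real interfaces. Real Storm spouts and bolts emit through a collector rather than returning values. Tuples are modeled as List<Object>.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-ins for Storm's spout and bolt interfaces.
// These are NOT Storm's IRichSpout/IRichBolt; tuples are modeled as List<Object>.
interface SimpleSpout {
    // Returns the next tuple, or null when this (finite, for the sketch) source is empty.
    List<Object> nextTuple();
}

interface SimpleBolt {
    // Processes one input tuple and returns zero or more output tuples.
    List<List<Object>> execute(List<Object> input);
}

// A spout that emits sentences from an in-memory list (a real spout might read a queue).
class ListSentenceSpout implements SimpleSpout {
    private final List<String> sentences;
    private int next = 0;

    ListSentenceSpout(List<String> sentences) {
        this.sentences = sentences;
    }

    @Override
    public List<Object> nextTuple() {
        if (next >= sentences.size()) {
            return null;
        }
        return Arrays.asList((Object) sentences.get(next++));
    }
}

// A bolt that consumes sentences and emits one output tuple per word.
class SplitSentenceBolt implements SimpleBolt {
    @Override
    public List<List<Object>> execute(List<Object> input) {
        List<List<Object>> out = new ArrayList<>();
        for (String word : ((String) input.get(0)).split(" ")) {
            out.add(Arrays.asList((Object) word));
        }
        return out;
    }
}

class SpoutBoltDemo {
    public static void main(String[] args) {
        SimpleSpout spout = new ListSentenceSpout(Arrays.asList("the cow jumped"));
        SimpleBolt split = new SplitSentenceBolt();
        List<Object> tuple;
        while ((tuple = spout.nextTuple()) != null) {
            System.out.println(split.execute(tuple)); // [[the], [cow], [jumped]]
        }
    }
}
```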
Networks of spouts and bolts are packaged into a "topology", which is the top-level abstraction that you submit to Storm clusters for execution. A topology is a graph of stream transformations where each node is a spout or bolt. Edges in the graph indicate which bolts are subscribing to which streams. When a spout or bolt emits a tuple to a stream, it sends the tuple to every bolt that subscribed to that stream.
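The subscription graph can be pictured as a map from a stream id to the bolts subscribed to it. ToyTopology below is a hypothetical plain-Java model with no Storm dependency. It shows only the delivery rule from the paragraph above: emitting to a stream hands the tuple to every subscriber. In real Storm, a stream grouping then chooses which task within each subscribing bolt receives it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy model of a topology's edges (not Storm's API): stream id -> subscribed bolts.
class ToyTopology {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    // Records an edge in the graph: `bolt` subscribes to `streamId`.
    void subscribe(String streamId, Consumer<String> bolt) {
        subscribers.computeIfAbsent(streamId, k -> new ArrayList<>()).add(bolt);
    }

    // Sends the tuple to every bolt subscribed to the stream (none if no subscribers).
    void emit(String streamId, String tuple) {
        for (Consumer<String> bolt : subscribers.getOrDefault(streamId, new ArrayList<>())) {
            bolt.accept(tuple);
        }
    }

    public static void main(String[] args) {
        ToyTopology topology = new ToyTopology();
        List<String> boltA = new ArrayList<>();
        List<String> boltB = new ArrayList<>();
        topology.subscribe("tweets", boltA::add);
        topology.subscribe("tweets", boltB::add);
        topology.emit("tweets", "hello storm");
        System.out.println(boltA + " " + boltB); // [hello storm] [hello storm]
    }
}
```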
Each node in a Storm topology executes in parallel. In your topology, you can specify how much parallelism you want for each node, and then Storm will spawn that number of threads across the cluster to do the execution.
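A topology runs forever, or until you kill it. Storm will automatically reassign any failed tasks. Additionally, Storm guarantees that there will be no data loss, even if machines go down and messages are dropped: when spouts emit tuples with message IDs and bolts ack them, any tuple that is not fully processed is replayed.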
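The no-data-loss guarantee rests on replaying tuples that were not fully processed. Below is a toy model under a simplified protocol: the spout remembers each message by id until it is acked, and a failed message is queued again. ReplaySpoutSketch and its methods are hypothetical. They are not Storm's spout API and not its internal acking algorithm.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy model of replay-based reliability (hypothetical, not Storm's implementation).
class ReplaySpoutSketch {
    private final Deque<String> toSend = new ArrayDeque<>();
    private final Map<Long, String> pending = new HashMap<>(); // emitted, not yet acked
    private long nextId = 0;

    ReplaySpoutSketch(Iterable<String> source) {
        for (String m : source) {
            toSend.add(m);
        }
    }

    // Emits the next queued message and returns its id, or -1 if nothing is queued.
    long emitNext() {
        String msg = toSend.poll();
        if (msg == null) {
            return -1;
        }
        long id = nextId++;
        pending.put(id, msg);
        return id;
    }

    String messageFor(long id) {
        return pending.get(id);
    }

    // The message was fully processed downstream: stop tracking it.
    void ack(long id) {
        pending.remove(id);
    }

    // Processing failed (e.g. a machine went down): queue the message for replay.
    void fail(long id) {
        String msg = pending.remove(id);
        if (msg != null) {
            toSend.addLast(msg);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        ReplaySpoutSketch spout = new ReplaySpoutSketch(Arrays.asList("m1", "m2"));
        long first = spout.emitNext();
        long second = spout.emitNext();
        spout.ack(first);   // m1 fully processed
        spout.fail(second); // m2 lost downstream, so it is queued again
        long retry = spout.emitNext();
        System.out.println(spout.messageFor(retry)); // m2
    }
}
```

Because a failed message is sent again, downstream bolts may see it more than once. This style of guarantee is therefore at-least-once.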
Stream groupings
A stream grouping tells a topology how to send tuples between two components. Remember, spouts and bolts execute in parallel as many tasks across the cluster. If you look at how a topology is executing at the task level, each spout and bolt appears as a set of tasks, and tuples flow from the tasks of one component to the tasks of the next.
When a task for Bolt A emits a tuple to Bolt B, which task should it send the tuple to?
A "stream grouping" answers this question by telling Storm how to send tuples between sets of tasks.
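One way to picture a grouping is as a function that picks the target task index. The sketch below is an illustrative assumption, not Storm's internal code. Shuffle picks a random task. Fields hashes the grouping values and takes the result modulo the number of tasks, so equal values always land on the same task. GroupingSketch and its methods are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Illustrative task-selection functions for two groupings (hypothetical;
// Storm's actual implementation may differ in its details).
class GroupingSketch {
    private static final Random RANDOM = new Random();

    // Shuffle grouping: any of the target bolt's tasks, chosen at random.
    static int shuffle(int numTasks) {
        return RANDOM.nextInt(numTasks);
    }

    // Fields grouping: hash the selected field values so that equal values
    // always map to the same task index (floorMod keeps the index non-negative).
    static int fields(List<Object> groupingValues, int numTasks) {
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 12; // e.g. the "count" bolt's parallelism in the WordCount example
        System.out.println("shuffle -> task " + shuffle(numTasks));
        System.out.println("fields(\"storm\") -> task "
                + fields(Arrays.asList((Object) "storm"), numTasks));
    }
}
```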
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8)
        .shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 12)
        .fieldsGrouping("split", new Fields("word"));
SplitSentence emits a tuple for each word in each sentence it receives, and WordCount keeps a map in memory from word to count. Each time WordCount receives a word, it updates its state and emits the new word count.
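The per-task WordCount logic described above fits in a few lines of plain Java. WordCountSketch is a hypothetical class, not the storm-starter WordCount bolt. Its "emit" is simply the returned count, where a real bolt would emit through its collector.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of one WordCount task's logic (hypothetical; no Storm API).
class WordCountSketch {
    // In-memory state: word -> count seen so far by this task.
    private final Map<String, Integer> counts = new HashMap<>();

    // Updates state for one incoming word and returns the new count,
    // i.e. the value the bolt would emit alongside the word.
    int execute(String word) {
        return counts.merge(word, 1, Integer::sum);
    }

    public static void main(String[] args) {
        WordCountSketch bolt = new WordCountSketch();
        for (String w : "the cow jumped over the moon".split(" ")) {
            System.out.println(w + " " + bolt.execute(w));
        }
    }
}
```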
There are a few different kinds of stream groupings.
The simplest kind of grouping is called a "shuffle grouping", which sends each tuple to a random task. A shuffle grouping is used in the WordCountTopology to send tuples from RandomSentenceSpout to the SplitSentence bolt. It has the effect of evenly distributing the work of processing the tuples across all of the SplitSentence bolt's tasks.
A more interesting kind of grouping is the "fields grouping". A fields grouping is used between the SplitSentence bolt and the WordCount bolt. It is critical for the functioning of the WordCount bolt that the same word always goes to the same task. A fields grouping lets you group a stream by a subset of its fields. This causes equal values for that subset of fields to go to the same task. Since WordCount subscribes to SplitSentence's output stream using a fields grouping on the "word" field, the same word always goes to the same task and the bolt produces the correct output.
Data model
Storm uses tuples as its data model. A tuple is a named list of values, and a field in a tuple can be an object of any type. Out of the box, Storm supports all the primitive types, strings, and byte arrays as tuple field values. To use an object of another type, you just need to implement a serializer for the type.
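A tuple as "a named list of values" can be sketched as a schema of field names paired positionally with values of any type. NamedTuple below is hypothetical and much simpler than Storm's own Tuple. It only shows how access by name resolves through the declared fields.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a tuple: field names paired positionally with values.
class NamedTuple {
    private final List<String> fields;
    private final List<Object> values;

    NamedTuple(List<String> fields, List<Object> values) {
        if (fields.size() != values.size()) {
            throw new IllegalArgumentException("schema and values must have the same length");
        }
        this.fields = fields;
        this.values = values;
    }

    // Positional access: field 0, 1, ...
    Object getValue(int i) {
        return values.get(i);
    }

    // Access by name, resolved through the schema.
    Object getValueByField(String name) {
        int i = fields.indexOf(name);
        if (i < 0) {
            throw new IllegalArgumentException("unknown field: " + name);
        }
        return values.get(i);
    }

    public static void main(String[] args) {
        NamedTuple t = new NamedTuple(Arrays.asList("word", "count"),
                Arrays.<Object>asList("storm", 3));
        System.out.println(t.getValueByField("count")); // 3
    }
}
```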
Every node in a topology must declare the output fields for the tuples it emits. For example, this bolt declares that it emits 2-tuples with the fields "double" and "triple":
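import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class DoubleAndTripleBolt extends BaseRichBolt {
    private OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        // Keep the collector so execute() can emit and ack tuples.
        _collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        int val = input.getInteger(0);
        // Anchor the new tuple to the input, then ack the input.
        _collector.emit(input, new Values(val * 2, val * 3));
        _collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("double", "triple"));
    }
}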
The declareOutputFields function declares the output fields ["double", "triple"] for the component.
A simple topology
Let's take a look at a simple topology to explore the concepts more and see how the code shapes up. Let's look at the ExclamationTopology definition from storm-starter:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 10);
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
        .shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
        .shuffleGrouping("exclaim1");
This topology contains a spout and two bolts. The spout emits words, and each bolt appends the string "!!!" to its input. The nodes are arranged in a line: the spout emits to the first bolt, which then emits to the second bolt. If the spout emits the tuples ["bob"] and ["john"], then the second bolt will emit the words ["bob!!!!!!"] and ["john!!!!!!"].
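The "!!!" arithmetic above can be checked with a Storm-free sketch of the data flow spout → exclaim1 → exclaim2. ExclamationSketch and its methods are hypothetical. The per-tuple logic just appends "!!!" as the text describes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of ExclamationTopology's linear data flow (no Storm involved).
class ExclamationSketch {
    // Per-tuple logic of an ExclamationBolt as described in the text.
    static String exclaim(String word) {
        return word + "!!!";
    }

    // Pushes each spout word through exclaim1 and then exclaim2.
    static List<String> runLine(List<String> spoutWords) {
        List<String> out = new ArrayList<>();
        for (String word : spoutWords) {
            String afterExclaim1 = exclaim(word);
            out.add(exclaim(afterExclaim1)); // what exclaim2 emits
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(runLine(Arrays.asList("bob", "john"))); // [bob!!!!!!, john!!!!!!]
    }
}
```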
This code defines the nodes using the setSpout and setBolt methods. These methods take as input a user-specified id, an object containing the processing logic, and the amount of parallelism you want for the node. In this example, the spout is given id "words" and the bolts are given ids "exclaim1" and "exclaim2".
The object containing the processing logic implements the IRichSpout interface for spouts and the IRichBolt interface for bolts.
The last parameter, how much parallelism you want for the node, is optional. It indicates how many threads should execute that component across the cluster. If you omit it, Storm will only allocate one thread for that node.
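As a rough analogy for the parallelism hint, a component with hint N can be pictured as a pool of N threads sharing its work, with one thread when the hint is omitted. This sketch uses plain java.util.concurrent, not Storm. ParallelismSketch and runComponent are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Toy analogy for a parallelism hint: N threads execute one component's logic.
class ParallelismSketch {
    static List<String> runComponent(List<String> inputs, int parallelism) {
        ExecutorService threads = Executors.newFixedThreadPool(parallelism);
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (String in : inputs) {
                // The component's per-tuple logic; any of the N threads may run it.
                pending.add(threads.submit(() -> in + "!!!"));
            }
            List<String> out = new ArrayList<>();
            for (Future<String> f : pending) {
                out.add(f.get()); // collect results in submission order
            }
            return out;
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            threads.shutdown();
        }
    }

    // Mirrors omitting the parallelism argument: a single thread.
    static List<String> runComponent(List<String> inputs) {
        return runComponent(inputs, 1);
    }

    public static void main(String[] args) {
        System.out.println(runComponent(Arrays.asList("bob", "john"), 3)); // [bob!!!, john!!!]
    }
}
```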
setBolt returns an InputDeclarer object that is used to define the inputs to the bolt. Here, component "exclaim1" declares that it wants to read all the tuples emitted by component "words" using a shuffle grouping, and component "exclaim2" declares that it wants to read all the tuples emitted by component "exclaim1" using a shuffle grouping. "Shuffle grouping" means that tuples should be randomly distributed from the input tasks to the bolt's tasks. There are many ways to group data between components. If you wanted component "exclaim2" to read all the tuples emitted by both component "words" and component "exclaim1", you would write component "exclaim2"'s definition like this:
builder.setBolt("exclaim2", new ExclamationBolt(), 5)
        .shuffleGrouping("words")
        .shuffleGrouping("exclaim1");
As you can see, input declarations can be chained to specify multiple sources for the bolt.
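With both subscriptions, "exclaim2" sees each raw word from "words" and also the already-exclaimed word from "exclaim1". The Storm-free sketch below makes the resulting outputs explicit. TwoInputSketch is hypothetical, and it fixes the arrival order for simplicity. In a real topology, the order in which tuples arrive from the two inputs is not guaranteed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of "exclaim2" subscribed to both "words" and "exclaim1" (hypothetical, no Storm).
class TwoInputSketch {
    static String exclaim(String s) {
        return s + "!!!";
    }

    static List<String> exclaim2Output(List<String> spoutWords) {
        List<String> out = new ArrayList<>();
        for (String word : spoutWords) {
            // Input 1: the tuple straight from "words".
            out.add(exclaim(word));
            // Input 2: the same word after it has passed through "exclaim1".
            out.add(exclaim(exclaim(word)));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(exclaim2Output(Arrays.asList("bob"))); // [bob!!!, bob!!!!!!]
    }
}
```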
Running ExclamationTopology in local mode
Let's see how to run the ExclamationTopology in local mode and see that it's working.
Storm has two modes of operation: local mode and distributed mode. In local mode, Storm executes completely in process by simulating worker nodes with threads. Local mode is useful for testing and development of topologies. When you run the topologies in storm-starter, they'll run in local mode and you'll be able to see what messages each component is emitting.
To create an in-process cluster, simply use the LocalCluster class. For example:
import backtype.storm.LocalCluster;

LocalCluster cluster = new LocalCluster();
You can then submit topologies using the submitTopology method on the LocalCluster object. Just like the corresponding method on StormSubmitter, submitTopology takes a name, a topology configuration, and the topology object. You can then kill a topology using the killTopology method, which takes the topology name as an argument.
To shut down a local cluster, simply call:
cluster.shutdown();
Sample code:
Here's the code that runs ExclamationTopology in local mode:
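import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.utils.Utils;

// builder is the TopologyBuilder from the ExclamationTopology definition above.
Config conf = new Config();
conf.setDebug(true);    // log every tuple that is emitted
conf.setNumWorkers(2);  // worker processes (simulated with threads in local mode)

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);     // let the topology run for 10 seconds
cluster.killTopology("test");
cluster.shutdown();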