Monday, December 3, 2012

Strom(BackType/Twitter)- Part 2


Hi All,

Bellow are some snapshots of nathanmarz video, at the time of launching storm and other content available on internet..

Please have look and let me know in case of any doubt.

Use Cases:

 

Streams

The core abstraction in Storm is the "stream". A stream is an unbounded sequence of tuples.
 
All the tuples must have the same schema.
Storm provides the primitives for transforming a stream into a new stream in a distributed and reliable way. For example, you may transform a stream of tweets into a stream of trending topics.
The basic primitives Storm provides for doing stream transformations are "spouts" and "bolts". Spouts and bolts have interfaces that you implement to run your application-specific logic.
A spout is a source of streams. For example, a spout may read tuples off of a Kestrel queue and emit them as a stream. Or a spout may connect to the Twitter API and emit a stream of tweets.
A bolt consumes any number of input streams, does some processing, and possibly emits new streams.
 
A bolt consumes any number of input streams, does some processing, and possibly emits new streams.

 

Networks of spouts and bolts are packaged into a "topology" which is the top-level abstraction that you submit to Storm clusters for execution. A topology is a graph of stream transformations where each node is a spout or bolt. Edges in the graph indicate which bolts are subscribing to which streams. When a spout or bolt emits a tuple to a stream, it sends the tuple to every bolt that subscribed to that stream.
 
Each node in a Storm topology executes in parallel. In your topology, you can specify how much parallelism you want for each node, and then Storm will spawn that number of threads across the cluster to do the execution.
A topology runs forever, or until you kill it. Storm will automatically reassign any failed tasks. Additionally, Storm guarantees that there will be no data loss, even if machines go down and messages are dropped



Stream groupings

A stream grouping tells a topology how to send tuples between two components. Remember, spouts and bolts execute in parallel as many tasks across the cluster. If you look at how a topology is executing at the task level, it looks something like this:
.
When a task for Bolt A emits a tuple to Bolt B, which task should it send the tuple to?
A "stream grouping" answers this question by telling Storm how to send tuples between sets of tasks.




TopologyBuilder builder = new TopologyBuilder();
       
builder.setSpout("sentences", new RandomSentenceSpout(), 5);       
builder.setBolt("split", new SplitSentence(), 8)
        .shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 12)
        .fieldsGrouping("split", new Fields("word"));
SplitSentence emits a tuple for each word in each sentence it receives, and WordCount keeps a map in memory from word to count. Each time WordCount receives a word, it updates its state and emits the new word count.
There's a few different kinds of stream groupings.
The simplest kind of grouping is called a "shuffle grouping" which sends the tuple to a random task. A shuffle grouping is used in the WordCountTopology to send tuples from RandomSentenceSpout to the SplitSentence bolt. It has the effect of evenly distributing the work of processing the tuples across all of SplitSentence bolt's tasks.
A more interesting kind of grouping is the "fields grouping". A fields grouping is used between the SplitSentence bolt and the WordCount bolt. It is critical for the functioning of the WordCount bolt that the same word always go to the same task. A fields grouping lets you group a stream by a subset of its fields. This causes equal values for that subset of fields to go to the same task. Since WordCount subscribes to SplitSentence's output stream using a fields grouping on the "word" field, the same word always goes to the same task and the bolt produces the correct output.

Data model

Storm uses tuples as its data model. A tuple is a named list of values, and a field in a tuple can be an object of any type. Out of the box, Storm supports all the primitive types, strings, and byte arrays as tuple field values. To use an object of another type, you just need to implement a serializer for the type.
Every node in a topology must declare the output fields for the tuples it emits. For example, this bolt declares that it emits 2-tuples with the fields "double" and "triple":
public class DoubleAndTripleBolt extends BaseRichBolt {
    private OutputCollectorBase _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        int val = input.getInteger(0);       
        _collector.emit(input, new Values(val*2, val*3));
        _collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("double", "triple"));
    }   
}
The declareOutputFields function declares the output fields ["double", "triple"] for the component.

A simple topology

Let's take a look at a simple topology to explore the concepts more and see how the code shapes up. Let's look at the ExclamationTopology definition from storm-starter:
TopologyBuilder builder = new TopologyBuilder();       
builder.setSpout("words", new TestWordSpout(), 10);       
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
        .shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
        .shuffleGrouping("exclaim1");
This topology contains a spout and two bolts. The spout emits words, and each bolt appends the string "!!!" to its input. The nodes are arranged in a line: the spout emits to the first bolt which then emits to the second bolt. If the spout emits the tuples ["bob"] and ["john"], then the second bolt will emit the words ["bob!!!!!!"] and ["john!!!!!!"].
This code defines the nodes using the setSpout and setBolt methods. These methods take as input a user-specified id, an object containing the processing logic, and the amount of parallelism you want for the node. In this example, the spout is given id "words" and the bolts are given ids "exclaim1" and "exclaim2".
The object containing the processing logic implements the IRichSpout interface for spouts and the IRichBolt interface for bolts.
The last parameter, how much parallelism you want for the node, is optional. It indicates how many threads should execute that component across the cluster. If you omit it, Storm will only allocate one thread for that node.
setBolt returns an InputDeclarer object that is used to define the inputs to the Bolt. Here, component "exclaim1" declares that it wants to read all the tuples emitted by component "words" using a shuffle grouping, and component "exclaim2" declares that it wants to read all the tuples emitted by component "exclaim1" using a shuffle grouping. "shuffle grouping" means that tuples should be randomly distributed from the input tasks to the bolt's tasks. There are many ways to group data between components. If you wanted component "exclaim2" to read all the tuples emitted by both component "words" and component "exclaim1", you would write component "exclaim2"'s definition like this:
builder.setBolt("exclaim2", new ExclamationBolt(), 5)
            .shuffleGrouping("words")
            .shuffleGrouping("exclaim1");
As you can see, input declarations can be chained to specify multiple sources for the Bolt.

Running ExclamationTopology in local mode

Let's see how to run the ExclamationTopology in local mode and see that it's working.
Storm has two modes of operation: local mode and distributed mode. In local mode, Storm executes completely in process by simulating worker nodes with threads. Local mode is useful for testing and development of topologies. When you run the topologies in storm-starter, they'll run in local mode and you'll be able to see what messages each component is emitting.
Running topologies in local mode is similar to running topologies on a cluster.
To create an in-process cluster, simply use the LocalCluster class. For example:
import backtype.storm.LocalCluster;

LocalCluster cluster = new LocalCluster();
You can then submit topologies using the submitTopology method on the LocalCluster object. Just like the corresponding method on StormSubmitter, submitTopology takes a name, a topology configuration, and the topology object. You can then kill a topology using the killTopology method which takes the topology name as an argument.
To shutdown a local cluster, simple call:
cluster.shutdown();

Sample code:


Here's the code that runs ExclamationTopology in local mode:
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();