Apache Storm Flashcards
What are the logical processing components of Apache Storm?
1) Spouts - where the topology begins; they pull data from the source
2) Bolts - the processing nodes where calculations, transformations, etc. happen. Bolts can send data on to further bolts and can also communicate with external databases.
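To make the two roles concrete without a Storm dependency, here is a minimal plain-Java sketch. SpoutBoltSketch, SentenceSpout and SplitBolt are hypothetical names, and the spout simply returns records; in real Storm, a spout's nextTuple() emits through a SpoutOutputCollector and bolts receive Tuple objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Toy model of the spout and bolt roles (no Storm dependency; all names hypothetical).
public class SpoutBoltSketch {

    // Spout role: entry point of the topology, pulling records from a source.
    static class SentenceSpout {
        private final Iterator<String> source;

        SentenceSpout(List<String> lines) {
            this.source = lines.iterator();
        }

        // Returns the next record, or null once the (finite) source is drained.
        String nextTuple() {
            return source.hasNext() ? source.next() : null;
        }
    }

    // Bolt role: turns each input record into zero or more output records.
    static class SplitBolt {
        List<String> execute(String sentence) {
            List<String> words = new ArrayList<>();
            for (String w : sentence.trim().split("\\s+")) {
                if (!w.isEmpty()) {
                    words.add(w.toLowerCase());
                }
            }
            return words;
        }
    }

    // Drives the spout and pushes every record through the bolt.
    static List<String> run(List<String> lines) {
        SentenceSpout spout = new SentenceSpout(lines);
        SplitBolt bolt = new SplitBolt();
        List<String> out = new ArrayList<>();
        String line;
        while ((line = spout.nextTuple()) != null) {
            out.addAll(bolt.execute(line));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("The cow jumped", "over the moon")));
        // [the, cow, jumped, over, the, moon]
    }
}
```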
What are the internal architectural components of an Apache Storm cluster?
1) Nimbus node - the master node, similar to the Hadoop JobTracker. It:
- Uploads computation for execution
- Distributes code across the cluster
- Launches workers across the cluster
- Monitors computation and reallocates workers as needed.
2) ZooKeeper nodes - coordinate the Storm cluster
3) Supervisor nodes - communicate with the Nimbus node through ZooKeeper, and start and stop workers according to signals from Nimbus
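The Nimbus duties listed above (launching workers and reallocating them when something fails) can be pictured with a toy model. This is an assumption-laden sketch, not Storm's scheduler: workers are spread round-robin over supervisors, and when one supervisor is reported dead the whole plan is recomputed over the survivors. In the real system liveness comes from heartbeats and the default scheduler is more involved; AssignmentSketch and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of Nimbus-style worker placement; not Storm's real scheduler.
public class AssignmentSketch {

    // Spread worker ids 0..workers-1 round-robin over the given supervisors.
    static Map<String, List<Integer>> assign(int workers, List<String> supervisors) {
        if (supervisors.isEmpty()) {
            throw new IllegalStateException("no live supervisors");
        }
        Map<String, List<Integer>> plan = new LinkedHashMap<>();
        for (String s : supervisors) {
            plan.put(s, new ArrayList<>());
        }
        for (int w = 0; w < workers; w++) {
            plan.get(supervisors.get(w % supervisors.size())).add(w);
        }
        return plan;
    }

    // A supervisor stopped heartbeating: drop it and recompute the plan over the survivors.
    static Map<String, List<Integer>> reassign(int workers, List<String> supervisors, String dead) {
        List<String> alive = new ArrayList<>(supervisors);
        alive.remove(dead);
        return assign(workers, alive);
    }

    public static void main(String[] args) {
        List<String> sups = List.of("sup-1", "sup-2", "sup-3");
        System.out.println(assign(4, sups));            // {sup-1=[0, 3], sup-2=[1], sup-3=[2]}
        System.out.println(reassign(4, sups, "sup-2")); // {sup-1=[0, 2], sup-3=[1, 3]}
    }
}
```

Recomputing the whole plan keeps the sketch short; a real scheduler would normally leave healthy assignments where they are.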
What are the five key abstractions of Apache Storm?
1) Tuples - an ordered list of elements
2) Streams - an unbounded sequence of tuples
3) Spouts - sources of streams in a computation, such as the Twitter API
4) Bolts - process input streams and produce output streams. They can run functions, filter, aggregate, or join data, or talk to databases.
5) Topologies - the overall calculation, represented visually as a directed acyclic graph (DAG) of spouts and bolts.
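Taking the card's DAG description at face value, a topology can be modeled as a map from each component to its downstream components. The sketch below uses Kahn's algorithm to produce an upstream-to-downstream order and to reject a wiring that contains a cycle. The component names come from the word count program in these cards; TopologyDagSketch is a hypothetical helper, not part of Storm.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// A topology modeled as an adjacency map: component -> downstream components.
public class TopologyDagSketch {

    // Kahn's algorithm: returns a topological order, or throws if the graph has a cycle.
    static List<String> topologicalOrder(Map<String, List<String>> edges) {
        Map<String, Integer> inDegree = new HashMap<>();
        for (Map.Entry<String, List<String>> e : edges.entrySet()) {
            inDegree.putIfAbsent(e.getKey(), 0);
            for (String to : e.getValue()) {
                inDegree.merge(to, 1, Integer::sum);
            }
        }
        // Start from components with no upstream inputs (the spouts).
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((node, deg) -> { if (deg == 0) ready.add(node); });
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String node = ready.poll();
            order.add(node);
            for (String to : edges.getOrDefault(node, List.of())) {
                if (inDegree.merge(to, -1, Integer::sum) == 0) {
                    ready.add(to);
                }
            }
        }
        // Any component never reaching in-degree 0 sits on a cycle.
        if (order.size() != inDegree.size()) {
            throw new IllegalArgumentException("topology contains a cycle");
        }
        return order;
    }

    public static void main(String[] args) {
        Map<String, List<String>> wordCount = new LinkedHashMap<>();
        wordCount.put("kafka-spout", List.of("word-bolt"));
        wordCount.put("word-bolt", List.of("word-count"));
        wordCount.put("word-count", List.of("report-count"));
        System.out.println(topologicalOrder(wordCount));
        // [kafka-spout, word-bolt, word-count, report-count]
    }
}
```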
What are the benefits of Apache Storm?
1) Fast - benchmarked at about one million 100-byte messages per second per node
2) Scalable - parallel computations run across a cluster of machines
3) Fault-tolerant - when workers die, Storm automatically restarts them; if a node dies, its workers are restarted on another node.
4) Reliable - Storm guarantees that each tuple is processed at least once (or exactly once when using the Trident API). Messages are replayed only when there are failures.
5) Easy to operate
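The reliability card relies on replay: a spout keeps each emitted message pending until it is acknowledged and re-emits it on failure, which yields at-least-once processing. Storm's spout interface exposes this through ack(msgId) and fail(msgId) callbacks; the plain-Java class below is a hypothetical model of that contract, not Storm's implementation, and the choice to re-queue failed messages at the front is an assumption.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy at-least-once spout: messages stay pending until acked; a failure re-queues them.
public class ReplaySpoutSketch {
    private final Deque<String> queue = new ArrayDeque<>();
    private final Map<Long, String> pending = new HashMap<>();
    private long nextId = 0;

    ReplaySpoutSketch(String... messages) {
        for (String m : messages) {
            queue.add(m);
        }
    }

    // Emits the next message under a fresh message id; returns null when there is nothing to emit.
    Long nextTuple() {
        String msg = queue.poll();
        if (msg == null) {
            return null;
        }
        long id = nextId++;
        pending.put(id, msg);
        return id;
    }

    String payload(long id) {
        return pending.get(id);
    }

    // Fully processed downstream: forget the message.
    void ack(long id) {
        pending.remove(id);
    }

    // Processing failed or timed out: put the message back so it is replayed next.
    void fail(long id) {
        String msg = pending.remove(id);
        if (msg != null) {
            queue.addFirst(msg);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        ReplaySpoutSketch spout = new ReplaySpoutSketch("a", "b");
        Long first = spout.nextTuple();   // "a" is pending
        Long second = spout.nextTuple();  // "b" is pending
        spout.fail(first);                // "a" goes back to the front of the queue
        spout.ack(second);                // "b" is done
        Long replay = spout.nextTuple();
        System.out.println(spout.payload(replay)); // a
    }
}
```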
Which interface or base class do we implement or extend to create a bolt?
IRichBolt or BaseRichBolt
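Its most useful methods (among others) are:
1) declareOutputFields(OutputFieldsDeclarer declarer) - declares the names of the fields the bolt emits
2) execute(Tuple input) - processes each incoming tuple, emitting through the OutputCollector handed to prepare(Map, TopologyContext, OutputCollector); the two-argument form execute(Tuple input, BasicOutputCollector collector) belongs to IBasicBolt/BaseBasicBolt
3) cleanup() - called when the bolt shuts down, though it is not guaranteed to run on a cluster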
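The lifecycle of a rich bolt can be mirrored in plain Java to show what each method is responsible for. This is a hypothetical sketch: CountBoltSketch uses plain lists in place of Storm's Tuple, OutputCollector and OutputFieldsDeclarer, and the comments note which Storm method each one stands in for.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java mirror of a rich bolt's lifecycle; Storm's real methods take Storm types.
public class CountBoltSketch {
    private Map<String, Integer> counts;

    // Stands in for prepare(Map conf, TopologyContext context, OutputCollector collector):
    // called once per task before any tuples arrive, so state is set up here.
    void prepare() {
        counts = new HashMap<>();
    }

    // Stands in for declareOutputFields(OutputFieldsDeclarer declarer), where a real bolt
    // would call declarer.declare(new Fields("word", "count")).
    List<String> declareOutputFields() {
        return List.of("word", "count");
    }

    // Stands in for execute(Tuple input): the input is a one-element list [word], and the
    // returned list plays the role of the tuple a real bolt would emit via its collector.
    List<Object> execute(List<Object> input) {
        String word = (String) input.get(0);
        int count = counts.merge(word, 1, Integer::sum);
        return List.of(word, count);
    }

    // Stands in for cleanup(): called on shutdown, but not guaranteed on a real cluster.
    void cleanup() {
        System.out.println("final counts: " + counts);
        counts.clear();
    }

    public static void main(String[] args) {
        CountBoltSketch bolt = new CountBoltSketch();
        bolt.prepare();
        bolt.execute(List.of("storm"));
        System.out.println(bolt.execute(List.of("storm"))); // [storm, 2]
        bolt.cleanup();
    }
}
```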
How do you write a word count program with Storm?
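// Storm 1.x with the storm-kafka module (these imports assume that setup)
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

// Kafka spout config: brokers are discovered through ZooKeeper at localhost:2181.
// TOPIC, ZK_ROOT and CLIENT_ID are constants defined elsewhere.
BrokerHosts zkConfig = new ZkHosts("localhost:2181");
SpoutConfig kafkaConf = new SpoutConfig(zkConfig, TOPIC, ZK_ROOT, CLIENT_ID);
kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme()); // each message becomes a string tuple
kafkaConf.useStartOffsetTimeIfOffsetOutOfRange = true;
kafkaConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); // read from the earliest offset

// Wire kafka-spout -> word-bolt -> word-count -> report-count.
// WordBolt, WordCountBolt and ReportBolt are user-defined bolts; WordBolt must declare a "word" field.
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("kafka-spout", new KafkaSpout(kafkaConf), 1);
topologyBuilder.setBolt("word-bolt", new WordBolt()).globalGrouping("kafka-spout");
// fieldsGrouping sends every tuple with the same "word" value to the same counting task
topologyBuilder.setBolt("word-count", new WordCountBolt(), 1)
        .fieldsGrouping("word-bolt", new Fields("word"));
topologyBuilder.setBolt("report-count", new ReportBolt()).globalGrouping("word-count");

Config config = new Config();
config.setDebug(false); // equivalent to config.put(Config.TOPOLOGY_DEBUG, false)

// Run the topology in-process for local testing.
final LocalCluster localCluster = new LocalCluster();
StormTopology topology = topologyBuilder.createTopology();
localCluster.submitTopology("word-count-topology", config, topology);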
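The fieldsGrouping on "word" is what keeps the counts correct when the counting bolt runs as several tasks: every occurrence of a word is routed to the same task. The plain-Java simulation below assumes three counting tasks and picks a task with Math.floorMod(word.hashCode(), numTasks). Storm also hashes the grouping fields to choose a task, but that exact hash function is an assumption made here for illustration, and FieldsGroupingSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy simulation of fieldsGrouping("word-bolt", new Fields("word")) feeding N counting tasks.
public class FieldsGroupingSketch {

    // Pick a task from the grouping field's hash; the same word always lands on the same task.
    static int chooseTask(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }

    // Split lines into words, route each word to one task, and count per task.
    static List<Map<String, Integer>> countByTask(List<String> lines, int numTasks) {
        List<Map<String, Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            tasks.add(new HashMap<>());
        }
        for (String line : lines) {
            for (String w : line.toLowerCase().split("\\W+")) {
                if (w.isEmpty()) {
                    continue;
                }
                tasks.get(chooseTask(w, numTasks)).merge(w, 1, Integer::sum);
            }
        }
        return tasks;
    }

    // No word is split across tasks, so the per-task maps combine without conflicts.
    static Map<String, Integer> merge(List<Map<String, Integer>> tasks) {
        Map<String, Integer> total = new HashMap<>();
        for (Map<String, Integer> t : tasks) {
            total.putAll(t);
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("the cow jumped over the moon", "the moon is bright");
        System.out.println(merge(countByTask(lines, 3)));
    }
}
```

With a shuffle grouping instead, the same word could be counted on several tasks and the final merge would need to add partial counts rather than simply combine the maps.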