मैं एक टोपोलॉजी जो निम्न करता है लिखने के लिए कोशिश कर रहा हूँ:एक सरल एकत्रीकरण तूफान टोपोलॉजी में समूहन
- एक टोंटी है कि एक चहचहाना फ़ीड को सब्सक्राइब
- एक एकत्रीकरण बोल्ट (एक कीवर्ड के आधार पर) है कि एक संग्रह में कई ट्वीट्स (एन कहते हैं) को एकत्रित करता है और उन्हें प्रिंटर बोल्ट
- भेजता है एक साधारण बोल्ट जो संग्रह को कंसोल में एक बार प्रिंट करता है।
असल में मैं संग्रह पर कुछ और प्रसंस्करण करना चाहता हूं।
मैंने इसे स्थानीय रूप से परीक्षण किया और ऐसा लगता है कि यह काम कर रहा है। हालांकि, मुझे यकीन नहीं है कि मैंने समूह को बोल्ट पर सही तरीके से सेट किया है और यदि वास्तविक तूफान क्लस्टर पर तैनात किए जाने पर यह सही तरीके से काम करेगा। मैं इस बात की सराहना करता हूं कि कोई इस टोपोलॉजी की समीक्षा करने में मदद कर सकता है और किसी भी त्रुटि, परिवर्तन या सुधार का सुझाव दे सकता है।
धन्यवाद।
यह मेरी टोपोलॉजी की तरह दिखता है।
builder.setSpout("spout", new TwitterFilterSpout("pittsburgh"));
builder.setBolt("sampleaggregate", new SampleAggregatorBolt())
.shuffleGrouping("spout");
builder.setBolt("printaggreator",new PrinterBolt()).shuffleGrouping("sampleaggregate");
एकत्रीकरण बोल्ट
public class SampleAggregatorBolt implements IRichBolt {
protected OutputCollector collector;
protected Tuple currentTuple;
protected Logger log;
/**
* Holds the messages in the bolt till you are ready to send them out
*/
protected List<Status> statusCache;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
log = Logger.getLogger(getClass().getName());
statusCache = new ArrayList<Status>();
}
@Override
public void execute(Tuple tuple) {
currentTuple = tuple;
Status currentStatus = null;
try {
currentStatus = (Status) tuple.getValue(0);
} catch (ClassCastException e) {
}
if (currentStatus != null) {
//add it to the status cache
statusCache.add(currentStatus);
collector.ack(tuple);
//check the size of the status cache and pass it to the next stage if you have enough messages to emit
if (statusCache.size() > 10) {
collector.emit(new Values(statusCache));
}
}
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("tweets"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null; //To change body of implemented methods use File | Settings | File Templates.
}
protected void setupNonSerializableAttributes() {
}
}
प्रिंटर बोल्ट
public class PrinterBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println(tuple.size() + " " + tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer ofd) {
}
}
मैंने उपरोक्त एग्रीगेटर बोल्ट के लिए कोड प्रदान किया है (निष्पादन विधि देखें)। अभी के लिए यह इंतजार कर रहा है जब तक कि उसने एन (10 उपरोक्त उदाहरण में) संदेश जमा नहीं किए हैं और जैसे ही इसमें 10 संदेश हैं, उन्हें विभाजित कर दिया जाता है। बीटीडब्ल्यू मुझे बस एक बग मिला जो मैं ठीक कर दूंगा। एक बार जब मैं मूल्यों को छोड़ देता हूं तो मुझे कैश को साफ़ करने की आवश्यकता होती है। तो अगर मुझे एक से अधिक एग्रीगेटर का उपयोग करने की आवश्यकता है तो क्या परिवर्तन आवश्यक होना चाहिए। –