2013-06-04 10 views
10

मैं एक टोपोलॉजी जो निम्न करता है लिखने के लिए कोशिश कर रहा हूँ:एक सरल एकत्रीकरण तूफान टोपोलॉजी में समूहन

  1. एक टोंटी है कि एक चहचहाना फ़ीड को सब्सक्राइब
  2. एक एकत्रीकरण बोल्ट (एक कीवर्ड के आधार पर) है कि एक संग्रह में कई ट्वीट्स (एन कहते हैं) को एकत्रित करता है और उन्हें प्रिंटर बोल्ट
  3. भेजता है एक साधारण बोल्ट जो संग्रह को कंसोल में एक बार प्रिंट करता है।

असल में मैं संग्रह पर कुछ और प्रसंस्करण करना चाहता हूं।

मैंने इसे स्थानीय रूप से परीक्षण किया और ऐसा लगता है कि यह काम कर रहा है। हालांकि, मुझे यकीन नहीं है कि मैंने समूह को बोल्ट पर सही तरीके से सेट किया है और यदि वास्तविक तूफान क्लस्टर पर तैनात किए जाने पर यह सही तरीके से काम करेगा। मैं इस बात की सराहना करता हूं कि कोई इस टोपोलॉजी की समीक्षा करने में मदद कर सकता है और किसी भी त्रुटि, परिवर्तन या सुधार का सुझाव दे सकता है।

धन्यवाद।

यह मेरी टोपोलॉजी की तरह दिखता है।

builder.setSpout("spout", new TwitterFilterSpout("pittsburgh")); 
    builder.setBolt("sampleaggregate", new SampleAggregatorBolt()) 
       .shuffleGrouping("spout"); 
    builder.setBolt("printaggreator",new PrinterBolt()).shuffleGrouping("sampleaggregate"); 

एकत्रीकरण बोल्ट

public class SampleAggregatorBolt implements IRichBolt { 

    protected OutputCollector collector; 
    protected Tuple currentTuple; 
    protected Logger log; 
    /** 
    * Holds the messages in the bolt till you are ready to send them out 
    */ 
    protected List<Status> statusCache; 

    @Override 
    public void prepare(Map stormConf, TopologyContext context, 
         OutputCollector collector) { 
     this.collector = collector; 

     log = Logger.getLogger(getClass().getName()); 
     statusCache = new ArrayList<Status>(); 
    } 

    @Override 
    public void execute(Tuple tuple) { 
     currentTuple = tuple; 

     Status currentStatus = null; 
     try { 
      currentStatus = (Status) tuple.getValue(0); 
     } catch (ClassCastException e) { 
     } 
     if (currentStatus != null) { 

      //add it to the status cache 
      statusCache.add(currentStatus); 
      collector.ack(tuple); 


      //check the size of the status cache and pass it to the next stage if you have enough messages to emit 
      if (statusCache.size() > 10) { 
       collector.emit(new Values(statusCache)); 
      } 

     } 
    } 

    @Override 
    public void cleanup() { 


    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("tweets")); 

    } 

    @Override 
    public Map<String, Object> getComponentConfiguration() { 
     return null; //To change body of implemented methods use File | Settings | File Templates. 
    } 


    protected void setupNonSerializableAttributes() { 

    } 

} 

प्रिंटर बोल्ट

public class PrinterBolt extends BaseBasicBolt { 

    @Override 
    public void execute(Tuple tuple, BasicOutputCollector collector) { 
     System.out.println(tuple.size() + " " + tuple); 
    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer ofd) { 
    } 

} 

उत्तर

4

मैं इसे अच्छा लग रहा है क्या देख सकते हैं से। हालांकि, शैतान विवरण में है। मुझे यकीन नहीं है कि आपका एग्रीगेटर बोल्ट क्या करता है लेकिन अगर यह इसके मूल्यों के बारे में कोई धारणा करता है तो आपको उपयुक्त फ़ील्ड ग्रुपिंग पर विचार करना चाहिए। यह उस अंतर के बड़े हिस्से को नहीं बना सकता है क्योंकि आप 1 के डिफ़ॉल्ट समांतरता संकेत का उपयोग कर रहे हैं, लेकिन क्या आप कई समग्र बोल्ट उदाहरणों के साथ स्केल करने का निर्णय लेना चाहते हैं, जो आप गैर-शफल समूह के लिए कॉल कर सकते हैं।

+0

मैंने उपरोक्त एग्रीगेटर बोल्ट के लिए कोड प्रदान किया है (निष्पादन विधि देखें)। अभी के लिए यह इंतजार कर रहा है जब तक कि उसने एन (10 उपरोक्त उदाहरण में) संदेश जमा नहीं किए हैं और जैसे ही इसमें 10 संदेश हैं, उन्हें विभाजित कर दिया जाता है। बीटीडब्ल्यू मुझे बस एक बग मिला जो मैं ठीक कर दूंगा। एक बार जब मैं मूल्यों को छोड़ देता हूं तो मुझे कैश को साफ़ करने की आवश्यकता होती है। तो अगर मुझे एक से अधिक एग्रीगेटर का उपयोग करने की आवश्यकता है तो क्या परिवर्तन आवश्यक होना चाहिए। –

0

जैसे ही आप एक से अधिक कीवर्ड की सदस्यता लेने का प्रयास कर रहे हैं, आप समस्याओं में भाग लेंगे। मेरा सुझाव है कि आपका स्पॉट भी उस मूल कीवर्ड को छोड़ देता है जिसका उपयोग फ़िल्टर करने के लिए किया गया था।

तो बजाय shuffleGrouping करने का मैं एक fieldsGrouping

builder.setBolt("sampleaggregate", new SampleAggregatorBolt()) 
      .shuffleGrouping("spout", new Fields("keyword")); 

इस तरह आप यकीन है कि केवल एक कीवर्ड के परिणाम एक ही बोल्ट पर हर बार अंत कर करना होगा। ऐसा है कि आप समेकित रूप से गणना कर सकते हैं। यदि आप फ़ील्ड को छोड़ देते हैं तो ग्रुपिंग स्टॉर्म आपके कुल बोल्ट की किसी भी मात्रा को तुरंत चालू कर सकता है और स्पॉट से किसी भी संदेश को कुल बोल्ट के किसी भी उदाहरण में भेज सकता है जो अंतिम मामले में गलत परिणाम होगा।

संबंधित मुद्दे