2013-07-13 4 views
7

मैं तूफान में ट्राइडेंट के लिए नया हूँ। मैं ट्रिडेंटस्टेट पर अपना सिर तोड़ रहा हूं। जहां तक ​​मेरी समझ ट्राइडेंट प्रत्येक बैच के लिए राज्य (यानी मेटाडेटा) को बनाए रखता है (चाहे बैच में सभी टुपल डेटाबेस में लेनदेन आईडी को बनाए रखकर पूरी तरह संसाधित हो जाएं) और मुझे पूरा यकीन नहीं है कि निम्नलिखित कथनतूफान में ट्राइडेंट स्टेट क्या है?

TridentState urlToTweeters = 
    topology.newStaticState(getUrlToTweetersState()); 

क्या कोई बता सकता है कि वास्तव में क्या होता है जब हम उपरोक्त कोड को परिभाषित करते हैं?

+0

क्या आप इस संदर्भ में "ट्राइडेंट" को परिभाषित कर सकते हैं? ट्रिडेंट नामक कई चीजें हैं। – Charles

+1

संदर्भ "तूफान" है: https://github.com/nathanmarz/storm/wiki/Documentation#trident – Dan

उत्तर

0

ट्राइडेंट राज्य on the storm wiki पर अच्छा दस्तावेज है। आपके प्रश्न का सरल उत्तर यह है कि urlToTweeters एक राज्य वस्तु है जिसे से पूछताछ की जा सकती है। मैं बयान संभालने कर रहा हूँ ऊपर trident tutorial से है, नीचे reproduced:

TridentState urlToTweeters = topology.newStaticState(getUrlToTweetersState()); 
TridentState tweetersToFollowers = topology.newStaticState(getTweeterToFollowersState()); 
topology.newDRPCStream("reach") 
    .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters")).each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")) 
    /* At this point we have the tweeters for each url passed in args */ 
    .shuffle()   
    .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")) 
    .parallelismHint(200) 
    .each(new Fields("followers"), new ExpandList(), new Fields("follower")) 
    .groupBy(new Fields("follower")) 
    .aggregate(new One(), new Fields("one")) 
    .parallelismHint(20) 
    .aggregate(new Count(), new Fields("reach")); 

इस उदाहरण में, urlToTweeters Tweeters के लिए यूआरएल की एक मानचित्रण की दुकान है, और DRPC reach क्वेरी को अगली पंक्ति में परिभाषित किया गया है (जो में ले जाता है यूआरएल इसके तर्क के रूप में) अंततः पहुंच प्रदान करेगा। लेकिन जिस तरह से (एक टिप्पणी इनलाइन के साथ चिह्नित) आप प्रत्येक यूआरएल के ट्वीटर्स की एक स्ट्रीम देखेंगे, यानी urlToTweeters पर एक प्रश्न का नतीजा।

+0

क्या आप इस http://stackoverflow.com/questions/35445165/total-number-of-non पर सहायता कर सकते हैं -प्रति-शब्द-में-प्रत्येक-ट्वीट – user1

9

मुझे आशा है कि यह जवाब देने के लिए, कम से कम किसी और मेरा उत्तर उपयोगी लग सकते :)

तो, topology.newStaticState() एक queryable डेटा भंडारण के ट्राइडेंट के अमूर्त है बहुत देर हो चुकी कभी नहीं है। के लिए पैरामीटर एक कार्यान्वयन होना चाहिए - विधि के अनुबंध - storm.trident.state.StateFactory के आधार पर। कारखाने, बदले में, विधि storm.trident.state.State के उदाहरण को वापस करने के तरीके को लागू करना चाहिए। हालांकि यदि आप अपने राज्य से पूछताछ करने की योजना बना रहे हैं, तो आपको storm.trident.state.map.ReadOnlyMapState की जगह वापस लेनी चाहिए, क्योंकि सादे storm.trident.state.State में वास्तविक डेटा स्रोत से पूछताछ करने के तरीके नहीं हैं (यदि आप कुछ भी उपयोग करने का प्रयास करते हैं तो आपको वास्तव में क्लास कास्ट अपवाद मिलेगा लेकिन ReadOnlyMapState)।

तो, आइए इसे आज़माएं!

एक डमी राज्य कार्यान्वयन:

public static class ExampleStaticState implements ReadOnlyMapState<String> { 

    private final Map<String, String> dataSourceStub; 

    public ExampleStaticState() { 
     dataSourceStub = new HashMap<>(); 
     dataSourceStub.put("tuple-00", "Trident"); 
     dataSourceStub.put("tuple-01", "definitely"); 
     dataSourceStub.put("tuple-02", "lacks"); 
     dataSourceStub.put("tuple-03", "documentation"); 
    } 

    @Override 
    public List<String> multiGet(List<List<Object>> keys) { 

     System.out.println("DEBUG: MultiGet, keys is " + keys); 

     List<String> result = new ArrayList<>(); 

     for (List<Object> inputTuple : keys) { 
      result.add(dataSourceStub.get(inputTuple.get(0))); 
     } 

     return result; 
    } 

    @Override 
    public void beginCommit(Long txid) { 
     // never gets executed... 
     System.out.println("DEBUG: Begin commit, txid=" + txid); 
    } 

    @Override 
    public void commit(Long txid) { 
     // never gets executed... 
     System.out.println("DEBUG: Commit, txid=" + txid); 
    } 
} 

एक कारखाना:

public static class ExampleStaticStateFactory implements StateFactory { 
    @Override 
    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { 
     return new ExampleStaticState(); 
    } 
} 

एक साधारण psvm (उर्फ public static void main):

public static void main(String... args) { 
    TridentTopology tridentTopology = new TridentTopology(); 
    FeederBatchSpout spout = new FeederBatchSpout(Arrays.asList(new String[]{ 
      "foo" 
    })); 
    TridentState state = tridentTopology.newStaticState(new ExampleStaticStateFactory()); 
    tridentTopology 
      .newStream("spout", spout) 
      .stateQuery(state, new Fields("foo"), new MapGet(), new Fields("bar")) 
      .each(new Fields("foo", "bar"), new Debug()) 
      ; 

    Config conf = new Config(); 
    conf.setNumWorkers(6); 

    LocalCluster localCluster = new LocalCluster(); 
    localCluster.submitTopology("tridentTopology", conf, tridentTopology.build()); 

    spout.feed(Arrays.asList(new Values[]{ 
      new Values("tuple-00"), 
      new Values("tuple-01"), 
      new Values("tuple-02"), 
      new Values("tuple-03") 
    })); 

    localCluster.shutdown(); 
} 

और अंत में, उत्पादन:

DEBUG: MultiGet, keys is [[tuple-00], [tuple-01], [tuple-02], [tuple-03]] 
DEBUG: [tuple-00, Trident] 
DEBUG: [tuple-01, definitely] 
DEBUG: [tuple-02, lacks] 
DEBUG: [tuple-03, documentation] 

आप देखते हैं, stateQuery() इनपुट बैच से मूल्य प्राप्त करता है और उन्हें 'डेटा संग्रहण' में पाए गए मानों पर मानचित्रित करता है। हुड के नीचे

public class MapGet extends BaseQueryFunction<ReadOnlyMapState, Object> { 
    @Override 
    public List<Object> batchRetrieve(ReadOnlyMapState map, List<TridentTuple> keys) { 
     return map.multiGet((List) keys); 
    }  

    @Override 
    public void execute(TridentTuple tuple, Object result, TridentCollector collector) { 
     collector.emit(new Values(result)); 
    }  
} 

तो:

डाइविंग थोड़ा गहरा, आप MapGet वर्ग (पुरुष जिसका उदाहरण टोपोलॉजी अंदर क्वेरी करने के लिए प्रयोग किया जाता है) के स्रोत पर एक नज़र ले जा सकते हैं और पता चलता है कि निम्नलिखित यह आपके ReadOnlyMapState कार्यान्वयन के multiGet() विधि को कॉल करता है और फिर डेटा स्टोरेज में पाए गए मानों को उत्सर्जित करता है, जो उन्हें पहले से मौजूद टुपल में जोड़ता है।आप (हालांकि यह करने के लिए सबसे अच्छी बात नहीं हो सकती है) BaseQueryFunction<ReadOnlyMapState, Object> का अपना स्वयं का कार्यान्वयन कुछ और जटिल कर सकते हैं।

+1

धन्यवाद ... सीखने की बात आने पर कभी भी देर नहीं हुई ... – Ezhil

+0

क्या आप इस http://stackoverflow.com/questions/35445165/total-number-of पर सहायता कर सकते हैं -गैर-दोहराया-शब्द-इन-प्रत्येक-ट्वीट – user1

संबंधित मुद्दे