मुझे आशा है कि यह जवाब देने के लिए, कम से कम किसी और मेरा उत्तर उपयोगी लग सकते :)
तो, topology.newStaticState()
एक queryable डेटा भंडारण के ट्राइडेंट के अमूर्त है बहुत देर हो चुकी कभी नहीं है। के लिए पैरामीटर एक कार्यान्वयन होना चाहिए - विधि के अनुबंध - storm.trident.state.StateFactory
के आधार पर। कारखाने, बदले में, विधि storm.trident.state.State
के उदाहरण को वापस करने के तरीके को लागू करना चाहिए। हालांकि यदि आप अपने राज्य से पूछताछ करने की योजना बना रहे हैं, तो आपको storm.trident.state.map.ReadOnlyMapState
की जगह वापस लेनी चाहिए, क्योंकि सादे storm.trident.state.State
में वास्तविक डेटा स्रोत से पूछताछ करने के तरीके नहीं हैं (यदि आप कुछ भी उपयोग करने का प्रयास करते हैं तो आपको वास्तव में क्लास कास्ट अपवाद मिलेगा लेकिन ReadOnlyMapState
)।
तो, आइए इसे आज़माएं!
एक डमी राज्य कार्यान्वयन:
public static class ExampleStaticState implements ReadOnlyMapState<String> {
private final Map<String, String> dataSourceStub;
public ExampleStaticState() {
dataSourceStub = new HashMap<>();
dataSourceStub.put("tuple-00", "Trident");
dataSourceStub.put("tuple-01", "definitely");
dataSourceStub.put("tuple-02", "lacks");
dataSourceStub.put("tuple-03", "documentation");
}
@Override
public List<String> multiGet(List<List<Object>> keys) {
System.out.println("DEBUG: MultiGet, keys is " + keys);
List<String> result = new ArrayList<>();
for (List<Object> inputTuple : keys) {
result.add(dataSourceStub.get(inputTuple.get(0)));
}
return result;
}
@Override
public void beginCommit(Long txid) {
// never gets executed...
System.out.println("DEBUG: Begin commit, txid=" + txid);
}
@Override
public void commit(Long txid) {
// never gets executed...
System.out.println("DEBUG: Commit, txid=" + txid);
}
}
एक कारखाना:
public static class ExampleStaticStateFactory implements StateFactory {
@Override
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
return new ExampleStaticState();
}
}
एक साधारण psvm
(उर्फ public static void main
):
public static void main(String... args) {
TridentTopology tridentTopology = new TridentTopology();
FeederBatchSpout spout = new FeederBatchSpout(Arrays.asList(new String[]{
"foo"
}));
TridentState state = tridentTopology.newStaticState(new ExampleStaticStateFactory());
tridentTopology
.newStream("spout", spout)
.stateQuery(state, new Fields("foo"), new MapGet(), new Fields("bar"))
.each(new Fields("foo", "bar"), new Debug())
;
Config conf = new Config();
conf.setNumWorkers(6);
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("tridentTopology", conf, tridentTopology.build());
spout.feed(Arrays.asList(new Values[]{
new Values("tuple-00"),
new Values("tuple-01"),
new Values("tuple-02"),
new Values("tuple-03")
}));
localCluster.shutdown();
}
और अंत में, उत्पादन:
DEBUG: MultiGet, keys is [[tuple-00], [tuple-01], [tuple-02], [tuple-03]]
DEBUG: [tuple-00, Trident]
DEBUG: [tuple-01, definitely]
DEBUG: [tuple-02, lacks]
DEBUG: [tuple-03, documentation]
आप देखते हैं, stateQuery() इनपुट बैच से मूल्य प्राप्त करता है और उन्हें 'डेटा संग्रहण' में पाए गए मानों पर मानचित्रित करता है। हुड के नीचे
public class MapGet extends BaseQueryFunction<ReadOnlyMapState, Object> {
@Override
public List<Object> batchRetrieve(ReadOnlyMapState map, List<TridentTuple> keys) {
return map.multiGet((List) keys);
}
@Override
public void execute(TridentTuple tuple, Object result, TridentCollector collector) {
collector.emit(new Values(result));
}
}
तो:
डाइविंग थोड़ा गहरा, आप MapGet
वर्ग (पुरुष जिसका उदाहरण टोपोलॉजी अंदर क्वेरी करने के लिए प्रयोग किया जाता है) के स्रोत पर एक नज़र ले जा सकते हैं और पता चलता है कि निम्नलिखित यह आपके ReadOnlyMapState
कार्यान्वयन के multiGet()
विधि को कॉल करता है और फिर डेटा स्टोरेज में पाए गए मानों को उत्सर्जित करता है, जो उन्हें पहले से मौजूद टुपल में जोड़ता है।आप (हालांकि यह करने के लिए सबसे अच्छी बात नहीं हो सकती है) BaseQueryFunction<ReadOnlyMapState, Object>
का अपना स्वयं का कार्यान्वयन कुछ और जटिल कर सकते हैं।
क्या आप इस संदर्भ में "ट्राइडेंट" को परिभाषित कर सकते हैं? ट्रिडेंट नामक कई चीजें हैं। – Charles
संदर्भ "तूफान" है: https://github.com/nathanmarz/storm/wiki/Documentation#trident – Dan