मैं काफ्कास्पॉट का उपयोग कर रहा हूं। कृपया नीचे परीक्षण कार्यक्रम खोजें।काफ्का स्कोउट का उपयोग कर काफ्का तूफान एकीकरण
मैं तूफान 0.8.1 का उपयोग कर रहा हूं। तूफान 0.8.2 में मल्टीशेम वर्ग है। मैं इसका इस्तेमाल करूँगा। मैं सिर्फ यह जानना चाहता हूं कि पहले के संस्करण स्ट्रिंगशेम() कक्षा को तुरंत चालू करके कैसे काम कर रहे थे? मैं काफ्का स्पॉट के पुराने संस्करण कहां डाउनलोड कर सकता हूं? लेकिन मुझे संदेह है कि तूफान 0.8.2 पर काम करने के बजाय एक सही विकल्प होगा। ??? (उलझन में)
जब मैं तूफान क्लस्टर पर कोड (नीचे दिया गया) चलाता हूं (यानी जब मैं अपनी टोपोलॉजी को दबाता हूं) मुझे निम्न त्रुटि मिलती है (ऐसा तब होता है जब योजना भाग को अन्यथा टिप्पणी की जाती है, मुझे संकलक त्रुटि मिल जाएगी वर्ग 0.8.1 में वहाँ नहीं है):
java.lang.NoClassDefFoundError: backtype/storm/spout/MultiScheme
at storm.kafka.TestTopology.main(TestTopology.java:37)
Caused by: java.lang.ClassNotFoundException: backtype.storm.spout.MultiScheme
आप नीचे दिए गए() spoutConfig.scheme = नए StringScheme मिल सकता है कोड में; भाग टिप्पणी की। मुझे संकलक त्रुटि मिल रही थी अगर मैं उस रेखा पर टिप्पणी नहीं करता जो कि प्राकृतिक है क्योंकि वहां कोई रचनाकार नहीं है। साथ ही जब मैं मल्टीशेम को तुरंत चालू करता हूं तो मुझे त्रुटि मिलती है क्योंकि मेरे पास उस कक्षा में 0.8.1 नहीं है।
public class TestTopology {
public static class PrinterBolt extends BaseBasicBolt {
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println(tuple.toString());
}
}
public static void main(String [] args) throws Exception {
List<HostPort> hosts = new ArrayList<HostPort>();
hosts.add(new HostPort("127.0.0.1",9092));
LocalCluster cluster = new LocalCluster();
TopologyBuilder builder = new TopologyBuilder();
SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.StaticHosts(hosts, 1), "test", "/zkRootStorm", "STORM-ID");
spoutConfig.zkServers=ImmutableList.of("localhost");
spoutConfig.zkPort=2181;
//spoutConfig.scheme=new StringScheme();
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
builder.setSpout("spout",new KafkaSpout(spoutConfig));
builder.setBolt("printer", new PrinterBolt())
.shuffleGrouping("spout");
Config config = new Config();
cluster.submitTopology("kafka-test", config, builder.createTopology());
Thread.sleep(600000);
}
मुझे लगता है मुझे समस्या नहीं आती है: क्या आप 0.8.2 पर जाते हैं तो यह सिर्फ काम करता है? यदि ऐसा होता है, तो 0.8.1 में चलाने का प्रयास क्यों करें: 0.8.2 इसे कुछ बग फिक्स और अन्य सुधारों के साथ बदल देता है। –