2013-02-21 11 views
6

का उपयोग कर तूफान टोपोलॉजी असंतुलन मैं अपने तूफान टोपोलॉजी को पुनर्व्यवस्थित करने की कोशिश कर रहा हूं जो एक काफ्कास्पॉट का उपयोग कर रहा है। मेरे कोड है:जावा कोड

TopologyBuilder builder = new TopologyBuilder(); 
    Properties kafkaProps = new Properties(); 
    kafkaProps.put("zk.connect", "localhost:2181"); 
    kafkaProps.put("zk.connectiontimeout.ms", "1000000"); 
    kafkaProps.put("groupid", "storm"); 

    builder.setSpout("kafkaSpout" , new KafkaSpout(kafkaProps, "test"), 3); 
    builder.setBolt("eventBolt", new EventBolt(), 2).shuffleGrouping("kafkaSpout", "eventStream"); 
    builder.setBolt("tableBolt", new TableBolt(), 2).shuffleGrouping("kafkaSpout", "tableStream"); 

    Map<String, Object> conf = new HashMap<String, Object>(); 
    conf.put(Config.TOPOLOGY_DEBUG, true); 

    LocalCluster cluster = new LocalCluster(); 
    cluster.submitTopology("test", conf, builder.createTopology()); 

    Utils.sleep(1000*5); 

    List<TopologySummary> topologySummaries = cluster.getClusterInfo().get_topologies(); 
    for (TopologySummary summary : topologySummaries) { 
     StormTopology topology = cluster.getTopology(summary.get_id()); 
     RebalanceOptions options = new RebalanceOptions(); 
     options.set_wait_secs(0); 
     options.set_num_workers(4); 

     for (String name : topology.get_bolts().keySet()) { 
      System.err.println(name + " " + topology.get_bolts().get(name).get_common().get_json_conf()); 
      options.put_to_num_executors(name , 5); 
     } 
     for (String name : topology.get_spouts().keySet()) { 
      System.err.println(name + " " + topology.get_spouts().get(name).get_common().get_json_conf()); 
      options.put_to_num_executors(name , 5); 
     } 

     cluster.rebalance(summary.get_name() , options); 
    } 

हालांकि, संतुलन फिर दौरान, निम्न त्रुटि का पता लगाने दिखाया गया है:

10341 [storm_rishabh-1361473654345-95461d10_watcher_executor] INFO kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-95461d10 begin rebalancing consumer storm_rishabh-1361473654345-95461d10 try #1 
10341 [storm_rishabh-1361473654345-3b26ed76_watcher_executor] INFO kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-3b26ed76 begin rebalancing consumer storm_rishabh-1361473654345-3b26ed76 try #1 
10342 [storm_rishabh-1361473654345-95461d10_watcher_executor] ERROR kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-95461d10 error during syncedRebalance 
java.lang.NullPointerException: null 
at kafka.utils.ZkUtils$.getChildrenParentMayNotExist(ZkUtils.scala:181) ~[kafka_2.9.2-0.7.0.jar:na] 
at kafka.utils.ZkUtils$.getCluster(ZkUtils.scala:202) ~[kafka_2.9.2-0.7.0.jar:na] 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:447) ~[kafka_2.9.2-0.7.0.jar:na] 
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:78) ~[scala-library-2.9.2.jar:na] 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:444) ~[kafka_2.9.2-0.7.0.jar:na] 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:401) ~[kafka_2.9.2-0.7.0.jar:na] 
10342 [storm_rishabh-1361473654345-3b26ed76_watcher_executor] ERROR kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-3b26ed76 error during syncedRebalance 
java.lang.NullPointerException: null 
at kafka.utils.ZkUtils$.getChildrenParentMayNotExist(ZkUtils.scala:181) ~[kafka_2.9.2-0.7.0.jar:na] 
at kafka.utils.ZkUtils$.getCluster(ZkUtils.scala:202) ~[kafka_2.9.2-0.7.0.jar:na] 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:447) ~[kafka_2.9.2-0.7.0.jar:na] 
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:78) ~[scala-library-2.9.2.jar:na] 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:444) ~[kafka_2.9.2-0.7.0.jar:na] 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:401) ~[kafka_2.9.2-0.7.0.jar:na] 
10342 [storm_rishabh-1361473654345-95461d10_watcher_executor] INFO kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-95461d10 stopping watcher executor thread for consumer storm_rishabh-1361473654345-95461d10 
10343 [storm_rishabh-1361473654345-3b26ed76_watcher_executor] INFO kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-3b26ed76 stopping watcher executor thread for consumer storm_rishabh-1361473654345-3b26ed76 

कर सकते हैं किसी कृपया मुझे बताओ क्या समस्या हो सकती है? क्या मुझे kafkaSpout में कुछ और परिभाषित करने की आवश्यकता है ताकि rebalancing के समय, यह ठीक से बंद हो जाता है और फिर फिर से शुरू होता है?

उत्तर

0

LocalCluster (विकास उद्देश्यों के लिए) में चलते समय मुझे यह समस्या हो रही थी। मैंने श्रमिकों की संख्या को कम करने के लिए 1:

topology.workers: 1 

इस समस्या को सही किया। मैंने अभी तक इसे एक वास्तविक वितरित क्लस्टर पर चलाने का प्रयास नहीं किया है, इसलिए मुझे नहीं पता कि यह LocalCluster मोड में चलने का एक आर्टिफैक्ट है या नहीं।

(मेरे कोड में मैं LocalCluster.rebalance आह्वान कभी नहीं।) पर्यवेक्षक या निंबस नोड से

0

उपयोग तूफान को संतुलित आदेश।

उदाहरण के लिए, तूफान पुनर्विक्रय mytopology -n 5 -e blue-spout = 3 -e yellow-bolt = 10।

कृपया इस साइट को देखें। www.michael-noll.com.