2015-01-05 18 views
5

के लिए यह बहुत धीमी है, मेरे पास स्पार्क एसक्यूएल क्वेरी के लिए 2 स्पार्क आरडीडी, डेटाआरडीडी और न्यूपेयरडाटाआरडीडी है। जब मेरा आवेदन init, डेटाआरडीडी शुरू किया जाएगा। एक निर्दिष्ट hbase इकाई में सभी डेटा डेटाआरडीडी में संग्रहीत किया जाएगा।स्पार्क आरडीडी यूनियन

जब क्लाइंट की एसक्यूएल क्वेरी आती है, तो मेरे एपीपी को नए नए अपडेट और नए पेयरडेटा डीडीडी में सम्मिलित होंगे। डेटाआरडीडी यूनियन newPairDataRDD और स्पार्क SQL संदर्भ में तालिका के रूप में पंजीकृत करें।

मुझे डेटाआरडीडी में 0 रिकॉर्ड और नए पेयरडाटाआरडी में 1 नया डाला गया रिकॉर्ड मिला। संघ के लिए इसमें 4 सेकंड लगेंगे। यह बहुत धीमा है

मुझे लगता है कि यह उचित नहीं है। कोई भी जानता है कि इसे कैसे जल्दी बनाना है? धन्यवाद

// Step1: load all data from hbase to dataRDD when initial, this only run once. 
    JavaPairRDD<String, Row> dataRDD= getAllBaseDataToJavaRDD(); 
    dataRDD.cache(); 
    dataRDD.persist(StorageLevel.MEMORY_ONLY()); 
    logger.info(dataRDD.count()); 

    // Step2: when spark sql query coming, load latest updated and inserted data from db to newPairDataRDD 

    JavaPairRDD<String, Row> newPairDataRDD = getUpdateOrInstertBaseDataToJavaRDD(); 
    // Step3: if count>0 do union and reduce 

     if(newPairDataRDD.count() > 0) { 

     JavaPairRDD<String, Row> unionedRDD =dataRDD.union(newPairDataRDD); 

    // if data was updated in DB, need to delete the old version from the dataRDD. 

     dataRDD = unionedRDD.reduceByKey(
      new Function2<Row, Row, Row>() { 
      // @Override 
      public Row call(Row r1, Row r2) { 
      return r2; 
      } 
      }); 
    } 
//step4: register the dataRDD 
JavaSchemaRDD schemaRDD = sqlContext.applySchema(dataRDD..values(), schema); 

//step5: execute sql query 
retRDD = sqlContext.sql(sql); 
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect(); 

चिंगारी वेब ui से नीचे के रूप में सरल कोड, मैं नीचे देख सकते हैं। जाहिर है यह संघ के लिए 4s जरूरत

पूरे चरणों (8)

StageId विवरण प्रस्तुत अवधि कार्य: सफल/कुल इनपुट घसीटना पढ़ें घसीटना लिखें

6 SparkPlan.scala पर इकट्ठा: 85 + विवरण 1/4 SparkSqlQueryForMarsNew.java:389+details पर/2015 08:17 2 8 अगस्त 156.0 बी

7 संघ 2015/01/04 08:17 4 8 अगस्त 64.0 बी 156.0 बी

उत्तर

1

एक अधिक कुशल आप जो चाहते हैं उसे प्राप्त करने का तरीकाका उपयोग करना हैऔर flatMapValues(), संघ का उपयोग करके dataRDD पर नए विभाजन को छोड़कर बहुत कम करता है, जिसका अर्थ है कि सभी डेटा को reduceByKey() से पहले शफ़ल किया जाना चाहिए। एक cogroup() और flatMapValues() केवल newPairDataRDD की पुनरावृत्ति का कारण बन जाएगा।

JavaPairRDD<String, Tuple2<List<Row>, List<Row>>> unionedRDD = dataRDD.cogroup(newPairDataRDD); 
JavaPairRDD<String, Row> updated = unionedRDD.flatMapValues(
    new Function<Tuple2<List<Row>, List<Row>>, Iterable<Row>>() { 
     public Iterable<Row> call(Tuple2<List<Row>, List<Row>> grouped) { 
      if (grouped._2.nonEmpty()) { 
       return grouped._2; 
      } else { 
       return grouped._1; 
      } 
     } 
    }); 

या स्काला

में
val unioned = dataRDD.cogroup(newPairDataRDD) 
val updated = unioned.flatMapValues { case (oldVals, newVals) => 
    if (newVals.nonEmpty) newVals else oldVals 
} 

अस्वीकरण, मैं जावा में चिंगारी लेखन के लिए इस्तेमाल नहीं कर रहा हूँ! अगर कोई उपरोक्त गलत है तो कृपया मुझे सही करें!

0

अपने RDDs repartitioning का प्रयास करें:

JavaPairRDD unionedRDD = dataRDD.repartition (sc.defaultParallelism * 3) .union (newPairDataRDD.repartition (sc.defaultParallelism * 3));

संबंधित मुद्दे