2017-03-17 7 views
6

this question की निरंतरता के रूप में, क्या आप कृपया मुझे बता सकते हैं कि SparkContext.setLocalProperties से मैं कौन से गुण बदल सकता हूं? क्या मैं कोर, रैम इत्यादि बदल सकता हूं?स्पार्ककॉन्टेक्स्ट सेट लोकलप्रॉपर्टीज

+0

मुझे लगता है कि इस भाग को आपको उपलब्ध गुण देना चाहिए: http://spark.apache.org/docs/latest/configuration.html#available-properties – Adonis

+0

@asettouf का मतलब है कि मैं दौड़ सकता हूं स्टैंडअलोन क्लस्टर मैनेजर पर एक साथ विभिन्न गुणों के साथ कई नौकरियां? –

+0

मेरी समझ से, यह आपके चल रहे थ्रेड पर गुण सेट करता है, इसलिए सैद्धांतिक रूप से एक नया थ्रेड स्पैन करना संभव हो सकता है, उसी स्पार्ककॉन्टेक्स्ट का उपयोग कर सकते हैं, और विभिन्न स्थानीय गुणों को सेट कर सकते हैं (कई स्पार्क कॉन्टेक्स्ट को बनाने और बनाने के लिए देखें), दस्तावेज़ के रूप में ऐसा लगता है (मैंने परीक्षण नहीं किया था इसलिए मैं पुष्टि नहीं कर सकता कि यह काम करेगा) http://spark.apache.org/docs/2.0.1/api/java/org/apache/spark/SparkContext.html#setLocalProperty(java .lang.String,% 20java.lang.String) – Adonis

उत्तर

3

प्रति प्रलेखन वर्णन localProperties के रूप में एक SparkContext कि गुण जिसके माध्यम से आप तार्किक काम समूह बना सकते हैं कर रहे हैं की एक protected[spark] संपत्ति है। दूसरी ओर वे Inheritable धागे-स्थानीय चर हैं। जिसका अर्थ यह है कि वे सामान्य थ्रेड-स्थानीय चर के वरीयता में उपयोग किए जाते हैं जब परिवर्तनीय में प्रति-थ्रेड-एट्रिब्यूट बनाए रखा जाना चाहिए स्वचालित रूप से बनाए गए किसी भी बच्चे धागे को प्रेषित किया जाना चाहिए। SparkContext को चलाने के लिए अनुरोध किया जाता है जब स्थानीय गुणों को श्रमिकों को प्रसारित करना शुरू होता है या एक स्पार्क नौकरी जमा करें जो बदले में उन्हें DAGScheduler पर भेज देती है।

और सामान्य Local properties में spark.scheduler.pool प्रति-धागा संपत्ति द्वारा मेला काम अनुसूचक में पूल में समूह नौकरियों के लिए प्रयोग किया जाता है और विधि SQLExecution.withNewExecutionId में spark.sql.execution.id स्थापित करने के लिए।

मेरे पास स्टैंडअलोन स्पार्क क्लस्टर में थ्रेड-स्थानीय गुणों को असाइन करने का कोई अनुभव नहीं है। कोशिश करने और इसे जांचने के लायक है।

+0

विवरण के लिए धन्यवाद! मैं कुछ दिनों में कोशिश करूँगा –

3

मैं संपत्ति spark.executor.memory (उपलब्ध संपत्तियों here कर रहे हैं),, और वास्तव में एक बहुत ही सरल स्थानीय स्पार्क, कोड के साथ दो धागे विभिन्न सेटिंग्स के साथ प्रत्येक धागे तक ही सीमित हो रहे हैं, शुरू करने पर (शायद के साथ कुछ परीक्षण किए गए इस पोस्ट के अंत में, आप इस कोड के अंत में उत्पादन में तैनात नहीं होंगे) यह सुनिश्चित करने के लिए धागे की कुछ अंतःक्रिया कर रहे हैं कि यह कुछ बेहद शेड्यूलिंग किस्मत के माध्यम से नहीं है, मुझे निम्न आउटपुट (मेरे कंसोल पर स्पार्क आउटपुट की सफाई) प्राप्त होता है:

Thread 1 Before sleeping mem: 512 
Thread 2 Before sleeping mem: 1024 
Thread 1 After sleeping mem: 512 
Thread 2 After sleeping mem: 1024 

थ्रेड में घोषित संपत्ति का निरीक्षण करने के लिए बहुत साफ है, थ्रेड के अंदर रहता है, हालांकि मुझे पूरा यकीन है कि यह आसानी से बकवास हो सकता है ट्यूशन, इसलिए मैं अभी भी ऐसी तकनीकों को लागू करने से पहले सावधानी बरतूँगा।

public class App { 
    private static JavaSparkContext sc; 
    public static void main(String[] args) { 
     SparkConf conf = new SparkConf().setMaster("local") 
       .setAppName("Testing App"); 
     sc = new JavaSparkContext(conf); 
     SparkThread Thread1 = new SparkThread(1); 
     SparkThread Thread2 = new SparkThread(2); 
     ExecutorService executor = Executors.newFixedThreadPool(2); 
     Future ThreadCompletion1 = executor.submit(Thread1); 
     try { 
      Thread.sleep(5000); 
     } catch (InterruptedException e1) { 
      // TODO Auto-generated catch block 
      e1.printStackTrace(); 
     } 
     Future ThreadCompletion2 = executor.submit(Thread2); 
     try { 
      ThreadCompletion1.get(); 
      ThreadCompletion2.get(); 
     } catch (InterruptedException | ExecutionException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 

    } 

    private static class SparkThread implements Runnable{ 
     private int i = 1; 
     public SparkThread(int i) { 
      this.i = i; 

     } 
     @Override 
     public void run() { 
      int mem = 512; 
      sc.setLocalProperty("spark.executor.memory", Integer.toString(mem * i)); 
      JavaRDD<String> input = sc.textFile("test" + i); 

      FlatMapFunction<String, String> tt = s -> Arrays.asList(s.split(" ")) 
        .iterator(); 
      JavaRDD<String> words = input.flatMap(tt); 
      System.out.println("Thread " + i + " Before sleeping mem: " + sc.getLocalProperty("spark.executor.memory")); 

      try { 
       Thread.sleep(7000); 
      } catch (InterruptedException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
      //do some work 
      JavaPairRDD<String, Integer> counts = words.mapToPair(t -> new Tuple2(t, 1)) 
        .reduceByKey((x, y) -> (int) x + (int) y); 

      counts.saveAsTextFile("output" + i); 
      System.out.println("Thread " + i + " After sleeping mem: " + sc.getLocalProperty("spark.executor.memory")); 
     } 

    } 
} 
0

LocalProperties निष्पादकों के लिए ड्राइवर से (उपयोगकर्ता परिभाषित) विन्यास पारित करने के लिए एक आसान तंत्र प्रदान करते हैं। आप उन्हें एक्सेस करने के लिए निष्पादक पर कार्य कॉन्टेक्स्ट का उपयोग कर सकते हैं। इसका एक उदाहरण SQL Execution ID

+0

अधिक जानकारी। क्या आप कृपया मुझे एक उदाहरण प्राप्त कर सकते हैं कि अलग-अलग नौकरियों के लिए उसी सत्र में कोर/रैम कैसे बदला जाए? –

+0

@VolodymyrBakhmatiuk यकीन नहीं है कि मैं स्पार्क गुणों के मुताबिक, आपके प्रश्न को समझता हूं, आप ड्राइवर कोर और मेमोरी को संशोधित कर सकते हैं, साथ ही प्रति नौकरी निष्पादक स्मृति को संशोधित कर सकते हैं, जब तक कि आप उन्हें अलग-अलग धागे में सही ढंग से encapsulate। क्या मुझसे कोई चूक हो रही है? – Adonis

+0

@asettouf मैंने अभी सोचा कि कैसे हरमन का विचार आपके से अलग है –

संबंधित मुद्दे