2014-07-18 12 views
5

में NullPointerException मैं स्पार्क-एसक्यूएल का उपयोग कर एक सामान्य पैरामीटर पर दो फाइलों में शामिल होने के लिए एक प्रोग्राम लिख रहा हूं। मुझे लगता है कि मेरा कोड ठीक है लेकिन जब मैं इसे टेक्स्ट फ़ाइल के रूप में सहेजने की कोशिश कर रहा हूं तो मुझे त्रुटियां मिल रही हैं। मैं अपने कोड डाल रहा हूँ के रूप में नीचे: -स्पार्क-एसक्यूएल

import java.util.regex.Pattern; 

import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.sql.api.java.JavaSQLContext; 
import org.apache.spark.sql.api.java.JavaSchemaRDD; 



import java.io.Serializable; 


public class JoinCSV { 
    @SuppressWarnings("serial") 
    public static class CompleteSample implements Serializable { 
     private String ASSETNUM; 
     private String ASSETTAG; 
     private String CALNUM; 



     public String getASSETNUM() { 
      return ASSETNUM; 
     } 
     public void setASSETNUM(String aSSETNUM) { 
      ASSETNUM = aSSETNUM; 
     } 
     public String getASSETTAG() { 
      return ASSETTAG; 
     } 
     public void setASSETTAG(String aSSETTAG) { 
      ASSETTAG = aSSETTAG; 
     } 
     public String getCALNUM() { 
      return CALNUM; 
     } 
     public void setCALNUM(String cALNUM) { 
      CALNUM = cALNUM; 
     } 


     } 

    @SuppressWarnings("serial") 
    public static class ExtendedSample implements Serializable { 

     private String ASSETNUM; 
     private String CHANGEBY; 
     private String CHANGEDATE; 


     public String getASSETNUM() { 
      return ASSETNUM; 
     } 
     public void setASSETNUM(String aSSETNUM) { 
      ASSETNUM = aSSETNUM; 
     } 
     public String getCHANGEBY() { 
      return CHANGEBY; 
     } 
     public void setCHANGEBY(String cHANGEBY) { 
      CHANGEBY = cHANGEBY; 
     } 
     public String getCHANGEDATE() { 
      return CHANGEDATE; 
     } 
     public void setCHANGEDATE(String cHANGEDATE) { 
      CHANGEDATE = cHANGEDATE; 
     } 
    } 

    private static final Pattern comma = Pattern.compile(","); 
    @SuppressWarnings("serial") 
    public static void main(String[] args) throws Exception { 
     String path="C:/Users/cyg_server/Documents/bigDataExample/AssetsImportCompleteSample.csv"; 
     String path1="C:/Users/cyg_server/Documents/bigDataExample/AssetsImportExtendedSample.csv"; 

      JavaSparkContext ctx = new JavaSparkContext("local[2]", "JavaSparkSQL"); 
      JavaSQLContext sqlCtx = new JavaSQLContext(ctx); 

      JavaRDD<CompleteSample> cs = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportCompleteSample.csv").map(
        new Function<String, CompleteSample>() { 
        public CompleteSample call(String line) throws Exception { 
         String[] parts = line.split(","); 

         CompleteSample cs = new CompleteSample(); 
         cs.setASSETNUM(parts[0]); 
         cs.setASSETTAG(parts[1]); 
         cs.setCALNUM(parts[2]); 

         return cs; 
        } 
        }); 

      JavaRDD<ExtendedSample> es = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportExtendedSample.csv").map(
        new Function<String, ExtendedSample>() { 
        public ExtendedSample call(String line) throws Exception { 
         String[] parts = line.split(","); 

         ExtendedSample es = new ExtendedSample(); 
         es.setASSETNUM(parts[0]); 
         es.setCHANGEBY(parts[1]); 
         es.setCHANGEDATE(parts[2]); 

         return es; 
        } 
        }); 

      JavaSchemaRDD complete = sqlCtx.applySchema(cs, CompleteSample.class); 
      complete.registerAsTable("cs"); 

      JavaSchemaRDD extended = sqlCtx.applySchema(es, ExtendedSample.class); 
      extended.registerAsTable("es"); 

      JavaSchemaRDD fs= sqlCtx.sql("SELECT ASSETTAG, CALNUM FROM cs INNER JOIN es ON cs.ASSETNUM=es.ASSETNUM;"); 
      fs.saveAsTextFile("result");     //Here I am getting error 
    } 

} 

और मेरी त्रुटियों के रूप में नीचे हैं: -

14/07/19 00:40:13 INFO TaskSchedulerImpl: Cancelling stage 0 
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0 failed 1 times, most recent failure: Exception failure in TID 4 on host localhost: java.lang.NullPointerException 
      java.lang.ProcessBuilder.start(Unknown Source) 
      org.apache.hadoop.util.Shell.runCommand(Shell.java:404) 
      org.apache.hadoop.util.Shell.run(Shell.java:379) 
      org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589) 
      org.apache.hadoop.util.Shell.execCommand(Shell.java:678) 
------------ 
------------ 

और

14/07/19 00:40:11 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path 
    java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries. 
     at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:278) 
     at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:300) 
     at org.apache.hadoop.util.Shell.<clinit>(Shell.java:293) 
     at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76) 
     at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:362) 
     at org.apache.spark.SparkContext$$anonfun$22.apply(SparkContext.scala:546) 
     at org.apache.spark.SparkContext$$anonfun$22.apply(SparkContext.scala:546) 
----------------- 
----------------- 

यह दूसरा त्रुटि हर जगह आ रहा है कि क्या मैं चिंगारी का उपयोग कर रहा है , स्पार्क-एसक्यूएल या स्पार्क-स्ट्रीमिंग। मुझे कोई संकेत नहीं है कि यह त्रुटि क्या है। लेकिन ऐसा लगता है कि इस दूसरी त्रुटि का कोड पर कोई प्रभाव नहीं पड़ता है क्योंकि इस त्रुटि के बाद भी परिणाम ठीक से बाहर आने के लिए उपयोग करते हैं। लेकिन जब भी आप कोई प्रोग्राम चलाते हैं, तब भी एक अज्ञात त्रुटि देखने के लिए यह बहुत परेशान है।

क्या कोई इस मुद्दे को समझने में मेरी मदद कर सकता है? मैं इस बहुत बुरी तरह से अटक गया हूँ। धन्यवाद

+0

क्या आपको लिनक्स पर एक ही त्रुटि मिलती है? –

+0

नहीं, यहां मैं विंडोज़ ओएस पर स्थानीय फाइल सिस्टम पर फाइल को सहेजने के लिए "saveAsTextFile" का उपयोग करने की कोशिश कर रहा हूं। स्थानीय फाइल सिस्टम के लिए यहां "saveAs" विकल्प में से कोई भी काम नहीं कर रहा है। हालांकि एचडीएफएस पर फ़ाइलों को सहेजते समय ये विकल्प बहुत पूरी तरह से काम कर रहे हैं। –

+1

मुझे विंडोज-विशिष्ट समस्या की तरह दिखता है। मुझे नहीं पता कि इसे कैसे ठीक किया जाए, लेकिन अगर यह सिर्फ स्थानीय बचत है, तो आप समस्या के आसपास काम कर सकते हैं। 'RDD.collect() 'के साथ डेटा प्राप्त करें, फिर इसे नियमित जावा' FileOutputStream' के माध्यम से सहेजें। –

उत्तर

7

विंडोज पर rdd.saveAsTextFile() त्रुटि के लिए एक काम है। यह SparkException और IOException त्रुटियों दोनों को ठीक करता है कि मैं स्थानीय मोड में विंडोज 8.1 पर स्पार्क v1.1.0 के साथ भी अनुभव कर रहा था।

http://qnalist.com/questions/4994960/run-spark-unit-test-on-windows-7

यहाँ उस लिंक से कदम हैं:

1) download compiled winutils.exe;

2) इसे कहीं c:\winutil\bin जैसा रखें; System.setProperty("hadoop.home.dir", "c:\\winutil\\")

आशा यह आपके लिए काम करता है:

3) अपने कोड में यह पंक्ति जोड़।

+1

इस शानदार समाधान प्रदान करने के लिए आपको बहुत अधिक धन्यवाद। मैंने Winutils WARN संदेश को अनदेखा कर दिया है जो saveTextAsFile से संबंधित नहीं है। अब जब मैंने आपके कदमों का पालन किया है तो मेरा आवेदन अंत तक सुचारु रूप से काम करता है। – florins

+0

@ फ्लोरिन्स कोई चिंता नहीं, खुशी है कि यह आपके लिए काम करता है। –