2017-10-21 38 views
5

मान लिया जाये कि हम निम्न पाठ फ़ाइल (df.show() आदेश के उत्पादन में) है:कैसे वापस एक डेटासेट के लिए शो ऑपरेटर के उत्पादन में पढ़ने के लिए?

+----+---------+--------+ 
|col1|  col2| col3| 
+----+---------+--------+ 
| 1|pi number|3.141592| 
| 2| e number| 2.71828| 
+----+---------+--------+ 

अब मैं पढ़ने/एक DataFrame/डेटासेट के रूप में यह पार्स करने के लिए चाहता हूँ। सबसे "स्पार्कलिंग" तरीका यह है क्या है?

पेज। मैं दोनोंscala और pyspark के लिए समाधान में दिलचस्पी रखता हूँ, यही कारण है कि दोनों टैग का उपयोग कर रहे हैं।

उत्तर

4

अद्यतन:

स्काला:

// read Spark Output Fixed width table: 
def readSparkOutput(filePath: String) : org.apache.spark.sql.DataFrame = { 
    val t = spark.read 
       .option("header","true") 
       .option("inferSchema","true") 
       .option("delimiter","|") 
       .option("parserLib","UNIVOCITY") 
       .option("ignoreLeadingWhiteSpace","true") 
       .option("ignoreTrailingWhiteSpace","true") 
       .option("comment","+") 
       .csv(filePath) 
    t.select(t.columns.filterNot(_.startsWith("_c")).map(t(_)):_*) 
} 

PySpark:

def read_spark_output(file_path): 
    t = spark.read \ 
      .option("header","true") \ 
      .option("inferSchema","true") \ 
      .option("delimiter","|") \ 
      .option("parserLib","UNIVOCITY") \ 
      .option("ignoreLeadingWhiteSpace","true") \ 
      .option("ignoreTrailingWhiteSpace","true") \ 
      .option("comment","+") \ 
      .csv("file:///tmp/spark.out") 
    # select not-null columns 
    return t.select([c for c in t.columns if not c.startswith("_")]) 
"UNIVOCITY" पार्सर lib मैं एक पंक्ति मैं कहाँ स्तंभ नाम में व्हाइटस्पेस को हटाने गया था से छुटकारा पाने सकता है का उपयोग करते हुए

उपयोग उदाहरण:

scala> val df = readSparkOutput("file:///tmp/spark.out") 
df: org.apache.spark.sql.DataFrame = [col1: int, col2: string ... 1 more field] 

scala> df.show 
+----+---------+--------+ 
|col1|  col2| col3| 
+----+---------+--------+ 
| 1|pi number|3.141592| 
| 2| e number| 2.71828| 
+----+---------+--------+ 


scala> df.printSchema 
root 
|-- col1: integer (nullable = true) 
|-- col2: string (nullable = true) 
|-- col3: double (nullable = true) 

पुराना जवाब:

यहाँ स्केला में मेरे प्रयास (स्पार्क 2.2) है:

// read Spark Output Fixed width table: 
val t = spark.read 
    .option("header","true") 
    .option("inferSchema","true") 
    .option("delimiter","|") 
    .option("comment","+") 
    .csv("file:///temp/spark.out") 
// select not-null columns 
val cols = t.columns.filterNot(c => c.startsWith("_c")).map(a => t(a)) 
// trim spaces from columns 
val colsTrimmed = t.columns.filterNot(c => c.startsWith("_c")).map(c => c.replaceAll("\\s+","")) 
// reanme columns using 'colsTrimmed' 
val df = t.select(cols:_*).toDF(colsTrimmed:_*) 

यह काम करता है, लेकिन मुझे लगता है कि वहाँ बहुत किया जाना चाहिए और अधिक सुरुचिपूर्ण तरीका यह है।

scala> df.show 
+----+---------+--------+ 
|col1|  col2| col3| 
+----+---------+--------+ 
| 1.0|pi number|3.141592| 
| 2.0| e number| 2.71828| 
+----+---------+--------+ 

scala> df.printSchema 
root 
|-- col1: double (nullable = true) 
|-- col2: string (nullable = true) 
|-- col3: double (nullable = true) 
+0

मैं हमेशा कस्टम स्पार्क स्रोत लिखने के बारे में सोच रहा हूं, लेकिन आपका समाधान बस चालाक है! धन्यवाद। –

+1

@JacekLaskowski, नहीं, धन्यवाद !!! मैं अपने [मास्टरिंग अपाचे स्पार्क 2] (https://www.gitbook.com/book/jaceklaskowski/mastering-apache-spark/details) और अपने जवाब से से बहुत कुछ सीख रहा हूँ। – MaxU

संबंधित मुद्दे