2016-10-20 4 views
7

मैं स्पार्क 1.6.0 और स्कैला का उपयोग करता हूं।डेटाफ्रेम को संपीड़ित (gzipped) CSV के रूप में कैसे सहेजना है?

मैं डेटाफ्रेम को संकुचित सीएसवी प्रारूप के रूप में सहेजना चाहता हूं।

//set the conf to the codec I want 
sc.getConf.set("spark.hadoop.mapred.output.compress", "true") 
sc.getConf.set("spark.hadoop.mapred.output.compression.codec", "true") 
sc.getConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec") 
sc.getConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK") 

df.write 
    .format("com.databricks.spark.csv") 
    .save(my_directory) 

उत्पादन gz स्वरूप में नहीं है:

यहाँ मैं अब तक है (मान मैं पहले से ही SparkContext रूप df और sc है)।

+0

आरडीडी के बारे में संबंधित प्रश्न: http://stackoverflow.com/questions/32231650/spark-rdd-saveastextfile-gzip –

उत्तर

4

चिंगारी से सीएसवी GitHub पर: https://github.com/databricks/spark-csv

एक पढ़ सकते हैं:

codec: संपीड़न कोडेक जब दाखिल करने के लिए बचत का उपयोग करें। Org.apache.hadoop.io.compress.CompressionCodec या केस-असंवेदनशील शॉर्ट नामों में से एक (bzip2, gzip, lz4, और snappy) को लागू करने वाले वर्ग का पूर्णतः योग्य नाम होना चाहिए। कोडेक निर्दिष्ट नहीं होने पर कोई संपीड़न के लिए डिफ़ॉल्ट।

आपके मामले में, यह काम करना चाहिए: df.write.format("com.databricks.spark.csv").codec("gzip")\ .save('my_directory/my_file.gzip')

10

इस कोड स्पार्क 2.1, जहां .codec उपलब्ध नहीं है के लिए काम करता है।

df.write 
    .format("com.databricks.spark.csv") 
    .option("codec", "org.apache.hadoop.io.compress.GzipCodec") 
    .save(my_directory) 

स्पार्क 2.2 के लिए, आप df.write.csv(...,codec="gzip") विकल्प यहाँ वर्णित का उपयोग कर सकते हैं: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=codec

+1

हालांकि यह कोड प्रश्न का उत्तर दे सकता है, इस सवाल का जवाब दे सकता है कि क्यों और/या यह कोड प्रश्न का उत्तर कैसे देता है अपने दीर्घकालिक मूल्य में सुधार करता है। – manniL

+0

"जेसन" प्रारूप का उपयोग करने के मामले में, संपीड़न – Disha

+0

नहीं उठाया जाता है ऐसा लगता है कि कीवर्ड तर्क को 'संपीड़न' में बदल दिया गया है। https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=codec#pyspark.sql.DataFrameWriter.csv – volker238

4
स्पार्क के साथ

2.0+, इस बन गया है एक सरल सा:

df.write.csv("path", compression="gzip") 

आप की जरूरत नहीं है बाहरी डाटाबेस सीएसवी पैकेज अब और।

csv() लेखक कई आसान विकल्पों का समर्थन करता है। उदाहरण के लिए:

  • sep: विभाजक चरित्र सेट करने के लिए।
  • quote: मूल्यों को उद्धृत करने के लिए और कैसे करें।
  • header: हेडर लाइन को शामिल करना है या नहीं।

वहाँ भी gzip करने के लिए अन्य संपीड़न कोडेक आप उपयोग कर सकते हैं, इसके अलावा में की एक संख्या हैं:

  • bzip2
  • lz4
  • snappy
  • deflate

csv() लेखक के लिए पूर्ण स्पार्क डॉक्स यहां हैं: Python/Scala

+0

सीएसवी लेखक दस्तावेज़ों को जोड़ने के लिए धन्यवाद, और केवल डाटाबेस नहीं दे रहा है का जवाब! –

+0

@LaurensKoppenol - ठीक है, निष्पक्ष होने के लिए, स्पार्क में जोड़ा गया सीएसवी समर्थन मूल रूप से स्वीकृत उत्तर में बाहरी डाटाबेसिक्स सीएसवी पैकेज [लिंक] [https://github.com/databricks/spark-csv) के रूप में शुरू हुआ। :) वह पैकेज किसी भी स्पार्क उपयोगकर्ता के उपयोग के लिए उपलब्ध है, लेकिन स्पार्क 2.0 के साथ शुरू करने के लिए आपको अब इसकी आवश्यकता नहीं है। –

1

सीएसवी हेडर के साथ फाइल लिखने और .csv.gzip को

DF.coalesce(1).write.format("com.databricks.spark.csv").mode("overwrite") 
.option("header","true") 
.option("codec",org.apache.hadoop.io.compress.GzipCodec").save(tempLocationFileName) 

copyRename(tempLocationFileName, finalLocationFileName) 

def copyRename(srcPath: String, dstPath: String): Unit = { 
    val hadoopConfig = new Configuration() 
    val hdfs = FileSystem.get(hadoopConfig) 
    FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
    // the "true" setting deletes the source files once they are merged into the new output 
} 

ऐसा न करने पर भाग-000 फ़ाइल का नाम बदलने के लिए हेडर की आवश्यकता है और फिर इसे गलत पर सेट करें और आपको कोलेसे को भी करने की आवश्यकता नहीं होगी। यह भी लिखना तेज़ होगा।

संबंधित मुद्दे