2015-08-22 14 views
6

में लकड़ी के मेटाडेटा सारांश को अक्षम करें मेरे पास एक स्पार्क जॉब (1.4.1 के लिए) काफका घटनाओं की धारा प्राप्त है। मैं उन्हें लगातार टैचियन पर लकड़ी के रूप में सहेजना चाहता हूं।स्पार्क

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) 

lines.window(Seconds(1), Seconds(1)).foreachRDD { (rdd, time) => 
    if (rdd.count() > 0) { 
    val mil = time.floor(Duration(86400000)).milliseconds 
    hiveContext.read.json(rdd).toDF().write.mode(SaveMode.Append).parquet(s"tachyon://192.168.1.12:19998/persisted5$mil") 
    hiveContext.sql(s"CREATE TABLE IF NOT EXISTS persisted5$mil USING org.apache.spark.sql.parquet OPTIONS (path 'tachyon://192.168.1.12:19998/persisted5$mil')") 
    } 
} 

लेकिन मुझे लगता है कि के रूप में समय पर चला जाता है, हर छत लिखने पर, चिंगारी प्रत्येक 1 सेकंड छत भागों, जो मिलता माध्यम से चला जाता धीमी और धीमी

15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-db03b24d-6f98-4b5d-bb40-530f35b82633.gz.parquet, 65536) 
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-3a7857e2-0435-4ee0-ab2c-6d40224f8842.gz.parquet, 65536) 
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-47ff2ac1-da00-4473-b3f7-52640014bc5b.gz.parquet, 65536) 
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-61625436-7353-4b1e-bb8d-e8afad3a582e.gz.parquet, 65536) 
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-e711aa9a-9bf5-41d5-8523-f5edafa69626.gz.parquet, 65536) 
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-4e0cca38-cf75-4771-8965-20a30c863100.gz.parquet, 65536) 
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-d1510ed4-2c99-43e2-b3d1-38d3d54e626d.gz.parquet, 65536) 
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-022d1918-392d-433f-a7f4-074e46b4460f.gz.parquet, 65536) 
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-cf71f5d2-ba0e-4729-9aa1-41dad5d1d08f.gz.parquet, 65536) 
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-ce990b1e-82cc-4feb-a162-ac3ddc275609.gz.parquet, 65536) 

मैं निष्कर्ष है कि यह है के लिए आया था सारांश डेटा के अपडेट के कारण, मेरा मानना ​​है कि स्पार्क उनका उपयोग नहीं करता है। तो मैं इसे

parquet sources को अक्षम करना चाहता हूं कि मुझे "parquet.enable.summary-metadata" को गलत पर सेट करने में सक्षम होना चाहिए।

अब, मैं इस तरह यह स्थापित करने की कोशिश की है, है ना hiveContext

hiveContext.sparkContext.hadoopConfiguration.setBoolean("parquet.enable.summary-metadata", false) 
hiveContext.sparkContext.hadoopConfiguration.setInt("parquet.metadata.read.parallelism", 10) 

लेकिन सफलता नहीं मिली बनाने के बाद, मैं भी अभी भी मिल लॉग 5 (डिफ़ॉल्ट) की एक समानांतरवाद दिखा।

लकड़ी के साथ स्पार्क में सारांश डेटा को अक्षम करने का सही तरीका क्या है?

उत्तर

9

पाठ के रूप में "parquet.enable.summary-metadata" सेटिंग ("झूठी" और झूठी नहीं) हमारे लिए काम करने लगती है।

रास्ता स्पार्क तक _common_metadata फ़ाइल का उपयोग (हम कॉपी कि दोहराव नौकरियों के लिए मैन्युअल रूप से अधिक)

+0

यदि मैं ShemaMerging अक्षम करता हूं, तो मेटाडेटा अभी भी उपयोग किया जाता है? लगता है कि लाइनलाइन के साथ ठीक काम करता है। स्पार्क 1.3 में – Pierre

+0

और 1.4.1 में यह अभी भी _common_metadata (शायद वह एक बग) की तलाश में है –

+0

मेटाडेटा गुम होने पर रीफ्रेश होने पर मुझे भाग पाद लेख पढ़ने के लिए फ़ॉलबैक लगता है। लेकिन काम करता है (अभी भी धीरे-धीरे, अनिवार्य रूप से ताज़ा करने के लिए समस्या को ताज़ा करने के लिए स्थानांतरित कर दिया गया है) – Pierre

7

स्पार्क 2.0 किसी भी अधिक डिफ़ॉल्ट रूप से मेटाडाटा के सारांश को बचाने नहीं करता है, SPARK-15719 देख करता है।

यदि आप एस 3 में होस्ट किए गए डेटा के साथ काम कर रहे हैं, तो आप अभी भी लकड़ी की छत के प्रदर्शन को अपने स्कीमा की जांच करने के लिए सभी वस्तुओं की पूंछ को स्कैन करने की कोशिश कर रहे हैं। इसे स्पष्ट रूप से

sparkConf.set("spark.sql.parquet.mergeSchema", "false")