2014-07-08 9 views
11

कार्यान्वयन के लिए एक सर्वोत्तम अभ्यास की आवश्यकता है। ऑपरेटिंग वातावरण इस प्रकार है:यार्न पर अपाचे स्पार्क: बड़ी मात्रा में इनपुट डेटा फ़ाइलों (स्पार्क में एकाधिक इनपुट फाइलों को गठबंधन करें)

  • लॉग डेटा फ़ाइल अनियमित आता है।
  • लॉग डेटा फ़ाइल का आकार 3.9 केबी से 8.5 एमबी तक है। औसत लगभग 1 एमबी है।
  • डेटा फ़ाइल के रिकॉर्ड की संख्या 13 लाइनों से 22000 लाइनों तक है। औसत 2700 लाइनें है।
  • डेटा फ़ाइल को एकत्रीकरण से पहले पोस्ट-प्रोसेस किया जाना चाहिए।
  • पोस्ट-प्रोसेसिंग एल्गोरिदम बदला जा सकता है।
  • पोस्ट-प्रोसेस की गई फ़ाइल को मूल डेटा फ़ाइल के साथ अलग से प्रबंधित किया जाता है, क्योंकि पोस्ट-प्रोसेसिंग एल्गोरिदम बदला जा सकता है।
  • दैनिक एकत्रीकरण किया जाता है। सभी पोस्ट-प्रोसेस की गई डेटा फ़ाइल को रिकॉर्ड-दर-रिकॉर्ड और एकत्रीकरण (औसत, अधिकतम न्यूनतम ...) फ़िल्टर किया जाना चाहिए।
  • चूंकि एकत्रीकरण ठीक है, इसलिए एकत्रीकरण के बाद रिकॉर्ड्स की संख्या इतनी छोटी नहीं है। यह मूल रिकॉर्ड की संख्या का आधा हिस्सा हो सकता है।
  • एक बिंदु पर, के बाद संसाधित फ़ाइल की संख्या 200,000 के बारे में हो सकता है।
  • एक डेटा फ़ाइल को व्यक्तिगत रूप से हटाया जा सकता है।

एक परीक्षण में, मैं 160,000 के बाद संसाधित ग्लोब पथ के साथ() sc.textFile के साथ शुरू स्पार्क द्वारा फ़ाइलों को प्रोसेस करने की कोशिश की, यह चालक की प्रक्रिया पर OutOfMemory अपवाद के साथ विफल रहा है।

इस तरह के डेटा को संभालने का सबसे अच्छा अभ्यास क्या है? क्या मुझे पोस्ट-प्रोसेस किए गए डेटा को सहेजने के लिए सादे फाइलों के बजाय एचबीएएस का उपयोग करना चाहिए?

उत्तर

8

हमने अपना लोडर लिखा है। यह HDFS में छोटे फाइलों के साथ हमारी समस्या को हल किया। यह हैडोप CombineFileInputFormat का उपयोग करता है। हमारे मामले में यह 100000 लगभग 3000 से मानचित्रकारों की संख्या कम और काफी तेजी काम कर दिया।

https://github.com/RetailRocket/SparkMultiTool

उदाहरण:

import ru.retailrocket.spark.multitool.Loaders 
val sessions = Loaders.combineTextFile(sc, "file:///test/*") 
// or val sessions = Loaders.combineTextFile(sc, conf.weblogs(), size = 256, delim = "\n") 
// where size is split size in Megabytes, delim - line break character 
println(sessions.count()) 
+0

पर 8000 का उपयोग किया, इसे साझा करने के लिए धन्यवाद। मुझे लगता है कि आकार तर्क विशेष रूप से मूल्यवान है, क्योंकि इसे coalesce() पर निर्दिष्ट नहीं किया जा सकता है। – zeodtr

+0

यह समाधान coalesce से बेहतर है क्योंकि यह मानचित्र चरण पर काम करता है, लेकिन बाद में coalesce। –

+1

चूंकि अब हैपॉप CombineTextInputFormat (कम से कम 2.2 से) का समर्थन करता है, छोटे इनपुट फ़ाइलों को संयोजित करने के साथ sc.newAPIHadoopFile() के साथ किया जा सकता है, कस्टम क्लास को लागू किए बिना। – zeodtr

3

मैं यकीन है कि कारण आपके हो रही OOM क्योंकि बहुत से छोटे फ़ाइलों से निपटने में है हूँ। क्या आप चाहते हैं इनपुट फ़ाइलें गठबंधन करने के लिए ताकि आप इतने सारे विभाजन नहीं मिलता है। मैं अपनी नौकरियों को लगभग 10k विभाजन तक सीमित करने की कोशिश करता हूं।

textFile के बाद, आप .coalesce(10000, false) उपयोग कर सकते हैं ...% नहीं 100 सुनिश्चित करें कि हालांकि काम करेंगे क्योंकि यह कुछ समय के बाद से मैं इसे किया है हो गया है, तो कृपया मुझे बताएं। तो

sc.textFile(path).coalesce(10000, false) 
+0

धन्यवाद! मैं कोशिश करूँगा। – zeodtr

+0

यह काम किया! असल में मैंने कोलेसेस कारक 1227 का उपयोग किया, जो कि विभाजन की संख्या है जब स्पार्क बड़ी रिकॉर्ड को संसाधित करता है जिसमें पूरे रिकॉर्ड होते हैं। लेकिन नौकरी धीमी गति से चलती है (जैसा कि अपेक्षित है), और फिर भी ऐसा लगता है कि सभी फाइलों की जानकारी अभी भी ड्राइवर प्रक्रिया में स्थानांतरित की गई है, जो ओओएम का कारण बन सकती है जब बहुत सारी फाइलें शामिल होती हैं। लेकिन 168016 फाइलों के लिए ड्राइवर प्रक्रिया के लिए 1.68 जीबी इतनी खराब नहीं है। – zeodtr

+0

ठीक है, हमारे पास फाइलों की संख्या को कम करने के लिए विशेष रूप से एक अलग सरल काम है क्योंकि यह एक महत्वपूर्ण बात है। एक बार मुझे 5 में इसे चलाने के बाद 5 सबसेट – samthebest

0

आप उपयोग कर सकते हैं इस

सबसे पहले आप कर सकते हैं एक बफर/S3 पथ/की सूची HDFS या स्थानीय पथ के लिए एक ही मिलता है

आप तो अमेज़ॅन एस 3 के साथ कोशिश कर रहे हैं:

import scala.collection.JavaConverters._ 
import java.util.ArrayList 
import com.amazonaws.services.s3.AmazonS3Client 
import com.amazonaws.services.s3.model.ObjectListing 
import com.amazonaws.services.s3.model.S3ObjectSummary 
import com.amazonaws.services.s3.model.ListObjectsRequest 

def listFiles(s3_bucket:String, base_prefix : String) = { 
    var files = new ArrayList[String] 

    //S3 Client and List Object Request 
    var s3Client = new AmazonS3Client(); 
    var objectListing: ObjectListing = null; 
    var listObjectsRequest = new ListObjectsRequest(); 

    //Your S3 Bucket 
    listObjectsRequest.setBucketName(s3_bucket) 

    //Your Folder path or Prefix 
    listObjectsRequest.setPrefix(base_prefix) 

    //Adding s3:// to the paths and adding to a list 
    do { 
     objectListing = s3Client.listObjects(listObjectsRequest); 
     for (objectSummary <- objectListing.getObjectSummaries().asScala) { 
     files.add("s3://" + s3_bucket + "/" + objectSummary.getKey()); 
     } 
     listObjectsRequest.setMarker(objectListing.getNextMarker()); 
    } while (objectListing.isTruncated()); 

    //Removing Base Directory Name 
    files.remove(0) 

    //Creating a Scala List for same 
    files.asScala 
    } 

अब कोड का निम्न भाग को यह सूची वस्तु दर्रा, ध्यान दें: अनुसूचित जाति SQLContext

var df: DataFrame = null; 
    for (file <- files) { 
    val fileDf= sc.textFile(file) 
    if (df!= null) { 
     df= df.unionAll(fileDf) 
    } else { 
     df= fileDf 
    } 
    } 

अब की एक वस्तु आपके पास एक अंतिम एकीकृत RDD यानी df

वैकल्पिक है, और आप भी कर सकते हैं एक भी BigRDD

val files = sc.textFile(filename, 1).repartition(1) 

में यह पुनर्विभाजन पुनर्विभाजन हमेशा काम करता है: डी

संबंधित मुद्दे