2014-12-18 15 views
10

में मल्टी लाइन इनपुट रिकॉर्ड को कैसे संसाधित करें मेरे पास प्रत्येक फ़ाइल इनपुट फ़ाइल (बहुत बड़ी फ़ाइल) में कई पंक्तियों में फैली हुई है।स्पार्क

पूर्व:

Id: 2 
ASIN: 073870
    title: Test tile for this product 
    group: Book 
    salesrank: 168501 
    similar: 5 0738700811 1567184912 1567182813 0738700514 0738700915 
    categories: 2 
    |Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Earth-Based Religions[12472]|Wicca[12484] 
    |Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Earth-Based Religions[12472]|Witchcraft[12486] 
    reviews: total: 12 downloaded: 12 avg rating: 4.5 
    2001-12-16 cutomer: A11NCO6YTE4BTJ rating: 5 votes: 5 helpful: 4 
    2002-1-7 cutomer: A9CQ3PLRNIR83 rating: 4 votes: 5 helpful: 5 

कैसे पहचान करने और प्रक्रिया के लिए चिंगारी में प्रत्येक बहु लाइन रिकॉर्ड?

+1

आपका इनपुट जेएसओएन की तरह दिखता है। आप जेएसओएन को प्रति रिकॉर्ड एक रिकॉर्ड के साथ पूर्व-प्रक्रिया करना चाहते हैं, और उसके बाद 'SqlContext.jsonFile' के साथ लोड करें। – huitseeker

+1

आपको अपना खुद का हैडऑप 'इनपुटफॉर्मैट' बनाना होगा जो जानता है कि एक एकल रिकॉर्ड को विभाजित किए बिना इन मल्टीलाइन फ़ाइलों को कैसे विभाजित किया जाए। या, जैसा कि @ हूटीसेकर सुझाव देता है, आप इसे उस प्रारूप में प्रीप्रोसेस कर सकते हैं जो हैडूप पहले ही जानता है कि कैसे संभालना है। – lmm

+1

@huitseeker लेकिन यह जेएसओएन प्रारूप –

उत्तर

7

मैंने कस्टम इनपुट प्रारूप और रिकॉर्ड रीडर को लागू करके ऐसा किया है।

public class ParagraphInputFormat extends TextInputFormat { 

    @Override 
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) { 
     return new ParagraphRecordReader(); 
    } 
} 

public class ParagraphRecordReader extends RecordReader<LongWritable, Text> { 
    private long end; 
    private boolean stillInChunk = true; 

    private LongWritable key = new LongWritable(); 
    private Text value = new Text(); 

    private FSDataInputStream fsin; 
    private DataOutputBuffer buffer = new DataOutputBuffer(); 

    private byte[] endTag = "\n\r\n".getBytes(); 

    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { 
     FileSplit split = (FileSplit) inputSplit; 
     Configuration conf = taskAttemptContext.getConfiguration(); 
     Path path = split.getPath(); 
     FileSystem fs = path.getFileSystem(conf); 

     fsin = fs.open(path); 
     long start = split.getStart(); 
     end = split.getStart() + split.getLength(); 
     fsin.seek(start); 

     if (start != 0) { 
      readUntilMatch(endTag, false); 
     } 
    } 

    public boolean nextKeyValue() throws IOException { 
     if (!stillInChunk) return false; 

     boolean status = readUntilMatch(endTag, true); 

     value = new Text(); 
     value.set(buffer.getData(), 0, buffer.getLength()); 
     key = new LongWritable(fsin.getPos()); 
     buffer.reset(); 

     if (!status) { 
      stillInChunk = false; 
     } 

     return true; 
    } 

    public LongWritable getCurrentKey() throws IOException, InterruptedException { 
     return key; 
    } 

    public Text getCurrentValue() throws IOException, InterruptedException { 
     return value; 
    } 

    public float getProgress() throws IOException, InterruptedException { 
     return 0; 
    } 

    public void close() throws IOException { 
     fsin.close(); 
    } 

    private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException { 
     int i = 0; 
     while (true) { 
      int b = fsin.read(); 
      if (b == -1) return false; 
      if (withinBlock) buffer.write(b); 
      if (b == match[i]) { 
       i++; 
       if (i >= match.length) { 
        return fsin.getPos() < end; 
       } 
      } else i = 0; 
     } 
    } 

} 

ENDTAG प्रत्येक जानकारी के अंतिम पहचान करता है।

9

बहु लाइन डेटा एक परिभाषित रिकॉर्ड विभाजक है, तो आप बहु लाइन रिकॉर्ड के लिए Hadoop समर्थन इस्तेमाल कर सकते हैं, एक hadoop.Configuration वस्तु के माध्यम से विभाजक प्रदान:

कुछ इस तरह करना चाहिए:

import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.io.{LongWritable, Text} 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat 
val conf = new Configuration 
conf.set("textinputformat.record.delimiter", "id:") 
val dataset = sc.newAPIHadoopFile("/path/to/data", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf) 
val data = dataset.map(x=>x._2.toString) 

यह आपको RDD[String] प्रदान करेगा जहां प्रत्येक तत्व एक रिकॉर्ड के अनुरूप होता है। बाद में आपको अपनी आवेदन आवश्यकताओं के बाद प्रत्येक रिकॉर्ड को पार्स करने की आवश्यकता है।