2013-09-05 8 views
6

शामिल करने के लिए अनुक्रम FileInputFormat को विस्तारित करें, मैं एक कस्टम इनपुट फ़ॉर्मेट बनाने में सक्षम होना चाहता हूं जो अनुक्रम फ़ाइलों को पढ़ता है, लेकिन इसके अतिरिक्त फ़ाइल पथ का खुलासा करता है और उस फ़ाइल के भीतर ऑफ़सेट होता है जहां रिकॉर्ड स्थित होता है।फ़ाइल नाम + ऑफसेट

एक कदम वापस लेने के लिए, यहां उपयोग केस है: मेरे पास एक अनुक्रम फ़ाइल है जिसमें विविध आकार का डेटा है। चाबियाँ अधिकतर अप्रासंगिक होती हैं, और मान दो मेगाबाइट तक होते हैं जिनमें विभिन्न प्रकार के विभिन्न फ़ील्ड होते हैं। मैं इन क्षेत्रों में से कुछ को फ़ाइल नाम और ऑफसेट के साथ elasticsearch में इंडेक्स करना चाहता हूं। इस तरह, मैं elasticsearch से उन क्षेत्रों से पूछताछ कर सकता हूं, और फिर अनुक्रम फ़ाइल पर वापस जाने के लिए फ़ाइल नाम और ऑफसेट का उपयोग कर सकता हूं और ES में पूरी चीज़ को संग्रहीत करने के बजाय मूल रिकॉर्ड प्राप्त कर सकता हूं।

मेरे पास यह पूरी प्रक्रिया एक जावा प्रोग्राम के रूप में काम कर रही है। SequenceFile.Reader क्लास यह होने के लिए getPosition और seek विधियों को आसानी से देता है।

हालांकि, अंत में डेटा के कई टेराबाइट शामिल होंगे, इसलिए मुझे इसे मैपरेडस जॉब (शायद मानचित्र-केवल) में परिवर्तित करने की आवश्यकता होगी। चूंकि अनुक्रम फ़ाइल में वास्तविक कुंजी अप्रासंगिक हैं, जिस दृष्टिकोण को मैंने लेने की आशा की थी वह एक कस्टम इनपुट फ़ॉर्मैट बनाना होगा जो किसी भी तरह से SquenceFileInputFormat का उपयोग करता है या वास्तविक कुंजी को वापस करने के बजाय, फ़ाइल के साथ एक समग्र कुंजी देता है और ऑफसेट।

हालांकि, यह अभ्यास में और अधिक कठिन साबित हो रहा है। ऐसा लगता है कि यह संभव होना चाहिए, लेकिन वास्तविक एपीआई और क्या खुलासा किया गया है, यह मुश्किल है। कोई विचार? शायद मुझे एक वैकल्पिक दृष्टिकोण लेना चाहिए?

उत्तर

5

यदि किसी को भी ऐसी ही समस्या का सामना करना पड़ता है, तो यहां समाधान है जिसके साथ मैं आया हूं। मैं बस अनुक्रम FileInputFormat/RecordReader में कुछ कोड डुप्लिकेट कर रहा हूं और बस इसे संशोधित कर रहा हूं। मैं या तो एक उपवर्ग या एक डेकोरेटर या कुछ लिखने के लिए आशा व्यक्त की थी ... इस तरह से सुंदर नहीं है, लेकिन यह काम करता है:

SequenceFileOffsetInputFormat.java:

import java.io.IOException; 
import java.util.List; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.SequenceFile; 
import org.apache.hadoop.io.Writable; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.JobContext; 
import org.apache.hadoop.mapreduce.RecordReader; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.FileSplit; 
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; 

public class SequenceFileOffsetInputFormat<V extends Writable> extends FileInputFormat<PathOffsetWritable, V> { 

    private static class SequenceFileOffsetRecordReader<V extends Writable> extends RecordReader<PathOffsetWritable, V> { 

     private SequenceFile.Reader in; 
     private long start; 
     private long end; 
     private boolean more = true; 
     private PathOffsetWritable key = null; 
     private Writable k = null; 
     private V value = null; 
     private Configuration conf; 

     @Override 
     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { 
      FileSplit fileSplit = (FileSplit) split; 
      conf = context.getConfiguration(); 
      Path path = fileSplit.getPath(); 
      FileSystem fs = path.getFileSystem(conf); 
      this.in = new SequenceFile.Reader(fs, path, conf); 
      try { 
       this.k = (Writable) in.getKeyClass().newInstance(); 
       this.value = (V) in.getValueClass().newInstance(); 
      } catch (InstantiationException e) { 
       throw new IOException(e); 
      } catch (IllegalAccessException e) { 
       throw new IOException(e); 
      } 
      this.end = fileSplit.getStart() + fileSplit.getLength(); 

      if (fileSplit.getStart() > in.getPosition()) { 
       in.sync(fileSplit.getStart()); 
      } 

      this.start = in.getPosition(); 
      more = start < end; 

      key = new PathOffsetWritable(path, start); 
     } 

     @Override 
     public boolean nextKeyValue() throws IOException, InterruptedException { 
      if (!more) { 
       return false; 
      } 
      long pos = in.getPosition(); 

      more = in.next(k, value); 
      if (!more || (pos >= end && in.syncSeen())) { 
       key = null; 
       value = null; 
       more = false; 
      } else { 
       key.setOffset(pos); 
      } 
      return more; 
     } 

     @Override 
     public PathOffsetWritable getCurrentKey() { 
      return key; 
     } 

     @Override 
     public V getCurrentValue() { 
      return value; 
     } 

     @Override 
     public float getProgress() throws IOException, InterruptedException { 
      if (end == start) { 
       return 0.0f; 
      } else { 
       return Math.min(1.0f, (in.getPosition() - start)/(float)(end - start)); 
      } 
     } 

     @Override 
     public void close() throws IOException { 
      in.close(); 
     } 

    } 

    @Override 
    public RecordReader<PathOffsetWritable, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { 
     return new SequenceFileOffsetRecordReader<V>(); 
    } 

    @Override 
    public List<InputSplit> getSplits(JobContext context) throws IOException { 
     return new SequenceFileInputFormat<PathOffsetWritable, V>().getSplits(context); 
    } 

    @Override 
    public long getFormatMinSplitSize() { 
     return SequenceFile.SYNC_INTERVAL; 
    } 


} 

PathOffsetWritable.java:

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.WritableComparable; 

public class PathOffsetWritable implements WritableComparable<PathOffsetWritable> { 

    private Text t = new Text(); 
    private Path path; 
    private long offset; 

    public PathOffsetWritable(Path path, long offset) { 
     this.path = path; 
     this.offset = offset; 
    } 

    public Path getPath() { 
     return path; 
    } 

    public long getOffset() { 
     return offset; 
    } 

    public void setPath(Path path) { 
     this.path = path; 
    } 

    public void setOffset(long offset) { 
     this.offset = offset; 
    } 

    @Override 
    public void readFields(DataInput in) throws IOException { 
     t.readFields(in); 
     path = new Path(t.toString()); 
     offset = in.readLong(); 
    } 

    @Override 
    public void write(DataOutput out) throws IOException { 
     t.set(path.toString()); 
     t.write(out); 
     out.writeLong(offset); 
    } 

    @Override 
    public int compareTo(PathOffsetWritable o) { 
     int x = path.compareTo(o.path); 
     if (x != 0) { 
      return x; 
     } else { 
      return Long.valueOf(offset).compareTo(Long.valueOf(o.offset)); 
     } 
    } 


} 
संबंधित मुद्दे