मैंने कस्टम इनपुट प्रारूप और रिकॉर्ड रीडर को लागू करके ऐसा किया है।
public class ParagraphInputFormat extends TextInputFormat {
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
return new ParagraphRecordReader();
}
}
public class ParagraphRecordReader extends RecordReader<LongWritable, Text> {
private long end;
private boolean stillInChunk = true;
private LongWritable key = new LongWritable();
private Text value = new Text();
private FSDataInputStream fsin;
private DataOutputBuffer buffer = new DataOutputBuffer();
private byte[] endTag = "\n\r\n".getBytes();
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
FileSplit split = (FileSplit) inputSplit;
Configuration conf = taskAttemptContext.getConfiguration();
Path path = split.getPath();
FileSystem fs = path.getFileSystem(conf);
fsin = fs.open(path);
long start = split.getStart();
end = split.getStart() + split.getLength();
fsin.seek(start);
if (start != 0) {
readUntilMatch(endTag, false);
}
}
public boolean nextKeyValue() throws IOException {
if (!stillInChunk) return false;
boolean status = readUntilMatch(endTag, true);
value = new Text();
value.set(buffer.getData(), 0, buffer.getLength());
key = new LongWritable(fsin.getPos());
buffer.reset();
if (!status) {
stillInChunk = false;
}
return true;
}
public LongWritable getCurrentKey() throws IOException, InterruptedException {
return key;
}
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
public float getProgress() throws IOException, InterruptedException {
return 0;
}
public void close() throws IOException {
fsin.close();
}
private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
int i = 0;
while (true) {
int b = fsin.read();
if (b == -1) return false;
if (withinBlock) buffer.write(b);
if (b == match[i]) {
i++;
if (i >= match.length) {
return fsin.getPos() < end;
}
} else i = 0;
}
}
}
ENDTAG प्रत्येक जानकारी के अंतिम पहचान करता है।
स्रोत
2014-12-19 11:03:30
आपका इनपुट जेएसओएन की तरह दिखता है। आप जेएसओएन को प्रति रिकॉर्ड एक रिकॉर्ड के साथ पूर्व-प्रक्रिया करना चाहते हैं, और उसके बाद 'SqlContext.jsonFile' के साथ लोड करें। – huitseeker
आपको अपना खुद का हैडऑप 'इनपुटफॉर्मैट' बनाना होगा जो जानता है कि एक एकल रिकॉर्ड को विभाजित किए बिना इन मल्टीलाइन फ़ाइलों को कैसे विभाजित किया जाए। या, जैसा कि @ हूटीसेकर सुझाव देता है, आप इसे उस प्रारूप में प्रीप्रोसेस कर सकते हैं जो हैडूप पहले ही जानता है कि कैसे संभालना है। – lmm
@huitseeker लेकिन यह जेएसओएन प्रारूप –