2012-01-05 11 views
15

मेरे पास संभवतः लाखों लाइनों के साथ एक सादा पाठ फ़ाइल है जिसके लिए कस्टम पार्सिंग की आवश्यकता है और मैं इसे जितनी जल्दी हो सके HBase तालिका में लोड करना चाहता हूं (हैडोप या एचबेस जावा क्लाइंट का उपयोग करके)।प्रोग्रामिंग के रूप में एचबीएस में थोक लोड डेटा का सबसे तेज़ तरीका क्या है?

मेरा वर्तमान समाधान पर MapReduce नौकरी के बिना काम पर आधारित है। मैं पाठ फ़ाइल को पढ़ने के लिए FileInputFormat का उपयोग करता हूं ताकि प्रत्येक पंक्ति मेरी Mapper कक्षा की विधि को पास कर दी गई हो। इस बिंदु पर Put ऑब्जेक्ट बनाने के लिए लाइन को पार्स किया गया है जो context पर लिखा गया है। फिर, TableOutputFormatPut ऑब्जेक्ट लेता है और इसे तालिका में सम्मिलित करता है।

यह समाधान प्रति सेकंड 1,000 पंक्तियों की औसत प्रविष्टि दर उत्पन्न करता है, जो कि मेरी अपेक्षा से कम है। मेरा एचबीएस सेटअप एक सर्वर पर छद्म वितरित मोड में है।

एक दिलचस्प बात यह है कि 1,000,000 पंक्तियों के सम्मिलन के दौरान, 25 मैपर (कार्य) उत्पन्न होते हैं लेकिन वे क्रमशः (एक के बाद एक) चलाते हैं; क्या यह सामान्य है?

public static class CustomMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> { 

    protected void map(LongWritable key, Text value, Context context) throws IOException { 
     Map<String, String> parsedLine = parseLine(value.toString()); 

     Put row = new Put(Bytes.toBytes(parsedLine.get(keys[1]))); 
     for (String currentKey : parsedLine.keySet()) { 
      row.add(Bytes.toBytes(currentKey),Bytes.toBytes(currentKey),Bytes.toBytes(parsedLine.get(currentKey))); 
     } 

     try { 
      context.write(new ImmutableBytesWritable(Bytes.toBytes(parsedLine.get(keys[1]))), row); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 
} 

public int run(String[] args) throws Exception { 
    if (args.length != 2) { 
     return -1; 
    } 

    conf.set("hbase.mapred.outputtable", args[1]); 

    // I got these conf parameters from a presentation about Bulk Load 
    conf.set("hbase.hstore.blockingStoreFiles", "25"); 
    conf.set("hbase.hregion.memstore.block.multiplier", "8"); 
    conf.set("hbase.regionserver.handler.count", "30"); 
    conf.set("hbase.regions.percheckin", "30"); 
    conf.set("hbase.regionserver.globalMemcache.upperLimit", "0.3"); 
    conf.set("hbase.regionserver.globalMemcache.lowerLimit", "0.15"); 

    Job job = new Job(conf); 
    job.setJarByClass(BulkLoadMapReduce.class); 
    job.setJobName(NAME); 
    TextInputFormat.setInputPaths(job, new Path(args[0])); 
    job.setInputFormatClass(TextInputFormat.class); 
    job.setMapperClass(CustomMap.class); 
    job.setOutputKeyClass(ImmutableBytesWritable.class); 
    job.setOutputValueClass(Put.class); 
    job.setNumReduceTasks(0); 
    job.setOutputFormatClass(TableOutputFormat.class); 

    job.waitForCompletion(true); 
    return 0; 
} 

public static void main(String[] args) throws Exception { 
    Long startTime = Calendar.getInstance().getTimeInMillis(); 
    System.out.println("Start time : " + startTime); 

    int errCode = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadMapReduce(), args); 

    Long endTime = Calendar.getInstance().getTimeInMillis(); 
    System.out.println("End time : " + endTime); 
    System.out.println("Duration milliseconds: " + (endTime-startTime)); 

    System.exit(errCode); 
} 
+0

मुझे लगता है कि आप चाहते हैं कि आपका शीर्षक "थोक भार" और "ब्लुक लोड" न हो ... लेकिन अगर मेरा सुधार गलत था तो मुझे बताएं। :-) –

+0

क्या आपने इसे पढ़ा है? http://hbase.apache.org/docs/r0.89.20100621/bulk-loads।एचटीएमएल –

+0

इसके अलावा, क्या आपने अपने क्षेत्रों को पूर्व-विभाजित किया है? यदि नहीं, तो मूल रूप से आपके पास एकल-थ्रेडेड लेखक होता है, जो इसे समझाएगा। आप मूल रूप से प्रति क्षेत्र एक लेखक प्राप्त करते हैं। –

उत्तर

16

मैं एक प्रक्रिया है कि शायद बहुत HBase में एक एमआर से डेटा लोड करने के लिए एक कारगर तरीका खोजने के लिए प्रयास करने के लिए तुम्हारा के समान है के माध्यम से चले गए हैं:

यहाँ मेरे वर्तमान समाधान के लिए कोड है। जो मैंने काम में पाया वह एमआर के आउटपुटफॉर्मैट क्लास के रूप में HFileOutputFormat का उपयोग कर रहा है।

नीचे मेरे कोड का आधार है कि मुझे job और मैपर map फ़ंक्शन उत्पन्न करना है जो डेटा लिखता है। यह तेज़ था। हम अब इसका उपयोग नहीं करते हैं, इसलिए मेरे पास हाथों की संख्या नहीं है, लेकिन यह एक मिनट के भीतर लगभग 2.5 मिलियन रिकॉर्ड था।

यहाँ (नीचे छीन) समारोह मैं काम उत्पन्न करने के लिए मेरी MapReduce प्रक्रिया HBase

private Job createCubeJob(...) { 
    //Build and Configure Job 
    Job job = new Job(conf); 
    job.setJobName(jobName); 
    job.setMapOutputKeyClass(ImmutableBytesWritable.class); 
    job.setMapOutputValueClass(Put.class); 
    job.setMapperClass(HiveToHBaseMapper.class);//Custom Mapper 
    job.setJarByClass(CubeBuilderDriver.class); 
    job.setInputFormatClass(TextInputFormat.class); 
    job.setOutputFormatClass(HFileOutputFormat.class); 

    TextInputFormat.setInputPaths(job, hiveOutputDir); 
    HFileOutputFormat.setOutputPath(job, cubeOutputPath); 

    Configuration hConf = HBaseConfiguration.create(conf); 
    hConf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum); 
    hConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperClientPort); 

    HTable hTable = new HTable(hConf, tableName); 

    HFileOutputFormat.configureIncrementalLoad(job, hTable); 
    return job; 
} 

यह HiveToHBaseMapper वर्ग से मेरी नक्शा समारोह (कुछ संपादित) है में डेटा डाल करने के लिए लिखा है।

public void map(WritableComparable key, Writable val, Context context) 
     throws IOException, InterruptedException { 
    try{ 
     Configuration config = context.getConfiguration(); 
     String[] strs = val.toString().split(Constants.HIVE_RECORD_COLUMN_SEPARATOR); 
     String family = config.get(Constants.CUBEBUILDER_CONFIGURATION_FAMILY); 
     String column = strs[COLUMN_INDEX]; 
     String Value = strs[VALUE_INDEX]; 
     String sKey = generateKey(strs, config); 
     byte[] bKey = Bytes.toBytes(sKey); 
     Put put = new Put(bKey); 
     put.add(Bytes.toBytes(family), Bytes.toBytes(column), (value <= 0) 
         ? Bytes.toBytes(Double.MIN_VALUE) 
         : Bytes.toBytes(value)); 

     ImmutableBytesWritable ibKey = new ImmutableBytesWritable(bKey); 
     context.write(ibKey, put); 

     context.getCounter(CubeBuilderContextCounters.CompletedMapExecutions).increment(1); 
    } 
    catch(Exception e){ 
     context.getCounter(CubeBuilderContextCounters.FailedMapExecutions).increment(1);  
    } 

} 

मैं यकीन है कि यह आपके लिए एक कॉपी & पेस्ट समाधान होने के लिए नहीं जा रहा है। स्पष्ट रूप से जिस डेटा के साथ मैं काम कर रहा था उसे किसी भी कस्टम प्रसंस्करण की आवश्यकता नहीं थी (जो इस से पहले एमआर नौकरी में किया गया था)। मुख्य बात यह है कि मैं इसे प्रदान करना चाहता हूं HFileOutputFormat है। बाकी सिर्फ एक उदाहरण है कि मैंने इसका इस्तेमाल कैसे किया। :)
मुझे आशा है कि यह आपको एक अच्छे समाधान के ठोस रास्ते पर ले जाएगा। :

+1

मैंने अपने कोड में 'HfileOutputFormat' का उपयोग करने का प्रयास किया लेकिन, मैं अपवाद से नीचे रहना जारी रखता हूं, कोई विचार? 'java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put पर org.apache.hadoop.hbase.KeyValue \t में ढाला नहीं जा सकता org.apache.hadoop.hbase.mapreduce.HFileOutputFormat $ 1.Write (HFileOutputFormat.java:82) \t org.apache.hadoop.mapred.ReduceTask $ NewTrackingRecordWriter.write (ReduceTask.java:508) पर \t org.apache.hadoop.mapreduce.TaskInputOutputContext.write पर (TaskInputOutputContext.java:80) \t org.apache.hadoop.mapreduce.Reducer.reduce (Reducer.java:156) \t ... ' –

+0

@kramer 'लिखने' का प्रयास करने से अधिक अलग-अलग प्रकार की उम्मीद है (इसलिए कलाकार त्रुटि) वास्तव में नहीं। उस पर एक शॉट लेने के लिए कोड देखने की आवश्यकता होगी। – Nija

+0

क्या HFileOutputFormat तेज है तो TableOutputFormat? क्षेत्र विभाजन के साथ समान स्थिति दी गई। –

0

एक दिलचस्प बात यह है कि 1,000,000 पंक्तियों के सम्मिलन के दौरान, 25 मैपर (कार्य) उत्पन्न होते हैं लेकिन वे क्रमशः (एक के बाद एक) चलाते हैं; क्या यह सामान्य है?

mapreduce.tasktracker.map.tasks.maximum पैरामीटर जो 2 से चूक जाता है कार्य है कि एक नोड पर समानांतर में चला सकते हैं की अधिकतम संख्या निर्धारित करता है। जब तक बदला नहीं जाता है, तो आपको प्रत्येक नोड पर एक साथ चलने वाले 2 मानचित्र कार्य देखना चाहिए।

+0

कोशिश की लेकिन परिणाम नहीं बदला। –

+0

आपने पैरामीटर निर्दिष्ट किया था? हैडोप डिमन्स शुरू होने से पहले इसे सभी नोड्स पर mapred-site.xml में निर्दिष्ट किया जाना चाहिए। इसे जांचें [प्रलेखन] (http://wiki.apache.org/hadoop/FAQ#I_see_a_maximum_of_2_maps.2BAC8-reduces_spawned_concurrently_on_each_TaskTracker.2C_how_do_I_increase_that.3F)। आपने कैसे सत्यापित किया? जॉबट्रैकर वेब कंसोल से सत्यापित किया जा सकता है। –

संबंधित मुद्दे

 संबंधित मुद्दे