2012-09-11 15 views
9

को रोक नहीं है मुझे दो मैपरेडस नौकरियों की श्रृंखला की आवश्यकता है। मैंने जॉब कंट्रोल का इस्तेमाल जॉब 2 को जॉब 1 के आश्रित के रूप में सेट करने के लिए किया था। यह काम करता है, आउटपुट फाइलें बनाई जाती हैं !! लेकिन यह रुकता नहीं है! खोल में यह इस राज्य में रहता है:(हाडोप) मैपरेडस - चेन जॉब्स - जॉबकंट्रोल

12/09/11 19:06:24 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 
12/09/11 19:06:25 INFO input.FileInputFormat: Total input paths to process : 1 
12/09/11 19:06:25 INFO util.NativeCodeLoader: Loaded the native-hadoop library 
12/09/11 19:06:25 WARN snappy.LoadSnappy: Snappy native library not loaded 
12/09/11 19:07:00 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 
12/09/11 19:07:00 INFO input.FileInputFormat: Total input paths to process : 1 

मैं इसे कैसे रोक सकता है? यह मेरा मुख्य है।

public static void main(String[] args) throws Exception { 
    Configuration conf = new Configuration(); 
    Configuration conf2 = new Configuration(); 

    Job job1 = new Job(conf, "canzoni"); 
    job1.setJarByClass(CanzoniOrdinate.class); 
    job1.setMapperClass(CanzoniMapper.class); 
    job1.setReducerClass(CanzoniReducer.class); 
    job1.setOutputKeyClass(Text.class); 
    job1.setOutputValueClass(IntWritable.class); 

    ControlledJob cJob1 = new ControlledJob(conf); 
    cJob1.setJob(job1); 
    FileInputFormat.addInputPath(job1, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job1, new Path("/user/hduser/tmp")); 


    Job job2 = new Job(conf2, "songsort"); 
    job2.setJarByClass(CanzoniOrdinate.class); 
    job2.setMapperClass(CanzoniSorterMapper.class); 
    job2.setSortComparatorClass(ReverseOrder.class); 
    job2.setInputFormatClass(KeyValueTextInputFormat.class); 
    job2.setReducerClass(CanzoniSorterReducer.class); 
    job2.setMapOutputKeyClass(IntWritable.class); 
    job2.setMapOutputValueClass(Text.class); 
    job2.setOutputKeyClass(Text.class); 
    job2.setOutputValueClass(IntWritable.class); 

    ControlledJob cJob2 = new ControlledJob(conf2); 
    cJob2.setJob(job2); 
    FileInputFormat.addInputPath(job2, new Path("/user/hduser/tmp/part*")); 
    FileOutputFormat.setOutputPath(job2, new Path(args[1])); 

    JobControl jobctrl = new JobControl("jobctrl"); 
    jobctrl.addJob(cJob1); 
    jobctrl.addJob(cJob2); 
    cJob2.addDependingJob(cJob1); 
    jobctrl.run(); 


    //////////////// 
    // NEW CODE /// 
    ////////////// 


    // delete jobctrl.run(); 
    Thread t = new Thread(jobctrl); 
    t.start(); 
    String oldStatusJ1 = null; 
    String oldStatusJ2 = null; 
    while (!jobctrl.allFinished()) { 
     String status =cJob1.toString(); 
     String status2 =cJob2.toString(); 
     if (!status.equals(oldStatusJ1)) { 
     System.out.println(status); 
     oldStatusJ1 = status; 
     } 
     if (!status2.equals(oldStatusJ2)) { 
     System.out.println(status2); 
     oldStatusJ2 = status2; 
     }  
    } 
    System.exit(0); 

} }

+1

मैं एक थ्रेड का उपयोग कर JobControl शुरू करने के लिए इसे हल। मैंने जांच की कि कुछ समय चक्र का उपयोग करके नौकरियां की गई हैं: जबकि (! Jobctrl.allFinished()) और एक system.exit() चक्र से बाहर है। अब मैं उन नौकरियों को सूचना संदेश लौटा दूंगा, जो मैंने प्राप्त किया है, यह जानना है कि कौन सी नौकरी चल रही है, ControlledJob.toString() के साथ। मुझे नहीं पता कि सूचना संदेश कैसे प्राप्त करते हैं: मैपर कार्य की संख्या, कार्य को कम करने की संख्या, इनपुट में रिकॉर्ड या आउटपुट आदि ... इन संदेशों को प्राप्त करने का कोई विचार? –

+0

"job.getCounters()। ToString()" पर्याप्त है? – zsxwing

+0

क्या यह जॉबकंट्रोल कक्षा में एक बग है? – Rags

उत्तर

5

मैं अनिवार्य रूप से क्या पिएत्रो ऊपर उल्लेख किया है।

public class JobRunner implements Runnable { 
    private JobControl control; 

    public JobRunner(JobControl _control) { 
    this.control = _control; 
    } 

    public void run() { 
    this.control.run(); 
    } 
} 

और मेरे मानचित्र में/वर्ग को कम मेरे पास है:

public void handleRun(JobControl control) throws InterruptedException { 
    JobRunner runner = new JobRunner(control); 
    Thread t = new Thread(runner); 
    t.start(); 

    while (!control.allFinished()) { 
     System.out.println("Still running..."); 
     Thread.sleep(5000); 
    } 
} 

जिसमें मैं सिर्फ jobControl वस्तु गुजरती हैं।

+2

+1 – beterthanlife

3

JobControl वस्तु ही Runnable है, तो आप सिर्फ इस तरह इसका इस्तेमाल कर सकते हैं:

new Thread(myJobControlInstance).start() 
0

बस कोड स्निपेट क्या sinemetu1 साझा किया था करने के लिए एक ट्वीक ..

आप करने के लिए कॉल ड्रॉप कर सकते हैं अपने आप में JobControl रूप JobRunner Runnable

 Thread thread = new Thread(jobControl); 
     thread.start(); 

     while (!jobControl.allFinished()) { 
      System.out.println("Still running..."); 
      Thread.sleep(5000); 
     } 

मैं भी इस लिंक पर ठोकर खाई को लागू करता है, जहां उपयोगकर्ता पुष्टि की है कि JobControl केवल नए धागे से चलाया जा सकता है। https://www.mail-archive.com/[email protected]/msg00556.html

0

इस प्रयास करें:

Thread jcThread = new Thread(jobControl); 
    jcThread.start(); 
    System.out.println("循环判断jobControl运行状态 >>>>>>>>>>>>>>>>"); 
    while (true) { 
     if (jobControl.allFinished()) { 
     System.out.println("====>> jobControl.allFinished=" + jobControl.getSuccessfulJobList()); 
     jobControl.stop(); 
     // 如果不加 break 或者 return,程序会一直循环 
     break; 
    } 

    if (jobControl.getFailedJobList().size() > 0) { 
     succ = 0; 
     System.out.println("====>> jobControl.getFailedJobList=" + jobControl.getFailedJobList()); 
     jobControl.stop(); 

     // 如果不加 break 或者 return,程序会一直循环 
     break; 
    } 
} 
संबंधित मुद्दे