2012-05-25 7 views
11

हाल ही में मैं अमेज़ॅन वेब सर्विसेज (एडब्लूएस) के साथ काम कर रहा हूं और मैंने देखा है कि इस विषय पर ज्यादा दस्तावेज़ीकरण नहीं है, इसलिए मैंने अपना समाधान जोड़ा।जावा एप्लिकेशन में लोचदार MapReduce नौकरी प्रवाह को पूरा करने के लिए मैं कैसे इंतजार कर सकता हूं?

मैं अमेज़ॅन लोचदार MapReduce (अमेज़ॅन ईएमआर) का उपयोग कर एक आवेदन लिख रहा था। गणना समाप्त होने के बाद मुझे उनके द्वारा बनाई गई फाइलों पर कुछ काम करने की ज़रूरत थी, इसलिए मुझे यह जानना आवश्यक था कि नौकरी के प्रवाह ने अपना काम कब पूरा किया था।

यह आपके काम प्रवाह पूरा करता है, तो आप कैसे जांच कर सकते हैं है:

AmazonElasticMapReduce mapReduce = new AmazonElasticMapReduceClient(credentials); 

DescribeJobFlowsRequest jobAttributes = new DescribeJobFlowsRequest() 
    .withJobFlowStates("COMPLETED"); 

List<JobFlowDetail> jobs = mapReduce.describeJobFlows(jobAttributes).getJobFlows(); 
JobFlowDetail detail = jobs.get(0); 

detail.getJobFlowId(); //the id of one of the completed jobs 

तुम भी DescribeJobFlowsRequest में एक विशिष्ट कार्य आईडी के लिए देख सकते हैं और फिर अगर उस काम में विफल रहा है के समाप्त हो गया है की जाँच करने के।

मुझे आशा है कि इससे दूसरों की मदद मिलेगी।

+5

तुरंत यहां आपकी समस्या का अपने स्वयं के समाधान भेजने से काफी स्वागत है, तथापि, वांछित दृष्टिकोण एक सवाल और एक जवाब में अभी भी इस विभाजित करने के लिए, (देखें [यह पूछें और उत्तर अपने प्रश्न ठीक है] है http : //blog.stackoverflow.com/2011/07/its-ok-to-ask-and-answer-your-own-questions/) - इससे चीजों को उचित रूप से वर्गीकृत/वर्गीकृत करने में मदद मिलती है, यानी वास्तव में अनुत्तरित प्रश्नों के लिए कमरा बनाएं लागू, धन्यवाद! –

+0

धन्यवाद, मैं इसे भविष्य के संदर्भ के रूप में नोट करूंगा। – siditom

+0

आपको अन्य पूर्ण राज्यों को भी शामिल करना चाहिए। कुछ लोग इसे पढ़ने के लिए हमेशा के लिए लूप हो सकता है अगर वे 'जॉब एट्रिब्यूट्स' को दिए गए हैं। 'DescribeJobFlowsRequest jobAttributes = new DescribeJobFlowRequest()। JobFlowStates ("पूर्ण", "टर्मिनेटेड", "विफल"); ' –

उत्तर

1

एक बार नौकरी प्रवाह पूरा होने के बाद, क्लस्टर बंद हो जाता है और एचडीएफएस विभाजन खो जाता है। डेटा की हानि को रोकने के लिए , अमेज़ॅन एस 3 में परिणामों को स्टोर करने के लिए नौकरी प्रवाह के अंतिम चरण को कॉन्फ़िगर करें।

JobFlowInstancesDetail हैं: KeepJobFlowAliveWhenNoSteps पैरामीटर सही पर सेट किया जाता है, काम प्रवाह होगा बल्कि नीचे बंद एक बार चरण पूरे कर लिए की तुलना में प्रतीक्षा करने के लिए राज्य संक्रमण।

प्रत्येक नौकरी प्रवाह में अधिकतम 256 चरणों की अनुमति है।

यदि आपका काम समय लेने वाला है तो मैं आपको समय-समय पर परिणामों को संग्रहीत करने की सलाह देता हूं।

लंबी कहानी छोटी: यह कब किया जाता है यह जानने का कोई तरीका नहीं है। इसके बजाय आपको नौकरी के हिस्से के रूप में अपना डेटा सहेजने की आवश्यकता है।

1

नौकरी प्रवाह बनाते समय --wait-for-steps विकल्प का उपयोग करें।

./elastic-mapreduce --create \ 
... 
--wait-for-steps \ 
... 
3

मैं भी इस समस्या में भाग गया, और यहां समाधान है जिसके लिए मैं अभी आया हूं। यह सही नहीं है, लेकिन उम्मीद है कि यह सहायक होगा। संदर्भ के लिए, मैं जावा 1.7 और एडब्ल्यूएस जावा एसडीके संस्करण 1.9.13 का उपयोग कर रहा हूं।

नोट इस कोड को मानता है कि आप को समाप्त करने के क्लस्टर के लिए प्रतीक्षा कर रहे हैं कि, नहीं कदम सख्ती से बोला जाए; यदि आपका क्लस्टर समाप्त हो जाता है जब आपके सभी कदम किए जाते हैं तो यह ठीक है, लेकिन यदि आप क्लस्टर का उपयोग कर रहे हैं जो चरण पूर्ण होने के बाद जीवित रहते हैं तो इससे आपको बहुत मदद नहीं मिलेगी।

यह भी ध्यान दें कि यह कोड क्लस्टर राज्य परिवर्तनों पर नज़र रखता है और लॉग करता है, और इसके अतिरिक्त यह निदान करता है कि क्लस्टर त्रुटियों से समाप्त हो गया है और अगर ऐसा हुआ तो अपवाद फेंकता है।

private void yourMainMethod() { 
    RunJobFlowRequest request = ...; 

    try { 
     RunJobFlowResult submission = emr.runJobFlow(request); 
     String jobFlowId = submission.getJobFlowId(); 
     log.info("Submitted EMR job as job flow id {}", jobFlowId); 

     DescribeClusterResult result = 
      waitForCompletion(emr, jobFlowId, 90, TimeUnit.SECONDS); 
     diagnoseClusterResult(result, jobFlowId); 
    } finally { 
     emr.shutdown(); 
    } 
} 

private DescribeClusterResult waitForCompletion(
      AmazonElasticMapReduceClient emr, String jobFlowId, 
      long sleepTime, TimeUnit timeUnit) 
     throws InterruptedException { 
    String state = "STARTING"; 
    while (true) { 
     DescribeClusterResult result = emr.describeCluster(
       new DescribeClusterRequest().withClusterId(jobFlowId) 
     ); 
     ClusterStatus status = result.getCluster().getStatus(); 
     String newState = status.getState(); 
     if (!state.equals(newState)) { 
      log.info("Cluster id {} switched from {} to {}. Reason: {}.", 
        jobFlowId, state, newState, status.getStateChangeReason()); 
      state = newState; 
     } 

     switch (state) { 
      case "TERMINATED": 
      case "TERMINATED_WITH_ERRORS": 
      case "WAITING": 
       return result; 
     } 

     timeUnit.sleep(sleepTime); 
    } 
} 

private void diagnoseClusterResult(DescribeClusterResult result, String jobFlowId) { 
    ClusterStatus status = result.getCluster().getStatus(); 
    ClusterStateChangeReason reason = status.getStateChangeReason(); 
    ClusterStateChangeReasonCode code = 
     ClusterStateChangeReasonCode.fromValue(reason.getCode()); 
    switch (code) { 
    case ALL_STEPS_COMPLETED: 
     log.info("Completed EMR job {}", jobFlowId); 
     break; 
    default: 
     failEMR(jobFlowId, status); 
    } 
} 

private static void failEMR(String jobFlowId, ClusterStatus status) { 
    String msg = "EMR cluster run %s terminated with errors. ClusterStatus = %s"; 
    throw new RuntimeException(String.format(msg, jobFlowId, status)); 
} 
संबंधित मुद्दे

 संबंधित मुद्दे