2013-03-07 6 views
6

नौकरी का निम्न चरण विफल रहता है:कार्य प्रयास 600 सेकंड के लिए स्थिति की रिपोर्ट करने में विफल होने के कारण विफल रहता है। मारना! उपाय?

विफल कमी कार्यों की अनुमति सीमा पार हो गई।

कारण है कि प्रत्येक कार्य विफल हो जाता है:

टास्क attempt_201301251556_1637_r_000005_0 600 सेकंड के लिए स्थिति की रिपोर्ट करने में विफल रहा। मारना! विस्तार से

समस्या:

मानचित्र चरण प्रत्येक रिकॉर्ड स्वरूप की है, जिसमें लेता है: समय, छुटकारा, डेटा।

डेटा प्रारूप का है: डेटा तत्व, और इसकी गणना।

उदाहरण: ए, 1 बी, 4 सी, 7 कोरसपॉन्ड रिकॉर्ड के डेटा पर।

प्रत्येक डेटा तत्व के लिए मैपर आउटपुट प्रत्येक रिकॉर्ड के लिए डेटा। उदाहरण:

कुंजी: (समय, ए,), वैल: (छुटकारा, डेटा) कुंजी: (समय, बी,), वैल: (छुटकारा, डेटा) कुंजी: (समय, सी,), वैल्यू : (छुटकारा, डेटा)

प्रत्येक कमी सभी रिकॉर्ड्स से एक ही कुंजी से संबंधित सभी डेटा प्राप्त करती है। उदाहरण: कुंजी: (समय, ए), वैल: (rid1, डेटा) और कुंजी: (समय, ए), वैल: (rid2, डेटा) एक ही कम उदाहरण तक पहुंचें।

यह कुछ प्रसंस्करण करता है और समान छापों को आउटपुट करता है।

मेरा प्रोग्राम 10 एमबी जैसे छोटे डेटासेट के लिए बिना किसी परेशानी के चलाता है। लेकिन उपर्युक्त कारण के साथ डेटा 1 जी कहने के लिए बढ़ता है जब विफल रहता है। मुझे नहीं पता कि ऐसा क्यों होता है। कृपया सहायता कीजिए!

कोड कम करें:

  • VCLReduce0Split
  • CoreSplit

एक:

नीचे दो वर्गों रहे हैं। VCLReduce0SPlit

public class VCLReduce0Split extends MapReduceBase implements Reducer<Text, Text, Text, Text>{ 
    // @SuppressWarnings("unchecked") 
     public void reduce (Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { 

      String key_str = key.toString(); 
      StringTokenizer stk = new StringTokenizer(key_str); 
      String t = stk.nextToken(); 

      HashMap<String, String> hmap = new HashMap<String, String>(); 

      while(values.hasNext()) 
      { 
       StringBuffer sbuf1 = new StringBuffer(); 
       String val = values.next().toString(); 
       StringTokenizer st = new StringTokenizer(val); 

       String uid = st.nextToken(); 

       String data = st.nextToken(); 

        int total_size = 0; 

        StringTokenizer stx = new StringTokenizer(data,"|"); 

        StringBuffer sbuf = new StringBuffer(); 

        while(stx.hasMoreTokens()) 
        { 
         String data_part = stx.nextToken(); 
         String data_freq = stx.nextToken(); 

        // System.out.println("data_part:----->"+data_part+" data_freq:----->"+data_freq); 
         sbuf.append(data_part); 
         sbuf.append("|"); 
         sbuf.append(data_freq); 
         sbuf.append("|"); 
        } 
       /*  
        for(int i = 0; i<parts.length-1; i++) 
        { 
         System.out.println("data:--------------->"+data); 
         int part_size = Integer.parseInt(parts[i+1]); 
         sbuf.append(parts[i]); 
         sbuf.append("|"); 
         sbuf.append(part_size); 
         sbuf.append("|"); 
         total_size = part_size+total_size; 
         i++; 
        }*/ 

       sbuf1.append(String.valueOf(total_size)); 
       sbuf1.append(","); 
       sbuf1.append(sbuf); 
       if(uid.equals("203664471")){ 
       // System.out.println("data:--------------------------->"+data+" tot_size:---->"+total_size+" sbuf:------->"+sbuf); 
       } 
       hmap.put(uid, sbuf1.toString()); 

      } 

      float threshold = (float)0.8; 

      CoreSplit obj = new CoreSplit(); 


      ArrayList<CustomMapSimilarity> al = obj.similarityCalculation(t, hmap, threshold); 

      for(int i = 0; i<al.size(); i++) 
      { 
       CustomMapSimilarity cmaps = al.get(i); 
       String xy_pair = cmaps.getRIDPair(); 
       String similarity = cmaps.getSimilarity(); 
       output.collect(new Text(xy_pair), new Text(similarity)); 
      } 


     } 
    } 

ज। coreSplit

package com.a; 

import java.util.ArrayList; 
import java.util.HashMap; 
import java.util.HashSet; 
import java.util.Iterator; 
import java.util.Set; 
import java.util.StringTokenizer; 
import java.util.TreeMap; 

import org.apache.commons.collections.map.MultiValueMap; 

public class PPJoinPlusCoreOptNewSplit{ 


    public ArrayList<CustomMapSimilarity> similarityCalculation(String time, HashMap<String,String>hmap, float t) 
    { 

     ArrayList<CustomMapSimilarity> als = new ArrayList<CustomMapSimilarity>(); 
     ArrayList<CustomMapSimilarity> alsim = new ArrayList<CustomMapSimilarity>(); 

     Iterator<String> iter = hmap.keySet().iterator(); 

     MultiValueMap index = new MultiValueMap(); 

     String RID; 
     TreeMap<String, Integer> hmap2; 
     Iterator<String> iter1; 

     int size; 
     float prefix_size; 
     HashMap<String, Float> alpha; 
     HashMap<String, CustomMapOverlap> hmap_overlap; 

     String data; 

     while(iter.hasNext()) 
      { 
       RID = (String)iter.next(); 

       String data_val = hmap.get(RID); 

       StringTokenizer st = new StringTokenizer(data_val,","); 
      // System.out.println("data_val:--**********-->"+data_val+" RID:------------>"+RID+" time::---?"+time); 
       String RIDsize = st.nextToken(); 
       size = Integer.parseInt(RIDsize); 
       data = st.nextToken(); 


       StringTokenizer st1 = new StringTokenizer(data,"\\|"); 


       String[] parts = data.split("\\|"); 

      // hmap2 = (TreeMap<String, Integer>)hmap.get(RID); 
     //  iter1 = hmap2.keySet().iterator(); 

      // size = hmap_size.get(RID); 

       prefix_size = (float)(size-(0.8*size)+1); 

       if(size==1) 
       { 
        prefix_size = 1; 
       } 

       alpha = new HashMap<String, Float>(); 

       hmap_overlap = new HashMap<String, CustomMapOverlap>(); 

     //  Iterator<String> iter2 = hmap2.keySet().iterator(); 

       int prefix_index = 0; 

       int pi=0; 

       for(float j = 0; j<=prefix_size; j++) 
       { 

        boolean prefix_chk = false; 
        prefix_index++; 
        String ptoken = parts[pi]; 
      //  System.out.println("data:---->"+data+" ptoken:---->"+ptoken); 
        float val = Float.parseFloat(parts[pi+1]); 
        float temp_j = j; 
        j = j+val; 
        boolean j_l = false ; 
        float prefix_contri = 0; 
        pi= pi+2; 

        if(j>prefix_size) 
         { 

          // prefix_contri = j-temp_j; 
          prefix_contri = prefix_size-temp_j; 

          if(prefix_contri>0) 
          { 
           j_l = true; 
           prefix_chk = false; 

          } 
          else 
          { 
           prefix_chk = true;        
          } 
         }     


        if(prefix_chk == false){ 


         filters(index, ptoken, RID, hmap,t, size, val, j_l, alpha, hmap_overlap, j, prefix_contri); 


        CustomMapPrefixTokens cmapt = new CustomMapPrefixTokens(RID,j); 
        index.put(ptoken, cmapt); 

       } 

      } 


       als = calcSimilarity(time, RID, hmap, alpha, hmap_overlap); 

       for(int i = 0; i<als.size(); i++) 
       { 
        if(als.get(i).getRIDPair()!=null) 
        { 
         alsim.add(als.get(i)); 

        } 
       } 

      } 

     return alsim; 

    } 


    public void filters(MultiValueMap index, String ptoken, String RID, HashMap<String, String> hmap, float t, int size, float val, boolean j_l, HashMap<String, Float> alpha, HashMap<String, CustomMapOverlap> hmap_overlap, float j, float prefix_contri) 
    { 
      @SuppressWarnings("unchecked") 

      ArrayList<CustomMapPrefixTokens> positions_list = (ArrayList<CustomMapPrefixTokens>) index.get(ptoken); 

      if((positions_list!=null) &&(positions_list.size()!=0)) 
      { 

       CustomMapPrefixTokens cmapt ; 
       String y; 
       Iterator<String> iter3; 
       int y_size = 0; 
       float check_size = 0; 
      // TreeMap<String, Integer> hmapy; 
       float RID_val=0; 
       float y_overlap = 0; 
       float ubound = 0; 
       ArrayList<Float> fl = new ArrayList<Float>(); 

       StringTokenizer st; 

      for(int k = 0; k<positions_list.size(); k++) 
      { 
       cmapt = positions_list.get(k); 

       if(!cmapt.getRID().equals(RID)) 
       { 

       y = hmap.get(cmapt.getRID()); 

       // iter3 = y.keySet().iterator(); 

       String yRID = cmapt.getRID(); 

       st = new StringTokenizer(y,","); 

       y_size = Integer.parseInt(st.nextToken()); 

       check_size = (float)0.8*(size); 

       if(y_size>=check_size) 
       { 

        //hmapy = hmap.get(yRID); 

        String y_data = st.nextToken(); 

        StringTokenizer st1 = new StringTokenizer(y_data,"\\|"); 


        while(st1.hasMoreTokens()) 
        { 
         String token = st1.nextToken(); 
         if(token.equals(ptoken)) 
         { 

          String nxt_token = st1.nextToken(); 
        //  System.out.println("ydata:--->"+y_data+" nxt_token:--->"+nxt_token); 
          RID_val = (float)Integer.parseInt(nxt_token); 
          break; 
         } 
        } 

       // RID_val = (float) hmapy.get(ptoken); 
        float alpha1 = (float)(0.8/1.8)*(size+y_size); 

        fl = overlapCalc(alpha1, size, y_size, cmapt, j, alpha, j_l,RID_val,val,prefix_contri); 

        ubound = fl.get(0); 
        y_overlap = fl.get(1); 


        positionFilter(ubound, alpha1, cmapt, y_overlap, hmap_overlap); 

        } 

       } 
      } 
     } 



    } 


    public void positionFilter(float ubound,float alpha1, CustomMapPrefixTokens cmapt, float y_overlap, HashMap<String, CustomMapOverlap> hmap_overlap) 
    { 

    float y_overlap_total = 0; 

      if(null!=hmap_overlap.get(cmapt.getRID())) 
      { 

      y_overlap_total = hmap_overlap.get(cmapt.getRID()).getOverlap(); 

      if((y_overlap_total+ubound)>=alpha1) 
      { 

       CustomMapOverlap cmap_tmp = hmap_overlap.get(cmapt.getRID()); 

       float y_o_t = y_overlap+y_overlap_total; 

       cmap_tmp.setOverlap(y_o_t); 
       hmap_overlap.put(cmapt.getRID(),cmap_tmp); 

      } 
      else 
      { 
       float n = 0; 
       hmap_overlap.put(cmapt.getRID(), new CustomMapOverlap(cmapt.getRID(),n)); 
      } 

      } 
      else 
      { 
       CustomMapOverlap cmap_tmp = new CustomMapOverlap(cmapt.getRID(),y_overlap); 
       hmap_overlap.put(cmapt.getRID(), cmap_tmp); 

      } 

    } 

    public ArrayList<Float> overlapCalc(float alpha1, int size, int y_size, CustomMapPrefixTokens cmapt, float j, HashMap<String, Float> alpha, boolean j_l, float RID_val, float val, float prefix_contri) 
    { 

      alpha.put(cmapt.getRID(), alpha1); 
      float min1 = y_size-cmapt.getPosition(); 
      float min2 = size-j; 
      float min = 0; 

      float y_overlap = 0; 

      if(min1<min2) 
      { 
       min = min1; 
      } 
      else 
      { 
       min = min2; 
      } 
      if(j_l==true) 
      { 
       val = prefix_contri;  
      }          
      if(RID_val<val) 
      { 
       y_overlap = RID_val; 
      } 
      else 
      { 
       y_overlap = val; 
      } 

      float ubound = y_overlap+min; 

      ArrayList<Float> fl = new ArrayList<Float>(); 
      fl.add(ubound); 
      fl.add(y_overlap); 

      return fl; 

    } 


    public ArrayList<CustomMapSimilarity> calcSimilarity(String time, String RID, HashMap<String,String> hmap , HashMap<String, Float> alpha, HashMap<String, CustomMapOverlap> hmap_overlap) 
    { 

     float jaccard = 0; 

     CustomMapSimilarity cms = new CustomMapSimilarity(null, null); 
     ArrayList<CustomMapSimilarity> alsim = new ArrayList<CustomMapSimilarity>(); 

     Iterator<String> iter = hmap_overlap.keySet().iterator(); 

     while(iter.hasNext()) 
     { 
      String key = (String)iter.next(); 

      CustomMapOverlap val = (CustomMapOverlap)hmap_overlap.get(key); 

      float overlap = (float)val.getOverlap(); 

      if(overlap>0) 
      { 

       String yRID = val.getRID(); 

       String RIDpair = RID+" "+yRID; 

      jaccard = unionIntersection(hmap, RIDpair); 

      if(jaccard>0.8) 
       { 
        cms = new CustomMapSimilarity(time+" "+RIDpair, String.valueOf(jaccard)); 
        alsim.add(cms); 
       } 

      } 

     } 

     return alsim; 

    } 


    public float unionIntersection(HashMap<String,String> hmap, String RIDpair) 
    { 


      StringTokenizer st = new StringTokenizer(RIDpair); 

      String xRID = st.nextToken(); 

      String yRID = st.nextToken(); 

      String xdata = hmap.get(xRID); 

      String ydata = hmap.get(yRID); 


      int total_union = 0; 

      int xval = 0; 
      int yval = 0; 
      int part_union = 0; 

      int total_intersect = 0; 

     // System.out.println("xdata:------*************>"+xdata); 

      StringTokenizer xtokenizer = new StringTokenizer(xdata,","); 
      StringTokenizer ytokenizer = new StringTokenizer(ydata,","); 
     // String[] xpart = xdata.split(","); 
     // String[] ypart = ydata.split(","); 

      xtokenizer.nextToken(); 
      ytokenizer.nextToken(); 

      String datax = xtokenizer.nextToken(); 
      String datay = ytokenizer.nextToken(); 


      HashMap<String,Integer> x = new HashMap<String, Integer>(); 
      HashMap<String,Integer> y = new HashMap<String, Integer>(); 


      String [] xparts; 

       xparts = datax.toString().split("\\|"); 


       String [] yparts; 

       yparts = datay.toString().split("\\|"); 


       for(int i = 0; i<xparts.length-1; i++) 
       { 
        int part_size = Integer.parseInt(xparts[i+1]); 
        x.put(xparts[i], part_size); 

        i++; 
       } 

       for(int i = 0; i<yparts.length-1; i++) 
       { 
        int part_size = Integer.parseInt(yparts[i+1]); 
        y.put(xparts[i], part_size); 

        i++; 
       } 


      Set<String> xset = x.keySet(); 
      Set<String> yset = y.keySet(); 

      for(String elm:xset) 
      { 

       yval = 0; 

       xval = (Integer)x.get(elm); 

       part_union = 0; 
       int part_intersect = 0; 
       if(yset.contains(elm)){ 

        yval = (Integer) y.get(elm); 

       if(xval>yval) 
       { 
        part_union = xval; 
        part_intersect = yval; 
       } 
       else 
       { 
        part_union = yval; 
        part_intersect = xval; 
       } 
       total_intersect = total_intersect+part_intersect; 
       } 
       else 
       { 
        part_union = xval; 
       } 

       total_union = total_union+part_union; 


      } 


      for(String elm: yset) 
      { 
       part_union = 0; 

       if(!xset.contains(elm)) 
       { 
        part_union = (Integer) y.get(elm); 
        total_union = total_union+part_union; 
       } 

      } 

      float jaccard = (float)total_intersect/total_union; 

     return jaccard; 

    } 

} 
+0

आप अपने कम करने कोड पोस्ट कर सकते हैं? –

+0

मैंने कोड जोड़ा है। क्या आप मुझे सुझाव दे सकते हैं, अगर मैं इसे अधिक CPU कुशल बनाने के लिए कुछ बदलना चाहता हूं, और इसी तरह। –

उत्तर

10

टाइमआउट का कारण हैडॉप ढांचे पर प्रगति की रिपोर्ट किए बिना आपके reducer में एक लंबे समय से चलने वाला गणना हो सकता है। यह अलग दृष्टिकोण का उपयोग कर हल किया जा सकता:

मैं mapred-site.xml में टाइमआउट बढ़ाने से:

<property> 
    <name>mapred.task.timeout</name> 
    <value>1200000</value> 
</property> 

डिफ़ॉल्ट 600000 ms = 600 seconds है।

II। प्रगति रिपोर्ट करना Reducer example in javadoc में के रूप में हर एक्स रिकॉर्ड:

public void reduce(K key, Iterator<V> values, 
          OutputCollector<K, V> output, 
          Reporter reporter) throws IOException { 
    // report progress 
    if ((noValues%10) == 0) { 
    reporter.progress(); 
    } 

    // ... 
} 

वैकल्पिक रूप से आप एक कस्टम काउंटर के रूप में example में भी वृद्धि कर सकते हैं:

reporter.incrCounter(NUM_RECORDS, 1); 
+0

हाय, उत्तर के लिए धन्यवाद! मैंने ऊपर अपना कम कोड चिपकाया है। मेरी कम कक्षा में, मुख्य गणना के बाद शुरू होता है, संपूर्ण low_value_list पढ़ी गई है। इस मामले में, यह लूप, जबकि प्रगति की रिपोर्ट कैसे करें, के दौरान कमी_value_list के बाहर मुख्य गणना में कार्यक्रम को बंधे हुए हैं? इसके अलावा, क्या आप उपरोक्त चिपकाए गए कोड को करने के किसी भी CPU कुशल तरीके का सुझाव दे सकते हैं? प्रारंभ में, मैंने हैशैप्स का उपयोग किया था, जिसने अधिक CPU दक्षता की पेशकश की, लेकिन स्मृति समस्याओं के कारण इसे हटा दिया। –

+0

मुझे लगता है कि सबसे लंबा और अधिकांश सीपीयू-गहन गणना 'समानता गणना)() 'में की जाती है। आपको इस विधि में प्रगति की रिपोर्ट करनी चाहिए। आपको उचित वर्गों के साथ सभी स्ट्रिंग टोकनिंग को बदलने पर भी विचार करना चाहिए, ताकि स्ट्रिंग पार्सिंग और टोकनिंग केवल एक बार किया जा सके। यह आपके एल्गोरिदम को बेहतर बना सकता है। – harpun

+0

एक और दृष्टिकोण एल्गोरिदम को फिर से लिखना होगा, ताकि ओवरलैपिंग की कुछ गणना मैपर्स में की जा सके। समानांतर गणना उम्मीद है कि एल्गोरिदम को तेज कर देगा। हालांकि यह sth है कि आपको अपने लिए पता लगाना होगा और यह सत्यापित करना होगा कि दृष्टिकोण उस विशिष्ट एल्गोरिदम के लिए मान्य है जिसे आप कार्यान्वित करना चाहते हैं। – harpun

2

यह है कि आप जावा के ढेर अंतरिक्ष के सभी या भस्म हो सकता है जीसी बहुत जल्दी-जल्दी हो रहा है महारत हासिल करने के स्थिति की रिपोर्ट करने के लिए कम करने के लिए कोई मौका नहीं दे रही है और इसलिए मार दिया जाता है संभव है।

एक और संभावना यह है कि रेड्यूसर में से एक बहुत ही खराब डेटा प्राप्त कर रहा है, यानी एक विशेष छुटकारा पाने के लिए, बहुत सारे रिकॉर्ड हैं।

कोशिश निम्नलिखित config सेट करके अपने जावा ढेर को बढ़ाने के लिए: mapred.child.java.opts

-Xmx2048m

इसके अलावा करने के लिए, कोशिश करते हैं और एक कम करने के लिए निम्न config की स्थापना द्वारा समानांतर reducers की संख्या को कम वर्तमान में इसके मूल्य से मूल्य (डिफ़ॉल्ट मान 2 है):

mapred.tasktracker.reduce.tasks.maximum

+0

उत्तर के लिए धन्यवाद। यह बहुत उपयोगी था। मैंने कम कोड को चिपकाया है, क्या आप इसे लागू करने के अधिक कुशल तरीकों का सुझाव दे सकते हैं? –

संबंधित मुद्दे

 संबंधित मुद्दे