2017-05-13 26 views
6

मेरे पास एक पाइस्पर्क एप्लिकेशन है जिसे लगभग 5 जीबी संकुचित डेटा (स्ट्रिंग्स) को विस्तारित करना है। मैं 12 कोर (24 धागे) और 72 जीबी रैम पर एक छोटा सर्वर का उपयोग कर रहा हूं। मेरे पायस्पार्क कार्यक्रम में केवल 2 मैप ऑपरेशंस होते हैं, 3 बहुत बड़े रेगेक्स (3 जीबी पहले से संकलित) द्वारा मदद की जाती है और pickle के साथ लोड की जाती है। स्पार्क एक ही मशीन पर कार्यकर्ता और मास्टर के साथ स्टैंडअलोन मोड में काम कर रहा है।पर्यावरण की कितनी प्रतियां चमकती हैं?

मेरा प्रश्न है: स्पार्क प्रत्येक निष्पादक कोर के लिए प्रत्येक चर को दोहराता है? क्योंकि यह उपलब्ध सभी मेमोरी का उपयोग करता है और फिर बहुत से स्वैप स्पेस का उपयोग करता है। या शायद यह रैम में सभी विभाजन लोड करता है? आरडीडी में लगभग 10 लाख स्ट्रिंग हैं जिन्हें 3 रेगेक्स द्वारा खोजा जाना है। आरडीडी लगभग 1000 विभाजन की गणना करता है। मुझे इस कार्य को पूरा करने में परेशानी है क्योंकि कुछ मिनटों के बाद मेमोरी भर जाती है और स्वैप स्पेस का उपयोग करके स्पार्क शुरू हो जाता है। मैंने देखा है कि रेगेक्स के बिना स्थिति एक जैसी है।

यह मेरा कोड है, यह चहचहाना की ट्वीट के सभी बेकार क्षेत्रों को हटा देता है और विशेष रूप से शब्दों के लिए ट्वीट के ग्रंथों और विवरण स्कैन: (

import json 
import re 
import twitter_util as twu 
import pickle 

from pyspark import SparkContext 
sc = SparkContext() 

prefix = '/home/lucadiliello' 

source = prefix + '/data/tweets' 
dest = prefix + '/data/complete_tweets' 

#Regex's path 
companies_names_regex = prefix + '/data/comp_names_regex' 
companies_names_dict = prefix + '/data/comp_names_dict' 
companies_names_dict_to_legal = prefix + '/data/comp_names_dict_to_legal' 

#Loading the regex's 
comp_regex = pickle.load(open(companies_names_regex)) 
comp_dict = pickle.load(open(companies_names_dict)) 
comp_dict_legal = pickle.load(open(companies_names_dict_to_legal)) 

#Loading the RDD from textfile 
tx = sc.textFile(source).map(lambda a: json.loads(a)) 


def get_device(input_text): 
    output_text = re.sub('<[^>]*>', '', input_text) 
    return output_text 

def filter_data(a): 
    res = {} 
    try: 
     res['mentions'] = a['entities']['user_mentions'] 
     res['hashtags'] = a['entities']['hashtags'] 
     res['created_at'] = a['created_at'] 
     res['id'] = a['id'] 

     res['lang'] = a['lang'] 
     if 'place' in a and a['place'] is not None:  
      res['place'] = {} 
      res['place']['country_code'] = a['place']['country_code'] 
      res['place']['place_type'] = a['place']['place_type'] 
      res['place']['name'] = a['place']['name'] 
      res['place']['full_name'] = a['place']['full_name'] 

     res['source'] = get_device(a['source']) 
     res['text'] = a['text'] 
     res['timestamp_ms'] = a['timestamp_ms'] 

     res['user'] = {} 
     res['user']['created_at'] = a['user']['created_at'] 
     res['user']['description'] = a['user']['description'] 
     res['user']['followers_count'] = a['user']['followers_count'] 
     res['user']['friends_count'] = a['user']['friends_count'] 
     res['user']['screen_name'] = a['user']['screen_name'] 
     res['user']['lang'] = a['user']['lang'] 
     res['user']['name'] = a['user']['name'] 
     res['user']['location'] = a['user']['location'] 
     res['user']['statuses_count'] = a['user']['statuses_count'] 
     res['user']['verified'] = a['user']['verified'] 
     res['user']['url'] = a['user']['url'] 
    except KeyError: 
     return [] 

    return [res] 


results = tx.flatMap(filter_data) 


def setting_tweet(tweet): 

    text = tweet['text'] if tweet['text'] is not None else '' 
    descr = tweet['user']['description'] if tweet['user']['description'] is not None else '' 
    del tweet['text'] 
    del tweet['user']['description'] 

    tweet['text'] = {} 
    tweet['user']['description'] = {} 
    del tweet['mentions'] 

    #tweet 
    tweet['text']['original_text'] = text 
    tweet['text']['mentions'] = twu.find_retweet(text) 
    tweet['text']['links'] = [] 
    for j in twu.find_links(text): 
     tmp = {} 
     try: 
      tmp['host'] = twu.get_host(j) 
      tmp['link'] = j 
      tweet['text']['links'].append(tmp) 
     except ValueError: 
      pass 

    tweet['text']['companies'] = [] 
    for x in comp_regex.findall(text.lower()): 
     tmp = {} 
     tmp['id'] = comp_dict[x.lower()] 
     tmp['name'] = x 
     tmp['legalName'] = comp_dict_legal[x.lower()] 
     tweet['text']['companies'].append(tmp) 

    # descr 
    tweet['user']['description']['original_text'] = descr 
    tweet['user']['description']['mentions'] = twu.find_retweet(descr) 
    tweet['user']['description']['links'] = [] 
    for j in twu.find_links(descr): 
     tmp = {} 
     try: 
      tmp['host'] = twu.get_host(j) 
      tmp['link'] = j 
      tweet['user']['description']['links'].append(tmp) 
     except ValueError: 
      pass 

    tweet['user']['description']['companies'] = [] 
    for x in comp_regex.findall(descr.lower()): 
     tmp = {} 
     tmp['id'] = comp_dict[x.lower()] 
     tmp['name'] = x 
     tmp['legalName'] = comp_dict_legal[x.lower()] 
     tweet['user']['description']['companies'].append(tmp) 

    return tweet 


res = results.map(setting_tweet) 

res.map(lambda a: json.dumps(a)).saveAsTextFile(dest, compressionCodecClass="org.apache.hadoop.io.compress.BZip2Codec") 

अद्यतन लगभग 1 घंटे के बाद, स्मृति 72gb) पूरी तरह से पूर्ण और स्वैप (72 जीबी) भी है। प्रसारण का उपयोग करना मेरे मामले में एक समाधान नहीं है।

अद्यतन 2 अचार के साथ 3 चर लोड हो रहा है बिना, यह 144GB के बजाय, रैम 10GB तक का उपयोग कर समस्याओं के बिना समाप्त हो जाती है! (72gb रैम + 72Gb स्वैप)

+1

कोड अच्छा होगा लेकिन इसके बिना आपके प्रश्न का उत्तर देने के लिए - स्पार्क स्थानीय चर के कई प्रतियों का उपयोग करता है क्योंकि आप पाइथन श्रमिकों को असाइन किए गए कई थ्रेड (कोर) के रूप में उपयोग करते हैं। इसके लिए कुछ कामकाज हैं, लेकिन आमतौर पर काफी विस्तृत होते हैं। – zero323

+0

कोड को देखते हुए आपको ड्राइवर प्रतिलिपि के लिए +1 जोड़ना चाहिए, ड्राइवर पर मसालेदार संस्करण के लिए +1 और प्रत्येक निष्पादक JVM (अधिक या कम) के लिए +1। आप सीधे निष्पादकों से प्रसारण या डेटा लोड करके इसका थोड़ा सा सुधार कर सकते हैं। – zero323

+0

क्या प्रत्येक निष्पादक प्रक्रिया के लिए स्मृति में समान रेगेक्स उदाहरण का उपयोग करने के लिए कोई चाल नहीं है? अगर मुझे नहीं लगता कि मैं निष्पादकों की संख्या कम कर दूंगा ..... –

उत्तर

1

मेरा प्रश्न है: चिंगारी प्रत्येक निष्पादक कोर के लिए प्रत्येक चर को दोहराने करता है?

हाँ!

प्रत्येक (स्थानीय) चर के लिए प्रतियों की संख्या पाइथन श्रमिकों को आपके द्वारा निर्दिष्ट थ्रेड की संख्या के बराबर है।


आप समस्या का सवाल है, pickle का उपयोग किए बिना comp_regex, comp_dict और comp_dict_legal लोड करके देखें।

संबंधित मुद्दे