मेरे पास एक पाइस्पर्क एप्लिकेशन है जिसे लगभग 5 जीबी संकुचित डेटा (स्ट्रिंग्स) को विस्तारित करना है। मैं 12 कोर (24 धागे) और 72 जीबी रैम पर एक छोटा सर्वर का उपयोग कर रहा हूं। मेरे पायस्पार्क कार्यक्रम में केवल 2 मैप ऑपरेशंस होते हैं, 3 बहुत बड़े रेगेक्स (3 जीबी पहले से संकलित) द्वारा मदद की जाती है और pickle
के साथ लोड की जाती है। स्पार्क एक ही मशीन पर कार्यकर्ता और मास्टर के साथ स्टैंडअलोन मोड में काम कर रहा है।पर्यावरण की कितनी प्रतियां चमकती हैं?
मेरा प्रश्न है: स्पार्क प्रत्येक निष्पादक कोर के लिए प्रत्येक चर को दोहराता है? क्योंकि यह उपलब्ध सभी मेमोरी का उपयोग करता है और फिर बहुत से स्वैप स्पेस का उपयोग करता है। या शायद यह रैम में सभी विभाजन लोड करता है? आरडीडी में लगभग 10 लाख स्ट्रिंग हैं जिन्हें 3 रेगेक्स द्वारा खोजा जाना है। आरडीडी लगभग 1000 विभाजन की गणना करता है। मुझे इस कार्य को पूरा करने में परेशानी है क्योंकि कुछ मिनटों के बाद मेमोरी भर जाती है और स्वैप स्पेस का उपयोग करके स्पार्क शुरू हो जाता है। मैंने देखा है कि रेगेक्स के बिना स्थिति एक जैसी है।
यह मेरा कोड है, यह चहचहाना की ट्वीट के सभी बेकार क्षेत्रों को हटा देता है और विशेष रूप से शब्दों के लिए ट्वीट के ग्रंथों और विवरण स्कैन: (
import json
import re
import twitter_util as twu
import pickle
from pyspark import SparkContext
sc = SparkContext()
prefix = '/home/lucadiliello'
source = prefix + '/data/tweets'
dest = prefix + '/data/complete_tweets'
#Regex's path
companies_names_regex = prefix + '/data/comp_names_regex'
companies_names_dict = prefix + '/data/comp_names_dict'
companies_names_dict_to_legal = prefix + '/data/comp_names_dict_to_legal'
#Loading the regex's
comp_regex = pickle.load(open(companies_names_regex))
comp_dict = pickle.load(open(companies_names_dict))
comp_dict_legal = pickle.load(open(companies_names_dict_to_legal))
#Loading the RDD from textfile
tx = sc.textFile(source).map(lambda a: json.loads(a))
def get_device(input_text):
output_text = re.sub('<[^>]*>', '', input_text)
return output_text
def filter_data(a):
res = {}
try:
res['mentions'] = a['entities']['user_mentions']
res['hashtags'] = a['entities']['hashtags']
res['created_at'] = a['created_at']
res['id'] = a['id']
res['lang'] = a['lang']
if 'place' in a and a['place'] is not None:
res['place'] = {}
res['place']['country_code'] = a['place']['country_code']
res['place']['place_type'] = a['place']['place_type']
res['place']['name'] = a['place']['name']
res['place']['full_name'] = a['place']['full_name']
res['source'] = get_device(a['source'])
res['text'] = a['text']
res['timestamp_ms'] = a['timestamp_ms']
res['user'] = {}
res['user']['created_at'] = a['user']['created_at']
res['user']['description'] = a['user']['description']
res['user']['followers_count'] = a['user']['followers_count']
res['user']['friends_count'] = a['user']['friends_count']
res['user']['screen_name'] = a['user']['screen_name']
res['user']['lang'] = a['user']['lang']
res['user']['name'] = a['user']['name']
res['user']['location'] = a['user']['location']
res['user']['statuses_count'] = a['user']['statuses_count']
res['user']['verified'] = a['user']['verified']
res['user']['url'] = a['user']['url']
except KeyError:
return []
return [res]
results = tx.flatMap(filter_data)
def setting_tweet(tweet):
text = tweet['text'] if tweet['text'] is not None else ''
descr = tweet['user']['description'] if tweet['user']['description'] is not None else ''
del tweet['text']
del tweet['user']['description']
tweet['text'] = {}
tweet['user']['description'] = {}
del tweet['mentions']
#tweet
tweet['text']['original_text'] = text
tweet['text']['mentions'] = twu.find_retweet(text)
tweet['text']['links'] = []
for j in twu.find_links(text):
tmp = {}
try:
tmp['host'] = twu.get_host(j)
tmp['link'] = j
tweet['text']['links'].append(tmp)
except ValueError:
pass
tweet['text']['companies'] = []
for x in comp_regex.findall(text.lower()):
tmp = {}
tmp['id'] = comp_dict[x.lower()]
tmp['name'] = x
tmp['legalName'] = comp_dict_legal[x.lower()]
tweet['text']['companies'].append(tmp)
# descr
tweet['user']['description']['original_text'] = descr
tweet['user']['description']['mentions'] = twu.find_retweet(descr)
tweet['user']['description']['links'] = []
for j in twu.find_links(descr):
tmp = {}
try:
tmp['host'] = twu.get_host(j)
tmp['link'] = j
tweet['user']['description']['links'].append(tmp)
except ValueError:
pass
tweet['user']['description']['companies'] = []
for x in comp_regex.findall(descr.lower()):
tmp = {}
tmp['id'] = comp_dict[x.lower()]
tmp['name'] = x
tmp['legalName'] = comp_dict_legal[x.lower()]
tweet['user']['description']['companies'].append(tmp)
return tweet
res = results.map(setting_tweet)
res.map(lambda a: json.dumps(a)).saveAsTextFile(dest, compressionCodecClass="org.apache.hadoop.io.compress.BZip2Codec")
अद्यतन लगभग 1 घंटे के बाद, स्मृति 72gb) पूरी तरह से पूर्ण और स्वैप (72 जीबी) भी है। प्रसारण का उपयोग करना मेरे मामले में एक समाधान नहीं है।
अद्यतन 2 अचार के साथ 3 चर लोड हो रहा है बिना, यह 144GB के बजाय, रैम 10GB तक का उपयोग कर समस्याओं के बिना समाप्त हो जाती है! (72gb रैम + 72Gb स्वैप)
कोड अच्छा होगा लेकिन इसके बिना आपके प्रश्न का उत्तर देने के लिए - स्पार्क स्थानीय चर के कई प्रतियों का उपयोग करता है क्योंकि आप पाइथन श्रमिकों को असाइन किए गए कई थ्रेड (कोर) के रूप में उपयोग करते हैं। इसके लिए कुछ कामकाज हैं, लेकिन आमतौर पर काफी विस्तृत होते हैं। – zero323
कोड को देखते हुए आपको ड्राइवर प्रतिलिपि के लिए +1 जोड़ना चाहिए, ड्राइवर पर मसालेदार संस्करण के लिए +1 और प्रत्येक निष्पादक JVM (अधिक या कम) के लिए +1। आप सीधे निष्पादकों से प्रसारण या डेटा लोड करके इसका थोड़ा सा सुधार कर सकते हैं। – zero323
क्या प्रत्येक निष्पादक प्रक्रिया के लिए स्मृति में समान रेगेक्स उदाहरण का उपयोग करने के लिए कोई चाल नहीं है? अगर मुझे नहीं लगता कि मैं निष्पादकों की संख्या कम कर दूंगा ..... –