2016-08-17 19 views
5

कोड:Pyspark Column.isin() एक बड़े सेट

views = sdf \ 
    .where(sdf['PRODUCT_ID'].isin(PRODUCTS)) \ 
    .rdd \ 
    .groupBy(lambda x: x['SESSION_ID']) \ 
    .toLocalIterator() 

for sess_id, rows in views: 
    # do something 

PRODUCTS एक set है। यह बड़ी है, लगभग 10000 आइटम।

कोड के साथ विफल:

--> 9 for sess_id, rows in views: 

/usr/local/spark/python/pyspark/rdd.py in _load_from_socket(port, serializer) 
--> 142   for item in serializer.load_stream(rf): 

/usr/local/spark/python/pyspark/serializers.py in load_stream(self, stream) 
--> 139     yield self._read_with_length(stream) 

/usr/local/spark/python/pyspark/serializers.py in _read_with_length(self, stream) 
--> 156   length = read_int(stream) 

/usr/local/spark/python/pyspark/serializers.py in read_int(stream) 
--> 543  length = stream.read(4) 

/opt/conda/lib/python3.5/socket.py in readinto(self, b) 
    574    try: 
--> 575     return self._sock.recv_into(b) 
    576    except timeout: 
    577     self._timeout_occurred = True 

timeout: timed out 

लेकिन जब मैं बनाने PRODUCTS सेट छोटे सब कुछ ठीक है। मैंने स्पार्क कॉन्फ़िगरेशन में कुछ टाइमआउट मानों को बदलने की कोशिश की। यह मदद नहीं की। ऐसे दुर्घटनाओं से कैसे बचें?

अद्यतन

PRODUCTS = sdf.sort(['TIMESTAMP']).select('PRODUCT_ID').limit(10000).drop_duplicates() 

views = sdf \ 
    .join(PRODUCTS, 'PRODUCT_ID', 'inner') \ 
    .rdd \ 
    .groupBy(lambda x: x['SESSION_ID']) \ 
    .toLocalIterator() 

for sess_id, rows in views: 
    # do ... 

अब PRODUCTS एक dataframe है। और मैं join का उपयोग करता हूं।

Py4JJavaError: An error occurred while calling o289.javaToPython. 
: org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194) 
.... 

यह त्रुटि केवल एक बार दिखाई:

views = sdf \ 
    .join(PRODUCTS, 'PRODUCT_ID', 'inner') \ 
    .rdd \ 
    .groupBy(lambda x: x['SESSION_ID']) 
views.cache() 

for sess_id, rows in views.toLocalIterator(): 
    pass 

कुछ समय के बाद एक बहुत लंबे त्रुटि मिली: एक ही त्रुटि ..

अद्यतन 2

इस समाधान की कोशिश कर रहा समझे ! अब मुझे एक ही टाइमआउट अपवाद मिलते हैं!

+1

ISIN बेकार जटिलता बुद्धिमान है बेहतर प्रदर्शन एक में शामिल होने या यहाँ तक कि कुछ तो बाकी सही गलत तत्व प्राप्त करने की कोशिश की तरह – eliasah

+0

आप एक cou प्रदर्शन कर सकते हैं 'toLocalIterator' के बजाय nt? मुझे यकीन नहीं है कि आप इसके साथ क्या हासिल करने की कोशिश कर रहे हैं। – eliasah

+0

जो मैं करने की कोशिश कर रहा हूं वह 'SESSION_ID' कॉलम द्वारा' sdf' डेटा फ्रेम को समूहित करना और इन समूहों पर पुनरावृत्ति करना है। लेकिन मुझे केवल उन पंक्तियों को चुनने की आवश्यकता है जिनके लिए 'PRODUCT_ID' पूर्वनिर्धारित सेट में है। और 'toLocalIterator' के बजाय 'गिनती' सामान्य रूप से काम करता है। – Leonid

उत्तर

0

जैसा कि @eliasah ने अपनी टिप्पणी में कहा था। आपको अपने PRODUCTS तालिका में जो नहीं है उसे बाहर करने के लिए आपको डेटाफ्रेम दोनों में शामिल होने का प्रयास करना चाहिए।

views = sdf \ 
    .join(PRODUCTS) \ 
    .where(sdf['PRODUCT_ID']) \ 
    .rdd \ 
    .groupBy(lambda x: x['SESSION_ID']) \ 
    .toLocalIterator() 
+0

एसओ में आपका स्वागत है! लेकिन कृपया अपने कोड में कुछ स्पष्टीकरण लिखने और इसे स्वरूपित करने के कुछ प्रयास करें। – eliasah

1

मेरा मानना ​​है कि इस pyspark 2.0.2 में toLocalIterator() के कार्यान्वयन में मूल रूप से एक बग के कारण है। आप यहां और पढ़ सकते हैं: [SPARK-18281][SQL][PySpark] Remove timeout for reading data through socket for local iterator

ऐसा लगता है कि यह फिक्स 2.0.2 के बाद और 2.1.x रिलीज़ के बाद अगले अपडेट में उपलब्ध होगा। आप इसे अपने आप को अस्थायी रूप से ठीक करने के लिए चाहते हैं, आप ऊपर मुद्दे से परिवर्तनों को लागू कर सकते हैं:

:

वास्तविक चिंगारी क्लस्टर पर rdd.py की लाइन 138 के आसपास इस बदलें (, यह आप rdd.pypyspark.zip अंदर अद्यतन करने की जरूरत लगती है

try: 
    rf = sock.makefile("rb", 65536) 
    for item in serializer.load_stream(rf): 
     yield item 
finally: 
    sock.close() 
इस के साथ

:।।

एक बड़े डेटासेट के लिए
sock.settimeout(None) # << this is they key line that disables timeout after the initial connection 
return serializer.load_stream(sock.makefile("rb", 65536)) 
+1

ऐसा लगता है कि अद्यतन जारी किया गया है। मुझे एक ही समस्या थी और 2.1.0 को अपडेट करने के बाद यह चला गया। – amustafa