कोड:Pyspark Column.isin() एक बड़े सेट
views = sdf \
.where(sdf['PRODUCT_ID'].isin(PRODUCTS)) \
.rdd \
.groupBy(lambda x: x['SESSION_ID']) \
.toLocalIterator()
for sess_id, rows in views:
# do something
PRODUCTS
एक set
है। यह बड़ी है, लगभग 10000 आइटम।
कोड के साथ विफल:
--> 9 for sess_id, rows in views:
/usr/local/spark/python/pyspark/rdd.py in _load_from_socket(port, serializer)
--> 142 for item in serializer.load_stream(rf):
/usr/local/spark/python/pyspark/serializers.py in load_stream(self, stream)
--> 139 yield self._read_with_length(stream)
/usr/local/spark/python/pyspark/serializers.py in _read_with_length(self, stream)
--> 156 length = read_int(stream)
/usr/local/spark/python/pyspark/serializers.py in read_int(stream)
--> 543 length = stream.read(4)
/opt/conda/lib/python3.5/socket.py in readinto(self, b)
574 try:
--> 575 return self._sock.recv_into(b)
576 except timeout:
577 self._timeout_occurred = True
timeout: timed out
लेकिन जब मैं बनाने PRODUCTS
सेट छोटे सब कुछ ठीक है। मैंने स्पार्क कॉन्फ़िगरेशन में कुछ टाइमआउट मानों को बदलने की कोशिश की। यह मदद नहीं की। ऐसे दुर्घटनाओं से कैसे बचें?
अद्यतन
PRODUCTS = sdf.sort(['TIMESTAMP']).select('PRODUCT_ID').limit(10000).drop_duplicates()
views = sdf \
.join(PRODUCTS, 'PRODUCT_ID', 'inner') \
.rdd \
.groupBy(lambda x: x['SESSION_ID']) \
.toLocalIterator()
for sess_id, rows in views:
# do ...
अब PRODUCTS
एक dataframe है। और मैं join
का उपयोग करता हूं।
Py4JJavaError: An error occurred while calling o289.javaToPython.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
....
यह त्रुटि केवल एक बार दिखाई:
views = sdf \
.join(PRODUCTS, 'PRODUCT_ID', 'inner') \
.rdd \
.groupBy(lambda x: x['SESSION_ID'])
views.cache()
for sess_id, rows in views.toLocalIterator():
pass
कुछ समय के बाद एक बहुत लंबे त्रुटि मिली: एक ही त्रुटि ..
अद्यतन 2
इस समाधान की कोशिश कर रहा समझे ! अब मुझे एक ही टाइमआउट अपवाद मिलते हैं!
ISIN बेकार जटिलता बुद्धिमान है बेहतर प्रदर्शन एक में शामिल होने या यहाँ तक कि कुछ तो बाकी सही गलत तत्व प्राप्त करने की कोशिश की तरह – eliasah
आप एक cou प्रदर्शन कर सकते हैं 'toLocalIterator' के बजाय nt? मुझे यकीन नहीं है कि आप इसके साथ क्या हासिल करने की कोशिश कर रहे हैं। – eliasah
जो मैं करने की कोशिश कर रहा हूं वह 'SESSION_ID' कॉलम द्वारा' sdf' डेटा फ्रेम को समूहित करना और इन समूहों पर पुनरावृत्ति करना है। लेकिन मुझे केवल उन पंक्तियों को चुनने की आवश्यकता है जिनके लिए 'PRODUCT_ID' पूर्वनिर्धारित सेट में है। और 'toLocalIterator' के बजाय 'गिनती' सामान्य रूप से काम करता है। – Leonid