2016-02-29 7 views
20

यह सवाल नया नहीं है, हालांकि मुझे स्पार्क में आश्चर्यजनक व्यवहार मिल रहा है। मुझे डेटाफ्रेम पर पंक्ति आईडी का एक स्तंभ जोड़ने की आवश्यकता है। मैंने डेटाफ्रेम विधि monotonically_increasing_id() का उपयोग किया और यह मुझे एक अतिरिक्त कॉल यूनिक्स पंक्ति आईडी प्रदान करता है (जो कि रास्ते से लगातार नहीं हैं, लेकिन अद्वितीय हैं)।मैं स्पार्क डेटाफ्रेम पर पंक्ति आईडी के लगातार कॉलम को कैसे जोड़ूं?

मेरी समस्या यह है कि जब मैं डेटाफ्रेम को फ़िल्टर करता हूं तो परिणामी डेटाफ्रेम में पंक्ति आईडी को फिर से असाइन किया जाता है। दो डेटाफ्रेम नीचे दिखाए गए हैं।

  • पहले एक पंक्ति आईडी जोड़ा साथ प्रारंभिक DataFrame इस प्रकार है:

    df.withColumn("rowId", monotonically_increasing_id()) 
    
  • दूसरा DataFrame एक df.filter(col("P")) के माध्यम से col पी पर छानने के बाद प्राप्त होता है।

समस्या ग्राहकआईडी 169 है, जो प्रारंभिक DataFrame में 5 था के लिए rowId द्वारा सचित्र है, लेकिन छानने कि rowId (5) custmId 773 को फिर से सौंपा गया था जब ग्राहकआईडी 169 बाहर फ़िल्टर किया गया था के बाद! मुझे नहीं पता कि यह डिफ़ॉल्ट व्यवहार क्यों है।

मैं rowIds "चिपचिपा" होना चाहता हूं; अगर मैं डेटाफ्रेम से पंक्तियां हटाता हूं, तो मैं नहीं चाहता कि उनकी आईडी "पुनः उपयोग" हो, मैं चाहता हूं कि वे अपनी पंक्तियों के साथ भी चले जाएं। क्या यह करना मुमकिन है? मुझे monotonically_increasing_id विधि से इस व्यवहार का अनुरोध करने के लिए कोई झंडे नहीं दिख रहे हैं।

+---------+--------------------+-------+ 
| custId | features| P |rowId| 
+---------+--------------------+-------+ 
|806  |[50,5074,...| true| 0| 
|832  |[45,120,1...| true| 1| 
|216  |[6691,272...| true| 2| 
|926  |[120,1788...| true| 3| 
|875  |[54,120,1...| true| 4| 
|169  |[19406,21...| false| 5| 

after filtering on P: 
+---------+--------------------+-------+ 
| custId| features| P |rowId| 
+---------+--------------------+-------+ 
|  806|[50,5074,...| true| 0| 
|  832|[45,120,1...| true| 1| 
|  216|[6691,272...| true| 2| 
|  926|[120,1788...| true| 3| 
|  875|[54,120,1...| true| 4| 
|  773|[3136,317...| true| 5| 
+1

क्या आप दो उदाहरण डेटाफ्रेम बनाने के लिए अपना पूरा कोड साझा कर सकते हैं? इसके लायक होने के लिए, यह संभवतः SQL क्वेरी ऑप्टिमाइज़ेशन के कारण होता है, जिसमें "स्वतंत्र" मानचित्र चरणों को पुन: व्यवस्थित किया जा सकता है। –

+0

हैमेल, मेरे द्वारा पोस्ट किए गए वास्तव में कोई अन्य परिवर्तन या कार्रवाई नहीं है। दिखाए गए डेटा फ्रेम df.show() का परिणाम हैं। आप इस व्यवहार को बहुत आसानी से पुनर्निर्मित कर सकते हैं, डेटा फ्रेम बना सकते हैं और ऊपर के रूप में एक पंक्ति आईडी कॉलम जोड़ सकते हैं, फिर इसमें एक यादृच्छिक बूलियन कॉलम जोड़ें। फिर उस कॉलम पर फ़िल्टर करें और देखें कि आपके द्वारा वर्णित रूप से बढ़ने वाली पंक्ति आईडी को "पुन: उपयोग" के रूप में वर्णित किया गया है। – Kai

+0

@ काई मैं वास्तव में जोड़ दूंगा कि इसे पुन: उत्पन्न करने का सबसे आसान तरीका केवल एक ही विभाजन का उपयोग करना है। – zero323

उत्तर

11

स्पार्क 2,0

  • यह समस्या SPARK-14241 साथ स्पार्क 2.0 में हल किया गया है है।

  • एक और इसी तरह के मुद्दे के साथ SPARK-14393

स्पार्क 1.x

समस्या स्पार्क 2.1 में हल किया गया है आप अनुभव नहीं बल्कि सूक्ष्म है, लेकिन एक साधारण तथ्य यह है करने के लिए कम किया जा सकता monotonically_increasing_id एक है बेहद बदसूरत समारोह। यह स्पष्ट रूप से शुद्ध नहीं है और इसका मूल्य उस चीज़ पर निर्भर करता है जो आपके नियंत्रण से पूरी तरह से बाहर है।

यह किसी भी पैरामीटर को ऑप्टिमाइज़र परिप्रेक्ष्य से नहीं लेता है, इससे कोई फर्क नहीं पड़ता जब इसे बुलाया जाता है और अन्य सभी परिचालनों के बाद धक्का दिया जा सकता है। इसलिए आप जो व्यवहार देखते हैं।

यदि आप कोड को देखते हैं तो आपको पता चलेगा कि यह के साथ MonotonicallyIncreasingID अभिव्यक्ति को विस्तारित करके स्पष्ट रूप से चिह्नित किया गया है।

मुझे नहीं लगता कि कोई सुरुचिपूर्ण समाधान है लेकिन एक तरह से आप इसे संभालने का तरीका फ़िल्टर किए गए मूल्य पर कृत्रिम निर्भरता जोड़ना है।इस तरह की एक यूडीएफ के साथ उदाहरण के लिए:

from pyspark.sql.types import LongType 
from pyspark.sql.functions import udf 

bound = udf(lambda _, v: v, LongType()) 

(df 
    .withColumn("rn", monotonically_increasing_id()) 
    # Due to nondeterministic behavior it has to be a separate step 
    .withColumn("rn", bound("P", "rn")) 
    .where("P")) 

सामान्य तौर पर यह एक RDD पर zipWithIndex का उपयोग कर सूचकांकों में जोड़ने के लिए और फिर इसे वापस एक DataFrame में बदलने का क्लीनर हो सकता है।


* ऊपर दिखाए गए वर्कअराउंड नहीं रह गया है स्पार्क 2.x जहां अजगर UDFs कार्य योजना लागू करके अनुकूलन के अधीन हैं में एक वैध समाधान (और न ही आवश्यक) है।

3

मैं इसे पुन: उत्पन्न नहीं कर सका। मैं स्पार्क 2.0 का उपयोग कर रहा हूं हालांकि शायद व्यवहार बदल गया है, या मैं आपके जैसा ही नहीं कर रहा हूं।

val df = Seq(("one", 1,true),("two", 2,false),("three", 3,true),("four", 4,true)) 
.toDF("name", "value","flag") 
.withColumn("rowd", monotonically_increasing_id()) 

df.show 

val df2 = df.filter(col("flag")=== true) 

df2.show 

df: org.apache.spark.sql.DataFrame = [name: string, value: int ... 2 more fields] 
+-----+-----+-----+----+ 
| name|value| flag|rowd| 
+-----+-----+-----+----+ 
| one| 1| true| 0| 
| two| 2|false| 1| 
|three| 3| true| 2| 
| four| 4| true| 3| 
+-----+-----+-----+----+ 
df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string, value: int ... 2 more fields] 
+-----+-----+----+----+ 
| name|value|flag|rowd| 
+-----+-----+----+----+ 
| one| 1|true| 0| 
|three| 3|true| 2| 
| four| 4|true| 3| 
+-----+-----+----+----+ 
+0

मुझे उपरोक्त कोड – thebluephantom

+0

के साथ कोई समस्या नहीं मिली है ** जावा – Yugerten

+0

org.apache.spark.sql.functions पैकेज में ** monotonically_increasing_id() ** के समतुल्य क्या है जावा एपीआई https: // स्पार्क में उपलब्ध है। apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#monotonicallyIncreasingId-- – Davos

1

monotonically_increasing_id(), आप डिस्क पर dataframe लिखने का प्रयास कर सकता है, और फिर से पढ़ने के स्थानांतरण मूल्यांकन चारों ओर पाने के लिए। फिर आईडी कॉलम अब केवल एक डेटा फ़ील्ड है जिसे पाइपलाइन में किसी बिंदु पर गतिशील रूप से गणना के बजाए पढ़ा जा रहा है। हालांकि यह एक सुंदर बदसूरत समाधान है, लेकिन जब मैंने त्वरित परीक्षण किया तो यह काम करता था।

1

यह मेरे लिए काम किया। एक और पहचान कॉलम बनाया गया और इस्तेमाल किया गया विंडो फ़ंक्शन row_number

import org.apache.spark.sql.functions.{row_number} 
import org.apache.spark.sql.expressions.Window 

val df1: DataFrame = df.withColumn("Id",lit(1)) 

df1 
.select(
..., 
row_number() 
.over(Window 
.partitionBy("Id" 
.orderBy(col("...").desc)) 
) 
.alias("Row_Nbr") 
) 
संबंधित मुद्दे