2015-10-13 19 views
18

के साथ प्राथमिक कुंजी अपाचे स्पार्क और पोस्टग्रेएसक्यूएल के साथ एक जेडीबीसी कनेक्शन है और मैं अपने डेटाबेस में कुछ डेटा डालना चाहता हूं। जब मैं append मोड का उपयोग करता हूं तो मुझे प्रत्येक DataFrame.Row के लिए id निर्दिष्ट करने की आवश्यकता है। स्पार्क के लिए प्राथमिक कुंजी बनाने का कोई तरीका है?अपाचे स्पार्क

+0

क्या आपके पास कोई विशेष आवश्यकता है? डेटा प्रकार, लगातार मूल्य, कुछ और? – zero323

+0

नहीं, बस पुराने अच्छे अद्वितीय पूर्णांक – Nhor

उत्तर

30

स्काला:

तो आप सभी की जरूरत अद्वितीय संख्या है आप zipWithUniqueId का उपयोग करें और DataFrame पुन: कर सकते हैं। सबसे पहले कुछ आयात और डमी डेटा:

import sqlContext.implicits._ 
import org.apache.spark.sql.Row 
import org.apache.spark.sql.types.{StructType, StructField, LongType} 

val df = sc.parallelize(Seq(
    ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar") 

आगे उपयोग के लिए निकालें स्कीमा:

val schema = df.schema 

आईडी फ़ील्ड जोड़ें:

val dfWithPK = sqlContext.createDataFrame(
    rows, StructType(StructField("id", LongType, false) +: schema.fields)) 

ही:

val rows = df.rdd.zipWithUniqueId.map{ 
    case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)} 

DataFrame बनाएं में बात अजगर:

from pyspark.sql import Row 
from pyspark.sql.types import StructField, StructType, LongType 

row = Row("foo", "bar") 
row_with_index = Row(*["id"] + df.columns) 

df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF() 

def make_row(columns): 
    def _make_row(row, uid): 
     row_dict = row.asDict() 
     return row_with_index(*[uid] + [row_dict.get(c) for c in columns]) 
    return _make_row 

f = make_row(df.columns) 

df_with_pk = (df.rdd 
    .zipWithUniqueId() 
    .map(lambda x: f(*x)) 
    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields))) 

आप लगातार संख्या चाहें, तो अपने zipWithIndex साथ zipWithUniqueId जगह ले सकता है, लेकिन यह थोड़ा और अधिक महंगा है।

DataFrame एपीआई साथ सीधे:

(यूनिवर्सल स्काला, अजगर, जावा, काफी एक ही वाक्य रचना के साथ आर)

पहले मैं monotonicallyIncreasingId समारोह नहीं छूटा है जो के रूप में ठीक काम करना चाहिए लंबे समय तक लगातार के रूप में आप की आवश्यकता नहीं है संख्या:

import org.apache.spark.sql.functions.monotonicallyIncreasingId 

df.withColumn("id", monotonicallyIncreasingId).show() 
// +---+----+-----------+ 
// |foo| bar|   id| 
// +---+----+-----------+ 
// | a|-1.0|17179869184| 
// | b|-2.0|42949672960| 
// | c|-3.0|60129542144| 
// +---+----+-----------+ 

जबकि उपयोगी monotonicallyIncreasingId गैर निर्धारक है। न केवल निष्पादन निष्पादन से निष्पादन से भिन्न हो सकता है लेकिन बाद के संचालन में फ़िल्टर होने पर अतिरिक्त चालों का उपयोग पंक्तियों की पहचान के लिए नहीं किया जा सकता है।

नोट:

from pyspark.sql.window import Window 
from pyspark.sql.functions import rowNumber 

w = Window().orderBy() 
df.withColumn("id", rowNumber().over(w)).show() 

दुर्भाग्य:

यह भी rowNumber खिड़की समारोह का उपयोग करना संभव है

चेतावनी विंडो: नहीं विभाजन खिड़की ऑपरेशन के लिए परिभाषित किया जाता है! सभी डेटा को एक विभाजन में स्थानांतरित करना, इससे गंभीर प्रदर्शन गिरावट हो सकती है।

तब तक जब तक आपके पास अपने डेटा को विभाजित करने का प्राकृतिक तरीका न हो और यह सुनिश्चित न हो कि विशिष्टता इस समय विशेष रूप से उपयोगी नहीं है।

+0

क्या यह केवल आर के साथ काम करेगा? मुझे पता है कि आपने उपरोक्त स्कैला का उपयोग किया है, लेकिन मैं इस 'zipWithUniqueId' के बारे में सब कुछ पा सकता हूं केवल स्पार्कआर डॉक्स – Nhor

+0

में यह वास्तव में स्कैला है। क्या आपको पाइथन समाधान की आवश्यकता है? सादा एसक्यूएल? – zero323

+0

नहीं, मैं आपका कोड समझ सकता हूं, मैं सिर्फ यह पूछ रहा था कि 'zipWithUniqueId' के बारे में pyspark दस्तावेज़ों में कुछ भी है या नहीं, लेकिन ऐसा लगता है कि मैं सिर्फ आलसी था, क्योंकि आखिर में मैंने इसे पाया, आपके समाधान के लिए बहुत बहुत धन्यवाद! – Nhor

7
from pyspark.sql.functions import monotonically_increasing_id 

df.withColumn("id", monotonically_increasing_id()).show() 

ध्यान दें कि df.withColumn के 2 तर्क monotonically_increasing_id है() monotonically_increasing_id नहीं।

3

मुझे निम्न समाधान को ऐसे मामले के लिए अपेक्षाकृत सरल पाया गया है जहां zipWithIndex() वांछित व्यवहार है, यानी उन निरंतर पूर्णांकों के लिए।

इस मामले में, हम पाइसपार्क का उपयोग कर रहे हैं और मूल पंक्ति वस्तु को एक नए शब्दकोश में मैप करने के लिए शब्दकोश समझ पर निर्भर करते हैं जो अद्वितीय इंडेक्स समेत एक नई स्कीमा फिट बैठता है।

# read the initial dataframe without index 
dfNoIndex = sqlContext.read.parquet(dataframePath) 
# Need to zip together with a unique integer 

# First create a new schema with uuid field appended 
newSchema = StructType([StructField("uuid", IntegerType(), False)] 
         + dfNoIndex.schema.fields) 
# zip with the index, map it to a dictionary which includes new field 
df = dfNoIndex.rdd.zipWithIndex()\ 
         .map(lambda (row, id): {k:v 
               for k, v 
               in row.asDict().items() + [("uuid", id)]})\ 
         .toDF(newSchema) 
संबंधित मुद्दे