के साथ प्राथमिक कुंजी अपाचे स्पार्क और पोस्टग्रेएसक्यूएल के साथ एक जेडीबीसी कनेक्शन है और मैं अपने डेटाबेस में कुछ डेटा डालना चाहता हूं। जब मैं append
मोड का उपयोग करता हूं तो मुझे प्रत्येक DataFrame.Row
के लिए id
निर्दिष्ट करने की आवश्यकता है। स्पार्क के लिए प्राथमिक कुंजी बनाने का कोई तरीका है?अपाचे स्पार्क
उत्तर
स्काला:
तो आप सभी की जरूरत अद्वितीय संख्या है आप zipWithUniqueId
का उपयोग करें और DataFrame पुन: कर सकते हैं। सबसे पहले कुछ आयात और डमी डेटा:
import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}
val df = sc.parallelize(Seq(
("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")
आगे उपयोग के लिए निकालें स्कीमा:
val schema = df.schema
आईडी फ़ील्ड जोड़ें:
val dfWithPK = sqlContext.createDataFrame(
rows, StructType(StructField("id", LongType, false) +: schema.fields))
ही:
val rows = df.rdd.zipWithUniqueId.map{
case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
DataFrame बनाएं में बात अजगर:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType
row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)
df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()
def make_row(columns):
def _make_row(row, uid):
row_dict = row.asDict()
return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
return _make_row
f = make_row(df.columns)
df_with_pk = (df.rdd
.zipWithUniqueId()
.map(lambda x: f(*x))
.toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
आप लगातार संख्या चाहें, तो अपने zipWithIndex
साथ zipWithUniqueId
जगह ले सकता है, लेकिन यह थोड़ा और अधिक महंगा है।
DataFrame
एपीआई साथ सीधे:
(यूनिवर्सल स्काला, अजगर, जावा, काफी एक ही वाक्य रचना के साथ आर)
पहले मैं monotonicallyIncreasingId
समारोह नहीं छूटा है जो के रूप में ठीक काम करना चाहिए लंबे समय तक लगातार के रूप में आप की आवश्यकता नहीं है संख्या:
import org.apache.spark.sql.functions.monotonicallyIncreasingId
df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar| id|
// +---+----+-----------+
// | a|-1.0|17179869184|
// | b|-2.0|42949672960|
// | c|-3.0|60129542144|
// +---+----+-----------+
जबकि उपयोगी monotonicallyIncreasingId
गैर निर्धारक है। न केवल निष्पादन निष्पादन से निष्पादन से भिन्न हो सकता है लेकिन बाद के संचालन में फ़िल्टर होने पर अतिरिक्त चालों का उपयोग पंक्तियों की पहचान के लिए नहीं किया जा सकता है।
नोट:
from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber
w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()
दुर्भाग्य:
यह भी rowNumber
खिड़की समारोह का उपयोग करना संभव है
चेतावनी विंडो: नहीं विभाजन खिड़की ऑपरेशन के लिए परिभाषित किया जाता है! सभी डेटा को एक विभाजन में स्थानांतरित करना, इससे गंभीर प्रदर्शन गिरावट हो सकती है।
तब तक जब तक आपके पास अपने डेटा को विभाजित करने का प्राकृतिक तरीका न हो और यह सुनिश्चित न हो कि विशिष्टता इस समय विशेष रूप से उपयोगी नहीं है।
क्या यह केवल आर के साथ काम करेगा? मुझे पता है कि आपने उपरोक्त स्कैला का उपयोग किया है, लेकिन मैं इस 'zipWithUniqueId' के बारे में सब कुछ पा सकता हूं केवल स्पार्कआर डॉक्स – Nhor
में यह वास्तव में स्कैला है। क्या आपको पाइथन समाधान की आवश्यकता है? सादा एसक्यूएल? – zero323
नहीं, मैं आपका कोड समझ सकता हूं, मैं सिर्फ यह पूछ रहा था कि 'zipWithUniqueId' के बारे में pyspark दस्तावेज़ों में कुछ भी है या नहीं, लेकिन ऐसा लगता है कि मैं सिर्फ आलसी था, क्योंकि आखिर में मैंने इसे पाया, आपके समाधान के लिए बहुत बहुत धन्यवाद! – Nhor
from pyspark.sql.functions import monotonically_increasing_id
df.withColumn("id", monotonically_increasing_id()).show()
ध्यान दें कि df.withColumn के 2 तर्क monotonically_increasing_id है() monotonically_increasing_id नहीं।
मुझे निम्न समाधान को ऐसे मामले के लिए अपेक्षाकृत सरल पाया गया है जहां zipWithIndex() वांछित व्यवहार है, यानी उन निरंतर पूर्णांकों के लिए।
इस मामले में, हम पाइसपार्क का उपयोग कर रहे हैं और मूल पंक्ति वस्तु को एक नए शब्दकोश में मैप करने के लिए शब्दकोश समझ पर निर्भर करते हैं जो अद्वितीय इंडेक्स समेत एक नई स्कीमा फिट बैठता है।
# read the initial dataframe without index
dfNoIndex = sqlContext.read.parquet(dataframePath)
# Need to zip together with a unique integer
# First create a new schema with uuid field appended
newSchema = StructType([StructField("uuid", IntegerType(), False)]
+ dfNoIndex.schema.fields)
# zip with the index, map it to a dictionary which includes new field
df = dfNoIndex.rdd.zipWithIndex()\
.map(lambda (row, id): {k:v
for k, v
in row.asDict().items() + [("uuid", id)]})\
.toDF(newSchema)
- 1. अपाचे स्पार्क बनाम अपाचे स्पार्क 2
- 2. अपाचे स्पार्क
- 3. अपाचे स्पार्क
- 4. अपाचे स्पार्क
- 5. अपाचे स्पार्क
- 6. अपाचे स्पार्क
- 7. अपाचे स्पार्क
- 8. अपाचे स्पार्क
- 9. अपाचे स्पार्क
- 10. अपाचे स्पार्क
- 11. अपाचे स्पार्क
- 12. अपाचे स्पार्क
- 13. अपाचे स्पार्क बनाम अपाचे तूफान
- 14. अपाचे स्पार्क: java.lang.NoSuchMethodError .rddToPairRDDFunctions
- 15. अपाचे स्पार्क: पाइथन 3
- 16. अपाचे स्पार्क वर्कर
- 17. अपाचे स्पार्क स्ट्रीमिंग
- 18. अपाचे स्पार्क java.lang.ClassNotFoundException
- 19. अपाचे स्पार्क एसक्यूएल
- 20. अपाचे स्पार्क आरडीडी स्प्लिट "|"
- 21. अपाचे स्पार्क जावा
- 22. अपाचे स्पार्क एएलएस सिफारिश
- 23. अपाचे स्पार्क आरडीडी
- 24. अपाचे स्पार्क एचडीएफएस
- 25. अपाचे स्पार्क स्ट्रीमिंग
- 26. अपाचे स्पार्क एमएलआईबीबी: पीएमएमएल
- 27. अपाचे स्पार्क बनाम अक्का
- 28. अपाचे स्पार्क स्ट्रीमिंग
- 29. अपाचे स्पार्क नेटवर्क पोर्ट्स कॉन्फ़िगरेशन
- 30. अपाचे स्पार्क ग्राफ़एक्स कनेक्टेड घटक
क्या आपके पास कोई विशेष आवश्यकता है? डेटा प्रकार, लगातार मूल्य, कुछ और? – zero323
नहीं, बस पुराने अच्छे अद्वितीय पूर्णांक – Nhor