15

पर पाइस्पार्क का उपयोग कर रहा है, स्पार्क-सीएसवी के साथ डेटाफ्रेम में एक बड़ी सीएसवी फ़ाइल लोड कर रहा है, और प्री-प्रोसेसिंग चरण के रूप में मुझे विभिन्न प्रकार के ऑपरेशन लागू करने की आवश्यकता है कॉलम में से किसी एक में उपलब्ध डेटा (जिसमें एक जेसन स्ट्रिंग है)। इससे एक्स मान वापस आ जाएंगे, जिनमें से प्रत्येक को अपने अलग कॉलम में संग्रहीत करने की आवश्यकता है।अपाचे स्पार्क - यूडीएफ के परिणाम को कई डेटाफ्रेम कॉलम

वह कार्यक्षमता एक यूडीएफ में लागू की जाएगी। हालांकि, मुझे यकीन नहीं है कि उस यूडीएफ से मूल्यों की सूची कैसे वापस लाएं और इन्हें व्यक्तिगत कॉलम में खिलाएं।

(...) 
from pyspark.sql.functions import udf 
def udf_test(n): 
    return [n/2, n%2] 

test_udf=udf(udf_test) 


df.select('amount','trans_date').withColumn("test", test_udf("amount")).show(4) 

कि निम्नलिखित का उत्पादन:

+------+----------+--------------------+ 
|amount|trans_date|    test| 
+------+----------+--------------------+ 
| 28.0|2016-02-07|   [14.0, 0.0]| 
| 31.01|2016-02-07|[15.5050001144409...| 
| 13.41|2016-02-04|[6.70499992370605...| 
| 307.7|2015-02-17|[153.850006103515...| 
| 22.09|2016-02-05|[11.0450000762939...| 
+------+----------+--------------------+ 
only showing top 5 rows 

सबसे अच्छा तरीका क्या स्टोर करने के लिए दो (इस उदाहरण में) अलग-अलग कॉलम पर यूडीएफ द्वारा दिया जा रहा है मान जाएगा नीचे एक सरल उदाहरण है? अभी वे तारों के रूप में आपके द्वारा लिखा गया जा रहा है:

df.select('amount','trans_date').withColumn("test", test_udf("amount")).printSchema() 

root 
|-- amount: float (nullable = true) 
|-- trans_date: string (nullable = true) 
|-- test: string (nullable = true) 

उत्तर

25

यह एक यूडीएफ कॉल से कई शीर्ष स्तर कॉलम बनाने के लिए संभव नहीं है, लेकिन आप एक नया struct बना सकते हैं। यह निर्दिष्ट returnType के साथ एक यूडीएफ की आवश्यकता है:

from pyspark.sql.functions import udf 
from pyspark.sql.types import * 

schema = StructType([ 
    StructField("foo", FloatType(), False), 
    StructField("bar", FloatType(), False) 
]) 

def udf_test(n): 
    return (n/2, n % 2) if n and n != 0.0 else (float('nan'), float('nan')) 

test_udf = udf(udf_test, schema) 
df = sc.parallelize([(1, 2.0), (2, 3.0)]).toDF(["x", "y"]) 

foobars = df.select(test_udf("y").alias("foobar")) 
foobars.printSchema() 
## root 
## |-- foobar: struct (nullable = true) 
## | |-- foo: float (nullable = false) 
## | |-- bar: float (nullable = false) 

आप आगे सरल select साथ स्कीमा समतल:

foobars.select("foobar.foo", "foobar.bar").show() 
## +---+---+ 
## |foo|bar| 
## +---+---+ 
## |1.0|0.0| 
## |1.5|1.0| 
## +---+---+ 

भी देखें Derive multiple columns from a single column in a Spark DataFrame

+0

बहुत खूब! यह मेरी जरूरत के लिए बहुत अच्छी तरह से काम करता है। मैं वहां से सबसे अधिक था, लेकिन स्ट्रक्टाइप स्कीमा को गलत तरीके से udf को खिला रहा था, जो मेरे नए कॉलम को स्ट्रिंगटाइप के रूप में समाप्त करने का कारण बन रहा था। बहुत बहुत धन्यवाद! –

+0

धन्यवाद !! यह ठीक वही है जिसकी मुझे तलाश थी। :) – dksahuji

संबंधित मुद्दे