2016-03-09 9 views
16

जब मैं चिंगारी एसक्यूएल में एक json फ़ाइल, मैं कैसे बता सकते हैं किसी दिए गए स्तंभ उदाहरण json स्कीमा के लिए .selectमैं कैसे पता नहीं लगा पाते, तो एक चिंगारी dataframe एक स्तंभ है

उदाहरण कॉल करने से पहले मौजूद रहने पर से एक DataFrame बनाने

potential_columns = Seq("b", "c", "d") 
df = sqlContext.read.json(filename) 
potential_columns.map(column => if(df.hasColumn(column)) df.select(s"a.$column")) 

लेकिन मैं hasColumn के लिए एक अच्छा समारोह नहीं मिल सकता है:

{ 
    "a": { 
    "b": 1, 
    "c": 2 
    } 
} 

यह मुझे क्या करना चाहते हैं।

scala> df.select("a.*").columns 
res17: Array[String] = Array(b, c) 

उत्तर

7

वास्तव में आप भी आदेश स्तंभ का उपयोग करने में चयन कॉल करने के लिए, तो आप सिर्फ dataframe पर कॉल कर सकते हैं की जरूरत नहीं है: निकटतम मैं मिल गया है परीक्षण करने के लिए करता है, तो स्तंभ यह कुछ हद तक अजीब सरणी में है खुद

// define test data 
case class Test(a: Int, b: Int) 
val testList = List(Test(1,2), Test(3,4)) 
val testDF = sqlContext.createDataFrame(testList) 

// define the hasColumn function 
def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) = df.columns.contains(colName) 

// then you can just use it on the DF with a given column name 
hasColumn(testDF, "a") // <-- true 
hasColumn(testDF, "c") // <-- false 

वैकल्पिक रूप से आप एक अंतर्निहित वर्ग दलाल मेरे पुस्तकालय पद्धति का उपयोग कर ताकि hasColumn विधि सीधे अपने dataframes पर उपलब्ध है

implicit class DataFrameImprovements(df: org.apache.spark.sql.DataFrame) { 
    def hasColumn(colName: String) = df.columns.contains(colName) 
} 

तो फिर तुम c परिभाषित कर सकते हैं एक प्रयोग के रूप में:

testDF.hasColumn("a") // <-- true 
testDF.hasColumn("c") // <-- false 
+4

यह साथ काम नहीं करता नेस्टेड कॉलम। जेसन '{" ए ": {" बी ": 1," सी ": 0}}' – ben

2

इस के लिए आपका दूसरा विकल्प कुछ सरणी हेरफेर करने के लिए df.columns और अपने potential_columns पर (इस मामले एक intersect में) होगा।

// Loading some data (so you can just copy & paste right into spark-shell) 
case class Document(a: String, b: String, c: String) 
val df = sc.parallelize(Seq(Document("a", "b", "c")), 2).toDF 

// The columns we want to extract 
val potential_columns = Seq("b", "c", "d") 

// Get the intersect of the potential columns and the actual columns, 
// we turn the array of strings into column objects 
// Finally turn the result into a vararg (: _*) 
df.select(potential_columns.intersect(df.columns).map(df(_)): _*).show 

अलास यह आपके भीतर आंतरिक ऑब्जेक्ट परिदृश्य के लिए काम नहीं करेगा। इसके लिए आपको स्कीमा को देखना होगा।

मैं अपने potential_columns को पूरी तरह से योग्य स्तंभ नाम

val potential_columns = Seq("a.b", "a.c", "a.d") 

// Our object model 
case class Document(a: String, b: String, c: String) 
case class Document2(a: Document, b: String, c: String) 

// And some data... 
val df = sc.parallelize(Seq(Document2(Document("a", "b", "c"), "c2")), 2).toDF 

// We go through each of the fields in the schema. 
// For StructTypes we return an array of parentName.fieldName 
// For everything else we return an array containing just the field name 
// We then flatten the complete list of field names 
// Then we intersect that with our potential_columns leaving us just a list of column we want 
// we turn the array of strings into column objects 
// Finally turn the result into a vararg (: _*) 
df.select(df.schema.map(a => a.dataType match { case s : org.apache.spark.sql.types.StructType => s.fieldNames.map(x => a.name + "." + x) case _ => Array(a.name) }).flatMap(x => x).intersect(potential_columns).map(df(_)) : _*).show 

यह केवल एक ही स्तर गहरी हो जाता है बदलने के लिए, तो यह सामान्य आप अधिक काम करने के लिए होता है बनाने के लिए जा रहा हूँ।

38

बस मान लें कि यह अस्तित्व में है और इसे Try के साथ विफल होने दें। सादा और सरल और समर्थन करता है, एक मनमाना नेस्टिंग:

import scala.util.Try 
import org.apache.spark.sql.DataFrame 

def hasColumn(df: DataFrame, path: String) = Try(df(path)).isSuccess 

val df = sqlContext.read.json(sc.parallelize(
    """{"foo": [{"bar": {"foobar": 3}}]}""" :: Nil)) 

hasColumn(df, "foobar") 
// Boolean = false 

hasColumn(df, "foo") 
// Boolean = true 

hasColumn(df, "foo.bar") 
// Boolean = true 

hasColumn(df, "foo.bar.foobar") 
// Boolean = true 

hasColumn(df, "foo.bar.foobaz") 
// Boolean = false 

या और भी आसान:

val columns = Seq(
    "foobar", "foo", "foo.bar", "foo.bar.foobar", "foo.bar.foobaz") 

columns.flatMap(c => Try(df(c)).toOption) 
// Seq[org.apache.spark.sql.Column] = List(
// foo, foo.bar AS bar#12, foo.bar.foobar AS foobar#13) 

अजगर बराबर: के रूप में यह Try अंदर अभिव्यक्ति का मूल्यांकन करेंगे

from pyspark.sql.utils import AnalysisException 
from pyspark.sql import Row 


def has_column(df, col): 
    try: 
     df[col] 
     return True 
    except AnalysisException: 
     return False 

df = sc.parallelize([Row(foo=[Row(bar=Row(foobar=3))])]).toDF() 

has_column(df, "foobar") 
## False 

has_column(df, "foo") 
## True 

has_column(df, "foo.bar") 
## True 

has_column(df, "foo.bar.foobar") 
## True 

has_column(df, "foo.bar.foobaz") 
## False 
+1

यह संरचित क्षेत्र के साथ भी काम करता है। 'युक्त' फ़ंक्शन का उपयोग करने वाले समाधान नहीं हैं! +1 –

+1

धन्यवाद, मैं यह जवाब स्वीकार कर लिया होगा! – sparker

1

Try इष्टतम नहीं है निर्णय लेने से पहले।

बड़े डेटा सेट के लिए, Scala में नीचे का उपयोग करें:

df.schema.fieldNames.contains("column_name") 
+0

नेस्टेड डेटा के साथ काम नहीं करता है। – user8371915

2

जो मैं सामान्य रूप से उपयोग

df.columns.contains("column-name-to-check") 

यह रिटर्न है एक और विकल्प एक बूलियन

+2

नेस्टेड कॉलम के साथ काम नहीं करता है। – Sindhu

संबंधित मुद्दे