इस के लिए आपका दूसरा विकल्प कुछ सरणी हेरफेर करने के लिए df.columns
और अपने potential_columns
पर (इस मामले एक intersect
में) होगा।
// Loading some data (so you can just copy & paste right into spark-shell)
case class Document(a: String, b: String, c: String)
val df = sc.parallelize(Seq(Document("a", "b", "c")), 2).toDF
// The columns we want to extract
val potential_columns = Seq("b", "c", "d")
// Get the intersect of the potential columns and the actual columns,
// we turn the array of strings into column objects
// Finally turn the result into a vararg (: _*)
df.select(potential_columns.intersect(df.columns).map(df(_)): _*).show
अलास यह आपके भीतर आंतरिक ऑब्जेक्ट परिदृश्य के लिए काम नहीं करेगा। इसके लिए आपको स्कीमा को देखना होगा।
मैं अपने potential_columns
को पूरी तरह से योग्य स्तंभ नाम
val potential_columns = Seq("a.b", "a.c", "a.d")
// Our object model
case class Document(a: String, b: String, c: String)
case class Document2(a: Document, b: String, c: String)
// And some data...
val df = sc.parallelize(Seq(Document2(Document("a", "b", "c"), "c2")), 2).toDF
// We go through each of the fields in the schema.
// For StructTypes we return an array of parentName.fieldName
// For everything else we return an array containing just the field name
// We then flatten the complete list of field names
// Then we intersect that with our potential_columns leaving us just a list of column we want
// we turn the array of strings into column objects
// Finally turn the result into a vararg (: _*)
df.select(df.schema.map(a => a.dataType match { case s : org.apache.spark.sql.types.StructType => s.fieldNames.map(x => a.name + "." + x) case _ => Array(a.name) }).flatMap(x => x).intersect(potential_columns).map(df(_)) : _*).show
यह केवल एक ही स्तर गहरी हो जाता है बदलने के लिए, तो यह सामान्य आप अधिक काम करने के लिए होता है बनाने के लिए जा रहा हूँ।
स्रोत
2016-03-10 14:54:52
यह साथ काम नहीं करता नेस्टेड कॉलम। जेसन '{" ए ": {" बी ": 1," सी ": 0}}' – ben