2015-11-26 14 views
8

में उपयोगकर्ता-परिभाषित कुल फ़ंक्शन (यूडीएफ़) से एकाधिक सरणी लौटाना मैं अपाचे स्पार्क एसक्यूएल का उपयोग कर जावा में उपयोगकर्ता द्वारा परिभाषित कुल फ़ंक्शन (यूडीएफ़) बनाने की कोशिश कर रहा हूं जो पूर्ण होने पर एकाधिक सरणी देता है। मैंने ऑनलाइन खोज की है और इसे कैसे करें इस पर कोई उदाहरण या सुझाव नहीं मिल रहा है।अपाचे स्पार्क एसक्यूएल

मैं एक एकल सरणी वापस करने में सक्षम हूं, लेकिन यह पता नहीं लगा सकता कि एकाधिक सरणी लौटने के लिए मूल्यांकन() विधि में सही प्रारूप में डेटा कैसे प्राप्त किया जाए।

यूडीएफ़ काम करता है क्योंकि मैं मूल्यांकन() विधि में सरणी मुद्रित कर सकता हूं, मैं सिर्फ उन एरे को कॉलिंग कोड (जो संदर्भ के लिए नीचे दिखाया गया है) को वापस करने का तरीका नहीं समझ सकता।

UserDefinedAggregateFunction customUDAF = new CustomUDAF(); 
DataFrame resultingDataFrame = dataFrame.groupBy().agg(customUDAF.apply(dataFrame.col("long_col"), dataFrame.col("double_col"))).as("processed_data"); 

मैं पूरी कस्टम UDAF वर्ग नीचे शामिल किया है, लेकिन कुंजी तरीकों डेटाप्रकार() कर रहे हैं और विधियों() है, जो पहले दिखाए जाते हैं का मूल्यांकन।

किसी भी मदद या सलाह की सराहना की जाएगी। धन्यवाद।

public class CustomUDAF extends UserDefinedAggregateFunction { 

    @Override 
    public DataType dataType() { 
     // TODO: Is this the correct way to return 2 arrays? 
     return new StructType().add("longArray", DataTypes.createArrayType(DataTypes.LongType, false)) 
      .add("dataArray", DataTypes.createArrayType(DataTypes.DoubleType, false)); 
    } 

    @Override 
    public Object evaluate(Row buffer) { 
     // Data conversion 
     List<Long> longList = new ArrayList<Long>(buffer.getList(0)); 
     List<Double> dataList = new ArrayList<Double>(buffer.getList(1)); 

     // Processing of data (omitted) 

     // TODO: How to get data into format needed to return 2 arrays? 
     return dataList; 
    } 

    @Override 
    public StructType inputSchema() { 
     return new StructType().add("long", DataTypes.LongType).add("data", DataTypes.DoubleType); 
    } 

    @Override 
    public StructType bufferSchema() { 
     return new StructType().add("longArray", DataTypes.createArrayType(DataTypes.LongType, false)) 
      .add("dataArray", DataTypes.createArrayType(DataTypes.DoubleType, false)); 
    } 

    @Override 
    public void initialize(MutableAggregationBuffer buffer) { 
     buffer.update(0, new ArrayList<Long>()); 
     buffer.update(1, new ArrayList<Double>()); 
    } 

    @Override 
    public void update(MutableAggregationBuffer buffer, Row row) { 
     ArrayList<Long> longList = new ArrayList<Long>(buffer.getList(0)); 
     longList.add(row.getLong(0)); 

     ArrayList<Double> dataList = new ArrayList<Double>(buffer.getList(1)); 
     dataList.add(row.getDouble(1)); 

     buffer.update(0, longList); 
     buffer.update(1, dataList); 
    } 

    @Override 
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) { 
     ArrayList<Long> longList = new ArrayList<Long>(buffer1.getList(0)); 
     longList.addAll(buffer2.getList(0)); 

     ArrayList<Double> dataList = new ArrayList<Double>(buffer1.getList(1)); 
     dataList.addAll(buffer2.getList(1)); 

     buffer1.update(0, longList); 
     buffer1.update(1, dataList); 
    } 

    @Override 
    public boolean deterministic() { 
     return true; 
    } 
} 

अद्यतन: मैं zero323 द्वारा जवाब के आधार पर का उपयोग कर दो सरणियों लौटने में सक्षम था:

return new Tuple2<>(longArray, dataArray); 

इस से बाहर एक संघर्ष का एक सा डेटा प्राप्त हो रहा था, लेकिन DataFrame deconstructing शामिल जावा सूचियों के लिए और फिर इसे डेटाफ्रेम पर वापस बनाते हैं।

उत्तर

5

जहां तक ​​मैं एक ट्यूपल लौटने को कह सकता हूं, बस इतना ही होना चाहिए। स्कैला में:

import org.apache.spark.sql.expressions._ 
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions.udf 
import org.apache.spark.sql.{Row, Column} 

object DummyUDAF extends UserDefinedAggregateFunction { 
    def inputSchema = new StructType().add("x", StringType) 
    def bufferSchema = new StructType() 
    .add("buff", ArrayType(LongType)) 
    .add("buff2", ArrayType(DoubleType)) 
    def dataType = new StructType() 
    .add("xs", ArrayType(LongType)) 
    .add("ys", ArrayType(DoubleType)) 
    def deterministic = true 
    def initialize(buffer: MutableAggregationBuffer) = {} 
    def update(buffer: MutableAggregationBuffer, input: Row) = {} 
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {} 
    def evaluate(buffer: Row) = (Array(1L, 2L, 3L), Array(1.0, 2.0, 3.0)) 
} 

val df = sc.parallelize(Seq(("a", 1), ("b", 2))).toDF("k", "v") 
df.select(DummyUDAF($"k")).show(1, false) 

// +---------------------------------------------------+ 
// |(DummyUDAF$(k),mode=Complete,isDistinct=false)  | 
// +---------------------------------------------------+ 
// |[WrappedArray(1, 2, 3),WrappedArray(1.0, 2.0, 3.0)]| 
// +---------------------------------------------------+ 
+0

मैं एक यूडीएफ़ से आत्मविश्वास पूर्णांक के रूप में (कम, माध्य, उच्च) का एक गुच्छा लौटा रहा हूं। क्या इस टुपल को कई स्तंभों में विस्फोट करने का कोई तरीका है, इसलिए '| कुंजी | [1.0,1.5,2.0] |' मुझे 'कुंजी | 1.0 | 1.5 | 2.0 |' – TomTom101

+0

@ TomTom101 यदि यह एक है tuple (संरचना क्षेत्र) सरल चयन पर्याप्त होना चाहिए। – zero323

+0

आश्चर्य की बात यह है कि चाल! मैं अब बेहतर पठनीयता के लिए एक केस क्लास लौटा (पोस्ट करने से पहले कोशिश करनी चाहिए थी)। धन्यवाद! – TomTom101

संबंधित मुद्दे