2015-02-05 15 views
8

के साथ उदाहरण में शामिल है मैं अपाचे स्पार्क के लिए बहुत नया हूं। मैं वास्तव में मूल स्पार्क एपीआई विनिर्देश पर ध्यान केंद्रित करना चाहता हूं और स्पार्क एपीआई का उपयोग करके कुछ प्रोग्राम समझना और लिखना चाहता हूं। मैंने जॉन्स अवधारणा को लागू करने के लिए अपाचे स्पार्क का उपयोग करके एक जावा प्रोग्राम लिखा है।अपाचे स्पार्क जावा

जब मैं वामपंथी का उपयोग करता हूं - बाएंऑउटरजॉइन() या राइट ऑउटर जॉइन - दाएंऑउटरजॉइन(), दोनों विधियां जावापेयरआरडीडी लौट रही हैं जिसमें एक विशेष प्रकार के Google विकल्प हैं। लेकिन मुझे नहीं पता कि मूल मूल्यों को वैकल्पिक प्रकार से कैसे निकाला जाए।

वैसे भी मैं जानना चाहता हूं कि मैं समान जुड़ने के तरीकों का उपयोग कर सकता हूं जो डेटा को अपने प्रारूप में वापस कर देता है। मुझे ऐसा करने का कोई रास्ता नहीं मिला। मतलब यह है कि जब मैं अपाचे स्पार्क का उपयोग कर रहा हूं, तो मैं अपनी शैली में कोड को कस्टमाइज़ करने में सक्षम नहीं हूं क्योंकि उन्होंने पहले से ही सभी पूर्व-परिभाषित चीजें दी हैं।

प्राप्त करें

my 2 sample input datasets 

customers_data.txt: 
4000001,Kristina,Chung,55,Pilot 
4000002,Paige,Chen,74,Teacher 
4000003,Sherri,Melton,34,Firefighter 

and 

trasaction_data.txt 
00000551,12-30-2011,4000001,092.88,Games,Dice & Dice Sets,Buffalo,New York,credit 
00004811,11-10-2011,4000001,180.35,Outdoor Play Equipment,Water Tables,Brownsville,Texas,credit 
00034388,09-11-2011,4000002,020.55,Team Sports,Beach Volleyball,Orange,California,cash 
00008996,11-21-2011,4000003,121.04,Outdoor Recreation,Fishing,Colorado Springs,Colorado,credit 
00009167,05-24-2011,4000003,194.94,Exercise & Fitness,Foam Rollers,El Paso,Texas,credit 

यहाँ नीचे कोड मेरी जावा कोड

**SparkJoins.java:** 

public class SparkJoins { 

    @SuppressWarnings("serial") 
    public static void main(String[] args) throws FileNotFoundException { 
     JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count").setMaster("local")); 
     JavaRDD<String> customerInputFile = sc.textFile("C:/path/customers_data.txt"); 
     JavaPairRDD<String, String> customerPairs = customerInputFile.mapToPair(new PairFunction<String, String, String>() { 
      public Tuple2<String, String> call(String s) { 
       String[] customerSplit = s.split(","); 
       return new Tuple2<String, String>(customerSplit[0], customerSplit[1]); 
      } 
     }).distinct(); 

     JavaRDD<String> transactionInputFile = sc.textFile("C:/path/transactions_data.txt"); 
     JavaPairRDD<String, String> transactionPairs = transactionInputFile.mapToPair(new PairFunction<String, String, String>() { 
      public Tuple2<String, String> call(String s) { 
       String[] transactionSplit = s.split(","); 
       return new Tuple2<String, String>(transactionSplit[2], transactionSplit[3]+","+transactionSplit[1]); 
      } 
     }); 

     //Default Join operation (Inner join) 
     JavaPairRDD<String, Tuple2<String, String>> joinsOutput = customerPairs.join(transactionPairs); 
     System.out.println("Joins function Output: "+joinsOutput.collect()); 

     //Left Outer join operation 
     JavaPairRDD<String, Iterable<Tuple2<String, Optional<String>>>> leftJoinOutput = customerPairs.leftOuterJoin(transactionPairs).groupByKey().sortByKey(); 
     System.out.println("LeftOuterJoins function Output: "+leftJoinOutput.collect()); 

     //Right Outer join operation 
     JavaPairRDD<String, Iterable<Tuple2<Optional<String>, String>>> rightJoinOutput = customerPairs.rightOuterJoin(transactionPairs).groupByKey().sortByKey(); 
     System.out.println("RightOuterJoins function Output: "+rightJoinOutput.collect()); 

     sc.close(); 
    } 
} 

और यहाँ उत्पादन जो मैं

Joins function Output: [(4000001,(Kristina,092.88,12-30-2011)), (4000001,(Kristina,180.35,11-10-2011)), (4000003,(Sherri,121.04,11-21-2011)), (4000003,(Sherri,194.94,05-24-2011)), (4000002,(Paige,020.55,09-11-2011))] 

LeftOuterJoins function Output: [(4000001,[(Kristina,Optional.of(092.88,12-30-2011)), (Kristina,Optional.of(180.35,11-10-2011))]), (4000002,[(Paige,Optional.of(020.55,09-11-2011))]), (4000003,[(Sherri,Optional.of(121.04,11-21-2011)), (Sherri,Optional.of(194.94,05-24-2011))])] 

RightOuterJoins function Output: [(4000001,[(Optional.of(Kristina),092.88,12-30-2011), (Optional.of(Kristina),180.35,11-10-2011)]), (4000002,[(Optional.of(Paige),020.55,09-11-2011)]), (4000003,[(Optional.of(Sherri),121.04,11-21-2011), (Optional.of(Sherri),194.94,05-24-2011)])] 

हो रही है मैं पर इस कार्यक्रम चल रहा है विंडोज प्लेटफ़ॉर्म

ऊपर उत्पादन का पालन करेंगे और मुझे वैकल्पिक प्रकार से मूल्यों को निकालने

अग्रिम

+0

क्यों स्काला के बजाय का उपयोग नहीं:

यहाँ उदाहरण है? – maasg

+0

हाय @ मासग, मैं मूल रूप से एक जावा डेवलपर हूं .. मुझे वास्तव में स्कैला नहीं पता .. लेकिन मुझे लगता है कि अपाचे स्पार्क स्कैला प्रोग्रामिंग के बाद जावा के लिए सबसे उपयुक्त है। –

+0

@ शेकरपेटेल क्या आप कृपया अपने कोड को अपडेट कर सकते हैं कि आपने इसे वैकल्पिक कैसे हटा दिया .. यह दूसरों के लिए सहायक होगा। – Shankar

उत्तर

8

धन्यवाद में मदद जब आप बाईं बाहरी करना शामिल होने और सही बाहरी में शामिल होने, आप शून्य मान हो सकता है कृपया। सही!

तो स्पार्क वैकल्पिक ऑब्जेक्ट देता है। परिणाम प्राप्त करने के बाद, आप उस परिणाम को अपने प्रारूप में मैप कर सकते हैं।

आप अपने डेटा को मैप करने के लिए वैकल्पिक के वैकल्पिक() विधि का उपयोग कर सकते हैं।

JavaPairRDD<String,String> firstRDD = .... 
JavaPairRDD<String,String> secondRDD =.... 
// join both rdd using left outerjoin 
JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> rddWithJoin = firstRDD.leftOuterJoin(secondRDD); 


// mapping of join result 
JavaPairRDD<String, String> mappedRDD = rddWithJoin 
      .mapToPair(tuple -> { 
       if (tuple._2()._2().isPresent()) { 
        //do your operation and return 
        return new Tuple2<String, String>(tuple._1(), tuple._2()._1()); 
       } else { 
        return new Tuple2<String, String>(tuple._1(), "not present"); 
       } 
      }); 
+0

धन्यवाद दोस्त .. यह ठीक काम कर रहा है .. –

+0

@ sms_1190 इस परिणाम को अपने प्रारूप में कैसे मैप करना है? मेरे द्वारा भी यही समस्या का सामना किया जा रहा है। – Shankar

+0

@ शंकर: मैंने उपर्युक्त उत्तर में उदाहरण जोड़ा। mappedRDD आपका स्वयं का प्रारूप है। –

संबंधित मुद्दे