2015-05-29 2 views
5

दो कॉलम books और इन पुस्तकों, जहां books और readers क्रमशः पुस्तक और पाठक आईडी कर रहे हैं, की readers के साथ एक टेबल है:स्पार्क: गिनती सह-घटना - विशाल संग्रह के कुशल बहु-पास छानने के लिए एल्गोरिथ्म

books readers 
1:  1  30 
2:  2  10 
3:  3  20 
4:  1  20 
5:  1  10 
6:  2  30 

रिकॉर्ड book = 1, reader = 30 का अर्थ है कि id = 1 के साथ पुस्तक id = 30 के साथ उपयोगकर्ता द्वारा पढ़ी गई थी। प्रत्येक पुस्तक जोड़ी मैं, पाठकों को दोनों इन पुस्तकों की पढ़ा की संख्या की गणना करने की जरूरत है इस एल्गोरिथ्म के साथ के लिए:

for each book 
    for each reader of the book 
    for each other_book in books of the reader 
     increment common_reader_count ((book, other_book), cnt) 

इस कलन विधि का उपयोग करने का लाभ यह है कि यह आपरेशन के एक छोटी संख्या की आवश्यकता है दो पुस्तक संयोजनों की गिनती की तुलना में दो।

उपर्युक्त एल्गोरिदम लागू करने के लिए मैं इस डेटा को दो समूहों में व्यवस्थित करता हूं: 1) पुस्तक द्वारा कुंजी, एक आरडीडी जिसमें प्रत्येक पुस्तक के पाठक होते हैं और 2) पाठक द्वारा की जाती है, एक आरडीडी जिसमें प्रत्येक पाठक द्वारा पढ़ी जाने वाली किताबें होती हैं, जैसे कि निम्नलिखित कार्यक्रम: अलग, अक्षम एल्गोरिथ्म में तथ्य परिणामों में

import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.log4j.Logger 
import org.apache.log4j.Level 

object Small { 

    case class Book(book: Int, reader: Int) 
    case class BookPair(book1: Int, book2: Int, cnt:Int) 

    val recs = Array(
    Book(book = 1, reader = 30), 
    Book(book = 2, reader = 10), 
    Book(book = 3, reader = 20), 
    Book(book = 1, reader = 20), 
    Book(book = 1, reader = 10), 
    Book(book = 2, reader = 30)) 

    def main(args: Array[String]) { 
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN) 
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) 
    // set up environment 
    val conf = new SparkConf() 
     .setAppName("Test") 
     .set("spark.executor.memory", "2g") 
    val sc = new SparkContext(conf) 
    val data = sc.parallelize(recs) 

    val bookMap = data.map(r => (r.book, r)) 
    val bookGrps = bookMap.groupByKey 

    val readerMap = data.map(r => (r.reader, r)) 
    val readerGrps = readerMap.groupByKey 

    // *** Calculate book pairs 
    // Iterate book groups 
    val allBookPairs = bookGrps.map(bookGrp => bookGrp match { 
     case (book, recIter) => 
     // Iterate user groups 
     recIter.toList.map(rec => { 
      // Find readers for this book 
      val aReader = rec.reader 
      // Find all books (including this one) that this reader read 
      val allReaderBooks = readerGrps.filter(readerGrp => readerGrp match { 
      case (reader2, recIter2) => reader2 == aReader 
      }) 
      val bookPairs = allReaderBooks.map(readerTuple => readerTuple match { 
      case (reader3, recIter3) => recIter3.toList.map(rec => ((book, rec.book), 1)) 
      }) 
      bookPairs 
     }) 

    }) 
    val x = allBookPairs.flatMap(identity) 
    val y = x.map(rdd => rdd.first) 
    val z = y.flatMap(identity) 
    val p = z.reduceByKey((cnt1, cnt2) => cnt1 + cnt2) 
    val result = p.map(bookPair => bookPair match { 
     case((book1, book2),cnt) => BookPair(book1, book2, cnt) 
    }) 

    val resultCsv = result.map(pair => resultToStr(pair)) 
    resultCsv.saveAsTextFile("./result.csv") 
    } 

    def resultToStr(pair: BookPair): String = { 
    val sep = "|" 
    pair.book1 + sep + pair.book2 + sep + pair.cnt 
    } 
} 

यह implemntation!:

for each book 
    find each reader of the book scanning all readers every time! 
    for each other_book in books of the reader 
     increment common_reader_count ((book, other_book), cnt) 

जो क्योंकि कम करने के बजाय एल्गोरिथ्म ऊपर चर्चा का मुख्य लक्ष्य के विपरीत है, यह आपरेशन की संख्या बढ़ जाती है। उपयोगकर्ता पुस्तकों को ढूंढने के लिए सभी पुस्तकों के लिए सभी उपयोगकर्ताओं को फ़िल्टर करना आवश्यक है। इस प्रकार संचालन की संख्या ~ एन * एम जहां एन - उपयोगकर्ताओं की संख्या और एम - किताबों की संख्या।

सवाल:

  1. वहाँ हर पुस्तक के लिए पूर्ण पाठक संग्रह छानने के बिना स्पार्क में मूल एल्गोरिथ्म लागू करने के लिए कोई तरीका है?
  2. पुस्तक जोड़ी की गणना करने के लिए कोई अन्य एल्गोरिदम कुशलता से गिना जाता है?
  3. इसके अलावा, जब वास्तव में इस कोड को चलाते हैं तो मुझे filter exception मिलता है जिसके कारण मैं समझ नहीं पा रहा हूं। कोई विचार?

कृपया नीचे दिए गए अपवाद लॉग देखें:

15/05/29 18:24:05 WARN util.Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface eth0) 
15/05/29 18:24:05 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address 
15/05/29 18:24:09 INFO slf4j.Slf4jLogger: Slf4jLogger started 
15/05/29 18:24:10 INFO Remoting: Starting remoting 
15/05/29 18:24:10 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:38910] 
15/05/29 18:24:10 INFO Remoting: Remoting now listens on addresses: [akka.tcp://[email protected]:38910] 
15/05/29 18:24:12 ERROR executor.Executor: Exception in task 0.0 in stage 6.0 (TID 4) 
java.lang.NullPointerException 
    at org.apache.spark.rdd.RDD.filter(RDD.scala:282) 
    at Small$$anonfun$4$$anonfun$apply$1.apply(Small.scala:58) 
    at Small$$anonfun$4$$anonfun$apply$1.apply(Small.scala:54) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at Small$$anonfun$4.apply(Small.scala:54) 
    at Small$$anonfun$4.apply(Small.scala:51) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137) 
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58) 
    at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:54) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:744) 

अद्यतन:

इस कोड:

books r_books COUNT(readers) 
1  2  2  
:

val df = sc.parallelize(Array((1,30),(2,10),(3,20),(1,10)(2,30))).toDF("books","readers") 
val results = df.join(
df.select($"books" as "r_books", $"readers" as "r_readers"), 
$"readers" === $"r_readers" and $"books" < $"r_books" 
) 
.groupBy($"books", $"r_books") 
.agg($"books", $"r_books", count($"readers")) 

निम्न परिणाम देता है

तो COUNT यहां कई बार दो पुस्तकें (यहां 1 और 2) एक साथ पढ़ी गईं (जोड़े की गिनती)।

उत्तर

7

बात इस तरह की बहुत आसान है अगर आप एक DataFrame करने के लिए मूल RDD कन्वर्ट:

val df = sc.parallelize(
    Array((1,30),(2,10),(3,20),(1,10), (2,30)) 
).toDF("books","readers") 

एक बार जब आप यह है कि, बस DataFrame पर एक आत्म में शामिल होने पुस्तक जोड़े बनाने के लिए क्या करते हैं, तो गिनती कितने पाठकों प्रत्येक पुस्तक जोड़ी को पढ़ लिया है:

val results = df.join(
    df.select($"books" as "r_books", $"readers" as "r_readers"), 
    $"readers" === $"r_readers" and $"books" < $"r_books" 
).groupBy(
    $"books", $"r_books" 
).agg(
    $"books", $"r_books", count($"readers") 
) 

कि के बारे में अतिरिक्त जानकारी के लिए के रूप में शामिल होने, ध्यान दें कि मैं df स्वंय पर शामिल होने के हूँ - एक आत्म में शामिल होने: df.join(df.select(...), ...)। आप जो करना चाहते हैं वह पुस्तक # 1 - $"books" - एक ही पाठक - $"reader" === $"r_reader" से दूसरी पुस्तक - $"r_books" के साथ एक साथ सिलाई करना है। लेकिन अगर आप केवल $"reader" === $"r_reader" के साथ जुड़ गए हैं, तो आपको वही पुस्तक वापस मिल जाएगी। इसके बजाय, मैं यह सुनिश्चित करने के लिए $"books" < $"r_books" का उपयोग करता हूं कि पुस्तक जोड़े में क्रम हमेशा (<lower_id>,<higher_id>) है।

एक बार जब आप शामिल हों, तो आपको प्रत्येक पुस्तक जोड़ी के प्रत्येक पाठक के लिए एक डेटाफ्रेम मिल जाएगा। groupBy और agg फ़ंक्शंस प्रति पुस्तक युग्मक पाठकों की संख्या की वास्तविक गणना करते हैं।

संयोग से, यदि एक पाठक दो बार एक ही पुस्तक पढ़ता है, तो मेरा मानना ​​है कि आप एक डबल-गिनती के साथ समाप्त हो जाएंगे, जो आप चाहते हैं या नहीं भी हो सकता है। यदि ऐसा नहीं है तो आप count($"readers") से countDistinct($"readers") को बदलना चाहते हैं।

आप, agg कार्यों count() और countDistinct() और अन्य मज़ा सामान का एक समूह के बारे में अधिक जानते हैं org.apache.spark.sql.functions

+0

बहुत धन्यवाद के लिए scaladoc की जाँच करना चाहते हैं! मैं डेटा फ्रेम और udfs के लिए नया हूँ। क्या आप यहां शामिल मानदंडों की व्याख्या कर सकते हैं? मौखिक रूप से .. – zork

+1

यूडीएफ भूल जाओ - मैं मूर्ख हूँ। मैंने बस यूडीएफ से छुटकारा पाने के लिए एक रास्ता निकाला और अलग –

+0

ग्रेट! ... डेटा फ्रेम के साथ पकड़ने और इस कोड को चलाने की कोशिश कर, पहले कभी उनका इस्तेमाल नहीं किया। – zork

संबंधित मुद्दे