दो कॉलम books
और इन पुस्तकों, जहां books
और readers
क्रमशः पुस्तक और पाठक आईडी कर रहे हैं, की readers
के साथ एक टेबल है:स्पार्क: गिनती सह-घटना - विशाल संग्रह के कुशल बहु-पास छानने के लिए एल्गोरिथ्म
books readers
1: 1 30
2: 2 10
3: 3 20
4: 1 20
5: 1 10
6: 2 30
रिकॉर्ड book = 1, reader = 30
का अर्थ है कि id = 1
के साथ पुस्तक id = 30
के साथ उपयोगकर्ता द्वारा पढ़ी गई थी। प्रत्येक पुस्तक जोड़ी मैं, पाठकों को दोनों इन पुस्तकों की पढ़ा की संख्या की गणना करने की जरूरत है इस एल्गोरिथ्म के साथ के लिए:
for each book
for each reader of the book
for each other_book in books of the reader
increment common_reader_count ((book, other_book), cnt)
इस कलन विधि का उपयोग करने का लाभ यह है कि यह आपरेशन के एक छोटी संख्या की आवश्यकता है दो पुस्तक संयोजनों की गिनती की तुलना में दो।
उपर्युक्त एल्गोरिदम लागू करने के लिए मैं इस डेटा को दो समूहों में व्यवस्थित करता हूं: 1) पुस्तक द्वारा कुंजी, एक आरडीडी जिसमें प्रत्येक पुस्तक के पाठक होते हैं और 2) पाठक द्वारा की जाती है, एक आरडीडी जिसमें प्रत्येक पाठक द्वारा पढ़ी जाने वाली किताबें होती हैं, जैसे कि निम्नलिखित कार्यक्रम: अलग, अक्षम एल्गोरिथ्म में तथ्य परिणामों में
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.log4j.Logger
import org.apache.log4j.Level
object Small {
case class Book(book: Int, reader: Int)
case class BookPair(book1: Int, book2: Int, cnt:Int)
val recs = Array(
Book(book = 1, reader = 30),
Book(book = 2, reader = 10),
Book(book = 3, reader = 20),
Book(book = 1, reader = 20),
Book(book = 1, reader = 10),
Book(book = 2, reader = 30))
def main(args: Array[String]) {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
// set up environment
val conf = new SparkConf()
.setAppName("Test")
.set("spark.executor.memory", "2g")
val sc = new SparkContext(conf)
val data = sc.parallelize(recs)
val bookMap = data.map(r => (r.book, r))
val bookGrps = bookMap.groupByKey
val readerMap = data.map(r => (r.reader, r))
val readerGrps = readerMap.groupByKey
// *** Calculate book pairs
// Iterate book groups
val allBookPairs = bookGrps.map(bookGrp => bookGrp match {
case (book, recIter) =>
// Iterate user groups
recIter.toList.map(rec => {
// Find readers for this book
val aReader = rec.reader
// Find all books (including this one) that this reader read
val allReaderBooks = readerGrps.filter(readerGrp => readerGrp match {
case (reader2, recIter2) => reader2 == aReader
})
val bookPairs = allReaderBooks.map(readerTuple => readerTuple match {
case (reader3, recIter3) => recIter3.toList.map(rec => ((book, rec.book), 1))
})
bookPairs
})
})
val x = allBookPairs.flatMap(identity)
val y = x.map(rdd => rdd.first)
val z = y.flatMap(identity)
val p = z.reduceByKey((cnt1, cnt2) => cnt1 + cnt2)
val result = p.map(bookPair => bookPair match {
case((book1, book2),cnt) => BookPair(book1, book2, cnt)
})
val resultCsv = result.map(pair => resultToStr(pair))
resultCsv.saveAsTextFile("./result.csv")
}
def resultToStr(pair: BookPair): String = {
val sep = "|"
pair.book1 + sep + pair.book2 + sep + pair.cnt
}
}
यह implemntation!:
for each book
find each reader of the book scanning all readers every time!
for each other_book in books of the reader
increment common_reader_count ((book, other_book), cnt)
जो क्योंकि कम करने के बजाय एल्गोरिथ्म ऊपर चर्चा का मुख्य लक्ष्य के विपरीत है, यह आपरेशन की संख्या बढ़ जाती है। उपयोगकर्ता पुस्तकों को ढूंढने के लिए सभी पुस्तकों के लिए सभी उपयोगकर्ताओं को फ़िल्टर करना आवश्यक है। इस प्रकार संचालन की संख्या ~ एन * एम जहां एन - उपयोगकर्ताओं की संख्या और एम - किताबों की संख्या।
सवाल:
- वहाँ हर पुस्तक के लिए पूर्ण पाठक संग्रह छानने के बिना स्पार्क में मूल एल्गोरिथ्म लागू करने के लिए कोई तरीका है?
- पुस्तक जोड़ी की गणना करने के लिए कोई अन्य एल्गोरिदम कुशलता से गिना जाता है?
- इसके अलावा, जब वास्तव में इस कोड को चलाते हैं तो मुझे
filter exception
मिलता है जिसके कारण मैं समझ नहीं पा रहा हूं। कोई विचार?
कृपया नीचे दिए गए अपवाद लॉग देखें:
15/05/29 18:24:05 WARN util.Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface eth0)
15/05/29 18:24:05 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/05/29 18:24:09 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/05/29 18:24:10 INFO Remoting: Starting remoting
15/05/29 18:24:10 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:38910]
15/05/29 18:24:10 INFO Remoting: Remoting now listens on addresses: [akka.tcp://[email protected]:38910]
15/05/29 18:24:12 ERROR executor.Executor: Exception in task 0.0 in stage 6.0 (TID 4)
java.lang.NullPointerException
at org.apache.spark.rdd.RDD.filter(RDD.scala:282)
at Small$$anonfun$4$$anonfun$apply$1.apply(Small.scala:58)
at Small$$anonfun$4$$anonfun$apply$1.apply(Small.scala:54)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at Small$$anonfun$4.apply(Small.scala:54)
at Small$$anonfun$4.apply(Small.scala:51)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
अद्यतन:
इस कोड:
books r_books COUNT(readers)
1 2 2
:
val df = sc.parallelize(Array((1,30),(2,10),(3,20),(1,10)(2,30))).toDF("books","readers")
val results = df.join(
df.select($"books" as "r_books", $"readers" as "r_readers"),
$"readers" === $"r_readers" and $"books" < $"r_books"
)
.groupBy($"books", $"r_books")
.agg($"books", $"r_books", count($"readers"))
निम्न परिणाम देता है
तो COUNT
यहां कई बार दो पुस्तकें (यहां 1 और 2) एक साथ पढ़ी गईं (जोड़े की गिनती)।
बहुत धन्यवाद के लिए scaladoc की जाँच करना चाहते हैं! मैं डेटा फ्रेम और udfs के लिए नया हूँ। क्या आप यहां शामिल मानदंडों की व्याख्या कर सकते हैं? मौखिक रूप से .. – zork
यूडीएफ भूल जाओ - मैं मूर्ख हूँ। मैंने बस यूडीएफ से छुटकारा पाने के लिए एक रास्ता निकाला और अलग –
ग्रेट! ... डेटा फ्रेम के साथ पकड़ने और इस कोड को चलाने की कोशिश कर, पहले कभी उनका इस्तेमाल नहीं किया। – zork