5

मुझे कैसंड्रा तालिका लाखों पंक्तियों से पढ़ने के लिए एक कठिन काम है। असल में इस तालिका में 40 ~ 50 मिलियन पंक्तियों की तरह है। डेटा वास्तव में हमारे सिस्टम के लिए आंतरिक यूआरएल है और हमें उन सभी को आग लगाना होगा। इसे आग लगाने के लिए, हम अक्का स्ट्रीम का उपयोग कर रहे हैं और यह बहुत अच्छा काम कर रहा है, आवश्यकतानुसार कुछ बैक प्रेशर कर रहा है। लेकिन हमें अभी भी सबकुछ प्रभावी ढंग से पढ़ने का कोई तरीका नहीं मिला है।कैसंद्रा से लाखों पंक्तियों को प्रभावी रूप से कैसे पढ़ा जाए?

क्या हम अब तक की कोशिश की है:

  • अक्का स्ट्रीम का उपयोग कर स्ट्रीम के रूप में डेटा पढ़ना। हम प्रेत-डीएसएल का उपयोग कर रहे हैं जो एक विशिष्ट तालिका के लिए एक प्रकाशक प्रदान करता है। लेकिन यह सब कुछ नहीं पढ़ता है, केवल एक छोटा सा हिस्सा। वास्तव में यह पहले 1 मिलियन के बाद पढ़ने के लिए बंद हो जाता है।

  • स्पार्क का उपयोग किसी विशिष्ट दिनांक से पढ़ना पढ़ना। हमारी तालिका को साल, महीना, दिन, मिनट ... कॉलम के साथ एक समय श्रृंखला तालिका की तरह मॉडलिंग किया जाता है। अभी हम दिन-प्रतिदिन चयन कर रहे हैं, इसलिए स्पार्क संसाधित होने के लिए बहुत सी चीजें नहीं लाएगा, लेकिन यह उन सभी दिनों का चयन करने का दर्द है।

    val cassandraRdd = 
         sc 
         .cassandraTable("keyspace", "my_table") 
         .select("id", "url") 
         .where("year = ? and month = ? and day = ?", date.getYear, date.getMonthOfYear, date.getDayOfMonth) 
    

    दुर्भाग्य से मैं विभाजन से अधिक पुनरावृति नहीं कर सकते कम डेटा प्राप्त करने के लिए, मैं क्योंकि यह शिकायत अभिनेता serializable नहीं है एक कलेक्ट उपयोग करने के लिए:

कोड निम्नलिखित है।

val httpPool: Flow[(HttpRequest, String), (Try[HttpResponse], String), HostConnectionPool] = Http().cachedHostConnectionPool[String](host, port).async 

val source = 
    Source 
    .actorRef[CassandraRow](10000000, OverflowStrategy.fail) 
    .map(row => makeUrl(row.getString("id"), row.getString("url"))) 
    .map(url => HttpRequest(uri = url) -> url) 

val ref = Flow[(HttpRequest, String)] 
    .via(httpPool.withAttributes(ActorAttributes.supervisionStrategy(decider))) 
    .to(Sink.actorRef(httpHandlerActor, IsDone)) 
    .runWith(source) 

cassandraRdd.collect().foreach { row => 
    ref ! row 
} 

मुझे पता है कि अगर आप में से किसी के लिए कुछ भी और इतने पर एकत्रीकरण से अलग करने के लिए पंक्तियों के लाखों लोगों को पढ़ने पर इस तरह के अनुभव है चाहते हैं।

इसके अलावा मैंने सब कुछ पढ़ना और कफका विषय को भेजना सोचा है, जहां मुझे स्ट्रीमिंग (स्पार्क या अक्का) का उपयोग करना होगा, लेकिन समस्या वही होगी, उन सभी डेटा को प्रभावी ढंग से कैसे लोड करें?

संपादित

अभी के लिए, मैं स्मृति 100GB का एक उचित राशि के साथ एक क्लस्टर पर चल रहा है और एक कलेक्ट कर रहे हैं और इस पर पुनरावृत्ति कर रहा हूँ।

इसके अलावा, इस मैं लाने और HTTP पर =/

अब तक सब कुछ भेजने की जरूरत है आदि

चिंगारी के साथ bigdata हो रही है और reduceByKey, aggregateByKey, आदि जैसी चीजों का उपयोग कर इसे विश्लेषण, से दूर अलग है यह मेरे द्वारा किए गए तरीके से काम कर रहा है, लेकिन मुझे डर है कि यह डेटा एक बिंदु पर बड़ा और बड़ा हो जाता है जहां सबकुछ स्मृति में लाने से कोई मतलब नहीं होता है।

इस डेटा को स्ट्रीम करना सबसे अच्छा समाधान होगा, जो भाग में ला रहा है, लेकिन मुझे इसके लिए अभी तक एक अच्छा दृष्टिकोण नहीं मिला है।

अंत में, मैं उन सभी डेटा प्राप्त करने के लिए स्पार्क का उपयोग करने, सीएसवी फ़ाइल उत्पन्न करने और प्रक्रिया के लिए अक्का स्ट्रीम आईओ का उपयोग करने के बारे में सोच रहा हूं, इस तरह से मैं स्मृति में बहुत सी चीजों को स्मृति में रखने के लिए बेदखल कर दूंगा हर लाख प्रक्रिया करने के लिए घंटे।

+0

यह स्टैक ओवरफ्लो पर उत्तर देने के लिए एक बहुत बड़ा सवाल है और यह विशेष रूप से प्रेत से संबंधित नहीं है जो मैं बता सकता हूं। मेरी मजबूत भावना यह है कि पूरे अपराधी को HTTP पर जाना पड़ता है। एक एचडीएफएस या जो भी अन्य विधि साझा करना, जैसे HTTP बिट्स को पहले लाने और इसी तरह। प्रेत स्ट्रीमिंग आपको सबकुछ वापस नहीं दे रहा है, इस बारे में बहुत अधिक जानकारी प्राप्त करना दिलचस्प होगा, मुझे संदेह है कि यह या तो बह रहा है या शायद ओवरस्क्रिप्टिंग कर रहा है, इसलिए ऐसा लगता है कि यह वास्तव में पहले किया गया है। वास्तव में अजीब लगता है। – flavian

+0

हां, वास्तव में, यह एक शुद्ध प्रेत प्रश्न नहीं है, हालांकि मैंने यह जोड़ा है क्योंकि यह मेरा पहला विचार था, लेकिन दुर्भाग्य से यह 1 मिलियन या उससे भी अधिक के बाद बंद हो गया। यहां पर HTTP यहां एक बोतल की गर्दन है और यह सबकुछ संसाधित करने में घंटों की तरह लगती है, और शायद प्रतिक्रियाशील-प्रेत को परेशानी हो रही है, क्योंकि यह HTTP बैकप्रेस को प्रेत स्ट्रीम को साफ़ करता है, इसलिए मुझे कुछ घंटों के बाद पता नहीं चला कि यह पढ़ नहीं सका और अधिक सोचा और यह पहले से ही किया है। –

उत्तर

5

ठीक है, कुछ समय परिणाम खर्च पढ़ने, अन्य लोगों के साथ बात कर रही है और कर परीक्षण निम्नलिखित कोड नमूना द्वारा प्राप्त किया जा सकता है के बाद:

val sc = new SparkContext(sparkConf) 

val cassandraRdd = sc.cassandraTable(config.getString("myKeyspace"), "myTable") 
    .select("key", "value") 
    .as((key: String, value: String) => (key, value)) 
    .partitionBy(new HashPartitioner(2 * sc.defaultParallelism)) 
    .cache() 

cassandraRdd 
    .groupByKey() 
    .foreachPartition { partition => 
    partition.foreach { row => 

     implicit val system = ActorSystem() 
     implicit val materializer = ActorMaterializer() 

     val myActor = system.actorOf(Props(new MyActor(system)), name = "my-actor") 

     val source = Source.fromIterator {() => row._2.toIterator } 
     source 
     .map { str => 
      myActor ! Count 
      str 
     } 
     .to(Sink.actorRef(myActor, Finish)) 
     .run() 
    } 
    } 

sc.stop() 


class MyActor(system: ActorSystem) extends Actor { 

    var count = 0 

    def receive = { 

    case Count => 
     count = count + 1 

    case Finish => 
     println(s"total: $count") 
     system.shutdown() 

    } 
} 

case object Count 
case object Finish 

क्या मैं कर रहा हूँ है निम्नलिखित:

  • विभाजन का उपयोग करके विभाजन और विभाजनकर्ता की एक अच्छी संख्या प्राप्त करने का प्रयास करें और समूह द्वारा विधि
  • डेटा शफल को रोकने के लिए कैश का उपयोग करें, जिससे उच्च स्पा Io आदि का उपयोग करके स्पार्क को नोड्स में बड़े डेटा को स्थानांतरित किया जा सके।
  • संपूर्ण अभिनेता प्रणाली को इसकी निर्भरताओं के साथ-साथ foreachPartition विधि के अंदर स्ट्रीम बनाएं। यहां एक व्यापार बंद है, आपके पास केवल एक अभिनेता सिस्टम हो सकता है लेकिन जैसा कि मैंने प्रश्न में लिखा था, आपको इसका चयन करना होगा। हालांकि अंदर सब कुछ बनाते हुए, आपके पास अभी भी अपने क्लस्टर में वितरित स्पार्क के अंदर चीजें चलाने की क्षमता है।
  • समाप्त इटरेटर को मारने के लिए (पूरा करें) एक संदेश के साथ Sink.actorRef का उपयोग कर

के अंत में प्रत्येक अभिनेता प्रणाली शायद यह कोड और भी बेहतर हो सकता है, लेकिन अब तक मैं ऐसा करने के लिए खुश हूँ अब का चयन न करें और केवल स्पार्क के अंदर काम करें।

+0

यह दिलचस्प है कि मैं सामान्य रूप से आपके प्रदर्शन के बारे में बहुत उत्सुक हूं और यदि आप इसे बेंचमार्क करने में सक्षम हैं। वास्तव में, मुझे लगता है कि प्रति विभाजन एक अभिनेता प्रणाली बनाना एक भारी ऑपरेशन है। विभाजन की संख्या के आधार पर, आप एक ही वीएम पर एक ही मशीन पर 8 से अधिक अभिनेताओं के सिस्टम के साथ खुद को पा सकते हैं? समानांतर होने के बावजूद सिंक्रोनस कॉल करने से तेज़ी से यह कैसा है। – MaatDeamon

+0

@MatDeamon मुझे याद है कि यह काफी तेज था, लेकिन दुर्भाग्य से इसे माप नहीं आया है। अगर मुझे आज फिर से करना होगा, तो मैं अल्पाक्का कैसंद्रा कनेक्टर के लिए जाऊंगा। –

+0

मैं देखता हूँ। आप वितरण दे देंगे? – MaatDeamon

संबंधित मुद्दे