2016-06-18 10 views
12

में राज्य में राज्य प्रत्येक पंक्ति को संसाधित करने के लिए मैं एक्का धाराओं का उपयोग करके कई बड़ी फ़ाइलों को पढ़ना चाहता हूं। कल्पना करें कि प्रत्येक कुंजी में एक ("पहचानकर्ता" -> "मान") होता है। यदि कोई नया "पहचानकर्ता" पाया जाता है, तो मैं इसे डेटाबेस में सहेजना चाहता हूं और अपना "मान" सहेजना चाहता हूं, अन्यथा अगर पहचानकर्ता पहले से ही लाइनों की धारा को संसाधित करते समय पाया गया है, तो मैं केवल "मान" को सहेजना चाहता हूं। इसके लिए, मुझे लगता है कि मुझे मानचित्र में पहले से पाए गए पहचानकर्ताओं को रखने के लिए किसी तरह के पुनरावर्ती राज्य प्रवाह की आवश्यकता है। मुझे लगता है कि मैं इस प्रवाह में एक जोड़ी (newLine, contextWithIdentifiers) प्राप्त करूंगा।अक्का स्ट्रीम। प्रवाह

मैंने अभी एक्क्का धाराओं को देखना शुरू कर दिया है। मुझे लगता है कि मैं खुद को स्टेटलेस प्रोसेसिंग सामान करने के लिए प्रबंधित कर सकता हूं लेकिन मुझे "contextWithIdentifiers" को रखने के बारे में कोई जानकारी नहीं है। अगर कोई मुझे अच्छी दिशा में इंगित नहीं कर सकता तो मैं सराहना करता हूं।

मैं स्कैला का उपयोग कर रहा हूं।

+2

मैं आपको यह पूछने की सराहना करता हूं। यह इतना आसान अनुरोध है, फिर भी नमूना कोड के साथ सार्थक उत्तर ढूंढना विस्तृत लगता है। यह एकमात्र ऐसा पाया गया है! – akauppi

उत्तर

17

अरे शायद राज्यपूर्ण मैपकॉन्कैट की तरह कुछ आपको वहां मदद कर सकता है।

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Sink, Source} 
import scala.util.Random._ 
import scala.math.abs 
import scala.concurrent.ExecutionContext.Implicits.global 

implicit val system = ActorSystem() 
implicit val materializer = ActorMaterializer() 

//encapsulating your input 
case class IdentValue(id: Int, value: String) 
//some random generated input 
val identValues = List.fill(20)(IdentValue(abs(nextInt()) % 5, "valueHere")) 

val stateFlow = Flow[IdentValue].statefulMapConcat{() => 
    //state with already processed ids 
    var ids = Set.empty[Int] 
    identValue => if (ids.contains(identValue.id)) { 
    //save value to DB 
    println(identValue.value) 
    List(identValue) 
    } else { 
    //save both to database 
    println(identValue) 
    ids = ids + identValue.id 
    List(identValue) 
    } 
} 

Source(identValues) 
    .via(stateFlow) 
    .runWith(Sink.seq) 
    .onSuccess { case identValue => println(identValue) } 
+0

कोड के लिए धन्यवाद। मैं मध्य में थोड़ा और प्रकार की सराहना करता हूं, क्योंकि इसमें एक() => ... कारखाना शामिल है। क्या आप जानते हैं कि 'statefulMap' विधि क्यों नहीं है? – akauppi

संबंधित मुद्दे