2017-07-31 20 views
5

मैं कई सह-रूटीन से उपभोग कर रहा हूं और परिणाम चैनल पर वापस धक्का दे रहा हूं। निर्माता अंतिम आइटम के बाद अपने चैनल को बंद कर रहा है।फैन-आउट/फैन-इन - क्लोजिंग परिणाम चैनल

कोड कभी खत्म नहीं होता है परिणाम चैनल बंद नहीं किया जा रहा है। पुनरावृत्ति का पता लगाने और सही तरीके से कैसे समाप्त करें ताकि hasNext() वापसी false?

val inputData = (0..99).map { "Input$it" } 
val threads = 10 

val bundleProducer = produce<String>(CommonPool, threads) { 
    inputData.forEach { item -> 
     send(item) 
     println("Producing: $item") 
    } 

    println("Producing finished") 
    close() 
} 

val resultChannel = Channel<String>(threads) 

repeat(threads) { 
    launch(CommonPool) { 
     bundleProducer.consumeEach { 
      println("CONSUMING $it") 
      resultChannel.send("Result ($it)") 
     } 
    } 
} 

val iterator = object : Iterator<String> { 
    val iterator = resultChannel.iterator() 
    override fun hasNext() = runBlocking { iterator.hasNext() } 
    override fun next() = runBlocking { iterator.next() } 
}.asSequence() 

println("Starting interation...") 

val result = iterator.toList() 

println("finish: ${result.size}") 
+0

ऐसा करने के लिए हैकिश तरीका मुझे लगता है कि परिणामस्वरूप अनुक्रम पर (100)। (100) है, लेकिन मुझे यकीन नहीं है कि यह अंतर्निहित संरचनाओं को किस राज्य में छोड़ देता है। – atok

उत्तर

3

आप एक coroutine उपभोक्ताओं को समाप्त करने के लिए इंतजार कर रहा है कि चला सकते हैं और उसके बाद resultChannel बंद कर देता है।

पहले, कोड है कि उपभोक्ताओं को शुरू होता है Job रों को बचाने के लिए फिर से लिखने:

val jobs = (1..threads).map { 
    launch(CommonPool) { 
     bundleProducer.consumeEach { 
      println("CONSUMING $it") 
      resultChannel.send("Result ($it)") 
     } 
    } 
} 

और फिर एक और coroutine कि चैनल एक बार सभी Job रों किया जाता है बंद कर देता है चलाएँ:

launch(CommonPool) { 
    jobs.forEach { it.join() } 
    resultChannel.close() 
} 
संबंधित मुद्दे