2013-07-04 3 views
12

पर डिफ़ॉल्ट डिफ़ॉल्ट मान वापस करें मुझे समानांतर में एकाधिक वायदा चलाने होंगे और प्रोग्राम को क्रैश या लटका नहीं होना चाहिए।समांतर में एकाधिक फ़्यूचर्स चलाएं, टाइमआउट

अब के लिए मैं एक-एक करके वायदा पर इंतजार करता हूं, और टाइमआउट अपवाद होने पर फ़ॉलबैक मान का उपयोग करता हूं।

val future1 = // start future1 
val future2 = // start future2 
val future3 = // start future3 

// <- at this point all 3 futures are running 

// waits for maximum of timeout1 seconds 
val res1 = toFallback(future1, timeout1, Map[String, Int]()) 
// .. timeout2 seconds 
val res2 = toFallback(future2, timeout2, List[Int]()) 
// ... timeout3 seconds 
val res3 = toFallback(future3, timeout3, Map[String, BigInt]()) 

def toFallback[T](f: Future[T], to: Int, default: T) = { 
    Try(Await.result(f, to seconds)) 
    .recover { case to: TimeoutException => default } 
} 

मैं देख सकते हैं, इस स्निपेट की अधिकतम प्रतीक्षा समय timeout1 + timeout2 + timeout3

मेरा प्रश्न है: कैसे मैं एक बार में उन वायदा के सभी पर इंतजार कर सकते हैं, तो मैं max(timeout1, timeout2, timeout3) को प्रतीक्षा समय को कम कर सकते हैं?

संपादित करें: अंत में मैं @Jatin और @senia जवाब के संशोधन का प्रयोग किया:

private def composeWaitingFuture[T](fut: Future[T], 
            timeout: Int, default: T) = 
    future { Await.result(fut, timeout seconds) } recover { 
    case e: Exception => default 
    } 

और बाद में इसे प्रयोग किया जाता है इस प्रकार है:

// starts futures immediately and waits for maximum of timeoutX seconds 
val res1 = composeWaitingFuture(future1, timeout1, Map[String, Int]()) 
val res2 = composeWaitingFuture(future2, timeout2, List[Int]()) 
val res3 = composeWaitingFuture(future3, timeout3, Map[String, BigInt]()) 

// takes the maximum of max(timeout1, timeout2, timeout3) to complete 
val combinedFuture = 
    for { 
    r1 <- res1 
    r2 <- res2 
    r3 <- res3 
    } yield (r1, r2, r3) 

और बाद में मैं combinedFuture का उपयोग के रूप में मैं फिट देखना ।

+0

क्या मुझे समझ नहीं आता है, यह 'timeout1 + timeout2 + timeout3' है? यह भविष्य के लिए 'टाइमआउट 1' है, भविष्य के लिए टाइमआउट 2 और इसी तरह। प्रश्न अभी भी मेरे लिए अस्पष्ट हैं – Jatin

+1

वह समानांतर में 3 कार्यों को चलाने के लिए चाहता है, जैसे कि टाइमआउट अधिकतम तीन कार्यों का टाइमआउट –

+1

है, मुझे लगता है कि मैंने जो जवाब दिया है वह कुछ समय पहले जैसा है, वही है जो आप चाहते हैं और यह भी गैर-अवरुद्ध कॉलबैक का लाभ उठाता है। http://stackoverflow.com/questions/16304471/scala-futures-built-in-timeout/16305056#16305056 – cmbaxter

उत्तर

8
def toFallback[T](f: Future[T], to: Int, default: T) = { 
    future{ 
    try{ 
     Await.result(f, to seconds) 
    }catch{ 
     case e:TimeoutException => default 
    } 
} 

तुम भी इस ब्लॉक अतुल्यकालिक बना सकते हैं और प्रत्येक अनुरोध इसकी अधिकतम समय के लिए इंतजार कर रहा है। यदि बहुत सारे धागे हैं, तो शायद एक सिंगल थ्रेड है जो अक्का के system scheduler का उपयोग करके अन्य वायदा की जांच करता रहता है। @ सेनिया ने इस पर उत्तर दिया है।

+5

'Await.result' ब्लॉक थ्रेड, इसलिए आपको यहां डिफ़ॉल्ट 'ExecutionContext' का उपयोग नहीं करना चाहिए। आप 'toFallback' की कॉल के लिए एक विशेष 'निष्पादन कॉन्टेक्स्ट' बना सकते हैं या 'भविष्य में' विधि की बजाय एक नया धागा भी शुरू कर सकते हैं जैसे कि [इस उत्तर] (http://stackoverflow.com/a/17215663/406435)। – senia

12

आपको लगता है कि flatMap या का उपयोग करते हुए सभी 3 वायदा का परिणाम देता है future बना सकते हैं-समझ के लिए:

val combinedFuture = 
    for { 
    r1 <- future1 
    r2 <- future2 
    r3 <- future3 
    } yield (r1, r2, r3) 

val (r1, r2, r3) = Await.result(combinedFuture , Seq(timeout1, timeout2, timeout3).max) 

आप akka उपयोग कर रहे हैं आप समय समाप्ति के बाद डिफ़ॉल्ट मान के साथ अपने भविष्य के पूरा कर सकता है:

implicit class FutureHelper[T](f: Future[T]) extends AnyVal{ 
    import akka.pattern.after 
    def orDefault(t: Timeout, default: => T)(implicit system: ActorSystem): Future[T] = { 
    val delayed = after(t.duration, system.scheduler)(Future.successful(default)) 
    Future firstCompletedOf Seq(f, delayed) 
    } 
} 

val combinedFuture = 
    for { 
    r1 <- future1.orDefault(timeout1, Map()) 
    r2 <- future2.orDefault(timeout2, List()) 
    r3 <- future3.orDefault(timeout3, Map()) 
    } yield (r1, r2, r3) 

val (r1, r2, r3) = Await.result(combinedFuture , allowance + Seq(timeout1, timeout2, timeout3).max) 
+0

इसमें एक दोष है। कहें कि 'भविष्य 1' ने काफी समय लगाया लेकिन अन्य वायदा पूरा हो गया, तो आपको कुछ भी आउटपुट नहीं मिलेगा। 'भविष्य 2' और 'भविष्य 3' आउटपुट बेकार होगा। – Jatin

+1

@ जतिन: आप अपने भविष्य को 'अक्का' में डिफ़ॉल्ट मूल्य के साथ पूरा कर सकते हैं। अद्यतन देखें। – senia

2

मैं Await.result का उपयोग करने से बचूंगा क्योंकि यह केवल अवरुद्ध करने के लिए थ्रेड का उपयोग करता है। टाइमआउट लागू करने के लिए एक विकल्प वायदा इस होगा:

val timer = new Timer() 

def toFallback[T](f: Future[T], timeout: Int, default: T) = { 
    val p = Promise[T]() 
    f.onComplete(result => p.tryComplete(result)) 
    timer.schedule(new TimerTask { 
    def run() { 
     p.tryComplete(Success(default)) 
    } 
    }, timeout) 
    p.future 
} 

यह एक वादा है जो या तो एक भविष्य निर्दिष्ट समय के बाद डिफ़ॉल्ट परिणाम द्वारा या एक तक पूरा हो जाएगा बनाता है - जो भी पहले हो।

प्रश्नों को चलाने के लिए समवर्ती तुम इतनी तरह करना होगा:

val future1 = // start future1 
val future2 = // start future2 
val future3 = // start future3 

val res1 = toFallback(future1, timeout1, Map[String, Int]()) 
val res2 = toFallback(future2, timeout2, List[Int]()) 
val res3 = toFallback(future3, timeout3, Map[String, BigInt]()) 

val resultF = for { 
    r1 <- res1 
    r2 <- res2 
    r3 <- res3 
} yield (r1, r2, r3) 

val (r1, r2, r3) = Await.result(resultF, Duration.Inf) 
println(s"$r1, $r2, $r3") 

//or 
resultF.onSuccess { 
    case (r1, r2, r3) => println(s"$r1, $r2, $r3") 
} 
0

क्यों Futureही अपवाद पकड़ने और डिफ़ॉल्ट की वापसी प्रदर्शन करने के लिए नहीं मिल सकता है? फिर आप बदले में प्रत्येक भविष्य पर Await कर सकते हैं, और आपको भविष्य के बाहर अपवाद हैंडलिंग के बारे में चिंता करने की आवश्यकता नहीं है।

0

यह शायद थोड़ा सा हैकी है, लेकिन आप बस समय बीतने के उपाय कर सकते हैं और तदनुसार टाइमआउट संशोधित कर सकते हैं। timeout1 <= timeout2 <= timeout3 मान लिया जाये:

def now  = System.currentTimeMillis(); 
val start = now; 
def remains(timeout: Long): Long 
      = math.max(0, timeout + start - now) 

def toFallback[T](f: Future[T], to: Int, default: T) = { 
    Try(Await.result(f, remains(to) seconds)) 
    .recover { case to: TimeoutException => default } 
} 

इस तरह प्रत्येक टाइमआउट पल start = now लिए आधारित है बुलाया गया था, इसलिए समग्र प्रसारण समय सबसे timeout3 पर है। यदि टाइमआउट्स को अवरुद्ध नहीं किया जाता है, तो यह अभी भी काम करता है, लेकिन कुछ कार्यों को उनके निर्दिष्ट टाइमआउट से अधिक समय तक चलाना छोड़ दिया जा सकता है।

2

यहां एक लंबा (अनकाका) उत्तर है जो पता चलता है कि उपयोग के मामले में क्या हो सकता है, अर्थात्, यदि मूल्यों में से एक "परिणाम" आप उस परिणाम के लिए डिफ़ॉल्ट मान का उपयोग करना चाहते हैं और इसके साथ कुछ भी करना चाहते हैं (जैसे कि लंबी चलती गणना या i/o या जो कुछ भी रद्द करें)।

कहने की जरूरत नहीं है, दूसरी कहानी अवरोध को कम करना है।

मूल विचार firstCompletedOf की प्रतीक्षा करने वाले लूप में बैठना है जो अभी तक पूरा नहीं हुआ है। ready पर टाइमआउट न्यूनतम शेष टाइमआउट है।

यह कोड अवधि के बजाय समय सीमा का उपयोग करता है, लेकिन "शेष समय" के रूप में अवधि का उपयोग करना आसान है।

import scala.language.postfixOps 
import scala.concurrent._ 
import scala.concurrent.duration._ 
import ExecutionContext.Implicits._ 
import scala.reflect._ 
import scala.util._ 
import java.lang.System.{ nanoTime => now } 

import Test.time 

class Test { 

    type WorkUnit[A] = (Promise[A], Future[A], Deadline, A) 
    type WorkQ[A] = Seq[WorkUnit[A]] 

    def await[A: ClassTag](work: Seq[(Future[A], Deadline, A)]): Seq[A] = { 
    // check for timeout; if using Duration instead of Deadline, decrement here 
    def ticktock(w: WorkUnit[A]): WorkUnit[A] = w match { 
     case (p, f, t, v) if !p.isCompleted && t.isOverdue => p trySuccess v ; w 
     case _ => w 
    } 
    def await0(work: WorkQ[A]): WorkQ[A] = { 
     val live = work filterNot (_._1.isCompleted) 
     val t0 = (live map (_._3)).min 
     Console println s"Next deadline in ${t0.timeLeft.toMillis}" 
     val f0 = Future firstCompletedOf (live map (_._2)) 
     Try(Await ready (f0, t0.timeLeft)) 
     val next = work map (w => ticktock(w)) 
     if (next exists (!_._1.isCompleted)) { 
     await0(next) 
     } else { 
     next 
     } 
    } 
    val wq = work map (_ match { 
     case (f, t, v) => 
     val p = Promise[A] 
     p.future onComplete (x => Console println s"Value available: $x: $time") 
     f onSuccess { 
      case a: A => p trySuccess a // doesn't match on primitive A 
      case x => p trySuccess x.asInstanceOf[A] 
     } 
     f onFailure { case _ => p trySuccess v } 
     (p, f, t, v) 
    }) 
    await0(wq) map (_ match { 
     case (p, f, t, v) => p.future.value.get.get 
    }) 
    } 
} 

object Test { 
    val start = now 
    def time = s"The time is ${ Duration fromNanos (now - start) toMillis }" 

    def main(args: Array[String]): Unit = { 
    // #2 times out 
    def calc(i: Int) = { 
     val t = if (args.nonEmpty && i == 2) 6 else i 
     Thread sleep t * 1000L 
     Console println s"Calculate $i: $time" 
     i 
    } 
    // futures to be completed by a timeout deadline 
    // or else use default and let other work happen 
    val work = List(
     (future(calc(1)), 3 seconds fromNow, 10), 
     (future(calc(2)), 5 seconds fromNow, 20), 
     (future(calc(3)), 7 seconds fromNow, 30) 
    ) 
    Console println new Test().await(work) 
    } 
} 

नमूना रन:

[email protected]:~/tmp$ skalac nextcompleted.scala ; skala nextcompleted.Test 
Next deadline in 2992 
Calculate 1: The time is 1009 
Value available: Success(1): The time is 1012 
Next deadline in 4005 
Calculate 2: The time is 2019 
Value available: Success(2): The time is 2020 
Next deadline in 4999 
Calculate 3: The time is 3020 
Value available: Success(3): The time is 3020 
List(1, 2, 3) 
[email protected]:~/tmp$ skala nextcompleted.Test arg 
Next deadline in 2992 
Calculate 1: The time is 1009 
Value available: Success(1): The time is 1012 
Next deadline in 4005 
Calculate 3: The time is 3020 
Value available: Success(3): The time is 3020 
Next deadline in 1998 
Value available: Success(20): The time is 5020 
List(1, 20, 3) 
संबंधित मुद्दे