2012-08-13 12 views
11

पर संसाधित करने के लिए Iteratee के साथ एक बड़ी सीएसवी फ़ाइल अपलोड करने का सबसे अच्छा तरीका मैं elasticsearch पर एक बहुत बड़ी CSV फ़ाइलें (लाइनों के लाखों) अपलोड करने के लिए Play2 का उपयोग करना चाहता हूं। मैंने निम्नलिखित कोड लिखा है जो ठीक काम करता है।प्ले 2 स्कैला - प्रत्येक पंक्ति को प्रतिक्रियाशील रूप से

मैं पहले खंड में http प्रतिक्रिया शीर्षलेख को छोड़ने के तरीके से प्रसन्न नहीं हूं। इस Ieratee को पहली Iteratee के साथ श्रृंखलाबद्ध करने का एक तरीका होना चाहिए जो http शीर्षलेख को छोड़ दें और सीधे संपन्न स्थिति पर स्विच करें, लेकिन मुझे नहीं मिला है।

किसी को भी दो iteratees एक साथ

object ReactiveFileUpload extends Controller { 
    def upload = Action(BodyParser(rh => new CsvIteratee(isFirst = true))) { 
    request => 
     Ok("File Processed") 
    } 
} 

case class CsvIteratee(state: Symbol = 'Cont, input: Input[Array[Byte]] = Empty, lastChunk: String = "", isFirst: Boolean = false) extends Iteratee[Array[Byte], Either[Result, String]] { 
    def fold[B](
       done: (Either[Result, String], Input[Array[Byte]]) => Promise[B], 
       cont: (Input[Array[Byte]] => Iteratee[Array[Byte], Either[Result, String]]) => Promise[B], 
       error: (String, Input[Array[Byte]]) => Promise[B] 
       ): Promise[B] = state match { 
    case 'Done => 
     done(Right(lastChunk), Input.Empty) 

    case 'Cont => cont(in => in match { 
     case in: El[Array[Byte]] => { 
     // Retrieve the part that has not been processed in the previous chunk and copy it in front of the current chunk 
     val content = lastChunk + new String(in.e) 
     val csvBody = 
      if (isFirst) 
      // Skip http header if it is the first chunk 
      content.drop(content.indexOf("\r\n\r\n") + 4) 
      else content 
     val csv = new CSVReader(new StringReader(csvBody), ';') 
     val lines = csv.readAll 
     // Process all lines excepted the last one since it is cut by the chunk 
     for (line <- lines.init) 
      processLine(line) 
     // Put forward the part that has not been processed 
     val last = lines.last.toList.mkString(";") 
     copy(input = in, lastChunk = last, isFirst = false) 
     } 
     case Empty => copy(input = in, isFirst = false) 
     case EOF => copy(state = 'Done, input = in, isFirst = false) 
     case _ => copy(state = 'Error, input = in, isFirst = false) 
    }) 

    case _ => 
     error("Unexpected state", input) 

    } 

    def processLine(line: Array[String]) = WS.url("http://localhost:9200/affa/na/").post(
    toJson(
     Map(
     "date" -> toJson(line(0)), 
     "trig" -> toJson(line(1)), 
     "code" -> toJson(line(2)), 
     "nbjours" -> toJson(line(3).toDouble) 
    ) 
    ) 
) 
} 
+1

:

आप की तरह कुछ करने के लिए चाहते हो सकता है -iteratees-एक्स्ट्रा कलाकार/ब्लॉब/मास्टर/src/मुख्य/स्केला/खेल/अतिरिक्त/iteratees/Csv.scala)। –

उत्तर

1
श्रृंखला के लिए

मदद कर सकते हैं, तो flatMap का उपयोग करें।

val combinedIteratee = firstIteratee.flatMap(firstResult => secondIteratee) 

या, समझ के लिए एक का उपयोग कर:

val combinedIteratee = for { 
    firstResult <- firstIteratee 
    secondResult <- secondIteratee 
} yield secondResult 

आप जितने चाहें एक साथ के रूप में कई iteratees क्रम को flatMap उपयोग कर सकते हैं। आप अनौपचारिक प्ले-iteratee-एक्स्ट्रा कलाकार है, जो एक [सीएसवी पार्सर] (https://github.com/jroper/play है में रुचि रखते द्वारा कर सकता है

val headerAndCsvIteratee = for { 
    headerResult <- headerIteratee 
    csvResult <- csvIteratee 
} yield csvResult 
संबंधित मुद्दे