2015-03-03 10 views
6

संदर्भ: मैं आउटपुट ChunkedResponseStart(bytes), MessageChunk(bytes), MessageChunk(bytes), ..., ChunkedResponseEnd के साथ Process1[ByteVector, spray.http.HttpResponsePart] लिखने की कोशिश कर रहा हूं। मैंने अभी तक स्केलज़-स्ट्रीम और इसकी शब्दावली के आसपास अपने सिर को पूरी तरह से लपेटा नहीं है।स्केलज़-स्ट्रीम: शेष के लिए अलग-अलग तरीके से "हेडर" (पहले भाग) को कैसे संभालें?

प्रक्रिया को कैसे लिखें जो पहले n हिस्सों को अलग-अलग संभाल सकता है?

मैं इस (एक उदाहरण के रूप तार) के साथ आ गया है:

val headerChunk = process1.chunk[String](5).map(_.reduce(_ + _)) 

val headerChunkAndRest: Process1[String, String] = 
    headerChunk.take(1) ++ process1.id 

io.linesR(Files.newInputStream(Paths.get("testdata/fahrenheit.txt"))) 
    .pipe(headerChunkAndRest) 
    .to(io.stdOutLines) 
    .run.run 

क्या एक मुहावरेदार और संभवतः एक आम तौर पर composable रास्ता headerChunkAndRest लिखने के लिए है?

+0

क्या आपको बाकी लाइनों को संसाधित करते समय शीर्षलेख की सामग्री तक पहुंच की आवश्यकता है? –

+1

स्कॉडेक-स्ट्रीम (स्केलज़ स्ट्रीम को डिकोड करने और बाइनरी सामग्री को एन्कोड करने के लिए) एक अच्छा उदाहरण है जो आपकी समस्या का समाधान कर सकता है: d1 ++ d2: d1 चलाएं और अपने सभी डीकोडेड मानों को उत्सर्जित करें, फिर शेष इनपुट पर d2 चलाएं और इसके मानों को छोड़ दें। उदाहरण: decode.once (codecs.int32) ++ decode.advance (12) एक हस्ताक्षरित इंट डीकोड करता है, फिर कर्सर को 12 बिट्स से आगे बढ़ाता है। (https://github.com/scodec/scodec-stream#decoding) –

+0

@TravisBrown नहीं, बाकी की प्रोसेसिंग के लिए इसकी सामग्री की आवश्यकता नहीं है। मेरे मामले में पहले खंड में शेष प्रकार के डेटा (फ़ाइल के कुछ हिस्सों) के रूप में एक ही प्रकार का डेटा होना चाहिए, यह पहले क्लाइंट को भेजने के लिए एक छोटा सा हिस्सा है। –

उत्तर

4

जनरल विचार

वहाँ दृढ़ता से अपनी आवश्यकताओं के विवरण के आधार पर, यह करने के लिए कई तरीके हैं।

  1. foldWithIndex यह आपको एक संख्या के रूप में हिस्सा की वर्तमान सूचकांक देता है: आप निम्नलिखित सहायक तरीकों कि scalaz धाराओं का हिस्सा हैं उपयोग कर सकते हैं। आप उस सूचकांक के आधार पर भेदभाव कर सकते हैं
  2. zipWithState आप अपनी विधि के एक आमंत्रण से अगले राज्य में एक राज्य जोड़ सकते हैं और इस स्थिति का उपयोग करके ट्रैक कर सकते हैं कि क्या आप अभी भी हेडर पार्स कर रहे हैं या यदि आप शरीर तक पहुंच गए हैं। अगले चरण में आप हेडर और बॉडी को अलग करने के लिए इस स्थिति का उपयोग कर सकते हैं
  3. repartition सभी हेडर और सभी बॉडी तत्वों को एक साथ समूहित करने के लिए इसका उपयोग करें। फिर आप उन्हें अगले चरण में संसाधित कर सकते हैं।
  4. zipWithNext यह फ़ंक्शन हमेशा आपको वर्तमान तत्व के साथ समूहीकृत पिछले तत्व प्रस्तुत करता है। जब आप हेडर से शरीर में स्विच कर रहे हों और तदनुसार प्रतिक्रिया दें, तो आप इसका पता लगाने के लिए इसका उपयोग कर सकते हैं।

संभवतः आपको फिर से विचार करना चाहिए, आपको वास्तव में क्या चाहिए। वास्तव में आपके प्रश्न के लिए, यह zipwithIndex और फिर map होगा। लेकिन अगर आप अपनी समस्या को फिर से सोचते हैं, तो आप शायद repartition या zipWithState के साथ समाप्त हो जाएंगे। एक HTTP ग्राहक, कि शरीर (HTTP, नहीं एचटीएमएल) से HTTP हेडर तत्वों को अलग करती है:

उदाहरण कोड

को एक सरल उदाहरण करते हैं। शीर्षलेख में कुकीज़ जैसी चीजें, शरीर में वास्तविक "सामग्री" होती हैं, जैसे किसी छवि या HTTP स्रोत।

एक सरल HTTP ग्राहक ऐसा दिखाई दे सकता:

import scalaz.stream._ 
import scalaz.concurrent.Task 
import java.net.InetSocketAddress 
import java.nio.channels.AsynchronousChannelGroup 

implicit val AG = nio.DefaultAsynchronousChannelGroup 

def httpGetRequest(hostname : String, path : String = "/"): Process[Nothing, String] = 
    Process(
    s"GET $path HTTP/1.1", 
    s"Host: $hostname", 
    "Accept: */*", 
    "User-Agent: scalaz-stream" 
).intersperse("\n").append(Process("\n\n")) 

def simpleHttpClient(hostname : String, port : Int = 80, path : String = "/")(implicit AG: AsynchronousChannelGroup) : Process[Task, String] = 
    nio.connect(new InetSocketAddress(hostname, port)).flatMap(_.run(httpGetRequest(hostname, path).pipe(text.utf8Encode))).pipe(text.utf8Decode).pipe(text.lines()) 

अब हम बाकी हिस्सों से अलग हैडर लाइनों के लिए इस कोड का उपयोग कर सकते हैं। HTTP में, हेडर लाइनों में संरचित है। यह एक खाली रेखा से शरीर से अलग किया जाता है। तो सबसे पहले, के शीर्षक में लाइनों की संख्या की गणना करते हैं:

val demoHostName="scala-lang.org" // Hope they won't mind... 
simpleHttpClient(demoHostName).zipWithIndex.takeWhile(! _._1.isEmpty).runLast.run 
// res3: Option[(String, Int)] = Some((Content-Type: text/html,8)) 

जब मैं इस भाग गया, वहाँ शीर्षक में 8 लाइनों थे।की पहली एक गणन को परिभाषित है, तो प्रतिक्रिया के कुछ हिस्सों को वर्गीकृत करते हैं:

object HttpResponsePart { 
    sealed trait EnumVal 
    case object HeaderLine extends EnumVal 
    case object HeaderBodySeparator extends EnumVal 
    case object Body extends EnumVal 
    val httpResponseParts = Seq(HeaderLine, HeaderBodySeparator, Body) 
} 

और फिर के zipWithIndex प्लस map प्रतिक्रिया के कुछ हिस्सों को वर्गीकृत करने का उपयोग करते हैं:

simpleHttpClient(demoHostName).zipWithIndex.map{ 
    case (line, idx) if idx < 9 => (line, HeaderLine) 
    case (line, idx) if idx == 10 => (line, HeaderBodySeparator) 
    case (line, _) => (line, Body) 
}.take(15).runLog.run 

मेरे लिए, यह ठीक काम करता है। लेकिन निश्चित रूप से, बिना किसी सूचना के हेडर लाइनों की मात्रा किसी भी समय बदल सकती है। प्रतिक्रिया की संरचना को मानते हुए एक बहुत ही सरल पार्सर का उपयोग करना अधिक मजबूत है। इस के लिए मैं zipWithState का उपयोग करें:

simpleHttpClient(demoHostName).zipWithState(HeaderLine : EnumVal){ 
    case (line, HeaderLine) if line.isEmpty => HeaderBodySeparator 
    case (_, HeaderLine) => HeaderLine 
    case (_, HeaderBodySeparator) => Body 
    case (line, Body) => Body 
}.take(15).runLog.run 

आप देख सकते हैं, कि दोनों दृष्टिकोण एक समान संरचना का उपयोग करें और दोनों दृष्टिकोण से समान परिणाम प्राप्त करना चाहिए। अच्छी बात यह है कि दोनों दृष्टिकोण आसानी से पुन: प्रयोज्य होते हैं। आप बस स्रोत को स्वैप कर सकते हैं, उदा। एक फाइल के साथ, और कुछ भी बदलने की ज़रूरत नहीं है। वर्गीकरण के बाद प्रसंस्करण के साथ ही। .take(15).runLog.run दोनों दृष्टिकोणों में बिल्कुल वही है।

संबंधित मुद्दे