2013-05-08 13 views
13
  • 2 धाराओं:जुटना दो (या एन) धाराओं

    पठनीय देखते हुए streamsstream1 और stream2, क्या के लिए एक मुहावरेदार (संक्षिप्त) जिस तरह से एक धारा युक्त stream1 और stream2 concatenated मिल सकता है?

    मैं stream1.pipe(outStream); stream2.pipe(outStream) नहीं कर सकता, क्योंकि तब स्ट्रीम सामग्री एक साथ जुड़ी हुई है।

  • n धाराओं:

    एक EventEmitter कि नदियों की एक अनिश्चित संख्या, उदा का उत्सर्जन करता है को देखते हुए

    eventEmitter.emit('stream', stream1) 
    eventEmitter.emit('stream', stream2) 
    eventEmitter.emit('stream', stream3) 
    ... 
    eventEmitter.emit('end') 
    

    क्या के लिए एक मुहावरेदार (संक्षिप्त) जिस तरह से सभी धाराओं को एक साथ concatenated साथ एक धारा मिल सकता है?

उत्तर

11

combined-stream पैकेज धाराओं को जोड़ता है। README से उदाहरण:

var CombinedStream = require('combined-stream'); 
var fs = require('fs'); 

var combinedStream = CombinedStream.create(); 
combinedStream.append(fs.createReadStream('file1.txt')); 
combinedStream.append(fs.createReadStream('file2.txt')); 

combinedStream.pipe(fs.createWriteStream('combined.txt')); 

मुझे विश्वास है कि आपको एक ही समय में सभी धाराएं जोड़नी होंगी। यदि कतार खाली हो जाती है, तो combinedStream स्वचालित रूप से समाप्त होता है। issue #5 देखें।

stream-stream लाइब्रेरी एक विकल्प है जिसमें एक स्पष्ट .end है, लेकिन यह बहुत कम लोकप्रिय और संभवतः अच्छी तरह से परीक्षण नहीं किया गया है। यह नोड 0.10 के streams2 एपीआई का उपयोग करता है (this discussion देखें)।

3

आप इसे अधिक संक्षिप्त बनाने के लिए सक्षम हो सकता है, लेकिन यहाँ एक है कि काम करता है:

var util = require('util'); 
var EventEmitter = require('events').EventEmitter; 

function ConcatStream(streamStream) { 
    EventEmitter.call(this); 
    var isStreaming = false, 
    streamsEnded = false, 
    that = this; 

    var streams = []; 
    streamStream.on('stream', function(stream){ 
    stream.pause(); 
    streams.push(stream); 
    ensureState(); 
    }); 

    streamStream.on('end', function() { 
    streamsEnded = true; 
    ensureState(); 
    }); 

    var ensureState = function() { 
    if(isStreaming) return; 
    if(streams.length == 0) { 
     if(streamsEnded) 
     that.emit('end'); 
     return; 
    } 
    isStreaming = true; 
    streams[0].on('data', onData); 
    streams[0].on('end', onEnd); 
    streams[0].resume(); 
    }; 

    var onData = function(data) { 
    that.emit('data', data); 
    }; 

    var onEnd = function() { 
    isStreaming = false; 
    streams[0].removeAllListeners('data'); 
    streams[0].removeAllListeners('end'); 
    streams.shift(); 
    ensureState(); 
    }; 
} 

util.inherits(ConcatStream, EventEmitter); 

हम streams (धाराओं की कतार के साथ राज्य का ट्रैक रखने; push वापस करने के लिए और shift से सामने), isStreaming, और streamsEnded। जब हमें एक नई धारा मिलती है, तो हम इसे दबाते हैं, और जब कोई स्ट्रीम समाप्त होती है, तो हम इसे सुनना और इसे बदलना बंद कर देते हैं। जब धाराओं की धारा समाप्त होती है, तो हम streamsEnded सेट करते हैं।

इन घटनाओं में से प्रत्येक पर, हम उस राज्य की जांच करते हैं जिसमें हम हैं। अगर हम पहले ही स्ट्रीमिंग कर रहे हैं (स्ट्रीम स्ट्रीमिंग), तो हम कुछ भी नहीं करते हैं। यदि कतार खाली है और streamsEnded सेट है, तो हम end ईवेंट उत्सर्जित करते हैं। अगर कतार में कुछ है, तो हम इसे फिर से शुरू करते हैं और इसकी घटनाओं को सुनते हैं।

* ध्यान दें कि pause और resume सलाहकार हैं, इसलिए कुछ धाराएं सही तरीके से व्यवहार नहीं कर सकती हैं, और बफरिंग की आवश्यकता होगी। यह अभ्यास पाठक को छोड़ दिया गया है।

यह सब किया, मैं एक EventEmitter निर्माण, इसके साथ एक ConcatStream बनाने, और दो stream घटनाओं एक end घटना के बाद उत्सर्जक द्वारा n=2 मामले करना होगा। मुझे यकीन है कि इसे अधिक संक्षेप में किया जा सकता है, लेकिन हम जो भी प्राप्त कर सकते हैं उसका उपयोग भी कर सकते हैं।

+0

धन्यवाद हारून! मैं थोड़ी उम्मीद कर रहा था कि कुछ मौजूदा लाइब्रेरी होगी इसलिए मैं इसे तीन लाइनों में हल कर सकता हूं। यदि नहीं है, तो मैं सोच रहा हूं कि मैं आपके समाधान को पैकेज में निकाल सकता हूं। क्या मैं आपके कोड का उपयोग एमआईटी लाइसेंस के तहत कर सकता हूं? –

+0

आह, स्ट्रीम-स्ट्रीम लाइब्रेरी मिली। मेरा जवाब देखें –

+0

@ जॉलीस मैंने पहले कुछ भी देखा, लेकिन मैं उस विकल्प को खोजने में विफल रहा। यदि आप अभी भी चाहते हैं तो आप निश्चित रूप से लाइब्रेरी में अपने कोड का उपयोग कर सकते हैं। –

1

streamee.js नोड 1 के आधार पर स्ट्रीम ट्रांसफार्मर और संगीतकारों का एक सेट है।0+ धाराओं और एक CONCATENATE विधि में शामिल हैं:

var stream1ThenStream2 = streamee.concatenate([stream1, stream2]); 
+0

धन्यवाद, मैं इसे देख लूंगा। वह नोड 0.10 मुझे लगता है? –

+0

हां नोड 0.10, लेकिन आप रीडमी – atamborrino

2

https://github.com/joepie91/node-combined-stream2 एक ड्रॉप में संयुक्त धारा मॉड्यूल के लिए Streams2 संगत प्रतिस्थापन (। जो ऊपर वर्णित है) यह स्वचालित रूप से Streams1 लपेटता धाराओं है।

संयुक्त-stream2 के लिए उदाहरण कोड:

var CombinedStream = require('combined-stream2'); 
var fs = require('fs'); 

var combinedStream = CombinedStream.create(); 
combinedStream.append(fs.createReadStream('file1.txt')); 
combinedStream.append(fs.createReadStream('file2.txt')); 

combinedStream.pipe(fs.createWriteStream('combined.txt')); 
3

इस वेनिला साथ किया जा सकता NodeJS

import { PassThrough } from 'stream' 
const merge = (...streams) => { 
    let pass = new PassThrough() 
    let waiting = streams.length 
    for (let stream of streams) { 
     pass = stream.pipe(pass, {end: false}) 
     stream.once('end',() => --waiting === 0 && pass.emit('end')) 
    } 
    return pass 
} 
5

एक साधारण reduce आपरेशन nodejs में ठीक होना चाहिए!

const {PassThrough} = require('stream') 

let joined = [s0, s1, s2, ...sN].reduce((pt, s, i, a) => { 
    s.pipe(pt, {end: false}) 
    s.once('end',() => a.every(s => s.ended) && pt.emit('end')) 
    return pt 
}, new PassThrough()) 

चीयर्स;)

+0

में लिखे गए पुराने शैली की धाराओं को 0.10+ धाराओं में लपेट सकते हैं, क्या आप कुछ कम करने से वापस नहीं आ रहे हैं? ऐसा लगता है कि 'शामिल' को अपरिभाषित किया जाएगा। –

+1

तय। धन्यवाद;) @Mark_M – Ivo

+0

चेतावनी: यह सभी धाराओं को समानांतर में पासथ्रू स्ट्रीम में पाइप करने का कारण बनता है, डेटा के क्रम की ओर किसी भी संबंध के बिना, आपके डेटा को दूषित करने की संभावना से अधिक। –

संबंधित मुद्दे