2015-05-26 9 views
5

मेरे पास एक स्ट्रीम है जिसे मैं data, error, और end ईवेंट सुनकर संसाधित करता हूं, और मैं पहली स्ट्रीम में प्रत्येक data ईवेंट को संसाधित करने के लिए एक फ़ंक्शन को कॉल करता हूं। स्वाभाविक रूप से, डेटा प्रोसेसिंग फ़ंक्शन अन्य कॉलबैक को कॉल करता है, जिससे इसे अतुल्यकालिक बना दिया जाता है। तो स्ट्रीम में डेटा संसाधित होने पर मैं अधिक कोड निष्पादित करना कैसे शुरू करूं? धारा में end ईवेंट के लिए सुनना मतलब नहीं है कि एसिंक्रोनस data प्रसंस्करण कार्य समाप्त हो गए हैं।एक स्ट्रीम समाप्त होने के बाद एसिंक्रोनस कोड निष्पादित करने के लिए कैसे सुनिश्चित किया जाता है?

मैं कैसे सुनिश्चित कर सकता हूं कि जब मैं अपना अगला कथन निष्पादित करता हूं तो स्ट्रीम डेटा प्रोसेसिंग फ़ंक्शन समाप्त हो जाते हैं?

function updateAccountStream (accountStream, callThisOnlyAfterAllAccountsAreMigrated) { 
    var self = this; 
    var promises = []; 
    accountStream 
    .on('data', function (account) { 
     migrateAccount.bind(self)(account, finishMigration); 
    }) 
    .on('error', function (err) { 
     return console.log(err); 
    }) 
    .on('end', function() { 
     console.log("Finished updating account stream (but finishMigration is still running!!!)"); 
     callThisOnlyAfterAllAccountsAreMigrated() // finishMigration is still running! 
    }); 
} 

var migrateAccount = function (oldAccount, callback) { 
    executeSomeAction(oldAccount, function(err, newAccount) { 
    if (err) return console.log("error received:", err); 
    return callback(newAccount); 
    }); 
} 

var finishMigration = function (newAccount) { 
    // some code that is executed asynchronously... 
} 

मैं कैसे सुनिश्चित करूँ कि callThisOnlyAfterAllAccountsAreMigrated के बाद धारा संसाधित किया गया है कहा जाता है:

यहाँ एक उदाहरण है?

क्या यह वादे के साथ किया जा सकता है? क्या यह धाराओं के माध्यम से किया जा सकता है? मैं नोडज के साथ काम कर रहा हूं, इसलिए अन्य एनपीएम मॉड्यूल का संदर्भ देना सहायक हो सकता है।

उत्तर

2

जैसा कि आपने कहा था, स्ट्रीम पर end ईवेंट सुनना अपने आप पर बेकार है। स्ट्रीम आपके data हैंडलर में डेटा के साथ आप क्या कर रहे हैं, इस बारे में नहीं जानते या परवाह नहीं करते हैं, इसलिए आपको अपने स्वयं के माइग्रेट खाता स्थिति का ट्रैक रखने के लिए कुछ कोड लिखना होगा।

यदि यह मैं था, तो मैं इस पूरे खंड को फिर से लिखूंगा। यदि आप अपनी स्ट्रीम पर .read() के साथ readable ईवेंट का उपयोग करते हैं, तो आप एक समय में कई आइटम पढ़ सकते हैं जैसे आप काम करना पसंद करते हैं। अगर यह एक है, कोई समस्या नहीं है। अगर यह 30 है, तो बढ़िया। ऐसा करने का कारण यह है कि स्ट्रीम से आने वाले डेटा के साथ आप जो कुछ भी कर रहे हैं, उससे अधिक नहीं होगा। जैसा कि अभी है, यदि खातास्ट्रीम तेज़ है, तो आपका आवेदन किसी बिंदु पर निस्संदेह दुर्घटनाग्रस्त हो जाएगा।

जब आप किसी स्ट्रीम से कोई आइटम पढ़ते हैं और काम शुरू करते हैं, तो वादा करें कि आप वापस आएं (ब्लूबर्ड या इसी तरह का उपयोग करें) और इसे सरणी में फेंक दें। जब वादा हल हो जाता है, इसे सरणी से हटा दें। जब स्ट्रीम समाप्त होती है, तो .done() हैंडलर को .all() (मूल रूप से सरणी में हर वादे से एक बड़ा वादा करना) संलग्न करें।

आप प्रगति पर नौकरियों के लिए एक साधारण काउंटर का भी उपयोग कर सकते हैं।

var through = require('through2').obj; 
function updateAccountStream (accountStream, callThisOnlyAfterAllAccountsAreMigrated) { 
    var self = this; 
    var promises = []; 
    accountStream.pipe(through(function(account, _, next) { 
    migrateAccount.bind(self)(account, finishMigration, next); 
    })) 
    .on('data', function (account) { 
    }) 
    .on('error', function (err) { 
     return console.log(err); 
    }) 
    .on('end', function() { 
     console.log("Finished updating account stream"); 
     callThisOnlyAfterAllAccountsAreMigrated(); 
    }); 
} 

var migrateAccount = function (oldAccount, callback, next) { 
    executeSomeAction(oldAccount, function(err, newAccount) { 
    if (err) return console.log("error received:", err); 
    return callback(newAccount, next); 
    }); 
} 

var finishMigration = function (newAccount, next) { 
    // some code that is executed asynchronously, but using 'next' callback when migration is finished... 
} 
+0

, 'end' घटना, सुंदर उपयोगी प्रतीत होता है के रूप में उस बिंदु जहाँ आप कितने नौकरियों अभी भी चल रहे तलाश शुरू करने की आवश्यकता है , ताकि जब आप और नहीं छोड़े तो आप अंतिम कॉलबैक ट्रिगर कर सकते हैं। – Bergi

+0

@ बरगी अच्छा बिंदु, मैंने स्पष्ट किया है कि मैं क्या प्राप्त कर रहा था। – Brad

+1

@ ब्रैड मुझे लगता है कि आपका समाधान जो वादे का उपयोग करता है और 'पठनीय' घटना उपरोक्त मेरे समाधान से अधिक प्रभावी है। क्या आप एक कोडिंग उदाहरण जोड़ना चाहते हैं? वादे थोड़ा मुश्किल हैं, इसलिए प्रश्न के कोड पर कार्रवाई में वादे देखना उपयोगी होगा ... – modulitos

1

धारा (NPM through2 मॉड्यूल) के माध्यम से एक का उपयोग करना, मैं निम्नलिखित कोड है कि अतुल्यकालिक व्यवहार को नियंत्रित करता है का उपयोग कर इस समस्या का समाधान।

here से कॉपी किया गया, एक उदाहरण spex पुस्तकालय का उपयोग करता है:

var spex = require('spex')(Promise); 
var fs = require('fs'); 

var rs = fs.createReadStream('values.txt'); 

function receiver(index, data, delay) { 
    return new Promise(function (resolve) { 
     console.log("RECEIVED:", index, data, delay); 
     resolve(); // ok to read the next data; 
    }); 
} 

spex.stream.read(rs, receiver) 
    .then(function (data) { 
     // streaming successfully finished; 
     console.log("DATA:", data); 
    }, function (reason) { 
     // streaming has failed; 
     console.log("REASON:", reason); 
    }); 
मेरे लिए
1

यह है जब आप वादे के माध्यम से धाराओं संभाल बहुत आसान है:

संबंधित मुद्दे

 संबंधित मुद्दे