मेरे पास उपयोगकर्ताओं से जुड़े डेटा के साथ प्रवाह है। मेरे पास प्रत्येक उपयोगकर्ता के लिए एक राज्य भी है, जिसे मैं डीबी से असीमित रूप से प्राप्त कर सकता हूं।क्या समूह के उपप्रवाह उन कुंजीों पर निर्भर कर सकते हैं जिन्हें वे उत्पन्न हुए थे?
मैं प्रति प्रवाह एक उपप्रवाह के साथ अपना प्रवाह अलग करना चाहता हूं, और सबफ्लो को भौतिक रूप से पूरा करते समय प्रत्येक उपयोगकर्ता के लिए राज्य लोड करना चाहता हूं, ताकि उपप्रवाह के तत्वों को इस स्थिति के संबंध में माना जा सके।
मैं नीचे की ओर subflows विलय करने के लिए नहीं करना चाहते हैं, मैं groupBy
और Sink.lazyInit
साथ कुछ कर सकते हैं:
def getState(userId: UserId): Future[UserState] = ...
def getUserId(element: Element): UserId = ...
def treatUser(state: UserState): Sink[Element, _] = ...
val treatByUser: Sink[Element] = Flow[Element].groupBy(
Int.MaxValue,
getUserId
).to(
Sink.lazyInit(
elt => getState(getUserId(elt)).map(treatUser),
??? // this is never called, since the subflow is created when an element comes
)
)
बहरहाल, यह अगर treatUser
एक Flow
हो जाता है, के बाद से वहाँ के लिए कोई बराबर है काम नहीं करता है Sink.lazyInit
।
चूंकि groupBy
के उपप्रवाह केवल तभी भौतिक हो जाते हैं जब कोई नया तत्व धक्का दिया जाता है, तो उप-प्रवाह को मूर्त रूप देने के लिए इस तत्व का उपयोग करना संभव होना चाहिए, लेकिन मैं समूह के लिए स्रोत कोड को अनुकूलित करने में सक्षम नहीं था ताकि यह लगातार काम करे। इसी तरह, Sink.lazyInit
Flow
मामले में आसानी से अनुवाद करने योग्य प्रतीत नहीं होता है।
इस मुद्दे को हल करने के तरीके पर कोई विचार?
बहुत बहुत धन्यवाद! क्या आपके पास इसके लिए समय सारिणी पर कोई अंतर्दृष्टि है? –
दुर्भाग्य से नहीं, शायद आप उस पीआर के लेखक को एक संदेश छोड़ सकते हैं। –