2016-04-20 11 views
5

का कॉन्फ़िगरेशन मेरे पास एक बहुत ही सरल एकीकरण प्रवाह है, जहां एक प्रकाशन-सबस्क्राइब चैनल का उपयोग करके दो प्रदाताओं को एक त्वरित अनुरोध भेजा जाता है। रीस्टफुल सेवाओं दोनों का नतीजा तब एक सरणी में एकत्रित किया जाता है। एकीकरण के प्रवाह के स्केच के रूप में नीचे दिखाया गया है:स्प्रिंग इंटीग्रेशन जावा डीएसएल - एग्रीगेटर

@Bean 
IntegrationFlow flow() throws Exception { 
    return IntegrationFlows.from("inputChannel") 
      .publishSubscribeChannel(s -> s.applySequence(true) 
       .subscribe(f -> f 
         .handle(Http.outboundGateway("http://provider1.com/...") 
           .httpMethod(HttpMethod.GET) 
           .expectedResponseType(ItemDTO[].class)) 
       ).subscribe(f -> f 
         .handle(Http.outboundGateway("http://provider2.com/...") 
           .httpMethod(HttpMethod.GET) 
           .expectedResponseType(ItemDTO[].class) 
         ) 
       ) 
      ) 
      .aggregate() 
      .get(); 
} 

हालांकि, जब मेरे कोड चल रहा है, जिसके परिणामस्वरूप सरणी RESTful सेवाओं का केवल एक ही द्वारा लौटाए गए आइटम शामिल हैं। क्या कोई विन्यास चरण है जो मुझे याद आ रहा है?

अद्यतन

निम्न संस्करण पूर्ण समाधान से मेल खाती है, खाते आर्टेम की टिप्पणी को ध्यान में रखकर।

@Bean 
IntegrationFlow flow() throws Exception { 
    return IntegrationFlows.from("inputChannel-scatter") 
      .publishSubscribeChannel(s -> s.applySequence(true) 
       .subscribe(f -> f 
         .handle(Http.outboundGateway("http://provider1.com/...") 
           .httpMethod(HttpMethod.GET) 
           .expectedResponseType(ItemDTO[].class)) 
         .channel("inputChannel-gather")) 
       .subscribe(f -> f 
         .handle(Http.outboundGateway("http://provider2.com/...") 
           .httpMethod(HttpMethod.GET) 
           .expectedResponseType(ItemDTO[].class)) 
         .channel("inputChannel-gather"))) 
      .get(); 
} 

@Bean 
IntegrationFlow gatherFlow() { 
    return IntegrationFlows.from("inputChannel-gather") 
      .aggregate(a -> a.outputProcessor(g -> new GenericMessage<ItemDTO[]>(
         g.getMessages().stream() 
           .flatMap(m -> Arrays.stream((ItemDTO[]) m.getPayload())) 
           .collect(Collectors.toList()).toArray(new ItemDTO[0])))) 
      .get(); 
} 

उत्तर

3

असल में यह इस तरह से काम नहीं करता है।

.aggregate()तीसरा ग्राहक publishSubscribeChannel पर ग्राहक है।

आपको अपने प्रवाह को दो में से अलग करना होगा। इस तरह:

@Bean 
    public IntegrationFlow publishSubscribeFlow() { 
     return flow -> flow 
       .publishSubscribeChannel(s -> s 
         .applySequence(true) 
         .subscribe(f -> f 
           .handle((p, h) -> "Hello") 
           .channel("publishSubscribeAggregateFlow.input")) 
         .subscribe(f -> f 
           .handle((p, h) -> "World!") 
           .channel("publishSubscribeAggregateFlow.input")) 
       ); 
    } 

    @Bean 
    public IntegrationFlow publishSubscribeAggregateFlow() { 
     return flow -> flow 
       .aggregate(a -> a.outputProcessor(g -> g.getMessages() 
         .stream() 
         .<String>map(m -> (String) m.getPayload()) 
         .collect(Collectors.joining(" ")))) 
       .channel(c -> c.queue("subscriberAggregateResult")); 
    } 

वेतन ध्यान, कृपया, दोनों ग्राहकों से .channel("publishSubscribeAggregateFlow.input") उपयोग करने के लिए।

ईमानदार होने के लिए यह किसी भी publish-subscribe का बिंदु है। हमें पता होना चाहिए कि सभी ग्राहकों का नतीजा कहां भेजना है यदि हम उन्हें एकत्रित करने जा रहे हैं।

आपका उपयोग-मामला मुझे Scatter-Gather ईआईपी पैटर्न याद करता है।

हमारे पास अभी तक डीएसएल में इसका कार्यान्वयन नहीं है। इस मामले पर GH issue बढ़ाने के लिए स्वतंत्र महसूस करें और हम आगामी 1.2 संस्करण में इसे संभालने का प्रयास करेंगे।

अद्यतन

जीएच मामले पर मुद्दा: https://github.com/spring-projects/spring-integration-java-dsl/issues/75

+0

धन्यवाद एक बहुत आप Artem मदद के लिए। असल में, मैंने बिना किसी सफलता के चैनलों और अलग प्रवाह का उपयोग करने की कोशिश की क्योंकि मुझे एग्रीगेटर के साथ भी समस्या थी। आपके उत्तर ने मुझे एग्रीगेटर को लिखने के तरीके पर भी संकेत दिया। – user3329862

+0

बढ़िया! लेकिन स्कैटर-गदर डीएसएल के लिए एक अच्छा जोड़ा होगा। तो, जीएच मुद्दे को बढ़ाने के लिए शर्मिंदा मत बनो! –

संबंधित मुद्दे