2015-09-03 6 views
5

यह एक गैर काम कर रहे Flink उपयोग करने के लिए कोशिश स्केला गुमनाम समारोह के साथ गुना है:स्केला में Flink गुना समारोह का उपयोग कैसे करें

val myFoldFunction = (x: Double, t:(Double,String,String)) => x + t._1 
env.readFileStream(...). 
... 
.groupBy(1) 
.fold(0.0, myFoldFunction : Function2[Double, (Double,String,String), Double]) 

यह अच्छी तरह से संकलित, लेकिन निष्पादन पर, मैं एक "प्रकार विलोपन मुद्दा" मिल (निचे देखो)। जावा में ऐसा करना ठीक है, लेकिन निश्चित रूप से अधिक verbose। मुझे संक्षिप्त और स्पष्ट लैम्बडा पसंद है। मैं स्कैला में ऐसा कैसे कर सकता हूं?

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: 
Type of TypeVariable 'R' in 'public org.apache.flink.streaming.api.scala.DataStream org.apache.flink.streaming.api.scala.DataStream.fold(java.lang.Object,scala.Function2,org.apache.flink.api.common.typeinfo.TypeInformation,scala.reflect.ClassTag)' could not be determined. 
This is most likely a type erasure problem. 
The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). 

उत्तर

3

आपकी समस्या का सामना फ्लिंक [1] में एक बग है। समस्या फ्लिंक के TypeExtractor से निकलती है और जिस तरह से जावा कार्यान्वयन के शीर्ष पर स्कैला डेटास्ट्रीम एपीआई लागू होती है। TypeExtractor स्कैला प्रकार के लिए TypeInformation उत्पन्न नहीं कर सकता है और इस प्रकार MissingTypeInformation लौटाता है। StreamFold ऑपरेटर बनाने के बाद यह अनुपलब्ध प्रकार की जानकारी मैन्युअल रूप से सेट की गई है। हालांकि, StreamFold ऑपरेटर इस तरह कार्यान्वित किया गया है कि यह MissingTypeInformation स्वीकार नहीं करता है और इसके परिणामस्वरूप, सही प्रकार की जानकारी सेट करने से पहले विफल हो जाता है।

मैंने इस समस्या को ठीक करने के लिए एक पुल अनुरोध [2] खोला है। इसे अगले दो दिनों में विलय किया जाना चाहिए। नवीनतम 0.10 स्नैपशॉट संस्करण का उपयोग करके, आपकी समस्या ठीक होनी चाहिए।

संबंधित मुद्दे