2016-01-11 5 views
15

सेट कर दिया जाता रहा चिंगारी 1.6 उपयोग कर रहा हूँ और जब मैं निम्नलिखित कोड चलाने ऊपर मुद्दे में चलाने: यहाँस्पार्क 1.6: java.lang.IllegalArgumentException: spark.sql.execution.id पहले से ही

// Imports 
import org.apache.spark.sql.hive.HiveContext 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.SaveMode 
import scala.concurrent.ExecutionContext.Implicits.global 
import java.util.Properties 
import scala.concurrent.Future 

// Set up spark on local with 2 threads 
val conf = new SparkConf().setMaster("local[2]").setAppName("app") 
val sc = new SparkContext(conf) 
val sqlCtx = new HiveContext(sc) 

// Create fake dataframe 
import sqlCtx.implicits._ 
var df = sc.parallelize(1 to 50000).map { i => (i, i, i, i, i, i, i) }.toDF("a", "b", "c", "d", "e", "f", "g").repartition(2) 
// Write it as a parquet file 
df.write.parquet("/tmp/parquet1") 
df = sqlCtx.read.parquet("/tmp/parquet1") 

// JDBC connection 
val url = s"jdbc:postgresql://localhost:5432/tempdb" 
val prop = new Properties() 
prop.setProperty("user", "admin") 
prop.setProperty("password", "") 

// 4 futures - at least one of them has been consistently failing for 
val x1 = Future { df.write.jdbc(url, "temp1", prop) } 
val x2 = Future { df.write.jdbc(url, "temp2", prop) } 
val x3 = Future { df.write.jdbc(url, "temp3", prop) } 
val x4 = Future { df.write.jdbc(url, "temp4", prop) } 

GitHub सार: https://gist.github.com/karanveerm/27d852bf311e39f05491

त्रुटि मैं मिलता है:

org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0] 
     at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0] 
     at org.apache.spark.sql.DataFrame.foreachPartition(DataFrame.scala:1482) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0] 
     at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:247) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0] 
     at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:306) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0] 
     at writer.SQLWriter$.writeDf(Writer.scala:75) ~[temple.temple-1.0-sans-externalized.jar:na] 
     at writer.Writer$.writeDf(Writer.scala:33) ~[temple.temple-1.0-sans-externalized.jar:na] 
     at controllers.Api$$anonfun$downloadTable$1$$anonfun$apply$25.apply(Api.scala:460) ~[temple.temple-1.0-sans-externalized.jar:2.4.6] 
     at controllers.Api$$anonfun$downloadTable$1$$anonfun$apply$25.apply(Api.scala:452) ~[temple.temple-1.0-sans-externalized.jar:2.4.6] 
     at scala.util.Success$$anonfun$map$1.apply(Try.scala:237) ~[org.scala-lang.scala-library-2.11.7.jar:na] 

पर इस एक चिंगारी बग है या मैं कुछ गलत कर रहा हूँ/किसी भी समाधान?

+0

क्या मैं पूछ सकता हूं कि आपने इस कोड को किस मशीन पर चलाया था? मैं विशेष रूप से सीपीयू (कितने कोर) में रुचि रखते हैं? –

+0

ओएसएक्स एल कैपिटन 10.11.1 | मैकबुक एयर (13-इंच, प्रारंभिक 2014) | 1.7 गीगाहर्ट्ज इंटेल कोर i7 | 8 जीबी 1600 मेगाहट्र्ज डीडीआर 3 (मुझे विश्वास है कि i7 4 कोर है) – sparknoob

+0

दिलचस्प, मैं इसे एक समान सेटअप (स्पार्क खोल से) पर पुन: उत्पन्न नहीं कर सकता। यह कुछ बुरा बग हो सकता है, उन्हें पहले आईडी पीढ़ी के साथ समस्याएं थीं। आप इसके लिए एक जिरा बनाना चाहते हैं। –

उत्तर

0

टेस्ट 1: क्या आप समानांतर भविष्य के बजाय सीरियल फैशन में प्रत्येक df.write ऑपरेशन को चलाने में मदद करते हैं?

टेस्ट 2: क्या आप डेटाफ्रेम को बनाए रखने में मदद करते हैं और फिर समानांतर में सभी df.write ऑपरेशन करते हैं और यह देखने के लिए पूर्ण होते हैं कि यह मदद करता है या नहीं?

1

कई बातें की कोशिश कर के बाद, मैं धागे वैश्विक ForkJoinPool द्वारा बनाई में से एक अपने spark.sql.execution.id संपत्ति एक यादृच्छिक मान पर सेट हो जाता है कि पाया। मैं वास्तव में ऐसा प्रक्रिया की पहचान नहीं कर सका, लेकिन मैं अपने ExecutionContext का उपयोग करके इसके आसपास काम कर सकता था।

import java.util.concurrent.Executors 
import concurrent.ExecutionContext 
val executorService = Executors.newFixedThreadPool(4) 
implicit val ec = ExecutionContext.fromExecutorService(executorService) 

मैंने http://danielwestheide.com/blog/2013/01/16/the-neophytes-guide-to-scala-part-9-promises-and-futures-in-practice.html से कोड का उपयोग किया। हो सकता है कि ForkJoinPool क्लोन थ्रेड नए बनाते समय गुणों को प्रदर्शित करते हैं और यदि यह SQL निष्पादन के संदर्भ में होता है तो यह इसके शून्य शून्य मान जाएगा जबकि FixedThreadPool तत्काल पर थ्रेड बनाएगा।

+0

पर चल रहा हूं, मैंने एक ही समस्या में भाग लिया है। लेकिन यह समाधान मदद नहीं करता है। मैं अभी भी 'spark.sql.execution.id पहले से सेट 'त्रुटि देखता हूं। – mottosan

+0

@ कन्शिरो, निष्पादक नहीं होना चाहिए.न्यूफिक्स्ड थ्रेडपूल (1)? – smas

+0

@smas समस्या थ्रेड की संख्या में नहीं है लेकिन उन धागे की शुरुआत में है। कांटा में शामिल पूल यादृच्छिक समय पर धागे को शुरू करेगा और नए धागे को शुरू करने के लिए यह सभी विशेषताओं को क्लोन करेगा। तो यदि किसी नए धागे की शुरुआत के समय मौजूदा थ्रेड में एक SQL निष्पादन आईडी सेट है, तो यह एक नया उत्पन्न होने की बजाय इसे नए में कॉपी करेगा। – Knshiro

1

कृपया, जाँच SPARK-13747

अपने वातावरण में स्पार्क संस्करण 2.2.0 या उच्चतर यदि लागू उपयोग करने के लिए विचार करें।

संबंधित मुद्दे