2016-02-25 16 views
11

मैं डालने और स्पार्क एसक्यूएल DataFrames और JDBC कनेक्शन का उपयोग कर MySql पर कुछ डेटा को अद्यतन करने की कोशिश कर रहा हूँ का उपयोग कर तालिका।चिंगारी एसक्यूएल - अद्यतन MySql DataFrames और JDBC

मैं SaveMode.Append का उपयोग कर नए डेटा डालने के लिए सफल रहा है। स्पार्क एसक्यूएल से MySQL तालिका में पहले से मौजूद डेटा को अपडेट करने का कोई तरीका है?

मेरे कोड डालने के लिए है:

myDataFrame.write.mode(SaveMode.Append).jdbc(JDBCurl,mySqlTable,connectionProperties)

मैं इसे पूर्ण तालिका हट जाता है और उसकी जगह एक नया SaveMode.Overwrite को बदलते हैं, तो मैं "पर नकली चाबी अद्यतन की तरह कुछ के लिए देख रहा हूँ "MySql

उत्तर

14

में उपलब्ध यह संभव नहीं है। अब के रूप में (स्पार्क 1.6.0/2.2.0 स्नैपशॉट) स्पार्क DataFrameWriter केवल चार लेखन मोड का समर्थन करता है:

  • SaveMode.Overwrite: मौजूदा डेटा को अधिलेखित।
  • SaveMode.Append: डेटा संलग्न करें।
  • SaveMode.Ignore: आपरेशन (अर्थात् कोई को-अप) की उपेक्षा।
  • SaveMode.ErrorIfExists: डिफ़ॉल्ट विकल्प, रनटाइम पर अपवाद फेंक दें।

आप mapPartitions का उपयोग कर उदाहरण के लिए मैन्युअल रूप से सम्मिलित कर सकते हैं (क्योंकि आप चाहते हैं एक Upsert आपरेशन idempotent किया जाना चाहिए और लागू करने के लिए इस तरह के आसान के रूप में), अस्थायी तालिका करने के लिए लिख सकते हैं और मैन्युअल Upsert निष्पादित या ट्रिगर्स का उपयोग करें।

सामान्य में बैच के संचालन के लिए व्यवहार Upsert प्राप्त करने और सभ्य प्रदर्शन रखने तुच्छ से दूर है। आपको याद रखना होगा कि सामान्य स्थिति में कई समवर्ती लेन-देन होंगे (प्रत्येक विभाजन के लिए एक) ताकि आपको यह सुनिश्चित करना पड़े कि कोई लेखन विवाद नहीं होगा (आमतौर पर एप्लिकेशन विशिष्ट विभाजन का उपयोग करके) या उपयुक्त पुनर्प्राप्ति प्रक्रियाएं प्रदान करें। व्यावहारिक रूप से प्रदर्शन करना और बैच एक अस्थायी तालिका में लिखना बेहतर हो सकता है और सीधे डेटाबेस में अपरर्ट भाग को हल कर सकता है।

0

zero323 के जवाब सही है, मैं बस को जोड़ने के लिए आप JayDeBeApi पैकेज इस्तेमाल कर सकते हैं कि इस समाधान करना चाहता था: https://pypi.python.org/pypi/JayDeBeApi/

अपने mysql तालिका में डेटा अद्यतन करने के लिए। यह एक कम लटकती फल हो सकता है क्योंकि आपके पास पहले से ही mysql jdbc ड्राइवर स्थापित है।

JayDeBeApi मॉड्यूल आप अजगर कोड से जावा JDBC का उपयोग कर डेटाबेस से कनेक्ट करने की अनुमति देता है। यह डेटाबेस में एक पायथन डीबी-एपीआई v2.0 प्रदान करता है।

हम पाइथन के एनाकोंडा वितरण का उपयोग करते हैं, और जेडीबीएपी पायथन पैकेज मानक आता है।

ऊपर है कि लिंक में उदाहरण देखें।

0

एक दयालुता कि स्पार्क में SaveMode.Upsert मोड नहीं है, जैसे अपरिवर्तनीय जैसे सामान्य मामलों के लिए।

शून्य322 सामान्य में सही है, लेकिन मुझे लगता है कि इस तरह की प्रतिस्थापन सुविधा प्रदान करने के लिए यह संभव होना चाहिए (प्रदर्शन में समझौता के साथ)।

मैं भी इस मामले के लिए कुछ जावा कोड प्रदान करना चाहता था। बेशक यह स्पार्क से निर्मित एक के रूप में कलाकार नहीं है - लेकिन यह आपकी आवश्यकताओं के लिए एक अच्छा आधार होना चाहिए। बस अपनी आवश्यकताओं के लिए इसे संशोधित करें:

myDF.repartition(20); //one connection per partition, see below 

myDF.foreachPartition((Iterator<Row> t) -> { 
      Connection conn = DriverManager.getConnection(
        Constants.DB_JDBC_CONN, 
        Constants.DB_JDBC_USER, 
        Constants.DB_JDBC_PASS); 

      conn.setAutoCommit(true); 
      Statement statement = conn.createStatement(); 

      final int batchSize = 100000; 
      int i = 0; 
      while (t.hasNext()) { 
       Row row = t.next(); 
       try { 
        // better than REPLACE INTO, less cycles 
        statement.addBatch(("INSERT INTO mytable " + "VALUES (" 
          + "'" + row.getAs("_id") + "', 
          + "'" + row.getStruct(1).get(0) + "' 
          + "') ON DUPLICATE KEY UPDATE _id='" + row.getAs("_id") + "';")); 
        //conn.commit(); 

        if (++i % batchSize == 0) { 
         statement.executeBatch(); 
        } 
       } catch (SQLIntegrityConstraintViolationException e) { 
        //should not occur, nevertheless 
        //conn.commit(); 
       } catch (SQLException e) { 
        e.printStackTrace(); 
       } finally { 
        //conn.commit(); 
        statement.executeBatch(); 
       } 
      } 
      int[] ret = statement.executeBatch(); 

      System.out.println("Ret val: " + Arrays.toString(ret)); 
      System.out.println("Update count: " + statement.getUpdateCount()); 
      conn.commit(); 

      statement.close(); 
      conn.close(); 
संबंधित मुद्दे