2017-08-18 15 views
12

मैं वर्तमान में जीसीएस से BigQuery में कुछ सीएसवी फाइलों को आयात करने के लिए जावा उपयोगिता लिख ​​रहा हूं। मैं इसे आसानी से bq load तक प्राप्त कर सकता हूं, लेकिन मैं इसे डेटाफ्लो नौकरी का उपयोग करके करना चाहता था। इसलिए मैं डेटाफ्लो की पाइपलाइन और पैराडो ट्रांसफार्मर का उपयोग कर रहा हूं (BigQueryIO पर इसे लागू करने के लिए टेबलरो लौटाता है) और मैंने रूपांतरण के लिए स्ट्रिंगट्रो कनवर्टर() बनाया है। यहां वास्तविक समस्या शुरू होती है - मुझे गंतव्य तालिका के लिए स्कीमा निर्दिष्ट करने के लिए मजबूर होना पड़ता है हालांकि मैं एक नई तालिका नहीं बनाना चाहता हूं यदि यह अस्तित्व में नहीं है - केवल डेटा लोड करने का प्रयास कर रहा है। इसलिए मैं टेबलरो के लिए कॉलम नाम मैन्युअल रूप से सेट नहीं करना चाहता क्योंकि मेरे पास लगभग 600 कॉलम हैं।किसी भी स्तंभ नाम या स्कीमा के बिना किसी BigQuery तालिका में एक CSV फ़ाइल आयात करने के लिए कैसे?

public class StringToRowConverter extends DoFn<String, TableRow> { 

private static Logger logger = LoggerFactory.getLogger(StringToRowConverter.class); 

public void processElement(ProcessContext c) { 
    TableRow row = new TableRow();   
    row.set("DO NOT KNOW THE COLUMN NAME", c.element()); 
    c.output(row); 
} 
} 

इसके अलावा, यह माना जाता है कि पहले से ही मेज BigQuery डाटासेट में मौजूद है और मैं इसे बनाने के लिए की जरूरत नहीं है, और यह भी एक CSV फ़ाइल को सही क्रम में स्तंभ हैं।

यदि इस परिदृश्य में कोई समाधान नहीं है और डेटा लोड के लिए कॉलम नाम की आवश्यकता है, तो मैं इसे CSV फ़ाइल की पहली पंक्ति में रख सकता हूं।

किसी भी मदद की सराहना की जाएगी।

उत्तर

7

तालिका के निर्माण से बचने के लिए, आपको BigQueryIO.Write.CreateDisposition.CREATE_NEVER का उपयोग BigQueryIO.Write के पाइपलाइन कॉन्फ़िगरेशन के दौरान करना चाहिए। स्रोत: https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/BigQueryIO.Write

आपको किसी BigQuery तालिका स्कीमा को पहले से जानने की आवश्यकता नहीं है, तो आप इसे गतिशील रूप से खोज सकते हैं। उदाहरण के लिए, आप तालिका स्कीमा से पूछने के लिए BigQuery API (https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/get) का उपयोग कर सकते हैं और इसे क्लास स्ट्रिंगट्रो कनवर्टर के लिए पैरामीटर के रूप में पास कर सकते हैं। एक और विकल्प और यह मानते हुए कि पहली पंक्ति एक शीर्षलेख है, पहली पंक्ति को छोड़ना और बाकी फ़ाइल को सही तरीके से मैप करने के लिए इसका उपयोग करना है।

नीचे दिया गया कोड दूसरे दृष्टिकोण को लागू करता है और मौजूदा BigQuery तालिका में शामिल करने के लिए आउटपुट को कॉन्फ़िगर करता है।

public class DFJob { 

    public static class StringToRowConverter extends DoFn<String, TableRow> { 

     private String[] columnNames; 

     private boolean isFirstRow = true; 

     public void processElement(ProcessContext c) { 
      TableRow row = new TableRow(); 

      String[] parts = c.element().split(","); 

      if (isFirstRow) { 
       columnNames = Arrays.copyOf(parts, parts.length); 
       isFirstRow = false; 
      } else { 
       for (int i = 0; i < parts.length; i++) { 
        row.set(columnNames[i], parts[i]); 
       } 
       c.output(row); 
      } 
     } 
    } 

    public static void main(String[] args) { 
     DataflowPipelineOptions options = PipelineOptionsFactory.create() 
       .as(DataflowPipelineOptions.class); 
     options.setRunner(BlockingDataflowPipelineRunner.class); 

     Pipeline p = Pipeline.create(options); 

     p.apply(TextIO.Read.from("gs://dataflow-samples/myfile.csv")) 
       .apply(ParDo.of(new StringToRowConverter())) 
       .apply(BigQueryIO.Write.to("myTable") 
         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER) 
         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); 

     PipelineResult result = p.run(); 
    } 
} 
संबंधित मुद्दे