6

के माध्यम से BigQuery में PubSub संदेशों को सम्मिलित करें मैं Google क्लाउड डेटाफ़्लो का उपयोग करके किसी विषय से आने वाले PubSub संदेश डेटा को BigQuery तालिका में डालना चाहता हूं। सबकुछ बढ़िया काम करता है लेकिन BigQuery तालिका में मैं "߈ " जैसे अपठनीय तार देख सकता हूं। यह मेरा पाइपलाइन है:Google क्लाउड डेटाफ्लो

p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/project-name/topics/topic-name")) 
.apply(ParDo.named("Transformation").of(new StringToRowConverter())) 
.apply(BigQueryIO.Write.named("Write into BigQuery").to("project-name:dataset-name.table") 
    .withSchema(schema) 
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)) 

और मेरे साधारण StringToRowConverter समारोह है:

class StringToRowConverter extends DoFn<String, TableRow> { 
private static final long serialVersionUID = 0; 

@Override 
public void processElement(ProcessContext c) { 
    for (String word : c.element().split(",")) { 
     if (!word.isEmpty()) { 
      System.out.println(word); 
     c.output(new TableRow().set("data", word)); 
     } 
    } 
} 
} 

और इस संदेश को मैं एक पोस्ट अनुरोध के माध्यम से भेजा है: मैं क्या याद आ रही है

POST https://pubsub.googleapis.com/v1/projects/project-name/topics/topic-name:publish 
{ 
"messages": [ 
    { 
    "attributes":{ 
"key": "tablet, smartphone, desktop", 
"value": "eng" 
    }, 
    "data": "34gf5ert" 
    } 
] 
} 

? धन्यवाद!

+0

[यह] (https://github.com/bomboradata/pubsub-to-bigquery) एक खुला स्रोत है जिसका उपयोग आप पब/उप को बीक्यू में करने के लिए कर सकते हैं – PUG

उत्तर

6

https://cloud.google.com/pubsub/reference/rest/v1/PubsubMessage के अनुसार, पबब संदेश का JSON पेलोड बेस 64 एन्कोडेड है। डेटाफ्लो में PubsubIO, डिफ़ॉल्ट रूप से, स्ट्रिंग यूटीएफ 8 कोडर का उपयोग करता है। उदाहरण स्ट्रिंग जो आपने "34gf5ert" प्रदान की है, जब बेस 64-डीकोडेड और फिर यूटीएफ -8 स्ट्रिंग के रूप में व्याख्या की जाती है, तो बिल्कुल "߈ " देता है।

2

इस तरह मैं अपने pubsub संदेशों खोल रहा हूँ:

@Override 
public void processElement(ProcessContext c) { 

    String json = c.element(); 

    HashMap<String,String> items = new Gson().fromJson(json, new TypeToken<HashMap<String, String>>(){}.getType()); 
    String unpacked = items.get("JsonKey"); 

आप के लिए अपनी उपयोगी आशा है।

संबंधित मुद्दे