2016-12-23 20 views
8

मैं वितरित tensorflow वितरित करने के लिए नया हूं और CPUs पर तुल्यकालिक प्रशिक्षण करने के लिए एक अच्छा उदाहरण ढूंढ रहा हूं।वितरित Tensorflow: CPUs पर सिंक्रोनस प्रशिक्षण के लिए अच्छा उदाहरण

मैंने पहले ही Distributed Tensorflow Example को आजमाया है और यह 1 पैरामीटर सर्वर (1 सीपीयू वाला 1 मशीन) और 3 श्रमिकों (प्रत्येक कार्यकर्ता = 1 मशीन के साथ 1 मशीन) पर सफलतापूर्वक एसिंक्रोनस प्रशिक्षण कर सकता है। हालांकि, जब यह सिंक्रोनस प्रशिक्षण की बात आती है, तो मैं इसे सही तरीके से चलाने में सक्षम नहीं हूं, हालांकि मैंने SyncReplicasOptimizer(V1.0 and V2.0) के ट्यूटोरियल का पालन किया है।

मैंने आधिकारिक सिंक्रिप्लिकस ऑप्टीमाइज़र कोड को काम करने वाले असीमित प्रशिक्षण उदाहरण में डाला है लेकिन प्रशिक्षण प्रक्रिया अभी भी असीमित है। मेरा विस्तृत कोड इस प्रकार है। सिंक्रोनस प्रशिक्षण से संबंधित कोई भी कोड ****** के ब्लॉक के भीतर है।

import tensorflow as tf 
import sys 
import time 

# cluster specification ---------------------------------------------------------------------- 
parameter_servers = ["xx1.edu:2222"] 
workers = ["xx2.edu:2222", "xx3.edu:2222", "xx4.edu:2222"] 
cluster = tf.train.ClusterSpec({"ps":parameter_servers, "worker":workers}) 

# input flags 
tf.app.flags.DEFINE_string("job_name", "", "Either 'ps' or 'worker'") 
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job") 
FLAGS = tf.app.flags.FLAGS 

# start a server for a specific task 
server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index) 

# Parameters ---------------------------------------------------------------------- 
N = 3 # number of replicas 
learning_rate = 0.001 
training_epochs = int(21/N) 
batch_size = 100 

# Network Parameters 
n_input = 784 # MNIST data input (img shape: 28*28) 
n_hidden_1 = 256 # 1st layer number of features 
n_hidden_2 = 256 # 2nd layer number of features 
n_classes = 10 # MNIST total classes (0-9 digits) 

if FLAGS.job_name == "ps": 
    server.join() 
    print("--- Parameter Server Ready ---") 
elif FLAGS.job_name == "worker": 
    # Import MNIST data 
    from tensorflow.examples.tutorials.mnist import input_data 
    mnist = input_data.read_data_sets("/tmp/data/", one_hot=True) 
    # Between-graph replication 
    with tf.device(tf.train.replica_device_setter(
     worker_device="/job:worker/task:%d" % FLAGS.task_index, 
     cluster=cluster)): 
     # count the number of updates 
     global_step = tf.get_variable('global_step', [], 
             initializer = tf.constant_initializer(0), 
             trainable = False, 
             dtype = tf.int32) 
     # tf Graph input 
     x = tf.placeholder("float", [None, n_input]) 
     y = tf.placeholder("float", [None, n_classes]) 

     # Create model 
     def multilayer_perceptron(x, weights, biases): 
      # Hidden layer with RELU activation 
      layer_1 = tf.add(tf.matmul(x, weights['h1']), biases['b1']) 
      layer_1 = tf.nn.relu(layer_1) 
      # Hidden layer with RELU activation 
      layer_2 = tf.add(tf.matmul(layer_1, weights['h2']), biases['b2']) 
      layer_2 = tf.nn.relu(layer_2) 
      # Output layer with linear activation 
      out_layer = tf.matmul(layer_2, weights['out']) + biases['out'] 
      return out_layer 

     # Store layers weight & bias 
     weights = { 
      'h1': tf.Variable(tf.random_normal([n_input, n_hidden_1])), 
      'h2': tf.Variable(tf.random_normal([n_hidden_1, n_hidden_2])), 
      'out': tf.Variable(tf.random_normal([n_hidden_2, n_classes])) 
     } 
     biases = { 
      'b1': tf.Variable(tf.random_normal([n_hidden_1])), 
      'b2': tf.Variable(tf.random_normal([n_hidden_2])), 
      'out': tf.Variable(tf.random_normal([n_classes])) 
     } 

     # Construct model 
     pred = multilayer_perceptron(x, weights, biases) 

     # Define loss and optimizer 
     cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(pred, y)) 

     # ************************* SyncReplicasOpt Version 1.0 ***************************************************** 
     ''' This optimizer collects gradients from all replicas, "summing" them, 
     then applying them to the variables in one shot, after which replicas can fetch the new variables and continue. ''' 
     # Create any optimizer to update the variables, say a simple SGD 
     opt = tf.train.AdamOptimizer(learning_rate=learning_rate) 

     # Wrap the optimizer with sync_replicas_optimizer with N replicas: at each step the optimizer collects N gradients before applying to variables. 
     opt = tf.train.SyncReplicasOptimizer(opt, replicas_to_aggregate=N, 
             replica_id=FLAGS.task_index, total_num_replicas=N) 

     # Now you can call `minimize()` or `compute_gradients()` and `apply_gradients()` normally 
     train = opt.minimize(cost, global_step=global_step) 

     # You can now call get_init_tokens_op() and get_chief_queue_runner(). 
     # Note that get_init_tokens_op() must be called before creating session 
     # because it modifies the graph. 
     init_token_op = opt.get_init_tokens_op() 
     chief_queue_runner = opt.get_chief_queue_runner() 
     # ************************************************************************************** 

     # Test model 
     correct = tf.equal(tf.argmax(pred, 1), tf.argmax(y, 1)) 
     accuracy = tf.reduce_mean(tf.cast(correct, "float")) 

     # Initializing the variables 
     init_op = tf.initialize_all_variables() 
     print("---Variables initialized---") 

    # ************************************************************************************** 
    is_chief = (FLAGS.task_index == 0) 
    # Create a "supervisor", which oversees the training process. 
    sv = tf.train.Supervisor(is_chief=is_chief, 
          logdir="/tmp/train_logs", 
          init_op=init_op, 
          global_step=global_step, 
          save_model_secs=600) 
    # ************************************************************************************** 

    with sv.prepare_or_wait_for_session(server.target) as sess: 
     # **************************************************************************************   
     # After the session is created by the Supervisor and before the main while loop: 
     if is_chief: 
      sv.start_queue_runners(sess, [chief_queue_runner]) 
      # Insert initial tokens to the queue. 
      sess.run(init_token_op) 
     # ************************************************************************************** 
     # Statistics 
     net_train_t = 0 
     # Training 
     for epoch in range(training_epochs): 
      total_batch = int(mnist.train.num_examples/batch_size) 
      # Loop over all batches 
      for i in range(total_batch): 
       batch_x, batch_y = mnist.train.next_batch(batch_size) 
       # ======== net training time ======== 
       begin_t = time.time() 
       sess.run(train, feed_dict={x: batch_x, y: batch_y}) 
       end_t = time.time() 
       net_train_t += (end_t - begin_t) 
       # =================================== 
      # Calculate training accuracy 
      # acc = sess.run(accuracy, feed_dict={x: mnist.train.images, y: mnist.train.labels}) 
      # print("Epoch:", '%04d' % (epoch+1), " Train Accuracy =", acc) 
      print("Epoch:", '%04d' % (epoch+1)) 
     print("Training Finished!") 
     print("Net Training Time: ", net_train_t, "second") 
     # Testing 
     print("Testing Accuracy = ", accuracy.eval({x: mnist.test.images, y: mnist.test.labels})) 

    sv.stop() 
    print("done") 

मेरे कोड के साथ कुछ भी गलत है? या मेरे पास पालन करने के लिए एक अच्छा उदाहरण हो सकता है?

+0

कोड अल्पज्ञता सही लगती है कि आप इसे तुल्यकालिक को चलाने के लिए कम से कम विधि में एक aggregation_method निर्दिष्ट करने की आवश्यकता है, लेकिन 'tf.train.SyncReplicasOptimizer' इंटरफेस काफी जटिल है, तो अभी भी एक बग हो सकता है। जब आप कहते हैं "प्रशिक्षण प्रक्रिया अभी भी असंकालिक है," आपने यह कैसे देखा? – mrry

+0

उत्तर के लिए धन्यवाद, @mrry। आदर्श syn-training में, हम उम्मीद करते हैं कि "एपोक #i" प्रत्येक श्रमिकों पर एक ही समय के बारे में मुद्रित होगा, लेकिन मैंने जो देखा वह है: कार्यकर्ता 0 पर "युग 1" - (3 मिनट बाद) -> " एपोक 1 "कार्यकर्ता 1 पर (3 मिनट बाद) -> कार्यकर्ता 2 पर" युग 1 "- (3 मिनट बाद) -> कार्यकर्ता 0 पर" युग 2 "- (3 मिनट बाद) -> कार्यकर्ता 1 पर "युग 2" - (3 मिनट बाद) -> कार्यकर्ता 2 पर "एपोक 2" - (3 मिनट बाद) -> कार्यकर्ता 0 पर "युग 3" .... अंत तक लूप। तो tensorflow syn-training में वास्तव में क्या चल रहा है? एक आदेशित युग प्रशिक्षण क्यों है? –

+0

मैं इसके बारे में भी उत्सुक हूं। मुझे आश्चर्य है कि कभी-कभी एक सीपीयू पीछे हो सकता है और यह एक सीपीयू से दो बैचों को जोड़ता है और अन्य CPUs में से एक को पीछे छोड़ देता है। – Aaron

उत्तर

0

मुझे यकीन नहीं है कि क्या आप उपयोगकर्ता-पारदर्शी वितरित tensorflow में रुचि रखते हैं जो बैकएंड में एमपीआई का उपयोग करता है। हमने हाल ही में MaTEx: https://github.com/matex-org/matex के साथ ऐसा एक संस्करण विकसित किया है।

इसलिए वितरित टेंसरफ्लो के लिए, आपको सिंकरप्लिकिका ऑप्टीमाइज़र कोड लिखने की आवश्यकता नहीं होगी, क्योंकि सभी परिवर्तन उपयोगकर्ता से सारित किए जाते हैं।

उम्मीद है कि इससे मदद मिलती है।

0

मुझे लगता है कि आपके प्रश्न को #9596 में tensorflow के टिप्पणियों के रूप में उत्तर दिया जा सकता है। यह समस्या tf.train.SyncReplicasOptimizer() के नए संस्करण की बग के कारण होती है। इस समस्या से बचने के लिए आप इस एपीआई के पुराने संस्करण का उपयोग कर सकते हैं।

एक और समाधान Tensorflow Distributed Benchmarks से है। स्रोत कोड पर एक नज़र डालें, और आप पाते हैं कि वे tensorflow में कतार के माध्यम से मैन्युअल रूप से श्रमिकों को सिंक्रनाइज़ करते हैं। प्रयोगों के माध्यम से, यह बेंचमार्क ठीक वैसे ही चलता है जो आप उम्मीद करते हैं।

आशा है कि ये टिप्पणियां और संसाधन आपकी समस्या का समाधान करने में आपकी सहायता कर सकते हैं। धन्यवाद!

0

एक मुद्दा

train = opt.minimize(cost, global_step=global_step, aggregation_method=tf.AggregationMethod.ADD_N) 
संबंधित मुद्दे