मैं वितरित tensorflow वितरित करने के लिए नया हूं और CPUs पर तुल्यकालिक प्रशिक्षण करने के लिए एक अच्छा उदाहरण ढूंढ रहा हूं।वितरित Tensorflow: CPUs पर सिंक्रोनस प्रशिक्षण के लिए अच्छा उदाहरण
मैंने पहले ही Distributed Tensorflow Example को आजमाया है और यह 1 पैरामीटर सर्वर (1 सीपीयू वाला 1 मशीन) और 3 श्रमिकों (प्रत्येक कार्यकर्ता = 1 मशीन के साथ 1 मशीन) पर सफलतापूर्वक एसिंक्रोनस प्रशिक्षण कर सकता है। हालांकि, जब यह सिंक्रोनस प्रशिक्षण की बात आती है, तो मैं इसे सही तरीके से चलाने में सक्षम नहीं हूं, हालांकि मैंने SyncReplicasOptimizer(V1.0 and V2.0) के ट्यूटोरियल का पालन किया है।
मैंने आधिकारिक सिंक्रिप्लिकस ऑप्टीमाइज़र कोड को काम करने वाले असीमित प्रशिक्षण उदाहरण में डाला है लेकिन प्रशिक्षण प्रक्रिया अभी भी असीमित है। मेरा विस्तृत कोड इस प्रकार है। सिंक्रोनस प्रशिक्षण से संबंधित कोई भी कोड ****** के ब्लॉक के भीतर है।
import tensorflow as tf
import sys
import time
# cluster specification ----------------------------------------------------------------------
parameter_servers = ["xx1.edu:2222"]
workers = ["xx2.edu:2222", "xx3.edu:2222", "xx4.edu:2222"]
cluster = tf.train.ClusterSpec({"ps":parameter_servers, "worker":workers})
# input flags
tf.app.flags.DEFINE_string("job_name", "", "Either 'ps' or 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
FLAGS = tf.app.flags.FLAGS
# start a server for a specific task
server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
# Parameters ----------------------------------------------------------------------
N = 3 # number of replicas
learning_rate = 0.001
training_epochs = int(21/N)
batch_size = 100
# Network Parameters
n_input = 784 # MNIST data input (img shape: 28*28)
n_hidden_1 = 256 # 1st layer number of features
n_hidden_2 = 256 # 2nd layer number of features
n_classes = 10 # MNIST total classes (0-9 digits)
if FLAGS.job_name == "ps":
server.join()
print("--- Parameter Server Ready ---")
elif FLAGS.job_name == "worker":
# Import MNIST data
from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets("/tmp/data/", one_hot=True)
# Between-graph replication
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % FLAGS.task_index,
cluster=cluster)):
# count the number of updates
global_step = tf.get_variable('global_step', [],
initializer = tf.constant_initializer(0),
trainable = False,
dtype = tf.int32)
# tf Graph input
x = tf.placeholder("float", [None, n_input])
y = tf.placeholder("float", [None, n_classes])
# Create model
def multilayer_perceptron(x, weights, biases):
# Hidden layer with RELU activation
layer_1 = tf.add(tf.matmul(x, weights['h1']), biases['b1'])
layer_1 = tf.nn.relu(layer_1)
# Hidden layer with RELU activation
layer_2 = tf.add(tf.matmul(layer_1, weights['h2']), biases['b2'])
layer_2 = tf.nn.relu(layer_2)
# Output layer with linear activation
out_layer = tf.matmul(layer_2, weights['out']) + biases['out']
return out_layer
# Store layers weight & bias
weights = {
'h1': tf.Variable(tf.random_normal([n_input, n_hidden_1])),
'h2': tf.Variable(tf.random_normal([n_hidden_1, n_hidden_2])),
'out': tf.Variable(tf.random_normal([n_hidden_2, n_classes]))
}
biases = {
'b1': tf.Variable(tf.random_normal([n_hidden_1])),
'b2': tf.Variable(tf.random_normal([n_hidden_2])),
'out': tf.Variable(tf.random_normal([n_classes]))
}
# Construct model
pred = multilayer_perceptron(x, weights, biases)
# Define loss and optimizer
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(pred, y))
# ************************* SyncReplicasOpt Version 1.0 *****************************************************
''' This optimizer collects gradients from all replicas, "summing" them,
then applying them to the variables in one shot, after which replicas can fetch the new variables and continue. '''
# Create any optimizer to update the variables, say a simple SGD
opt = tf.train.AdamOptimizer(learning_rate=learning_rate)
# Wrap the optimizer with sync_replicas_optimizer with N replicas: at each step the optimizer collects N gradients before applying to variables.
opt = tf.train.SyncReplicasOptimizer(opt, replicas_to_aggregate=N,
replica_id=FLAGS.task_index, total_num_replicas=N)
# Now you can call `minimize()` or `compute_gradients()` and `apply_gradients()` normally
train = opt.minimize(cost, global_step=global_step)
# You can now call get_init_tokens_op() and get_chief_queue_runner().
# Note that get_init_tokens_op() must be called before creating session
# because it modifies the graph.
init_token_op = opt.get_init_tokens_op()
chief_queue_runner = opt.get_chief_queue_runner()
# **************************************************************************************
# Test model
correct = tf.equal(tf.argmax(pred, 1), tf.argmax(y, 1))
accuracy = tf.reduce_mean(tf.cast(correct, "float"))
# Initializing the variables
init_op = tf.initialize_all_variables()
print("---Variables initialized---")
# **************************************************************************************
is_chief = (FLAGS.task_index == 0)
# Create a "supervisor", which oversees the training process.
sv = tf.train.Supervisor(is_chief=is_chief,
logdir="/tmp/train_logs",
init_op=init_op,
global_step=global_step,
save_model_secs=600)
# **************************************************************************************
with sv.prepare_or_wait_for_session(server.target) as sess:
# **************************************************************************************
# After the session is created by the Supervisor and before the main while loop:
if is_chief:
sv.start_queue_runners(sess, [chief_queue_runner])
# Insert initial tokens to the queue.
sess.run(init_token_op)
# **************************************************************************************
# Statistics
net_train_t = 0
# Training
for epoch in range(training_epochs):
total_batch = int(mnist.train.num_examples/batch_size)
# Loop over all batches
for i in range(total_batch):
batch_x, batch_y = mnist.train.next_batch(batch_size)
# ======== net training time ========
begin_t = time.time()
sess.run(train, feed_dict={x: batch_x, y: batch_y})
end_t = time.time()
net_train_t += (end_t - begin_t)
# ===================================
# Calculate training accuracy
# acc = sess.run(accuracy, feed_dict={x: mnist.train.images, y: mnist.train.labels})
# print("Epoch:", '%04d' % (epoch+1), " Train Accuracy =", acc)
print("Epoch:", '%04d' % (epoch+1))
print("Training Finished!")
print("Net Training Time: ", net_train_t, "second")
# Testing
print("Testing Accuracy = ", accuracy.eval({x: mnist.test.images, y: mnist.test.labels}))
sv.stop()
print("done")
मेरे कोड के साथ कुछ भी गलत है? या मेरे पास पालन करने के लिए एक अच्छा उदाहरण हो सकता है?
कोड अल्पज्ञता सही लगती है कि आप इसे तुल्यकालिक को चलाने के लिए कम से कम विधि में एक aggregation_method निर्दिष्ट करने की आवश्यकता है, लेकिन 'tf.train.SyncReplicasOptimizer' इंटरफेस काफी जटिल है, तो अभी भी एक बग हो सकता है। जब आप कहते हैं "प्रशिक्षण प्रक्रिया अभी भी असंकालिक है," आपने यह कैसे देखा? – mrry
उत्तर के लिए धन्यवाद, @mrry। आदर्श syn-training में, हम उम्मीद करते हैं कि "एपोक #i" प्रत्येक श्रमिकों पर एक ही समय के बारे में मुद्रित होगा, लेकिन मैंने जो देखा वह है: कार्यकर्ता 0 पर "युग 1" - (3 मिनट बाद) -> " एपोक 1 "कार्यकर्ता 1 पर (3 मिनट बाद) -> कार्यकर्ता 2 पर" युग 1 "- (3 मिनट बाद) -> कार्यकर्ता 0 पर" युग 2 "- (3 मिनट बाद) -> कार्यकर्ता 1 पर "युग 2" - (3 मिनट बाद) -> कार्यकर्ता 2 पर "एपोक 2" - (3 मिनट बाद) -> कार्यकर्ता 0 पर "युग 3" .... अंत तक लूप। तो tensorflow syn-training में वास्तव में क्या चल रहा है? एक आदेशित युग प्रशिक्षण क्यों है? –
मैं इसके बारे में भी उत्सुक हूं। मुझे आश्चर्य है कि कभी-कभी एक सीपीयू पीछे हो सकता है और यह एक सीपीयू से दो बैचों को जोड़ता है और अन्य CPUs में से एक को पीछे छोड़ देता है। – Aaron