I am trying to prefetch training data to hide I/O latency. I would like to write custom Python code that loads data from disk and preprocesses the data (e.g. by adding a context window). In other words, one thread does data preprocessing and the other does training. Is this possible in TensorFlow?
Update: I have a working example based on @mrry's example.
import numpy as np
import tensorflow as tf
import threading
BATCH_SIZE = 5
TRAINING_ITERS = 4100
feature_input = tf.placeholder(tf.float32, shape=[128])
label_input = tf.placeholder(tf.float32, shape=[128])
q = tf.FIFOQueue(200, [tf.float32, tf.float32], shapes=[[128], [128]])
enqueue_op = q.enqueue([label_input, feature_input])
label_batch, feature_batch = q.dequeue_many(BATCH_SIZE)
c = tf.reshape(feature_batch, [BATCH_SIZE, 128]) + tf.reshape(label_batch, [BATCH_SIZE, 128])
sess = tf.Session()
def load_and_enqueue(sess, enqueue_op, coord):
with open('dummy_data/features.bin') as feature_file, open('dummy_data/labels.bin') as label_file:
while not coord.should_stop():
feature_array = np.fromfile(feature_file, np.float32, 128)
if feature_array.shape[0] == 0:
print('reach end of file, reset using seek(0,0)')
feature_file.seek(0,0)
label_file.seek(0,0)
continue
label_value = np.fromfile(label_file, np.float32, 128)
sess.run(enqueue_op, feed_dict={feature_input: feature_array,
label_input: label_value})
coord = tf.train.Coordinator()
t = threading.Thread(target=load_and_enqueue, args=(sess,enqueue_op, coord))
t.start()
for i in range(TRAINING_ITERS):
sum = sess.run(c)
print('train_iter='+str(i))
print(sum)
coord.request_stop()
coord.join([t])
See Question&Answers more detail:os