Intel® Optimized AI Frameworks
Get community support for questions related to PyTorch* and TensorFlow* frameworks.
53 Discussions

mnist_multi_worker_strategy distribution error

wangjian
Beginner
939 Views

 

System information

  • I can run this multi-node case with tensorflow 2.2.0  successfully. But with intel tensorflow 2.2.0,it occur error related with mkl.
  • OS Platform and Distribution : Linux CentOS 8.2
  • TensorFlow installed with pip  on both machines.

         pip install intel-tensorflow==2.2.0

  • Python version: 3.7
  • Bazel version (if compiling from source): 1.1.0

 

multi-nodes code

mnist_multi_worker_strategy.py :

 

#!/usr/bin/python
# -*-coding:utf-8 -*-

import os
import json
import argparse

import tensorflow as tf
from tensorflow.keras import datasets
from tensorflow.keras import layers, models
from tensorflow.keras import optimizers


def set_strategy(args):
if args.job_name != 'worker':
raise ValueError(
"Multi strategy only support worker mode, please check job name")

tf_config = args.worker_hosts.split(',')
os.environ["TF_CONFIG"] = json.dumps({
'cluster': {
'worker': tf_config
},
'task': {'type': args.job_name, 'index': args.task_index}
})
print(os.environ["TF_CONFIG"])

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

return strategy


# create cnn model
class Net(object):
def __init__(self):
model = models.Sequential()
model.add(layers.Conv2D(
32, (3, 3), activation='relu', input_shape=(28, 28, 1)))
model.add(layers.MaxPooling2D((2, 2)))
model.add(layers.Conv2D(64, (3, 3), activation='relu'))
model.add(layers.MaxPooling2D((2, 2)))
model.add(layers.Conv2D(64, (3, 3), activation='relu'))

model.add(layers.Flatten())
model.add(layers.Dense(64, activation='relu'))
model.add(layers.Dense(10, activation='softmax'))

model.summary()

self.model = model


# inital dateset
class DataSet(object):
def __init__(self):
data_path = os.path.dirname(os.path.realpath(__file__)) \
+ '/../../datasets/mnist/mnist.npz'
(train_images, train_labels), (test_images, test_labels) = \
datasets.mnist.load_data(path=data_path)
train_images = train_images.reshape((60000, 28, 28, 1))
test_images = test_images.reshape((10000, 28, 28, 1))

train_images, test_images = train_images / 255.0, test_images / 255.0

self.train_images, self.train_labels = train_images, train_labels
self.test_images, self.test_labels = test_images, test_labels


# train and val
class Train:
def __init__(self):
self.data = DataSet()

def train(self, args, strategy):
# Define the checkpoint directory to store the checkpoints
checkpoint_dir = args.train_dir
# Name of the checkpoint files
checkpoint_path = os.path.join(checkpoint_dir, "ckpt_{epoch}")

callbacks = [
tf.keras.callbacks.TensorBoard(
log_dir=args.train_dir, histogram_freq=1),
tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
save_weights_only=True),
]

with strategy.scope():
model = Net().model

model.compile(optimizer=optimizers.Adam(),
loss='sparse_categorical_crossentropy',
metrics=['accuracy'])

model.fit(self.data.train_images, self.data.train_labels,
batch_size=args.batch_size,
epochs=args.epochs,
callbacks=callbacks,
validation_data=(self.data.test_images, self.data.test_labels))

# EVAL
model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))
eval_loss, eval_acc = model.evaluate(
self.data.test_images, self.data.test_labels, verbose=2)
print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))


def main():
# training params settings
parser = argparse.ArgumentParser(description='Tensorflow 2.0 MNIST Example,'
' use Mirrorstrategy')
parser.add_argument('--train_dir', '-td', type=str, default='./train_dir',
help='the folder of svaing model')
parser.add_argument('--batch_size', '-b', type=int, default=64,
help='input batch size for training (default: 64)')
parser.add_argument('--test_batchsize', '-tb', type=int, default=1000,
help='input batch size for testing (default: 1000)')
parser.add_argument('--epochs', '-e', type=int, default=10,
help='number of epochs to train (default: 10)')
parser.add_argument('--gpu_nums', '-g', type=int, default=0,
help='number of gpus')
parser.add_argument('--cpu_nums', '-c', type=int, default=0,
help='number of cpus')
parser.add_argument('--learning_rate', '-lr', type=float, default=0.01,
help='learning rate (default: 0.01)')
parser.add_argument('--momentum', type=float, default=0.5,
help='SGD momentum (default: 0.5)')
parser.add_argument('--log_interval', type=int, default=10,
help='how many batches to wait before logging training status')
parser.add_argument('--save_model', '-sm', action='store_true', default=False,
help='For Saving the current Model')
parser.add_argument('--worker_hosts', '-wh', type=str, required=True,
help='Comma-separated list of hostname:port pairs')
parser.add_argument('--job_name', '-j', type=str, default='worker',
help='Ps or worker')
parser.add_argument('--task_index', '-i', type=int, required=True,
help='Index of task within the job')

args = parser.parse_args()

strategy = set_strategy(args)

app = Train()
app.train(args, strategy)


if __name__ == "__main__":
main()

 

 

 

(/home/hpcadmin/wj/intel_tf2.2) [root@c1 ai]# python /home/hpcadmin/lico-demo/ai/tensorflow2/mnist_multi_worker_strategy.py --worker_hosts=c1:27481,c2:27252 --job_name=worker --task_index=0
{"cluster": {"worker": ["c1:27481", "c2:27252"]}, "task": {"type": "worker", "index": 0}}

User settings:

KMP_AFFINITY=granularity=fine,verbose,compact,1,0
KMP_BLOCKTIME=0
KMP_SETTINGS=1

Effective settings:

KMP_ABORT_DELAY=0
KMP_ADAPTIVE_LOCK_PROPS='1,1024'
KMP_ALIGN_ALLOC=64
KMP_ALL_THREADPRIVATE=128
KMP_ATOMIC_MODE=2
KMP_BLOCKTIME=0
KMP_CPUINFO_FILE: value is not defined
KMP_DETERMINISTIC_REDUCTION=false
KMP_DEVICE_THREAD_LIMIT=2147483647
KMP_DISP_HAND_THREAD=false
KMP_DISP_NUM_BUFFERS=7
KMP_DUPLICATE_LIB_OK=false
KMP_ENABLE_TASK_THROTTLING=true
KMP_FORCE_MONOTONIC_DYNAMIC_SCHEDULE=false
KMP_FORCE_REDUCTION: value is not defined
KMP_FOREIGN_THREADS_THREADPRIVATE=true
KMP_FORKJOIN_BARRIER='2,2'
KMP_FORKJOIN_BARRIER_PATTERN='hyper,hyper'
KMP_FORKJOIN_FRAMES=true
KMP_FORKJOIN_FRAMES_MODE=3
KMP_GTID_MODE=3
KMP_HANDLE_SIGNALS=false
KMP_HOT_TEAMS_MAX_LEVEL=1
KMP_HOT_TEAMS_MODE=0
KMP_INIT_AT_FORK=true
KMP_ITT_PREPARE_DELAY=0
KMP_LIBRARY=throughput
KMP_LOCK_KIND=queuing
KMP_MALLOC_POOL_INCR=1M
KMP_MWAIT_HINTS=0
KMP_NUM_LOCKS_IN_BLOCK=1
KMP_PLAIN_BARRIER='2,2'
KMP_PLAIN_BARRIER_PATTERN='hyper,hyper'
KMP_REDUCTION_BARRIER='1,1'
KMP_REDUCTION_BARRIER_PATTERN='hyper,hyper'
KMP_SCHEDULE='static,balanced;guided,iterative'
KMP_SETTINGS=true
KMP_SPIN_BACKOFF_PARAMS='4096,100'
KMP_STACKOFFSET=64
KMP_STACKPAD=0
KMP_STACKSIZE=8M
KMP_STORAGE_MAP=false
KMP_TASKING=2
KMP_TASKLOOP_MIN_TASKS=0
KMP_TASK_STEALING_CONSTRAINT=1
KMP_TEAMS_THREAD_LIMIT=32
KMP_TOPOLOGY_METHOD=all
KMP_USER_LEVEL_MWAIT=false
KMP_USE_YIELD=1
KMP_VERSION=false
KMP_WARNINGS=true
OMP_AFFINITY_FORMAT='OMP: pid %P tid %i thread %n bound to OS proc set {%A}'
OMP_ALLOCATOR=omp_default_mem_alloc
OMP_CANCELLATION=false
OMP_DEBUG=disabled
OMP_DEFAULT_DEVICE=0
OMP_DISPLAY_AFFINITY=false
OMP_DISPLAY_ENV=false
OMP_DYNAMIC=false
OMP_MAX_ACTIVE_LEVELS=1
OMP_MAX_TASK_PRIORITY=0
OMP_NESTED: deprecated; max-active-levels-var=1
OMP_NUM_TEAMS=0
OMP_NUM_THREADS: value is not defined
OMP_PLACES='threads'
OMP_PROC_BIND='intel'
OMP_SCHEDULE='static'
OMP_STACKSIZE=8M
OMP_TARGET_OFFLOAD=DEFAULT
OMP_TEAMS_THREAD_LIMIT=0
OMP_THREAD_LIMIT=2147483647
OMP_TOOL=enabled
OMP_TOOL_LIBRARIES: value is not defined
OMP_TOOL_VERBOSE_LOAD=disabled
OMP_WAIT_POLICY=PASSIVE
KMP_AFFINITY='verbose,warnings,respect,granularity=thread,compact,1,0'

2021-05-12 18:10:11.273100: I tensorflow/core/platform/profile_utils/cpu_utils.cc:104] CPU Frequency: 2599980000 Hz
2021-05-12 18:10:11.277218: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x5583f4f889d0 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
2021-05-12 18:10:11.277242: I tensorflow/compiler/xla/service/service.cc:176] StreamExecutor device (0): Host, Default Version
2021-05-12 18:10:11.277396: I tensorflow/core/common_runtime/process_util.cc:146] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.
2021-05-12 18:10:11.291073: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> c1:27481, 1 -> c2:27252}
2021-05-12 18:10:11.295355: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:405] Started server with target: grpc://c1:27481
2021-05-12 18:10:11.780776: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session started.
Model: "sequential"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
conv2d (Conv2D) (None, 26, 26, 32) 320
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 13, 13, 32) 0
_________________________________________________________________
conv2d_1 (Conv2D) (None, 11, 11, 64) 18496
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 5, 5, 64) 0
_________________________________________________________________
conv2d_2 (Conv2D) (None, 3, 3, 64) 36928
_________________________________________________________________
flatten (Flatten) (None, 576) 0
_________________________________________________________________
dense (Dense) (None, 64) 36928
_________________________________________________________________
dense_1 (Dense) (None, 10) 650
=================================================================
Total params: 93,322
Trainable params: 93,322
Non-trainable params: 0
_________________________________________________________________
WARNING:tensorflow:`eval_fn` is not passed in. The `worker_fn` will be used if an "evaluator" task exists in the cluster.
WARNING:tensorflow:`eval_strategy` is not passed in. No distribution strategy will be used for evaluation.
2021-05-12 18:10:43.333950: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:521] In AUTO-mode, and switching to DATA-based sharding, instead of FILE-based sharding as we cannot find appropriate reader dataset op(s) to shard. Error: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_9"
op: "FlatMapDataset"
input: "PrefetchDataset/_8"
attr {
key: "Targuments"
value {
list {
}
}
}
attr {
key: "f"
value {
func {
name: "__inference_Dataset_flat_map_slice_batch_indices_245"
}
}
}
attr {
key: "output_shapes"
value {
list {
shape {
dim {
size: -1
}
}
}
}
}
attr {
key: "output_types"
value {
list {
type: DT_INT64
}
}
}
. Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.
Epoch 1/10
WARNING:tensorflow:From /home/hpcadmin/wj/intel_tf2.4.1_pip/lib/python3.7/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
2021-05-12 18:10:46.623967: E tensorflow/core/common_runtime/ring_alg.cc:274] Aborting RingReduce with Cancelled: [_Derived_]Cancelled
Additional GRPC error information from remote target /job:worker/replica:0/task:1:
:{"created":"@1620814246.623910026","description":"Error received from peer ipv4:10.240.212.98:27252","file":"external/com_github_grpc_grpc/src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Cancelled","grpc_status":1}
2021-05-12 18:10:46.624121: W tensorflow/core/framework/op_kernel.cc:1767] OP_REQUIRES failed at collective_ops.cc:257 : Cancelled: [_Derived_]Cancelled
Additional GRPC error information from remote target /job:worker/replica:0/task:1:
:{"created":"@1620814246.623910026","description":"Error received from peer ipv4:10.240.212.98:27252","file":"external/com_github_grpc_grpc/src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Cancelled","grpc_status":1}
Traceback (most recent call last):
File "/home/hpcadmin/lico-demo/ai/tensorflow2/mnist_multi_worker_strategy.py", line 147, in <module>
main()
File "/home/hpcadmin/lico-demo/ai/tensorflow2/mnist_multi_worker_strategy.py", line 143, in main
app.train(args, strategy)
File "/home/hpcadmin/lico-demo/ai/tensorflow2/mnist_multi_worker_strategy.py", line 98, in train
validation_data=(self.data.test_images, self.data.test_labels))
File "/home/hpcadmin/wj/intel_tf2.4.1_pip/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py", line 117, in _method_wrapper
mode=dc.CoordinatorMode.INDEPENDENT_WORKER)
File "/home/hpcadmin/wj/intel_tf2.4.1_pip/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_coordinator.py", line 860, in run_distribute_coordinator
task_id, session_config, rpc_layer)
File "/home/hpcadmin/wj/intel_tf2.4.1_pip/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_coordinator.py", line 360, in _run_single_worker
return worker_fn(strategy)
File "/home/hpcadmin/wj/intel_tf2.4.1_pip/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py", line 115, in <lambda>
lambda _: method(self, *args, **kwargs),
File "/home/hpcadmin/wj/intel_tf2.4.1_pip/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py", line 1098, in fit
tmp_logs = train_function(iterator)
File "/home/hpcadmin/wj/intel_tf2.4.1_pip/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py", line 780, in __call__
result = self._call(*args, **kwds)
File "/home/hpcadmin/wj/intel_tf2.4.1_pip/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py", line 840, in _call
return self._stateless_fn(*args, **kwds)
File "/home/hpcadmin/wj/intel_tf2.4.1_pip/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 2829, in __call__
return graph_function._filtered_call(args, kwargs) # pylint: disable=protected-access
File "/home/hpcadmin/wj/intel_tf2.4.1_pip/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 1848, in _filtered_call
cancellation_manager=cancellation_manager)
File "/home/hpcadmin/wj/intel_tf2.4.1_pip/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 1924, in _call_flat
ctx, args, cancellation_manager=cancellation_manager))
File "/home/hpcadmin/wj/intel_tf2.4.1_pip/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 550, in call
ctx=ctx)
File "/home/hpcadmin/wj/intel_tf2.4.1_pip/lib/python3.7/site-packages/tensorflow/python/eager/execute.py", line 60, in quick_execute
inputs, attrs, num_outputs)
tensorflow.python.framework.errors_impl.InvalidArgumentError: Upper bound check fail for input 5 from node Mkl2Tf/_47 to node scoped_allocator_concat_1_8 input bounds = [0x7f602004fc40, 0x7f602004fd40] backing_tensor bounds = [0x7f5d11a56540, 0x7f5d11ab1780]
[[{{node scoped_allocator_concat_1_8}}]] [Op:__inference_train_function_1152]

Function call stack:
train_function

2021-05-12 18:10:46.797979: W tensorflow/core/common_runtime/eager/context.cc:566] Unable to destroy server_ object, so releasing instead. Servers don't support clean shutdown.

tensorflow.python.framework.errors_impl.InvalidArgumentError: Upper bound check fail for input 5 from node Mkl2Tf/_47 to node scoped_allocator_concat_1_8 input bounds = [0x7f602004fc40, 0x7f602004fd40] backing_tensor bounds = [0x7f5d11a56540, 0x7f5d11ab1780]

0 Kudos
9 Replies
RahulU_Intel
Moderator
930 Views

Hi,


Thanks for posting in Intel Forums. We will try to reproduce your issue from our side and let you know the updates.



Thanks,

Rahul


RahulU_Intel
Moderator
894 Views

Hi,


We tried the command you sent, but we could not reproduce the error. Can you reconfirm the command you sent to execute the python file?

Also can you attach the python file so that it will be easier for us to execute the same


Thanks

Rahul


RahulU_Intel
Moderator
867 Views

Hi,


We tried executing your code using Intel optimized tensorflow(2.2.0), but we were not able to reproduce the error. Can you attach the python script as a file you used to execute the same?


Thanks

Rahul


RahulU_Intel
Moderator
836 Views

Hi,


We looked into your case, It’s known bug in multiple worker model. It has been fixed in latest Tensorflow . Stack tf .2.5.0 has included the fix. You could install google official TF 2.5.0, and enable Intel optimization of Tensorflow by setting the environment variable TF_ENABLE_ONEDNN_OPTS=1. Can you try executing your code by upgrading your Tensorflow and let us know your results?


Thanks

Rahul


wangjian
Beginner
821 Views

Yes,I have execute code under TF 2.5.0 successfully. Thank your support. BTW , I want to know do  you have plan to merge this fix to Intel optimized TF?

RahulU_Intel
Moderator
793 Views

Hi,


We are glad that your issue is resolved. For the multiple worker crash issue, Intel Optimized Tensorflow 2.5.0 has included the fix. You can use the command "pip install intel-tensorflow==2.5.0" to install the latest Intel Optimzied Tensorflow.



Thanks and Regards

Rahul


RahulU_Intel
Moderator
774 Views

Hi,


We haven't heard back anything from you. Could you please confirm if the issue is resolved.


Thanks


RahulU_Intel
Moderator
755 Views

Hi,


I have not heard back from you, so I will close this inquiry now. If you need further assistance, please post a new question.


Thanks and Regards


wangjian
Beginner
719 Views

I have no problem,Thank you very much.

Reply