PyTorch并行訓(xùn)練DistributedDataParallel完整demo
大型數(shù)據(jù)集訓(xùn)練
使用大型數(shù)據(jù)集訓(xùn)練大型深度神經(jīng)網(wǎng)絡(luò) (DNN) 的問(wèn)題是深度學(xué)習(xí)領(lǐng)域的主要挑戰(zhàn)。 隨著 DNN 和數(shù)據(jù)集規(guī)模的增加,訓(xùn)練這些模型的計(jì)算和內(nèi)存需求也會(huì)增加。 這使得在計(jì)算資源有限的單臺(tái)機(jī)器上訓(xùn)練這些模型變得困難甚至不可能。
使用大型數(shù)據(jù)集訓(xùn)練大型 DNN 的一些主要挑戰(zhàn)包括:
- 訓(xùn)練時(shí)間長(zhǎng):訓(xùn)練過(guò)程可能需要數(shù)周甚至數(shù)月才能完成,具體取決于模型的復(fù)雜性和數(shù)據(jù)集的大小。
- 內(nèi)存限制:大型 DNN 可能需要大量?jī)?nèi)存來(lái)存儲(chǔ)訓(xùn)練期間的所有模型參數(shù)、梯度和中間激活。 這可能會(huì)導(dǎo)致內(nèi)存不足錯(cuò)誤并限制可在單臺(tái)機(jī)器上訓(xùn)練的模型的大小。
為了應(yīng)對(duì)這些挑戰(zhàn),已經(jīng)開(kāi)發(fā)了各種技術(shù)來(lái)擴(kuò)大具有大型數(shù)據(jù)集的大型 DNN 的訓(xùn)練,包括模型并行性、數(shù)據(jù)并行性和混合并行性,以及硬件、軟件和算法的優(yōu)化。
PyTorch 的數(shù)據(jù)并行性和模型并行性
在本文中我們將演示使用 PyTorch 的數(shù)據(jù)并行性和模型并行性。
我們所說(shuō)的并行性一般是指在多個(gè)gpu,或多臺(tái)機(jī)器上訓(xùn)練深度神經(jīng)網(wǎng)絡(luò)(dnn),以實(shí)現(xiàn)更少的訓(xùn)練時(shí)間。數(shù)據(jù)并行背后的基本思想是將訓(xùn)練數(shù)據(jù)分成更小的塊,讓每個(gè)GPU或機(jī)器處理一個(gè)單獨(dú)的數(shù)據(jù)塊。然后將每個(gè)節(jié)點(diǎn)的結(jié)果組合起來(lái),用于更新模型參數(shù)。在數(shù)據(jù)并行中,模型體系結(jié)構(gòu)在每個(gè)節(jié)點(diǎn)上是相同的,但模型參數(shù)在節(jié)點(diǎn)之間進(jìn)行了分區(qū)。每個(gè)節(jié)點(diǎn)使用分配的數(shù)據(jù)塊訓(xùn)練自己的本地模型,在每次訓(xùn)練迭代結(jié)束時(shí),模型參數(shù)在所有節(jié)點(diǎn)之間同步。這個(gè)過(guò)程不斷重復(fù),直到模型收斂到一個(gè)令人滿意的結(jié)果。
下面我們用用ResNet50和CIFAR10數(shù)據(jù)集來(lái)進(jìn)行完整的代碼示例:
在數(shù)據(jù)并行中,模型架構(gòu)在每個(gè)節(jié)點(diǎn)上保持相同,但模型參數(shù)在節(jié)點(diǎn)之間進(jìn)行了分區(qū),每個(gè)節(jié)點(diǎn)使用分配的數(shù)據(jù)塊訓(xùn)練自己的本地模型。
PyTorch的DistributedDataParallel 庫(kù)可以進(jìn)行跨節(jié)點(diǎn)的梯度和模型參數(shù)的高效通信和同步,實(shí)現(xiàn)分布式訓(xùn)練。本文提供了如何使用ResNet50和CIFAR10數(shù)據(jù)集使用PyTorch實(shí)現(xiàn)數(shù)據(jù)并行的示例,其中代碼在多個(gè)gpu或機(jī)器上運(yùn)行,每臺(tái)機(jī)器處理訓(xùn)練數(shù)據(jù)的一個(gè)子集。訓(xùn)練過(guò)程使用PyTorch的DistributedDataParallel 庫(kù)進(jìn)行并行化。
導(dǎo)入必須要的庫(kù)
importos fromdatetimeimportdatetime fromtimeimporttime importargparse importtorchvision importtorchvision.transformsastransforms importtorch importtorch.nnasnn importtorch.distributedasdist fromtorch.nn.parallelimportDistributedDataParallel
檢查GPU
importsubprocess result=subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE) print(result.stdout.decode())
因?yàn)槲覀冃枰诙鄠€(gè)服務(wù)器上運(yùn)行,所以手動(dòng)一個(gè)一個(gè)執(zhí)行并不現(xiàn)實(shí),所以需要有一個(gè)調(diào)度程序。這里我們使用SLURM文件來(lái)運(yùn)行代碼(slurm面向Linux和Unix類(lèi)似內(nèi)核的免費(fèi)和開(kāi)源工作調(diào)度程序),
defmain(): # get distributed configuration from Slurm environment parser=argparse.ArgumentParser() parser.add_argument('-b', '--batch-size', default=128, type=int, help='batch size. it will be divided in mini-batch for each worker') parser.add_argument('-e','--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') parser.add_argument('-c','--checkpoint', default=None, type=str, help='path to checkpoint to load') args=parser.parse_args() rank=int(os.environ['SLURM_PROCID']) local_rank=int(os.environ['SLURM_LOCALID']) size=int(os.environ['SLURM_NTASKS']) master_addr=os.environ["SLURM_SRUN_COMM_HOST"] port="29500" node_id=os.environ['SLURM_NODEID'] ddp_arg= [rank, local_rank, size, master_addr, port, node_id] train(args, ddp_arg)
然后我們使用DistributedDataParallel 庫(kù)來(lái)執(zhí)行分布式訓(xùn)練。
deftrain(args, ddp_arg): rank, local_rank, size, MASTER_ADDR, port, NODE_ID=ddp_arg # display info ifrank==0: #print(">>> Training on ", len(hostnames), " nodes and ", size, " processes, master node is ", MASTER_ADDR) print(">>> Training on ", size, " GPUs, master node is ", MASTER_ADDR) #print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID)) print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID)) # configure distribution method: define address and port of the master node and initialise communication backend (NCCL) #dist.init_process_group(backend='nccl', init_method='env://', world_size=size, rank=rank) dist.init_process_group( backend='nccl', init_method='tcp://{}:{}'.format(MASTER_ADDR, port), world_size=size, rank=rank ) # distribute model torch.cuda.set_device(local_rank) gpu=torch.device("cuda") #model = ResNet18(classes=10).to(gpu) model=torchvision.models.resnet50(pretrained=False).to(gpu) ddp_model=DistributedDataParallel(model, device_ids=[local_rank]) ifargs.checkpointisnotNone: map_location= {'cuda:%d'%0: 'cuda:%d'%local_rank} ddp_model.load_state_dict(torch.load(args.checkpoint, map_location=map_location)) # distribute batch size (mini-batch) batch_size=args.batch_size batch_size_per_gpu=batch_size//size # define loss function (criterion) and optimizer criterion=nn.CrossEntropyLoss() optimizer=torch.optim.SGD(ddp_model.parameters(), 1e-4) transform_train=transforms.Compose([ transforms.RandomCrop(32, padding=4), transforms.RandomHorizontalFlip(), transforms.ToTensor(), transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)), ]) # load data with distributed sampler #train_dataset = torchvision.datasets.CIFAR10(root='./data', # train=True, # transform=transform_train, # download=False) # load data with distributed sampler train_dataset=torchvision.datasets.CIFAR10(root='./data', train=True, transform=transform_train, download=False) train_sampler=torch.utils.data.distributed.DistributedSampler(train_dataset, num_replicas=size, rank=rank) train_loader=torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size_per_gpu, shuffle=False, num_workers=0, pin_memory=True, sampler=train_sampler) # training (timers and display handled by process 0) ifrank==0: start=datetime.now() total_step=len(train_loader) forepochinrange(args.epochs): ifrank==0: start_dataload=time() fori, (images, labels) inenumerate(train_loader): # distribution of images and labels to all GPUs images=images.to(gpu, non_blocking=True) labels=labels.to(gpu, non_blocking=True) ifrank==0: stop_dataload=time() ifrank==0: start_training=time() # forward pass outputs=ddp_model(images) loss=criterion(outputs, labels) # backward and optimize optimizer.zero_grad() loss.backward() optimizer.step() ifrank==0: stop_training=time() if (i+1) %10==0andrank==0: print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Time data load: {:.3f}ms, Time training: {:.3f}ms'.format(epoch+1, args.epochs, i+1, total_step, loss.item(), (stop_dataload-start_dataload)*1000, (stop_training-start_training)*1000)) ifrank==0: start_dataload=time() #Save checkpoint at every end of epoch ifrank==0: torch.save(ddp_model.state_dict(), './checkpoint/{}GPU_{}epoch.checkpoint'.format(size, epoch+1)) ifrank==0: print(">>> Training complete in: "+str(datetime.now() -start)) if__name__=='__main__': main()
代碼將數(shù)據(jù)和模型分割到多個(gè)gpu上,并以分布式的方式更新模型。
代碼解釋
train(args, ddp_arg)有兩個(gè)參數(shù),args和ddp_arg,其中args是傳遞給腳本的命令行參數(shù),ddp_arg包含分布式訓(xùn)練相關(guān)參數(shù)。
rank, local_rank, size, MASTER_ADDR, port, NODE_ID = ddp_arg:解包ddp_arg中分布式訓(xùn)練相關(guān)參數(shù)。
如果rank為0,則打印當(dāng)前使用的gpu數(shù)量和主節(jié)點(diǎn)IP地址信息。
dist.init_process_group(backend='nccl', init_method='tcp://{}:{}'.format(MASTER_ADDR, port), world_size=size, rank=rank) :使用NCCL后端初始化分布式進(jìn)程組。
torch.cuda.set_device(local_rank):為這個(gè)進(jìn)程選擇指定的GPU。
model = torchvision.models. ResNet50 (pretrained=False).to(gpu):從torchvision模型中加載ResNet50模型,并將其移動(dòng)到指定的gpu。
ddp_model = DistributedDataParallel(model, device_ids=[local_rank]):將模型包裝在DistributedDataParallel模塊中,也就是說(shuō)這樣我們就可以進(jìn)行分布式訓(xùn)練了
加載CIFAR-10數(shù)據(jù)集并應(yīng)用數(shù)據(jù)增強(qiáng)轉(zhuǎn)換。
train_sampler=torch.utils.data.distributed.DistributedSampler(train_dataset,num_replicas=size,rank=rank):創(chuàng)建一個(gè)DistributedSampler對(duì)象,將數(shù)據(jù)集分割到多個(gè)gpu上。
train_loader =torch.utils.data.DataLoader(dataset=train_dataset,batch_size=batch_size_per_gpu,shuffle=False,num_workers=0,pin_memory=True,sampler=train_sampler):創(chuàng)建一個(gè)DataLoader對(duì)象,數(shù)據(jù)將批量加載到模型中,這與我們平常訓(xùn)練的步驟是一致的只不過(guò)是增加了一個(gè)分布式的數(shù)據(jù)采樣DistributedSampler
為指定的epoch數(shù)訓(xùn)練模型,以分布式的方式使用optimizer.step()更新權(quán)重。
rank0在每個(gè)輪次結(jié)束時(shí)保存一個(gè)檢查點(diǎn)。
rank0每10個(gè)批次顯示損失和訓(xùn)練時(shí)間。
結(jié)束訓(xùn)練時(shí)打印訓(xùn)練模型所花費(fèi)的總時(shí)間也是在rank0上。
代碼測(cè)試
在使用1個(gè)節(jié)點(diǎn)1/2/3/4個(gè)gpu, 2個(gè)節(jié)點(diǎn)6/8個(gè)gpu,每個(gè)節(jié)點(diǎn)3/4個(gè)gpu上進(jìn)行了訓(xùn)練Cifar10上的Resnet50的測(cè)試如下圖所示,每次測(cè)試的批處理大小保持不變。完成每項(xiàng)測(cè)試所花費(fèi)的時(shí)間以秒為單位記錄。隨著使用的gpu數(shù)量的增加,完成測(cè)試所需的時(shí)間會(huì)減少。當(dāng)使用8個(gè)gpu時(shí),需要320秒才能完成,這是記錄中最快的時(shí)間。這是肯定的,但是我們可以看到訓(xùn)練的速度并沒(méi)有像GPU數(shù)量增長(zhǎng)呈現(xiàn)線性的增長(zhǎng),這可能是因?yàn)镽esnet50算是一個(gè)比較小的模型了,并不需要進(jìn)行并行化訓(xùn)練。
在多個(gè)gpu上使用數(shù)據(jù)并行可以顯著減少在給定數(shù)據(jù)集上訓(xùn)練深度神經(jīng)網(wǎng)絡(luò)(DNN)所需的時(shí)間。隨著gpu數(shù)量的增加,完成訓(xùn)練過(guò)程所需的時(shí)間減少,這表明DNN可以更有效地并行訓(xùn)練。
這種方法在處理大型數(shù)據(jù)集或復(fù)雜的DNN架構(gòu)時(shí)特別有用。通過(guò)利用多個(gè)gpu,可以加快訓(xùn)練過(guò)程,實(shí)現(xiàn)更快的模型迭代和實(shí)驗(yàn)。但是需要注意的是,通過(guò)Data Parallelism實(shí)現(xiàn)的性能提升可能會(huì)受到通信開(kāi)銷(xiāo)和GPU內(nèi)存限制等因素的限制,需要仔細(xì)調(diào)優(yōu)才能獲得最佳結(jié)果。
以上就是PyTorch并行訓(xùn)練DistributedDataParallel完整demo的詳細(xì)內(nèi)容,更多關(guān)于PyTorch并行訓(xùn)練的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
通過(guò)Python繪制中國(guó)結(jié)的示例代碼
再過(guò)不久就要到新年了,所以這篇文章將為大家介紹一下如何通過(guò)Python代碼繪制一個(gè)中國(guó)結(jié),文中的示例代碼講解詳細(xì),感興趣的可以動(dòng)手試一試2022-01-01Python異步發(fā)送日志到遠(yuǎn)程服務(wù)器詳情
這篇文章主要介紹了Python異步發(fā)送日志到遠(yuǎn)程服務(wù)器詳情,文章通過(guò)簡(jiǎn)單輸出到cmd和文件中的代碼展開(kāi)詳情,需要的朋友可以參考一下2022-07-07python詞云庫(kù)wordCloud使用方法詳解(解決中文亂碼)
這篇文章主要介紹了python詞云庫(kù)wordCloud使用方法詳解(解決中文亂碼),需要的朋友可以參考下2020-02-02