Customizing What Happens in fit() with PyTorch
1. Introduction

When doing supervised learning, we can train a model with fit(), which iteratively optimizes the model's parameters so that it fits the training data better.

When we want to control every small detail, though, we can write our own training loop entirely from scratch. That calls for a custom training algorithm. But what if we also want to benefit from the convenient features of fit(), such as callbacks, built-in distribution support, or step fusing?

A core principle of Keras is progressive disclosure of complexity: we can always move gradually into lower-level workflows. If the high-level functionality doesn't exactly match our needs, customizing fit() lets us gain more control over the small details while retaining a commensurate amount of high-level convenience.

When you need to customize what fit() does, you should override the training step function of the Model class. This is the function that fit() calls for every batch of data. You will then be able to call fit() as usual, and it will be running your own learning algorithm.
2. Preparation

2.1 Setup

Before running the examples, set things up as follows:
import os

# This guide can only be run with the torch backend.
os.environ["KERAS_BACKEND"] = "torch"

import torch
import keras
from keras import layers
import numpy as np
2.2 Example Code

Let's start with a simple example to get a feel for customizing what happens in fit() with the PyTorch backend.

First, we create a new class that subclasses keras.Model. Having created this class, we only need to override the train_step(self, data) method.

This method should return a dictionary mapping metric names (including the loss) to their current values.
The input argument data is what gets passed to fit as training data:

- If you pass NumPy arrays, by calling fit(x, y, ...), then data will be the tuple (x, y).
- If you pass a torch.utils.data.DataLoader or a tf.data.Dataset, by calling fit(dataset, ...), then data will be what the dataset yields at each batch (see the sketch below).
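Here is a minimal sketch of both calling conventions; demo_model is a hypothetical stand-in, and a model that overrides train_step() is driven in exactly the same way:

# A minimal sketch of the two calling conventions (`demo_model` and the
# shapes here are illustrative, not part of the examples below).
demo_model = keras.Sequential([keras.Input(shape=(8,)), layers.Dense(1)])
demo_model.compile(optimizer="adam", loss="mse")

x_np = np.random.random((64, 8)).astype("float32")
y_np = np.random.random((64, 1)).astype("float32")

# 1) NumPy arrays: inside train_step(), `data` is the tuple (x, y).
demo_model.fit(x_np, y_np, epochs=1)

# 2) A DataLoader: `data` is whatever each batch yields -- here a pair of
#    tensors, because the TensorDataset wraps two arrays.
dataset = torch.utils.data.TensorDataset(
    torch.from_numpy(x_np), torch.from_numpy(y_np)
)
loader = torch.utils.data.DataLoader(dataset, batch_size=16)
demo_model.fit(loader, epochs=1)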
In the body of the train_step() method, we implement a regular training update. Importantly, we compute the loss via self.compute_loss(), which wraps the loss function(s) that were passed to compile().

Similarly, we call metric.update_state(y, y_pred) on the metrics from self.metrics, to update the state of the metrics that were passed in compile(), and we query self.metrics at the end to retrieve their current values.
class CustomModel(keras.Model):
    def train_step(self, data):
        # Unpack the data. Its structure depends on your model and
        # on what you pass to `fit()`.
        x, y = data

        # Call torch.nn.Module.zero_grad() to clear the leftover gradients
        # for the weights from the previous train step.
        self.zero_grad()

        # Compute loss
        y_pred = self(x, training=True)  # Forward pass
        loss = self.compute_loss(y=y, y_pred=y_pred)

        # Call torch.Tensor.backward() on the loss to compute gradients
        # for the weights.
        loss.backward()

        trainable_weights = [v for v in self.trainable_weights]
        gradients = [v.value.grad for v in trainable_weights]

        # Update weights
        with torch.no_grad():
            self.optimizer.apply(gradients, trainable_weights)

        # Update metrics (includes the metric that tracks the loss)
        for metric in self.metrics:
            if metric.name == "loss":
                metric.update_state(loss)
            else:
                metric.update_state(y, y_pred)

        # Return a dict mapping metric names to current value
        # Note that it will include the loss (tracked in self.metrics).
        return {m.name: m.result() for m in self.metrics}
Run the following code; the output is shown after it.
# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])

# Just use `fit` as usual
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=3)
Epoch 1/3
32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 551us/step - mae: 0.6533 - loss: 0.6036
Epoch 2/3
32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 522us/step - mae: 0.4013 - loss: 0.2522
Epoch 3/3
32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 516us/step - mae: 0.3813 - loss: 0.2256

<keras.src.callbacks.history.History at 0x299b7baf0>
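Since fit() still drives the training loop, conveniences such as callbacks keep working unchanged. A quick sketch (the callback choice here is illustrative):

# Callbacks work as usual with the custom train_step (the callback used
# here is just an example).
model.fit(
    x,
    y,
    epochs=10,
    callbacks=[keras.callbacks.EarlyStopping(monitor="loss", patience=2)],
)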
3. Going Lower-Level

Naturally, you could also skip passing a loss function in compile() and instead handle everything manually in train_step. Likewise for metrics.

Here's a lower-level example that only uses compile() to configure the optimizer:

- We start by creating Metric instances to track our loss and an MAE score (in the __init__() method).
- We implement a custom train_step() that updates the state of these metrics (by calling update_state() on them), then queries them (via result()) to return their current average values, which will be displayed by the progress bar and passed to any callback.

Note that you would need to call reset_states() on your metrics between each epoch! Otherwise calling result() would return an average since the start of training, whereas we usually work with per-epoch averages. Thankfully, the framework can do this for us: just list any metric you want to reset in the metrics property of the model. The model will call reset_states() on any object listed here at the beginning of each fit() epoch or at the beginning of a call to evaluate().
class CustomModel(keras.Model):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.loss_tracker = keras.metrics.Mean(name="loss")
        self.mae_metric = keras.metrics.MeanAbsoluteError(name="mae")
        self.loss_fn = keras.losses.MeanSquaredError()

    def train_step(self, data):
        x, y = data

        # Call torch.nn.Module.zero_grad() to clear the leftover gradients
        # for the weights from the previous train step.
        self.zero_grad()

        # Compute loss
        y_pred = self(x, training=True)  # Forward pass
        loss = self.loss_fn(y, y_pred)

        # Call torch.Tensor.backward() on the loss to compute gradients
        # for the weights.
        loss.backward()

        trainable_weights = [v for v in self.trainable_weights]
        gradients = [v.value.grad for v in trainable_weights]

        # Update weights
        with torch.no_grad():
            self.optimizer.apply(gradients, trainable_weights)

        # Compute our own metrics
        self.loss_tracker.update_state(loss)
        self.mae_metric.update_state(y, y_pred)
        return {
            "loss": self.loss_tracker.result(),
            "mae": self.mae_metric.result(),
        }

    @property
    def metrics(self):
        # We list our `Metric` objects here so that `reset_states()` can be
        # called automatically at the start of each epoch
        # or at the start of `evaluate()`.
        return [self.loss_tracker, self.mae_metric]


# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)

# We don't pass a loss or metrics here.
model.compile(optimizer="adam")

# Just use `fit` as usual -- you can use callbacks, etc.
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=5)
Epoch 1/5
32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 461us/step - loss: 0.2470 - mae: 0.3953
Epoch 2/5
32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 456us/step - loss: 0.2386 - mae: 0.3910
Epoch 3/5
32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 456us/step - loss: 0.2359 - mae: 0.3901
Epoch 4/5
32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 480us/step - loss: 0.2013 - mae: 0.3572
Epoch 5/5
32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 463us/step - loss: 0.1903 - mae: 0.3480

<keras.src.callbacks.history.History at 0x299c5eec0>
3.1 Supporting sample_weight and class_weight

The basic example at the start of this article made no mention of sample weighting. If you want to support the sample_weight and class_weight arguments of fit(), proceed as follows:

- Unpack sample_weight from the data argument.
- Pass it to compute_loss and update_state (of course, if you don't rely on compile() to set up losses and metrics, you can also apply it manually).
class CustomModel(keras.Model):
    def train_step(self, data):
        # Unpack the data. Its structure depends on your model and
        # on what you pass to `fit()`.
        if len(data) == 3:
            x, y, sample_weight = data
        else:
            sample_weight = None
            x, y = data

        # Call torch.nn.Module.zero_grad() to clear the leftover gradients
        # for the weights from the previous train step.
        self.zero_grad()

        # Compute loss
        y_pred = self(x, training=True)  # Forward pass
        loss = self.compute_loss(
            y=y,
            y_pred=y_pred,
            sample_weight=sample_weight,
        )

        # Call torch.Tensor.backward() on the loss to compute gradients
        # for the weights.
        loss.backward()

        trainable_weights = [v for v in self.trainable_weights]
        gradients = [v.value.grad for v in trainable_weights]

        # Update weights
        with torch.no_grad():
            self.optimizer.apply(gradients, trainable_weights)

        # Update metrics (includes the metric that tracks the loss)
        for metric in self.metrics:
            if metric.name == "loss":
                metric.update_state(loss)
            else:
                metric.update_state(y, y_pred, sample_weight=sample_weight)

        # Return a dict mapping metric names to current value
        # Note that it will include the loss (tracked in self.metrics).
        return {m.name: m.result() for m in self.metrics}


# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])

# You can now use sample_weight argument
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
sw = np.random.random((1000, 1))
model.fit(x, y, sample_weight=sw, epochs=3)
Epoch 1/3
32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 499us/step - mae: 1.4332 - loss: 1.0769
Epoch 2/3
32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 520us/step - mae: 0.9250 - loss: 0.5614
Epoch 3/3
32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 502us/step - mae: 0.6069 - loss: 0.2653

<keras.src.callbacks.history.History at 0x299c82bf0>
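As for class_weight, it works through the same mechanism: fit() converts the per-class weights into per-sample weights before train_step() sees the data, so the implementation above should support it unchanged. A hypothetical classification sketch (the data, names, and weights here are illustrative):

# Hypothetical binary-classification setup to exercise class_weight.
x_cls = np.random.random((1000, 32)).astype("float32")
y_cls = np.random.randint(0, 2, size=(1000, 1))

clf_inputs = keras.Input(shape=(32,))
clf_outputs = keras.layers.Dense(1, activation="sigmoid")(clf_inputs)
clf = CustomModel(clf_inputs, clf_outputs)
clf.compile(optimizer="adam", loss="binary_crossentropy", metrics=["accuracy"])

# Weight class 1 three times as heavily as class 0; `fit()` turns this into
# per-sample weights, which arrive in `data` as the third element.
clf.fit(x_cls, y_cls, class_weight={0: 1.0, 1: 3.0}, epochs=1)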
3.2 Providing your own evaluation step

What if we want to customize the evaluation step for calls to model.evaluate()? Then we override test_step in exactly the same way.
class CustomModel(keras.Model):
    def test_step(self, data):
        # Unpack the data
        x, y = data

        # Compute predictions
        y_pred = self(x, training=False)

        # Updates the metrics tracking the loss
        loss = self.compute_loss(y=y, y_pred=y_pred)

        # Update the metrics.
        for metric in self.metrics:
            if metric.name == "loss":
                metric.update_state(loss)
            else:
                metric.update_state(y, y_pred)

        # Return a dict mapping metric names to current value.
        # Note that it will include the loss (tracked in self.metrics).
        return {m.name: m.result() for m in self.metrics}


# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(loss="mse", metrics=["mae"])

# Evaluate with our custom test_step
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.evaluate(x, y)
32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 325us/step - mae: 0.4427 - loss: 0.2993

[0.2726495862007141, 0.42286917567253113]
4. An End-to-End Example

To put everything we've covered into practice, let's walk through an end-to-end GAN (Generative Adversarial Network) example that customizes what happens in fit() with the PyTorch backend.

In this example, we will consider:

- A generator network meant to generate 28x28x1 images.
- A discriminator network meant to classify 28x28x1 images into two classes ("fake" and "real").
- One optimizer for each network.
- A loss function to train the discriminator.

First, we define the network architectures of the generator and the discriminator (the full layer-by-layer definitions appear in the code below). Conceptually, the generator takes noise as input and outputs an image, while the discriminator takes an image as input and outputs a probability that the input image is real (drawn from the training set) rather than fake (produced by the generator).
The rough GAN training flow is as follows:

1. Initialize the generator and discriminator networks:
- Define the model architectures of the generator and the discriminator.
- Compile the discriminator, specifying a loss function (such as binary cross-entropy) and an optimizer (such as Adam).

2. Train the discriminator:
- For a batch of real images, compute the discriminator loss (using the real label 1).
- Generate a batch of fake images with the generator and compute the discriminator loss on them (using the fake label 0).
- Add the two losses together and perform one gradient-descent update on the discriminator.

3. Train the generator:
- Generate a batch of fake images.
- Run the discriminator on these fake images to obtain predicted probabilities.
- Compute the generator loss using labels that claim the images are real (we want the discriminator to classify the generator's output as real); this is typically done by passing the discriminator's predictions to a loss function such as binary cross-entropy or mean squared error.
- Perform one gradient-descent update on the generator using the computed loss.
- Note: when training the generator, the discriminator's weights must not be updated. In the classic Keras workflow this is done by setting discriminator.trainable = False before training the generator; in the train_step implementation below, it is achieved by applying gradients to the generator's weights only.

4. Iterate:
- Repeat steps 2 and 3 many times to train the GAN.

5. Evaluate the GAN:
- Use the trained generator to produce images and inspect them visually to assess the GAN's performance (a sketch of this step follows the training run below).
# Create the discriminator
discriminator = keras.Sequential(
    [
        keras.Input(shape=(28, 28, 1)),
        layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(negative_slope=0.2),
        layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(negative_slope=0.2),
        layers.GlobalMaxPooling2D(),
        layers.Dense(1),
    ],
    name="discriminator",
)

# Create the generator
latent_dim = 128
generator = keras.Sequential(
    [
        keras.Input(shape=(latent_dim,)),
        # We want to generate 128 coefficients to reshape into a 7x7x128 map
        layers.Dense(7 * 7 * 128),
        layers.LeakyReLU(negative_slope=0.2),
        layers.Reshape((7, 7, 128)),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(negative_slope=0.2),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(negative_slope=0.2),
        layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
    ],
    name="generator",
)
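To tie this code back to the description above, here is a small sanity-check sketch: the generator maps latent vectors to 28x28x1 images, and the discriminator maps images to one logit per sample.

# Sanity-check sketch: verify the input/output shapes of both networks.
z = keras.random.normal(shape=(4, latent_dim))
fake_images = generator(z)
print(fake_images.shape)                 # torch.Size([4, 28, 28, 1])
print(discriminator(fake_images).shape)  # torch.Size([4, 1])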
Here is a feature-complete GAN class that overrides compile() to use its own signature, and implements the entire GAN algorithm in 17 lines of code in train_step:
class GAN(keras.Model):
    def __init__(self, discriminator, generator, latent_dim):
        super().__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim
        self.d_loss_tracker = keras.metrics.Mean(name="d_loss")
        self.g_loss_tracker = keras.metrics.Mean(name="g_loss")
        self.seed_generator = keras.random.SeedGenerator(1337)
        self.built = True

    @property
    def metrics(self):
        return [self.d_loss_tracker, self.g_loss_tracker]

    def compile(self, d_optimizer, g_optimizer, loss_fn):
        super().compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn

    def train_step(self, real_images):
        device = "cuda" if torch.cuda.is_available() else "cpu"
        if isinstance(real_images, tuple):
            real_images = real_images[0]
        # Sample random points in the latent space
        batch_size = real_images.shape[0]
        random_latent_vectors = keras.random.normal(
            shape=(batch_size, self.latent_dim), seed=self.seed_generator
        )

        # Decode them to fake images
        generated_images = self.generator(random_latent_vectors)

        # Combine them with real images
        real_images = torch.tensor(real_images, device=device)
        combined_images = torch.concat([generated_images, real_images], axis=0)

        # Assemble labels discriminating real from fake images
        labels = torch.concat(
            [
                torch.ones((batch_size, 1), device=device),
                torch.zeros((batch_size, 1), device=device),
            ],
            axis=0,
        )
        # Add random noise to the labels - important trick!
        labels += 0.05 * keras.random.uniform(labels.shape, seed=self.seed_generator)

        # Train the discriminator
        self.zero_grad()
        predictions = self.discriminator(combined_images)
        d_loss = self.loss_fn(labels, predictions)
        d_loss.backward()
        grads = [v.value.grad for v in self.discriminator.trainable_weights]
        with torch.no_grad():
            self.d_optimizer.apply(grads, self.discriminator.trainable_weights)

        # Sample random points in the latent space
        random_latent_vectors = keras.random.normal(
            shape=(batch_size, self.latent_dim), seed=self.seed_generator
        )

        # Assemble labels that say "all real images"
        misleading_labels = torch.zeros((batch_size, 1), device=device)

        # Train the generator (note that we should *not* update the weights
        # of the discriminator)!
        self.zero_grad()
        predictions = self.discriminator(self.generator(random_latent_vectors))
        g_loss = self.loss_fn(misleading_labels, predictions)
        # backward() returns None, so we call it for its side effect and then
        # collect the gradients from the weights themselves.
        g_loss.backward()
        grads = [v.value.grad for v in self.generator.trainable_weights]
        with torch.no_grad():
            self.g_optimizer.apply(grads, self.generator.trainable_weights)

        # Update metrics and return their value.
        self.d_loss_tracker.update_state(d_loss)
        self.g_loss_tracker.update_state(g_loss)
        return {
            "d_loss": self.d_loss_tracker.result(),
            "g_loss": self.g_loss_tracker.result(),
        }
Here is a training run and its output:
# Prepare the dataset. We use both the training & test MNIST digits.
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))

# Create a TensorDataset
dataset = torch.utils.data.TensorDataset(
    torch.from_numpy(all_digits), torch.from_numpy(all_digits)
)
# Create a DataLoader
dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)

gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
gan.compile(
    d_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    g_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    loss_fn=keras.losses.BinaryCrossentropy(from_logits=True),
)

gan.fit(dataloader, epochs=1)
1094/1094 ━━━━━━━━━━━━━━━━━━━━ 1582s 1s/step - d_loss: 0.3581 - g_loss: 2.0571

<keras.src.callbacks.history.History at 0x299ce1840>
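Finally, step 5 of the flow above (visual evaluation) is not part of train_step(). A minimal sketch, assuming we just want to write a few generated digits to disk, might look like the following (the file names are hypothetical, and keras.utils.save_img requires Pillow):

# A sketch of step 5: generate a few images with the trained generator and
# save them for visual inspection.
random_latent_vectors = keras.random.normal(shape=(8, latent_dim))
generated_images = generator(random_latent_vectors)  # in [0, 1] via the sigmoid
generated_images = generated_images.detach().cpu().numpy()
for i in range(8):
    keras.utils.save_img(f"generated_{i}.png", generated_images[i])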
That concludes this detailed look at customizing what happens in fit() in PyTorch. For more material on customizing fit() in PyTorch, see the other related articles on 腳本之家!