DCGAN debugging. Getting just garbage

So I solved this issue a while ago, but forgot to post an answer on stack overflow. So I will simply post my code here which should work probably pretty good.
Some disclaimer:

  • I am not quite sure if it works since I did this a year ago
  • its for 128x128px Images MNIST
  • It’s not a vanilla GAN I used various optimization techniques
  • If you want to use it you need to change various details, such as the training dataset

Resources:

  • Multi-Scale Gradients
  • Instance Noise
  • Various tricks I used
  • More tricks

    import torch
    from torch.autograd import Variable
    import torch.nn as nn
    import torch.nn.functional as F
    import torchvision
    import torchvision.transforms as transforms
    from torch.utils.data import DataLoader
    
    import pytorch_lightning as pl
    from pytorch_lightning import loggers
    
    from numpy.random import choice
    
    import os
    from pathlib import Path
    import shutil
    
    from collections import OrderedDict
    
    # custom weights initialization called on netG and netD
    def weights_init(m):
        classname = m.__class__.__name__
        if classname.find('Conv') != -1:
            nn.init.normal_(m.weight.data, 0.0, 0.02)
        elif classname.find('BatchNorm') != -1:
            nn.init.normal_(m.weight.data, 1.0, 0.02)
            nn.init.constant_(m.bias.data, 0)
    
    # randomly flip some labels
    def noisy_labels(y, p_flip=0.05):  # # flip labels with 5% probability
        # determine the number of labels to flip
        n_select = int(p_flip * y.shape[0])
        # choose labels to flip
        flip_ix = choice([i for i in range(y.shape[0])], size=n_select)
        # invert the labels in place
        y[flip_ix] = 1 - y[flip_ix]
        return y
    
    class AddGaussianNoise(object):
        def __init__(self, mean=0.0, std=0.1):
            self.std = std
            self.mean = mean
    
        def __call__(self, tensor):
            tensor = tensor.cuda()
            return tensor + (torch.randn(tensor.size()) * self.std + self.mean).cuda()
    
        def __repr__(self):
            return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)
    
    def resize2d(img, size):
        return (F.adaptive_avg_pool2d(img, size).data).cuda()
    
    def get_valid_labels(img):
        return ((0.8 - 1.1) * torch.rand(img.shape[0], 1, 1, 1) + 1.1).cuda()  # soft labels
    
    def get_unvalid_labels(img):
        return (noisy_labels((0.0 - 0.3) * torch.rand(img.shape[0], 1, 1, 1) + 0.3)).cuda()  # soft labels
    
    class Generator(pl.LightningModule):
        def __init__(self, ngf, nc, latent_dim):
            super(Generator, self).__init__()
            self.ngf = ngf
            self.latent_dim = latent_dim
            self.nc = nc
    
            self.fc0 = nn.Sequential(
                # input is Z, going into a convolution
                nn.utils.spectral_norm(nn.ConvTranspose2d(latent_dim, ngf * 16, 4, 1, 0, bias=False)),
                nn.LeakyReLU(0.2, inplace=True),
                nn.BatchNorm2d(ngf * 16)
            )
    
            self.fc1 = nn.Sequential(
                # state size. (ngf*8) x 4 x 4
                nn.utils.spectral_norm(nn.ConvTranspose2d(ngf * 16, ngf * 8, 4, 2, 1, bias=False)),
                nn.LeakyReLU(0.2, inplace=True),
                nn.BatchNorm2d(ngf * 8)
            )
    
            self.fc2 = nn.Sequential(
                # state size. (ngf*4) x 8 x 8
                nn.utils.spectral_norm(nn.ConvTranspose2d(ngf * 8, ngf * 4, 4, 2, 1, bias=False)),
                nn.LeakyReLU(0.2, inplace=True),
                nn.BatchNorm2d(ngf * 4)
            )
    
            self.fc3 = nn.Sequential(
                # state size. (ngf*2) x 16 x 16
                nn.utils.spectral_norm(nn.ConvTranspose2d(ngf * 4, ngf * 2, 4, 2, 1, bias=False)),
                nn.LeakyReLU(0.2, inplace=True),
                nn.BatchNorm2d(ngf * 2)
            )
    
            self.fc4 = nn.Sequential(
                # state size. (ngf) x 32 x 32
                nn.utils.spectral_norm(nn.ConvTranspose2d(ngf * 2, ngf, 4, 2, 1, bias=False)),
                nn.LeakyReLU(0.2, inplace=True),
                nn.BatchNorm2d(ngf)
            )
    
            self.fc5 = nn.Sequential(
                # state size. (nc) x 64 x 64
                nn.utils.spectral_norm(nn.ConvTranspose2d(ngf, nc, 4, 2, 1, bias=False)),
                nn.Tanh()
            )
    
            # state size. (nc) x 128 x 128
    
            # For Multi-Scale Gradient
            # Converting the intermediate layers into images
            self.fc0_r = nn.Conv2d(ngf * 16, self.nc, 1)
            self.fc1_r = nn.Conv2d(ngf * 8, self.nc, 1)
            self.fc2_r = nn.Conv2d(ngf * 4, self.nc, 1)
            self.fc3_r = nn.Conv2d(ngf * 2, self.nc, 1)
            self.fc4_r = nn.Conv2d(ngf, self.nc, 1)
    
        def forward(self, input):
            x_0 = self.fc0(input)
            x_1 = self.fc1(x_0)
            x_2 = self.fc2(x_1)
            x_3 = self.fc3(x_2)
            x_4 = self.fc4(x_3)
            x_5 = self.fc5(x_4)
    
            # For Multi-Scale Gradient
            # Converting the intermediate layers into images
            x_0_r = self.fc0_r(x_0)
            x_1_r = self.fc1_r(x_1)
            x_2_r = self.fc2_r(x_2)
            x_3_r = self.fc3_r(x_3)
            x_4_r = self.fc4_r(x_4)
    
            return x_5, x_0_r, x_1_r, x_2_r, x_3_r, x_4_r
    
    class Discriminator(pl.LightningModule):
        def __init__(self, ndf, nc):
            super(Discriminator, self).__init__()
            self.nc = nc
            self.ndf = ndf
    
            self.fc0 = nn.Sequential(
                # input is (nc) x 128 x 128
                nn.utils.spectral_norm(nn.Conv2d(nc, ndf, 4, 2, 1, bias=False)),
                nn.LeakyReLU(0.2, inplace=True)
            )
    
            self.fc1 = nn.Sequential(
                # state size. (ndf) x 64 x 64
                nn.utils.spectral_norm(nn.Conv2d(ndf + nc, ndf * 2, 4, 2, 1, bias=False)),
                # "+ nc" because of multi scale gradient
                nn.LeakyReLU(0.2, inplace=True),
                nn.BatchNorm2d(ndf * 2)
            )
    
            self.fc2 = nn.Sequential(
                # state size. (ndf*2) x 32 x 32
                nn.utils.spectral_norm(nn.Conv2d(ndf * 2 + nc, ndf * 4, 4, 2, 1, bias=False)),
                # "+ nc" because of multi scale gradient
                nn.LeakyReLU(0.2, inplace=True),
                nn.BatchNorm2d(ndf * 4)
            )
    
            self.fc3 = nn.Sequential(
                # state size. (ndf*4) x 16 x 16e
                nn.utils.spectral_norm(nn.Conv2d(ndf * 4 + nc, ndf * 8, 4, 2, 1, bias=False)),
                # "+ nc" because of multi scale gradient
                nn.LeakyReLU(0.2, inplace=True),
                nn.BatchNorm2d(ndf * 8),
            )
    
            self.fc4 = nn.Sequential(
                # state size. (ndf*8) x 8 x 8
                nn.utils.spectral_norm(nn.Conv2d(ndf * 8 + nc, ndf * 16, 4, 2, 1, bias=False)),
                nn.LeakyReLU(0.2, inplace=True),
                nn.BatchNorm2d(ndf * 16)
            )
    
            self.fc5 = nn.Sequential(
                # state size. (ndf*8) x 4 x 4
                nn.utils.spectral_norm(nn.Conv2d(ndf * 16 + nc, 1, 4, 1, 0, bias=False)),
                nn.Sigmoid()
            )
    
            # state size. 1 x 1 x 1
    
        def forward(self, input, detach_or_not):
            # When we train i ncombination with generator we use multi scale gradient.
            x, x_0_r, x_1_r, x_2_r, x_3_r, x_4_r = input
            if detach_or_not:
                x = x.detach()
    
            x_0 = self.fc0(x)
    
            x_0 = torch.cat((x_0, x_4_r), dim=1)  # Concat Multi-Scale Gradient
            x_1 = self.fc1(x_0)
    
            x_1 = torch.cat((x_1, x_3_r), dim=1)  # Concat Multi-Scale Gradient
            x_2 = self.fc2(x_1)
    
            x_2 = torch.cat((x_2, x_2_r), dim=1)  # Concat Multi-Scale Gradient
            x_3 = self.fc3(x_2)
    
            x_3 = torch.cat((x_3, x_1_r), dim=1)  # Concat Multi-Scale Gradient
            x_4 = self.fc4(x_3)
    
            x_4 = torch.cat((x_4, x_0_r), dim=1)  # Concat Multi-Scale Gradient
            x_5 = self.fc5(x_4)
    
            return x_5
    
    class DCGAN(pl.LightningModule):
    
        def __init__(self, hparams, checkpoint_folder, experiment_name):
            super().__init__()
            self.hparams = hparams
            self.checkpoint_folder = checkpoint_folder
            self.experiment_name = experiment_name
    
            # networks
            self.generator = Generator(ngf=hparams.ngf, nc=hparams.nc, latent_dim=hparams.latent_dim)
            self.discriminator = Discriminator(ndf=hparams.ndf, nc=hparams.nc)
            self.generator.apply(weights_init)
            self.discriminator.apply(weights_init)
    
            # cache for generated images
            self.generated_imgs = None
            self.last_imgs = None
    
            # For experience replay
            self.exp_replay_dis = torch.tensor([])
    
    
        def forward(self, z):
            return self.generator(z)
    
        def adversarial_loss(self, y_hat, y):
            return F.binary_cross_entropy(y_hat, y)
    
        def training_step(self, batch, batch_nb, optimizer_idx):
            # For adding Instance noise for more visit: https://www.inference.vc/instance-noise-a-trick-for-stabilising-gan-training/
            std_gaussian = max(0, self.hparams.level_of_noise - (
                    (self.hparams.level_of_noise * 2) * (self.current_epoch / self.hparams.epochs)))
            AddGaussianNoiseInst = AddGaussianNoise(std=std_gaussian)  # the noise decays over time
    
            imgs, _ = batch
            imgs = AddGaussianNoiseInst(imgs)  # Adding instance noise to real images
            self.last_imgs = imgs
    
            # train generator
            if optimizer_idx == 0:
                # sample noise
                z = torch.randn(imgs.shape[0], self.hparams.latent_dim, 1, 1).cuda()
    
                # generate images
                self.generated_imgs = self(z)
    
                # ground truth result (ie: all fake)
                g_loss = self.adversarial_loss(self.discriminator(self.generated_imgs, False), get_valid_labels(self.generated_imgs[0]))  # adversarial loss is binary cross-entropy; [0] is the image of the last layer
    
                tqdm_dict = {'g_loss': g_loss}
                log = {'g_loss': g_loss, "std_gaussian": std_gaussian}
                output = OrderedDict({
                    'loss': g_loss,
                    'progress_bar': tqdm_dict,
                    'log': log
                })
                return output
    
            # train discriminator
            if optimizer_idx == 1:
                # Measure discriminator's ability to classify real from generated samples
                # how well can it label as real?
                real_loss = self.adversarial_loss(
                    self.discriminator([imgs, resize2d(imgs, 4), resize2d(imgs, 8), resize2d(imgs, 16), resize2d(imgs, 32), resize2d(imgs, 64)],
                                       False), get_valid_labels(imgs))
    
                fake_loss = self.adversarial_loss(self.discriminator(self.generated_imgs, True), get_unvalid_labels(
                    self.generated_imgs[0]))  # how well can it label as fake?; [0] is the image of the last layer
    
                # discriminator loss is the average of these
                d_loss = (real_loss + fake_loss) / 2
    
                tqdm_dict = {'d_loss': d_loss}
                log = {'d_loss': d_loss, "std_gaussian": std_gaussian}
                output = OrderedDict({
                    'loss': d_loss,
                    'progress_bar': tqdm_dict,
                    'log': log
                })
                return output
    
        def configure_optimizers(self):
            lr_gen = self.hparams.lr_gen
            lr_dis = self.hparams.lr_dis
            b1 = self.hparams.b1
            b2 = self.hparams.b2
    
            opt_g = torch.optim.Adam(self.generator.parameters(), lr=lr_gen, betas=(b1, b2))
            opt_d = torch.optim.Adam(self.discriminator.parameters(), lr=lr_dis, betas=(b1, b2))
            return [opt_g, opt_d], []
    
        def backward(self, trainer, loss, optimizer, optimizer_idx: int) -> None:
            loss.backward(retain_graph=True)
    
        def train_dataloader(self):
            # transform = transforms.Compose([transforms.Resize((self.hparams.image_size, self.hparams.image_size)),
            #                                 transforms.ToTensor(),
            #                                 transforms.Normalize([0.5], [0.5])])
            # dataset = torchvision.datasets.MNIST(os.getcwd(), train=False, download=True, transform=transform)
            # return DataLoader(dataset, batch_size=self.hparams.batch_size)
            # transform = transforms.Compose([transforms.Resize((self.hparams.image_size, self.hparams.image_size)),
            #                                 transforms.ToTensor(),
            #                                 transforms.Normalize([0.5], [0.5])
            #                                 ])
    
            # train_dataset = torchvision.datasets.ImageFolder(
            #     root="./drive/My Drive/datasets/flower_dataset/",
            #     # root="./drive/My Drive/datasets/ghibli_dataset_small_overfit/",
            #     transform=transform
            # )
            # return DataLoader(train_dataset, num_workers=self.hparams.num_workers, shuffle=True,
            #                   batch_size=self.hparams.batch_size)
    
            transform = transforms.Compose([transforms.Resize((self.hparams.image_size, self.hparams.image_size)),
                                            transforms.ToTensor(),
                                            transforms.Normalize([0.5], [0.5])
                                            ])
            train_dataset = torchvision.datasets.ImageFolder(
                root="ghibli_dataset_small_overfit/",
                transform=transform
            )
            return DataLoader(train_dataset, num_workers=self.hparams.num_workers, shuffle=True,
                              batch_size=self.hparams.batch_size)
    
        def on_epoch_end(self):
            z = torch.randn(4, self.hparams.latent_dim, 1, 1).cuda()
            # match gpu device (or keep as cpu)
            if self.on_gpu:
                z = z.cuda(self.last_imgs.device.index)
    
            # log sampled images
            sample_imgs = self.generator(z)[0]
            torchvision.utils.save_image(sample_imgs, f'generated_images_epoch{self.current_epoch}.png')
    
            # save model
            if self.current_epoch % self.hparams.save_model_every_epoch == 0:
                trainer.save_checkpoint(
                    self.checkpoint_folder + "/" + self.experiment_name + "_epoch_" + str(self.current_epoch) + ".ckpt")
    
    from argparse import Namespace
    
    args = {
        'batch_size': 128, # batch size
        'lr_gen': 0.0003,  # TTUR;learnin rate of both networks; tested value: 0.0002
        'lr_dis': 0.0003,  # TTUR;learnin rate of both networks; tested value: 0.0002
        'b1': 0.5,  # Momentum for adam; tested value(dcgan paper): 0.5
        'b2': 0.999,  # Momentum for adam; tested value(dcgan paper): 0.999
        'latent_dim': 256,  # tested value which worked(in V4_1): 100
        'nc': 3,  # number of color channels
        'ndf': 8,  # number of discriminator features
        'ngf': 8,  # number of generator features
        'epochs': 4,  # the maxima lamount of epochs the algorith should run
        'save_model_every_epoch': 1,  # how often we save our model
        'image_size': 128, # size of the image
        'num_workers': 3,
        'level_of_noise': 0.1,  # how much instance noise we introduce(std; tested value: 0.15 and 0.1
        'experience_save_per_batch': 1,  # this value should be very low; tested value which works: 1
        'experience_batch_size': 50  # this value shouldnt be too high; tested value which works: 50
    }
    hparams = Namespace(**args)
    
    # Parameters
    experiment_name = "DCGAN_6_2_MNIST_128px"
    dataset_name = "mnist"
    checkpoint_folder = "DCGAN/"
    tags = ["DCGAN", "128x128"]
    dirpath = Path(checkpoint_folder)
    
    # defining net
    net = DCGAN(hparams, checkpoint_folder, experiment_name)
    
    torch.autograd.set_detect_anomaly(True)
    trainer = pl.Trainer( # resume_from_checkpoint="DCGAN_V4_2_GHIBLI_epoch_999.ckpt",
        max_epochs=args["epochs"],
        gpus=1
    )
    
    trainer.fit(net)

Leave a Comment

File not found.