*#Image to Image Translation#*

In [1]:
!pip install tensorflow==2.15.0







In [2]:
!pip install tensorflow-probability==0.23.0






*1️⃣ Import Necessary Libraries*

1. *TensorFlow/Keras* for building and training deep learning models.

2. *NumPy* for numerical operations.

3. *Matplotlib* for visualizing the results.

4. *OpenCV* (cv2) for image processing.

5. *TensorFlow Probability* (tfp) for probability distributions. Normalization uses the built-in Keras GroupNormalization layer, so TensorFlow Addons is not required.

6. *pathlib* with a custom loader to read the CT & MRI images from disk.

In [3]:
import tensorflow as tf
from tensorflow.keras import layers, Model
import numpy as np
import cv2
import pathlib
import matplotlib.pyplot as plt
import tensorflow_probability as tfp

tfd = tfp.distributions






2️⃣ *Configuration (Hyperparameters)*

This step defines the key settings for training.


Image size: The input image dimensions.

Latent dimension: The size of the encoded representation in the VAE.

Learning rate: Defines how fast the model updates weights.

Batch size & epochs: Training parameters.

In [4]:
IMAGE_SHAPE = (256, 256, 3)
LATENT_DIM = 256
FILTERS = 16
KERNEL = 3
LEARNING_RATE = 0.0001
WEIGHT_DECAY = 6e-8
BATCH_SIZE = 1
EPOCHS = 10

* ===================== Architecture Components =====================*

*3️⃣ Sampling Layer for Variational Autoencoder (VAE)*

The sampling layer is a crucial part of the VAE, where we sample from the latent space.

🔹 What We Need 

The encoder outputs μ (mean) and log σ² (log variance).

This layer samples from a normal distribution using the reparameterization trick.

In [None]:
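class Sampling(layers.Layer):
    """Draws z from N(z_mean, exp(z_log_var)) via the reparameterization trick."""

    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        # epsilon ~ N(0, 1), one independent noise vector per sample
        epsilon = tf.random.normal(shape=(batch, dim))
        # sigma = exp(0.5 * log(sigma^2))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon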

It inherits from layers.Layer, meaning it's a custom layer that can be used like any other Keras layer.

📍inputs is a tuple containing:

 z_mean: The mean vector output from the encoder.

 z_log_var: The log variance vector output from the encoder.

📍This unpacks the inputs into two separate variables:

 z_mean: Represents the mean of the latent distribution.

 z_log_var: Represents the log variance of the latent distribution.

📍Why log variance?

Instead of predicting the variance (σ²) directly, the encoder predicts log(σ²) because:

Numerical Stability: log(σ²) can be any real number, so the Dense layer that produces it needs no positivity constraint, and very small variances can be represented without underflow.

Easier Optimization: σ = exp(0.5 * log(σ²)) is positive by construction.


📍This determines:
 batch: The number of samples in the batch.
 dim: The size of the latent space (256 here, since LATENT_DIM = 256).


📍Generates random values from a standard normal distribution (𝒩(0,1)).

epsilon.shape = (batch, dim), meaning every sample gets a unique noise vector.

Why do we need epsilon?

Instead of directly using z_mean, we add controlled randomness to ensure the VAE learns a smooth latent space.

* Reparameterization Trick*

 The latent space follows a normal distribution:

 𝑧 ∼ 𝒩(μ, σ²)

 A sample is drawn from this distribution:

 𝑧 = μ + σ * ε, where ε ∼ 𝒩(0,1).

 Since z_log_var = log(σ²), we compute:

 σ = exp(0.5 * log(σ²)) = exp(0.5 * z_log_var).
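
As a rough illustration of the trick outside TensorFlow, the NumPy sketch below draws samples with z = μ + exp(0.5 * log σ²) * ε and checks that their empirical mean and standard deviation land near μ and σ. The function name sample_latent and the values μ = 1.5 and σ² = 4 are made up for this example; the notebook itself uses the Sampling layer.

```python
import numpy as np

def sample_latent(z_mean, z_log_var, rng):
    """Reparameterized draw: z = mean + sigma * epsilon, epsilon ~ N(0, 1)."""
    sigma = np.exp(0.5 * z_log_var)              # sigma = exp(0.5 * log(sigma^2))
    epsilon = rng.standard_normal(z_mean.shape)  # same shape as the mean
    return z_mean + sigma * epsilon

rng = np.random.default_rng(0)
z_mean = np.full((10000, 2), 1.5)                # mu = 1.5 (illustrative value)
z_log_var = np.full((10000, 2), np.log(4.0))     # sigma^2 = 4, so sigma = 2
z = sample_latent(z_mean, z_log_var, rng)

print(z.shape)            # (10000, 2)
print(z.mean(), z.std())  # close to 1.5 and 2.0
```

Because the randomness lives entirely in ε, the output is a differentiable function of z_mean and z_log_var, which is what lets gradients reach the encoder.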





*🔹 Residual Block in Detail*

This function defines a residual block, a key building block inspired by ResNet (Residual Networks). Residual blocks help in training deep neural networks efficiently by allowing gradient flow through skip connections.

inputs: The input tensor (features from the previous layer).

filters: The number of filters (channels) in the convolution layers.

use_norm: Whether to apply Group Normalization (helps stabilize training)

Step 1️⃣: First Convolution + Activation :

 Applies a 2D Convolution (Conv2D) with filters filters.

 KERNEL is the global kernel size set in the configuration (3, i.e. a 3x3 kernel).

 padding='same': Ensures the output size is the same as the input.

 Leaky ReLU activation (alpha=0.2):

 Helps avoid dead neurons (better than regular ReLU).
 
 Allows a small gradient flow for negative values.

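Step 2️⃣: Group Normalization (Optional)

 When use_norm is True, applies GroupNormalization with groups=1, which computes one mean and variance per sample across all channels (the layer-normalization special case of group normalization).

Step 3️⃣: Second Convolution + Activation

 Applies another Conv2D layer with the same number of filters.

 Uses LeakyReLU again for better gradient flow.

 Why two convolutions?

 The first convolution learns low-level features.
 
 The second convolution refines the learned features.

Step 4️⃣: Group Normalization (Optional)

 When use_norm is True, the same normalization is applied again after the second activation.

Step 5️⃣: Shortcut Connection (Skip Connection)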

 The original input is passed through a 1x1 convolution.

 This matches the number of filters with the residual output.

 Why 1x1 convolution?

 Ensures the shortcut has the same number of filters as x.

 Helps in adjusting dimensions when the number of channels changes.

Step 6️⃣: Merge Shortcut & Residual Path

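 Merges the shortcut and residual path using element-wise maximum.
 
 Why maximum() instead of addition (+)?

 This is a design choice of this notebook; a standard ResNet block adds the two paths.

 At each position the merge keeps the larger of the two activations, so the stronger feature from either the residual or shortcut path is passed on.
 
 The result can still be negative when both activations are negative; maximum() does not act as a ReLU.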
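
To make the merge concrete, here is a small NumPy sketch comparing element-wise maximum with addition. The activation values are hypothetical and chosen only to show the three cases (residual larger, shortcut larger, both negative).

```python
import numpy as np

residual = np.array([-1.0, 2.0, -3.0, 0.5])  # output of the two convolutions
shortcut = np.array([0.5, -4.0, -2.0, 0.5])  # output of the 1x1 convolution

merged_max = np.maximum(residual, shortcut)  # what layers.maximum computes
merged_add = residual + shortcut             # what a standard ResNet block computes

print(merged_max)  # larger value at each position
print(merged_add)  # sum at each position
```

At the third position both inputs are negative, so the maximum (-2.0) is negative as well.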










In [5]:
def residual_block(inputs, filters, use_norm=True):
    # First convolution + activation
    x = layers.Conv2D(filters, KERNEL, padding='same')(inputs)
    x = layers.LeakyReLU(alpha=0.2)(x)
    if use_norm:
        x = layers.GroupNormalization(groups=1)(x)
    # Second convolution + activation
    x = layers.Conv2D(filters, KERNEL, padding='same')(x)
    x = layers.LeakyReLU(alpha=0.2)(x)
    if use_norm:
        x = layers.GroupNormalization(groups=1)(x)
    # 1x1 convolution so the shortcut has the same channel count as x
    shortcut = layers.Conv2D(filters, 1, padding='same')(inputs)
    # Merge with element-wise maximum (not addition)
    return layers.maximum([x, shortcut])

* 1️⃣ Encoder and Decoder Block*

*Encoder*

1️⃣ Pass Input Through Residual Block

Uses a residual block (previously defined).

Extracts important features while keeping the original information.

Helps prevent vanishing gradients and allows deep networks to train effectively.

2️⃣ Store the Skip Connection

The skip connection stores the output of the residual block.

It will be used later in the decoder to restore lost details.

3️⃣ Downsampling (Reduce Spatial Size)

Applies Max Pooling to reduce the spatial size (height & width).

Why?

Reduces computation.

Forces the model to learn high-level features instead of pixel details.

4️⃣ Return Downsampled Output & Skip Connection

Outputs:

x: The downsampled feature map.

skip: The saved feature map (used later in the decoder).


🔥 2️⃣ Decoder Block

1️⃣ Upsampling (Increase Spatial Size):

Uses Conv2DTranspose (transposed convolution, aka deconvolution).

Upsamples the input by a factor of 2 (increases spatial size).

Why?

Increases resolution to match the original input image.

2️⃣ Merge Skip Connection

Combines the upsampled output with the skip connection.

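Uses element-wise maximum instead of addition or the channel concatenation used in a standard U-Net.

Why?

Keeps the channel count unchanged after the merge, since the upsampled output and the skip connection have the same number of filters.

Passes on, at each position, the stronger of the upsampled and the skip activation, which reintroduces detail lost during downsampling.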

3️⃣ Apply a Residual Block

Uses a residual block to refine the upsampled output.

Helps recover lost details and maintain stability.

4️⃣ Return the Processed Output

Returns the final feature map after upsampling and refinement.




In [6]:
def encoder_block(inputs, filters, use_norm=True):
    x = residual_block(inputs, filters, use_norm)
    skip = x  # full-resolution features, reused by the decoder
    x = layers.MaxPooling2D()(x)  # halves height and width
    return x, skip

def decoder_block(inputs, skip, filters, use_norm=True):
    # Transposed convolution with stride 2 doubles height and width
    x = layers.Conv2DTranspose(filters, KERNEL, strides=2, padding='same')(inputs)
    x = layers.maximum([x, skip])
    x = residual_block(x, filters, use_norm)
    return x


* ===================== Generator =====================*

This function builds the generator: a U-Net-style Variational Autoencoder (VAE) that serves as the generator in a CycleGAN setup. One instance converts a CT scan into an MRI image, and a second instance converts an MRI image into a CT scan, so that the mapping between the two domains is learned in both directions.

🛠️ What This Function Does?

It encodes an input image into a latent space.

It applies variational sampling to introduce a probabilistic distribution.

It decodes the latent representation back into an image.

Uses skip connections to retain features across layers.

1️⃣ Input Layer

Defines the input tensor with the given IMAGE_SHAPE (e.g., (256, 256, 3), for RGB images).

2️⃣ Encoder: Downsampling the Image

 Each encoder block halves the spatial resolution but doubles the filters.

 Stores skip connections (s1, s2, ..., s7) for later use in the decoder.

 After e7, the 256x256 input is compressed into a 2x2 feature map with FILTERS*64 = 1024 channels.

3️⃣ Latent Space (Variational Sampling)

 Flattens the feature map into a 1D vector.

 Uses two dense layers to compute:

 z_mean → The mean of the latent distribution.

 z_log_var → The logarithm of the variance.

 Uses reparameterization trick (Sampling layer) to ensure backpropagation works in VAE.

4️⃣ Reshape for Decoder

 Expands z into a 2x2 feature map to match e7 dimensions.

 Prepares the latent vector for decoding.

5️⃣ Decoder: Upsampling the Image

 Each decoder block upsamples the feature map back to the original size.

 Uses skip connections (s1, s2, ..., s7) to restore spatial information.

 Mirrors the encoder process but in reverse.

6️⃣ Final Output Layer

 Uses a Conv2D layer to produce the final RGB image.
 
 Applies sigmoid activation to ensure pixel values remain between [0,1].
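
The shape bookkeeping described in the steps above can be checked with plain Python. This is only a sketch of the arithmetic (no layers are built); the function name trace_generator_shapes is made up for illustration, and the constants mirror IMAGE_SHAPE and FILTERS from the configuration.

```python
IMAGE_SIZE = 256   # height = width, from IMAGE_SHAPE
FILTERS = 16
NUM_BLOCKS = 7     # encoder blocks e1..e7

def trace_generator_shapes(image_size, base_filters, num_blocks):
    """Return (skip_shapes, bottleneck_shape) for the encoder."""
    size = image_size
    skip_shapes = []
    for i in range(num_blocks):
        filters = base_filters * 2 ** i
        skip_shapes.append((size, size, filters))  # skip is saved before pooling
        size //= 2                                  # MaxPooling2D halves H and W
    bottleneck = (size, size, base_filters * 2 ** (num_blocks - 1))
    return skip_shapes, bottleneck

skips, bottleneck = trace_generator_shapes(IMAGE_SIZE, FILTERS, NUM_BLOCKS)
flattened = bottleneck[0] * bottleneck[1] * bottleneck[2]

for index, shape in enumerate(skips, start=1):
    print(f"s{index}: {shape}")
print("e7:", bottleneck)        # (2, 2, 1024)
print("flattened:", flattened)  # 4096
```

The last skip connection s7 is 4x4x1024, which is why the latent vector is reshaped to 2x2x1024: the first decoder block upsamples it to 4x4 before merging with s7.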








In [7]:
def build_generator(name):
    inputs = layers.Input(IMAGE_SHAPE)

    # Encoder
    e1, s1 = encoder_block(inputs, FILTERS)
    e2, s2 = encoder_block(e1, FILTERS*2)
    e3, s3 = encoder_block(e2, FILTERS*4)
    e4, s4 = encoder_block(e3, FILTERS*8)
    e5, s5 = encoder_block(e4, FILTERS*16)
    e6, s6 = encoder_block(e5, FILTERS*32)
    e7, s7 = encoder_block(e6, FILTERS*64)

    # Latent Space
    x = layers.Flatten()(e7)
    z_mean = layers.Dense(LATENT_DIM, name=f"z_mean_{name.split('_')[-1]}")(x)
    z_log_var = layers.Dense(LATENT_DIM, name=f"z_log_var_{name.split('_')[-1]}")(x)
    z = Sampling()([z_mean, z_log_var])

    # Reshape for decoder
    x = layers.Dense(2 * 2 * FILTERS*64)(z)
    x = layers.Reshape((2, 2, FILTERS*64))(x)

    # Decoder
    d0 = decoder_block(x, s7, FILTERS*64)
    d1 = decoder_block(d0, s6, FILTERS*32)
    d2 = decoder_block(d1, s5, FILTERS*16)
    d3 = decoder_block(d2, s4, FILTERS*8)
    d4 = decoder_block(d3, s3, FILTERS*4)
    d5 = decoder_block(d4, s2, FILTERS*2)
    d6 = decoder_block(d5, s1, FILTERS)

    outputs = layers.Conv2D(3, KERNEL, activation='sigmoid', padding='same')(d6)
    return Model(inputs, [outputs, z_mean, z_log_var], name=name)


*===================== Discriminator =====================*


This function constructs the discriminator in a Generative Adversarial Network (GAN). The discriminator’s role is to classify an image as real or fake by extracting hierarchical features and making multi-scale predictions.

What Does This Function Do?

 Extracts features from the input image using convolutional layers.

 Downsamples the image through multiple layers to capture both local and global features.

 Generates multiple outputs from different feature scales for better discrimination.

1️⃣ Input Layer 

 Defines the input tensor with a shape of IMAGE_SHAPE (e.g., (256, 256, 3) for RGB images).

 This means the discriminator takes an image as input.

2️⃣ Feature Extraction

 x = inputs initializes x as the input image.

 features = [] creates a list to store intermediate feature maps.

3️⃣ Initial Convolution

 Applies a convolutional layer (Conv2D) with FILTERS filters (16 in this configuration) to extract basic edges and textures.

 Uses LeakyReLU activation (alpha=0.2) instead of ReLU to allow small gradients for negative values.

 Stores the feature map in features.

4️⃣ Downsampling Blocks (Feature Hierarchy)

 Defines filter_sizes, increasing filter count at each stage to learn complex features.

 Uses a loop to pass x through multiple encoder_block layers:

 Each encoder_block downsamples the feature map (reducing spatial size).

 Each block doubles the number of filters to capture more detailed features.

 Stores all extracted feature maps in features.

5️⃣ Multi-Scale Outputs (Final Classification Layers)

 The discriminator does not produce a single output; it uses multiple feature scales.

 Extracts the last 4 feature maps (features[-4:]) to classify at different resolutions (32x32, 16x16, 8x8, and 4x4 for a 256x256 input).

 Each feature map is passed through a final Conv2D layer with 1 filter to predict real vs fake scores.

 Stores the outputs in outputs.

6️⃣ Return the Discriminator Model

 Creates a Keras Model that takes an image as input and outputs multiple classification scores.

 This helps in making fine-grained real/fake decisions.
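
The resolutions of the multi-scale outputs follow from the downsampling arithmetic. The short sketch below only reproduces that arithmetic under the notebook's configuration (256x256 input, six downsampling blocks); the variable names are chosen for this example.

```python
IMAGE_SIZE = 256
NUM_DOWNSAMPLING_BLOCKS = 6  # len(filter_sizes) in build_discriminator

# The initial convolution keeps the input resolution; each block then halves it.
feature_sizes = [IMAGE_SIZE // 2 ** i for i in range(NUM_DOWNSAMPLING_BLOCKS + 1)]
output_sizes = feature_sizes[-4:]  # the four maps used for real/fake scores

print(feature_sizes)  # [256, 128, 64, 32, 16, 8, 4]
print(output_sizes)   # [32, 16, 8, 4]
```

Each output is therefore a score map of shape (size, size, 1), not a single scalar, so every location in the map judges one region of the image.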


In [9]:
def build_discriminator(name):
    inputs = layers.Input(IMAGE_SHAPE)

    # Feature extraction
    x = inputs
    features = []

    # Initial convolution
    x = layers.Conv2D(FILTERS, KERNEL, padding='same')(x)
    x = layers.LeakyReLU(alpha=0.2)(x)
    features.append(x)

    # Downsampling blocks
    filter_sizes = [FILTERS*2, FILTERS*4, FILTERS*8, FILTERS*16, FILTERS*32, FILTERS*64]
    for filters in filter_sizes:
        x, _ = encoder_block(x, filters, use_norm=False)
        features.append(x)

    # Multi-scale outputs: one score map per feature map
    outputs = []
    for feature in features[-4:]:
        out = layers.Conv2D(1, KERNEL, padding='same')(feature)
        outputs.append(out)

    return Model(inputs, outputs, name=name)


*===================== Data Loading =====================*



In [10]:
def load_images(path):
    images = []
    for p in pathlib.Path(path).glob('*.*'):
        try:
            img = cv2.imread(str(p))
            if img is not None:
                img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                # cv2.resize expects (width, height); the two are equal here
                img = cv2.resize(img, IMAGE_SHAPE[:2])
                img = img.astype(np.float32) / 255.0
                images.append(img)
        except Exception as e:
            print(f"Error loading image {p}: {e}")
    return np.array(images)

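The load_images function above reads every image file in a directory, converts it from BGR to RGB, resizes it to the size given by IMAGE_SHAPE, and scales pixel values to [0, 1]. The next function, load_and_balance_datasets, loads and balances the two medical imaging datasets: CT scans and MRI scans. Both arrays must contain the same number of images because they are later paired element-wise by tf.data.Dataset.from_tensor_slices.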


📌 What Does This Function Do?

Loads CT scans from the given directory.
Loads MRI scans from the given directory.
Finds the smaller dataset (CT or MRI) and trims the larger one to match its size.
Returns balanced datasets with the same number of images.

1️⃣ Loading CT Scans:

 Prints "Loading CT scans..." to inform the user.

 Calls load_images(ct_path), the function defined above, which reads images from the directory specified by ct_path.

 Stores the loaded images in ct_scans.

2️⃣ Loading MRI Scans

 Prints "Loading MRI scans..." to indicate MRI loading.

 Calls load_images(mri_path), which loads images from mri_path.

 Stores the MRI images in mri_scans.

3️⃣ Balancing the Datasets

 Computes the minimum length between the two datasets.

 Ensures that the dataset with more images is trimmed to match the smaller one.









 



In [11]:

def load_and_balance_datasets(ct_path, mri_path):
    print("Loading CT scans...")
    ct_scans = load_images(ct_path)
    print("Loading MRI scans...")
    mri_scans = load_images(mri_path)

    # Trim the larger set so both arrays have the same length
    min_length = min(len(ct_scans), len(mri_scans))
    ct_scans = ct_scans[:min_length]
    mri_scans = mri_scans[:min_length]

    print(f"Balanced datasets to {min_length} images each")
    return ct_scans, mri_scans

*Training Setup - Detailed Explanation*

This block of code sets up the models and optimizers required for training a CycleGAN for CT ↔ MRI image translation. Let’s break it down step by step.

📌 What Does This Code Do?
 Builds the generator models (CT → MRI and MRI → CT).

 Builds the discriminator models for CT and MRI.

 Creates optimizers for training the generators and discriminators.

 Initializes model variables (trainable parameters for both generators and discriminators).

 Builds optimizers using the trainable variables.

1️⃣ Building the Generator Models

 build_generator(name): This function (explained earlier) builds a U-Net-based Variational Autoencoder (VAE) generator.

 g_ct_mri: The generator that converts CT scans → MRI images.

 g_mri_ct: The generator that converts MRI images → CT scans

2️⃣ Building the Discriminator Models

 build_discriminator(name): This function (explained earlier) builds the discriminators to differentiate real and fake images.

 d_ct: The discriminator that distinguishes real CT scans from fake ones.

 d_mri: The discriminator that distinguishes real MRI scans from fake ones.


3️⃣ Creating Optimizers

 g_opt: Optimizer for training both generators.

 d_opt: Optimizer for training both discriminators.

 Uses RMSprop as the optimizer.

 The learning rate (LEARNING_RATE) controls the step size for updates.

 Weight decay (WEIGHT_DECAY) prevents overfitting by penalizing large weights.

4️⃣ Initializing Model Variables

 g_vars: Stores all trainable variables (weights & biases) of both generators.
 d_vars: Stores all trainable variables of both discriminators.

 📝 Why store trainable variables separately?

 Since generators and discriminators have separate losses, they need to be updated separately.

5️⃣ Building Optimizers with Model Variables

 g_opt.build(g_vars): Tells TensorFlow that g_opt will optimize generator variables.

 d_opt.build(d_vars): Tells TensorFlow that d_opt will optimize discriminator variables.

 📝 Why explicitly build the optimizers?

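 In Eager Execution mode, the optimizer would otherwise create its internal variables (such as the RMSprop moving averages) lazily, on the first apply_gradients call.
 
 Calling build() creates them up front, before train_step is traced by @tf.function, so no variables need to be created inside the traced function.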




In [12]:
# ===================== Training Setup =====================
# Build models
g_ct_mri = build_generator('CT_to_MRI')
g_mri_ct = build_generator('MRI_to_CT')
d_ct = build_discriminator('D_CT')
d_mri = build_discriminator('D_MRI')

# Create optimizers
g_opt = tf.keras.optimizers.RMSprop(learning_rate=LEARNING_RATE, weight_decay=WEIGHT_DECAY)
d_opt = tf.keras.optimizers.RMSprop(learning_rate=LEARNING_RATE, weight_decay=WEIGHT_DECAY)

# Initialize model variables
g_vars = g_ct_mri.trainable_variables + g_mri_ct.trainable_variables
d_vars = d_ct.trainable_variables + d_mri.trainable_variables

# Build optimizers
g_opt.build(g_vars)
d_opt.build(d_vars)






*Explanation of train_step Function in CycleGAN with Variational Autoencoder (VAE)*

This function performs one training step for the CycleGAN with VAE-style latent representations. It does the following:

Generates fake images using the generators.

Evaluates the fake and real images using the discriminators.

Computes the loss functions for both generators and discriminators.

Computes gradients and updates the model parameters.

1️⃣ Forward Pass - Generate Fake Images

 g_ct_mri(real_ct): Translates CT → Fake MRI and produces:
 
 fake_mri: The generated MRI image.
 z_mean_fwd, z_log_var_fwd: Latent variables (from the Variational Autoencoder).
 g_mri_ct(real_mri): Translates MRI → Fake CT with similar outputs.

 📝 Why store z_mean and z_log_var?

 These come from the VAE latent space and are used for the KL divergence loss.

2️⃣ Compute Discriminator Outputs

 d_ct(real_ct): Discriminator’s prediction for real CT images.

 d_ct(fake_ct): Discriminator’s prediction for fake CT images.

 d_mri(real_mri): Discriminator’s prediction for real MRI images.

 d_mri(fake_mri): Discriminator’s prediction for fake MRI images.

 📝 Goal of Discriminators?


 Real images should be classified close to 1.

 Fake images should be classified close to 0.

3️⃣ Compute Discriminator Losses

 Uses Least Squares GAN (LSGAN) loss:

 For real images: (real - 1)^2 → Encourages real images to be classified as 1.

 For fake images: fake^2 → Encourages fake images to be classified as 0.

 sum([...]): If there are multiple output layers in the discriminator, we sum their losses.

 📝 Why LSGAN loss?

 Helps stabilize training compared to standard GAN loss.
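
As a sanity check on the formula, the NumPy sketch below evaluates the same least-squares loss on two made-up discriminator output scales. The function name lsgan_discriminator_loss and the score values are assumptions for illustration; train_step computes the same expression with TensorFlow ops.

```python
import numpy as np

def lsgan_discriminator_loss(real_outputs, fake_outputs):
    """Sum over scales of mean((real - 1)^2) + mean(fake^2)."""
    return sum(
        np.mean((real - 1.0) ** 2) + np.mean(fake ** 2)
        for real, fake in zip(real_outputs, fake_outputs)
    )

# Two hypothetical output scales (a 2x2 and a 1x1 score map)
real_outputs = [np.full((2, 2), 0.9), np.full((1, 1), 1.0)]
fake_outputs = [np.full((2, 2), 0.2), np.full((1, 1), 0.0)]

loss = lsgan_discriminator_loss(real_outputs, fake_outputs)
print(loss)  # approximately 0.05 = (0.01 + 0.04) + (0 + 0)
```

The second scale contributes nothing because its scores already match the targets (1 for real, 0 for fake).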


4️⃣ Cycle Consistency Loss (CycleGAN Component)

 cycled_ct = g_mri_ct(fake_mri): The fake MRI is translated back to CT.

 cycled_mri = g_ct_mri(fake_ct): The fake CT is translated back to MRI.

 📝 Why cycle consistency?

 The network should learn round-trip consistency:

 CT → Fake MRI → CT (should look like original CT)

 MRI → Fake CT → MRI (should look like original MRI)

5️⃣ KL Divergence Loss (VAE Component)

 This is the KL divergence loss from VAE:

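 Pulls the encoder's distribution 𝒩(μ, σ²) toward the standard normal prior 𝒩(0, 1).

 Keeps σ from shrinking to zero, which would turn the generator into a deterministic autoencoder.

 📝 Why add KL divergence loss?

 Regularizes the latent space so that nearby latent codes decode to similar images and the sampled outputs stay diverse.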
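
A small NumPy sketch of the KL term, written the same way as in train_step (note that the notebook averages over the latent dimensions with a mean rather than summing them). The function name kl_to_standard_normal and the test inputs are made up for this example.

```python
import numpy as np

def kl_to_standard_normal(z_mean, z_log_var):
    """-0.5 * mean(1 + log_var - mean^2 - exp(log_var)), as in train_step."""
    return -0.5 * np.mean(1.0 + z_log_var - np.square(z_mean) - np.exp(z_log_var))

zeros = np.zeros((1, 4))
ones = np.ones((1, 4))

kl_matched = kl_to_standard_normal(zeros, zeros)  # zero: already N(0, 1)
kl_shifted = kl_to_standard_normal(ones, zeros)   # 0.5: mean moved to 1
kl_narrow = kl_to_standard_normal(zeros, np.full((1, 4), -4.0))  # sigma too small

print(kl_matched, kl_shifted, kl_narrow)
```

The penalty is zero only when μ = 0 and log σ² = 0, and it grows both when the mean drifts away from zero and when the variance collapses.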

6️⃣ Compute Generator Losses

 The generator wants fake images to be classified as real (1), so we use:

 (fake - 1)^2 → Fake images should be close to 1.

 Cycle consistency loss: L1 loss (|original - reconstructed|).

 Encourages faithful reconstructions.


 Final generator loss combines:

 Adversarial loss (GAN loss).

 Cycle consistency loss (weighted by 10 for stronger enforcement).

 KL divergence loss (weighted by 0.5).
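
The weighting can be illustrated with a minimal NumPy sketch. The function generator_total_loss and all input values here are hypothetical; the default weights 10 and 0.5 are the ones listed above.

```python
import numpy as np

def generator_total_loss(d_fake_outputs, real_images, cycled_images, kl_terms,
                         cycle_weight=10.0, kl_weight=0.5):
    """Adversarial + weighted cycle-consistency (L1) + weighted KL."""
    adv = sum(np.mean((fake - 1.0) ** 2) for fake in d_fake_outputs)
    cycle = sum(np.mean(np.abs(real - cycled))
                for real, cycled in zip(real_images, cycled_images))
    return adv + cycle_weight * cycle + kl_weight * sum(kl_terms)

d_fake_outputs = [np.full((2, 2), 0.5)]    # one score map, all 0.5 -> adv = 0.25
real_images = [np.zeros((4, 4, 3))]
cycled_images = [np.full((4, 4, 3), 0.1)]  # L1 distance 0.1 -> weighted 1.0
kl_terms = [0.2]                           # weighted 0.1

total = generator_total_loss(d_fake_outputs, real_images, cycled_images, kl_terms)
print(total)  # approximately 1.35
```

With these numbers the cycle term dominates, which is the intended effect of the weight of 10: reconstructions are pushed to stay close to the original image.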

 
7️⃣ Compute Total Discriminator Loss

Adds both discriminator losses.

8️⃣ Compute Gradients & Update Model Parameters

 Computes gradients of discriminator loss (d_total_loss).

 Updates discriminator weights (d_vars).

 Computes gradients of generator loss (g_total_loss).

 Updates generator weights (g_vars).

📝 Why use tf.GradientTape(persistent=True)?

 We need gradients twice (once for discriminators, once for generators).




In [None]:
import os

@tf.function
def train_step(real_ct, real_mri):
    with tf.GradientTape(persistent=True) as tape:
        # Forward passes
        fake_mri, z_mean_fwd, z_log_var_fwd = g_ct_mri(real_ct, training=True)
        fake_ct, z_mean_bwd, z_log_var_bwd = g_mri_ct(real_mri, training=True)

        # Discriminator outputs (each is a list with one score map per scale)
        d_real_ct = d_ct(real_ct, training=True)
        d_fake_ct = d_ct(fake_ct, training=True)
        d_real_mri = d_mri(real_mri, training=True)
        d_fake_mri = d_mri(fake_mri, training=True)

        # Discriminator losses
        d_ct_loss = sum([tf.reduce_mean((real - 1)**2) + tf.reduce_mean(fake**2)
                         for real, fake in zip(d_real_ct, d_fake_ct)])
        d_mri_loss = sum([tf.reduce_mean((real - 1)**2) + tf.reduce_mean(fake**2)
                          for real, fake in zip(d_real_mri, d_fake_mri)])

        # Cycle consistency
        cycled_ct, _, _ = g_mri_ct(fake_mri, training=True)
        cycled_mri, _, _ = g_ct_mri(fake_ct, training=True)

        # KL Divergence
        kl_fwd = -0.5 * tf.reduce_mean(1 + z_log_var_fwd - tf.square(z_mean_fwd) - tf.exp(z_log_var_fwd))
        kl_bwd = -0.5 * tf.reduce_mean(1 + z_log_var_bwd - tf.square(z_mean_bwd) - tf.exp(z_log_var_bwd))

        # Generator losses
        g_adv_loss = sum([tf.reduce_mean((fake - 1)**2) for fake in d_fake_mri + d_fake_ct])
        g_cycle_loss = (tf.reduce_mean(tf.abs(real_ct - cycled_ct)) +
                        tf.reduce_mean(tf.abs(real_mri - cycled_mri)))
        g_total_loss = g_adv_loss + 10 * g_cycle_loss + 0.5 * (kl_fwd + kl_bwd)

        # Total discriminator loss
        d_total_loss = d_ct_loss + d_mri_loss

    # Update discriminators
    d_grads = tape.gradient(d_total_loss, d_vars)
    d_opt.apply_gradients(zip(d_grads, d_vars))

    # Update generators
    g_grads = tape.gradient(g_total_loss, g_vars)
    g_opt.apply_gradients(zip(g_grads, g_vars))

    # A persistent tape keeps its resources until it is deleted
    del tape

    return {
        'd_ct': d_ct_loss,
        'd_mri': d_mri_loss,
        'g_total': g_total_loss,
        'fake_mri': fake_mri,
        'fake_ct': fake_ct
    }

This code defines the main training loop for a CycleGAN-based model that translates between CT and MRI images. It consists of data preparation, training iteration, progress tracking, and model saving. Below is a step-by-step breakdown:


1. Create Progress Directory

The script creates a directory named progress/ inside Kaggle's working directory.

This directory will store progress images showing how well the model is learning over time.

2. Load and Balance the Datasets

Calls load_and_balance_datasets() to load CT and MRI images from the dataset folders.

Ensures both datasets have the same number of images by truncating the larger set.

3. Create TensorFlow Dataset for Training

Creates a TensorFlow dataset from the loaded images.

Shuffles the dataset so that the image pairs are visited in a different order each epoch.

Batches the dataset; with BATCH_SIZE = 1, each training step processes one CT image and one MRI image.


4. Training Loop

Starts iterating over epochs (EPOCHS defines the total number of passes over the dataset).

Iterates through mini-batches of CT and MRI scans using train_dataset.

5. Train the Model (Forward & Backward Pass)

Calls train_step(ct_batch, mri_batch), which:

 Generates fake MRI from CT (G_CT→MRI) and fake CT from MRI (G_MRI→CT).

 Passes both real and fake images through the discriminators (D_CT and D_MRI).

 Computes adversarial losses, cycle consistency loss, and KL divergence.

 Updates the discriminators (D_CT, D_MRI) and generators (G_CT→MRI, G_MRI→CT).

Stores the loss values (d_ct_loss, d_mri_loss, g_total_loss) and the generated images.

6. Print Losses for Monitoring

Every 10 batches, prints:

D_CT: Discriminator loss for CT.

D_MRI: Discriminator loss for MRI.

G: Total generator loss.

This helps monitor model performance during training.

7. Save Sample Images for Progress Tracking

Every 100 batches, saves progress images to progress/.

Displays real CT & MRI images alongside their fake counterparts generated by the model.

Helps visually track improvements in image quality over time.
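
The training loop calls save_models(g_ct_mri, g_mri_ct, epoch) after every epoch, but the notebook never defines that function. A minimal sketch of such a helper is given here as an assumption, not as the author's original: the out_dir parameter and the file naming are made up, with the names chosen to match the saved_models/ct_to_mri_epoch_... paths used in the translation example. The _FakeModel class is only a stand-in so the sketch can be dry-run without TensorFlow; in the notebook the arguments would be the two Keras generators, whose save(path) method does the actual writing.

```python
import os
import tempfile

def save_models(g_ct_mri, g_mri_ct, epoch, out_dir="saved_models"):
    """Save both generators for the given epoch and return the two paths."""
    os.makedirs(out_dir, exist_ok=True)
    ct_to_mri_path = os.path.join(out_dir, f"ct_to_mri_epoch_{epoch}")
    mri_to_ct_path = os.path.join(out_dir, f"mri_to_ct_epoch_{epoch}")
    g_ct_mri.save(ct_to_mri_path)  # for a Keras model: Model.save(path)
    g_mri_ct.save(mri_to_ct_path)
    return ct_to_mri_path, mri_to_ct_path

class _FakeModel:
    """Stand-in exposing save(path) like a Keras model (dry run only)."""
    def __init__(self):
        self.saved_to = None

    def save(self, path):
        self.saved_to = path

demo_dir = tempfile.mkdtemp()
fake_a, fake_b = _FakeModel(), _FakeModel()
paths = save_models(fake_a, fake_b, epoch=0, out_dir=demo_dir)
print(paths)
```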



In [14]:

# ===================== Main Training Loop =====================
import os

# Create progress directory if it doesn't exist
progress_dir = '/kaggle/working/progress'
os.makedirs(progress_dir, exist_ok=True)

# Load and prepare data
print("Loading datasets...")
ct_scans, mri_scans = load_and_balance_datasets('/kaggle/input/ct-to-mri-cgan/Dataset/images/trainA',
                                                '/kaggle/input/ct-to-mri-cgan/Dataset/images/trainB')

# Create TensorFlow dataset
train_dataset = tf.data.Dataset.from_tensor_slices((ct_scans, mri_scans))
train_dataset = train_dataset.shuffle(buffer_size=len(ct_scans)).batch(BATCH_SIZE)

# Training loop
print("Starting training...")
for epoch in range(EPOCHS):
    for batch_idx, (ct_batch, mri_batch) in enumerate(train_dataset):
        results = train_step(ct_batch, mri_batch)

        if batch_idx % 10 == 0:
            print(f"Epoch {epoch}, Batch {batch_idx}: "
                  f"D_CT={float(results['d_ct']):.4f}, "
                  f"D_MRI={float(results['d_mri']):.4f}, "
                  f"G={float(results['g_total']):.4f}")

        # Save sample images every 100 batches
        if batch_idx % 100 == 0:
            fig, axes = plt.subplots(2, 2, figsize=(10, 10))

            # Real CT and Fake MRI
            axes[0,0].imshow(ct_batch[0].numpy())
            axes[0,0].set_title("Real CT")
            axes[0,0].axis('off')

            axes[0,1].imshow(results['fake_mri'][0].numpy())
            axes[0,1].set_title("Fake MRI")
            axes[0,1].axis('off')

            # Real MRI and Fake CT
            axes[1,0].imshow(mri_batch[0].numpy())
            axes[1,0].set_title("Real MRI")
            axes[1,0].axis('off')

            axes[1,1].imshow(results['fake_ct'][0].numpy())
            axes[1,1].set_title("Fake CT")
            axes[1,1].axis('off')

            plt.tight_layout()
            # Write into progress_dir rather than a path relative to the current directory
            plt.savefig(os.path.join(progress_dir, f'epoch_{epoch}_batch_{batch_idx}.png'))
            plt.close()

    # Save models after each epoch (save_models must be defined before this cell runs)
    save_models(g_ct_mri, g_mri_ct, epoch)


In [None]:

# Helper assumed by translate_image. It was not defined in the notebook,
# so this version mirrors the preprocessing done in load_images.
def load_and_preprocess_image(image_path):
    """Read one image and return it as a (1, H, W, 3) float32 array in [0, 1]."""
    img = cv2.imread(str(image_path))
    if img is None:
        raise FileNotFoundError(f"Could not read image: {image_path}")
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, IMAGE_SHAPE[:2])
    img = img.astype(np.float32) / 255.0
    return np.expand_dims(img, axis=0)  # add the batch dimension


def translate_image(model_path, image_path, output_path, mode='ct_to_mri'):
    """
    Translate a single image using the trained model

    Parameters:
    model_path: Path to the saved model
    image_path: Path to the input image
    output_path: Path to save the translated image
    mode: 'ct_to_mri' or 'mri_to_ct' (informational only; the direction
          is determined by which generator is stored at model_path)
    """
    # Load model
    print(f"Loading model from {model_path}")
    model = tf.keras.models.load_model(model_path,
                                       custom_objects={'Sampling': Sampling})

    # Load and preprocess image
    input_image = load_and_preprocess_image(image_path)

    # Generate translation
    print("Generating translation...")
    translated_image, _, _ = model(input_image, training=False)

    # Convert to numpy and denormalize
    translated_image = translated_image.numpy()[0] * 255
    translated_image = translated_image.astype(np.uint8)

    # Save the result
    print(f"Saving translated image to {output_path}")
    plt.figure(figsize=(10, 5))

    plt.subplot(1, 2, 1)
    plt.title("Input Image")
    plt.imshow(input_image[0])
    plt.axis('off')

    plt.subplot(1, 2, 2)
    plt.title("Translated Image")
    plt.imshow(translated_image)
    plt.axis('off')

    plt.tight_layout()
    plt.savefig(output_path)
    plt.close()

    return translated_image
'''
# Example usage of the translation function
def example_translation():
    """Example of how to use the translation function"""
    # Paths
    ct_to_mri_model = 'saved_models/ct_to_mri_epoch_1000'
    mri_to_ct_model = 'saved_models/mri_to_ct_epoch_1000'

    # CT to MRI translation
    input_ct = 'path/to/your/ct_image.jpg'
    output_mri = 'results/translated_mri.png'
    translated_mri = translate_image(ct_to_mri_model, input_ct, output_mri,
                                     mode='ct_to_mri')

    # MRI to CT translation
    input_mri = 'path/to/your/mri_image.jpg'
    output_ct = 'results/translated_ct.png'
    translated_ct = translate_image(mri_to_ct_model, input_mri, output_ct,
                                    mode='mri_to_ct')'''

*Complete code in Single Block*

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, Model
import numpy as np
import cv2
import pathlib
import matplotlib.pyplot as plt
import tensorflow_probability as tfp

tfd = tfp.distributions

# ===================== Configuration =====================
IMAGE_SHAPE = (256, 256, 3)
LATENT_DIM = 256
FILTERS = 16
KERNEL = 3
LEARNING_RATE = 0.0001
WEIGHT_DECAY = 6e-8
BATCH_SIZE = 1
EPOCHS = 10

# ===================== Architecture Components =====================
class Sampling(layers.Layer):
    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.random.normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

def residual_block(inputs, filters, use_norm=True):
    x = layers.Conv2D(filters, KERNEL, padding='same')(inputs)
    x = layers.LeakyReLU(alpha=0.2)(x)
    if use_norm:
        x = layers.GroupNormalization(groups=1)(x)
    x = layers.Conv2D(filters, KERNEL, padding='same')(x)
    x = layers.LeakyReLU(alpha=0.2)(x)
    if use_norm:
        x = layers.GroupNormalization(groups=1)(x)
    shortcut = layers.Conv2D(filters, 1, padding='same')(inputs)
    return layers.maximum([x, shortcut])

def encoder_block(inputs, filters, use_norm=True):
    x = residual_block(inputs, filters, use_norm)
    skip = x
    x = layers.MaxPooling2D()(x)
    return x, skip

def decoder_block(inputs, skip, filters, use_norm=True):
    x = layers.Conv2DTranspose(filters, KERNEL, strides=2, padding='same')(inputs)
    x = layers.maximum([x, skip])
    x = residual_block(x, filters, use_norm)
    return x

# ===================== Generator =====================
def build_generator(name):
    inputs = layers.Input(IMAGE_SHAPE)

    # Encoder
    e1, s1 = encoder_block(inputs, FILTERS)
    e2, s2 = encoder_block(e1, FILTERS*2)
    e3, s3 = encoder_block(e2, FILTERS*4)
    e4, s4 = encoder_block(e3, FILTERS*8)
    e5, s5 = encoder_block(e4, FILTERS*16)
    e6, s6 = encoder_block(e5, FILTERS*32)
    e7, s7 = encoder_block(e6, FILTERS*64)

    # Latent Space
    x = layers.Flatten()(e7)
    z_mean = layers.Dense(LATENT_DIM, name=f"z_mean_{name.split('_')[-1]}")(x)
    z_log_var = layers.Dense(LATENT_DIM, name=f"z_log_var_{name.split('_')[-1]}")(x)
    z = Sampling()([z_mean, z_log_var])

    # Reshape for decoder
    x = layers.Dense(2 * 2 * FILTERS*64)(z)
    x = layers.Reshape((2, 2, FILTERS*64))(x)

    # Decoder
    d0 = decoder_block(x, s7, FILTERS*64)
    d1 = decoder_block(d0, s6, FILTERS*32)
    d2 = decoder_block(d1, s5, FILTERS*16)
    d3 = decoder_block(d2, s4, FILTERS*8)
    d4 = decoder_block(d3, s3, FILTERS*4)
    d5 = decoder_block(d4, s2, FILTERS*2)
    d6 = decoder_block(d5, s1, FILTERS)

    outputs = layers.Conv2D(3, KERNEL, activation='sigmoid', padding='same')(d6)
    return Model(inputs, [outputs, z_mean, z_log_var], name=name)

# ===================== Discriminator =====================
def build_discriminator(name):
    inputs = layers.Input(IMAGE_SHAPE)

    # Feature extraction
    x = inputs
    features = []

    # Initial convolution
    x = layers.Conv2D(FILTERS, KERNEL, padding='same')(x)
    x = layers.LeakyReLU(alpha=0.2)(x)
    features.append(x)

    # Downsampling blocks
    filter_sizes = [FILTERS*2, FILTERS*4, FILTERS*8, FILTERS*16, FILTERS*32, FILTERS*64]
    for filters in filter_sizes:
        x, _ = encoder_block(x, filters, use_norm=False)
        features.append(x)

    # Multi-scale outputs: one score map for each of the last four scales
    outputs = []
    for feature in features[-4:]:
        out = layers.Conv2D(1, KERNEL, padding='same')(feature)
        outputs.append(out)

    return Model(inputs, outputs, name=name)
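
# --- Sketch (not part of the original notebook): multi-scale LSGAN loss ---
# The discriminator returns a list of score maps, and train_step sums a
# least-squares loss over them (real -> 1, fake -> 0). The NumPy version below
# mirrors that formula on toy arrays; _lsgan_d_loss and the array shapes are
# hypothetical and only illustrate the arithmetic.
import numpy as np

def _lsgan_d_loss(real_scores, fake_scores):
    """Sum over scales of mean((real - 1)^2) + mean(fake^2)."""
    return sum(np.mean((r - 1.0) ** 2) + np.mean(f ** 2)
               for r, f in zip(real_scores, fake_scores))

_scale_shapes = [(1, 4, 4, 1), (1, 2, 2, 1)]
# A perfect discriminator (real = 1, fake = 0) incurs no loss.
_d_loss_perfect = _lsgan_d_loss([np.ones(s) for s in _scale_shapes],
                                [np.zeros(s) for s in _scale_shapes])
# An undecided one (all scores 0.5) pays 0.25 + 0.25 per scale.
_d_loss_unsure = _lsgan_d_loss([np.full(s, 0.5) for s in _scale_shapes],
                               [np.full(s, 0.5) for s in _scale_shapes])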

# ===================== Data Loading =====================
def load_images(path):
    images = []
    for p in pathlib.Path(path).glob('*.*'):
        try:
            img = cv2.imread(str(p))
            if img is not None:
                img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                # cv2.resize expects (width, height)
                img = cv2.resize(img, (IMAGE_SHAPE[1], IMAGE_SHAPE[0]))
                img = img.astype(np.float32) / 255.0
                images.append(img)
        except Exception as e:
            print(f"Error loading image {p}: {e}")
    return np.array(images)

def load_and_balance_datasets(ct_path, mri_path):
    print("Loading CT scans...")
    ct_scans = load_images(ct_path)
    print("Loading MRI scans...")
    mri_scans = load_images(mri_path)

    # Truncate the larger set so both modalities have the same number of images
    min_length = min(len(ct_scans), len(mri_scans))
    ct_scans = ct_scans[:min_length]
    mri_scans = mri_scans[:min_length]

    print(f"Balanced datasets to {min_length} images each")
    return ct_scans, mri_scans

# ===================== Training Setup =====================
# Build models
g_ct_mri = build_generator('CT_to_MRI')
g_mri_ct = build_generator('MRI_to_CT')
d_ct = build_discriminator('D_CT')
d_mri = build_discriminator('D_MRI')

# Create optimizers
g_opt = tf.keras.optimizers.RMSprop(learning_rate=LEARNING_RATE, weight_decay=WEIGHT_DECAY)
d_opt = tf.keras.optimizers.RMSprop(learning_rate=LEARNING_RATE, weight_decay=WEIGHT_DECAY)

# Collect the trainable variables each optimizer will update
g_vars = g_ct_mri.trainable_variables + g_mri_ct.trainable_variables
d_vars = d_ct.trainable_variables + d_mri.trainable_variables

# Build optimizers
g_opt.build(g_vars)
d_opt.build(d_vars)

# ===================== Training Function =====================
@tf.function
def train_step(real_ct, real_mri):
    # persistent=True because the tape is queried twice (D and G gradients)
    with tf.GradientTape(persistent=True) as tape:
        # Forward passes
        fake_mri, z_mean_fwd, z_log_var_fwd = g_ct_mri(real_ct, training=True)
        fake_ct, z_mean_bwd, z_log_var_bwd = g_mri_ct(real_mri, training=True)

        # Discriminator outputs (a list of score maps, one per scale)
        d_real_ct = d_ct(real_ct, training=True)
        d_fake_ct = d_ct(fake_ct, training=True)
        d_real_mri = d_mri(real_mri, training=True)
        d_fake_mri = d_mri(fake_mri, training=True)

        # Discriminator losses (least squares: real -> 1, fake -> 0)
        d_ct_loss = sum([tf.reduce_mean((real - 1)**2) + tf.reduce_mean(fake**2)
                         for real, fake in zip(d_real_ct, d_fake_ct)])
        d_mri_loss = sum([tf.reduce_mean((real - 1)**2) + tf.reduce_mean(fake**2)
                          for real, fake in zip(d_real_mri, d_fake_mri)])

        # Cycle consistency
        cycled_ct, _, _ = g_mri_ct(fake_mri, training=True)
        cycled_mri, _, _ = g_ct_mri(fake_ct, training=True)

        # KL divergence to a standard normal, averaged over batch and latent dims
        kl_fwd = -0.5 * tf.reduce_mean(1 + z_log_var_fwd - tf.square(z_mean_fwd) - tf.exp(z_log_var_fwd))
        kl_bwd = -0.5 * tf.reduce_mean(1 + z_log_var_bwd - tf.square(z_mean_bwd) - tf.exp(z_log_var_bwd))

        # Generator losses
        g_adv_loss = sum([tf.reduce_mean((fake - 1)**2) for fake in d_fake_mri + d_fake_ct])
        g_cycle_loss = (tf.reduce_mean(tf.abs(real_ct - cycled_ct)) +
                        tf.reduce_mean(tf.abs(real_mri - cycled_mri)))
        g_total_loss = g_adv_loss + 10 * g_cycle_loss + 0.5 * (kl_fwd + kl_bwd)

        # Total discriminator loss
        d_total_loss = d_ct_loss + d_mri_loss

    # Update discriminators
    d_grads = tape.gradient(d_total_loss, d_vars)
    d_opt.apply_gradients(zip(d_grads, d_vars))

    # Update generators
    g_grads = tape.gradient(g_total_loss, g_vars)
    g_opt.apply_gradients(zip(g_grads, g_vars))

    # Release the resources held by the persistent tape
    del tape

    return {
        'd_ct': d_ct_loss,
        'd_mri': d_mri_loss,
        'g_total': g_total_loss,
        'fake_mri': fake_mri,
        'fake_ct': fake_ct
    }
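
# --- Sketch (not part of the original notebook): KL term sanity check ---
# train_step uses -0.5 * mean(1 + log_var - mean^2 - exp(log_var)) as the KL
# penalty. The NumPy helper below (hypothetical name) evaluates the same
# expression on toy values: it vanishes when the encoder output equals the
# N(0, 1) prior and grows as the mean moves away from zero.
import numpy as np

def _kl_to_standard_normal(z_mean, z_log_var):
    """Element-wise KL(N(mu, sigma^2) || N(0, 1)), averaged over all entries."""
    return -0.5 * np.mean(1.0 + z_log_var - np.square(z_mean) - np.exp(z_log_var))

# mu = 0, log_var = 0 matches the prior exactly, so the penalty is zero.
_kl_at_prior = _kl_to_standard_normal(np.zeros((1, 4)), np.zeros((1, 4)))
# mu = 1, log_var = 0 gives 0.5 * mu^2 = 0.5 per latent dimension.
_kl_unit_shift = _kl_to_standard_normal(np.ones((1, 4)), np.zeros((1, 4)))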



import os

def save_models(g_ct_mri, g_mri_ct, epoch, model_dir='/kaggle/working/saved_models'):
    """Save both generators in HDF5 format after each epoch"""
    os.makedirs(model_dir, exist_ok=True)

    # Save as .h5 files
    ct_path = os.path.join(model_dir, f'ct_to_mri_epoch_{epoch}.h5')
    mri_path = os.path.join(model_dir, f'mri_to_ct_epoch_{epoch}.h5')

    g_ct_mri.save(ct_path)
    g_mri_ct.save(mri_path)
    print(f"Models saved: {ct_path} and {mri_path}")


# ===================== Main Training Loop =====================
# Create progress directory if it doesn't exist
progress_dir = '/kaggle/working/progress'
os.makedirs(progress_dir, exist_ok=True)

# Load and prepare data
print("Loading datasets...")
ct_scans, mri_scans = load_and_balance_datasets(
    '/kaggle/input/ct-to-mri-cgan/Dataset/images/trainA',
    '/kaggle/input/ct-to-mri-cgan/Dataset/images/trainB')

# Create TensorFlow dataset
train_dataset = tf.data.Dataset.from_tensor_slices((ct_scans, mri_scans))
train_dataset = train_dataset.shuffle(buffer_size=len(ct_scans)).batch(BATCH_SIZE)
# Training loop
print("Starting training...")
for epoch in range(EPOCHS):
    for batch_idx, (ct_batch, mri_batch) in enumerate(train_dataset):
        results = train_step(ct_batch, mri_batch)

        if batch_idx % 10 == 0:
            print(f"Epoch {epoch}, Batch {batch_idx}: "
                  f"D_CT={float(results['d_ct']):.4f}, "
                  f"D_MRI={float(results['d_mri']):.4f}, "
                  f"G={float(results['g_total']):.4f}")

        # Save sample images every 100 batches
        if batch_idx % 100 == 0:
            fig, axes = plt.subplots(2, 2, figsize=(10, 10))

            # Real CT and Fake MRI
            axes[0,0].imshow(ct_batch[0].numpy())
            axes[0,0].set_title("Real CT")
            axes[0,0].axis('off')

            axes[0,1].imshow(results['fake_mri'][0].numpy())
            axes[0,1].set_title("Fake MRI")
            axes[0,1].axis('off')

            # Real MRI and Fake CT
            axes[1,0].imshow(mri_batch[0].numpy())
            axes[1,0].set_title("Real MRI")
            axes[1,0].axis('off')

            axes[1,1].imshow(results['fake_ct'][0].numpy())
            axes[1,1].set_title("Fake CT")
            axes[1,1].axis('off')

            plt.tight_layout()
            # Write into progress_dir (created above), not a relative 'progress/' path
            plt.savefig(os.path.join(progress_dir, f'epoch_{epoch}_batch_{batch_idx}.png'))
            plt.close()

    # Save models after each epoch
    save_models(g_ct_mri, g_mri_ct, epoch)

def load_and_preprocess_image(image_path):
    """Load and preprocess a single image for inference"""
    # Read image
    img = cv2.imread(image_path)
    if img is None:
        raise ValueError(f"Could not load image from {image_path}")

    # Convert BGR to RGB
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    # Resize to the model's input size; cv2.resize expects (width, height)
    img = cv2.resize(img, (IMAGE_SHAPE[1], IMAGE_SHAPE[0]))

    # Normalize to [0, 1]
    img = img.astype(np.float32) / 255.0

    # Add batch dimension
    img = np.expand_dims(img, axis=0)

    return img

def translate_image(model_path, image_path, output_path, mode='ct_to_mri'):
    """
    Translate a single image using the trained model

    Parameters:
    model_path: Path to the saved model
    image_path: Path to the input image
    output_path: Path to save the side-by-side comparison figure
    mode: 'ct_to_mri' or 'mri_to_ct' (informational only; the direction
          is determined by which generator model_path points to)
    """
    # Load model (compile=False: the generator is only used for inference)
    print(f"Loading model from {model_path}")
    model = tf.keras.models.load_model(model_path,
                                       custom_objects={'Sampling': Sampling},
                                       compile=False)

    # Load and preprocess image
    input_image = load_and_preprocess_image(image_path)

    # Generate translation
    print("Generating translation...")
    translated_image, _, _ = model(input_image, training=False)

    # Convert to numpy and denormalize to uint8 in [0, 255]
    translated_image = np.clip(translated_image.numpy()[0] * 255, 0, 255)
    translated_image = translated_image.astype(np.uint8)

    # Save the result, creating the output directory if needed
    print(f"Saving translated image to {output_path}")
    os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)
    plt.figure(figsize=(10, 5))

    plt.subplot(1, 2, 1)
    plt.title("Input Image")
    plt.imshow(input_image[0])
    plt.axis('off')

    plt.subplot(1, 2, 2)
    plt.title("Translated Image")
    plt.imshow(translated_image)
    plt.axis('off')

    plt.tight_layout()
    plt.savefig(output_path)
    plt.close()

    return translated_image
'''
# Example usage of the translation function
def example_translation():
    """Example of how to use the translation function"""
    # Paths: save_models() writes <name>_epoch_<epoch>.h5 into its model_dir;
    # with EPOCHS = 10 the last epoch index is 9
    ct_to_mri_model = '/kaggle/working/saved_models/ct_to_mri_epoch_9.h5'
    mri_to_ct_model = '/kaggle/working/saved_models/mri_to_ct_epoch_9.h5'

    # CT to MRI translation
    input_ct = 'path/to/your/ct_image.jpg'
    output_mri = 'results/translated_mri.png'
    translated_mri = translate_image(ct_to_mri_model, input_ct, output_mri,
                                     mode='ct_to_mri')

    # MRI to CT translation
    input_mri = 'path/to/your/mri_image.jpg'
    output_ct = 'results/translated_ct.png'
    translated_ct = translate_image(mri_to_ct_model, input_mri, output_ct,
                                    mode='mri_to_ct')
'''