Object Detection
vision
sonebu
update email
df8cf63
###########################################################################
# Computer vision - Embedded person tracking demo software by HyperbeeAI. #
# Copyrights © 2023 Hyperbee.AI Inc. All rights reserved. [email protected] #
###########################################################################
import torch, torchvision, time, random
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import os
from datetime import datetime
from collections import Counter
import torchvision.ops as ops
from pycocotools.cocoeval import COCOeval
import json
from tqdm import tqdm
import qat_core
def ssd_postprocess_person_cls(pred):
"""
Take the models prediction outputs (pred) and make the post processing operations to
classification head outputs since the output is not directly class probabilities.
Assuming square input image so H=W.
Assuming binary classes (0/1).
Input:
- pred: predicted outputs of the model as list of length 4 as [output1_reg, output1_class, output2_reg, output2_class]
and shape of [(NR1, CR1, HR1, WR1), (NC1, CC1, HC1, WC1), (NR2, CR2, HR2, WR2), (NC2, CC2, HC2, WC2)].
Returns:
- person_cls: person class probabilities (torch.FloatTensor) shape of [CC1/2*HC1*WC1 + CC2/2*HC2*WC2].
"""
head_regression_hires = pred[0]
head_classification_hires = pred[1]
head_regression_lores = pred[2]
head_classification_lores = pred[3]
# split classification head outputs for person and background
head_classification_hires_background = head_classification_hires[0,1::2,:,:]
head_classification_hires_person = head_classification_hires[0,0::2,:,:]
head_classification_lores_background = head_classification_lores[0,1::2,:,:]
head_classification_lores_person = head_classification_lores[0,0::2,:,:]
## assuming square input image so rows=cols
## I'll just define these globally:
hires_rowscols = head_regression_hires.shape[3] # could have been classification head too, just getting dimension
lores_rowscols = head_regression_lores.shape[3] # could have been classification head too, just getting dimension
hires_numanchors = int(head_regression_hires.shape[1]/4) # 4 because xywh
lores_numanchors = int(head_regression_lores.shape[1]/4) # 4 because xywh
background_hires_flat = explicit_flatten(head_classification_hires_background, 'hires', hires_rowscols, hires_numanchors, lores_rowscols, lores_numanchors)
background_lores_flat = explicit_flatten(head_classification_lores_background, 'lores', hires_rowscols, hires_numanchors, lores_rowscols, lores_numanchors)
person_hires_flat = explicit_flatten(head_classification_hires_person, 'hires', hires_rowscols, hires_numanchors, lores_rowscols, lores_numanchors)
person_lores_flat = explicit_flatten(head_classification_lores_person, 'lores', hires_rowscols, hires_numanchors, lores_rowscols, lores_numanchors)
person_flat = torch.cat((person_hires_flat, person_lores_flat))
background_flat = torch.cat((background_hires_flat, background_lores_flat))
total_cat = torch.cat( ( torch.unsqueeze(person_flat,0) , torch.unsqueeze(background_flat,0) ) )
softmax_fcn = torch.nn.Softmax(dim=0)
softmax_result = softmax_fcn(total_cat)
person_hires_flat_sft = softmax_result[0,:][0:background_hires_flat.shape[0]]
person_lores_flat_sft = softmax_result[0,:][background_hires_flat.shape[0]:]
person_hires_classification_scores = explicit_unflatten(person_hires_flat_sft, 'hires', hires_rowscols, hires_numanchors, lores_rowscols, lores_numanchors)
person_lores_classification_scores = explicit_unflatten(person_lores_flat_sft, 'lores', hires_rowscols, hires_numanchors, lores_rowscols, lores_numanchors)
person_cls = torch.cat(( person_hires_flat_sft, person_lores_flat_sft ))
return person_cls
def ssd_postprocess_person_bboxes(pred, image_width, image_height, anchors_head1, anchors_head2):
"""
Take the models prediction output (pred) and make the post processing operations to
show bboxes.
Assuming square input image so H=W.
Assuming binary classes (0/1).
Input:
- pred: predicted outputs of the model as list of length 4[output1_reg, output1_class, output2_reg, output2_class]
shape of [(NR1, CR1, HR1, WR1), (NC1, CC1, HC1, WC1), (NR2, CR2, HR2, WR2), (NC2, CC2, HC2, WC2)].
- image_width: Integer.
- image_height: Integer.
- anchors_head1: list of length 4, contains image_width/image_height*anchor_ratios as tuples.
shape [(W*A1, H*B1), (W*A2, H*B2), (W*A3, H*B3), (W*A4, H*B4)] where A#num and B#num are
corresponding different aspect ratios.
- anchors_head2: list of length 4, contains image_width/image_height*anchor_ratios as tuples.
shape [(W*C1, H*D1), (W*C2, H*D2), (W*C3, H*D3), (W*C4, H*D4)] where C#num and D#num are
corresponding different aspect ratios.
Returns:
- absolute_boxes: absolute value of bounding boxes (torch.FloatTensor) shape of [CR1/4*HR1*WR1 + CR2/4*HR2*WR2, 4].
"""
head_regression_hires = pred[0]
head_classification_hires = pred[1]
head_regression_lores = pred[2]
head_classification_lores = pred[3]
## assuming square input image so rows=cols
## I'll just define these globally:
hires_rowscols = head_regression_hires.shape[3] # could have been classification head too, just getting dimension
lores_rowscols = head_regression_lores.shape[3] # could have been classification head too, just getting dimension
hires_numanchors = int(head_regression_hires.shape[1]/4) # 4 because xywh
lores_numanchors = int(head_regression_lores.shape[1]/4) # 4 because xywh
# Postprocess regression + classification together, i.e., apply NMS
delta_x_hires = head_regression_hires[0, 0::4, :, :] # skip 4 means skip y,w,h and land on x again
delta_y_hires = head_regression_hires[0, 1::4, :, :] # skip 4 means skip w,h,x and land on y again, etc...
delta_w_hires = head_regression_hires[0, 2::4, :, :]
delta_h_hires = head_regression_hires[0, 3::4, :, :]
delta_x_lores = head_regression_lores[0, 0::4, :, :] # skip 4 means skip y,w,h and land on x again
delta_y_lores = head_regression_lores[0, 1::4, :, :] # skip 4 means skip w,h,x and land on y again, etc...
delta_w_lores = head_regression_lores[0, 2::4, :, :]
delta_h_lores = head_regression_lores[0, 3::4, :, :]
## There is also a concept called priorbox variance, see:
## https://github.com/weiliu89/caffe/issues/155#issuecomment-243541464
## https://leimao.github.io/blog/Bounding-Box-Encoding-Decoding/
##
## values taken from the xml, see layer "PriorBoxClustered":
var_x = 0.1
var_y = 0.1
var_w = 0.2
var_h = 0.2
w_anchors_hires = torch.tensor(anchors_head1)[:,0]
h_anchors_hires = torch.tensor(anchors_head1)[:,1]
w_anchors_lores = torch.tensor(anchors_head2)[:,0]
h_anchors_lores = torch.tensor(anchors_head2)[:,1]
x_anchors_hires = populate_xy_anchors(delta_x_hires, 'x')
y_anchors_hires = populate_xy_anchors(delta_y_hires, 'y')
x_anchors_lores = populate_xy_anchors(delta_x_lores, 'x')
y_anchors_lores = populate_xy_anchors(delta_y_lores, 'y')
w_anchors_hires_rpt = populate_wh_anchors(delta_w_hires, w_anchors_hires)
h_anchors_hires_rpt = populate_wh_anchors(delta_h_hires, h_anchors_hires)
w_anchors_lores_rpt = populate_wh_anchors(delta_w_lores, w_anchors_lores)
h_anchors_lores_rpt = populate_wh_anchors(delta_h_lores, h_anchors_lores)
absolute_x_hires = delta_x_hires * w_anchors_hires_rpt * var_x + x_anchors_hires
absolute_y_hires = delta_y_hires * h_anchors_hires_rpt * var_y + y_anchors_hires
absolute_x_lores = delta_x_lores * w_anchors_lores_rpt * var_x + x_anchors_lores
absolute_y_lores = delta_y_lores * h_anchors_lores_rpt * var_y + y_anchors_lores
absolute_w_hires = (delta_w_hires * var_w).exp() * w_anchors_hires_rpt
absolute_h_hires = (delta_h_hires * var_h).exp() * h_anchors_hires_rpt
absolute_w_lores = (delta_w_lores * var_w).exp() * w_anchors_lores_rpt
absolute_h_lores = (delta_h_lores * var_h).exp() * h_anchors_lores_rpt
absolute_hires_xleft = absolute_x_hires - absolute_w_hires/2
absolute_hires_xright = absolute_x_hires + absolute_w_hires/2
absolute_hires_ytop = absolute_y_hires - absolute_h_hires/2
absolute_hires_ybottom = absolute_y_hires + absolute_h_hires/2
absolute_lores_xleft = absolute_x_lores - absolute_w_lores/2
absolute_lores_xright = absolute_x_lores + absolute_w_lores/2
absolute_lores_ytop = absolute_y_lores - absolute_h_lores/2
absolute_lores_ybottom = absolute_y_lores + absolute_h_lores/2
absolute_hires_xleft_flat = explicit_flatten(absolute_hires_xleft, 'hires', hires_rowscols, hires_numanchors, lores_rowscols, lores_numanchors)
absolute_hires_xright_flat = explicit_flatten(absolute_hires_xright, 'hires', hires_rowscols, hires_numanchors, lores_rowscols, lores_numanchors)
absolute_hires_ytop_flat = explicit_flatten(absolute_hires_ytop, 'hires', hires_rowscols, hires_numanchors, lores_rowscols, lores_numanchors)
absolute_hires_ybottom_flat = explicit_flatten(absolute_hires_ybottom, 'hires', hires_rowscols, hires_numanchors, lores_rowscols, lores_numanchors)
absolute_lores_xleft_flat = explicit_flatten(absolute_lores_xleft, 'lores', hires_rowscols, hires_numanchors, lores_rowscols, lores_numanchors)
absolute_lores_xright_flat = explicit_flatten(absolute_lores_xright, 'lores', hires_rowscols, hires_numanchors, lores_rowscols, lores_numanchors)
absolute_lores_ytop_flat = explicit_flatten(absolute_lores_ytop, 'lores', hires_rowscols, hires_numanchors, lores_rowscols, lores_numanchors)
absolute_lores_ybottom_flat = explicit_flatten(absolute_lores_ybottom, 'lores', hires_rowscols, hires_numanchors, lores_rowscols, lores_numanchors)
absolute_xleft = torch.unsqueeze(torch.cat((absolute_hires_xleft_flat, absolute_lores_xleft_flat)) ,1)
absolute_xright = torch.unsqueeze(torch.cat((absolute_hires_xright_flat, absolute_lores_xright_flat)) ,1)
absolute_ytop = torch.unsqueeze(torch.cat((absolute_hires_ytop_flat, absolute_lores_ytop_flat)) ,1)
absolute_ybottom = torch.unsqueeze(torch.cat((absolute_hires_ybottom_flat, absolute_lores_ybottom_flat)),1)
absolute_boxes = torch.cat((absolute_xleft, absolute_ytop, absolute_xright, absolute_ybottom), dim=1)
return absolute_boxes
# so that we know what goes where
def explicit_flatten(tensor, hires_or_lores, hires_rowscols, hires_numanchors, lores_rowscols, lores_numanchors):
flattened_tensor = torch.zeros_like(tensor.flatten())
if(hires_or_lores=='hires'):
rc = hires_rowscols
na = hires_numanchors
elif(hires_or_lores=='lores'):
rc = lores_rowscols
na = lores_numanchors
else:
print("somethings wrong")
return
for row in range(0, rc):
for col in range(0, rc):
for anc in range(0, na):
flattened_tensor[anc*rc*rc + row*rc + col] = tensor[anc,row,col];
return flattened_tensor
# so that we know what goes where
def explicit_unflatten(flattened_tensor, hires_or_lores, hires_rowscols, hires_numanchors, lores_rowscols, lores_numanchors):
if(hires_or_lores=='hires'):
tensor = torch.zeros((hires_numanchors, hires_rowscols, hires_rowscols))
rc = hires_rowscols
na = hires_numanchors
elif(hires_or_lores=='lores'):
tensor = torch.zeros((lores_numanchors, lores_rowscols, lores_rowscols))
rc = lores_rowscols
na = lores_numanchors
else:
print("somethings wrong")
return
for row in range(0, rc):
for col in range(0, rc):
for anc in range(0, na):
tensor[anc,row,col] = flattened_tensor[anc*rc*rc + row*rc + col];
return tensor
def plot_softmax_confidence_scores(person_hires_flat_sft, person_lores_flat_sft):
fig, ax = plt.subplots(figsize=(10,6))
ax.plot(person_hires_flat_sft.detach().cpu().numpy())
ax.plot(person_lores_flat_sft.detach().cpu().numpy())
ax.grid()
ax.legend(['hires confidences', 'lores confidences'])
plt.title('softmax-processed confidence scores for the two heads')
plt.show()
def populate_wh_anchors(delta_ref, wh_anchors_hilores):
wh_anchors_hilores_rpt = torch.ones_like(delta_ref)
for i in range(0, wh_anchors_hilores_rpt.shape[0]):
wh_anchors_hilores_rpt[i] = wh_anchors_hilores_rpt[i]*wh_anchors_hilores[i]
return wh_anchors_hilores_rpt
def populate_xy_anchors(delta_ref, x_or_y):
xy_anchors_hilores = torch.zeros_like(delta_ref)
scale = 512 / delta_ref.shape[2]
for i in range(0, xy_anchors_hilores.shape[0]): # count anchors
for j in range(0, xy_anchors_hilores.shape[1]): # count width
for k in range(0, xy_anchors_hilores.shape[2]): # count height
if(x_or_y == 'x'):
xy_anchors_hilores[i,j,k] = scale * k + (scale +1) / 2 # More precise conversion
if(x_or_y == 'y'):
xy_anchors_hilores[i,j,k] = scale * j + (scale +1) / 2
return xy_anchors_hilores
def plot_image_mnv2_2xSSDlite(image, pred_person_cls = None, pred_absolute_boxes = None, color = 'r'
,nmsIoUTreshold = 0.45, predConfPlotTreshold = 0.6,target=None, figsize=(16,16),
saveFig=False, imageID=None, folderName='UnconstFPT'):
""" Plots original image, ground truths, and predictions if available.
Does non-maximum-suppression and plots perdiction boxes, saves figure under "Training Outputs" folder in specified folderName
Args:
image : (Tensor) Shape[Channel,width, height]
pred_person_cls : (Tensor) person class confidences for predicted boxes Shape[numPred,1]
pred_absolute_boxes : (Tensor) predicted boxes [xmin,ymin,xmax,ymax] Shape[numPred,4]
color: Color of drawn predicted boxes
nmsIoUTreshold : non max suppression IoU treshold
predConfPlotTreshold : Confidence treshold to draw predicted boxes
target : (Tensor) Ground truth boxes [pictureID, xmin, ymin, w, h] Shape[numGt, 5]
folderName : Foldername under ./Model Outputs diectory to save figure.
Return: none
"""
# if image is normalized to [-1,1], re-map it to [0,1] for plotting purposes
if (image.min()<0):
image[0,:,:] = (image[0,:,:]/2)+0.5
image[1,:,:] = (image[1,:,:]/2)+0.5
image[2,:,:] = (image[2,:,:]/2)+0.5
image = image.permute(1, 2, 0).to("cpu")
fig, ax = plt.subplots(figsize=figsize);
if (saveFig):
plt.ioff()
else:
plt.ion()
ax.imshow(image,aspect='equal')
# Draw ground truth boxes if available
if (target != None):
absolute_box_label = target.clone()
if (absolute_box_label.shape[0] != 0):
absolute_box_label = absolute_box_label[:,1:]
absolute_box_label[:,2] = absolute_box_label[:,2] + absolute_box_label[:,0]
absolute_box_label[:,3] = absolute_box_label[:,3] + absolute_box_label[:,1]
for ii, box in enumerate(absolute_box_label):
upper_left_x = box[0]
upper_left_y = box[1]
ww = box[2] - box[0]
hh = box[3] - box[1]
rect = patches.Rectangle(
(upper_left_x, upper_left_y),
ww, hh,
linewidth=5,
edgecolor='g',
facecolor="none",
)
ax.add_patch(rect);
# Draw predicted absoulte boxes if available
if (pred_absolute_boxes != None):
confidences = pred_person_cls
boxes = pred_absolute_boxes
nms_picks = torchvision.ops.nms(boxes, confidences, nmsIoUTreshold)
boxes_to_draw = boxes[nms_picks].detach().cpu().numpy()
confs_to_draw = confidences[nms_picks].detach().cpu().numpy()
for ii, box in enumerate(boxes_to_draw):
if(confs_to_draw[ii] > predConfPlotTreshold):
upper_left_x = box[0];
upper_left_y = box[1];
ww = box[2] - box[0]
hh = box[3] - box[1]
conf = "{:.3f}".format(confs_to_draw[ii])
if not saveFig:
print(f'Conf{ii} : {confs_to_draw[ii]}')
plt.text(upper_left_x,upper_left_y-5, conf, fontsize = 12,color= color)
rect = patches.Rectangle(
(upper_left_x, upper_left_y),
ww, hh,
linewidth=2,
edgecolor=color,
facecolor="none",
)
ax.add_patch(rect);
if saveFig:
trainingOutpDir = os.path.join(".","Training Outputs")
saveDir = os.path.join(trainingOutpDir,folderName)
if not (os.path.isdir(trainingOutpDir)):
os.mkdir(trainingOutpDir)
if not (os.path.isdir(saveDir)):
os.mkdir(saveDir)
if (imageID == None):
imageID = 'NA'
else:
imageID = str(int(imageID))
imageName = folderName+"_ImgId_"+imageID+".png"
imageDir = os.path.join(saveDir, imageName)
plt.savefig(imageDir)
plt.close('all')
plt.cla()
else:
plt.show()
plt.close('all')
def generateAnchorsInOrigImage(anchors,headgridSize,originalPicSize=512):
'''
Prepares anchor tensors in original image.
E.g. If there are 4 anchors for the prediction head,
4 anchor positions in original image are calculated for (x=0, y=0),(x=1, y=0)... feature grid, and written
one under the other to anchorsInOrig
Args:
anchors : (tuple) Tuple of anchor boxes in Tensor w,h form Tuple(Shape[numAnchors,2])
headgridSize : Prediction head grid size, 16 or 32 for mobilenet
originalPicSize : original image size
Return:
anchorsInOrig : Tensor shape[#ofboxes*head width size*head height size,4], anchors are written in (cx, cy, w, h) form
'''
scale = originalPicSize/headgridSize
anchorsInOrig = torch.zeros([len(anchors)*headgridSize*headgridSize,4])
numOfAnchorBox = len(anchors)
for i in range(headgridSize):
for j in range(headgridSize):
for k in range(len(anchors)):
cx = j*scale + (scale+1)/2
cy = i*scale + (scale+1)/2
w, h = anchors[k]
tempAnch = torch.tensor([cx,cy,w,h])
anchorsInOrig[i*headgridSize*numOfAnchorBox + j*numOfAnchorBox + k,:]=tempAnch
# anchorsInOrig.requires_grad_(True) # does no effect result
return anchorsInOrig
def prepareHeadDataforLoss(HeadBB,HeadConf):
'''
Prepares prediction head tensors for loss calculation
E.g. If there are 4 BBs for the prediction head,
4 BB positions in delta form are written one under the other, for (x=0, y=0),(x=1, y=0)... of feature grid and returned
Args:
HeadBB : (tensor) Location head of the layer Shape[numofAncBoxesperCell * 4, head width, head height ]
Boxes -> [dcx, dcy, dw, dh ]
HeadConf : (tensor) Confidence head of the layer Shape[numofAncBoxesperCell * 2, head width, head height ]
Confidences -> (p(person), p(background))
Return:
BBs : (tensor) Predicted bounding boxes are written in delta form (dcx, dcy, dw, dh)
shape[numofAncBoxesperCell * head width * head height ,4] -> shape[4096,4] for 32x32 head
CFs : (tensor) Class confidences are written in (p(person), p(background))
shape[#ofPredperFeatureCell * head width * head height ,2] -> shape[4096,2] for 32x32 head
'''
width = HeadBB.shape[1]
height = HeadBB.shape[2]
numOfAnchorBox = int(HeadBB.shape[0]/4)
BBs = torch.zeros([width*height*numOfAnchorBox,4]).to(device)
CFs = torch.zeros([width*height*numOfAnchorBox,2]).to(device)
for i in range(width):
for j in range(height):
for k in range(numOfAnchorBox):
BBs[i*height*numOfAnchorBox + j*numOfAnchorBox + k,:] = HeadBB[k*4:k*4+4,i,j]
CFs[i*height*numOfAnchorBox + j*numOfAnchorBox + k,:] = HeadConf[k*2:k*2+2,i,j]
return BBs, CFs
def prepareHeadDataforLoss_fast(HeadBB,HeadConf):
'''
Same function with prepareHeadDataforLoss(), but blackbox faster implementation.
See details in prepareHeadDataforLoss()
'''
BBs = HeadBB.squeeze(0)
BBs = BBs.permute((1,2,0))
BBs = BBs.contiguous().view(-1,4)
CFs = HeadConf.squeeze(0)
CFs = CFs.permute((1,2,0))
CFs = CFs.contiguous().view(-1,2)
return BBs, CFs
# https://github.com/amdegroot/ssd.pytorch/blob/master/layers/box_utils.py
def point_form(boxes):
""" Convert box in form (cx, cy, w, h) to (xmin, ymin, xmax, ymax)
representation for comparison to point form ground truth data.
Args:
boxes: (tensor) boxes in (cx, cy, w, h) form
Return:
boxes: (tensor) Converted xmin, ymin, xmax, ymax form of boxes.
"""
return torch.cat((boxes[:, :2] - boxes[:, 2:]/2, # xmin, ymin
boxes[:, :2] + boxes[:, 2:]/2), 1) # xmax, ymax
# https://github.com/amdegroot/ssd.pytorch/blob/master/layers/box_utils.py
def intersect(box_a, box_b):
""" We resize both tensors to [A,B,2] without new malloc:
[A,2] -> [A,1,2] -> [A,B,2]
[B,2] -> [1,B,2] -> [A,B,2]
Then we compute the area of intersect between box_a and box_b.
Args:
box_a: (tensor) bounding boxes, Shape: [A,4]. xmin, ymin, xmax, ymax form
box_b: (tensor) bounding boxes, Shape: [B,4].
Return:
(tensor) intersection area, Shape: [A,B].
"""
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
A = box_a.size(0)
B = box_b.size(0)
box_a = box_a.to(device)
box_b = box_b.to(device)
max_xy = torch.min(box_a[:, 2:].unsqueeze(1).expand(A, B, 2),
box_b[:, 2:].unsqueeze(0).expand(A, B, 2))
min_xy = torch.max(box_a[:, :2].unsqueeze(1).expand(A, B, 2),
box_b[:, :2].unsqueeze(0).expand(A, B, 2))
inter = torch.clamp((max_xy - min_xy), min=0)
return inter[:, :, 0] * inter[:, :, 1]
def jaccard(box_a, box_b):
"""Compute the jaccard overlap of two sets of boxes. The jaccard overlap
is simply the intersection over union of two boxes. Here we operate on
ground truth boxes and default boxes.
E.g.:
A ∩ B / A ∪ B = A ∩ B / (area(A) + area(B) - A ∩ B)
Args:
box_a: (tensor) Ground truth bounding boxes, Shape: [num_objects,4]
box_b: (tensor) Prior boxes from priorbox layers, Shape: [num_priors,4]
Return:
jaccard overlap: (tensor) Shape: [box_a.size(0), box_b.size(0)]
"""
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
inter = intersect(box_a, box_b) # boxes are in the form of xmin, ymin, xmax, ymax
area_a = ((box_a[:, 2]-box_a[:, 0]) *
(box_a[:, 3]-box_a[:, 1])).unsqueeze(1).expand_as(inter) # [A,B]
area_b = ((box_b[:, 2]-box_b[:, 0]) *
(box_b[:, 3]-box_b[:, 1])).unsqueeze(0).expand_as(inter) # [A,B]
area_a = area_a.to(device)
area_b = area_b.to(device)
union = area_a + area_b - inter
return inter / union # [A,B]
def collate_fn(batch):
"""
Custom collate function.
Need to create own collate_fn Function for COCO.
Merges a list of samples to form a mini-batch of Tensor(s).
Used when using batched loading from a map-style dataset.
"""
return zip(*batch)
def sampleRandomPicsFromCOCO_old(train_loader, numtoPlot = 10, PictureSize = 512):
'''
This function is used to sample random pictures from COCO dataset
Args:
numtoPlot : number of random pictures to plot from dataset
Return:
SelectedPics : (tensor) size[numtoPlot, 3, PictureSize, PictureSize]
SelectedTargets: list[(tensor)] list of bounding boxes in COCO format for each picture
'''
import random
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
numofbatches = len(train_loader)
batchsize = train_loader.batch_size
randomBatches = random.sample(range(0, numofbatches), numtoPlot)
selectedTargets = []
selectedPics = torch.zeros((numtoPlot,3,PictureSize,PictureSize)).to(device)
dataloader_iterator = iter(train_loader)
i = 0
batchnum = 0
while batchnum < numofbatches:
# print(batchnum)
if batchnum in randomBatches:
data = next(dataloader_iterator)
picnum = random.randrange(0, batchsize, 1)
randomBatches.remove(batchnum)
imageBatch, targetBatch, picNum = data
image = imageBatch[picnum].unsqueeze(0).clone().to(device)
target = targetBatch[picnum].clone().to(device)
selectedPics[i,:,:,:] = image
selectedTargets.append(target)
i += 1
else:
next(dataloader_iterator)
batchnum += 1
if not randomBatches:
break
return selectedPics, selectedTargets
def sampleRandomPicsFromCOCO(dataset, numtoPick = 10, pickSame = False):
'''
This function is used to sample random pictures from a COCO type dataset
Args:
dataset: dataset to be sampled
numtoPick : number of random pictures to pick from dataset
pickSame: if it is set to true,
Return:
SelectedPics : (tensor) size[numtoPlot, 3, PictureSize, PictureSize]
SelectedTargets: list[(tensor)] list of bounding boxes in COCO format for each picture
'''
if pickSame:
random.seed(1234)
else:
pass
random_indices = random.sample(range(len(dataset)), numtoPick)
rand_sampler = torch.utils.data.SubsetRandomSampler(random_indices)
loader = torch.utils.data.DataLoader(dataset,
sampler=rand_sampler,
batch_size=1,
collate_fn=collate_fn,
drop_last=False)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
selectedTargets = []
selectedPics = torch.zeros((numtoPick, 3, 512, 512)).to(device)
picIds = []
for i, data in enumerate(loader):
imageBatch, targetBatch, picNum = data
image = imageBatch[0].unsqueeze(0).to(device)
target = targetBatch[0].to(device)
selectedPics[i,:,:,:] = image
selectedTargets.append(target)
picIds.append(picNum[0])
return selectedPics, selectedTargets, picIds
def saveOutputs(pictures, picIds, targets, preds, anchors_head1, anchors_head2,
savefolderName='UnconstFPT',
nmsIoUTreshold = 0.45, predConfPlotTreshold = 0.6, figsize=(8,8)):
'''
Saves pictures,ground truths and model predictions under specified folder
'''
predsPostProcess = PredsPostProcess(512, anchors_head1, anchors_head2)
image_width = pictures.shape[2]
image_height = pictures.shape[3]
BBs1 = preds[0].clone()
CFs1 = preds[1].clone()
BBs2 = preds[2].clone()
CFs2 = preds[3].clone()
for imgNum in tqdm(range(0,pictures.shape[0])):
img = pictures[imgNum,:,:,:].clone()
target = targets[imgNum].clone()
pred = (BBs1[imgNum,:,:,:].unsqueeze(0), CFs1[imgNum,:,:,:].unsqueeze(0),
BBs2[imgNum,:,:,:].unsqueeze(0), CFs2[imgNum,:,:,:].unsqueeze(0))
id = picIds[imgNum]
absolute_boxes,person_cls = predsPostProcess.getPredsInOriginal(pred)
plot_image_mnv2_2xSSDlite(img, pred_person_cls = person_cls, pred_absolute_boxes = absolute_boxes, color = 'r'
,nmsIoUTreshold = nmsIoUTreshold, predConfPlotTreshold = predConfPlotTreshold,
target=target, figsize=figsize,
saveFig=True, imageID= id, folderName = savefolderName)
class PredsPostProcess:
'''
Class to convert mobilenet SSD heads to real image coordinates in form [xmin, ymin, xmax, ymax]
'''
def __init__(self, image_width, anchors_head1, anchors_head2):
Head1AnchorsForLoss = generateAnchorsInOrigImage(anchors_head1,headgridSize=32,originalPicSize=image_width)
Head2AnchorsForLoss = generateAnchorsInOrigImage(anchors_head2,headgridSize=16,originalPicSize=image_width)
AnchorsFlatten_wh = torch.cat((Head1AnchorsForLoss,Head2AnchorsForLoss),0) # shape[32x32x4+16x16x5, 4]
# boxes in form[cx, cy, w, h]
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
AnchorsFlatten_wh = AnchorsFlatten_wh.to(device)
self.AnchorsFlatten_wh = AnchorsFlatten_wh
self.softmax_fcn = torch.nn.Softmax(dim=1).to(device)
self.var_x = 0.1
self.var_y = 0.1
self.var_w = 0.2
self.var_h = 0.2
def getPredsInOriginal(self,preds):
'''
Args:
preds: Prediction heads, i.e output of mobilenet model()
Return:
absolute_boxes: 32 * 32 *4 + 16 * 16 * 5 = 5376 pred BB's in form [imagenum, xmin, ymin, xmax, ymax]
(tensor) [5376, 5]
person cls: Person classification heads, (tensor) [5376,1]
'''
AnchorsFlatten_wh = self.AnchorsFlatten_wh
BBhires, CFhires = prepareHeadDataforLoss_fast(preds[0].data,preds[1].data)
BBlores, CFlores = prepareHeadDataforLoss_fast(preds[2].data,preds[3].data)
cls = torch.cat(( CFhires, CFlores))
cls = self.softmax_fcn(cls)
person_cls =cls[:,0]
delta_boxes_wh = torch.cat(( BBhires, BBlores))
pred_cx = delta_boxes_wh[:,0]*self.var_x*self.AnchorsFlatten_wh[:,2] + self.AnchorsFlatten_wh[:,0]
pred_cy = delta_boxes_wh[:,1]*self.var_y*self.AnchorsFlatten_wh[:,3] + self.AnchorsFlatten_wh[:,1]
pred_w = (delta_boxes_wh[:,2]*self.var_w).exp()*self.AnchorsFlatten_wh[:,2]
pred_h = (delta_boxes_wh[:,3]*self.var_h).exp()*self.AnchorsFlatten_wh[:,3]
absolute_xleft = pred_cx - pred_w/2
absolute_ytop = pred_cy - pred_h/2
absolute_xright = pred_cx + pred_w/2
absolute_ybottom = pred_cy + pred_h/2
absolute_boxes = torch.cat((absolute_xleft.view(-1,1), absolute_ytop.view(-1,1), absolute_xright.view(-1,1), absolute_ybottom.view(-1,1)), dim=1)
return absolute_boxes, person_cls
def mAP(cocoGT, cocoDT, imgIDS, catIDS=1, annType="bbox"):
"""
Explanation: This function calculate the mean average precision for given
ground truths and detection results. Default category and
annotation format is set to 'person' and 'bbox' respectively.
This function is based on popular benchmark function "pycocotools"
that is forked 3.3k. Please re-check the iou threshold (parameter iouThrs)
,which is default '.5:.05:.95', before you run the code.
Arguments:
cocoGT(Json File): Annotated orginal valset of COCO.
cocoDT(Json File): Model Results as format ===> [{"image_id":42, "category_id":18, "bbox":[258.15,41.29,348.26,243.78],"score":0.236},
{"image_id":73, "category_id":11, "bbox":[61,22.75,504,609.67], "score":0.318},
...]
imgIDS(list): list of image IDs.
catIDS(list): list of category ids. Default=1 as person.
annType(String): Annotation type, Default=bbox. Can be ['segm','bbox','keypoints'].
Returns:
None: just results as strings in terminal.
######################## More Detailed Guideline ########################
The usage for CocoEval is as follows: #
cocoGt=..., cocoDt=... # load dataset and results #
E = CocoEval(cocoGt,cocoDt); # initialize CocoEval object #
E.params.recThrs = ...; # set parameters as desired #
E.evaluate(); # run per image evaluation #
E.accumulate(); # accumulate per image results #
E.summarize(); # display summary metrics of results #
#########################################################################
The evaluation parameters are as follows (defaults in brackets): #
imgIds - [all] N img ids to use for evaluation #
catIds - [all] K cat ids to use for evaluation #
iouThrs - [.5:.05:.95] T=10 IoU thresholds for evaluation #
recThrs - [0:.01:1] R=101 recall thresholds for evaluation #
areaRng - [...] A=4 object area ranges for evaluation #
maxDets - [1 10 100] M=3 thresholds on max detections per image #
iouType - ['segm'] set iouType to 'segm', 'bbox' or 'keypoints' #
iouType replaced the now DEPRECATED useSegm parameter. #
useCats - [1] if true use category labels for evaluation #
Note: if useCats=0 category labels are ignored as in proposal scoring. #
Note: multiple areaRngs [Ax2] and maxDets [Mx1] can be specified. #
#########################################################################
evaluate(): evaluates detections on every image and every category and #
concats the results into the "evalImgs" with fields: #
dtIds - [1xD] id for each of the D detections (dt) #
gtIds - [1xG] id for each of the G ground truths (gt) #
dtMatches - [TxD] matching gt id at each IoU or 0 #
gtMatches - [TxG] matching dt id at each IoU or 0 #
dtScores - [1xD] confidence of each dt #
gtIgnore - [1xG] ignore flag for each gt #
dtIgnore - [TxD] ignore flag for each dt at each IoU #
#########################################################################
accumulate(): accumulates the per-image, per-category evaluation #
results in "evalImgs" into the dictionary "eval" with fields: #
params - parameters used for evaluation #
date - date evaluation was performed #
counts - [T,R,K,A,M] parameter dimensions (see above) #
precision - [TxRxKxAxM] precision for every evaluation setting #
recall - [TxKxAxM] max recall for every evaluation setting #
Note: precision and recall==-1 for settings with no gt objects. #
#########################################################################
***For more details of COCOeval please check: https://github.com/cocodataset/cocoapi/blob/master/PythonAPI/pycocotools/cocoeval.py
***If you need an orginal example from API please check: https://github.com/cocodataset/cocoapi/blob/master/PythonAPI/pycocoEvalDemo.ipynb
"""
cocoEval = COCOeval(cocoGT,cocoDT,annType)
cocoEval.params.imgIds = imgIDS
cocoEval.params.catIds = catIDS
cocoEval.evaluate()
cocoEval.accumulate()
cocoEval.summarize()
def round_floats(o):
'''
Used to round floats before writing to json form
'''
if isinstance(o, float):
return round(o, 3)
if isinstance(o, dict):
return {k: round_floats(v) for k, v in o.items()}
if isinstance(o, (list, tuple)):
return [round_floats(x) for x in o]
return o
def get_FPnum_per_image(bbox, GT_bbox, min_IoU = 0.5):
''' Founds the number of False Positives by assocating detection BB's to GT BBs
Arguments:
-------------
bbox : list
N x 4 list of detection bounding boxes in xmin, ymin, w, h form
GT_bbox : list
N x 4 list of ground truth bounding boxes in xmin, ymin, w, h form
min_IoU : float [0,1]
Treshold of intersection of union to evaluate detection and GT to be matched, if IoU of Det and GT is below
this value they are automatically marked as unmatched
'''
bbox = torch.tensor(bbox)
# Convert x,y,w,h -> xmin, ymin, xmax, ymax
bbox[:,2] = bbox[:,0] + bbox[:,2]
bbox[:,3] = bbox[:,1] + bbox[:,3]
GT_bbox[:,2] = GT_bbox[:,0] + GT_bbox[:,2]
GT_bbox[:,3] = GT_bbox[:,1] + GT_bbox[:,3]
IoUscore = jaccard(GT_bbox, bbox)
num_det = IoUscore.shape[1]
num_TP = 0
GT_indexes = [x for x in range(IoUscore.shape[0])]
# all detections
for det_idx in range(IoUscore.shape[1]):
max_IoU = min_IoU
max_IoU_gt_id = None
# all remained unmatched GTs
for i, gt_idx in enumerate(GT_indexes):
currentIoU = IoUscore[gt_idx, det_idx]
if currentIoU > max_IoU:
max_IoU = currentIoU
max_IoU_gt_id = i
if max_IoU_gt_id is not None:
del GT_indexes[max_IoU_gt_id] # Remove GT from unmatcheds list
num_TP += 1
if len(GT_indexes) == 0:
break
FP_count_image = num_det - num_TP
return FP_count_image
def calculatemAP(model, test_loader,cocoGT, ANCHORS_HEAD1, ANCHORS_HEAD2 , PredMinConfTreshold=0.7 ,
nmsIoUTreshold = 0.5, mAPOnlyFirstBatch= False, calculate_FP_ratio=False, hardware_mode = False):
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
t1 = time.time()
print('mAP calculation started...')
predsPostProcess = PredsPostProcess(512, ANCHORS_HEAD1, ANCHORS_HEAD2)
dataDictList =[]
imgIDS = []
model.eval()
total_GT_count = 0
total_FP_count = 0
with torch.no_grad():
for i, data in enumerate(tqdm(test_loader)):
imageBatch, targetBatch , idxBatch = data
imageStack = torch.stack(imageBatch).detach().to(device)
predBatch = model(imageStack)
# Outputs are in [-128, 127] in hw mode
if hardware_mode:
BBs1 = predBatch[0].detach() / 128.0
CFs1 = predBatch[1].detach() / 128.0
BBs2 = predBatch[2].detach() / 128.0
CFs2 = predBatch[3].detach() / 128.0
else:
BBs1 = predBatch[0].detach()
CFs1 = predBatch[1].detach()
BBs2 = predBatch[2].detach()
CFs2 = predBatch[3].detach()
for imgNum in range(imageStack.shape[0]):
img = imageStack[imgNum,:,:,:]
target = targetBatch[imgNum]
image_id = int(idxBatch[imgNum])
imgIDS.append(image_id)
pred = (BBs1[imgNum,:,:,:].unsqueeze(0), CFs1[imgNum,:,:,:].unsqueeze(0),
BBs2[imgNum,:,:,:].unsqueeze(0), CFs2[imgNum,:,:,:].unsqueeze(0))
absolute_boxes, person_cls = predsPostProcess.getPredsInOriginal(pred)
confidences = person_cls
boxes = absolute_boxes
nms_picks = torchvision.ops.nms(boxes, confidences, nmsIoUTreshold)
boxes_to_draw = boxes[nms_picks]
confs_to_draw = confidences[nms_picks]
# Predictions filtered by nms and conf tresholding, these will go to mAP
confMask = (confs_to_draw > PredMinConfTreshold)
# Accumulate total GT bounding box number to calculate total False Positive rate
if calculate_FP_ratio and (target.shape[0] != 0):
GT_bbox = target[:,1:]
total_GT_count += GT_bbox.shape[0]
# Inputs to mAP algorithm
if (confMask.any()):
# pred boxes -> [xmin,ymin,xmax,ymax], tensor shape[numpred,4]
bbox = boxes_to_draw[confMask]
# Convert BB to coco annot format -> [xmin,ymin,width, height]
bbox[:,2] = bbox[:,2] - bbox[:,0]
bbox[:,3] = bbox[:,3] - bbox[:,1]
bbox = bbox.tolist() # pred boxes -> [xmin,ymin,xmax,ymax], shape[numpred,4]
score = confs_to_draw[confMask].tolist()
category_id = np.ones_like(score,dtype=int).tolist()
for j in range(len(bbox)):
box = {"image_id":image_id, "category_id":category_id[j], "bbox":bbox[j],"score":score[j]}
dataDictList.append(round_floats(box))
# If detection exists and false positive ratio calculation is enabled
if calculate_FP_ratio:
# Note that scores are already in descending order thanks to nms operation
# No ground truth, all detections are FP
if GT_bbox.shape[0] == 0:
total_FP_count += len(score)
# Find false positives
else:
FP_count_image = get_FPnum_per_image(bbox, GT_bbox, min_IoU=0.5)
total_FP_count += FP_count_image
if mAPOnlyFirstBatch:
break
if (len(dataDictList)):
# Evavluate and Accumulate mAP for remained baches, if any
cocoDT = json.dumps(dataDictList)
# Write detections to .json file
with open('cocoDT.json', 'w') as outfile:
outfile.write(cocoDT)
# Load detections
cocoDT=cocoGT.loadRes('cocoDT.json')
# running evaluation
annType = 'bbox'
cocoEval = COCOeval(cocoGT,cocoDT,annType)
cocoEval.params.catIds = 1
cocoEval.params.imgIds = imgIDS
cocoEval.evaluate()
cocoEval.accumulate()
cocoEval.summarize()
# Print False Positive Statistics
if calculate_FP_ratio:
print()
print('********** False Positive Statistics **********')
print(f'Total GT Boxes: {total_GT_count}, Total FPs Boxes: {total_FP_count}, FP% : {total_FP_count/total_GT_count*100}')
print()
mean_ap = cocoEval.stats[0].item()
mean_recall = cocoEval.stats[8].item()
# Delete detection json file created
os.remove("cocoDT.json")
else:
mean_ap = 0
mean_recall = 0
t2 = time.time()
print(f'mAP done in : {t2-t1} secs')
return mean_ap, mean_recall
def batchNormAdaptation(model, train_loader,numSamples = 100):
'''
BN parameters of intel model is spoiled intentionally/or unintentionaly before publishing.
Batch norm adaptation routine is proposed before any training based on this model.
https://github.com/openvinotoolkit/nncf/blob/develop/docs/compression_algorithms/Quantization.md#batch-norm-statistics-adaptation
#numSamples predictions are made and running mean variance are recalculated for the layers.
'''
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print('')
print('Batch norm adaptation before training started.')
for i, data in enumerate(train_loader):
imageBatch, targetBatch, imgIDs = data
imageStack = torch.stack(imageBatch)
imageStack = imageStack.detach()
imageStack.requires_grad_(False)
imageStack = imageStack.to(device)
predBatch = model(imageStack)
if (i*len(imgIDs) >= numSamples):
return model
# Some functions to be used in training phase
def conv_model_fptunc2fpt(model):
layer_str_arr = [attr_name for attr_name in dir(model) if
isinstance(getattr(model, attr_name), qat_core.layers.shallow_base_layer)]
# Convert layers
for layer in layer_str_arr:
layer_attribute = getattr(model, layer)
layer_attribute.mode_fptunconstrained2fpt('fpt')
setattr(model, layer, layer_attribute)
# Convert add_residual modules.
add_res_attribute = getattr(model, 'add_residual')
add_res_attribute.mode_fptunconstrained2fpt('fpt')
setattr(model, 'add_residual', add_res_attribute)
return model
def conv_model_fpt2qat(model, weight_dictionary, shift_quantile=0.985):
print('Folding BNs and converting to qat mode')
layer_attributes = []
for layer_string in dir(model):
if(layer_string in weight_dictionary):
layer_attribute = getattr(model, layer_string)
if layer_attribute.mode == 'fpt':
print('Folding BN for:', layer_string)
weight_bits=weight_dictionary[layer_string]
print(f'Layer bit is : {weight_bits}')
# For binary weights convert layer in to qat_ap mode
if weight_bits == 1:
print('layer is converted in to qat_ap mode')
layer_attribute.configure_layer_base(weight_bits=2 , bias_bits=8, shift_quantile=shift_quantile)
layer_attribute.mode_fpt2qat('qat_ap');
# convert other layers in to qat mode
else:
print('layer is converted in to qat mode')
layer_attribute.configure_layer_base(weight_bits=weight_bits , bias_bits=8, shift_quantile=shift_quantile)
layer_attribute.mode_fpt2qat('qat');
setattr(model, layer_string, layer_attribute)
print('')
else:
print('To convert model to QAT mode, all layers must be in fpt mode but, ' + layer_string + 'is in' + layer_attribute.mode +' mode. Exiting...')
sys.exit()
add_res_attribute = getattr(model, 'add_residual')
if add_res_attribute.mode == 'fpt':
add_res_attribute.mode_fpt2qat('qat')
setattr(model, 'add_residual', add_res_attribute)
else:
print('To convert model to QAT mode, add_residual modüle must be in fpt mode but, it is in ' + add_res_attribute.mode + ' mode. Exiting...')
sys.exit()
print('********* Converting to qat mode finished *********')
print('')
return model
def conv_model_qat2hw(model):
print('Converting model to eval/hw mode for testing')
layer_str_arr = [attr_name for attr_name in dir(model) if
isinstance(getattr(model, attr_name), qat_core.layers.shallow_base_layer)]
for layer in layer_str_arr:
layer_attribute = getattr(model, layer)
if layer_attribute.mode == 'qat':
layer_attribute.mode_qat2hw('eval')
setattr(model, layer, layer_attribute)
# print(f'{layer} was in qat converted to eval mode')
elif layer_attribute.mode == 'qat_ap':
layer_attribute.mode_qat_ap2hw('eval')
setattr(model, layer, layer_attribute)
# print(f'{layer} was in qat_ap converted to eval mode')
else:
print('To convert model to hw mode, all layers must be in qat or qat_ap mode but, ' + layer_string + 'is in' + layer_attribute.mode +' mode. Exiting...')
sys.exit()
# print('')
model = model.to(model.conv1.op.weight.device.type)
# Convert add residual operation in to eval mode
add_res_attribute = getattr(model, 'add_residual')
if add_res_attribute.mode == 'qat':
add_res_attribute.mode_qat2hw('eval')
setattr(model, 'add_residual', add_res_attribute)
else:
print('To convert model to QAT mode, add_residual modüle must be in qat mode but, it is in ' + add_res_attribute.mode + ' mode. Exiting...')
sys.exit()
print('********* Converting model to eval/hw mode for testing finished *********')
print('')
return model