Caffe DetectionOutput Layer Python Version #27

Posted 2 years ago · mins reading
python
import cv2
import caffe
import numpy as np
import math
import functools
import time
class NormalizedBBox:
    """Axis-aligned box given by its (xmin, ymin) and (xmax, ymax) corners.

    The constructor stores the four coordinates exactly as passed (no
    validation, no clipping) and starts ``size`` at 0; it does not compute
    the area itself.
    """

    def __init__(self, xmin, ymin, xmax, ymax):
        self.xmin, self.ymin = xmin, ymin
        self.xmax, self.ymax = xmax, ymax
        # Placeholder until an area is assigned explicitly.
        self.size = 0
def BBoxSize(
        bbox,  # type: NormalizedBBox
        normalized=True):
    """Return the area of *bbox*.

    A box whose max corner lies before its min corner on either axis has
    area 0.  With ``normalized`` true the area is ``width * height``;
    otherwise each extent is widened by 1 before multiplying
    (``(width + 1) * (height + 1)``).
    """
    inverted = bbox.xmax < bbox.xmin or bbox.ymax < bbox.ymin
    if inverted:
        return 0
    span_x = bbox.xmax - bbox.xmin
    span_y = bbox.ymax - bbox.ymin
    if not normalized:
        return (span_x + 1) * (span_y + 1)
    return span_x * span_y
def ClipBBox(
        bbox  # type: NormalizedBBox
):
    """Clamp the four corner coordinates of *bbox* into [0, 1], in place.

    After clamping, ``bbox.size`` is recomputed with ``BBoxSize``.
    Nothing is returned.
    """
    for corner in ("xmin", "ymin", "xmax", "ymax"):
        clamped = max(min(getattr(bbox, corner), 1.), 0.)
        setattr(bbox, corner, clamped)
    bbox.size = BBoxSize(bbox)
    # TODO bbox.difficult = difficult
def DecodeBBox(prior_bbox, prior_variance, code_type, variance_encoded_in_target, clip_bbox, bbox, decode_bbox):
    """Decode one predicted offset box against its prior, writing into *decode_bbox*.

    Args:
        prior_bbox: prior box; its ``xmin/ymin/xmax/ymax`` are read.
        prior_variance: indexable with four entries, one scale factor per
            encoded component.  Only read when
            ``variance_encoded_in_target`` is false.
        code_type: encoding of *bbox*:
            0 (corner)      - the four values are offsets added to the
                              prior corners;
            1 (center/size) - ``xmin/ymin`` carry the centre offset in
                              units of the prior width/height and
                              ``xmax/ymax`` carry the log of the
                              width/height ratio;
            2 (corner/size) - corner offsets in units of the prior
                              width/height.
        variance_encoded_in_target: when true the offsets are used as-is,
            otherwise each one is multiplied by its variance first.
        clip_bbox: when true the decoded box is clamped via ``ClipBBox``.
        bbox: the encoded prediction (read only).
        decode_bbox: output box; its corners and ``size`` are overwritten.

    Raises:
        ValueError: if ``code_type`` is not 0, 1 or 2.  An unknown code
            type used to be reported with ``print`` only, after which the
            size of the still-undecoded output box was computed.
        AssertionError: for code types 1 and 2, if the prior has
            non-positive width or height.
    """
    if code_type == 0:
        if variance_encoded_in_target:
            decode_bbox.xmin = prior_bbox.xmin + bbox.xmin
            decode_bbox.ymin = prior_bbox.ymin + bbox.ymin
            decode_bbox.xmax = prior_bbox.xmax + bbox.xmax
            decode_bbox.ymax = prior_bbox.ymax + bbox.ymax
        else:
            decode_bbox.xmin = prior_bbox.xmin + float(prior_variance[0]) * bbox.xmin
            decode_bbox.ymin = prior_bbox.ymin + float(prior_variance[1]) * bbox.ymin
            decode_bbox.xmax = prior_bbox.xmax + float(prior_variance[2]) * bbox.xmax
            decode_bbox.ymax = prior_bbox.ymax + float(prior_variance[3]) * bbox.ymax
    elif code_type == 1:
        prior_width = prior_bbox.xmax - prior_bbox.xmin
        assert prior_width > 0
        prior_height = prior_bbox.ymax - prior_bbox.ymin
        assert prior_height > 0
        prior_center_x = (prior_bbox.xmin + prior_bbox.xmax) / 2.
        prior_center_y = (prior_bbox.ymin + prior_bbox.ymax) / 2.
        if variance_encoded_in_target:
            decode_bbox_center_x = bbox.xmin * prior_width + prior_center_x
            decode_bbox_center_y = bbox.ymin * prior_height + prior_center_y
            decode_bbox_width = math.exp(bbox.xmax) * prior_width
            decode_bbox_height = math.exp(bbox.ymax) * prior_height
        else:
            decode_bbox_center_x = float(prior_variance[0]) * bbox.xmin * prior_width + prior_center_x
            decode_bbox_center_y = float(prior_variance[1]) * bbox.ymin * prior_height + prior_center_y
            decode_bbox_width = math.exp(float(prior_variance[2]) * bbox.xmax) * prior_width
            decode_bbox_height = math.exp(float(prior_variance[3]) * bbox.ymax) * prior_height
        # Convert centre/size back to corner form.
        decode_bbox.xmin = decode_bbox_center_x - decode_bbox_width / 2.
        decode_bbox.ymin = decode_bbox_center_y - decode_bbox_height / 2.
        decode_bbox.xmax = decode_bbox_center_x + decode_bbox_width / 2.
        decode_bbox.ymax = decode_bbox_center_y + decode_bbox_height / 2.
    elif code_type == 2:
        prior_width = prior_bbox.xmax - prior_bbox.xmin
        assert prior_width > 0
        prior_height = prior_bbox.ymax - prior_bbox.ymin
        assert prior_height > 0
        if variance_encoded_in_target:
            decode_bbox.xmin = prior_bbox.xmin + bbox.xmin * prior_width
            decode_bbox.ymin = prior_bbox.ymin + bbox.ymin * prior_height
            decode_bbox.xmax = prior_bbox.xmax + bbox.xmax * prior_width
            decode_bbox.ymax = prior_bbox.ymax + bbox.ymax * prior_height
        else:
            decode_bbox.xmin = prior_bbox.xmin + float(prior_variance[0]) * bbox.xmin * prior_width
            decode_bbox.ymin = prior_bbox.ymin + float(prior_variance[1]) * bbox.ymin * prior_height
            decode_bbox.xmax = prior_bbox.xmax + float(prior_variance[2]) * bbox.xmax * prior_width
            decode_bbox.ymax = prior_bbox.ymax + float(prior_variance[3]) * bbox.ymax * prior_height
    else:
        # Fail loudly: continuing would report the untouched output box
        # as if it were a real detection.
        raise ValueError("Unknown LocLossType.")
    decode_bbox.size = BBoxSize(decode_bbox)
    if clip_bbox:
        ClipBBox(decode_bbox)
def first_compare(a, b):
    """Three-way comparison of two sequences by their first element only.

    Returns 1 when ``a[0]`` is greater, -1 when it is smaller, and 0
    otherwise; the remaining elements are never looked at.
    """
    lhs, rhs = a[0], b[0]
    if lhs > rhs:
        return 1
    if lhs < rhs:
        return -1
    return 0
def GetMaxScoreIndex(scores, threshold, top_k):
    """Return ``(score, index)`` pairs for scores above *threshold*, best first.

    Args:
        scores: sequence of comparable scores.
        threshold: only scores strictly greater than this are kept.
        top_k: when ``0 <= top_k < number of kept pairs`` the result is
            truncated to the first ``top_k`` pairs; any negative value
            disables truncation.

    Returns:
        A new list of ``(score, index)`` tuples sorted by descending
        score.  Equal scores keep their original index order, because
        ``list.sort`` is stable even with ``reverse=True``.
    """
    score_index_vec = [
        (score, n) for n, score in enumerate(scores) if score > threshold
    ]
    # A key function gives the same ordering as a cmp-style comparator on
    # element 0, but is evaluated once per element instead of once per
    # comparison.
    score_index_vec.sort(key=lambda pair: pair[0], reverse=True)
    if -1 < top_k < len(score_index_vec):
        score_index_vec = score_index_vec[:top_k]
    return score_index_vec
def IntersectBBox(bbox1, bbox2, intersect_bbox):
    """Write the overlap rectangle of *bbox1* and *bbox2* into *intersect_bbox*.

    When the boxes do not overlap on either axis, all four coordinates of
    *intersect_bbox* are set to 0.  Boxes that merely touch along an edge
    are treated as overlapping and yield a zero-width or zero-height
    rectangle.  Nothing is returned.
    """
    disjoint = (bbox2.xmin > bbox1.xmax or bbox2.xmax < bbox1.xmin
                or bbox2.ymin > bbox1.ymax or bbox2.ymax < bbox1.ymin)
    if disjoint:
        intersect_bbox.xmin = intersect_bbox.ymin = 0
        intersect_bbox.xmax = intersect_bbox.ymax = 0
        return
    # Overlap spans from the larger min corner to the smaller max corner.
    intersect_bbox.xmin = max(bbox1.xmin, bbox2.xmin)
    intersect_bbox.ymin = max(bbox1.ymin, bbox2.ymin)
    intersect_bbox.xmax = min(bbox1.xmax, bbox2.xmax)
    intersect_bbox.ymax = min(bbox1.ymax, bbox2.ymax)
def JaccardOverlap(bbox1, bbox2, normalized=True):
    """Return the intersection-over-union of two boxes.

    Args:
        bbox1, bbox2: boxes exposing ``xmin/ymin/xmax/ymax``.
        normalized: true for coordinates where extent is ``max - min``;
            false for the convention where extent is ``max - min + 1``
            (the same switch ``BBoxSize`` uses).

    Returns:
        The overlap ratio, or 0.0 when the boxes do not overlap with a
        positive area.

    Two problems affected the ``normalized=False`` path and are fixed
    here (results with the default ``normalized=True`` are unchanged):
      * the box areas were always computed with the normalized formula,
        while the intersection used the "+1" formula, so the ratio could
        exceed 1;
      * disjoint boxes were turned into the all-zero rectangle and then
        given a 1x1 intersection by the "+1".
    """
    # Reject disjoint boxes up front; IntersectBBox signals them with an
    # all-zero rectangle, which the "+1" convention cannot tell apart from
    # a genuine one-pixel overlap at the origin.
    if (bbox2.xmin > bbox1.xmax or bbox2.xmax < bbox1.xmin
            or bbox2.ymin > bbox1.ymax or bbox2.ymax < bbox1.ymin):
        return 0.0

    intersect_bbox = NormalizedBBox(0., 0., 0., 0.)
    IntersectBBox(bbox1, bbox2, intersect_bbox)
    intersect_width = intersect_bbox.xmax - intersect_bbox.xmin
    intersect_height = intersect_bbox.ymax - intersect_bbox.ymin
    if not normalized:
        intersect_width += 1.
        intersect_height += 1.

    if not (intersect_width > 0 and intersect_height > 0):
        return 0.0

    intersect_size = intersect_width * intersect_height
    # Use the same area convention as the intersection.
    bbox1_size = BBoxSize(bbox1, normalized)
    bbox2_size = BBoxSize(bbox2, normalized)
    return intersect_size / (bbox1_size + bbox2_size - intersect_size)
def ApplyNMSFast(
        bboxes,
        scores,
        score_threshold,
        nms_threshold,
        eta,
        _top_k,
        indices):
    """Greedy non-maximum suppression; kept indices are appended to *indices*.

    Candidates are taken from ``GetMaxScoreIndex(scores, score_threshold,
    _top_k)`` in descending score order.  A candidate is kept when its
    ``JaccardOverlap`` with every index already in *indices* is at most
    the current threshold.

    Args:
        bboxes: boxes, indexable by the same indices as *scores*.
        scores: one score per box.
        score_threshold: candidates must score strictly above this.
        nms_threshold: initial maximum allowed overlap.
        eta: adaptive-NMS decay factor.  When ``eta < 1`` and the current
            threshold is above 0.5, the threshold is multiplied by ``eta``
            after each kept box.  (It used to be overwritten with ``eta``
            itself, which is not a decay.)
        _top_k: candidate cap forwarded to ``GetMaxScoreIndex``.
        indices: output list, mutated in place.  Entries already present
            take part in the overlap test.

    Raises:
        AssertionError: if *bboxes* and *scores* differ in length.
    """
    assert len(bboxes) == len(scores)
    score_index_vec = GetMaxScoreIndex(scores, score_threshold, _top_k)
    adaptive_threshold = nms_threshold
    for _, idx in score_index_vec:
        # all() stops at the first kept box that overlaps too much.
        keep = all(
            JaccardOverlap(bboxes[idx], bboxes[kept_idx]) <= adaptive_threshold
            for kept_idx in indices
        )
        if not keep:
            continue
        indices.append(idx)
        if eta < 1 and adaptive_threshold > 0.5:
            adaptive_threshold *= eta
def DecodeBBoxes(prior_bboxes, prior_variances, code_type, variance_encoded_in_target, clip_bbox, bboxes,
                 decode_bboxes):
    """Decode every prediction in *bboxes* against the prior at the same index.

    For each index a fresh ``NormalizedBBox`` is filled by ``DecodeBBox``
    and appended to *decode_bboxes* (the output list, mutated in place).
    *bboxes* is accessed by integer index, so any container supporting
    ``len()`` and ``[i]`` for ``i`` in ``range(len(...))`` works.

    Raises:
        AssertionError: if the three inputs differ in length, or the
            first variance entry does not have exactly four values.
    """
    count = len(prior_bboxes)
    assert count == len(prior_variances)
    assert count == len(bboxes)
    if count:
        assert len(prior_variances[0]) == 4
    for i in range(count):
        decoded = NormalizedBBox(.0, .0, .0, .0)
        DecodeBBox(prior_bboxes[i], prior_variances[i], code_type,
                   variance_encoded_in_target, clip_bbox, bboxes[i], decoded)
        decode_bboxes.append(decoded)
def detection_out(mreshape_loc, mbox_conf_flatten, reshape_priorbox,
                  background_label_id=0, confidence_threshold=0.3,
                  keep_top_k=200, nms_threshold=0.45, top_k=400, eta=1.0,
                  num_classes=81, share_location=True, code_type=1,
                  variance_encoded_in_target=False, clip_bbox=False):
    """Run the detection-output stage (decode boxes, per-class NMS, top-k) in Python.

    Args:
        mreshape_loc: location predictions.  ``shape[0]`` is taken as the
            number of images; the flattened data is read as 4 values per
            prior per location class, image-major.
        mbox_conf_flatten: class confidences.  The flattened data is read
            as ``num_classes`` values per prior, image-major.
        reshape_priorbox: prior boxes.  ``shape[2] // 4`` is taken as the
            number of priors; the flattened data is read as all prior
            corners (4 per prior) followed by all variances (4 per prior).
            NOTE(review): presumably shaped (1, 2, num_priors * 4) -
            confirm against the network definition.
        background_label_id: class index skipped during NMS.
        confidence_threshold: minimum score (exclusive) for a candidate;
            ``None`` disables the threshold.
        keep_top_k: maximum detections kept per image across all classes;
            a negative value disables the cap.
        nms_threshold: maximum overlap allowed between kept boxes.
        top_k: maximum candidates per class fed to NMS; a falsy value
            (``None`` or 0) disables the cap.
        eta: adaptive-NMS factor forwarded to ``ApplyNMSFast``.
        num_classes: number of confidence values per prior.
        share_location: true when one box per prior is shared by every
            class (stored under label -1).
        code_type: box encoding, 0: CORNER, 1: CENTER_SIZE, 2: CORNER_SIZE.
        variance_encoded_in_target: forwarded to ``DecodeBBoxes``.
        clip_bbox: forwarded to ``DecodeBBoxes``.

    Returns:
        ``(top_data, keep_count)``.  ``top_data`` is a float32 array of
        shape ``(1, 1, N, 7)`` whose rows are
        ``[image_id, label, score, xmin, ymin, xmax, ymax]``;
        ``keep_count`` is the number of rows actually written.  When
        nothing is detected, ``top_data`` has one row per image filled
        with -1 except for the image id, and ``keep_count`` is 0.

    Raises:
        KeyError: if location predictions for a required label are missing.

    Behaviour fixed relative to the earlier version of this function:
      * the per-image cap was applied when there were FEWER detections
        than ``keep_top_k`` (and truncated to ``top_k``), which padded
        the output with all-zero rows; it is now applied only when there
        are more, and truncates to ``keep_top_k``;
      * the "no detections" branch discarded its reshape and indexed out
        of range;
      * a missing confidence threshold meant "reject everything";
      * a leftover debug print of the kept indices was removed.
    """
    if confidence_threshold is None:
        confidence_threshold = float("-inf")
    if not top_k:
        top_k = -1

    num = mreshape_loc.shape[0]
    num_priors = reshape_priorbox.shape[2] // 4
    num_loc_classes = 1 if share_location else num_classes

    # --- Location predictions: one {label: [bbox per prior]} per image.
    loc_data = mreshape_loc.flatten()
    all_loc_preds = []
    for n in range(num):
        label_bbox = {}
        image_offset = n * num_priors * num_loc_classes * 4
        for p in range(num_priors):
            for c in range(num_loc_classes):
                start = image_offset + (p * num_loc_classes + c) * 4
                label = -1 if share_location else c
                label_bbox.setdefault(label, []).append(NormalizedBBox(
                    loc_data[start],
                    loc_data[start + 1],
                    loc_data[start + 2],
                    loc_data[start + 3],
                ))
        all_loc_preds.append(label_bbox)

    # --- Confidences: one {class: scores per prior} per image.  The flat
    # layout is [image][prior][class], so a reshape exposes each class
    # column without a Python loop over every prior and class.
    conf_count = num * num_priors * num_classes
    conf = mbox_conf_flatten.flatten()[:conf_count].reshape(
        num, num_priors, num_classes)
    all_conf_scores = [
        {c: conf[n, :, c] for c in range(num_classes)}
        for n in range(num)
    ]

    # --- Priors and their variances (shared by every image in the batch).
    prior_data = reshape_priorbox.flatten()
    prior_bboxes = []
    for n in range(num_priors):
        start = n * 4
        prior = NormalizedBBox(
            float(prior_data[start]),
            float(prior_data[start + 1]),
            float(prior_data[start + 2]),
            float(prior_data[start + 3]),
        )
        prior.size = BBoxSize(prior)
        prior_bboxes.append(prior)
    prior_variances = []
    for n in range(num_priors):
        start = (num_priors + n) * 4
        prior_variances.append([prior_data[start + j] for j in range(4)])

    # --- Decode every prediction against its prior.
    assert len(all_loc_preds) == num
    all_decode_bboxes = []
    for n in range(num):
        decode_bboxes = {}
        for c in range(num_loc_classes):
            label = -1 if share_location else c
            if label == background_label_id:
                continue
            if label not in all_loc_preds[n]:
                raise KeyError(
                    "Could not find location predictions for label {}".format(label))
            decode_bboxes[label] = []
            DecodeBBoxes(prior_bboxes, prior_variances,
                         code_type, variance_encoded_in_target, clip_bbox,
                         all_loc_preds[n][label], decode_bboxes[label])
        all_decode_bboxes.append(decode_bboxes)

    # --- Per-class NMS, then the per-image keep_top_k cap.
    num_kept = 0
    all_indices = []
    for n in range(num):
        decode_bboxes = all_decode_bboxes[n]
        conf_scores = all_conf_scores[n]
        num_det = 0
        indices = {}
        for c in range(num_classes):
            if c == background_label_id:
                continue
            indices[c] = []
            label = -1 if share_location else c
            if label not in decode_bboxes:
                print("Could not find location predictions for label ", label)
                continue
            ApplyNMSFast(decode_bboxes[label], conf_scores[c],
                         confidence_threshold, nms_threshold, eta,
                         top_k, indices[c])
            num_det += len(indices[c])

        if -1 < keep_top_k < num_det:
            # Too many detections: keep the keep_top_k best across classes.
            score_index_pairs = []
            for label, label_indices in indices.items():
                scores = conf_scores[label]
                for idx in label_indices:
                    assert idx < len(scores)
                    score_index_pairs.append((scores[idx], (label, idx)))
            score_index_pairs.sort(key=lambda pair: pair[0], reverse=True)
            score_index_pairs = score_index_pairs[:keep_top_k]
            new_indices = {}
            for _, (label, idx) in score_index_pairs:
                new_indices.setdefault(label, []).append(idx)
            all_indices.append(new_indices)
            num_kept += len(score_index_pairs)
        else:
            all_indices.append(indices)
            num_kept += num_det

    # --- Assemble the output blob.
    if num_kept == 0:
        print("Couldn't find any detections")
        # One placeholder row per image: image id followed by -1s.
        top_data = np.full((1, 1, num, 7), -1, dtype='float32')
        top_data[0, 0, :, 0] = np.arange(num)
        return top_data, 0

    top_data = np.zeros((1, 1, num_kept, 7), dtype='float32')
    rows = top_data[0, 0]  # view: writes land in top_data
    count = 0
    for n in range(num):
        conf_scores = all_conf_scores[n]
        decode_bboxes = all_decode_bboxes[n]
        for label, label_indices in all_indices[n].items():
            scores = conf_scores[label]
            loc_label = -1 if share_location else label
            if loc_label not in decode_bboxes:
                print("Could not find location predictions for ", loc_label)
                continue
            bboxes = decode_bboxes[loc_label]
            for idx in label_indices:
                bbox = bboxes[idx]
                rows[count] = (n, label, scores[idx],
                               bbox.xmin, bbox.ymin, bbox.xmax, bbox.ymax)
                count += 1
    return top_data, count
if __name__ == '__main__':
    # Demo: run an SSD network whose DetectionOutput layer has been cut
    # off, finish the detection step with detection_out(), and draw the
    # resulting boxes onto the resized input image.
    input_size = 512  # the network input is input_size x input_size
    coco_net = caffe.Net('VGG_coco_SSD_512x512_iter_360000_without_detection_output.prototxt',  # Cut Detection Output Layer
                         'VGG_coco_SSD_512x512_iter_360000_without_detection_output.caffemodel', caffe.TEST)
    image_path = 'ssd.jpg'
    img = cv2.imread(image_path)
    if img is None:
        # cv2.imread signals an unreadable/missing file by returning None.
        raise IOError("Could not read image: {}".format(image_path))
    resized_img = cv2.resize(img, (input_size, input_size))

    # HWC -> 1xCxHxW float32, then subtract the per-channel means
    # (cv2 loads channels in B, G, R order).
    input_data = np.asarray(resized_img).transpose((2, 0, 1))
    input_data = input_data.reshape(1, 3, input_size, input_size).astype('float32')
    blue_mean = 104.0
    green_mean = 117.0
    red_mean = 123.0
    input_data[:, 0] -= blue_mean
    input_data[:, 1] -= green_mean
    input_data[:, 2] -= red_mean

    coco_net.blobs['data'].data[...] = input_data
    output = coco_net.forward()
    mreshape_loc = output['mreshape_loc']
    mbox_conf_flatten = output['mbox_conf_flatten']
    reshape_priorbox = output['reshape_priorbox']

    start = time.time()
    detections, keep_count = detection_out(mreshape_loc, mbox_conf_flatten, reshape_priorbox)
    end = time.time()
    print("{:.2f}".format(end - start))

    # parse the output
    det_label = detections[0, 0, :, 1]
    det_conf = detections[0, 0, :, 2]
    det_xmin = detections[0, 0, :, 3]
    det_ymin = detections[0, 0, :, 4]
    det_xmax = detections[0, 0, :, 5]
    det_ymax = detections[0, 0, :, 6]

    # get topN detections, best score first
    top_k = keep_count
    topk_indexes = det_conf.argsort()[::-1][:top_k]
    top_conf = det_conf[topk_indexes]
    top_label_indexes = det_label[topk_indexes]
    top_xmin = det_xmin[topk_indexes]
    top_ymin = det_ymin[topk_indexes]
    top_xmax = det_xmax[topk_indexes]
    top_ymax = det_ymax[topk_indexes]

    for i in range(top_conf.shape[0]):
        # Take the label from the score-sorted array so it matches the
        # box and score of the same detection.
        label = int(round(top_label_indexes[i]))
        xmin = int(round(top_xmin[i] * float(input_size)))
        ymin = int(round(top_ymin[i] * float(input_size)))
        xmax = int(round(top_xmax[i] * float(input_size)))
        ymax = int(round(top_ymax[i] * float(input_size)))
        score = top_conf[i]
        print(label, score, xmin, ymin, xmax, ymax)
        cv2.rectangle(resized_img, (xmin, ymin), (xmax, ymax), (0, 255, 0), 2)
    cv2.imwrite("ssd_out_cut.jpg", resized_img)