advclip/utils/calc_utils.py

367 lines
12 KiB
Python

import torch
import numpy as np
from tqdm import tqdm
def calc_hammingDist(B1, B2):
q = B2.shape[1]
if len(B1.shape) < 2:
B1 = B1.unsqueeze(0)
distH = 0.5 * (q - B1.mm(B2.transpose(0, 1)))
return distH
def calc_cosineDist(B1, B2):
q = B2.shape[1]
if len(B1.shape) < 2:
B1 = B1.unsqueeze(0)
dot_product = B1.mm(B2.transpose(0, 1))
norms = torch.sqrt(torch.einsum('ii->i', dot_product))
distH = 0.5 * (q - B1.mm(B2.transpose(0, 1)))
return distH
def calc_map_k_matrix(qB, rB, query_L, retrieval_L, k=None, rank=0):
num_query = query_L.shape[0]
if qB.is_cuda:
qB = qB.cpu()
rB = rB.cpu()
map = 0
if k is None:
k = retrieval_L.shape[0]
gnds = (query_L.mm(retrieval_L.transpose(0, 1)) > 0).squeeze().type(torch.float32)
tsums = torch.sum(gnds, dim=-1, keepdim=True, dtype=torch.int32)
hamms = calc_hammingDist(qB, rB)
_, ind = torch.sort(hamms, dim=-1)
totals = torch.min(tsums, torch.tensor([k], dtype=torch.int32).expand_as(tsums))
for iter in range(num_query):
gnd = gnds[iter][ind[iter]]
total = totals[iter].squeeze()
count = torch.arange(1, total + 1).type(torch.float32)
tindex = torch.nonzero(gnd)[:total].squeeze().type(torch.float32) + 1.0
map = map + torch.mean(count / tindex)
map = map / num_query
return map
def calc_map_k(qB, rB, query_L, retrieval_L, k=None, rank=0):
num_query = query_L.shape[0]
qB = torch.sign(qB)
rB = torch.sign(rB)
map = 0
if k is None:
k = retrieval_L.shape[0]
for iter in range(num_query):
q_L = query_L[iter]
if len(q_L.shape) < 2:
q_L = q_L.unsqueeze(0) # [1, hash length]
gnd = (q_L.mm(retrieval_L.transpose(0, 1)) > 0).squeeze().type(torch.float32)
tsum = torch.sum(gnd)
if tsum == 0:
continue
hamm = calc_hammingDist(qB[iter, :], rB)
_, ind = torch.sort(hamm)
ind.squeeze_()
gnd = gnd[ind]
total = min(k, int(tsum))
count = torch.arange(1, total + 1).type(torch.float32)
tindex = torch.nonzero(gnd)[:total].squeeze().type(torch.float32) + 1.0
if tindex.is_cuda:
count = count.to(rank)
map = map + torch.mean(count / tindex)
map = map / num_query
return map
def calc_precisions_topn_matrix(qB, rB, query_L, retrieval_L, recall_gas=0.02, num_retrieval=10000):
if not isinstance(qB, torch.Tensor):
qB = torch.from_numpy(qB)
rB = torch.from_numpy(rB)
query_L = torch.from_numpy(query_L)
retrieval_L = torch.from_numpy(retrieval_L)
qB = qB.float()
rB = rB.float()
qB = torch.sign(qB - 0.5)
rB = torch.sign(rB - 0.5)
if qB.is_cuda:
qB = qB.cpu()
rB = rB.cpu()
num_query = query_L.shape[0]
# num_retrieval = retrieval_L.shape[0]
precisions = [0] * int(1 / recall_gas)
gnds = (query_L.mm(retrieval_L.transpose(0, 1)) > 0).squeeze().type(torch.float32)
hamms = calc_hammingDist(qB, rB)
_, inds = torch.sort(hamms, dim=-1)
for iter in range(num_query):
gnd = gnds[iter]
ind = inds[iter]
gnd = gnd[ind]
for i, recall in enumerate(np.arange(recall_gas, 1 + recall_gas, recall_gas)):
total = int(num_retrieval * recall)
right = torch.nonzero(gnd[: total]).squeeze().numpy()
right_num = right.size
precisions[i] += (right_num/total)
for i in range(len(precisions)):
precisions[i] /= num_query
return precisions
def calc_precisions_topn(qB, rB, query_L, retrieval_L, recall_gas=0.02, num_retrieval=10000):
qB = qB.float()
rB = rB.float()
qB = torch.sign(qB - 0.5)
rB = torch.sign(rB - 0.5)
num_query = query_L.shape[0]
# num_retrieval = retrieval_L.shape[0]
precisions = [0] * int(1 / recall_gas)
for iter in range(num_query):
q_L = query_L[iter]
if len(q_L.shape) < 2:
q_L = q_L.unsqueeze(0) # [1, hash length]
gnd = (q_L.mm(retrieval_L.transpose(0, 1)) > 0).squeeze().type(torch.float32)
hamm = calc_hammingDist(qB[iter, :], rB)
_, ind = torch.sort(hamm)
ind.squeeze_()
gnd = gnd[ind]
for i, recall in enumerate(np.arange(recall_gas, 1 + recall_gas, recall_gas)):
total = int(num_retrieval * recall)
right = torch.nonzero(gnd[: total]).squeeze().numpy()
# right_num = torch.nonzero(gnd[: total]).squeeze().shape[0]
right_num = right.size
precisions[i] += (right_num/total)
for i in range(len(precisions)):
precisions[i] /= num_query
return precisions
def calc_precisions_hash(qB, rB, query_L, retrieval_L):
qB = qB.float()
rB = rB.float()
qB = torch.sign(qB - 0.5)
rB = torch.sign(rB - 0.5)
num_query = query_L.shape[0]
num_retrieval = retrieval_L.shape[0]
bit = qB.shape[1]
hamm = calc_hammingDist(qB, rB)
hamm = hamm.type(torch.ByteTensor)
total_num = [0] * (bit + 1)
max_hamm = int(torch.max(hamm))
gnd = (query_L.mm(retrieval_L.transpose(0, 1)) > 0).squeeze()
total_right = torch.sum(torch.matmul(query_L, retrieval_L.t())>0)
precisions = np.zeros([max_hamm + 1])
recalls = np.zeros([max_hamm + 1])
# _, index = torch.sort(hamm)
# del _
# for i in range(index.shape[0]):
# gnd[i, :] = gnd[i, index[i]]
# del index
right_num = 0
recall_num = 0
for i, radius in enumerate(range(0, max_hamm+1)):
recall = torch.nonzero(hamm == radius)
right = gnd[recall.split(1, dim=1)]
recall_num += recall.shape[0]
del recall
right_num += torch.nonzero(right).shape[0]
del right
precisions[i] += (right_num / (recall_num + 1e-8))
# recalls[i] += (recall_num / num_retrieval / num_query)
recalls[i] += (recall_num / total_right)
return precisions, recalls
def calc_precisions_hamming_radius(qB, rB, query_L, retrieval_L, hamming_gas=1):
num_query = query_L.shape[0]
bit = qB.shape[1]
precisions = [0] * int(bit / hamming_gas)
for iter in range(num_query):
q_L = query_L[iter]
if len(q_L.shape) < 2:
q_L = q_L.unsqueeze(0) # [1, hash length]
gnd = (q_L.mm(retrieval_L.transpose(0, 1)) > 0).squeeze().type(torch.float32)
hamm = calc_hammingDist(qB[iter, :], rB)
_, ind = torch.sort(hamm)
ind.squeeze_()
gnd = gnd[ind]
for i, recall in enumerate(np.arange(1, bit+1, hamming_gas)):
total = torch.nonzero(hamm <= recall).squeeze().shape[0]
if total == 0:
precisions[i] += 0
continue
right = torch.nonzero(gnd[: total]).squeeze().numpy()
right_num = right.size
precisions[i] += (right_num / total)
for i in range(len(precisions)):
precisions[i] /= num_query
return precisions
def calc_neighbor(label1, label2):
# calculate the similar matrix
Sim = label1.matmul(label2.transpose(0, 1)) > 0
return Sim.float()
def norm_max_min(x: torch.Tensor, dim=None):
if dim is None:
max = torch.max(x)
min = torch.min(x)
if dim is not None:
max = torch.max(x, dim=dim)[0]
min = torch.min(x, dim=dim)[0]
if dim > 0:
max = max.unsqueeze(len(x.shape) - 1)
min = min.unsqueeze(len(x.shape) - 1)
norm = (x - min) / (max - min)
return norm
def norm_mean(x: torch.Tensor, dim=None):
if dim is None:
mean = torch.mean(x)
std = torch.std(x)
if dim is not None:
mean = torch.mean(x, dim=dim)
std = torch.std(x, dim=dim)
if dim > 0:
mean = mean.unsqueeze(len(x.shape) - 1)
std = std.unsqueeze(len(x.shape) - 1)
norm = (x - mean) / std
return norm
def norm_abs_mean(x: torch.Tensor, dim=None):
if dim is None:
mean = torch.mean(x)
std = torch.std(x)
if dim is not None:
mean = torch.mean(x, dim=dim)
std = torch.std(x, dim=dim)
if dim > 0:
mean = mean.unsqueeze(len(x.shape) - 1)
std = std.unsqueeze(len(x.shape) - 1)
norm = torch.abs(x - mean) / std
return norm
def factorial(n):
if n == 0:
return 1
else:
return n * factorial(n - 1)
def calc_IF(all_bow):
word_num = torch.sum(all_bow, dim=0)
total_num = torch.sum(word_num)
IF = word_num / total_num
return IF
def cal_cosine_dis(f1, f2):
f1_norm = np.linalg.norm(f1)
f2_norm = np.linalg.norm(f2, axis=1)
similiarity = np.dot(f1, f2.T)/(f1_norm * f2_norm)
return 1 - similiarity
def cal_hamming_dis(b1, b2):
k = b2.shape[1] # length of hash code
dis = 0.5 * (k - np.dot(b1, b2.transpose()))
return dis
def cal_map(query_feats, query_label, retrieval_feats, retrieval_label, top_k=5, dist_method='cosine'):
"""
Calculate MAP (Mean Average Precision)
:param query_binary: binary code of query sample
:param query_label: label of qurey sample
:param retrieval_binary: binary code of database
:param retrieval_label: label of database
:param top_k:
:return:
"""
query_number = query_label.shape[0]
top_k_map = 0
dist_func = cal_hamming_dis if dist_method == 'hamming' else cal_cosine_dis
for query_index in range(query_number):
# (1, N)
ground_truth = (np.dot(query_label[query_index, :], retrieval_label.transpose()) > 0).astype(np.float32)
hamming_dis = dist_func(query_feats[query_index, :], retrieval_feats) # (1, N)
# sort hamming distance
sort_index = np.argsort(hamming_dis)
# resort ground truth
ground_truth = ground_truth[sort_index]
# get top K ground truth
top_k_gnd = ground_truth[0:top_k]
top_k_sum = np.sum(top_k_gnd).astype(int) # the number of correct retrieval in top K
if top_k_sum == 0:
continue
count = np.linspace(1, top_k_sum, int(top_k_sum))
top_k_index = np.asarray(np.where(top_k_gnd == 1)) + 1.0
top_k_map += np.mean(count / top_k_index) # average precision of per class
return top_k_map / query_number # mean of average precision of all class
def cal_pr(retrieval_binary, query_binary, retrieval_label, query_label, interval=0.1):
r_arr = np.array([i * interval for i in range(1, int(1/interval) + 1)])
p_arr = np.zeros(len(r_arr))
query_number = query_label.shape[0]
for query_index in range(query_number):
ground_truth = (np.dot(query_label[query_index, :], retrieval_label.transpose()) > 0).astype(
np.float32) # (1, N)
hamming_dis = cal_hamming_dis(query_binary[query_index, :], retrieval_binary) # (1, N)
# sort hamming distance
sort_index = np.argsort(hamming_dis)
ground_truth = ground_truth[sort_index]
tp_num = len(np.where(ground_truth == 1)[0])
r_num_arr = (tp_num * r_arr).astype(np.int32)
tp_cum = np.cumsum(ground_truth)
total_num_arr = np.array([np.where(tp_cum == i)[0][0] + 1 for i in r_num_arr])
p_arr += r_num_arr/total_num_arr
p_arr /= query_number
return np.array(list(zip(r_arr, p_arr)))
def cal_top_n(retrieval_binary, query_binary, retrieval_label, query_label, top_n=None):
if top_n is None:
top_n = range(100, 1001, 100)
top_n = np.array(top_n)
top_n_p = np.zeros(len(top_n))
query_number = query_label.shape[0]
for query_index in range(query_number):
ground_truth = (np.dot(query_label[query_index, :], retrieval_label.transpose()) > 0).astype(
np.float32) # (1, N)
hamming_dis = cal_hamming_dis(query_binary[query_index, :], retrieval_binary) # (1, N)
# sort hamming distance
sort_index = np.argsort(hamming_dis)
ground_truth = ground_truth[sort_index]
ground_truth = ground_truth[:top_n[-1]]
tp_cum = np.cumsum(ground_truth)
tp_num_arr = tp_cum[top_n - 1]
top_n_p += tp_num_arr/top_n
top_n_p /= query_number
return np.array(list(zip(top_n, top_n_p)))