# Evaluation metrics for hashing-based retrieval (MAP, precision/recall curves).
import torch
import numpy as np

from tqdm import tqdm
def calc_hammingDist(B1, B2):
    """Pairwise Hamming distance between rows of two +/-1 code matrices.

    B1: [m, q] (a 1-D input is promoted to [1, q]); B2: [n, q].
    Returns an [m, n] tensor with values in [0, q].
    """
    code_len = B2.shape[1]
    if B1.dim() < 2:
        B1 = B1.unsqueeze(0)
    # For +/-1 codes: <b1, b2> = q - 2 * hamming, hence this identity.
    return 0.5 * (code_len - B1.mm(B2.t()))
def calc_cosineDist(B1, B2):
    """Pairwise cosine distance (1 - cosine similarity) between rows of B1 and B2.

    Fix: the original computed row "norms" from the diagonal of B1 @ B2.T
    (only meaningful when B1 is B2, and unused anyway) and then returned the
    Hamming distance instead of any cosine quantity. It now returns the true
    cosine distance, consistent with the numpy sibling cal_cosine_dis.

    B1: [m, d] (a 1-D input is promoted to [1, d]); B2: [n, d].
    Returns an [m, n] tensor of 1 - cos(B1_i, B2_j).
    """
    if len(B1.shape) < 2:
        B1 = B1.unsqueeze(0)
    dot_product = B1.mm(B2.transpose(0, 1))
    # Row norms of each matrix; the epsilon guards against zero vectors.
    norm1 = torch.norm(B1, dim=1, keepdim=True)
    norm2 = torch.norm(B2, dim=1, keepdim=True)
    similarity = dot_product / (norm1 * norm2.t() + 1e-12)
    return 1.0 - similarity
def calc_map_k_matrix(qB, rB, query_L, retrieval_L, k=None, rank=0):
    """Mean average precision at k, computed with batched ops over all queries.

    qB/rB: +/-1 query and database hash codes; query_L/retrieval_L: label
    matrices (an item is relevant iff it shares at least one label with the
    query). k defaults to the database size. `rank` is unused here and kept
    for signature compatibility with calc_map_k.

    Fix: a query with zero relevant database items previously produced NaN
    (torch.mean over an empty tensor) and poisoned the whole mean; such
    queries are now skipped, matching calc_map_k.
    """
    num_query = query_L.shape[0]
    if qB.is_cuda:
        qB = qB.cpu()
        rB = rB.cpu()
    if k is None:
        k = retrieval_L.shape[0]
    gnds = (query_L.mm(retrieval_L.transpose(0, 1)) > 0).squeeze().type(torch.float32)
    tsums = torch.sum(gnds, dim=-1, keepdim=True, dtype=torch.int32)
    # Hamming distance for +/-1 codes (inlined calc_hammingDist identity).
    hamms = 0.5 * (rB.shape[1] - qB.mm(rB.transpose(0, 1)))
    _, ind = torch.sort(hamms, dim=-1)
    # Per-query cutoff: at most k, at most the number of relevant items.
    totals = torch.min(tsums, torch.tensor([k], dtype=torch.int32).expand_as(tsums))
    mean_ap = 0
    for qi in range(num_query):
        total = int(totals[qi].squeeze())
        if total == 0:
            continue  # no relevant item: this query contributes 0
        gnd = gnds[qi][ind[qi]]
        count = torch.arange(1, total + 1).type(torch.float32)
        # 1-based ranks of the first `total` relevant items in the sorted list.
        tindex = torch.nonzero(gnd)[:total].squeeze().type(torch.float32) + 1.0
        mean_ap = mean_ap + torch.mean(count / tindex)
    return mean_ap / num_query
def calc_map_k(qB, rB, query_L, retrieval_L, k=None, rank=0):
    """Mean average precision at k, computed query-by-query (loop form).

    Codes are sign-binarized first. Queries with no relevant database item
    are skipped (they contribute 0 to the mean). `rank` is the CUDA device
    index used to move the rank counter when the data lives on GPU.
    """
    num_query = query_L.shape[0]
    qB, rB = torch.sign(qB), torch.sign(rB)
    if k is None:
        k = retrieval_L.shape[0]
    code_len = rB.shape[1]
    mean_ap = 0
    for qi in range(num_query):
        q_lab = query_L[qi]
        if len(q_lab.shape) < 2:
            q_lab = q_lab.unsqueeze(0)  # [1, n_labels]
        gnd = (q_lab.mm(retrieval_L.transpose(0, 1)) > 0).squeeze().type(torch.float32)
        tsum = torch.sum(gnd)
        if tsum == 0:
            continue
        # Hamming distance for +/-1 codes (inlined calc_hammingDist identity).
        hamm = 0.5 * (code_len - qB[qi, :].unsqueeze(0).mm(rB.transpose(0, 1)))
        _, order = torch.sort(hamm)
        order.squeeze_()
        gnd = gnd[order]
        total = min(k, int(tsum))
        count = torch.arange(1, total + 1).type(torch.float32)
        # 1-based ranks of the first `total` relevant items in the sorted list.
        tindex = torch.nonzero(gnd)[:total].squeeze().type(torch.float32) + 1.0
        if tindex.is_cuda:
            count = count.to(rank)
        mean_ap = mean_ap + torch.mean(count / tindex)
    return mean_ap / num_query
def calc_precisions_topn_matrix(qB, rB, query_L, retrieval_L, recall_gas=0.02, num_retrieval=10000):
    """Mean precision at increasing top-N cutoffs, batched over all queries.

    Inputs may be numpy arrays or torch tensors; codes are binarized via
    sign(x - 0.5). Returns a list of int(1/recall_gas) values where entry i
    is the mean precision over the top num_retrieval*(i+1)*recall_gas items.
    """
    if not isinstance(qB, torch.Tensor):
        qB = torch.from_numpy(qB)
        rB = torch.from_numpy(rB)
        query_L = torch.from_numpy(query_L)
        retrieval_L = torch.from_numpy(retrieval_L)
    qB = torch.sign(qB.float() - 0.5)
    rB = torch.sign(rB.float() - 0.5)
    if qB.is_cuda:
        qB, rB = qB.cpu(), rB.cpu()
    num_query = query_L.shape[0]
    precisions = [0] * int(1 / recall_gas)
    gnds = (query_L.mm(retrieval_L.transpose(0, 1)) > 0).squeeze().type(torch.float32)
    # Hamming distance for +/-1 codes (inlined calc_hammingDist identity).
    hamms = 0.5 * (rB.shape[1] - qB.mm(rB.transpose(0, 1)))
    _, inds = torch.sort(hamms, dim=-1)
    for qi in range(num_query):
        # Ground truth reordered by ascending Hamming distance.
        gnd = gnds[qi][inds[qi]]
        for i, recall in enumerate(np.arange(recall_gas, 1 + recall_gas, recall_gas)):
            total = int(num_retrieval * recall)
            hits = torch.nonzero(gnd[:total]).squeeze().numpy().size
            precisions[i] += hits / total
    return [p / num_query for p in precisions]
def calc_precisions_topn(qB, rB, query_L, retrieval_L, recall_gas=0.02, num_retrieval=10000):
    """Mean precision at increasing top-N cutoffs, one query at a time.

    Codes are binarized via sign(x - 0.5). Returns a list of
    int(1/recall_gas) values; entry i is the mean precision over the top
    num_retrieval*(i+1)*recall_gas retrieved items.
    """
    qB = torch.sign(qB.float() - 0.5)
    rB = torch.sign(rB.float() - 0.5)
    num_query = query_L.shape[0]
    code_len = rB.shape[1]
    precisions = [0] * int(1 / recall_gas)
    for qi in range(num_query):
        q_lab = query_L[qi]
        if len(q_lab.shape) < 2:
            q_lab = q_lab.unsqueeze(0)  # [1, n_labels]
        gnd = (q_lab.mm(retrieval_L.transpose(0, 1)) > 0).squeeze().type(torch.float32)
        # Hamming distance for +/-1 codes (inlined calc_hammingDist identity).
        hamm = 0.5 * (code_len - qB[qi, :].unsqueeze(0).mm(rB.transpose(0, 1)))
        _, order = torch.sort(hamm)
        order.squeeze_()
        gnd = gnd[order]
        for i, recall in enumerate(np.arange(recall_gas, 1 + recall_gas, recall_gas)):
            total = int(num_retrieval * recall)
            hits = torch.nonzero(gnd[:total]).squeeze().numpy().size
            precisions[i] += hits / total
    return [p / num_query for p in precisions]
def calc_precisions_hash(qB, rB, query_L, retrieval_L):
    """Precision and recall of hash lookups within each Hamming radius.

    Codes are binarized with sign(x - 0.5). For each radius r in
    0..max_hamm, precisions[r] is the precision over ALL (query, item) pairs
    whose distance is <= r (counts are cumulative across the loop), and
    recalls[r] is the fraction of all relevant pairs recovered within r.
    Returns (precisions, recalls) as numpy arrays of length max_hamm + 1.

    NOTE(review): hamm is cast to ByteTensor (uint8), which would overflow
    for code lengths above 255 — confirm the bit width stays below that.
    """
    qB = qB.float()
    rB = rB.float()
    qB = torch.sign(qB - 0.5)
    rB = torch.sign(rB - 0.5)
    num_query = query_L.shape[0]
    num_retrieval = retrieval_L.shape[0]
    bit = qB.shape[1]
    hamm = calc_hammingDist(qB, rB)
    hamm = hamm.type(torch.ByteTensor)
    total_num = [0] * (bit + 1)  # NOTE(review): written but never read
    max_hamm = int(torch.max(hamm))
    gnd = (query_L.mm(retrieval_L.transpose(0, 1)) > 0).squeeze()
    # Total count of relevant (query, item) pairs over the whole database.
    total_right = torch.sum(torch.matmul(query_L, retrieval_L.t())>0)
    precisions = np.zeros([max_hamm + 1])
    recalls = np.zeros([max_hamm + 1])
    # _, index = torch.sort(hamm)
    # del _
    # for i in range(index.shape[0]):
    # gnd[i, :] = gnd[i, index[i]]
    # del index
    right_num = 0
    recall_num = 0
    for i, radius in enumerate(range(0, max_hamm+1)):
        # (query, item) index pairs at exactly this radius.
        recall = torch.nonzero(hamm == radius)
        # Their ground-truth relevance, gathered via advanced indexing:
        # split(1, dim=1) yields a (rows, cols) index tuple for gnd.
        right = gnd[recall.split(1, dim=1)]
        recall_num += recall.shape[0]
        del recall
        right_num += torch.nonzero(right).shape[0]
        del right
        # right_num / recall_num accumulate, so each entry is cumulative
        # precision/recall "within radius <= i"; epsilon avoids div by 0.
        precisions[i] += (right_num / (recall_num + 1e-8))
        # recalls[i] += (recall_num / num_retrieval / num_query)
        recalls[i] += (recall_num / total_right)
    return precisions, recalls
def calc_precisions_hamming_radius(qB, rB, query_L, retrieval_L, hamming_gas=1):
    """Mean precision within growing Hamming radii (1, 1+gas, ..., bit).

    For each query, database items are sorted by Hamming distance; entry i
    is the mean precision over all items whose distance is <= the i-th
    radius (0 is added when no item falls inside the radius).

    Fix: the item count inside a radius was previously taken from
    torch.nonzero(mask).squeeze().shape[0], which yields 2 (the squeezed
    index width) instead of 1 when exactly one item lies within the radius.
    The count is now a direct boolean sum.
    """
    num_query = query_L.shape[0]
    bit = qB.shape[1]
    precisions = [0] * int(bit / hamming_gas)
    for qi in range(num_query):
        q_lab = query_L[qi]
        if len(q_lab.shape) < 2:
            q_lab = q_lab.unsqueeze(0)  # [1, n_labels]
        gnd = (q_lab.mm(retrieval_L.transpose(0, 1)) > 0).squeeze().type(torch.float32)
        # Hamming distance for +/-1 codes (inlined calc_hammingDist); [1, N].
        hamm = 0.5 * (rB.shape[1] - qB[qi, :].unsqueeze(0).mm(rB.transpose(0, 1)))
        _, order = torch.sort(hamm)
        order.squeeze_()
        # gnd sorted by ascending distance: its first `total` entries are
        # exactly the items within the current radius.
        gnd = gnd[order]
        for i, radius in enumerate(np.arange(1, bit + 1, hamming_gas)):
            total = int((hamm <= radius).sum())
            if total == 0:
                continue
            right_num = torch.nonzero(gnd[:total]).squeeze().numpy().size
            precisions[i] += right_num / total
    for i in range(len(precisions)):
        precisions[i] /= num_query
    return precisions
def calc_neighbor(label1, label2):
    """Similarity matrix: S[i, j] = 1.0 iff the samples share >= 1 label."""
    shared = label1.matmul(label2.transpose(0, 1))
    return (shared > 0).float()
def norm_max_min(x: torch.Tensor, dim=None):
    """Min-max normalize x to [0, 1], globally or along dimension `dim`."""
    if dim is None:
        hi, lo = torch.max(x), torch.min(x)
    else:
        hi = torch.max(x, dim=dim)[0]
        lo = torch.min(x, dim=dim)[0]
        if dim > 0:
            # Re-insert the reduced (trailing) axis so broadcasting lines up.
            hi = hi.unsqueeze(len(x.shape) - 1)
            lo = lo.unsqueeze(len(x.shape) - 1)
    return (x - lo) / (hi - lo)
def norm_mean(x: torch.Tensor, dim=None):
    """Standardize x to zero mean / unit std, globally or along `dim`."""
    if dim is None:
        mu, sigma = torch.mean(x), torch.std(x)
    else:
        mu = torch.mean(x, dim=dim)
        sigma = torch.std(x, dim=dim)
        if dim > 0:
            # Re-insert the reduced (trailing) axis so broadcasting lines up.
            mu = mu.unsqueeze(len(x.shape) - 1)
            sigma = sigma.unsqueeze(len(x.shape) - 1)
    return (x - mu) / sigma
def norm_abs_mean(x: torch.Tensor, dim=None):
    """Absolute z-score: |x - mean| / std, globally or along `dim`."""
    if dim is None:
        mu, sigma = torch.mean(x), torch.std(x)
    else:
        mu = torch.mean(x, dim=dim)
        sigma = torch.std(x, dim=dim)
        if dim > 0:
            # Re-insert the reduced (trailing) axis so broadcasting lines up.
            mu = mu.unsqueeze(len(x.shape) - 1)
            sigma = sigma.unsqueeze(len(x.shape) - 1)
    return torch.abs(x - mu) / sigma
def factorial(n):
    """Return n! for a non-negative integer n.

    Rewritten iteratively: the recursive original overflowed the call stack
    for large n and recursed forever for negative n (RecursionError), which
    is now an explicit ValueError.
    """
    if n < 0:
        raise ValueError("factorial is undefined for negative n")
    result = 1
    for i in range(2, n + 1):
        result *= i
    return result
def calc_IF(all_bow):
    """Relative frequency of each word over a bag-of-words corpus.

    all_bow: [num_docs, vocab] count matrix. Returns a [vocab] tensor whose
    entries sum to 1.
    """
    per_word = torch.sum(all_bow, dim=0)
    return per_word / torch.sum(per_word)
def cal_cosine_dis(f1, f2):
    """Cosine distance (1 - similarity) between vector f1 and each row of f2."""
    sims = np.dot(f1, f2.T) / (np.linalg.norm(f1) * np.linalg.norm(f2, axis=1))
    return 1 - sims
def cal_hamming_dis(b1, b2):
    """Hamming distance between +/-1 code b1 and each row of b2 (numpy)."""
    code_len = b2.shape[1]  # length of hash code
    # For +/-1 codes: <b1, b2> = k - 2 * hamming.
    return 0.5 * (code_len - b1 @ b2.T)
def cal_map(query_feats, query_label, retrieval_feats, retrieval_label, top_k=500, dist_method='hamming'):
    """
    Calculate MAP (Mean Average Precision) over the top_k retrieved items.
    :param query_feats: query codes/features, one row per query
    :param query_label: label matrix of the query samples
    :param retrieval_feats: database codes/features
    :param retrieval_label: label matrix of the database
    :param top_k: ranking cutoff
    :param dist_method: 'hamming' (for +/-1 codes) or cosine otherwise
    :return: mean of per-query average precision (queries with no relevant
             hit in the top_k are skipped)
    """
    query_number = query_label.shape[0]
    ap_sum = 0

    for qi in range(query_number):
        # (N,) relevance of every database item to this query.
        relevant = (np.dot(query_label[qi, :], retrieval_label.transpose()) > 0).astype(np.float32)
        feats = query_feats[qi, :]
        if dist_method == 'hamming':
            # Inlined cal_hamming_dis: for +/-1 codes <b1, b2> = k - 2 * dist.
            dist = 0.5 * (retrieval_feats.shape[1] - np.dot(feats, retrieval_feats.transpose()))
        else:
            # Inlined cal_cosine_dis: 1 - cosine similarity.
            dist = 1 - np.dot(feats, retrieval_feats.T) / (
                np.linalg.norm(feats) * np.linalg.norm(retrieval_feats, axis=1))

        # Rank ground truth by ascending distance and keep the top K.
        relevant = relevant[np.argsort(dist)]
        top_gnd = relevant[0:top_k]
        hit_count = np.sum(top_gnd).astype(int)  # correct retrievals in top K
        if hit_count == 0:
            continue
        count = np.linspace(1, hit_count, int(hit_count))

        # 1-based ranks at which the relevant items appear.
        hit_ranks = np.asarray(np.where(top_gnd == 1)) + 1.0
        ap_sum += np.mean(count / hit_ranks)  # average precision of this query

    return ap_sum / query_number  # mean average precision over all queries
def cal_pr(retrieval_binary, query_binary, retrieval_label, query_label, interval=0.1):
    """Precision at fixed recall levels (PR curve), averaged over queries.

    :param retrieval_binary: database hash codes (+/-1)
    :param query_binary: query hash codes (+/-1)
    :param retrieval_label: database label matrix
    :param query_label: query label matrix
    :param interval: recall step; levels are interval, 2*interval, ..., 1.0
    :return: array of (recall, precision) pairs

    NOTE(review): np.where(tp_cum == i)[0][0] assumes every target hit count
    i (including i == 0 when tp_num * recall truncates to zero) appears in
    the cumulative-sum array, and that each query has at least one relevant
    item; otherwise this raises IndexError — verify against callers.
    """
    # Recall levels at which precision is sampled.
    r_arr = np.array([i * interval for i in range(1, int(1/interval) + 1)])
    p_arr = np.zeros(len(r_arr))

    query_number = query_label.shape[0]

    for query_index in range(query_number):
        # Relevance of every database item (shared label => 1).
        ground_truth = (np.dot(query_label[query_index, :], retrieval_label.transpose()) > 0).astype(
            np.float32)  # (1, N)
        hamming_dis = cal_hamming_dis(query_binary[query_index, :], retrieval_binary)  # (1, N)

        # sort hamming distance
        sort_index = np.argsort(hamming_dis)
        ground_truth = ground_truth[sort_index]
        # Total relevant items, and the hit counts matching each recall level.
        tp_num = len(np.where(ground_truth == 1)[0])
        r_num_arr = (tp_num * r_arr).astype(np.int32)

        tp_cum = np.cumsum(ground_truth)
        # 1-based retrieval depth at which each hit count is first reached.
        total_num_arr = np.array([np.where(tp_cum == i)[0][0] + 1 for i in r_num_arr])
        p_arr += r_num_arr/total_num_arr
    p_arr /= query_number

    return np.array(list(zip(r_arr, p_arr)))
def cal_top_n(retrieval_binary, query_binary, retrieval_label, query_label, top_n=None):
    """Precision at several top-N cutoffs, averaged over all queries.

    :param retrieval_binary: database hash codes (+/-1)
    :param query_binary: query hash codes (+/-1)
    :param retrieval_label: database label matrix
    :param query_label: query label matrix
    :param top_n: iterable of cutoffs; defaults to 100, 200, ..., 1000
    :return: array of (cutoff, precision) pairs
    """
    if top_n is None:
        top_n = range(100, 1001, 100)

    top_n = np.array(top_n)
    top_n_p = np.zeros(len(top_n))
    query_number = query_label.shape[0]
    code_len = retrieval_binary.shape[1]

    for qi in range(query_number):
        # (N,) relevance of every database item to this query.
        relevant = (np.dot(query_label[qi, :], retrieval_label.transpose()) > 0).astype(np.float32)
        # Inlined cal_hamming_dis: for +/-1 codes <b1, b2> = k - 2 * dist.
        dist = 0.5 * (code_len - np.dot(query_binary[qi, :], retrieval_binary.transpose()))

        # Rank by ascending distance; only the largest cutoff matters.
        relevant = relevant[np.argsort(dist)][:top_n[-1]]

        hits_cum = np.cumsum(relevant)
        # Hit counts at each cutoff, turned into precision.
        top_n_p += hits_cum[top_n - 1] / top_n

    top_n_p /= query_number
    return np.array(list(zip(top_n, top_n_p)))