"""Data manager for class-incremental learning: builds the class order and
per-task increments, and serves train/test subsets as torch Datasets."""
import logging
import numpy as np
from PIL import Image
from torch.utils.data import Dataset
from torchvision import transforms
from utils.data import iCIFAR10, iCIFAR100, iImageNet100, iImageNet1000, StanfordCar, GeneralDataset
from tqdm import tqdm
class DataManager(object):
    """Serves class-incremental tasks over a base dataset.

    Builds a (possibly shuffled) class order, splits it into per-task
    increments (``init_cls`` classes first, then ``increment``-sized steps),
    remaps the raw targets onto that order, and hands out ``DummyDataset``
    views for any span of classes.
    """

    def __init__(self, dataset_name, shuffle, seed, init_cls, increment,
                 resume=False, path=None, class_list=None):
        """Set up the class order and the per-task increment schedule.

        Args:
            dataset_name: key understood by ``_get_idata`` (e.g. "cifar100").
            shuffle: if True, permute the class order using ``seed``.
            seed: RNG seed used when shuffling the class order.
            init_cls: number of classes in the first task.
            increment: number of classes added by each subsequent task.
            resume: if True, ``class_list`` holds the classes already learned
                and only the remaining classes are scheduled.
            path: dataset root path, forwarded to ``_get_idata``.
            class_list: previously learned class ids when resuming; defaults
                to ``[-1]`` meaning "none".  (Fixed: the original signature
                used a shared mutable default argument ``class_list=[-1]``.)
        """
        if class_list is None:
            class_list = [-1]
        self.dataset_name = dataset_name
        self.init_class_list = class_list
        if not resume:
            data = {
                "path": path,
                "class_list": [-1],
            }
            self._setup_data(dataset_name, shuffle, seed, data=data)
            # First task holds init_cls classes (clamped to what exists),
            # then fixed-size increments; any remainder becomes a final task.
            if len(self._class_order) < init_cls:
                self._increments = [len(self._class_order)]
            else:
                self._increments = [init_cls]
            while sum(self._increments) + increment < len(self._class_order):
                self._increments.append(increment)
            offset = len(self._class_order) - sum(self._increments)
            if offset > 0:
                self._increments.append(offset)
        else:
            # Resuming: the first schedule entry covers the classes already
            # learned.  NOTE(review): this assumes the ids in class_list are
            # contiguous from 0 so that max(class_list) equals their count —
            # confirm against the checkpointing code.
            self._increments = [max(class_list)]
            data = {
                "path": path,
                "class_list": class_list,
            }
            self._setup_data(dataset_name, shuffle, seed, data=data)
            while sum(self._increments) + increment < len(self._class_order):
                self._increments.append(increment)
            # NOTE(review): the trailing -1 (absent from the non-resume
            # branch) drops one class from the last task; kept as-is.
            offset = len(self._class_order) - sum(self._increments) - 1
            if offset > 0:
                self._increments.append(offset)

    def get_class_list(self, task):
        """Return the class ids (in learning order) covered by tasks 0..task."""
        return self._class_order[: sum(self._increments[: task + 1])]

    def get_label_list(self, task):
        """Map each class id covered by tasks 0..task to its label.

        (Removed an unused ``start_index`` computation from the original.)
        """
        cls_list = self.get_class_list(task)
        return {i: self.label_list[i] for i in cls_list}

    def nb_tasks(self):
        """Total number of tasks in the schedule."""
        return len(self._increments)

    def get_task_size(self, task):
        """Number of classes introduced by task ``task``."""
        return self._increments[task]

    def get_accumulate_tasksize(self, task):
        """Cumulative number of classes after task ``task``, as a float."""
        return float(sum(self._increments[: task + 1]))

    def get_total_classnum(self):
        """Total number of classes across all tasks."""
        return len(self._class_order)

    def get_dataset(
        self, indices, source, mode, appendent=None, ret_data=False, m_rate=None
    ):
        """Build a ``DummyDataset`` over the mapped class indices in ``indices``.

        Args:
            indices: iterable of mapped class indices to include.
            source: "train" or "test" split.
            mode: "train", "test", or "flip" (test transform plus a forced
                horizontal flip).
            appendent: optional ``(data, targets)`` pair appended verbatim
                (e.g. exemplar memory).
            ret_data: if True, also return the raw concatenated arrays.
            m_rate: if set, per-class random subsampling rate handled by
                ``_select_rmm``.

        Raises:
            ValueError: on unknown ``source`` or ``mode``.
        """
        if source == "train":
            x, y = self._train_data, self._train_targets
        elif source == "test":
            x, y = self._test_data, self._test_targets
        else:
            raise ValueError("Unknown data source {}.".format(source))

        if mode == "train":
            trsf = transforms.Compose([*self._train_trsf, *self._common_trsf])
        elif mode == "flip":
            trsf = transforms.Compose(
                [
                    *self._test_trsf,
                    transforms.RandomHorizontalFlip(p=1.0),
                    *self._common_trsf,
                ]
            )
        elif mode == "test":
            trsf = transforms.Compose([*self._test_trsf, *self._common_trsf])
        else:
            raise ValueError("Unknown mode {}.".format(mode))

        data, targets = [], []
        for idx in indices:
            if m_rate is None:
                class_data, class_targets = self._select(
                    x, y, low_range=idx, high_range=idx + 1
                )
            else:
                class_data, class_targets = self._select_rmm(
                    x, y, low_range=idx, high_range=idx + 1, m_rate=m_rate
                )
            data.append(class_data)
            targets.append(class_targets)

        if appendent is not None and len(appendent) != 0:
            appendent_data, appendent_targets = appendent
            data.append(appendent_data)
            targets.append(appendent_targets)

        data, targets = np.concatenate(data), np.concatenate(targets)
        if ret_data:
            return data, targets, DummyDataset(data, targets, trsf, self.use_path)
        else:
            return DummyDataset(data, targets, trsf, self.use_path)

    def get_finetune_dataset(self, known_classes, total_classes, source, mode,
                             appendent, type="ratio"):
        """Build a balanced finetuning set: all exemplars of the known
        classes plus an even subsample of each new class.

        ``type`` sets the new-class budget: "ratio" scales the old-class
        total by (new / old) class counts; "same" matches the old-class
        total exactly.

        Raises:
            ValueError: on unknown ``source`` or ``mode``.
        """
        if source == 'train':
            x, y = self._train_data, self._train_targets
        elif source == 'test':
            x, y = self._test_data, self._test_targets
        else:
            raise ValueError('Unknown data source {}.'.format(source))

        if mode == 'train':
            trsf = transforms.Compose([*self._train_trsf, *self._common_trsf])
        elif mode == 'test':
            trsf = transforms.Compose([*self._test_trsf, *self._common_trsf])
        else:
            raise ValueError('Unknown mode {}.'.format(mode))

        val_data = []
        val_targets = []
        old_num_tot = 0
        appendent_data, appendent_targets = appendent
        # Collect every stored exemplar of the already-known classes.
        for idx in range(0, known_classes):
            append_data, append_targets = self._select(
                appendent_data, appendent_targets, low_range=idx, high_range=idx + 1
            )
            num = len(append_data)
            if num == 0:
                continue
            old_num_tot += num
            val_data.append(append_data)
            val_targets.append(append_targets)

        if type == "ratio":
            new_num_tot = int(old_num_tot * (total_classes - known_classes) / known_classes)
        elif type == "same":
            new_num_tot = old_num_tot
        else:
            assert 0, "not implemented yet"
        new_num_average = int(new_num_tot / (total_classes - known_classes))
        # Draw the same number of fresh samples from each new class.
        for idx in range(known_classes, total_classes):
            class_data, class_targets = self._select(x, y, low_range=idx, high_range=idx + 1)
            val_indx = np.random.choice(len(class_data), new_num_average, replace=False)
            val_data.append(class_data[val_indx])
            val_targets.append(class_targets[val_indx])
        val_data = np.concatenate(val_data)
        val_targets = np.concatenate(val_targets)
        return DummyDataset(val_data, val_targets, trsf, self.use_path)

    def get_dataset_with_split(
        self, indices, source, mode, appendent=None, val_samples_per_class=0
    ):
        """Like ``get_dataset`` but holds out ``val_samples_per_class``
        random samples per class into a validation split.

        Returns:
            (train ``DummyDataset``, val ``DummyDataset``).

        Raises:
            ValueError: on unknown ``source`` or ``mode``.
        """
        if source == "train":
            x, y = self._train_data, self._train_targets
        elif source == "test":
            x, y = self._test_data, self._test_targets
        else:
            raise ValueError("Unknown data source {}.".format(source))

        if mode == "train":
            trsf = transforms.Compose([*self._train_trsf, *self._common_trsf])
        elif mode == "test":
            trsf = transforms.Compose([*self._test_trsf, *self._common_trsf])
        else:
            raise ValueError("Unknown mode {}.".format(mode))

        train_data, train_targets = [], []
        val_data, val_targets = [], []
        for idx in indices:
            class_data, class_targets = self._select(
                x, y, low_range=idx, high_range=idx + 1
            )
            val_indx = np.random.choice(
                len(class_data), val_samples_per_class, replace=False
            )
            train_indx = list(set(np.arange(len(class_data))) - set(val_indx))
            val_data.append(class_data[val_indx])
            val_targets.append(class_targets[val_indx])
            train_data.append(class_data[train_indx])
            train_targets.append(class_targets[train_indx])

        # Split the appendent (exemplar memory) the same way, class by class.
        if appendent is not None:
            appendent_data, appendent_targets = appendent
            for idx in range(0, int(np.max(appendent_targets)) + 1):
                append_data, append_targets = self._select(
                    appendent_data, appendent_targets, low_range=idx, high_range=idx + 1
                )
                val_indx = np.random.choice(
                    len(append_data), val_samples_per_class, replace=False
                )
                train_indx = list(set(np.arange(len(append_data))) - set(val_indx))
                val_data.append(append_data[val_indx])
                val_targets.append(append_targets[val_indx])
                train_data.append(append_data[train_indx])
                train_targets.append(append_targets[train_indx])

        train_data, train_targets = np.concatenate(train_data), np.concatenate(
            train_targets
        )
        val_data, val_targets = np.concatenate(val_data), np.concatenate(val_targets)
        return DummyDataset(
            train_data, train_targets, trsf, self.use_path
        ), DummyDataset(val_data, val_targets, trsf, self.use_path)

    def _setup_data(self, dataset_name, shuffle, seed, data=None):
        """Load the raw dataset, fix the class order, and remap targets.

        ``data`` carries ``path`` and ``class_list`` for general datasets;
        a leading ``-1`` in ``class_list`` means "no pre-existing classes".
        """
        idata = _get_idata(dataset_name, data=data)
        # NOTE(review): download_data() appears to return label names only
        # for some dataset wrappers — may be None otherwise; confirm.
        self.label_list = idata.download_data()

        # Data
        self._train_data, self._train_targets = idata.train_data, idata.train_targets
        self._test_data, self._test_targets = idata.test_data, idata.test_targets
        self.use_path = idata.use_path

        # Transforms
        self._train_trsf = idata.train_trsf
        self._test_trsf = idata.test_trsf
        self._common_trsf = idata.common_trsf

        # Order: either a seeded permutation or the wrapper's fixed order.
        order = np.unique(self._train_targets)
        if shuffle:
            np.random.seed(seed)
            order = np.random.permutation(order).tolist()
        else:
            order = idata.class_order.tolist()
        if data['class_list'][0] != -1:
            # Resume: previously learned classes come first, new ones after.
            self._class_order = np.concatenate((np.array(data['class_list']), order)).tolist()
        else:
            self._class_order = order
        logging.info(self._class_order)

        # Map raw targets to their positions in the chosen class order.
        self._train_targets = _map_new_class_index(
            self._train_targets, self._class_order,
        )
        self._test_targets = _map_new_class_index(self._test_targets, self._class_order)

    def _select(self, x, y, low_range, high_range):
        """Return the (x, y) samples whose target is in [low_range, high_range)."""
        idxes = np.where(np.logical_and(y >= low_range, y < high_range))[0]
        if isinstance(x, np.ndarray):
            x_return = x[idxes]
        else:
            # x may be a plain list of file paths; index it element-wise.
            x_return = []
            for id in idxes:
                x_return.append(x[id])
        return x_return, y[idxes]

    def _select_rmm(self, x, y, low_range, high_range, m_rate):
        """Like ``_select`` but keeps a random (1 - m_rate) fraction per class.

        NOTE(review): np.random.randint samples positions *with* replacement,
        so duplicates are possible — confirm this is intended.
        """
        assert m_rate is not None
        if m_rate != 0:
            idxes = np.where(np.logical_and(y >= low_range, y < high_range))[0]
            selected_idxes = np.random.randint(
                0, len(idxes), size=int((1 - m_rate) * len(idxes))
            )
            new_idxes = idxes[selected_idxes]
            new_idxes = np.sort(new_idxes)
        else:
            new_idxes = np.where(np.logical_and(y >= low_range, y < high_range))[0]
        return x[new_idxes], y[new_idxes]

    def getlen(self, index):
        """Number of training samples whose mapped target equals ``index``.

        Fixed: the original ``np.sum(np.where(y == index))`` summed the
        matching sample *positions* instead of counting them.
        """
        y = self._train_targets
        return np.sum(y == index)
class DummyDataset(Dataset):
    """Thin torch ``Dataset`` over parallel collections of images and labels.

    ``images`` holds either raw arrays (decoded via ``Image.fromarray``) or
    file paths (decoded via ``pil_loader`` when ``use_path`` is True); each
    item comes back as ``(index, transformed_image, label)``.
    """

    def __init__(self, images, labels, trsf, use_path=False):
        assert len(images) == len(labels), "Data size error!"
        self.images = images
        self.labels = labels
        self.trsf = trsf
        self.use_path = use_path

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        raw = self.images[idx]
        pil_image = pil_loader(raw) if self.use_path else Image.fromarray(raw)
        return idx, self.trsf(pil_image), self.labels[idx]
def _map_new_class_index(y, order): | |
return np.array(list(map(lambda x: order.index(x), y))) | |
def _get_idata(dataset_name, data = None): | |
name = dataset_name.lower() | |
if name == "cifar10": | |
return iCIFAR10() | |
elif name == "cifar100": | |
return iCIFAR100() | |
elif name == "imagenet1000": | |
return iImageNet1000() | |
elif name == "imagenet100": | |
return iImageNet100() | |
elif name == 'stanfordcar': | |
return StanfordCar() | |
elif name == 'general_dataset': | |
print(data) | |
return GeneralDataset(data["path"], init_class_list = data["class_list"]); | |
else: | |
raise NotImplementedError("Unknown dataset {}.".format(dataset_name)) | |
def pil_loader(path):
    """Load the image at ``path`` and force 3-channel RGB.

    Opens via a file object rather than a bare path so the handle is
    closed promptly (avoids a ResourceWarning, see
    https://github.com/python-pillow/Pillow/issues/835).
    """
    with open(path, "rb") as handle:
        return Image.open(handle).convert("RGB")
def accimage_loader(path):
    """Load ``path`` with accimage, falling back to PIL on decode failure."""
    import accimage

    try:
        image = accimage.Image(path)
    except IOError:
        # Potentially a decoding problem — let PIL handle it instead.
        image = pil_loader(path)
    return image
def default_loader(path):
    """Dispatch to the loader matching torchvision's configured image backend."""
    from torchvision import get_image_backend

    use_accimage = get_image_backend() == "accimage"
    loader = accimage_loader if use_accimage else pil_loader
    return loader(path)