|
import math
|
|
import torch
|
|
|
|
from torch import nn as nn
|
|
from torch.nn import functional as F
|
|
from torch.nn import init as init
|
|
from torch.nn.modules.batchnorm import _BatchNorm
|
|
|
|
|
|
@torch.no_grad()
def default_init_weights(module_list, scale=1, bias_fill=0, **kwargs):
    """Initialize network weights.

    Conv2d and Linear layers receive Kaiming-normal weights that are then
    multiplied by ``scale``. Batch-norm layers get their weight set to 1.
    Every bias that exists is filled with ``bias_fill``. Other module types
    are left untouched.

    Args:
        module_list (list[nn.Module] | tuple[nn.Module] | nn.Module):
            Modules to be initialized. All of their sub-modules are visited.
        scale (float): Scale initialized weights, especially for residual
            blocks. Default: 1.
        bias_fill (float): The value to fill bias. Default: 0
        kwargs (dict): Other arguments forwarded to
            ``init.kaiming_normal_``.
    """
    if not isinstance(module_list, (list, tuple)):
        module_list = [module_list]
    for module in module_list:
        for m in module.modules():
            if isinstance(m, (nn.Conv2d, nn.Linear)):
                init.kaiming_normal_(m.weight, **kwargs)
                m.weight.data *= scale
                if m.bias is not None:
                    m.bias.data.fill_(bias_fill)
            elif isinstance(m, _BatchNorm):
                # Batch norms built with affine=False have weight/bias set
                # to None, so both must be checked before being touched.
                if m.weight is not None:
                    init.constant_(m.weight, 1)
                if m.bias is not None:
                    m.bias.data.fill_(bias_fill)
|
|
|
|
|
|
def make_layer(basic_block, num_basic_block, **kwarg):
    """Stack several freshly constructed copies of one block type.

    Args:
        basic_block (nn.module): nn.module class (or any callable returning
            a module) used to build each block.
        num_basic_block (int): How many blocks to create.
        **kwarg: Keyword arguments passed to ``basic_block`` on every call.

    Returns:
        nn.Sequential: The blocks chained in creation order.
    """
    stacked = (basic_block(**kwarg) for _ in range(num_basic_block))
    return nn.Sequential(*stacked)
|
|
|
|
|
|
class ResidualBlockNoBN(nn.Module):
    """Two-convolution residual block with no batch normalization.

    Data flow:
        ---Conv-ReLU-Conv-+-
         |________________|

    Args:
        num_feat (int): Channel number of intermediate features.
            Default: 64.
        res_scale (float): Factor applied to the residual branch before it
            is added to the input. Default: 1.
        pytorch_init (bool): If True, keep PyTorch's default initialization;
            otherwise both convolutions are re-initialized through
            default_init_weights with scale 0.1. Default: False.
    """

    def __init__(self, num_feat=64, res_scale=1, pytorch_init=False):
        super().__init__()
        self.res_scale = res_scale
        self.conv1 = nn.Conv2d(
            num_feat, num_feat, kernel_size=3, stride=1, padding=1, bias=True)
        self.conv2 = nn.Conv2d(
            num_feat, num_feat, kernel_size=3, stride=1, padding=1, bias=True)
        self.relu = nn.ReLU(inplace=True)

        if pytorch_init:
            return
        default_init_weights([self.conv1, self.conv2], 0.1)

    def forward(self, x):
        # The in-place ReLU only overwrites conv1's output, never ``x``.
        residual = self.conv1(x)
        residual = self.relu(residual)
        residual = self.conv2(residual)
        return x + residual * self.res_scale
|
|
|
|
|
|
class Upsample(nn.Sequential):
    """Upsample module.

    Built from Conv2d + PixelShuffle stages: one x2 stage per factor of two
    for power-of-two scales, or a single x3 stage for scale 3. A scale of 1
    yields an empty (identity) sequence.

    Args:
        scale (int): Scale factor. Supported scales: 2^n and 3.
        num_feat (int): Channel number of intermediate features.

    Raises:
        ValueError: If ``scale`` is neither a positive power of two nor 3.
    """

    def __init__(self, scale, num_feat):
        m = []
        # ``scale > 0`` is required because 0 also satisfies the bit test
        # and would otherwise fail inside math.log with an unrelated message.
        if scale > 0 and (scale & (scale - 1)) == 0:
            # round() guards against float error in the logarithm, which
            # plain int() truncation would turn into an off-by-one.
            num_stages = round(math.log(scale, 2))
            for _ in range(num_stages):
                m.append(nn.Conv2d(num_feat, 4 * num_feat, 3, 1, 1))
                m.append(nn.PixelShuffle(2))
        elif scale == 3:
            m.append(nn.Conv2d(num_feat, 9 * num_feat, 3, 1, 1))
            m.append(nn.PixelShuffle(3))
        else:
            raise ValueError(f'scale {scale} is not supported. '
                             'Supported scales: 2^n and 3.')
        super().__init__(*m)
|
|
|
|
|
|
def flow_warp(x,
              flow,
              interp_mode='bilinear',
              padding_mode='zeros',
              align_corners=True):
    """Warp an image or feature map with optical flow.

    For every output pixel, the flow is added to that pixel's (x, y)
    coordinate and ``x`` is sampled at the resulting location with
    ``F.grid_sample``.

    Args:
        x (Tensor): Tensor with size (n, c, h, w).
        flow (Tensor): Tensor with size (n, h, w, 2) holding per-pixel
            offsets in pixel units; the last dimension is ordered (x, y).
        interp_mode (str): 'nearest' or 'bilinear'. Default: 'bilinear'.
        padding_mode (str): 'zeros' or 'border' or 'reflection'.
            Default: 'zeros'.
        align_corners (bool): Before pytorch 1.3, the default value is
            align_corners=True. After pytorch 1.3, the default value is
            align_corners=False. Here, we use the True as default.

    Returns:
        Tensor: Warped image or feature map.
    """
    assert x.size()[-2:] == flow.size()[1:3], (
        'spatial size of x and flow must match')
    _, _, h, w = x.size()

    # Base pixel-coordinate grid, created directly on x's device. Plain
    # broadcasting is used instead of torch.meshgrid so the result does not
    # depend on meshgrid's ``indexing`` default (which warns when omitted).
    coords_x = torch.arange(w, device=x.device, dtype=torch.float32)
    coords_y = torch.arange(h, device=x.device, dtype=torch.float32)
    grid = torch.stack(
        (coords_x.view(1, w).expand(h, w), coords_y.view(h, 1).expand(h, w)),
        dim=2)  # (h, w, 2), last dim is (x, y)

    vgrid = grid + flow

    # Map pixel coordinates to grid_sample's [-1, 1] range; max(..., 1)
    # avoids dividing by zero for a dimension of size 1.
    vgrid_x = 2.0 * vgrid[:, :, :, 0] / max(w - 1, 1) - 1.0
    vgrid_y = 2.0 * vgrid[:, :, :, 1] / max(h - 1, 1) - 1.0
    # grid_sample requires the grid dtype to equal the input dtype.
    vgrid_scaled = torch.stack((vgrid_x, vgrid_y), dim=3).to(x.dtype)

    return F.grid_sample(
        x,
        vgrid_scaled,
        mode=interp_mode,
        padding_mode=padding_mode,
        align_corners=align_corners)
|
|
|
|
|
|
def resize_flow(flow,
                size_type,
                sizes,
                interp_mode='bilinear',
                align_corners=False):
    """Resize a flow according to ratio or shape.

    The flow values are rescaled together with the spatial size: channel 0
    is multiplied by the width ratio and channel 1 by the height ratio. The
    input tensor is not modified.

    Args:
        flow (Tensor): Precomputed flow. shape [N, 2, H, W].
        size_type (str): 'ratio' or 'shape'.
        sizes (list[int | float]): the ratio for resizing or the final output
            shape.
            1) The order of ratio should be [ratio_h, ratio_w]. For
            downsampling, the ratio should be smaller than 1.0 (i.e., ratio
            < 1.0). For upsampling, the ratio should be larger than 1.0 (i.e.,
            ratio > 1.0).
            2) The order of output_size should be [out_h, out_w].
        interp_mode (str): The mode of interpolation for resizing.
            Default: 'bilinear'.
        align_corners (bool): Whether align corners. Ignored for modes that
            do not interpolate linearly/cubically (e.g. 'nearest', 'area').
            Default: False.

    Returns:
        Tensor: Resized flow.

    Raises:
        ValueError: If ``size_type`` is neither 'ratio' nor 'shape'.
    """
    _, _, flow_h, flow_w = flow.size()
    if size_type == 'ratio':
        output_h, output_w = int(flow_h * sizes[0]), int(flow_w * sizes[1])
    elif size_type == 'shape':
        output_h, output_w = sizes[0], sizes[1]
    else:
        raise ValueError(
            f'Size type should be ratio or shape, but got type {size_type}.')

    # F.interpolate rejects any non-None align_corners for modes such as
    # 'nearest' or 'area', so only forward it to the modes that accept it.
    if interp_mode not in ('linear', 'bilinear', 'bicubic', 'trilinear'):
        align_corners = None

    # Work on a copy so the caller's flow keeps its original values.
    input_flow = flow.clone()
    ratio_h = output_h / flow_h
    ratio_w = output_w / flow_w
    input_flow[:, 0, :, :] *= ratio_w
    input_flow[:, 1, :, :] *= ratio_h
    return F.interpolate(
        input=input_flow,
        size=(output_h, output_w),
        mode=interp_mode,
        align_corners=align_corners)
|
|
|
|
|
|
|
|
def pixel_unshuffle(x, scale):
    """Fold spatial blocks of size ``scale`` x ``scale`` into channels.

    Args:
        x (Tensor): Input feature with shape (b, c, hh, hw). Both hh and hw
            must be divisible by ``scale``.
        scale (int): Downsample ratio.

    Returns:
        Tensor: Feature with shape (b, c * scale**2, hh // scale,
            hw // scale).
    """
    b, c, hh, hw = x.size()
    assert hh % scale == 0 and hw % scale == 0
    h, w = hh // scale, hw // scale
    # Split each spatial axis into (coarse position, offset inside block),
    # then move both offsets next to the channel axis before merging them.
    blocks = x.view(b, c, h, scale, w, scale)
    blocks = blocks.permute(0, 1, 3, 5, 2, 4)
    return blocks.reshape(b, c * (scale**2), h, w)
|
|
|
|
|
|
|
|
class LayerNormFunction(torch.autograd.Function):
    """Channel-wise layer normalization with a hand-written backward pass.

    Each spatial position of an (N, C, H, W) tensor is normalized over the
    channel dimension, then scaled and shifted per channel.
    """

    @staticmethod
    def forward(ctx, x, weight, bias, eps):
        """Normalize ``x`` over dim 1 and apply the per-channel affine.

        Args:
            ctx: Autograd context used to stash values for backward.
            x (Tensor): Input with size (N, C, H, W).
            weight (Tensor): Per-channel scale with C elements.
            bias (Tensor): Per-channel shift with C elements.
            eps (float): Value added to the variance before the square root.

        Returns:
            Tensor: Normalized tensor with the same size as ``x``.
        """
        ctx.eps = eps
        # Unpacking four values also enforces a 4-D input.
        _, C, _, _ = x.size()
        mu = x.mean(1, keepdim=True)
        var = (x - mu).pow(2).mean(1, keepdim=True)
        y = (x - mu) / (var + eps).sqrt()
        # The normalized (pre-affine) activations are what backward needs.
        ctx.save_for_backward(y, var, weight)
        return weight.view(1, C, 1, 1) * y + bias.view(1, C, 1, 1)

    @staticmethod
    def backward(ctx, grad_output):
        """Return gradients for (x, weight, bias, eps); eps gets None."""
        eps = ctx.eps

        _, C, _, _ = grad_output.size()
        # saved_tensors replaces the deprecated saved_variables accessor.
        y, var, weight = ctx.saved_tensors
        g = grad_output * weight.view(1, C, 1, 1)
        mean_g = g.mean(dim=1, keepdim=True)
        mean_gy = (g * y).mean(dim=1, keepdim=True)
        gx = 1. / torch.sqrt(var + eps) * (g - y * mean_gy - mean_g)
        # Affine parameters are shared over batch and spatial positions, so
        # their gradients are reduced over every dimension except channels.
        grad_weight = (grad_output * y).sum(dim=(0, 2, 3))
        grad_bias = grad_output.sum(dim=(0, 2, 3))
        return gx, grad_weight, grad_bias, None
|
|
|
|
class LayerNorm2d(nn.Module):
    """Module wrapper that applies LayerNormFunction with learnable affine.

    Args:
        channels (int): Number of elements in the learnable ``weight`` and
            ``bias`` vectors.
        eps (float): Value passed to LayerNormFunction as ``eps``.
            Default: 1e-6.
    """

    def __init__(self, channels, eps=1e-6):
        super().__init__()
        # Assigning an nn.Parameter attribute registers it on the module;
        # weight starts at ones and bias at zeros.
        self.weight = nn.Parameter(torch.ones(channels))
        self.bias = nn.Parameter(torch.zeros(channels))
        self.eps = eps

    def forward(self, x):
        gamma, beta = self.weight, self.bias
        return LayerNormFunction.apply(x, gamma, beta, self.eps)
|
|
|
|
|
|
class MySequential(nn.Sequential):
    """Sequential container whose stages may take and return several values.

    Whenever the value flowing between stages is exactly a ``tuple`` it is
    unpacked into positional arguments for the next stage; anything else is
    passed as a single argument.
    """

    def forward(self, *inputs):
        current = inputs
        for stage in self._modules.values():
            # Exact type check: tuple subclasses are passed through whole.
            if type(current) is tuple:
                current = stage(*current)
            else:
                current = stage(current)
        return current
|
|
|
|
import time
|
|
def measure_inference_speed(model, data, max_iter=200, log_interval=50):
    """Measure the average forward-pass throughput of a model.

    The model is switched to eval mode and called as ``model(*data)`` under
    ``torch.no_grad()`` for ``max_iter`` iterations. The first 5 iterations
    are warm-up and excluded from the timing. Progress is printed every
    ``log_interval`` iterations and an overall figure is printed at the end.

    Args:
        model (nn.Module): Model to benchmark. It is left in eval mode.
        data (tuple | list): Positional arguments passed to the model.
        max_iter (int): Total number of forward passes. Default: 200.
        log_interval (int): Print progress every this many iterations.
            Default: 50.

    Returns:
        float: Calls per second averaged over the timed iterations, or 0 if
            no iteration was timed (``max_iter`` not above the warm-up
            count).
    """
    model.eval()

    num_warmup = 5
    pure_inf_time = 0
    fps = 0
    # CUDA work is asynchronous, so the device must be drained around the
    # timed region; on CPU-only machines synchronize() would raise instead.
    use_cuda = torch.cuda.is_available()

    for i in range(max_iter):
        if use_cuda:
            torch.cuda.synchronize()
        start_time = time.perf_counter()

        with torch.no_grad():
            model(*data)

        if use_cuda:
            torch.cuda.synchronize()
        elapsed = time.perf_counter() - start_time

        if i >= num_warmup:
            pure_inf_time += elapsed
            # Guard against a zero accumulated time before dividing.
            if (i + 1) % log_interval == 0 and pure_inf_time > 0:
                fps = (i + 1 - num_warmup) / pure_inf_time
                print(
                    f'Done image [{i + 1:<3}/ {max_iter}], '
                    f'fps: {fps:.1f} img / s, '
                    f'times per image: {1000 / fps:.1f} ms / img',
                    flush=True)

    # Only report an overall figure when at least one iteration was timed;
    # otherwise the division below would fail.
    timed_iters = max_iter - num_warmup
    if timed_iters > 0 and pure_inf_time > 0:
        fps = timed_iters / pure_inf_time
        print(
            f'Overall fps: {fps:.1f} img / s, '
            f'times per image: {1000 / fps:.1f} ms / img',
            flush=True)
    return fps