# Copied and modified from https://github.com/csteinmetz1/auraloss/blob/main/auraloss/freq.py under Apache License 2.0 # You can find the license at LICENSES/LICENSE_AURALOSS.txt import torch import numpy as np from typing import List, Any import scipy.signal def apply_reduction(losses, reduction="none"): """Apply reduction to collection of losses.""" if reduction == "mean": losses = losses.mean() elif reduction == "sum": losses = losses.sum() return losses def get_window(win_type: str, win_length: int): """Return a window function. Args: win_type (str): Window type. Can either be one of the window function provided in PyTorch ['hann_window', 'bartlett_window', 'blackman_window', 'hamming_window', 'kaiser_window'] or any of the windows provided by [SciPy](https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.windows.get_window.html). win_length (int): Window length Returns: win: The window as a 1D torch tensor """ try: win = getattr(torch, win_type)(win_length) except: win = torch.from_numpy(scipy.signal.windows.get_window(win_type, win_length)) return win class SumAndDifference(torch.nn.Module): """Sum and difference signal extraction module.""" def __init__(self): """Initialize sum and difference extraction module.""" super(SumAndDifference, self).__init__() def forward(self, x): """Calculate forward propagation. Args: x (Tensor): Predicted signal (B, #channels, #samples). Returns: Tensor: Sum signal. Tensor: Difference signal. """ if not (x.size(1) == 2): # inputs must be stereo raise ValueError(f"Input must be stereo: {x.size(1)} channel(s).") sum_sig = self.sum(x).unsqueeze(1) diff_sig = self.diff(x).unsqueeze(1) return sum_sig, diff_sig @staticmethod def sum(x): return x[:, 0, :] + x[:, 1, :] @staticmethod def diff(x): return x[:, 0, :] - x[:, 1, :] class FIRFilter(torch.nn.Module): """FIR pre-emphasis filtering module. Args: filter_type (str): Shape of the desired FIR filter ("hp", "fd", "aw"). Default: "hp" coef (float): Coefficient value for the filter tap (only applicable for "hp" and "fd"). Default: 0.85 ntaps (int): Number of FIR filter taps for constructing A-weighting filters. Default: 101 plot (bool): Plot the magnitude respond of the filter. Default: False Based upon the perceptual loss pre-empahsis filters proposed by [Wright & Välimäki, 2019](https://arxiv.org/abs/1911.08922). A-weighting filter - "aw" First-order highpass - "hp" Folded differentiator - "fd" Note that the default coefficeint value of 0.85 is optimized for a sampling rate of 44.1 kHz, considering adjusting this value at differnt sampling rates. """ def __init__(self, filter_type="hp", coef=0.85, fs=44100, ntaps=101, plot=False): """Initilize FIR pre-emphasis filtering module.""" super(FIRFilter, self).__init__() self.filter_type = filter_type self.coef = coef self.fs = fs self.ntaps = ntaps self.plot = plot import scipy.signal if ntaps % 2 == 0: raise ValueError(f"ntaps must be odd (ntaps={ntaps}).") if filter_type == "hp": self.fir = torch.nn.Conv1d(1, 1, kernel_size=3, bias=False, padding=1) self.fir.weight.requires_grad = False self.fir.weight.data = torch.tensor([1, -coef, 0]).view(1, 1, -1) elif filter_type == "fd": self.fir = torch.nn.Conv1d(1, 1, kernel_size=3, bias=False, padding=1) self.fir.weight.requires_grad = False self.fir.weight.data = torch.tensor([1, 0, -coef]).view(1, 1, -1) elif filter_type == "aw": # Definition of analog A-weighting filter according to IEC/CD 1672. f1 = 20.598997 f2 = 107.65265 f3 = 737.86223 f4 = 12194.217 A1000 = 1.9997 NUMs = [(2 * np.pi * f4) ** 2 * (10 ** (A1000 / 20)), 0, 0, 0, 0] DENs = np.polymul( [1, 4 * np.pi * f4, (2 * np.pi * f4) ** 2], [1, 4 * np.pi * f1, (2 * np.pi * f1) ** 2], ) DENs = np.polymul( np.polymul(DENs, [1, 2 * np.pi * f3]), [1, 2 * np.pi * f2] ) # convert analog filter to digital filter b, a = scipy.signal.bilinear(NUMs, DENs, fs=fs) # compute the digital filter frequency response w_iir, h_iir = scipy.signal.freqz(b, a, worN=512, fs=fs) # then we fit to 101 tap FIR filter with least squares taps = scipy.signal.firls(ntaps, w_iir, abs(h_iir), fs=fs) # now implement this digital FIR filter as a Conv1d layer self.fir = torch.nn.Conv1d( 1, 1, kernel_size=ntaps, bias=False, padding=ntaps // 2 ) self.fir.weight.requires_grad = False self.fir.weight.data = torch.tensor(taps.astype("float32")).view(1, 1, -1) if plot: from .plotting import compare_filters compare_filters(b, a, taps, fs=fs) def forward(self, input, target): """Calculate forward propagation. Args: input (Tensor): Predicted signal (B, #channels, #samples). target (Tensor): Groundtruth signal (B, #channels, #samples). Returns: Tensor: Filtered signal. """ input = torch.nn.functional.conv1d( input, self.fir.weight.data, padding=self.ntaps // 2 ) target = torch.nn.functional.conv1d( target, self.fir.weight.data, padding=self.ntaps // 2 ) return input, target class SpectralConvergenceLoss(torch.nn.Module): """Spectral convergence loss module. See [Arik et al., 2018](https://arxiv.org/abs/1808.06719). """ def __init__(self): super(SpectralConvergenceLoss, self).__init__() def forward(self, x_mag, y_mag): return (torch.norm(y_mag - x_mag, p="fro", dim=[-1, -2]) / torch.norm(y_mag, p="fro", dim=[-1, -2])).mean() class STFTMagnitudeLoss(torch.nn.Module): """STFT magnitude loss module. See [Arik et al., 2018](https://arxiv.org/abs/1808.06719) and [Engel et al., 2020](https://arxiv.org/abs/2001.04643v1) Log-magnitudes are calculated with `log(log_fac*x + log_eps)`, where `log_fac` controls the compression strength (larger value results in more compression), and `log_eps` can be used to control the range of the compressed output values (e.g., `log_eps>=1` ensures positive output values). The default values `log_fac=1` and `log_eps=0` correspond to plain log-compression. Args: log (bool, optional): Log-scale the STFT magnitudes, or use linear scale. Default: True log_eps (float, optional): Constant value added to the magnitudes before evaluating the logarithm. Default: 0.0 log_fac (float, optional): Constant multiplication factor for the magnitudes before evaluating the logarithm. Default: 1.0 distance (str, optional): Distance function ["L1", "L2"]. Default: "L1" reduction (str, optional): Reduction of the loss elements. Default: "mean" """ def __init__(self, log=True, log_eps=0.0, log_fac=1.0, distance="L1", reduction="mean"): super(STFTMagnitudeLoss, self).__init__() self.log = log self.log_eps = log_eps self.log_fac = log_fac if distance == "L1": self.distance = torch.nn.L1Loss(reduction=reduction) elif distance == "L2": self.distance = torch.nn.MSELoss(reduction=reduction) else: raise ValueError(f"Invalid distance: '{distance}'.") def forward(self, x_mag, y_mag): if self.log: x_mag = torch.log(self.log_fac * x_mag + self.log_eps) y_mag = torch.log(self.log_fac * y_mag + self.log_eps) return self.distance(x_mag, y_mag) class STFTLoss(torch.nn.Module): """STFT loss module. See [Yamamoto et al. 2019](https://arxiv.org/abs/1904.04472). Args: fft_size (int, optional): FFT size in samples. Default: 1024 hop_size (int, optional): Hop size of the FFT in samples. Default: 256 win_length (int, optional): Length of the FFT analysis window. Default: 1024 window (str, optional): Window to apply before FFT, can either be one of the window function provided in PyTorch ['hann_window', 'bartlett_window', 'blackman_window', 'hamming_window', 'kaiser_window'] or any of the windows provided by [SciPy](https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.windows.get_window.html). Default: 'hann_window' w_sc (float, optional): Weight of the spectral convergence loss term. Default: 1.0 w_log_mag (float, optional): Weight of the log magnitude loss term. Default: 1.0 w_lin_mag_mag (float, optional): Weight of the linear magnitude loss term. Default: 0.0 w_phs (float, optional): Weight of the spectral phase loss term. Default: 0.0 sample_rate (int, optional): Sample rate. Required when scale = 'mel'. Default: None scale (str, optional): Optional frequency scaling method, options include: ['mel', 'chroma'] Default: None n_bins (int, optional): Number of scaling frequency bins. Default: None. perceptual_weighting (bool, optional): Apply perceptual A-weighting (Sample rate must be supplied). Default: False scale_invariance (bool, optional): Perform an optimal scaling of the target. Default: False eps (float, optional): Small epsilon value for stablity. Default: 1e-8 output (str, optional): Format of the loss returned. 'loss' : Return only the raw, aggregate loss term. 'full' : Return the raw loss, plus intermediate loss terms. Default: 'loss' reduction (str, optional): Specifies the reduction to apply to the output: 'none': no reduction will be applied, 'mean': the sum of the output will be divided by the number of elements in the output, 'sum': the output will be summed. Default: 'mean' mag_distance (str, optional): Distance function ["L1", "L2"] for the magnitude loss terms. device (str, optional): Place the filterbanks on specified device. Default: None Returns: loss: Aggreate loss term. Only returned if output='loss'. By default. loss, sc_mag_loss, log_mag_loss, lin_mag_loss, phs_loss: Aggregate and intermediate loss terms. Only returned if output='full'. """ def __init__( self, fft_size: int = 1024, hop_size: int = 256, win_length: int = 1024, window: str = "hann_window", w_sc: float = 1.0, w_log_mag: float = 1.0, w_lin_mag: float = 0.0, w_phs: float = 0.0, sample_rate: float = None, scale: str = None, n_bins: int = None, perceptual_weighting: bool = False, scale_invariance: bool = False, eps: float = 1e-8, output: str = "loss", reduction: str = "mean", mag_distance: str = "L1", device: Any = None, **kwargs ): super().__init__() self.fft_size = fft_size self.hop_size = hop_size self.win_length = win_length self.window = get_window(window, win_length) self.w_sc = w_sc self.w_log_mag = w_log_mag self.w_lin_mag = w_lin_mag self.w_phs = w_phs self.sample_rate = sample_rate self.scale = scale self.n_bins = n_bins self.perceptual_weighting = perceptual_weighting self.scale_invariance = scale_invariance self.eps = eps self.output = output self.reduction = reduction self.mag_distance = mag_distance self.device = device self.phs_used = bool(self.w_phs) self.spectralconv = SpectralConvergenceLoss() self.logstft = STFTMagnitudeLoss( log=True, reduction=reduction, distance=mag_distance, **kwargs ) self.linstft = STFTMagnitudeLoss( log=False, reduction=reduction, distance=mag_distance, **kwargs ) # setup mel filterbank if scale is not None: try: import librosa.filters except Exception as e: print(e) print("Try `pip install auraloss[all]`.") if self.scale == "mel": assert sample_rate != None # Must set sample rate to use mel scale assert n_bins <= fft_size # Must be more FFT bins than Mel bins fb = librosa.filters.mel(sr=sample_rate, n_fft=fft_size, n_mels=n_bins) fb = torch.tensor(fb).unsqueeze(0) elif self.scale == "chroma": assert sample_rate != None # Must set sample rate to use chroma scale assert n_bins <= fft_size # Must be more FFT bins than chroma bins fb = librosa.filters.chroma( sr=sample_rate, n_fft=fft_size, n_chroma=n_bins ) else: raise ValueError( f"Invalid scale: {self.scale}. Must be 'mel' or 'chroma'." ) self.register_buffer("fb", fb) if scale is not None and device is not None: self.fb = self.fb.to(self.device) # move filterbank to device if self.perceptual_weighting: if sample_rate is None: raise ValueError( f"`sample_rate` must be supplied when `perceptual_weighting = True`." ) self.prefilter = FIRFilter(filter_type="aw", fs=sample_rate) def stft(self, x): """Perform STFT. Args: x (Tensor): Input signal tensor (B, T). Returns: Tensor: x_mag, x_phs Magnitude and phase spectra (B, fft_size // 2 + 1, frames). """ x_stft = torch.stft( x, self.fft_size, self.hop_size, self.win_length, self.window, return_complex=True, ) x_mag = torch.sqrt( torch.clamp((x_stft.real**2) + (x_stft.imag**2), min=self.eps) ) # torch.angle is expensive, so it is only evaluated if the values are used in the loss if self.phs_used: x_phs = torch.angle(x_stft) else: x_phs = None return x_mag, x_phs def forward(self, input: torch.Tensor, target: torch.Tensor): bs, chs, seq_len = input.size() if self.perceptual_weighting: # apply optional A-weighting via FIR filter # since FIRFilter only support mono audio we will move channels to batch dim input = input.view(bs * chs, 1, -1) target = target.view(bs * chs, 1, -1) # now apply the filter to both self.prefilter.to(input.device) input, target = self.prefilter(input, target) # now move the channels back input = input.view(bs, chs, -1) target = target.view(bs, chs, -1) # compute the magnitude and phase spectra of input and target self.window = self.window.to(input.device) x_mag, x_phs = self.stft(input.view(-1, input.size(-1))) y_mag, y_phs = self.stft(target.view(-1, target.size(-1))) # apply relevant transforms if self.scale is not None: self.fb = self.fb.to(input.device) x_mag = torch.matmul(self.fb, x_mag) y_mag = torch.matmul(self.fb, y_mag) # normalize scales if self.scale_invariance: alpha = (x_mag * y_mag).sum([-2, -1]) / ((y_mag**2).sum([-2, -1])) y_mag = y_mag * alpha.unsqueeze(-1) # compute loss terms sc_mag_loss = self.spectralconv(x_mag, y_mag) if self.w_sc else 0.0 log_mag_loss = self.logstft(x_mag, y_mag) if self.w_log_mag else 0.0 lin_mag_loss = self.linstft(x_mag, y_mag) if self.w_lin_mag else 0.0 phs_loss = torch.nn.functional.mse_loss(x_phs, y_phs) if self.phs_used else 0.0 # combine loss terms loss = ( (self.w_sc * sc_mag_loss) + (self.w_log_mag * log_mag_loss) + (self.w_lin_mag * lin_mag_loss) + (self.w_phs * phs_loss) ) loss = apply_reduction(loss, reduction=self.reduction) if self.output == "loss": return loss elif self.output == "full": return loss, sc_mag_loss, log_mag_loss, lin_mag_loss, phs_loss class MultiResolutionSTFTLoss(torch.nn.Module): """Multi resolution STFT loss module. See [Yamamoto et al., 2019](https://arxiv.org/abs/1910.11480) Args: fft_sizes (list): List of FFT sizes. hop_sizes (list): List of hop sizes. win_lengths (list): List of window lengths. window (str, optional): Window to apply before FFT, options include: 'hann_window', 'bartlett_window', 'blackman_window', 'hamming_window', 'kaiser_window'] Default: 'hann_window' w_sc (float, optional): Weight of the spectral convergence loss term. Default: 1.0 w_log_mag (float, optional): Weight of the log magnitude loss term. Default: 1.0 w_lin_mag (float, optional): Weight of the linear magnitude loss term. Default: 0.0 w_phs (float, optional): Weight of the spectral phase loss term. Default: 0.0 sample_rate (int, optional): Sample rate. Required when scale = 'mel'. Default: None scale (str, optional): Optional frequency scaling method, options include: ['mel', 'chroma'] Default: None n_bins (int, optional): Number of mel frequency bins. Required when scale = 'mel'. Default: None. scale_invariance (bool, optional): Perform an optimal scaling of the target. Default: False """ def __init__( self, fft_sizes: List[int] = [1024, 2048, 512], hop_sizes: List[int] = [120, 240, 50], win_lengths: List[int] = [600, 1200, 240], window: str = "hann_window", w_sc: float = 1.0, w_log_mag: float = 1.0, w_lin_mag: float = 0.0, w_phs: float = 0.0, sample_rate: float = None, scale: str = None, n_bins: int = None, perceptual_weighting: bool = False, scale_invariance: bool = False, **kwargs, ): super().__init__() assert len(fft_sizes) == len(hop_sizes) == len(win_lengths) # must define all self.fft_sizes = fft_sizes self.hop_sizes = hop_sizes self.win_lengths = win_lengths self.stft_losses = torch.nn.ModuleList() for fs, ss, wl in zip(fft_sizes, hop_sizes, win_lengths): self.stft_losses += [ STFTLoss( fs, ss, wl, window, w_sc, w_log_mag, w_lin_mag, w_phs, sample_rate, scale, n_bins, perceptual_weighting, scale_invariance, **kwargs, ) ] def forward(self, x, y): mrstft_loss = 0.0 sc_mag_loss, log_mag_loss, lin_mag_loss, phs_loss = [], [], [], [] for f in self.stft_losses: if f.output == "full": # extract just first term tmp_loss = f(x, y) mrstft_loss += tmp_loss[0] sc_mag_loss.append(tmp_loss[1]) log_mag_loss.append(tmp_loss[2]) lin_mag_loss.append(tmp_loss[3]) phs_loss.append(tmp_loss[4]) else: mrstft_loss += f(x, y) mrstft_loss /= len(self.stft_losses) if f.output == "loss": return mrstft_loss else: return mrstft_loss, sc_mag_loss, log_mag_loss, lin_mag_loss, phs_loss class SumAndDifferenceSTFTLoss(torch.nn.Module): """Sum and difference sttereo STFT loss module. See [Steinmetz et al., 2020](https://arxiv.org/abs/2010.10291) Args: fft_sizes (List[int]): List of FFT sizes. hop_sizes (List[int]): List of hop sizes. win_lengths (List[int]): List of window lengths. window (str, optional): Window function type. w_sum (float, optional): Weight of the sum loss component. Default: 1.0 w_diff (float, optional): Weight of the difference loss component. Default: 1.0 perceptual_weighting (bool, optional): Apply perceptual A-weighting (Sample rate must be supplied). Default: False mel_stft (bool, optional): Use Multi-resoltuion mel spectrograms. Default: False n_mel_bins (int, optional): Number of mel bins to use when mel_stft = True. Default: 128 sample_rate (float, optional): Audio sample rate. Default: None output (str, optional): Format of the loss returned. 'loss' : Return only the raw, aggregate loss term. 'full' : Return the raw loss, plus intermediate loss terms. Default: 'loss' """ def __init__( self, fft_sizes: List[int], hop_sizes: List[int], win_lengths: List[int], window: str = "hann_window", w_sum: float = 1.0, w_diff: float = 1.0, output: str = "loss", **kwargs, ): super().__init__() self.sd = SumAndDifference() self.w_sum = w_sum self.w_diff = w_diff self.output = output self.mrstft = MultiResolutionSTFTLoss( fft_sizes, hop_sizes, win_lengths, window, **kwargs, ) def forward(self, input: torch.Tensor, target: torch.Tensor): """This loss function assumes batched input of stereo audio in the time domain. Args: input (torch.Tensor): Input tensor with shape (batch size, 2, seq_len). target (torch.Tensor): Target tensor with shape (batch size, 2, seq_len). Returns: loss (torch.Tensor): Aggreate loss term. Only returned if output='loss'. loss (torch.Tensor), sum_loss (torch.Tensor), diff_loss (torch.Tensor): Aggregate and intermediate loss terms. Only returned if output='full'. """ assert input.shape == target.shape # must have same shape bs, chs, seq_len = input.size() # compute sum and difference signals for both input_sum, input_diff = self.sd(input) target_sum, target_diff = self.sd(target) # compute error in STFT domain sum_loss = self.mrstft(input_sum, target_sum) diff_loss = self.mrstft(input_diff, target_diff) loss = ((self.w_sum * sum_loss) + (self.w_diff * diff_loss)) / 2 if self.output == "loss": return loss elif self.output == "full": return loss, sum_loss, diff_loss