import copy
from typing import Optional, Tuple

import numpy as np
import torch
from torch_complex.tensor import ComplexTensor

from .log_mel import LogMel
from .stft import Stft


class DefaultFrontend(torch.nn.Module):
    """Conventional frontend structure for ASR.

    Stft -> WPE -> MVDR-Beamformer -> Power-spec -> Mel-Fbank -> CMVN

    In this class the waveform is passed through ``Stft``, optionally through
    an enhancement ``Frontend``, reduced to a single channel, converted to a
    power spectrum and finally to log-mel features by ``LogMel``.

    Args:
        fs: Sampling rate forwarded to ``LogMel``.
        n_fft, win_length, hop_length, center, pad_mode, normalized, onesided,
        kaldi_padding_mode: Forwarded unchanged to ``Stft``.
        n_mels, fmin, fmax, htk, norm: Forwarded unchanged to ``LogMel``.
            ``n_mels`` is also the value reported by :meth:`output_size`.
        frontend_conf: Keyword arguments for the enhancement ``Frontend``.
            ``None`` disables the enhancement stage.
        downsample_rate: When greater than 1, the feature length (dim 1) is
            zero-padded up to the next multiple of this value.
    """

    def __init__(
        self,
        fs: int = 16000,
        n_fft: int = 1024,
        win_length: int = 800,
        hop_length: int = 160,
        center: bool = True,
        pad_mode: str = "reflect",
        normalized: bool = False,
        onesided: bool = True,
        n_mels: int = 80,
        fmin: Optional[int] = None,
        fmax: Optional[int] = None,
        htk: bool = False,
        norm=1,
        frontend_conf: Optional[dict] = None,
        kaldi_padding_mode: bool = False,
        downsample_rate: int = 1,
    ):
        super().__init__()
        self.downsample_rate = downsample_rate

        # Deepcopy so that the caller's dict is never shared with this module
        # (in general, a dict shouldn't be used as a default argument either).
        frontend_conf = copy.deepcopy(frontend_conf)

        self.stft = Stft(
            n_fft=n_fft,
            win_length=win_length,
            hop_length=hop_length,
            center=center,
            pad_mode=pad_mode,
            normalized=normalized,
            onesided=onesided,
            kaldi_padding_mode=kaldi_padding_mode,
        )

        if frontend_conf is not None:
            # NOTE(review): `Frontend` is not imported in the visible part of
            # this file; unless it is provided elsewhere, a non-None
            # `frontend_conf` raises NameError here. Confirm the intended
            # import before relying on this branch.
            self.frontend = Frontend(idim=n_fft // 2 + 1, **frontend_conf)
        else:
            self.frontend = None

        self.logmel = LogMel(
            fs=fs,
            n_fft=n_fft,
            n_mels=n_mels,
            fmin=fmin,
            fmax=fmax,
            htk=htk,
            norm=norm,
        )
        self.n_mels = n_mels

    def output_size(self) -> int:
        """Return the feature dimension, i.e. the number of mel bins."""
        return self.n_mels

    def forward(
        self, input: torch.Tensor, input_lengths: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Convert waveforms to (optionally length-padded) log-mel features.

        Args:
            input: Waveform batch handed to ``self.stft``.
            input_lengths: Per-utterance lengths handed to ``self.stft``.

        Returns:
            A tuple ``(input_feats, feats_lens)``: the output of
            ``self.logmel`` (presumably (Batch, Length, Dim)) and the frame
            lengths produced by ``self.stft``. When ``downsample_rate > 1``
            and the feature length is not a multiple of it, the features are
            zero-padded along dim 1 and the largest entry of ``feats_lens`` is
            overwritten in place with the padded length.
        """
        # 1. Domain-conversion: e.g. Stft: time -> time-freq
        input_stft, feats_lens = self.stft(input, input_lengths)

        assert input_stft.dim() >= 4, input_stft.shape
        # "2" refers to the real/imag parts of Complex
        assert input_stft.shape[-1] == 2, input_stft.shape

        # Change torch.Tensor to ComplexTensor
        # input_stft: (..., F, 2) -> (..., F)
        input_stft = ComplexTensor(input_stft[..., 0], input_stft[..., 1])

        # 2. [Option] Speech enhancement
        if self.frontend is not None:
            assert isinstance(input_stft, ComplexTensor), type(input_stft)
            # input_stft: (Batch, Length, [Channel], Freq)
            # Only the enhanced spectrum is used; the other outputs are dropped.
            input_stft, _, _ = self.frontend(input_stft, feats_lens)

        # 3. [Multi channel case]: Select a channel
        if input_stft.dim() == 4:
            # h: (B, T, C, F) -> h: (B, T, F)
            if self.training:
                # Select 1ch randomly
                ch = np.random.randint(input_stft.size(2))
                input_stft = input_stft[:, :, ch, :]
            else:
                # Use the first channel
                input_stft = input_stft[:, :, 0, :]

        # 4. STFT -> Power spectrum
        # h: ComplexTensor(B, T, F) -> torch.Tensor(B, T, F)
        input_power = input_stft.real ** 2 + input_stft.imag ** 2

        # 5. Feature transform e.g. Stft -> Log-Mel-Fbank
        # input_power: (Batch, [Channel,] Length, Freq)
        #       -> input_feats: (Batch, Length, Dim)
        input_feats, _ = self.logmel(input_power, feats_lens)

        # 6. Pad dim 1 up to a multiple of downsample_rate
        max_len = input_feats.size(1)
        remainder = max_len % self.downsample_rate
        if self.downsample_rate > 1 and remainder != 0:
            padding = self.downsample_rate - remainder
            # (0, 0) leaves the last dim untouched; (0, padding) appends zeros
            # at the end of the second-to-last dim.
            input_feats = torch.nn.functional.pad(
                input_feats, (0, 0, 0, padding), "constant", 0
            )
            # Only the (first) longest entry is bumped so that the maximum of
            # feats_lens matches the padded tensor length; the update is done
            # in place on the tensor returned by self.stft.
            feats_lens[torch.argmax(feats_lens)] = max_len + padding

        return input_feats, feats_lens