import librosa
import numpy as np
from encoder import inference as encoder
from utils import logmmse
from synthesizer import audio
from pathlib import Path
from pypinyin import Style
from pypinyin.contrib.neutral_tone import NeutralToneWith5Mixin
from pypinyin.converter import DefaultConverter
from pypinyin.core import Pinyin


class PinyinConverter(NeutralToneWith5Mixin, DefaultConverter):
    pass


pinyin = Pinyin(PinyinConverter()).pinyin


def _process_utterance(wav: np.ndarray, text: str, out_dir: Path, basename: str,
                       skip_existing: bool, hparams):
    ## FOR REFERENCE:
    # For you not to lose your head if you ever wish to change things here or implement your own
    # synthesizer.
    # - Both the audios and the mel spectrograms are saved as numpy arrays
    # - There is no processing done to the audios that will be saved to disk beyond volume
    #   normalization (in split_on_silences)
    # - However, pre-emphasis is applied to the audios before computing the mel spectrogram. This
    #   is why we re-apply it on the audio on the side of the vocoder.
    # - Librosa pads the waveform before computing the mel spectrogram. Here, the waveform is saved
    #   without extra padding. This means that you won't have an exact relation between the length
    #   of the wav and of the mel spectrogram. See the vocoder data loader.

    # Skip existing utterances if needed
    mel_fpath = out_dir.joinpath("mels", "mel-%s.npy" % basename)
    wav_fpath = out_dir.joinpath("audio", "audio-%s.npy" % basename)
    if skip_existing and mel_fpath.exists() and wav_fpath.exists():
        return None

    # Trim silence
    if hparams.trim_silence:
        wav = encoder.preprocess_wav(wav, normalize=False, trim_silence=True)

    # Skip utterances that are too short
    if len(wav) < hparams.utterance_min_duration * hparams.sample_rate:
        return None

    # Compute the mel spectrogram
    mel_spectrogram = audio.melspectrogram(wav, hparams).astype(np.float32)
    mel_frames = mel_spectrogram.shape[1]

    # Skip utterances that are too long
    if mel_frames > hparams.max_mel_frames and hparams.clip_mels_length:
        return None

    # Write the spectrogram, embed and audio to disk
    np.save(mel_fpath, mel_spectrogram.T, allow_pickle=False)
    np.save(wav_fpath, wav, allow_pickle=False)

    # Return a tuple describing this training example
    return wav_fpath.name, mel_fpath.name, "embed-%s.npy" % basename, len(wav), mel_frames, text


def _split_on_silences_aidatatang_200zh(wav_fpath, words, hparams):
    # Load and trim the audio waveform
    wav, _ = librosa.load(str(wav_fpath), sr=hparams.sample_rate)
    wav = librosa.effects.trim(wav, top_db=40, frame_length=2048, hop_length=512)[0]
    if hparams.rescale:
        wav = wav / np.abs(wav).max() * hparams.rescaling_max

    # Optional denoising: profile the noise from the first and last 0.15 s of the utterance
    if len(wav) > hparams.sample_rate * (0.3 + 0.1):
        noise_wav = np.concatenate([wav[:int(hparams.sample_rate * 0.15)],
                                    wav[-int(hparams.sample_rate * 0.15):]])
        profile = logmmse.profile_noise(noise_wav, hparams.sample_rate)
        wav = logmmse.denoise(wav, profile, eta=0)

    # Convert the transcript to tone-numbered pinyin
    resp = pinyin(words, style=Style.TONE3)
    res = [v[0] for v in resp if v[0].strip()]
    res = " ".join(res)

    return wav, res
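# Note on the pinyin conversion above: Style.TONE3 appends the tone number to each syllable
# and NeutralToneWith5Mixin writes neutral tones as tone 5, so a transcript such as "你好吗"
# (an illustrative example, not from any dataset) should come out as
# [["ni3"], ["hao3"], ["ma5"]], which _split_on_silences_aidatatang_200zh joins into the
# string "ni3 hao3 ma5" returned alongside the waveform.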
def preprocess_speaker_general(speaker_dir, out_dir: Path, skip_existing: bool, hparams,
                               dict_info, no_alignments: bool):
    metadata = []
    wav_fpath_list = speaker_dir.glob("*.wav")
    # Iterate over each wav
    for wav_fpath in wav_fpath_list:
        # Look up the transcript by the file stem first, then fall back to the full filename
        words = dict_info.get(wav_fpath.name.split(".")[0])
        words = dict_info.get(wav_fpath.name) if not words else words
        if not words:
            print("No transcript found for %s, skipping" % wav_fpath.name)
            continue
        sub_basename = "%s_%02d" % (wav_fpath.name, 0)
        wav, text = _split_on_silences_aidatatang_200zh(wav_fpath, words, hparams)
        metadata.append(_process_utterance(wav, text, out_dir, sub_basename,
                                           skip_existing, hparams))

    return [m for m in metadata if m is not None]


def preprocess_speaker(speaker_dir, out_dir: Path, skip_existing: bool, hparams, no_alignments: bool):
    metadata = []
    for book_dir in speaker_dir.glob("*"):
        if no_alignments:
            # Gather the utterance audios and texts
            # LibriTTS uses .wav but we will include extensions for compatibility with other datasets
            extensions = ["*.wav", "*.flac", "*.mp3"]
            for extension in extensions:
                wav_fpaths = book_dir.glob(extension)
                for wav_fpath in wav_fpaths:
                    # Load the audio waveform
                    wav, _ = librosa.load(str(wav_fpath), sr=hparams.sample_rate)
                    if hparams.rescale:
                        wav = wav / np.abs(wav).max() * hparams.rescaling_max

                    # Get the corresponding text
                    # Check for .txt (for compatibility with other datasets)
                    text_fpath = wav_fpath.with_suffix(".txt")
                    if not text_fpath.exists():
                        # Check for .normalized.txt (LibriTTS)
                        text_fpath = wav_fpath.with_suffix(".normalized.txt")
                        assert text_fpath.exists()
                    with text_fpath.open("r") as text_file:
                        text = "".join([line for line in text_file])
                        text = text.replace("\"", "")
                        text = text.strip()

                    # Process the utterance
                    metadata.append(_process_utterance(wav, text, out_dir,
                                                       str(wav_fpath.with_suffix("").name),
                                                       skip_existing, hparams))
        else:
            # Process alignment file (LibriSpeech support)
            # Gather the utterance audios and texts
            try:
                alignments_fpath = next(book_dir.glob("*.alignment.txt"))
                with alignments_fpath.open("r") as alignments_file:
                    alignments = [line.rstrip().split(" ") for line in alignments_file]
            except StopIteration:
                # A few alignment files will be missing
                continue

            # Iterate over each entry in the alignments file
            for wav_fname, words, end_times in alignments:
                wav_fpath = book_dir.joinpath(wav_fname + ".flac")
                assert wav_fpath.exists()
                words = words.replace("\"", "").split(",")
                end_times = list(map(float, end_times.replace("\"", "").split(",")))

                # Process each sub-utterance
                wavs, texts = _split_on_silences(wav_fpath, words, end_times, hparams)
                for i, (wav, text) in enumerate(zip(wavs, texts)):
                    sub_basename = "%s_%02d" % (wav_fname, i)
                    metadata.append(_process_utterance(wav, text, out_dir, sub_basename,
                                                       skip_existing, hparams))

    return [m for m in metadata if m is not None]
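# The rows returned by the two preprocess_speaker* functions all have the shape produced by
# _process_utterance: (audio_filename, mel_filename, embed_filename, wav_length, mel_frames,
# text). A caller would typically flatten them into a metadata file; the helper below is an
# illustrative sketch only (the "train.txt" name and "|" delimiter are assumptions, not
# required by this module).
def _write_metadata_sketch(metadata, out_dir: Path):
    with out_dir.joinpath("train.txt").open("w", encoding="utf-8") as metadata_file:
        for row in metadata:
            metadata_file.write("|".join(map(str, row)) + "\n")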
# TODO: use the original split function
def _split_on_silences(wav_fpath, words, end_times, hparams):
    # Load the audio waveform
    wav, _ = librosa.load(str(wav_fpath), sr=hparams.sample_rate)
    if hparams.rescale:
        wav = wav / np.abs(wav).max() * hparams.rescaling_max

    words = np.array(words)
    start_times = np.array([0.0] + end_times[:-1])
    end_times = np.array(end_times)
    assert len(words) == len(end_times) == len(start_times)
    assert words[0] == "" and words[-1] == ""

    # Find pauses that are too long
    mask = (words == "") & (end_times - start_times >= hparams.silence_min_duration_split)
    mask[0] = mask[-1] = True
    breaks = np.where(mask)[0]

    # Profile the noise from the silences and perform noise reduction on the waveform
    silence_times = [[start_times[i], end_times[i]] for i in breaks]
    silence_times = (np.array(silence_times) * hparams.sample_rate).astype(int)
    noisy_wav = np.concatenate([wav[stime[0]:stime[1]] for stime in silence_times])
    if len(noisy_wav) > hparams.sample_rate * 0.02:
        profile = logmmse.profile_noise(noisy_wav, hparams.sample_rate)
        wav = logmmse.denoise(wav, profile, eta=0)

    # Re-attach segments that are too short
    segments = list(zip(breaks[:-1], breaks[1:]))
    segment_durations = [start_times[end] - end_times[start] for start, end in segments]
    i = 0
    while i < len(segments) and len(segments) > 1:
        if segment_durations[i] < hparams.utterance_min_duration:
            # See if the segment can be re-attached with the right or the left segment
            left_duration = float("inf") if i == 0 else segment_durations[i - 1]
            right_duration = float("inf") if i == len(segments) - 1 else segment_durations[i + 1]
            joined_duration = segment_durations[i] + min(left_duration, right_duration)

            # Do not re-attach if it causes the joined utterance to be too long
            if joined_duration > hparams.hop_size * hparams.max_mel_frames / hparams.sample_rate:
                i += 1
                continue

            # Re-attach the segment with the neighbour of shortest duration
            j = i - 1 if left_duration <= right_duration else i
            segments[j] = (segments[j][0], segments[j + 1][1])
            segment_durations[j] = joined_duration
            del segments[j + 1], segment_durations[j + 1]
        else:
            i += 1

    # Split the utterance
    segment_times = [[end_times[start], start_times[end]] for start, end in segments]
    segment_times = (np.array(segment_times) * hparams.sample_rate).astype(int)
    wavs = [wav[segment_time[0]:segment_time[1]] for segment_time in segment_times]
    texts = [" ".join(words[start + 1:end]).replace("  ", " ") for start, end in segments]

    # # DEBUG: play the audio segments (run with -n=1)
    # import sounddevice as sd
    # if len(wavs) > 1:
    #     print("This sentence was split in %d segments:" % len(wavs))
    # else:
    #     print("There are no silences long enough for this sentence to be split:")
    # for wav, text in zip(wavs, texts):
    #     # Pad the waveform with 1 second of silence because sounddevice tends to cut them early
    #     # when playing them. You shouldn't need to do that in your parsers.
    #     wav = np.concatenate((wav, [0] * 16000))
    #     print("\t%s" % text)
    #     sd.play(wav, 16000, blocking=True)
    # print("")

    return wavs, texts
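# Illustrative entry point sketch: it assumes the shared hparams object lives in
# synthesizer.hparams and that the dataset is laid out as <root>/corpus/train/<speaker>/<utt>.wav
# with a "<utt_id> <text>" transcript file. All paths and names below are placeholders,
# not fixed by this module.
if __name__ == "__main__":
    from synthesizer.hparams import hparams  # assumed location of the hyper-parameters

    dataset_root = Path("datasets/aidatatang_200zh")  # placeholder dataset root
    out_dir = Path("datasets/SV2TTS/synthesizer")     # placeholder output directory
    out_dir.joinpath("mels").mkdir(parents=True, exist_ok=True)
    out_dir.joinpath("audio").mkdir(parents=True, exist_ok=True)

    # Build the utterance-id -> transcript mapping expected by preprocess_speaker_general
    dict_info = {}
    with dataset_root.joinpath("transcript.txt").open(encoding="utf-8") as transcript_file:
        for line in transcript_file:
            if not line.strip():
                continue
            utt_id, text = line.strip().split(" ", 1)
            dict_info[utt_id] = text

    metadata = []
    for speaker_dir in sorted(dataset_root.joinpath("corpus", "train").glob("*")):
        if not speaker_dir.is_dir():
            continue
        metadata.extend(preprocess_speaker_general(
            speaker_dir, out_dir, skip_existing=True, hparams=hparams,
            dict_info=dict_info, no_alignments=False))
    print("Preprocessed %d utterances" % len(metadata))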