|
# Field names for the parts of a data sample.
# NOTE(review): presumably used as dictionary keys by dataset code outside
# this file -- confirm against the callers.
STR_CLIP_ID = 'clip_id'
STR_AUDIO_SIGNAL = 'audio_signal'
STR_TARGET_VECTOR = 'target_vector'

# Channel-layout identifiers accepted by `load_audio` as its `ch_format` argument.
STR_CH_FIRST = 'channels_first'
STR_CH_LAST = 'channels_last'
|
|
|
import io
import itertools
import logging
import os
import subprocess
from pathlib import Path
from typing import Optional, Tuple, Union

import numpy as np
import soundfile as sf
import tqdm
from numpy.fft import irfft
|
|
|
def _resample_load_ffmpeg(path: str, sample_rate: int, downmix_to_mono: bool) -> Tuple[np.ndarray, int]:
    """
    Decoding, downmixing, and downsampling by ffmpeg.

    An ``ffmpeg`` subprocess decodes the file and streams WAV data to its
    stdout; that byte stream is then parsed with ``soundfile``.

    Args:
        path: audio file path
        sample_rate: target sampling rate. If falsy (``None`` or 0), no
            ``-ar`` option is passed and ffmpeg keeps the source rate.
        downmix_to_mono: if True, request a single output channel (``-ac 1``)

    Returns:
        (audio signal, sample rate). The signal is the transpose of the array
        returned by ``soundfile.read``, i.e. channel-first for multi-channel
        audio.

    Raises:
        RuntimeError: if ffmpeg cannot be launched or produces no output.
    """
    # An argument list (no shell) keeps quotes, `$`, backticks, etc. in the
    # file name from ever being interpreted as shell syntax.
    cmd = ['ffmpeg', '-i', str(path)]
    if downmix_to_mono:
        cmd += ['-ac', '1']
    if sample_rate:
        cmd += ['-ar', str(sample_rate)]
    cmd += ['-f', 'wav', '-']

    try:
        proc = subprocess.run(
            cmd,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except OSError as err:
        raise RuntimeError(f'could not launch ffmpeg: {err}') from err

    if not proc.stdout:
        # Nothing to decode: report ffmpeg's own diagnostics instead of letting
        # soundfile fail on an empty buffer with an unrelated-looking message.
        stderr_lines = proc.stderr.decode(errors='replace').strip().splitlines()
        details = '\n'.join(stderr_lines[-5:])
        raise RuntimeError(
            f'ffmpeg produced no audio for "{path}" '
            f'(exit status {proc.returncode}):\n{details}'
        )

    src, sr = sf.read(io.BytesIO(proc.stdout))
    return src.T, sr
|
|
|
|
|
def _resample_load_librosa(path: str, sample_rate: int, downmix_to_mono: bool, **kwargs) -> Tuple[np.ndarray, int]:
    """
    Decoding, downmixing, and downsampling by librosa.

    Args:
        path: audio file path
        sample_rate: target sampling rate, passed to ``librosa.load`` as ``sr``
        downmix_to_mono: passed to ``librosa.load`` as ``mono``
        **kwargs: extra keyword arguments forwarded to ``librosa.load``

    Returns:
        (audio signal, sample rate), exactly as returned by ``librosa.load``.

    Raises:
        ImportError: if librosa is not installed.
    """
    # Imported lazily: this module has no file-level librosa import, so the
    # name must be bound here, and the dependency is only needed when this
    # backend is actually selected.
    import librosa

    src, sr = librosa.load(path, sr=sample_rate, mono=downmix_to_mono, **kwargs)
    return src, sr
|
|
|
|
|
def load_audio(
    path: Union[str, Path],
    ch_format: str,
    sample_rate: Optional[int] = None,
    downmix_to_mono: bool = False,
    resample_by: str = 'ffmpeg',
    **kwargs,
) -> Tuple[np.ndarray, int]:
    """Load an audio file, optionally downmixing and resampling it.

    Decoding and resampling are delegated to the backend selected by
    `resample_by`. The array is returned exactly as the backend produced it:
    `ch_format` is only validated, and this function neither reshapes nor
    transposes the result.

    Args:
        path: audio file path
        ch_format: one of 'channels_first' or 'channels_last'
        sample_rate: target sampling rate. if None, use the rate of the audio file
        downmix_to_mono: if True, ask the backend to downmix to one channel
        resample_by (str): 'librosa' or 'ffmpeg'. it decides backend for audio decoding and resampling.
        **kwargs: keyword args for librosa.load - offset, duration, dtype, res_type.
            Only forwarded to the 'librosa' backend; ignored by 'ffmpeg'.

    Returns:
        (audio, sr) tuple

    Raises:
        ValueError: if `ch_format` is not a supported value, or if the file
            is 8000 bytes or smaller on disk.
        NotImplementedError: if `resample_by` names an unknown backend.
        OSError: propagated from `os.stat` when `path` cannot be accessed.
    """
    if ch_format not in (STR_CH_FIRST, STR_CH_LAST):
        raise ValueError(f'ch_format is wrong here -> {ch_format}')

    # Size-on-disk heuristic: tiny files are rejected before any decoding.
    if os.stat(path).st_size <= 8000:
        raise ValueError('Given audio is too short!')

    if resample_by == 'librosa':
        return _resample_load_librosa(path, sample_rate, downmix_to_mono, **kwargs)
    if resample_by == 'ffmpeg':
        return _resample_load_ffmpeg(path, sample_rate, downmix_to_mono)
    raise NotImplementedError(f'resample_by: "{resample_by}" is not supported yet')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def ms(x):
    """Mean of the squared magnitude of signal `x`.

    :param x: Dynamic quantity.
    :returns: Mean squared of `x`.
    """
    squared_magnitude = np.abs(x) ** 2.0
    return squared_magnitude.mean()
|
|
|
def normalize(y, x=None):
    """Scale `y` so that its mean power matches a reference.

    Without `x` the reference power is 1.0, i.e. that of a (standard normal)
    white noise signal: the mean power of a Gaussian with :math:`\\mu=0` and
    :math:`\\sigma=1` is 1. With `x` the reference is the mean power of `x`.

    :param y: Signal to rescale.
    :param x: Optional reference signal whose power `y` should match.
    :returns: `y` multiplied by the gain that gives it the reference power.
    """
    target_power = 1.0 if x is None else ms(x)
    gain = np.sqrt(target_power / ms(y))
    return y * gain
|
|
|
def noise(N, color='white', state=None):
    """Noise generator.

    Looks up the generator registered for `color` and calls it with `N` and
    `state`.

    :param N: Amount of samples.
    :param color: Color of noise.
    :param state: State of PRNG.
    :type state: :class:`np.random.RandomState`
    :returns: Whatever the selected generator returns for `N` samples.
    :raises ValueError: If `color` is not a registered noise color.
    """
    # Only the lookup is guarded, so a KeyError raised inside a generator is
    # not misreported as an unknown color.
    try:
        generator = _noise_generators[color]
    except KeyError:
        raise ValueError("Incorrect color.") from None
    return generator(N, state)
|
|
|
def white(N, state=None):
    """
    White noise.

    :param N: Amount of samples.
    :param state: State of PRNG. A fresh :class:`np.random.RandomState` is
        created when omitted.
    :type state: :class:`np.random.RandomState`

    White noise has a constant power density. Its narrowband spectrum is therefore flat.
    The power in white noise will increase by a factor of two for each octave band,
    and therefore increases with 3 dB per octave.
    """
    if state is None:
        state = np.random.RandomState()
    return state.randn(N)
|
|
|
def pink(N, state=None):
    """
    Pink noise.

    :param N: Amount of samples.
    :param state: State of PRNG. A fresh :class:`np.random.RandomState` is
        created when omitted.
    :type state: :class:`np.random.RandomState`

    Pink noise has equal power in bands that are proportionally wide.
    Power density decreases with 3 dB per octave.

    A random complex spectrum is divided bin-by-bin by ``sqrt(k + 1)``,
    transformed with an inverse real FFT, and normalized with `normalize`.
    """
    if state is None:
        state = np.random.RandomState()
    odd_length = N % 2
    n_bins = N // 2 + 1 + odd_length
    # Real parts are drawn before imaginary parts (PRNG consumption order).
    real_part = state.randn(n_bins)
    imag_part = state.randn(n_bins)
    spectrum = real_part + 1j * imag_part
    shaping = np.sqrt(np.arange(len(spectrum)) + 1.)
    samples = (irfft(spectrum / shaping)).real
    if odd_length:
        # The inverse FFT yields one sample too many for odd N; drop the last.
        samples = samples[:-1]
    return normalize(samples)
|
|
|
def blue(N, state=None):
    """
    Blue noise.

    :param N: Amount of samples.
    :param state: State of PRNG. A fresh :class:`np.random.RandomState` is
        created when omitted.
    :type state: :class:`np.random.RandomState`

    Power increases with 6 dB per octave.
    Power density increases with 3 dB per octave.

    A random complex spectrum is multiplied bin-by-bin by ``sqrt(k)``,
    transformed with an inverse real FFT, and normalized with `normalize`.
    """
    if state is None:
        state = np.random.RandomState()
    odd_length = N % 2
    n_bins = N // 2 + 1 + odd_length
    # Real parts are drawn before imaginary parts (PRNG consumption order).
    real_part = state.randn(n_bins)
    imag_part = state.randn(n_bins)
    spectrum = real_part + 1j * imag_part
    shaping = np.sqrt(np.arange(len(spectrum)))
    samples = (irfft(spectrum * shaping)).real
    if odd_length:
        # The inverse FFT yields one sample too many for odd N; drop the last.
        samples = samples[:-1]
    return normalize(samples)
|
|
|
def brown(N, state=None):
    """
    Brown noise.

    :param N: Amount of samples.
    :param state: State of PRNG. A fresh :class:`np.random.RandomState` is
        created when omitted.
    :type state: :class:`np.random.RandomState`

    Power decreases with 3 dB per octave.
    Power density decreases with 6 dB per octave.

    A random complex spectrum is divided bin-by-bin by ``k + 1``,
    transformed with an inverse real FFT, and normalized with `normalize`.
    """
    if state is None:
        state = np.random.RandomState()
    odd_length = N % 2
    n_bins = N // 2 + 1 + odd_length
    # Real parts are drawn before imaginary parts (PRNG consumption order).
    real_part = state.randn(n_bins)
    imag_part = state.randn(n_bins)
    spectrum = real_part + 1j * imag_part
    shaping = (np.arange(len(spectrum)) + 1)
    samples = (irfft(spectrum / shaping)).real
    if odd_length:
        # The inverse FFT yields one sample too many for odd N; drop the last.
        samples = samples[:-1]
    return normalize(samples)
|
|
|
def violet(N, state=None):
    """
    Violet noise.

    :param N: Amount of samples.
    :param state: State of PRNG. A fresh :class:`np.random.RandomState` is
        created when omitted.
    :type state: :class:`np.random.RandomState`

    Power increases with +9 dB per octave.
    Power density increases with +6 dB per octave.

    A random complex spectrum is multiplied bin-by-bin by ``k``,
    transformed with an inverse real FFT, and normalized with `normalize`.
    """
    if state is None:
        state = np.random.RandomState()
    odd_length = N % 2
    n_bins = N // 2 + 1 + odd_length
    # Real parts are drawn before imaginary parts (PRNG consumption order).
    real_part = state.randn(n_bins)
    imag_part = state.randn(n_bins)
    spectrum = real_part + 1j * imag_part
    shaping = (np.arange(len(spectrum)))
    samples = (irfft(spectrum * shaping)).real
    if odd_length:
        # The inverse FFT yields one sample too many for odd N; drop the last.
        samples = samples[:-1]
    return normalize(samples)
|
|
|
# Registry mapping each noise color name to its generator function;
# `noise` dispatches through this table.
_noise_generators = dict(
    white=white,
    pink=pink,
    blue=blue,
    brown=brown,
    violet=violet,
)
|
|
|
def noise_generator(N=44100, color='white', state=None):
    """Endless noise sample generator.

    :param N: Amount of unique samples to generate.
    :param color: Color of noise.
    :param state: State of PRNG, passed on to `noise`.

    `N` unique samples are produced by `noise` when the first value is
    requested; the generator then cycles over those same samples forever.
    """
    unique_samples = noise(N, color, state)
    endless = itertools.cycle(unique_samples)
    for value in endless:
        yield value
|
|
|
def heaviside(N):
    """Heaviside step function.

    Returns the value 0 for `N < 0`, 1 for `N > 0`, and 1/2 for `N = 0`.
    Applied element-wise when `N` is an array.
    """
    sign = np.sign(N)
    return 0.5 * (sign + 1)