# TastyPiano: src/music/utils.py
import os
import pandas as pd
from pydub import AudioSegment
import numpy as np
from moviepy.editor import *
import time
import pickle
import audioread
import audioread.ffdec  # ensure the ffdec backend submodule is loaded for load_audio below
import librosa  # needs a numba/llvmlite combination compatible with this librosa version
# setup A: numba==0.51.2, librosa==0.6.3, llvmlite==0.34.0
# setup B: numba==0.49.1, llvmlite==0.32.1
from src.music.config import RATE_AUDIO_SAVE
import hashlib
import unicodedata
import re
# from src.music.piano_detection_model.piano_detection_model import SR
def clean_removed_mp3_from_csv(path):
    """Drop rows from meta_data.csv whose files are no longer present in the folder."""
    print(f"Cleaning meta_data.csv using files from the folder, in {path}")
    files = set(os.listdir(path))
    indexes_to_remove = []
    meta_data = pd.read_csv(path + 'meta_data.csv')
    for i, fn in enumerate(meta_data['filename']):
        if fn not in files:
            indexes_to_remove.append(i)
    meta_data = meta_data.drop(indexes_to_remove)
    meta_data.to_csv(path + 'meta_data.csv', index=False)
    print('\tDone.')
def clean_removed_csv_from_folder(path):
    """Count files in the folder whose hash is missing from meta_data.csv (deletion itself is left commented out)."""
    print(f"Cleaning files from folder using meta_data.csv listed files, in {path}")
    files = os.listdir(path)
    meta_data = pd.read_csv(path + 'meta_data.csv')
    hashes = set(meta_data['hash'])
    count = 0
    for f in files:
        if f not in ['meta_data.csv', 'url.txt']:
            if f[:-4] not in hashes:
                count += 1
                print(count)
                # os.remove(path + f)
    print('\tDone.')
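# Usage sketch for the two cleaning helpers above (the folder path is hypothetical;
# it must end with '/' and contain a meta_data.csv with 'filename' and 'hash' columns):
#   clean_removed_mp3_from_csv('/data/audio/street_piano/')    # edits the csv
#   clean_removed_csv_from_folder('/data/audio/street_piano/')  # only counts orphans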
# def convert_mp3_to_mono_16k(path):
# print(f"\n\n\t\tConverting mp3 to mono and 16k sample rate, in {path}\n")
# if '.mp3' == path[-4:]:
# audio = AudioFileClip(path)
# audio.write_audiofile(path[:-4] + '.mp3',
# verbose=False,
# logger=None,
# fps=FPS,
# ffmpeg_params=["-ac", "1"])
# else:
# list_files = os.listdir(path)
# for i, f in enumerate(list_files):
# print(compute_progress(i, len(list_files)))
# if ".mp3" in f:
# audio = AudioFileClip(path + f)
# audio.write_audiofile(path + f[:-4] + '.mp3',
# verbose=False,
# logger=None,
# fps=FPS, # 16000 sr
# ffmpeg_params=["-ac", "1"] # make it mono
# )
# print('\tDone.')
def load_audio(path, sr=22050, mono=True, offset=0.0, duration=None,
dtype=np.float32, res_type='kaiser_best',
backends=[audioread.ffdec.FFmpegAudioFile]):
"""Load audio. Copied from librosa.core.load() except that ffmpeg backend is
always used in this function. Code from piano_transcription_inference"""
y = []
with audioread.audio_open(os.path.realpath(path), backends=backends) as input_file:
sr_native = input_file.samplerate
n_channels = input_file.channels
s_start = int(np.round(sr_native * offset)) * n_channels
if duration is None:
s_end = np.inf
else:
s_end = s_start + (int(np.round(sr_native * duration))
* n_channels)
n = 0
for frame in input_file:
frame = librosa.core.audio.util.buf_to_float(frame, dtype=dtype)
n_prev = n
n = n + len(frame)
if n < s_start:
# offset is after the current frame
# keep reading
continue
if s_end < n_prev:
# we're off the end. stop reading
break
if s_end < n:
# the end is in this frame. crop.
frame = frame[:s_end - n_prev]
if n_prev <= s_start <= n:
# beginning is in this frame
frame = frame[(s_start - n_prev):]
# tack on the current frame
y.append(frame)
if y:
y = np.concatenate(y)
if n_channels > 1:
y = y.reshape((-1, n_channels)).T
if mono:
y = librosa.core.audio.to_mono(y)
if sr is not None:
y = librosa.core.audio.resample(y, sr_native, sr, res_type=res_type)
else:
sr = sr_native
# Final cleanup for dtype and contiguity
y = np.ascontiguousarray(y, dtype=dtype)
return (y, sr)
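# Usage sketch (hypothetical file path): decode an mp3 through ffmpeg, downmix to
# mono and resample to 16 kHz, returning a float32 numpy array:
#   y, sr = load_audio('song.mp3', sr=16000, mono=True)
#   assert sr == 16000 and y.dtype == np.float32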
def compute_progress(i, total):
    return f"{int((i + 1) / total * 100)}%"


def compute_progress_and_eta(times, i, total, n_av=3000):
    """Build a progress string with an ETA from the mean duration of the last n_av iterations."""
    av_time = np.mean(times[-n_av:])
    progress = int((i + 1) / total * 100)
    eta_h = int(av_time * (total - i) // 3600)
    eta_m = int((av_time * (total - i) - eta_h * 3600) // 60)
    eta_s = int(av_time * (total - i) - eta_h * 3600 - eta_m * 60)
    return f"Progress: {progress}%, ETA: {eta_h}H{eta_m}M{eta_s}S."
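# Usage sketch: time each iteration and pass the running list of durations plus the
# current index and total ('items' and 'process' below are hypothetical names):
#   times = []
#   for i, item in enumerate(items):
#       t0 = time.time()
#       process(item)
#       times.append(time.time() - t0)
#       print(compute_progress_and_eta(times, i, len(items)))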
def crop_mp3_from_meta_data_constraints(path, clean_constraints=True):
    """Crop each mp3 to its [constraint_start, constraint_end] window (in seconds) from meta_data.csv.
    If clean_constraints is True, reset the constraints in the csv after each crop."""
    print(f"Cropping mp3 using constraints from meta_data.csv, in {path}")
    meta_data = pd.read_csv(path + 'meta_data.csv')
    constraint_start = meta_data['constraint_start'].copy()
    length = meta_data['length'].copy()
    constraint_end = meta_data['constraint_end'].copy()
    filenames = meta_data['filename'].copy()
    times = [5]  # initial guess of 5 s per file to bootstrap the ETA estimate
    for i, (c_start, c_end, fn, l) in enumerate(zip(constraint_start, constraint_end, filenames, length)):
        if c_start != 0 or c_end != l:
            i_time = time.time()
            print(compute_progress_and_eta(times, i, len(constraint_start), n_av=100))
            song = AudioSegment.from_mp3(path + fn)
            extract = song[c_start * 1000:c_end * 1000]  # pydub slices in milliseconds
            extract.export(path + fn, format="mp3")
            if clean_constraints:
                # reset the constraints and save the csv right away, so an interrupted run stays consistent
                constraint_start[i] = 0
                constraint_end[i] = length[i]
                meta_data['constraint_start'] = constraint_start
                meta_data['constraint_end'] = constraint_end
                meta_data.to_csv(path + 'meta_data.csv', index=False)
            times.append(time.time() - i_time)
    print('\tDone.')
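# Example: for a row (filename='song.mp3', constraint_start=10, constraint_end=30,
# length=120), the call below overwrites song.mp3 with its 10s-30s excerpt and
# resets the constraints in the csv (the folder path is hypothetical):
#   crop_mp3_from_meta_data_constraints('/data/audio/street_piano/')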
def get_all_subfiles_with_extension(path, max_depth=3, extension='.*', current_depth=0):
    """Recursively list files under path (up to max_depth) matching the given extension(s)."""
    folders = [f for f in os.listdir(path) if os.path.isdir(path + f)]
    # get all files in the current folder with a matching extension
    if isinstance(extension, list):
        assert all(isinstance(e, str) for e in extension), 'extension can be a str or a list of str'
        files = [path + f for f in os.listdir(path) if os.path.isfile(path + f) and any(f.endswith(ext) for ext in extension)]
    elif isinstance(extension, str):
        assert extension[0] == '.', 'extension should be an extension or a list of extensions'
        if extension == '.*':
            files = [path + f for f in os.listdir(path) if os.path.isfile(path + f)]
        else:
            files = [path + f for f in os.listdir(path) if os.path.isfile(path + f) and f.endswith(extension)]
    else:
        raise ValueError('extension should be either a str or a list')
    if current_depth < max_depth:
        for fold in folders:
            files += get_all_subfiles_with_extension(path + fold + '/', max_depth=max_depth, extension=extension, current_depth=current_depth + 1)
    return files
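# Usage sketch (hypothetical root): collect every .mp3 and .wav up to two folder
# levels below the root; note that paths passed to this helper must end with '/':
#   audio_files = get_all_subfiles_with_extension('/data/audio/', max_depth=2, extension=['.mp3', '.wav'])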
def get_out_path(in_path, in_word, out_word, out_extension, exclude_paths=()):
    """Map an input path to an output path by replacing the in_word folder with out_word
    and renaming the playlist folder and filename accordingly. Assumes in_word appears
    as a folder name in in_path; returns (out_path, to_exclude, playlist)."""
    splitted_in_path = in_path.split('/')
    for i in range(len(splitted_in_path)):
        if splitted_in_path[i] == in_word:
            splitted_in_path[i] = out_word
            playlist_index = i + 1  # the playlist folder follows the in_word folder
    file_index = len(splitted_in_path) - 1
    if splitted_in_path[playlist_index] in exclude_paths:
        return None, True, None
    to_exclude = False
    if out_word != 'midi':
        splitted_in_path[playlist_index] = '_'.join(splitted_in_path[playlist_index].split('_')[:-len(in_word.split('_'))]) + '_' + out_word
    else:
        splitted_in_path[playlist_index] += '_' + out_word
    if 'fake' not in splitted_in_path:
        os.makedirs('/'.join(splitted_in_path[:playlist_index + 1]), exist_ok=True)
    if out_word != 'midi':
        new_filename = '_'.join(splitted_in_path[file_index].split('_')[:-len(in_word.split('_'))]) + '_' + out_word + out_extension
    else:
        new_filename = '.'.join(splitted_in_path[file_index].split('.')[:-len(in_word.split('_'))]) + '_' + out_word + out_extension
    splitted_in_path[file_index] = new_filename
    splitted_in_path = splitted_in_path[:playlist_index + 1] + [splitted_in_path[file_index]]
    out_path = '/'.join(splitted_in_path)
    return out_path, to_exclude, splitted_in_path[playlist_index]
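# Illustration with hypothetical paths, assuming a layout <root>/<in_word>/<playlist>/<file>
# (note the call also creates the output playlist folder on disk):
#   get_out_path('/data/audio/my_playlist_audio/tune_audio.mp3',
#                in_word='audio', out_word='processed', out_extension='.mp3')
#   -> ('/data/processed/my_playlist_processed/tune_processed.mp3', False, 'my_playlist_processed')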
def set_all_seeds(seed):
    """Seed the python, numpy and torch random number generators."""
    import random
    import numpy as np
    import torch
    torch.manual_seed(seed)
    random.seed(seed)
    np.random.seed(seed)
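# Note: in recent torch versions, torch.manual_seed also seeds the CUDA generators;
# torch.cuda.manual_seed_all(seed) would make that explicit for multi-GPU runs.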
def get_paths_in_and_out(in_path, in_word, in_extension, out_word, out_extension, max_depth, exclude_paths=()):
    # find all files with in_extension in subfolders of in_path, up to max_depth.
    # for each, replace the in_word keyword in folders with out_word, and append out_word to filenames.
    all_in_paths = get_all_subfiles_with_extension(in_path, max_depth=max_depth, extension=in_extension)
    indexes_not_excluded = []
    all_out_paths = []
    all_playlists = []
    for i_path, p in enumerate(all_in_paths):
        out_path, to_exclude, playlist = get_out_path(in_path=p, in_word=in_word, out_word=out_word, out_extension=out_extension, exclude_paths=exclude_paths)
        if not to_exclude:
            indexes_not_excluded.append(i_path)
            all_out_paths.append(out_path)
            all_playlists.append(playlist)
    kept = set(indexes_not_excluded)
    all_in_paths = [p for i, p in enumerate(all_in_paths) if i in kept]
    assert len(all_out_paths) == len(all_in_paths)
    return all_in_paths, all_out_paths, all_playlists
def get_path_and_filter_existing(in_path, in_word, in_extension, out_word, out_extension, max_depth, exclude_paths=()):
    # same as get_paths_in_and_out, but additionally skips files whose output already exists.
    all_in_paths = get_all_subfiles_with_extension(in_path, max_depth=max_depth, extension=in_extension)
    indexes_to_process = []
    all_out_paths = []
    all_playlists = []
    for i_path, p in enumerate(all_in_paths):
        out_path, to_exclude, playlist = get_out_path(in_path=p, in_word=in_word, out_word=out_word, out_extension=out_extension, exclude_paths=exclude_paths)
        if not to_exclude:
            if not os.path.exists(out_path):
                indexes_to_process.append(i_path)
                all_out_paths.append(out_path)
                all_playlists.append(playlist)
    all_in_paths = [all_in_paths[i] for i in indexes_to_process]
    assert len(all_out_paths) == len(all_in_paths)
    return all_in_paths, all_out_paths, all_playlists
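# Usage sketch (hypothetical root and keywords): list every mp3 under /data/audio/
# whose converted output does not exist yet:
#   ins, outs, playlists = get_path_and_filter_existing(
#       '/data/audio/', in_word='audio', in_extension='.mp3',
#       out_word='processed', out_extension='.mp3', max_depth=3)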
def md5sum(filename, blocksize=65536):
    """Compute the md5 hex digest of a file, reading it block by block."""
    md5 = hashlib.md5()
    with open(filename, "rb") as f:
        for block in iter(lambda: f.read(blocksize), b""):
            md5.update(block)
    return md5.hexdigest()
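# Usage sketch (hypothetical path): hashing block by block keeps memory constant,
# so large files never have to be read at once:
#   digest = md5sum('/data/audio/song.mp3')  # 32-character hex string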
emoji_pattern = re.compile("["
u"\U0001F600-\U0001F64F" # emoticons
u"\U0001F300-\U0001F5FF" # symbols & pictographs
u"\U0001F680-\U0001F6FF" # transport & map symbols
u"\U0001F1E0-\U0001F1FF" # flags (iOS)
"]+", flags=re.UNICODE)
def slugify(value, allow_unicode=False):
    """
    Adapted from https://github.com/django/django/blob/master/django/utils/text.py
    Convert to ASCII if 'allow_unicode' is False. Convert spaces or repeated
    dashes to single underscores. Remove characters that aren't alphanumerics,
    underscores, or hyphens, as well as emojis. Convert to lowercase and strip
    leading and trailing dashes and underscores.
    """
    value = str(value).lower()
    if allow_unicode:
        value = unicodedata.normalize('NFKC', value)
    else:
        value = unicodedata.normalize('NFKD', value).encode('ascii', 'ignore').decode('ascii')
    value = re.sub(r'[^\w\s-]', '', value)
    value = emoji_pattern.sub(r'', value)
    value = re.sub(r'[-\s]+', '_', value).strip('-_')
# if value == '':
# for i in range(10):
# value += str(np.random.choice(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))
return value
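# Example: slugify('Début Café No. 5!') -> 'debut_cafe_no_5'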
if __name__ == '__main__':
path = "/home/cedric/Documents/pianocktail/data/midi/street_piano/"
# for folder in ['my_sheet_music_transcriptions']:#os.listdir(path):
# print('\n\n\t\t', folder)
# convert_mp4_to_mp3(path + folder + '/')
clean_removed_csv_from_folder(path)
# folder = 'street_piano/'
# for folder in ['street_piano/']:
# clean_removed_mp3_from_csv(path + folder)