Spaces:
Runtime error
Runtime error
import os | |
import pandas as pd | |
from pydub import AudioSegment | |
import numpy as np | |
from moviepy.editor import * | |
import time | |
import pickle | |
import audioread | |
import librosa # install numba==0.49.1 | |
# setup A: numba 0.51.2, librosa 0.6.3, llvmlite: 0.34.0 | |
# setupB: numba==0.49.1, llvmlite-0.32.1 | |
from src.music.config import RATE_AUDIO_SAVE | |
import hashlib | |
import unicodedata | |
import re | |
# from src.music.piano_detection_model.piano_detection_model import SR | |
def clean_removed_mp3_from_csv(path):
    """Drop rows of ``meta_data.csv`` whose file is no longer in the folder.

    Reads ``meta_data.csv`` from ``path``, keeps only the rows whose
    ``filename`` value names an entry currently listed in ``path``, and
    overwrites the CSV in place (written without the index column).

    Args:
        path: Folder containing both the files and ``meta_data.csv``.
            A trailing separator is accepted but not required.
    """
    print(f"Cleaning meta_data.csv using files from the folder, in {path}")
    csv_path = os.path.join(path, 'meta_data.csv')
    # A set gives constant-time membership tests instead of rescanning the
    # directory listing once per CSV row.
    present = set(os.listdir(path))
    meta_data = pd.read_csv(csv_path)
    meta_data = meta_data[meta_data['filename'].isin(present)]
    meta_data.to_csv(csv_path, index=False)
    print('\tDone.')
def clean_removed_csv_from_folder(path):
    """Report files in ``path`` that ``meta_data.csv`` does not reference.

    A file counts as referenced when its name minus the last four characters
    appears in the ``hash`` column of ``meta_data.csv``. ``meta_data.csv`` and
    ``url.txt`` are never considered. For each unreferenced file the running
    count is printed.

    This is currently a dry run: the actual deletion is commented out, so no
    file is removed.

    Args:
        path: Folder containing the files and ``meta_data.csv``. A trailing
            separator is accepted but not required.
    """
    print(f"Cleaning files from folder using meta_data.csv listed file, in {path}")
    files = os.listdir(path)
    meta_data = pd.read_csv(os.path.join(path, 'meta_data.csv'))
    hashes = set(meta_data['hash'])
    count = 0
    for f in files:
        if f in ('meta_data.csv', 'url.txt'):
            continue
        # f[:-4] drops the last four characters of the name
        # (presumably a dot plus a three-letter extension - TODO confirm).
        if f[:-4] not in hashes:
            count += 1
            print(count)
            # os.remove(os.path.join(path, f))
    print('\tDone.')
# def convert_mp3_to_mono_16k(path): | |
# print(f"\n\n\t\tConverting mp3 to mono and 16k sample rate, in {path}\n") | |
# if '.mp3' == path[-4:]: | |
# audio = AudioFileClip(path) | |
# audio.write_audiofile(path[:-4] + '.mp3', | |
# verbose=False, | |
# logger=None, | |
# fps=FPS, | |
# ffmpeg_params=["-ac", "1"]) | |
# else: | |
# list_files = os.listdir(path) | |
# for i, f in enumerate(list_files): | |
# print(compute_progress(i, len(list_files))) | |
# if ".mp3" in f: | |
# audio = AudioFileClip(path + f) | |
# audio.write_audiofile(path + f[:-4] + '.mp3', | |
# verbose=False, | |
# logger=None, | |
# fps=FPS, # 16000 sr | |
# ffmpeg_params=["-ac", "1"] # make it mono | |
# ) | |
# print('\tDone.') | |
def load_audio(path, sr=22050, mono=True, offset=0.0, duration=None,
               dtype=np.float32, res_type='kaiser_best',
               backends=[audioread.ffdec.FFmpegAudioFile]):
    """Load an audio file into a NumPy array, decoding through audioread.

    Copied from librosa.core.load() except that the ffmpeg backend is always
    used in this function (it is the default ``backends`` value). Code from
    piano_transcription_inference.

    Args:
        path: Path of the audio file; resolved with ``os.path.realpath``.
        sr: Target sampling rate. ``None`` keeps the file's native rate.
        mono: When true and the file has more than one channel, the signal
            is mixed down with librosa's ``to_mono``.
        offset: Position, in seconds, at which reading starts.
        duration: Maximum number of seconds to read; ``None`` reads to the end.
        dtype: dtype used for decoded frames and for the returned array.
        res_type: Resampling method forwarded to librosa's ``resample``.
        backends: Backends handed to ``audioread.audio_open``.
            NOTE(review): the default is a list created once at definition
            time; this function never mutates it.

    Returns:
        A ``(y, sr)`` tuple: the audio array and the sampling rate.
        NOTE(review): when nothing is decoded and ``sr`` is ``None``, ``sr``
        is returned as ``None`` because the native rate is only assigned
        inside the ``if y:`` branch.
    """
    y = []
    with audioread.audio_open(os.path.realpath(path), backends=backends) as input_file:
        sr_native = input_file.samplerate
        n_channels = input_file.channels
        # Start/end positions are counted in interleaved samples, hence the
        # multiplication by the channel count.
        s_start = int(np.round(sr_native * offset)) * n_channels
        if duration is None:
            s_end = np.inf
        else:
            s_end = s_start + (int(np.round(sr_native * duration))
                               * n_channels)
        # n is the running count of samples read so far (end of current frame).
        n = 0
        for frame in input_file:
            frame = librosa.core.audio.util.buf_to_float(frame, dtype=dtype)
            n_prev = n
            n = n + len(frame)
            if n < s_start:
                # offset is after the current frame
                # keep reading
                continue
            if s_end < n_prev:
                # we're off the end. stop reading
                break
            if s_end < n:
                # the end is in this frame. crop.
                frame = frame[:s_end - n_prev]
            if n_prev <= s_start <= n:
                # beginning is in this frame
                frame = frame[(s_start - n_prev):]
            # tack on the current frame
            y.append(frame)
    if y:
        y = np.concatenate(y)
        if n_channels > 1:
            # De-interleave into one row per channel.
            y = y.reshape((-1, n_channels)).T
            if mono:
                y = librosa.core.audio.to_mono(y)
        if sr is not None:
            y = librosa.core.audio.resample(y, sr_native, sr, res_type=res_type)
        else:
            sr = sr_native
    # Final cleanup for dtype and contiguity
    y = np.ascontiguousarray(y, dtype=dtype)
    return (y, sr)
def compute_progress(iter, total):
    """Return progress after finishing step ``iter`` as a percentage string.

    Args:
        iter: Zero-based index of the step that just completed.
        total: Total number of steps.

    Returns:
        The completed share, truncated to an integer and suffixed with ``%``.
    """
    completed_fraction = (iter + 1) / total
    percent = int(completed_fraction * 100)
    return f"{percent}%"
def compute_progress_and_eta(times, iter, total, n_av=3000):
    """Format the progress percentage together with an estimated time left.

    The estimate multiplies the mean of the last ``n_av`` entries of
    ``times`` by the number of steps left (``total - iter``).

    Args:
        times: Sequence of per-step durations in seconds.
        iter: Zero-based index of the current step.
        total: Total number of steps.
        n_av: Number of most recent durations to average.

    Returns:
        A string of the form ``"Progress: P%, ETA: hHmMsS."``.
    """
    recent_durations = times[-n_av:]
    av_time = np.mean(recent_durations)
    progress = int(((iter + 1) / total) * 100)
    # Seconds still to go, split into hours / minutes / seconds below.
    remaining = av_time * (total - iter)
    eta_h = int(remaining // 3600)
    eta_m = int((remaining - (eta_h * 3600)) // 60)
    eta_s = int((remaining - (eta_h * 3600) - eta_m * 60))
    return f"Progress: {progress}%, ETA: {eta_h}H{eta_m}M{eta_s}S."
def crop_mp3_from_meta_data_constraints(path, clean_constraints=True):
    """Crop mp3 files in place according to the constraints in ``meta_data.csv``.

    For every row whose ``constraint_start`` is not 0 or whose
    ``constraint_end`` differs from ``length``, the file ``path + filename``
    is loaded, sliced to ``[constraint_start*1000 : constraint_end*1000]``
    and exported over itself as mp3.

    Args:
        path: Folder containing the mp3 files and ``meta_data.csv``; it is
            concatenated directly with file names, so it must end with a
            path separator.
        clean_constraints: When true, after each crop the row's constraints
            are reset to ``0`` and ``length`` and the CSV is rewritten.

    NOTE(review): the source indentation was lost; the CSV rewrite is assumed
    to sit inside the ``if clean_constraints`` branch - confirm against the
    original file.
    NOTE(review): ``length`` is not updated after cropping, so it keeps the
    pre-crop value - verify this is intended.
    """
    print(f"Cropping mp3 using constraints from meta_data.csv, in {path}")
    meta_data = pd.read_csv(path + 'meta_data.csv')
    constraint_start = meta_data['constraint_start'].copy()
    length = meta_data['length'].copy()
    constraint_end = meta_data['constraint_end'].copy()
    filenames = meta_data['filename'].copy()
    # Seed value for the ETA average before any crop has been timed.
    times = [5]
    for i, c_start, c_end, fn, l in zip(range(len(constraint_start)), constraint_start, constraint_end, filenames, length):
        # Only rows with an actual constraint need cropping.
        if c_start != 0 or c_end != l:
            i_time = time.time()
            print(compute_progress_and_eta(times, i, len(constraint_start), n_av=100))
            song = AudioSegment.from_mp3(path + fn)
            # The *1000 presumably converts seconds to the milliseconds
            # pydub slices by - TODO confirm the CSV unit.
            extract = song[c_start*1000:c_end*1000]
            extract.export(path + fn, format="mp3")
            if clean_constraints:
                constraint_start[i] = 0
                constraint_end[i] = length[i]
                meta_data['constraint_start'] = constraint_start
                meta_data['constraint_end'] = constraint_end
                # Saved after every crop, so progress survives an interruption.
                meta_data.to_csv(path + 'meta_data.csv', index=False)
            times.append(time.time() - i_time)
    print('\tDone.')
def get_all_subfiles_with_extension(path, max_depth=3, extension='.*', current_depth=0):
    """Recursively list files under ``path`` whose name ends with an extension.

    Files of the current folder come first, followed by the files of each
    sub-folder (explored depth-first) down to ``max_depth`` levels.

    Args:
        path: Folder to scan. A missing trailing separator is added, so both
            ``'data'`` and ``'data/'`` work.
        max_depth: Number of sub-folder levels to descend into; ``0`` scans
            only ``path`` itself.
        extension: Either ``'.*'`` (every file), a single extension starting
            with ``'.'``, or a list of extension strings.
        current_depth: Depth of ``path`` relative to the initial call; used
            by the recursion.

    Returns:
        A list of file paths, each built as ``path + name``.

    Raises:
        ValueError: If ``extension`` is neither a ``str`` nor a ``list``.
        AssertionError: If a list item is not a ``str`` or a single extension
            does not start with ``'.'``.
    """
    if isinstance(extension, list):
        assert all(isinstance(e, str) for e in extension), 'extension can be a str or a list'
        # Empty strings never matched any file name before; keep it that way,
        # since str.endswith('') would otherwise accept everything.
        suffixes = tuple(e for e in extension if e)
    elif isinstance(extension, str):
        assert extension[0] == '.', 'extension should be an extension or a list of extensions'
        # None means "accept every file".
        suffixes = None if extension == '.*' else (extension,)
    else:
        raise ValueError('Error: extension should be either a str or a list')

    # Paths are built by concatenation, so make sure the folder ends with a
    # separator; without one the scan used to silently return nothing.
    if path and not path.endswith(('/', os.sep)):
        path = path + '/'

    folders = []
    files = []
    # Single directory listing, split into sub-folders and matching files.
    for name in os.listdir(path):
        full_path = path + name
        if os.path.isdir(full_path):
            folders.append(name)
        elif (suffixes is None or name.endswith(suffixes)) and os.path.isfile(full_path):
            files.append(full_path)

    if current_depth < max_depth:
        for fold in folders:
            files += get_all_subfiles_with_extension(path + fold + '/', max_depth=max_depth, extension=extension, current_depth=current_depth + 1)
    return files
def get_out_path(in_path, in_word, out_word, out_extension, exclude_paths=()):
    """Derive the output path that corresponds to an input file path.

    ``in_path`` is split on ``'/'``. Every component equal to ``in_word`` is
    replaced by ``out_word``; the component right after the last such match
    is treated as the playlist folder. The playlist folder and the file name
    are then re-suffixed with ``out_word``:

    * if ``out_word`` is ``'midi'``, ``'_midi'`` is appended (the file name
      first loses its trailing dot-separated part(s));
    * otherwise the trailing ``'_'``-separated token(s) - as many as
      ``in_word`` contains - are replaced by ``out_word``.

    Unless a path component is ``'fake'``, the output playlist folder is
    created on disk.

    Args:
        in_path: ``'/'``-separated path of the input file.
        in_word: Folder name marking the input stage.
        out_word: Folder name / suffix marking the output stage.
        out_extension: Extension (with dot) of the output file.
        exclude_paths: Playlist folder names to skip.

    Returns:
        ``(out_path, to_exclude, playlist)``; ``(None, True, None)`` when the
        playlist folder is listed in ``exclude_paths``.
    """
    parts = in_path.split('/')
    for idx, component in enumerate(parts):
        if component == in_word:
            parts[idx] = out_word
            playlist_index = idx + 1
    file_index = len(parts) - 1

    # Exclusion is decided on the original (not yet renamed) playlist folder.
    if parts[playlist_index] in exclude_paths:
        return None, True, None

    # Number of trailing tokens that make up the input-stage suffix.
    n_suffix_tokens = len(in_word.split('_'))
    if out_word == 'midi':
        parts[playlist_index] += '_' + out_word
        stem = '.'.join(parts[file_index].split('.')[:-n_suffix_tokens])
    else:
        playlist_stem = '_'.join(parts[playlist_index].split('_')[:-n_suffix_tokens])
        parts[playlist_index] = playlist_stem + '_' + out_word
        stem = '_'.join(parts[file_index].split('_')[:-n_suffix_tokens])
    new_filename = stem + '_' + out_word + out_extension

    if 'fake' not in parts:
        os.makedirs('/'.join(parts[:playlist_index + 1]), exist_ok=True)

    parts[file_index] = new_filename
    # Keep everything up to the playlist folder, then the file itself.
    parts = parts[:playlist_index + 1] + [parts[file_index]]
    return '/'.join(parts), False, parts[playlist_index]
def set_all_seeds(seed):
    """Seed the global random generators of torch, ``random`` and NumPy.

    Args:
        seed: Value passed to ``torch.manual_seed``, ``random.seed`` and
            ``np.random.seed``, in that order.
    """
    # Imported lazily so that merely importing this module does not need torch.
    import random
    import numpy as np
    import torch

    for seed_fn in (torch.manual_seed, random.seed, np.random.seed):
        seed_fn(seed)
def get_paths_in_and_out(in_path, in_word, in_extension, out_word, out_extension, max_depth, exclude_paths=()):
    """Pair every input file under ``in_path`` with its output path.

    Finds all files with ``in_extension`` in sub-folders of ``in_path`` up to
    ``max_depth``. For each one, ``get_out_path`` replaces the ``in_word``
    keyword in folders with ``out_word`` and appends ``out_word`` to the file
    name. Files whose playlist is excluded are dropped.

    Args:
        in_path: Root folder to scan.
        in_word: Folder keyword marking the input stage.
        in_extension: Extension (or list of extensions) of the input files.
        out_word: Folder keyword / suffix marking the output stage.
        out_extension: Extension of the output files.
        max_depth: Maximum sub-folder depth to scan.
        exclude_paths: Playlist folder names to skip.

    Returns:
        Three lists of equal length: input paths, output paths, playlists.
    """
    candidates = get_all_subfiles_with_extension(in_path, max_depth=max_depth, extension=in_extension)
    all_in_paths = []
    all_out_paths = []
    all_playlists = []
    # The three lists are filled together, so they stay aligned without a
    # second filtering pass (the previous index-list lookup was quadratic).
    for candidate in candidates:
        out_path, to_exclude, playlist = get_out_path(in_path=candidate, in_word=in_word, out_word=out_word, out_extension=out_extension, exclude_paths=exclude_paths)
        if to_exclude:
            continue
        all_in_paths.append(candidate)
        all_out_paths.append(out_path)
        all_playlists.append(playlist)
    return all_in_paths, all_out_paths, all_playlists
def get_path_and_filter_existing(in_path, in_word, in_extension, out_word, out_extension, max_depth, exclude_paths=()):
    """Pair input files with output paths, skipping outputs that already exist.

    Finds all files with ``in_extension`` in sub-folders of ``in_path`` up to
    ``max_depth``. For each one, ``get_out_path`` replaces the ``in_word``
    keyword in folders with ``out_word`` and appends ``out_word`` to the file
    name. A file is kept only if its playlist is not excluded and its output
    path does not exist yet.

    Args:
        in_path: Root folder to scan.
        in_word: Folder keyword marking the input stage.
        in_extension: Extension (or list of extensions) of the input files.
        out_word: Folder keyword / suffix marking the output stage.
        out_extension: Extension of the output files.
        max_depth: Maximum sub-folder depth to scan.
        exclude_paths: Playlist folder names to skip.

    Returns:
        Three lists of equal length: input paths (plain ``str``), output
        paths, playlists.
    """
    candidates = get_all_subfiles_with_extension(in_path, max_depth=max_depth, extension=in_extension)
    all_in_paths = []
    all_out_paths = []
    all_playlists = []
    # Collect the kept inputs directly rather than indexing a NumPy string
    # array afterwards; that round trip returned numpy.str_ items.
    for candidate in candidates:
        out_path, to_exclude, playlist = get_out_path(in_path=candidate, in_word=in_word, out_word=out_word, out_extension=out_extension, exclude_paths=exclude_paths)
        if to_exclude or os.path.exists(out_path):
            continue
        all_in_paths.append(candidate)
        all_out_paths.append(out_path)
        all_playlists.append(playlist)
    return all_in_paths, all_out_paths, all_playlists
def md5sum(filename, blocksize=65536):
    """Return the hexadecimal MD5 digest of a file's content.

    The file is read in binary mode, ``blocksize`` bytes at a time, so it is
    never loaded into memory as a whole.

    Args:
        filename: Path of the file to hash.
        blocksize: Number of bytes requested per read.

    Returns:
        The digest as a hexadecimal string.
    """
    digest = hashlib.md5()
    with open(filename, "rb") as stream:
        while True:
            chunk = stream.read(blocksize)
            if chunk == b"":
                break
            digest.update(chunk)
    return digest.hexdigest()
# Matches runs of characters taken from four emoji-related Unicode ranges.
emoji_pattern = re.compile(
    "["
    u"\U0001F600-\U0001F64F"  # emoticons
    u"\U0001F300-\U0001F5FF"  # symbols & pictographs
    u"\U0001F680-\U0001F6FF"  # transport & map symbols
    u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
    "]+",
    flags=re.UNICODE,
)


def slugify(value, allow_unicode=False):
    """Turn ``value`` into a lowercase, underscore-separated slug.

    Adapted from
    https://github.com/django/django/blob/master/django/utils/text.py

    Steps: lowercase; normalise (NFKC when ``allow_unicode`` is true,
    otherwise NFKD followed by dropping every non-ASCII character); remove
    characters that are not word characters, whitespace or hyphens; remove
    emoji matched by ``emoji_pattern``; collapse each run of whitespace
    and/or hyphens into a single underscore; strip leading and trailing
    hyphens and underscores.

    Args:
        value: Object to slugify; converted with ``str``.
        allow_unicode: Keep non-ASCII word characters when true.

    Returns:
        The slug, which may be an empty string.
    """
    text = str(value).lower()
    normal_form = 'NFKC' if allow_unicode else 'NFKD'
    text = unicodedata.normalize(normal_form, text)
    if not allow_unicode:
        text = text.encode('ascii', 'ignore').decode('ascii')
    without_punctuation = re.sub(r'[^\w\s-]', '', text.lower())
    without_emoji = emoji_pattern.sub(r'', without_punctuation)
    underscored = re.sub(r'[-\s]+', '_', without_emoji)
    # No fallback is generated when nothing survives: '' is returned as is.
    return underscored.strip('-_')
if __name__ == '__main__':
    # Ad-hoc maintenance entry point: report the files of the street_piano
    # folder that meta_data.csv does not reference.
    # Earlier variants of this script instead ran convert_mp4_to_mp3 or
    # clean_removed_mp3_from_csv over individual sub-folders.
    street_piano_dir = "/home/cedric/Documents/pianocktail/data/midi/street_piano/"
    clean_removed_csv_from_folder(street_piano_dir)