When using neural networks for pitch tracking, there are many possible approaches. You can convert the audio into a spectrogram before feeding it in, or feed in the raw waveform directly; you can use an MLP to predict the pitch of one frame at a time, or use a CNN, or even an RNN or LSTM, to predict the pitches of a whole stretch of audio (multiple frames) at once. This article introduces the most basic of these methods: taking the spectrum as input and using an MLP to predict the pitch of one frame at a time. Since this is meant only as an illustration of the basic approach, the demonstration below uses just a subset of MIR-1K: the training, validation, and test sets each contain one male and one female singer (with the singer IDs sorted alphabetically, abjones and amy form the training data, ariel and bobon the validation data, and heycat and jmzen the test data), and the only data augmentation applied is transposition (pitch shifting).

Because extracting the spectra also takes some time (although with the parameter settings in this example it may run in just a few seconds), we split this step out into its own script, as follows:

import os

import librosa
import numpy as np
import scipy.io.wavfile

DATASET_PATH = 'MIR-1K'
FRAME_SIZE = 640
HOP_SIZE = 320
TRAIN_IDS = ('abjones', 'amy')
VALID_IDS = ('ariel', 'bobon')
TEST_IDS = ('heycat', 'jmzen')

wavfiles = sorted(os.listdir(os.path.join(DATASET_PATH, 'Wavfile')))
pvfiles = sorted(os.listdir(os.path.join(DATASET_PATH, 'PitchLabel')))

fea_tr, pv_tr, fid_tr = [], [], []
fea_va, pv_va, fid_va = [], [], []
fea_te, pv_te, fid_te = [], [], []
for idx, (wav, pv) in enumerate(zip(wavfiles, pvfiles)):
	assert wav.split('.')[0] == pv.split('.')[0]
	sid = wav.split('_')[0]
	if sid not in TRAIN_IDS and sid not in VALID_IDS and sid not in TEST_IDS:
		continue
	print('Processing file', wav)
	wav_path = os.path.join(DATASET_PATH, 'Wavfile', wav)
	pv_path = os.path.join(DATASET_PATH, 'PitchLabel', pv)

	# Per-frame pitch labels in semitones (MIDI note numbers); 0 marks unvoiced frames
	with open(pv_path, 'r') as fin:
		pitch_gt = list(map(float, fin.read().splitlines()))

	# Read the stereo wav, mix the two channels, and normalize 16-bit samples to [-1, 1]
	fs, y = scipy.io.wavfile.read(wav_path)
	y = (y[:, 0] + y[:, 1]) / 2 / 32768
	# Magnitude spectrogram, transposed so that each row is one frame
	mag = np.abs(librosa.stft(y, n_fft=FRAME_SIZE, hop_length=HOP_SIZE, center=False)).T

	assert mag.shape[0] == len(pitch_gt)

	if sid in TRAIN_IDS:
		fea_tr.append(mag)
		pv_tr.append(pitch_gt)
		fid_tr.append(np.ones(len(pitch_gt)) * idx)

		# Data augmentation by transposition: shift the audio by the given number of
		# semitones and shift the voiced (non-zero) labels by the same amount
		for shift in (-6, 8):
			y_shift = librosa.effects.pitch_shift(y, sr=fs, n_steps=shift)
			mag_shift = np.abs(librosa.stft(y_shift, n_fft=FRAME_SIZE, hop_length=HOP_SIZE, center=False)).T
			pitch_shift = [p + shift if p != 0 else 0 for p in pitch_gt]

			fea_tr.append(mag_shift)
			pv_tr.append(pitch_shift)
			fid_tr.append(np.ones(len(pitch_shift)) * idx)
	elif sid in VALID_IDS:
		fea_va.append(mag)
		pv_va.append(pitch_gt)
		fid_va.append(np.ones(len(pitch_gt)) * idx)
	else:
		fea_te.append(mag)
		pv_te.append(pitch_gt)
		fid_te.append(np.ones(len(pitch_gt)) * idx)

fea_tr = np.vstack(fea_tr)
fea_va = np.vstack(fea_va)
fea_te = np.vstack(fea_te)
pv_tr = np.hstack(pv_tr)
pv_va = np.hstack(pv_va)
pv_te = np.hstack(pv_te)
fid_tr = np.hstack(fid_tr)
fid_va = np.hstack(fid_va)
fid_te = np.hstack(fid_te)

print('Training data shapes:', fea_tr.shape, pv_tr.shape, fid_tr.shape)
print('Validation data shapes:', fea_va.shape, pv_va.shape, fid_va.shape)
print('Test data shapes:', fea_te.shape, pv_te.shape, fid_te.shape)

np.save('fea_tr.npy', fea_tr)
np.save('fea_va.npy', fea_va)
np.save('fea_te.npy', fea_te)
np.save('pv_tr.npy', pv_tr)
np.save('pv_va.npy', pv_va)
np.save('pv_te.npy', pv_te)
np.save('fid_tr.npy', fid_tr)
np.save('fid_va.npy', fid_va)
np.save('fid_te.npy', fid_te)

In the above example:

- Only files whose singer ID (the prefix before the underscore in the file name) belongs to one of the three splits are processed; all other singers in MIR-1K are skipped.
- For each file, the two channels of the stereo wav are averaged into a single mixture and normalized, and the magnitude spectrogram is computed with a frame size of 640 samples and a hop of 320 samples (40 ms and 20 ms at MIR-1K's 16 kHz sampling rate), without centering, so that the number of frames matches the number of pitch labels.
- For the training data only, each file is additionally pitch-shifted (here by -6 and +8 semitones), and the voiced (non-zero) labels are shifted by the same number of semitones; unvoiced labels stay 0.
- The features, labels, and a per-frame file index are finally stacked and saved as .npy files for the training and test scripts below to load.
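
With n_fft=640, each frame of the magnitude spectrogram has 640 // 2 + 1 = 321 frequency bins, which is the input dimension the MLP defined later in model.py expects. As a minimal sanity check (a sketch that assumes the script above has already been run in the same directory):

import numpy as np

fea_tr = np.load('fea_tr.npy')
pv_tr = np.load('pv_tr.npy')
# One 321-dimensional magnitude spectrum per frame, and one pitch label per frame
assert fea_tr.shape[1] == 640 // 2 + 1
assert fea_tr.shape[0] == pv_tr.shape[0]
print(fea_tr.shape, pv_tr.shape)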

The full training code is as follows:

import os
import time

import numpy as np
import torch
from torch import nn, optim

import util
import model

SAVE_MODEL_DIR = 'model'
NUM_EPOCH = 100

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

fea_tr = np.load('fea_tr.npy')
fea_va = np.load('fea_va.npy')
pv_tr = np.load('pv_tr.npy')
pv_va = np.load('pv_va.npy')
print('Training data shapes:', fea_tr.shape, pv_tr.shape)
print('Validation data shapes:', fea_va.shape, pv_va.shape)

nonzero_idx_tr = np.where(pv_tr != 0)[0]
nonzero_idx_va = np.where(pv_va != 0)[0]
fea_tr = fea_tr[nonzero_idx_tr]
fea_va = fea_va[nonzero_idx_va]
pv_tr = pv_tr[nonzero_idx_tr]
pv_va = pv_va[nonzero_idx_va]
print('After removing zero-pitch data, training data shapes:', fea_tr.shape, pv_tr.shape)
print('After removing zero-pitch data, validation data shapes:', fea_va.shape, pv_va.shape)

pv_idx_tr, out_num = util.pv_to_idx(pv_tr)
pv_idx_va, _ = util.pv_to_idx(pv_va)
print('Pv to idx, out_num:', out_num)
print('Range of idx:')
print('TR:', np.min(pv_idx_tr), np.max(pv_idx_tr))
print('VA:', np.min(pv_idx_va), np.max(pv_idx_va))

print('Setting network')
network = model.MODEL(out_num)
network.to(device)
optimizer = optim.Adam(list(network.parameters()), lr=0.0005)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.95)
loss_func = nn.CrossEntropyLoss()

if not os.path.exists(SAVE_MODEL_DIR):
	os.makedirs(SAVE_MODEL_DIR, 0o755)

data_loader_train = torch.utils.data.DataLoader(
	util.Data2Torch(fea_tr, pv_idx_tr[:, np.newaxis]),
	shuffle=True,
	batch_size=1024,
)

data_loader_valid = torch.utils.data.DataLoader(
	util.Data2Torch(fea_va, pv_idx_va[:, np.newaxis]),
	batch_size=1024,
)

best_va_loss = 9999
va_not_improved_continue_count = 0
totalTime = 0
fout = open(os.path.join(SAVE_MODEL_DIR, 'train_report.txt'), 'w')
for epoch in range(NUM_EPOCH):
	util.print_and_write_file(fout, 'epoch {}/{}...'.format(epoch + 1, NUM_EPOCH))
	tic = time.time()
	# --- Batch training
	network.train()
	training_loss = 0
	n_batch = 0
	optimizer.zero_grad()
	for idx, data in enumerate(data_loader_train):
		pred = network(data['fea'].to(device))
		ans = data['ans'].to(device)
		loss = loss_func(pred, ans.flatten())
		optimizer.zero_grad()
		loss.backward()
		optimizer.step()
		training_loss += loss.item()
		n_batch += 1
	# --- Training loss
	training_loss_avg = training_loss / n_batch
	util.print_and_write_file(
		fout, '\tTraining loss (avg over batch): {}, {}, {}'.format(
			training_loss_avg, training_loss, n_batch
		)
	)
	# --- Batch validation
	network.eval()
	va_loss = 0
	n_batch = 0
	for idx, data in enumerate(data_loader_valid):
		ans = data['ans'].to(device)
		with torch.no_grad():
			pred = network(data['fea'].to(device))
			loss = loss_func(pred, ans.flatten())
		va_loss += loss.item()
		n_batch += 1
	# --- Validation loss
	va_loss_avg = va_loss / n_batch
	util.print_and_write_file(
		fout, '\tValidation loss (avg over batch): {}, {}, {}'.format(
			va_loss_avg, va_loss, n_batch
		)
	)
	# --- Save and early stop by VA loss
	if va_loss_avg < best_va_loss:
		best_va_loss = va_loss_avg
		va_not_improved_continue_count = 0
		util.print_and_write_file(fout, '\tWill save bestVa model')
		torch.save(
			network.state_dict(),
			os.path.join(SAVE_MODEL_DIR, 'model_bestVa')
		)
	else:
		va_not_improved_continue_count += 1
		util.print_and_write_file(fout, '\tva_not_improved_continue_count: {}'.format(va_not_improved_continue_count))
		if va_not_improved_continue_count >= 10:
			break
	util.print_and_write_file(fout, '\tLearning rate used for this epoch: {}'.format(scheduler.get_last_lr()[0]))
	if scheduler.get_last_lr()[0] >= 1e-5:
		scheduler.step()
	# --- Time
	toc = time.time()
	totalTime += toc - tic
	util.print_and_write_file(fout, '\tTime: {:.3f} sec, estimated remaining: {:.3} hr'.format(
		toc - tic,
		1.0 * totalTime / (epoch + 1) * (NUM_EPOCH - (epoch + 1)) / 3600
	))
	fout.flush()
util.print_and_write_file(fout, 'Best VA loss: {:.6f}'.format(best_va_loss))
fout.close()
print('Model saved in {}'.format(SAVE_MODEL_DIR))

The model.py imported above has the following content:

from torch import nn


class MODEL(nn.Module):
	def __init__(self, out_num):
		super(MODEL, self).__init__()
		self.model = nn.Sequential(
			# 321 = FRAME_SIZE // 2 + 1 frequency bins of one frame's magnitude spectrum
			nn.Linear(in_features=321, out_features=512),
			nn.Sigmoid(),
			nn.Dropout(p=0.5),
			nn.Linear(in_features=512, out_features=512),
			nn.Sigmoid(),
			nn.Dropout(p=0.5),
			nn.Linear(in_features=512, out_features=512),
			nn.Sigmoid(),
			nn.Dropout(p=0.5),
			nn.Linear(in_features=512, out_features=out_num),
		)

	def forward(self, x):
		return self.model(x)
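
As a quick check of the model's dimensions, the sketch below feeds a random batch through the network; 165 is the number of classes produced by util.pv_to_idx with its default range (see util.py below):

import torch

import model

net = model.MODEL(165)
x = torch.randn(4, 321)  # a batch of 4 frames, each a 321-bin magnitude spectrum
print(net(x).shape)      # torch.Size([4, 165])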

The content of util.py is as follows:

import numpy as np
import torch
from torch.utils.data import Dataset


class Data2Torch(Dataset):
	def __init__(self, fea, ans):
		self.fea = fea
		self.ans = ans

	def __getitem__(self, index):
		return {
			'fea': torch.from_numpy(self.fea[index]).float(),
			'ans': torch.from_numpy(self.ans[index]).long(),
		}

	def __len__(self):
		return self.fea.shape[0]


def pv_to_idx(pv, lower=36, upper=77, step=0.25):
	# Quantize MIDI pitch values onto a grid from `lower` to `upper` semitones with a
	# resolution of `step` semitones; return the index of the nearest grid point for
	# each value, together with the total number of classes.
	pv_pool = np.linspace(lower, upper, num=int((upper-lower)/step)+1)
	index = np.zeros_like(pv)
	for idx, pitch in enumerate(pv):
		vec = np.abs(pv_pool-pitch)
		index[idx] = np.argmin(vec)
	return index.astype(int), pv_pool.size


def idx_to_pv(idx, lower=36, upper=77, step=0.25):
	# Map class indices back to MIDI pitch values on the same quantization grid
	pv_pool = np.linspace(lower, upper, num=int((upper-lower)/step)+1)
	return pv_pool[idx]


def print_and_write_file(fout, cnt, fout_end='\n'):
	print(cnt)
	if fout is not None:
		fout.write(cnt + fout_end)
		fout.flush()

In the above code:

- Frames whose pitch label is 0 (unvoiced) are removed from the training and validation sets, so the model only ever sees voiced frames during training.
- util.pv_to_idx quantizes the MIDI pitch labels onto a grid from 36 to 77 semitones in steps of 0.25 semitones, i.e. (77 - 36) / 0.25 + 1 = 165 classes, and pitch estimation is treated as a classification problem trained with cross-entropy loss.
- The network in model.py is an MLP that takes the 321-bin magnitude spectrum of a single frame and passes it through three hidden layers of 512 units, each followed by a sigmoid activation and dropout of 0.5, before the output layer of out_num classes.
- Training uses Adam with a learning rate of 0.0005, a StepLR scheduler that multiplies the learning rate by 0.95 every epoch until it falls below 1e-5, a batch size of 1024, and at most 100 epochs; whenever the validation loss improves, the model is saved to model/model_bestVa, and training stops early once the validation loss has not improved for 10 consecutive epochs.
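
To make the label quantization concrete, here is a small usage sketch of pv_to_idx and idx_to_pv (the pitch values below are only illustrative):

import numpy as np

import util

pv = np.array([48.0, 60.1, 69.0])  # MIDI pitch values
idx, out_num = util.pv_to_idx(pv)
print(out_num)                     # 165 classes on the 0.25-semitone grid
print(idx)                         # [ 48  96 132], the nearest grid points
print(util.idx_to_pv(idx))         # [48. 60. 69.], the pitches quantized back onto the grid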

The test code is as follows:

import os

import numpy as np
import torch
from mir_eval.melody import evaluate
from mir_eval.util import midi_to_hz

import util
import model

MODEL_DIR = 'model'

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

fea_te = np.load('fea_te.npy')
pv_te = np.load('pv_te.npy')
print('Test data shapes:', fea_te.shape, pv_te.shape)

pv_idx_te, out_num = util.pv_to_idx(pv_te)
print('Pv to idx, out_num:', out_num)
print('Range of idx:')
print('TE:', np.min(pv_idx_te), np.max(pv_idx_te))

print('Setting network')
save_dic = torch.load(os.path.join(MODEL_DIR, 'model_bestVa'), map_location=device)
network = model.MODEL(out_num)
network.load_state_dict(save_dic)
network.to(device)
network.eval()

data_loader_test = torch.utils.data.DataLoader(
	util.Data2Torch(fea_te, pv_idx_te[:, np.newaxis]),
	batch_size=1024,
)

pred_idx_all = []
for idx, data in enumerate(data_loader_test):
	with torch.no_grad():
		pred = network(data['fea'].to(device))
	pred_idx_all.append(np.argmax(pred.cpu().numpy(), axis=1))

pred_idx_all = np.hstack(pred_idx_all)

ref = np.array([midi_to_hz(pitch) if pitch != 0 else 0 for pitch in pv_te])
pred = midi_to_hz(util.idx_to_pv(pred_idx_all))
scores = evaluate(np.arange(ref.size), ref, np.arange(ref.size), pred)
print(scores)

In the above code:

- The best-validation model is loaded and run on every test frame, including unvoiced ones (which, unlike in training, are not removed here), and the predicted class is simply the argmax over the network outputs.
- The predicted indices are mapped back to MIDI pitch with idx_to_pv and converted to Hz, while reference labels of 0 are kept at 0 Hz to mark unvoiced frames; mir_eval.melody.evaluate then reports the standard melody-extraction metrics. Because the network predicts a pitch for every frame, the voicing-related metrics are not very meaningful here, and the raw pitch accuracy (RPA) and raw chroma accuracy (RCA) are the figures to look at.
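
Note that the call to evaluate passes frame indices as the time stamps; since the reference and the estimate share exactly the same frame grid, this does not affect the scores. If you prefer time stamps in seconds, a minimal sketch (reusing ref and pred from the script above, with a hop of 320 samples at MIR-1K's 16 kHz sampling rate) would be:

times = np.arange(ref.size) * 320 / 16000  # 20 ms per frame
scores = evaluate(times, ref, times, pred)
print(scores)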

With the settings described above, the trained model reaches a Raw Pitch Accuracy (RPA) of roughly 60% on the test set. You can try various improvements, for example using a different feature as the model input (such as a mel spectrogram), tuning hyperparameters such as the number of epochs or the learning rate, post-processing the predictions with dynamic programming, using a more complex model, or switching to predicting the pitch sequence of all frames in a file at once.
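
As one concrete example of changing the input feature, the line computing mag in the feature-extraction script could be replaced with a log-mel spectrogram. The sketch below is only an illustration under assumptions not in the original setup (n_mels=80 is an arbitrary choice, and the random signal stands in for the mixed waveform); the in_features of the first layer in model.py would then have to change from 321 to n_mels:

import librosa
import numpy as np

FRAME_SIZE = 640
HOP_SIZE = 320
fs = 16000
y = np.random.randn(10 * fs)  # stand-in for the mixed, normalized waveform

# Mel spectrogram with the same frame and hop settings as before, then log compression
mel = librosa.feature.melspectrogram(y=y, sr=fs, n_fft=FRAME_SIZE, hop_length=HOP_SIZE, center=False, n_mels=80)
log_mel = librosa.power_to_db(mel).T  # frames along rows, like mag in the original script
print(log_mel.shape)                  # (num_frames, 80)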