This post shows how to perform spoken digit recognition in TensorFlow. The dataset used here is the digit portion of the Speech Commands dataset from the official TensorFlow examples. That dataset also contains voice commands such as left, right, up, down, and so on; if you are interested in how the approach performs with more commands, feel free to modify this example or experiment with the official one.

Before getting to the code, a few basic concepts of audio processing are worth explaining.

First, a computer cannot represent the continuous nature of the real world, so we have to "sample" the signal. The denser the sampling, the higher the audio quality, but also the larger the file. The "CD quality" you often hear about takes 44,100 points per second, which we call a "sampling rate of 44.1 kHz"; in audio analysis, 16 kHz is also a commonly used value.
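As a quick illustration (a minimal sketch; the file name is just a placeholder for any wav file from the dataset), you can read a file with scipy and check its sampling rate, length, and sample type:

import scipy.io.wavfile

fs, y = scipy.io.wavfile.read('zero_example.wav')  # placeholder path
print('sampling rate:', fs)           # 16000 for this dataset
print('number of samples:', len(y))   # about 16000, i.e. about one second
print('sample type:', y.dtype)        # int16, i.e. 16 bits per sample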

Besides how many points are taken per second, the data type used to represent each point also matters. Usually 16 bits per sample is enough for analysis or everyday listening; professional studio work typically uses 24 bits, often combined with higher sampling rates such as 48 or 96 kHz.

As for the analysis itself, we first cut the audio into many small segments (frames), which serve as the smallest unit of analysis. The frame size is therefore an important parameter: if it is too small, a frame cannot represent anything meaningful (imagine a frame containing only a single point); if it is too large, any variation finer than this "smallest unit" is smoothed away. Typically, a frame of 0.032 or 0.064 seconds works well.
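To make this concrete (simple arithmetic, assuming the 16 kHz sampling rate used throughout this post), those two frame lengths correspond to the following sample counts, which are exactly the hop_size and frame_size used later:

fs = 16000                 # sampling rate in Hz
print(int(0.032 * fs))     # 512 samples per 0.032-second frame
print(int(0.064 * fs))     # 1024 samples per 0.064-second frame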

Once we have the frames, we usually apply the intimidating-sounding "Fourier transform" to rewrite the original signal as a sum of many sinusoids (how many sinusoids are available depends on the frame length). Transform every frame and plot the results as an image, and you get the "spectrogram" you often see in music software. The advantage of this representation is that certain physical properties, such as overtones, become easy to see.
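Here is a minimal sketch of that idea on a synthetic signal (not from the dataset): one 1024-sample frame of a 1000 Hz sine wave at 16 kHz, passed through the same one-sided FFT used later, shows a single peak at the bin corresponding to 1000 Hz.

import numpy as np
import scipy.fftpack

fs = 16000
frame = np.sin(2 * np.pi * 1000 * np.arange(1024) / fs)   # one frame of a 1000 Hz sine
mag = np.abs(scipy.fftpack.fft(frame))[:1024//2+1]        # one-sided magnitude spectrum
peak_bin = np.argmax(mag)
print(peak_bin, peak_bin * fs / 1024)                     # bin 64, i.e. 1000.0 Hz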

Now for the dataset and the parameters chosen in this example. In the digit portion of the dataset, each digit has roughly 2,300 or more recordings; each recording is about one second long at a 16 kHz sampling rate, so it contains about 16,000 samples. Here we use a frame size of 0.064 seconds and a hop of 0.032 seconds between frames, convert each file to a spectrogram, and save the result. When saving, the data is also split into training and test sets: 2,100 files per digit for training and 200 for testing.
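As a quick sanity check (using the same formula as the buffer function below), each one-second file therefore yields 30 frames:

fs = 16000
frame_size = int(0.064 * fs)       # 1024 samples per frame
hop_size = int(0.032 * fs)         # 512 samples between frame starts
overlap = frame_size - hop_size    # 512 samples shared by adjacent frames
print((fs - overlap) // hop_size)  # 30 frames per one-second file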

The full feature-extraction example is shown below. Since the digits 0 through 9 amount to 2,300 × 10 files, feature extraction takes a while, but it usually finishes in about a minute (if, instead of pre-allocating the arrays as in this example, you stacked the pieces one by one with numpy's stack-style functions, it would be much slower). Also, because each recording is only "about" one second long, the files must first be cut to the same length; the approach here is to zero-pad the short ones and truncate the long ones.

import numpy as np
import os
import time
import scipy.io.wavfile
import scipy.fftpack

def recursiveFileList(path, ext):
    fListFullPath = []
    fListMainName = []
    for (dirPath, dirNames, fileNames) in os.walk(path):
        for f in fileNames:
            thisExt  = f.split('.')[-1]
            thisMain = f.split('.')[0]
            if(thisExt == ext):
                fListFullPath.append(os.path.join(dirPath, f))
                fListMainName.append(thisMain)
    fListFullPath = np.array(fListFullPath)
    fListMainName = np.array(fListMainName)
    sortIdx = np.argsort(fListFullPath)
    fListFullPath = fListFullPath[sortIdx]
    fListMainName = fListMainName[sortIdx]
    return (fListFullPath, fListMainName)

def fftOneSide(frameMat, fftLen):
    magSpec = abs(scipy.fftpack.fft(frameMat, n=fftLen, axis=0))[0:fftLen//2+1, :]
    magSpec[1:fftLen//2, :] = magSpec[1:fftLen//2, :] * 2
    return magSpec

def buffer(y, frameSize, overlap):
    step = frameSize - overlap
    frameCount = int(np.floor((len(y)-overlap)/step))
    out = np.zeros((frameSize, frameCount))
    for i in range(0, frameCount):
        startIndex = i*step
        out[:, i] = y[startIndex:(startIndex+frameSize)]
    return out

def wavToFea(inputFileName, frameSize, hopSize):
    fs, y = scipy.io.wavfile.read(inputFileName)
    y = y / 32768.0
    if(len(y) < 16000):
        y = np.hstack([y, np.zeros(16000-len(y))])
    else:
        y = y[:16000]
    overlap = frameSize - hopSize
    frameMat = buffer(y, frameSize, overlap)
    frameSize, frameNum = frameMat.shape
    magSpec = fftOneSide(frameMat, frameSize)
    return magSpec

data_path = 'C:\\Users\\geniusturtle\\Desktop\\speech_commands_v0.01'
train_num = 2100
test_num = 200
frame_size = 1024
hop_size = 512
overlap = frame_size - hop_size

# Space allocation for speed
frameCount = int(np.floor((16000-overlap)/hop_size))
fea_train_all = np.zeros((train_num*10, frame_size//2+1, frameCount), dtype='float32')
label_train_all = np.zeros(train_num*10)
fea_test_all = np.zeros((test_num*10, frame_size//2+1, frameCount), dtype='float32')
label_test_all = np.zeros(test_num*10)

tic = time.time()
for num, digit_name in enumerate([
    'zero', 'one', 'two', 'three', 'four',
    'five', 'six', 'seven', 'eight', 'nine'
]):
    wav_files, _ = recursiveFileList(os.path.join(data_path, digit_name), 'wav')
    for i, w in enumerate(wav_files):
        if(i % 100 == 0):
            print('{}-{}'.format(digit_name, i))
        fea = wavToFea(w, frame_size, hop_size).astype('float32')
        if(i >= (train_num+test_num)):
            break
        elif(i < train_num):
            fea_train_all[num*train_num + i] = fea
            label_train_all[num*train_num + i] = num
        else:
            fea_test_all[num*test_num + (i-train_num)] = fea
            label_test_all[num*test_num + (i-train_num)] = num
toc = time.time()
print("Done, time: {:.2f} sec".format(toc-tic))

np.save('train_fea', fea_train_all)
np.save('train_lab', label_train_all)
np.save('test_fea', fea_test_all)
np.save('test_lab', label_test_all)
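Before moving on to training, it is worth a quick check (an optional sketch) that the saved arrays have the expected shapes:

import numpy as np

print(np.load('train_fea.npy').shape)   # (21000, 513, 30)
print(np.load('train_lab.npy').shape)   # (21000,)
print(np.load('test_fea.npy').shape)    # (2000, 513, 30)
print(np.load('test_lab.npy').shape)    # (2000,)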

Now we can move on to training. On a laptop with an i5-3230M CPU, training finishes in a bit over ten minutes.

import tensorflow as tf
import numpy as np
import os
import time

fea = np.load('train_fea.npy')[:, :, :, np.newaxis]
gt = np.load('train_lab.npy')
print('Loading done, shape: ', fea.shape, gt.shape)

# Make one-hot label
gt_mat = np.zeros((gt.shape[0], 10))
gt_mat[np.arange(gt.shape[0]).astype('int'), gt.astype('int')] = 1

# Make network
ph_x = tf.placeholder(tf.float32, [None, fea.shape[1], fea.shape[2], 1])
# Conv layer 1
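# Note: the kernel height 513 equals frame_size//2+1, so this layer spans the entire
# frequency axis and its output height collapses to 1; later layers only slide along time.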
h = tf.layers.conv2d(ph_x, 64, [513, 4], activation=tf.nn.relu)
h = tf.layers.max_pooling2d(h, [1, 2], [1, 2])
# Conv layer 2
h = tf.layers.conv2d(h, 64, [1, 4], activation=tf.nn.relu)
h = tf.layers.max_pooling2d(h, [1, 2], [1, 2])
# Dense layer
h = tf.layers.flatten(h)
h = tf.layers.dense(h, 256, activation=tf.nn.relu)
h = tf.layers.dropout(h, rate=0.5, training=True)
out = tf.layers.dense(h, 10)

# Make optimizer
ph_gt = tf.placeholder(tf.float32, shape=(None, 10))
optimizer = tf.train.AdamOptimizer()
loss = tf.losses.softmax_cross_entropy(ph_gt, out)
train_op = optimizer.minimize(loss)

sess = tf.Session()
init = tf.global_variables_initializer()
saver = tf.train.Saver()
sess.run(init)
batch_size = 128
train_data_num = fea.shape[0]

epoch_num = 30
total_time = 0
print('Start training...')
for epoch in range(epoch_num):
    tic = time.time()
    loss_all = 0
    recog_all = np.array([])
    for batch in range(0, train_data_num, batch_size):
        _, loss_val, recog = sess.run([train_op, loss, out], feed_dict={
            ph_x: fea[batch:batch+batch_size],
            ph_gt: gt_mat[batch:batch+batch_size]
        })
        loss_all += loss_val
        recog_all = np.vstack([recog_all, recog]) if recog_all.size else recog
    recog_idx = np.argmax(recog_all, axis=1)
    toc = time.time()
    total_time += toc - tic
    print('Epoch: {}, loss: {:.4f}, time: {:.2f} sec, est remain: {:.2f} hr'.format(
        epoch+1, loss_all / np.ceil(train_data_num / batch_size), toc-tic,
        total_time / (epoch+1) * (epoch_num - (epoch + 1)) / 3600
    ))
    print('\tRecog rate: {:.2f}%'.format(100*np.mean(recog_idx==gt)))
os.makedirs('model_sd', exist_ok=True)  # make sure the checkpoint directory exists
saver.save(sess, os.path.join('model_sd', 'model.ckpt'))

Tip: looking for a hotel? XXgo. Finding this too slow? GPU.

The test code is below; with the parameters used in this example, the recognition rate comes out at about 93%.

import tensorflow as tf
import numpy as np
import os
import time

fea = np.load('test_fea.npy')[:, :, :, np.newaxis]
gt = np.load('test_lab.npy')
print('Loading done, shape: ', fea.shape, gt.shape)

# Make one-hot label
gt_mat = np.zeros((gt.shape[0], 10))
gt_mat[np.arange(gt.shape[0]).astype('int'), gt.astype('int')] = 1

# Make network
ph_x = tf.placeholder(tf.float32, [None, fea.shape[1], fea.shape[2], 1])
# Conv layer 1
h = tf.layers.conv2d(ph_x, 64, [513, 4], activation=tf.nn.relu)
h = tf.layers.max_pooling2d(h, [1, 2], [1, 2])
# Conv layer 2
h = tf.layers.conv2d(h, 64, [1, 4], activation=tf.nn.relu)
h = tf.layers.max_pooling2d(h, [1, 2], [1, 2])
# Dense layer
h = tf.layers.flatten(h)
h = tf.layers.dense(h, 256, activation=tf.nn.relu)
h = tf.layers.dropout(h, rate=0.5, training=False)
out = tf.layers.dense(h, 10)

sess = tf.Session()
saver = tf.train.Saver()
saver.restore(sess, os.path.join('model_sd', 'model.ckpt'))
test_batch_size = 128
test_data_num = fea.shape[0]

recog_all = np.array([])
for batch in range(0, test_data_num, test_batch_size):
    print('Recognizing batch', batch)
    recog = sess.run([out], feed_dict={
        ph_x: fea[batch:batch+test_batch_size],
    })[0]
    recog_all = np.vstack([recog_all, recog]) if recog_all.size else recog
recog_idx = np.argmax(recog_all, axis=1)
print('\tRecog rate: {:.2f}%'.format(100*np.mean(recog_idx==gt)))
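If you want a closer look at which digits get confused with which (an optional sketch; it reuses recog_idx and gt from the test script above), a simple confusion matrix can be printed like this:

conf_mat = np.zeros((10, 10), dtype='int')
for true_label, pred_label in zip(gt.astype('int'), recog_idx):
    conf_mat[true_label, pred_label] += 1   # rows: ground truth, columns: prediction
print(conf_mat)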

Testing with your own recordings is also easy: just piece together the feature-extraction part and the test part. Two things need attention, though. First, the network input has four dimensions, while the spectrogram-extraction function in this example only outputs a two-dimensional spectrogram, so a few dimensions have to be added (the full code below already does this). Second, your own recording is usually hard to keep at roughly one second, so instead of the "pad the short ones with zeros and truncate the long ones" approach used above, it is better to take half a second on each side of the point of maximum amplitude; a minimal sketch of that idea follows, and then the full standalone script:
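This sketch is my own take on that idea (the function name and file path are hypothetical, and it assumes the recording is also at 16 kHz): find the sample with the largest absolute amplitude, keep half a second on each side of it, and zero-pad if the window runs past the end of the recording.

import numpy as np
import scipy.io.wavfile

def cutAroundPeak(y, fs=16000):
    center = int(np.argmax(np.abs(y)))   # sample with the largest absolute amplitude
    half = fs // 2                       # half a second on each side
    start = max(center - half, 0)
    cut = y[start:start + fs]
    if len(cut) < fs:                    # zero-pad if the window ran past the end
        cut = np.hstack([cut, np.zeros(fs - len(cut))])
    return cut

fs, y = scipy.io.wavfile.read('my_recording.wav')   # hypothetical path, 16 kHz recording
y = cutAroundPeak(y / 32768.0)                      # exactly 16000 samples, centered on the loudest point

(The standalone script below still uses the simpler pad-and-truncate logic inside wavToFea; swapping in something like cutAroundPeak is left as an exercise.)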

import tensorflow as tf
import numpy as np
import os
import time
import scipy.io.wavfile
import scipy.fftpack

def fftOneSide(frameMat, fftLen):
    magSpec = abs(scipy.fftpack.fft(frameMat, n=fftLen, axis=0))[0:fftLen//2+1, :]
    magSpec[1:fftLen//2, :] = magSpec[1:fftLen//2, :] * 2
    return magSpec

def buffer(y, frameSize, overlap):
    step = frameSize - overlap
    frameCount = int(np.floor((len(y)-overlap)/step))
    out = np.zeros((frameSize, frameCount))
    for i in range(0, frameCount):
        startIndex = i*step
        out[:, i] = y[startIndex:(startIndex+frameSize)]
    return out

def wavToFea(inputFileName, frameSize, hopSize):
    fs, y = scipy.io.wavfile.read(inputFileName)
    y = y / 32768.0
    if(len(y) < 16000):
        y = np.hstack([y, np.zeros(16000-len(y))])
    else:
        y = y[:16000]
    overlap = frameSize - hopSize
    frameMat = buffer(y, frameSize, overlap)
    frameSize, frameNum = frameMat.shape
    magSpec = fftOneSide(frameMat, frameSize)
    return magSpec

frame_size = 1024
hop_size = 512
fea = wavToFea(
    'C:\\Users\\geniusturtle\\Desktop\\rec\\20180628_140247.wav',
    frame_size, hop_size
)[np.newaxis, :, :, np.newaxis]

# Make network
ph_x = tf.placeholder(tf.float32, [None, fea.shape[1], fea.shape[2], 1])
# Conv layer 1
h = tf.layers.conv2d(ph_x, 64, [513, 4], activation=tf.nn.relu)
h = tf.layers.max_pooling2d(h, [1, 2], [1, 2])
# Conv layer 2
h = tf.layers.conv2d(h, 64, [1, 4], activation=tf.nn.relu)
h = tf.layers.max_pooling2d(h, [1, 2], [1, 2])
# Dense layer
h = tf.layers.flatten(h)
h = tf.layers.dense(h, 256, activation=tf.nn.relu)
h = tf.layers.dropout(h, rate=0.5, training=False)
out = tf.layers.dense(h, 10)

sess = tf.Session()
saver = tf.train.Saver()
saver.restore(sess, os.path.join('model_sd', 'model.ckpt'))

recog = sess.run([out], feed_dict={ph_x: fea})[0]
recog_idx = np.argmax(recog, axis=1)
print('Recognized spoken digit is:', recog_idx)