In tasks such as OCR and machine translation, the input and the ground-truth (GT) output text are hard to align word by word, and producing such an alignment during preprocessing is very difficult. If we instead train the model directly without any alignment, the spacing between characters varies from sample to sample, which makes it hard for the model to converge.
CTC (Connectionist Temporal Classification) removes the need for manual alignment between input and output, which makes it well suited to sequence tasks such as OCR and speech recognition.
Given an input sequence X = [x_1, x_2, ..., x_T] (for example T frames of audio features or T columns of image features) and a target output sequence Y = [y_1, y_2, ..., y_U], we want to learn an accurate mapping from X to Y.
Compared with ordinary classification, temporal classification has the following difficulties: both X and Y vary in length, the ratio of their lengths varies, and we have no frame-level alignment between the elements of X and the elements of Y.
CTC provides a solution: for a given input sequence X it outputs, at every time step, a distribution over all label symbols (plus a blank), from which we can compute the probability of any output sequence Y or decode the most likely one.
Loss: given the input sequence X, we want to maximize the posterior probability of Y, i.e. minimize -ln P(Y|X).
In an OCR task, the input X is an image containing "CAT" and the output Y is the text [C, A, T]. The most naive alignment splits X into a number of time steps, emits one character per time step, and then merges consecutive repeated characters.
However, this has two drawbacks: it forces every time step to emit a character even when that part of the input corresponds to no character at all (e.g. background or silence), and merging all repeats makes it impossible to produce outputs with genuinely repeated characters, such as the "ll" in "hello". To solve these problems, CTC introduces a blank token. Decoding a CTC alignment then involves two parts: removing repeated characters and removing blank tokens. The rule is: first collapse consecutive identical symbols into one, then delete all blanks; as a consequence, a genuinely repeated character in Y must be separated by at least one blank in the alignment.
For example, for an input sequence of length 10, RNN output sequences such as "aap_pplee_" or "_app_plle_" (with "_" denoting the blank) all map to: apple
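A minimal sketch of this collapse rule (the helper name collapse is mine, assuming "_" is the blank symbol):

def collapse(path, blank='_'):
    # 1. merge consecutive identical symbols
    merged = []
    previous = None
    for c in path:
        if c != previous:
            merged.append(c)
        previous = c
    # 2. drop the blank symbol
    return ''.join(c for c in merged if c != blank)

print(collapse("aap_pplee_"))   # apple
print(collapse("_app_plle_"))   # apple
print(collapse("aappplee__"))   # aple -- without a blank between them, the double p collapses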
Finally, to compute P(Y|X), we sum the probabilities of all alignment paths whose collapsed output is the target (here, all paths that map to apple):

    P(Y|X) = Σ_{A: collapse(A)=Y} Π_{t=1..T} p_t(a_t | X)
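As a sanity check, this marginalization can be written as a brute-force enumeration over all alignments (only feasible for tiny T and small vocabularies; the function names below are illustrative, not from any library):

import itertools
import numpy as np

def collapse(path, blank=0):
    merged = [k for i, k in enumerate(path) if i == 0 or k != path[i - 1]]
    return tuple(k for k in merged if k != blank)

def ctc_prob_brute_force(probs, target, blank=0):
    # probs: [T, V] per-time-step softmax outputs; target: label indices without blanks.
    # Sum the product of per-step probabilities over every length-T alignment
    # whose collapsed form equals the target.
    T, V = probs.shape
    total = 0.0
    for path in itertools.product(range(V), repeat=T):
        if collapse(path, blank) == tuple(target):
            total += np.prod([probs[t, k] for t, k in enumerate(path)])
    return total

# e.g. 4 time steps, vocabulary {blank, 'a', 'b'}: P(Y = "ab" | X)
probs = np.full((4, 3), 1.0 / 3)
print(ctc_prob_brute_force(probs, [1, 2]))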
Therefore, during training we expand the GT label. The procedure is: add a blank at the beginning and at the end, and insert a blank between every pair of adjacent characters in the GT. Let l denote the original label and l' the expanded form; then |l'| = 2|l| + 1. For example: l = apple => l' = _a_p_p_l_e_
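A small sketch of this expansion (expand_label is a hypothetical helper for illustration):

def expand_label(label, blank='_'):
    # l = "apple" -> l' = "_a_p_p_l_e_"
    expanded = blank
    for ch in label:
        expanded += ch + blank
    return expanded

print(expand_label("apple"))                              # _a_p_p_l_e_
print(2 * len("apple") + 1, len(expand_label("apple")))   # both 11, i.e. |l'| = 2|l| + 1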
The figure referenced here (the CTC path-search lattice over the expanded label l'; image omitted) shows which positions of l' a path can occupy at each time step. In that lattice we define the forward probability α_t(s) as the total probability of all paths that, after t time steps, end at position s of the expanded label l', and we write y_t(k) for the network output (softmax probability) of symbol k at time t.

(1) If l'_s is the blank, or l'_s = l'_{s-2} (position s repeats the character two positions earlier), a path can only reach position s from position s or s-1 at the previous step. So in this case the probability is:

    α_t(s) = ( α_{t-1}(s) + α_{t-1}(s-1) ) · y_t(l'_s)

(2) Otherwise a path may also skip the intervening blank and come from position s-2, so in this case the forward probability is:

    α_t(s) = ( α_{t-1}(s) + α_{t-1}(s-1) + α_{t-1}(s-2) ) · y_t(l'_s)

Initialization:

    α_1(1) = y_1(blank),   α_1(2) = y_1(l'_2),   α_1(s) = 0 for s > 2

Finally we need to compute P(Y|X). A valid path must end either at the last character of l' or at the trailing blank, so the sum of these two forward probabilities gives the total probability; in the lattice figure these are the two positions in the bottom-right corner:

    P(Y|X) = α_T(|l'|) + α_T(|l'| - 1)

Using the forward probabilities, the CTC loss is the negative log-likelihood, so the value of the loss is:

    loss = -ln P(Y|X) = -ln( α_T(|l'|) + α_T(|l'| - 1) )
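A tiny worked example (numbers of my own choosing): take T = 2 and a single-character label Y = "a", so l' = _a_. Suppose y_1(_) = 0.6, y_1(a) = 0.4, y_2(_) = 0.3, y_2(a) = 0.7. Then α_1(1) = 0.6, α_1(2) = 0.4, α_1(3) = 0; and α_2(1) = 0.6 · 0.3 = 0.18, α_2(2) = (0.6 + 0.4) · 0.7 = 0.70, α_2(3) = 0.4 · 0.3 = 0.12. Hence P(Y|X) = α_2(2) + α_2(3) = 0.82 and loss = -ln 0.82 ≈ 0.198. This agrees with directly enumerating the three alignments that collapse to "a": (a, a) 0.4 · 0.7 = 0.28, (_, a) 0.6 · 0.7 = 0.42, (a, _) 0.4 · 0.3 = 0.12, which also sum to 0.82.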
Looking at this computation, we see a long chain of multiplications. Since each factor is a probability between 0 and 1, multiplying many of them together can produce a number so small that it underflows. We therefore move the computation to the log domain, which turns the products into sums.
Since the final loss is -ln(P) anyway, we can compute log(p) directly while computing the forward probabilities.
The remaining difficulty is the sum inside the recursion: in the log domain we hold log-probabilities, but we still need the log of a sum of probabilities. If we have N probabilities with log values x_1, ..., x_N, we need

    log( exp(x_1) + exp(x_2) + ... + exp(x_N) )

If we exponentiate the x_i directly, very negative values underflow to 0 and large values can overflow.

Solution: the log-sum-exp trick,

    log Σ_i exp(x_i) = a + log Σ_i exp(x_i - a)

In general, a is taken to be the maximum of the N values; this guarantees that the largest exponent is at most 0, so there is no overflow, and even if some of the remaining terms underflow you still obtain a reasonable value.

Proof:

    a + log Σ_i exp(x_i - a) = a + log( exp(-a) Σ_i exp(x_i) ) = a - a + log Σ_i exp(x_i) = log Σ_i exp(x_i)
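A small sketch comparing the naive computation with the stabilized one (the values are chosen by me to force underflow in the naive version):

import numpy as np

def logsumexp(xs):
    a = np.max(xs)                        # shift by the maximum log value
    return a + np.log(np.sum(np.exp(xs - a)))

xs = np.array([-1000.0, -1001.0, -1002.0])    # log-probabilities
naive = np.log(np.sum(np.exp(xs)))            # exp() underflows to 0, result is -inf
stable = logsumexp(xs)                        # about -999.59
print(naive, stable)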
The forward probability on the CTC lattice, written in the log domain, becomes:

    log α_t(s) = logsumexp( log α_{t-1}(s), log α_{t-1}(s-1), [log α_{t-1}(s-2)] ) + log y_t(l'_s)

where the third term is included only in case (2) above.
In torch's CTC forward pass, a matrix of log forward probabilities is computed (and later reused in the backward pass of the loss). Its core update is exactly this log-sum-exp form:
log_alpha_a[t][s] = std::log(std::exp(la1 - lamax) + std::exp(la2 - lamax) + std::exp(la3 - lamax)) + lamax + log_probs_a[t][current_target_prime];
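In NumPy terms (my own transliteration, not part of the PyTorch source), that single line is a numerically stabilized log-sum-exp over the up-to-three predecessor cells, plus the emission log-probability:

import numpy as np

def alpha_update(la1, la2, la3, log_emit):
    # la1, la2, la3: log alpha of [t-1][s], [t-1][s-1], [t-1][s-2] (use -inf if absent)
    lamax = max(la1, la2, la3)
    if lamax == -np.inf:          # all predecessors unreachable; avoid -inf - (-inf)
        lamax = 0.0
    return (np.log(np.exp(la1 - lamax) + np.exp(la2 - lamax) + np.exp(la3 - lamax))
            + lamax + log_emit)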
Source code analysis: the following is taken from the CTCLoss implementation in PyTorch.
//pytorch/aten/src/ATen/native/LossCTC.cpp
// get the symbol at position idx of the blank-padded target, used to decide
// whether the position is a blank or a real character
static inline int64_t get_target_prime(target_t *target, int64_t offset, int64_t stride, int64_t idx, int64_t BLANK) {
  if (idx % 2 == 0) {
    return BLANK;
  } else {
    return target[offset + stride * (idx / 2)];
  }
}

// core of ctc_loss_cpu_template
// forward probabilities, shape [B, T, 2*max_target_length+1]
Tensor log_alpha = at::empty({batch_size, log_probs.size(0), 2 * max_target_length + 1}, log_probs.options());
Tensor neg_log_likelihood = at::empty({batch_size}, log_probs.options());

// log_probs permuted to [B, T, N]
auto lpp = log_probs.permute({1, 0, 2});
auto log_probs_a_global = lpp.accessor<scalar_t, 3>();
auto log_alpha_a_global = log_alpha.accessor<scalar_t, 3>();
auto targets_data = targets.data_ptr<target_t>();
auto neg_log_likelihood_a = neg_log_likelihood.accessor<scalar_t, 1>();

// alpha calculation for the first row, the three equations for alpha_1 above eq (6)
// first the default
log_alpha.narrow(1, 0, 1).fill_(neginf);
at::parallel_for(0, batch_size, 0, [&](int64_t start, int64_t end) {
  for (int64_t b = start; b < end; b++) {  // each item in the batch
    int64_t input_length = input_lengths[b];
    int64_t target_length = target_lengths[b];
    auto log_probs_a = log_probs_a_global[b];
    auto log_alpha_a = log_alpha_a_global[b];
    int64_t tg_batch_offset = tg_batch_offsets[b];

    // the first two items of alpha_t above eq (6)
    // initialize the forward probability at [t0][s0] with the blank
    log_alpha_a[0][0] = log_probs_a[0][BLANK];
    if (target_length > 0)
      // [t0][s1] equals the probability of the first character of the sequence
      log_alpha_a[0][1] = log_probs_a[0][get_target_prime(targets_data, tg_batch_offset, tg_target_stride, 1, BLANK)];

    // now the loop over the inputs
    for (int64_t t = 1; t < input_length; t++) {
      for (int64_t s = 0; s < 2 * target_length + 1; s++) {
        // for every s, compute its forward probability
        // which symbol sits at position s of the expanded target
        auto current_target_prime = get_target_prime(targets_data, tg_batch_offset, tg_target_stride, s, BLANK);
        // this loop over s could be parallel/vectorized, too, but the required items are one index apart
        // alternatively, one might consider moving s to the outer loop to cache current_target_prime more (but then it needs to be descending)
        // for the cuda implementation, that gave a speed boost.
        // This is eq (6) and (7), la1,2,3 are the three summands. We keep track of the maximum for the logsumexp calculation.
        scalar_t la1 = log_alpha_a[t - 1][s];
        scalar_t lamax = la1;
        scalar_t la2, la3;
        if (s > 0) {
          la2 = log_alpha_a[t - 1][s - 1];
          if (la2 > lamax)
            lamax = la2;
        } else {
          la2 = neginf;
        }
        if ((s > 1) && (get_target_prime(targets_data, tg_batch_offset, tg_target_stride, s - 2, BLANK) != current_target_prime)) {
          // position s is not a blank and differs from position s-2 (no repeated
          // character), i.e. the second case of the transition equation
          la3 = log_alpha_a[t - 1][s - 2];
          if (la3 > lamax)
            lamax = la3;
        } else {
          // s is a blank or repeats s-2: the third summand is not added
          la3 = neginf;
        }
        // lamax now holds the largest summand, used to stabilize the logsumexp
        if (lamax == neginf)  // cannot do neginf - neginf
          lamax = 0;
        // this is the assignment of eq (6)
        log_alpha_a[t][s] = std::log(std::exp(la1 - lamax) + std::exp(la2 - lamax) + std::exp(la3 - lamax)) + lamax + log_probs_a[t][current_target_prime];
      }
    }

    // the likelihood is the sum of the last two alphas, eq (8), the loss is the negative log likelihood
    if (target_length == 0) {
      // if the target is empty then there is no preceding BLANK state and hence there is no path to merge
      neg_log_likelihood_a[b] = -log_alpha_a[input_length - 1][0];
    } else {
      scalar_t l1 = log_alpha_a[input_length - 1][target_length * 2];
      scalar_t l2 = log_alpha_a[input_length - 1][target_length * 2 - 1];
      // sum the probabilities of the two valid end positions
      scalar_t m = std::max(l1, l2);
      m = ((m == neginf) ? 0 : m);
      scalar_t log_likelihood = std::log(std::exp(l1 - m) + std::exp(l2 - m)) + m;
      neg_log_likelihood_a[b] = -log_likelihood;
    }
  }
});
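For reference, this is roughly how that kernel is exercised from Python through nn.CTCLoss (the shapes and values below are made up for illustration): log_probs has shape (T, B, C) after log_softmax, and class 0 is the blank.

import torch
import torch.nn as nn

T, B, C = 50, 16, 20                       # time steps, batch size, classes (0 = blank)
log_probs = torch.randn(T, B, C).log_softmax(2).requires_grad_()
targets = torch.randint(1, C, (B, 30), dtype=torch.long)
input_lengths = torch.full((B,), T, dtype=torch.long)
target_lengths = torch.randint(10, 30, (B,), dtype=torch.long)

ctc_loss = nn.CTCLoss(blank=0)
loss = ctc_loss(log_probs, targets, input_lengths, target_lengths)
loss.backward()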
A Python/NumPy version of CTC is provided below to make the algorithm easier to follow (forward/backward probabilities, gradients, and greedy/beam/prefix-beam decoding):
import numpy as np
from collections import defaultdict

ninf = -float('inf')


def _logsumexp(a, b):
    '''
    np.log(np.exp(a) + np.exp(b))
    '''
    if a < b:
        a, b = b, a
    if b == ninf:
        return a
    else:
        return a + np.log(1 + np.exp(b - a))


def logsumexp(*args):
    '''
    from scipy.special import logsumexp
    logsumexp(args)
    '''
    res = args[0]
    for e in args[1:]:
        res = _logsumexp(res, e)
    return res


class CTC:
    def __init__(self):
        pass

    def forward(self):
        pass

    def alpha(self, log_y, labels):
        # alpha: forward probabilities, computed in the log domain
        T, V = log_y.shape
        L = len(labels)
        log_alpha = np.ones([T, L]) * ninf

        # init: dynamic-programming initialization
        log_alpha[0, 0] = log_y[0, labels[0]]
        log_alpha[0, 1] = log_y[0, labels[1]]

        # forward probability of each expanded-label position at every step
        for t in range(1, T):
            for i in range(L):
                s = labels[i]
                a = log_alpha[t - 1, i]
                if i - 1 >= 0:
                    a = logsumexp(a, log_alpha[t - 1, i - 1])
                # if the current symbol is not blank and differs from labels[i-2],
                # paths may also come from position i-2
                if i - 2 >= 0 and s != 0 and s != labels[i - 2]:
                    a = logsumexp(a, log_alpha[t - 1, i - 2])
                log_alpha[t, i] = a + log_y[t, s]
        return log_alpha

    def beta(self, log_y, labels):
        # beta: backward probabilities, computed in the log domain
        T, V = log_y.shape
        L = len(labels)
        log_beta = np.ones([T, L]) * ninf

        # init
        log_beta[-1, -1] = log_y[-1, labels[-1]]
        log_beta[-1, -2] = log_y[-1, labels[-2]]

        for t in range(T - 2, -1, -1):
            for i in range(L):
                s = labels[i]
                a = log_beta[t + 1, i]
                if i + 1 < L:
                    a = logsumexp(a, log_beta[t + 1, i + 1])
                if i + 2 < L and s != 0 and s != labels[i + 2]:
                    a = logsumexp(a, log_beta[t + 1, i + 2])
                log_beta[t, i] = a + log_y[t, s]
        return log_beta

    def backward(self, log_y, labels):
        # same gradient as gradient() below, but every quantity stays in the log domain
        T, V = log_y.shape
        L = len(labels)
        log_alpha = self.alpha(log_y, labels)
        log_beta = self.beta(log_y, labels)
        log_p = logsumexp(log_alpha[-1, -1], log_alpha[-1, -2])

        # at every time step, combine alpha and beta over all positions emitting symbol s
        log_grad = np.ones([T, V]) * ninf
        for t in range(T):
            for s in range(V):
                lab = [i for i, c in enumerate(labels) if c == s]
                for i in lab:
                    log_grad[t, s] = logsumexp(log_grad[t, s],
                                               log_alpha[t, i] + log_beta[t, i])
                log_grad[t, s] -= 2 * log_y[t, s]
        log_grad -= log_p
        return log_grad

    def predict(self):
        pass

    def ctc_prefix(self):
        pass

    def ctc_beamsearch(self):
        pass

    def alpha_vanilla(self, y, labels):
        # vanilla forward probabilities, not computed in the log domain
        T, V = y.shape   # T: time steps, V: number of classes
        L = len(labels)  # expanded label length
        alpha = np.zeros([T, L])

        # init
        alpha[0, 0] = y[0, labels[0]]
        alpha[0, 1] = y[0, labels[1]]

        for t in range(1, T):
            for i in range(L):
                s = labels[i]
                a = alpha[t - 1, i]
                if i - 1 >= 0:
                    a += alpha[t - 1, i - 1]
                if i - 2 >= 0 and s != 0 and s != labels[i - 2]:
                    a += alpha[t - 1, i - 2]
                alpha[t, i] = a * y[t, s]
        return alpha

    def beta_vanilla(self, y, labels):
        # vanilla backward probabilities, not computed in the log domain
        T, V = y.shape
        L = len(labels)
        beta = np.zeros([T, L])

        # init
        beta[-1, -1] = y[-1, labels[-1]]
        beta[-1, -2] = y[-1, labels[-2]]

        for t in range(T - 2, -1, -1):
            for i in range(L):
                s = labels[i]
                a = beta[t + 1, i]
                if i + 1 < L:
                    a += beta[t + 1, i + 1]
                if i + 2 < L and s != 0 and s != labels[i + 2]:
                    a += beta[t + 1, i + 2]
                beta[t, i] = a * y[t, s]
        return beta

    def gradient(self, y, labels):
        # gradient of ln P(Y|X) w.r.t. the softmax outputs y, vanilla version
        T, V = y.shape
        L = len(labels)
        alpha = self.alpha_vanilla(y, labels)
        beta = self.beta_vanilla(y, labels)
        p = alpha[-1, -1] + alpha[-1, -2]

        grad = np.zeros([T, V])
        for t in range(T):
            for s in range(V):
                lab = [i for i, c in enumerate(labels) if c == s]
                for i in lab:
                    grad[t, s] += alpha[t, i] * beta[t, i]
                grad[t, s] /= y[t, s] ** 2
        grad /= p
        return grad


def check_grad(y, labels, w=-1, v=-1, toleration=1e-3):
    # compare the analytic gradient with a numerical (central-difference) estimate
    ctc = CTC()
    grad_1 = ctc.gradient(y, labels)[w, v]

    delta = 1e-10
    original = y[w, v]

    y[w, v] = original + delta
    alpha = ctc.alpha_vanilla(y, labels)
    log_p1 = np.log(alpha[-1, -1] + alpha[-1, -2])

    y[w, v] = original - delta
    alpha = ctc.alpha_vanilla(y, labels)
    log_p2 = np.log(alpha[-1, -1] + alpha[-1, -2])

    y[w, v] = original
    grad_2 = (log_p1 - log_p2) / (2 * delta)
    if np.abs(grad_1 - grad_2) > toleration:
        print('[%d, %d]:%.2e' % (w, v, np.abs(grad_1 - grad_2)))


def remove_blank(labels, blank=0):
    new_labels = []
    # combine duplicates
    previous = None
    for l in labels:
        if l != previous:
            new_labels.append(l)
        previous = l
    # remove blank
    new_labels = [l for l in new_labels if l != blank]
    return new_labels


def insert_blank(labels, blank=0):
    new_labels = [blank]
    for l in labels:
        new_labels += [l, blank]
    return new_labels


def greedy_decode(y, blank=0):
    raw_rs = np.argmax(y, axis=1)
    rs = remove_blank(raw_rs, blank)
    return raw_rs, rs


def beam_decode(y, beam_size=10):
    T, V = y.shape
    log_y = np.log(y)
    beam = [([], 0)]
    for t in range(T):          # for every time step
        new_beam = []
        for prefix, score in beam:
            for i in range(V):  # for every state
                new_prefix = prefix + [i]
                new_score = score + log_y[t, i]
                new_beam.append((new_prefix, new_score))
        # keep top beam_size
        new_beam.sort(key=lambda x: x[1], reverse=True)
        beam = new_beam[:beam_size]
    return beam


def prefix_beam_decode(y, beam_size=10, blank=0):
    T, V = y.shape
    log_y = np.log(y)
    beam = [(tuple(), (0, ninf))]   # (blank, non-blank) log probabilities
    for t in range(T):              # for every time step
        new_beam = defaultdict(lambda: (ninf, ninf))
        for prefix, (p_b, p_nb) in beam:
            for i in range(V):      # for every state
                p = log_y[t, i]
                if i == blank:      # propose a blank
                    new_p_b, new_p_nb = new_beam[prefix]
                    new_p_b = logsumexp(new_p_b, p_b + p, p_nb + p)
                    new_beam[prefix] = (new_p_b, new_p_nb)
                    continue
                else:               # extend with a non-blank
                    end_t = prefix[-1] if prefix else None

                    # extend the current prefix
                    new_prefix = prefix + (i,)
                    new_p_b, new_p_nb = new_beam[new_prefix]
                    if i != end_t:
                        new_p_nb = logsumexp(new_p_nb, p_b + p, p_nb + p)
                    else:
                        new_p_nb = logsumexp(new_p_nb, p_b + p)
                    new_beam[new_prefix] = (new_p_b, new_p_nb)

                    # keep the current prefix
                    if i == end_t:
                        new_p_b, new_p_nb = new_beam[prefix]
                        new_p_nb = logsumexp(new_p_nb, p_nb + p)
                        new_beam[prefix] = (new_p_b, new_p_nb)
        # keep top beam_size
        beam = sorted(new_beam.items(), key=lambda x: logsumexp(*x[1]), reverse=True)
        beam = beam[:beam_size]
    return beam
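A quick usage sketch of the class above (the random outputs and label are mine, chosen only to exercise the code):

np.random.seed(0)
T, V = 12, 5                              # 12 time steps, 4 characters + blank (index 0)
y = np.random.rand(T, V)
y = y / y.sum(axis=1, keepdims=True)      # per-step softmax-like outputs

labels = insert_blank([1, 2, 3])          # GT "123" -> expanded _1_2_3_
ctc = CTC()
log_alpha = ctc.alpha(np.log(y), labels)
loss = -logsumexp(log_alpha[-1, -1], log_alpha[-1, -2])
print('CTC loss:', loss)

raw, decoded = greedy_decode(y)
print('greedy decode:', decoded)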