马尔可夫随机场MRF与置信度传播BP(附例子讲解)

 

1 马尔可夫随机场

马尔可夫随机场(Markov Random Field,MRF),是概率无向图模型的一种,用于描述变量之间的关系,其中变量之间的连接是无向的。

MRF的结构和条件独立性质是基于团块最大团块的概念来定义的。

团块是图中结点的子集,其中子集中的节点两两之间都直接连接,而最大团块则是不能再添加其他节点而保持两两连接的最大子集。

image-20240329171005870

MRF的联合概率分布可以表示为:

\[P(\mathbf{x}) = \frac{1}{Z} \prod_{C} \psi_C(\mathbf{x}_C)\]

其中,

  • $P(\mathbf{x})$ 是联合概率分布
  • $Z$ 是归一化常数
  • $\psi_C(\mathbf{x}_C)$ 是势函数,表示团块 $C$ 中的变量组合 $\mathbf{x}_C$ 的势能

2 因子图

image-20240329171629775

上面三个都代表同一个MRF,后面两个是该MRF的因子图,不同的因子图代表着不同的分解:

  • 第一个: $P(a,b,c) = \frac{1}{Z}f_(a,b,c)$
  • 第二个: $P(a,b,c) = \frac{1}{Z}f_1(a,b)f_2(b,c)f_3(a,c)$

代表因子 $f$ 的结点与代表变量 $x_i$ 的边缘(或半边缘)相连,当且仅当 $f$ 是关于 $x_i$ 的函数

3 置信度传播

置信度传播算法(Belief Propagation,BP)是一种用于计算概率图模型中的边缘概率分布最大后验概率(MAP)估计的近似推断算法。它基于因子图结构,通过在因子图上传递消息来计算变量之间的联合概率分布。

最大后验概率:最大后验概率即在给定观测数据 $D$ 的情况下,使后验概率最大化的参数或隐含变量值。通常表示: $\hat{\theta}_{\text{MAP}} = \underset{\theta}{\arg\max} \ P(\theta D)$ ​
贝叶斯公式: $P(\theta D) = \frac{P(D \theta) \cdot P(\theta)}{P(D)}$

原理:

  • 消息传递:BP算法通过在因子图上的节点之间传递消息来进行推断。每个节点根据收到的消息更新自己的置信度,然后将更新后的消息传递给其相邻的节点,直到达到收敛或达到最大迭代次数为止

步骤:

  1. 初始化:初始化每个节点的信念值和消息
  2. 消息传递:在因子图上进行消息传递,直到收敛或达到最大迭代次数
  3. 计算结果:根据收敛后的节点信念值,计算边缘概率分布或进行最大后验概率估计

这段代码是用Python来计算在链式结构马尔可夫随机场上的最大积置信传播(Max-Product Belief Propagation)。下面我会逐步解释代码的功能:

Python示例:

image-20240329193528010

判断车辆最可能行驶的车道,由于有一些噪声导致了一些错误,初始值为 unary

image-20240329173729679

定义马尔可夫随机场模型的参数:

# unary: Nx3 matrix 一元势函数f_i
unary = np.array([[0.7,0.1,0.2],[0.7,0.2,0.1],[0.2,0.1,0.7],[0.7,0.2,0.1],
                  [0.2,0.6,0.2],[0.1,0.8,0.1],[0.4,0.3,0.3],[0.1,0.8,0.1],
                  [0.1,0.1,0.8],[0.1,0.5,0.4]])
# pairwise: 3x3 matrix 二元势函数g(转移概率)
pairwise = np.array([[0.8,0.2,0.0],[0.2,0.6,0.2],[0.0,0.2,0.8]])

一元势函数表示每个变量在各个状态下的初始概率,二元势函数表示变量之间的转移概率,如 pairwise[0][0]=0.8 就表示在第一个车道变到第一车道(维持在当前车道)的概率为0.8

1 获取模型参数
[num_vars, num_states] = unary.shape
  • unary 是一个二维数组,表示每个变量的一元势函数(unary potentials),其中每一行对应一个变量(某时刻在每个车道的概率),每一列对应一个状态(在哪个车道)
  • num_vars 表示变量的数量,即 unary 数组的行数
  • num_states 表示每个变量可能的状态数量,即 unary 数组的列数
2 计算消息

创建一个空的二维数组 msg,用于存储消息。数组的行数是 num_vars - 1,表示消息的数量,每一行对应一个变量。数组的列数是 num_states,表示每个消息可能的状态数量。

msg = np.zeros([num_vars - 1, num_states]) #一共9个消息(9个点与点的连接)
3 消息传递

消息传递的核心是通过计算变量之间的乘积来更新消息,并选择这个乘积中的最大值作为传递的消息,从而选择最有可能的状态作为消息传递的结果

for i in range(num_vars - 2, -1, -1):
    if i == num_vars - 2:
        msg[i] = np.max(pairwise * unary[i + 1, :], 1)
    else:
        msg[i] = np.max(pairwise * unary[i + 1, :] * msg[i + 1], 1)

消息从后往前传递:

  • 计算从倒数第二个变量到最后一个变量的消息:直接unary 乘上 pairwise,并取乘积中的最大值
  • 计算其他变量之间的消息:unarypairwise前一个变量的消息的乘积,并取乘积中的最大值
4 计算边缘概率分布和MAP
  1. 创建一个空的二维数组 max_marginals,用于存储边缘概率。数组的行数是 num_vars,表示变量的数量,每一行对应一个变量。数组的列数是 num_states,表示每个变量可能的状态(一共3个车道)

    max_marginals = np.zeros([num_vars, num_states])
    

    创建一个大小为 num_vars 的一维数组 map,用于存储每一个变量的MAP(即对于每一个时刻,最可能选择的车道)

    map = np.zeros(num_vars, dtype=int)
    
  2. 对于每个变量 i

    • 第一个,则边缘概率就等于该变量的消息 msg[i, :]
    • 最后一个,则边缘概率等于前一个变量到当前变量的转移概率乘以当前变量的一元势函数
    • 对于其他变量,边缘概率等于前一个变量到当前变量的转移概率乘以当前变量的一元势函数乘以当前变量的消息
    for i in range(num_vars):
        if i == 0:
            max_marginals[i, :] = msg[i, :]
        if i == num_vars - 1:
            max_marginals[i, :] = pairwise[map[i - 1], :] * unary[i, :]
        else:
            max_marginals[i, :] = pairwise[map[i - 1], :] * unary[i, :] * msg[i, :]
        map[i] = np.argmax(max_marginals[i, :])
    

    pairwise[map[i - 1], :] 代表前一个时刻最可能在的车道所对应的转到各个车道的概率

  3. 输出MAP估计值:

    输出MAP估计值,即对每个变量边缘概率最大的所对应的状态(每个时刻最可能的车道)

    print(map)
    

完整代码:

import numpy as np
import matplotlib.pyplot as plt
from scipy import misc

# plot function
# input: Nx3 matrix of values & title string
def plot(vals, title=''):
    plt.close()
    vals /= np.tile(np.sum(vals, 1), (3, 1)).transpose()
    f, axarr = plt.subplots(1, 10, figsize=(10, 2))
    plt.suptitle(title, fontsize=16, fontweight='bold')
    for i in range(vals.shape[0]):
        axarr[i].barh([0, 1, 2], np.array([1, 1, 1]), color='white', edgecolor='black', linewidth=2)
        axarr[i].barh([0, 1, 2], vals[i], color='red')
        axarr[i].axis('off')
    plt.show()


if __name__ == '__main__':
    unary = np.array([[0.7, 0.1, 0.2], [0.7, 0.2, 0.1], [0.2, 0.1, 0.7], [0.7, 0.2, 0.1],
                      [0.2, 0.6, 0.2], [0.1, 0.8, 0.1], [0.4, 0.3, 0.3], [0.1, 0.8, 0.1],
                      [0.1, 0.1, 0.8], [0.1, 0.5, 0.4]])
    
    pairwise = np.array([[0.8, 0.2, 0.0], [0.2, 0.6, 0.2], [0.0, 0.2, 0.8]])

    [num_vars, num_states] = unary.shape

    msg = np.zeros([num_vars - 1, num_states])  # (num_vars-1) x num_states matrix
    for i in range(num_vars - 2, -1, -1):
        if i == num_vars - 2:
            # 计算倒数第二个到最后一个的消息
            msg[i] = np.max(pairwise * unary[i + 1, :], 1)
        else:
            # 计算其他的消息
            msg[i] = np.max(pairwise * unary[i + 1, :] * msg[i + 1], 1)

   
    max_marginals = np.zeros([num_vars, num_states])
    map = np.zeros(num_vars, dtype=int)
    for i in range(num_vars):
        if i == 0:
            max_marginals[i, :] = msg[i, :]
        if i == num_vars - 1:
            max_marginals[i, :] = pairwise[map[i - 1], :] * unary[i, :]
        else:
            max_marginals[i, :] = pairwise[map[i - 1], :] * unary[i, :] * msg[i, :]
        map[i] = np.argmax(max_marginals[i, :])

    
    plot(max_marginals, 'Max Marginals')


    print("MAP Estimate:")
    print(map)