概念

异常检测是一种识别异常样本的方法,我们需要构建一个概率模型,如果某一样本被认定是正常样本的概率足够小,那么它会被当做异常样本。高斯分布(或称正态分布)模型是异常检测算法最常使用的概率分布模型。

算法的基本步骤如下:利用极大似然估计估算特征x的高斯分布概率密度函数的参数$\mu$和$\delta^2$。假设有m个特征,那么样本模型$p(x)$就是这m个特征的概率密度函数的乘积。用此模型估计新的样本,当$ p(x)<\epsilon $时,认为样本x是异常样本。

那么如何选择合适的阈值$\epsilon$呢?一种计算方法是从所给数据最小和最大的概率密度之间选择多个数值,然后我们可以考虑使用F1分数来评价$\epsilon$的优劣,选出最优值。

TensorFlow实现

首先仍然是读取数据,计算平均值和方差。

data = scio.loadmat('ex8data1.mat')
train_data = data['X']
X_val = tf.Variable(data['Xval'], dtype=np.float32) # 测试数据
y_val = tf.Variable(data['yval'], dtype=np.float32) # 是否为异常数据(0或1),与X_val数据相对应
X = tf.Variable(train_data, dtype=np.float32)

mean, sigma = tf.nn.moments(X, axes=0)

之后计算高斯分布模型$p(x)$,因为本例训练数据的特征值有两个,这里分别算出概率密度p1、p2,再相乘,这里注意需要扩展一个维度,使之变为矩阵。

def p(x):
    p1 = tf.distributions.Normal(mean[0], tf.sqrt(sigma[0])).prob(x[:, 0]) # prob方法带入相应的数据进行计算
    p2 = tf.distributions.Normal(mean[1], tf.sqrt(sigma[1])).prob(x[:, 1])
    return tf.expand_dims(tf.multiply(p1, p2), 1)

p_val = p(X_val) # 测试集模型
p = p(X)  # 训练集模型

然后就是计算F1分数选择$\epsilon$,我发现TensorFlow有自带的计算F1分数的函数:

f1_score, update_op = tf.contrib.metrics.f1_score(y_val, p_val)

但是除了官方文档之外,我没有查到任何使用的例子,自己用会报一大堆错误,由于无法解决这个问题,只能一步一步计算了。另外这个方法使用NumPy实现,而不是TensorFlow。因为TensorFlow的张量是不能迭代的,所以不能进行for循环,也没有找到很好的方式解决,所以就临时使用NumPy实现。

def choose_epsilon(y_val, p_val): # 两个参数代表实际值和预测值(概率密度)
    best_epsilon = 0
    best_f1 = 0

    step = (np.max(p_val) - np.min(p_val)) / 1000
    for epsilon in np.arange(np.min(p_val), np.max(p_val), step):
        predictions = p_val < epsilon # 将预测值概率密度转为0或1

        tp = np.sum(np.logical_and(predictions == 1, y_val == 1))  # true positive 预测为正,实际为正
        fp = np.sum(np.logical_and(predictions == 1, y_val == 0))  # false positive 预测为正,实际为负
        fn = np.sum(np.logical_and(predictions == 0, y_val == 1))  # false negative 预测为负,实际为正

        precision = tp / (tp + fp)  # 计算精确率,注意这里会报一个警告,为保证分母不为0,需要加一个数(如0.0000000001),下面计算F1分数也如此
        recall = tp / (tp + fn) # 计算召回率
        f1 = (2 * precision * recall) / (precision + recall) # 计算F1分数

        if f1 > best_f1:
            best_f1 = f1
            best_epsilon = epsilon

    return best_epsilon, best_f1

后续计算及可视化如下:

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    
    epsilon, _ = choose_epsilon(sess.run(y_val), sess.run(p_val)) # 计算最佳阈值
    outliers = np.where(sess.run(p) < epsilon, True, False).ravel() # 找出异常数据,以True标示,这是使用训练集模型,而不是测试集模型

    plt.scatter(train_data[:, 0], train_data[:, 1]) # 画出所有训练数据
    plt.scatter(train_data[outliers, 0], train_data[outliers, 1], color='r') # 画出异常数据
    plt.show()

最终输出的结果可视化如下,红点为异常数据,共6个。

参考资料: 机器学习练习(四)——异常检测