Python自定义指标聚类实例代码

最近在研究 Yolov2 论文的时候,发现作者在做先验框聚类使用的指标并非欧式距离,而是IOU。在找了很多资料之后,基本确定 Python 没有自定义指标聚类的函数,所以打算自己做一个

设训练集的 shape 是 [n_sample, n_feature],基本思路是:

  • 簇中心初始化:第 1 个簇中心取样本的特征均值,shape = [n_feature, ];从第 2 个簇中心开始,用距离函数 (自定义) 计算每个样本到最近中心点的距离,归一化后作为选取下一个簇中心的概率 —— 迭代到选取到足够的簇中心为止
  • 簇中心调整:训练多轮,每一轮以样本点到最近中心点的距离之和作为 loss,梯度下降法 + Adam 优化器逼近最优解,在 loss 浮动值小于阈值的次数达到一定值时停止训练

因为设计之初就打算使用自定义距离函数,所以求导是很大的难题。笔者不才,最终决定借助 PyTorch 自动求导的天然优势

先给出欧式距离的计算函数

def Eu_dist(data, center):
  """ 以 欧氏距离 为聚类准则的距离计算函数
      data: 形如 [n_sample, n_feature] 的 tensor
      center: 形如 [n_cluster, n_feature] 的 tensor"""
  data = data.unsqueeze(1)
  center = center.unsqueeze(0)
  dist = ((data - center) ** 2).sum(dim=2)
  return dist

然后就是聚类器的代码:使用时只需关注 __init__、fit、classify 函数

import torch
import numpy as np
import matplotlib.pyplot as plt
Adam = torch.optim.Adam

def get_progress(current, target, bar_len=30):
  """ current: 当前完成任务数
      target: 任务总数
      bar_len: 进度条长度
      return: 进度条字符串"""
  assert current <= target
  percent = round(current / target * 100, 1)
  unit = 100 / bar_len
  solid = int(percent / unit)
  hollow = bar_len - solid
  return "■" * solid + "□" * hollow + f" {current}/{target}({percent}%)"


class Cluster:
  """ 聚类器
      n_cluster: 簇中心数
      dist_fun: 距离计算函数
          kwargs:
              data: 形如 [n_sample, n_feather] 的 tensor
              center: 形如 [n_cluster, n_feature] 的 tensor
          return: 形如 [n_sample, n_cluster] 的 tensor
      init: 初始簇中心
      max_iter: 最大迭代轮数
      lr: 中心点坐标学习率
      stop_thresh: 停止训练的loss浮动阈值
      cluster_centers_: 聚类中心
      labels_: 聚类结果"""

  def __init__(self, n_cluster, dist_fun, init=None, max_iter=300, lr=0.08, stop_thresh=1e-4):
      self._n_cluster = n_cluster
      self._dist_fun = dist_fun
      self._max_iter = max_iter
      self._lr = lr
      self._stop_thresh = stop_thresh
      # 初始化参数
      self.cluster_centers_ = None if init is None else torch.FloatTensor(init)
      self.labels_ = None
      self._bar_len = 20

  def fit(self, data):
      """ data: 形如 [n_sample, n_feature] 的 tensor
          return: loss浮动日志"""
      if self.cluster_centers_ is None:
          self._init_cluster(data, self._max_iter // 5)
      log = self._train(data, self._max_iter, self._lr)
      # 开始若干轮次的训练,得到loss浮动日志
      return log

  def classify(self, data, show=False):
      """ data: 形如 [n_sample, n_feature] 的 tensor
          show: 绘制分类结果
          return: 分类标签"""
      dist = self._dist_fun(data, self.cluster_centers_)
      self.labels_ = dist.argmin(axis=1)
      # 将标签加载到实例属性
      if show:
          for idx in range(self._n_cluster):
              container = data[self.labels_ == idx]
              plt.scatter(container[:, 0], container[:, 1], alpha=0.7)
          plt.scatter(self.cluster_centers_[:, 0], self.cluster_centers_[:, 1], c="gold", marker="p", s=50)
          plt.show()
      return self.labels_

  def _init_cluster(self, data, epochs):
      self.cluster_centers_ = data.mean(dim=0).reshape(1, -1)
      for idx in range(1, self._n_cluster):
          dist = np.array(self._dist_fun(data, self.cluster_centers_).min(dim=1)[0])
          new_cluster = data[np.random.choice(range(data.shape[0]), p=dist / dist.sum())].reshape(1, -1)
          # 取新的中心点
          self.cluster_centers_ = torch.cat([self.cluster_centers_, new_cluster], dim=0)
          progress = get_progress(idx, self._n_cluster, bar_len=self._n_cluster if self._n_cluster <= self._bar_len else self._bar_len)
          print(f"\rCluster Init: {progress}", end="")
          self._train(data, epochs, self._lr * 2.5, init=True)
          # 初始化簇中心时使用较大的lr

  def _train(self, data, epochs, lr, init=False):
      center = self.cluster_centers_.cuda()
      center.requires_grad = True
      data = data.cuda()
      optimizer = Adam([center], lr=lr)
      # 将中心数据加载到 GPU 上
      init_patience = int(epochs ** 0.5)
      patience = init_patience
      update_log = []
      min_loss = np.inf
      for epoch in range(epochs):
          # 对样本分类并更新中心点
          sample_dist = self._dist_fun(data, center).min(dim=1)
          self.labels_ = sample_dist[1]
          loss = sum([sample_dist[0][self.labels_ == idx].mean() for idx in range(len(center))])
          # loss 函数: 所有样本到中心点的最小距离和 - 中心点间的最小间隔
          loss.backward()
          optimizer.step()
          optimizer.zero_grad()
          # 反向传播梯度更新中心点
          loss = loss.item()
          progress = min_loss - loss
          update_log.append(progress)
          if progress > 0:
              self.cluster_centers_ = center.cpu().detach()
              min_loss = loss
              # 脱离计算图后记录中心点
          if progress < self._stop_thresh:
              patience -= 1
              # 耐心值减少
              if patience < 0:
                  break
                  # 耐心值归零时退出
          else:
              patience = init_patience
              # 恢复耐心值
          progress = get_progress(init_patience - patience, init_patience, bar_len=self._bar_len)
          if not init:
              print(f"\rCluster: {progress}\titer: {epoch + 1}", end="")
      if not init:
          print("")
      return torch.FloatTensor(update_log)

 

与KMeans++比较

KMeans++ 是以欧式距离为聚类准则的经典聚类算法。在 iris 数据集上,KMeans++ 远远快于我的聚类器。但在我反复对比测试的几轮里,我的聚类器精度也是不差的 —— 可以看到下图里的聚类结果完全一致

  KMeans++ My Cluster
Cost 145 ms 1597 ms
Center

[[5.9016, 2.7484, 4.3935, 1.4339],

[5.0060, 3.4280, 1.4620, 0.2460],
[6.8500, 3.0737, 5.7421, 2.0711]]

[[5.9016, 2.7485, 4.3934, 1.4338],
[5.0063, 3.4284, 1.4617, 0.2463],
[6.8500, 3.0741, 5.7420, 2.0714]]

虽然速度方面与老牌算法对比的确不行,但是我的这个聚类器最大的亮点还是自定义距离函数

 

Yolo 检测框聚类

本来想用 Yolov4 检测框聚类引入的CIoU 做聚类,但是没法解决梯度弥散的问题,所以退其次用了 DIoU

def DIoU_dist(boxes, anchor):
  """ 以 DIoU 为聚类准则的距离计算函数
      boxes: 形如 [n_sample, 2] 的 tensor
      anchor: 形如 [n_cluster, 2] 的 tensor"""
  n_sample = boxes.shape[0]
  n_cluster = anchor.shape[0]
  dist = Eu_dist(boxes, anchor)
  # 计算欧式距离
  union_inter = torch.prod(boxes, dim=1).reshape(-1, 1) + torch.prod(anchor, dim=1).reshape(1, -1)
  boxes = boxes.unsqueeze(1).repeat(1, n_cluster, 1)
  anchor = anchor.unsqueeze(0).repeat(n_sample, 1, 1)
  compare = torch.stack([boxes, anchor], dim=2)
  # 组合检测框与 anchor 的信息
  diag = torch.sum(compare.max(dim=2)[0] ** 2, dim=2)
  dist /= diag
  # 计算外接矩形的对角线长度
  inter = torch.prod(compare.min(dim=2)[0], dim=2)
  iou = inter / (union_inter - inter)
  # 计算 IoU
  dist += 1 - iou
  return dist

我提取了DroneVehicle 数据集的 650156 个预测框的尺寸做聚类,在这个过程中发现因为小尺寸的预测框过多,导致聚类中心聚集在原点附近。所以对 loss 函数做了改进:先分类,再计算每个分类下的最大距离之和

横轴表示检测框的宽度,纵轴表示检测框的高度,其数值都是相对于原图尺寸的比例。若原图尺寸为 608 * 608,则得到的 9 个先验框为:

[ 2, 3 ] [ 9, 13 ] [ 19, 35 ]
[ 10, 76 ] [ 60, 14 ] [ 25, 134 ]
[ 167, 25 ] [ 115, 54 ] [ 70, 176 ]

 

总结

关于Python自定义指标聚类的文章就介绍至此,更多相关Python自定义指标聚类内容请搜索编程宝库以前的文章,希望以后支持编程宝库

python中图像处理相关库有很多,这里简单介绍PIL、cv2、scipy.imageio 、matplotlib.image、skimage等常用库,其中PIL库使用最方便,cv2库功能最强大。下面分享 ...