【深度学习入门系列】径向基函数(RBF)神经网络原理介绍及pytorch实现(内含分类、回归任务实例)

news2025/1/15 23:21:34

文章目录

  • 1 RBF神经网络
    • 1.1 简介
    • 1.2 步骤
      • 输入
      • rbf层
        • 核函数
        • 中心点求解方法
      • 输出
    • 1.3 几个问题
  • 2 分类
    • 2.0 数据集
    • 2.1 网络架构
    • 2.2 代码
    • 2.3 结果
  • 3 回归
    • 3.0 数据集
    • 3.1 网络架构
    • 3.2 代码
    • 3.3 结果
  • 4 代码(可直接食用)

众所周知,MATLAB工具箱里提供了RBF神经网络。个人认为这个东西虽然蛮好用的——有的时候比你自己写的效果都好。但是,不是长久之计。通过Pytorch能建立自定义程度更高的人工神经网络,往后在网络里面加乱七八糟的东西都很方便(比如GA/PSO求解超参之类的、比如微调模型架构之类的)。

看了眼网上的写法,用python实现的人还是挺多的,但是大多数还是选择用 numpy 实现 RBF,有用 torch 的,不是很多。

本文将首先简单介绍和推导RBF神经网络的理论,然后分别在分类数据集和回归数据集中演示该神经网络的具体搭建方法。

1 RBF神经网络

1.1 简介

径向基(Radial Basis Function, RBF)神经网络主要具有以下特性:

  1. 单隐层前馈神经网络。因此,RBF 神经网络一般都蛮简单的,毕竟只有一个隐藏层,也翻不出什么浪花。
  2. 隐藏层激活函数为径向基函数。这个也没有任何问题,这正式人家的特点。
  3. 最终结果是隐藏层结果的线性组合。

因为径向基函数图像基本上都呈现为钟形,所以径向基神经网络可以简单地理解为由多个钟形函数加权求和凑出来的一个拟合线。

因为 RBF 在隐藏层用了个核函数把数据从低维空间映射到高维空间以求解线性不可分问题,又在输出时用一个简单的线性关系计算 RBF 层的结果,所以我们可以这样理解:

RBF 神经网络 输入 ——> 输出 是一段非线性关系。
RBF 神经网络 隐层 ——> 输出 是一段线性关系。

1.2 步骤

步骤分为三步(输入,rbf层,输出):

输入

输入没啥好说的,就跟正常神经网络一样,有几个特征就用几个输入神经元。

rbf层

这部分相当于借鉴了聚类算法的思路,根据数据的特性把输入数据拆分为了 n(隐藏层神经元个数) 类,再由每个神经元负责自己的范围内的数据(主要是由激活函数的数学性质导致的)。

因此这就涉及到了每个神经元负责的范围大小,称为中心宽度;也涉及到了类别的数量确定,也就是隐藏层神经元的数量。

因为每个神经元只负责自己范围内的那些数据,在逼近最优值的时候不需要管其他神经元内的数据,因此这是一个局部逼近网络。

rbf层蛮有讲究的。大概有这几个点比较重要。

核函数

核函数就是一个能把低维数据映射到高维的东西,拿高斯核函数举例:

φ ( x i , r j ) = e − ∣ ∣ x i − c j ∣ ∣ 2 2 σ 2 \varphi(x_i,r_j)=e^{-\frac{||x_i-c_j||^2}{2\sigma^2}} φ(xi,rj)=e2σ2∣∣xicj2

其中, ∣ ∣ x i − c j ∣ ∣ ||x_i-c_j|| ∣∣xicj∣∣是指点 x i x_i xi与中心点 c j c_j cj的欧氏距离, σ \sigma σ是指高斯核的宽度。

这个公式实际上就是:

φ ′ ( x i → , r j → ) = e − x i → ∙ r j → σ 2 \varphi'(\overrightarrow{x_i},\overrightarrow{r_j})=e^{-\frac{\overrightarrow{x_i}\bullet \overrightarrow{r_j}}{\sigma^2}} φ(xi ,rj )=eσ2xi rj

这就好办了,幂级数展开

φ ′ ( x i → , r j → ) = ∑ n = 0 + ∞ ( x i → ∙ r j → ) n σ n   n ! \varphi'(\overrightarrow{x_i},\overrightarrow{r_j})=\sum_{n=0}^{+\infty}{\frac{(\overrightarrow{x_i}\bullet \overrightarrow{r_j})^n}{\sigma^n\ n!}} φ(xi ,rj )=n=0+σn n!(xi rj )n

因为 n 趋近于无穷,所以这一对乘起来之后维度数据就上去了。

当然除了高斯核函数,还有很多核函数,比如

反常S型函数: φ ( r ) = 1 1 + e ∣ ∣ x i − c j ∣ ∣ 2 2 σ 2 \varphi(r)=\frac{1}{1+e^{\frac{||x_i-c_j||^2}{2\sigma^2}}} φ(r)=1+e2σ2∣∣xicj21

拟多二次函数: φ ( r ) = 1 ∣ ∣ x i − c j ∣ ∣ 2 + σ 2 \varphi(r)=\frac{1}{\sqrt{||x_i-c_j||^2+\sigma^2}} φ(r)=∣∣xicj2+σ2 1

等等

中心点求解方法

  1. 随机方法(直接计算法)

隐含层神经元的中心是随机地在输入样本中选取,且中心固定。一旦中心固定下来,隐含层神经元的输出便是已知的,这样的神经网络的连接权就可以通过求解线性方程组来确定。适用于样本数据的分布具有明显代表性。

  1. 无监督学习方法

思路像聚类

第一步:无监督学习过程,求解隐含层基函数的中心与方差

第二步:有监督学习过程,求解隐含层到输出层之间的权值

首先,选取h个中心做k-means聚类,对于高斯核函数的径向基,方差由公式求解:

σ i = c m a x 2 h     i = 1 , 2 , 3 , … , h \sigma_i=\frac{c_{max}}{2h} \ \ \ i=1,2,3, …, h σi=2hcmax   i=1,2,3,,h

c m a x c_{max} cmax为所选取中心点之间的最大距离。

隐含层至输出层之间的神经元的连接权值可以用最小二乘法直接计算得到,即对损失函数求解关于w的偏导数,使其等于0,可以化简得到计算公式为:

w = e x p ( h c m a x 2 ∣ ∣ x p − c i ∣ ∣ 2 )     p = 1 , 2 , 3 , … , P ;   i = 1 , 2 , 3 , … , h w = exp(\frac{h}{c^2_{max}}||x_p-c_i||^2) \ \ \ p=1,2,3, …, P;\ i = 1,2,3, …, h w=exp(cmax2h∣∣xpci2)   p=1,2,3,,P; i=1,2,3,,h

  1. 有监督学习方法

通过训练样本集来获得满足监督要求的网络中心和其他权重参数,经历一个误差修正学习的过程,与BP网络的学习原理一样,同样采用梯度下降法。因此RBF同样可以被当作BP神经网络的一种。

这样,概括的来说RBF神经网络的隐层将输入空间映射到一个新空间 ,输出层在该新空间中实现线性组合器 ,可调节参数就是该线性组合器的权 。

这部分感觉这篇讲的更好一点。

输出

输出也没什么可以称道的,类似于简单的线性回归吧。实际上就是对上一层的聚类的结果的一个线性拟合(类似线性加权求和)。

1.3 几个问题

  1. 同样都是用核函数解决问题,在解决线性不可分问题时的 SVM 和 RBF 有什么区别吗?
    看到高斯核,感觉大家第一想法应该都是SVM吧,这俩同样都是计算距离,两者计算的距离还是有点区别的:
    SVM:计算与其他输入点间的距离,而且中心点是在已存在的样本中选取出来的。
    RBF神经网络:计算的是输入点和几个神经元节点中心的距离,思路有点像聚类。

  2. 为什么用 RBF 而不用 BP?RBF 的优势在哪里?

    1. :相比于BP,RBF神经网络用了非线性技术做了优化,减小了计算量,增快了学习速率;而BP必须要用某种非线性优化技术,这大大损耗了时间。
    2. 局部逼近:(这个不知道算不算优点)RBF神经网络用的是局部逼近,所以空间中只有少数几个连接的权重会影响到网络的输出;但是BP就不一样,它的参数牵一发而动全身,稍微改一个参数就会导致整个结果大变。
  3. RBF 和 BP 架构的区别?

    1. RBF是单层的,BP多少层理论上都可以。
    2. RBF局部逼近,BP全局逼近。
    3. RBF收敛的相对快一点。

2 分类

2.0 数据集

数据集我采用的是之前接手的一个保险理赔项目。

目标是判断用户是不是来骗保的,给了一大堆特征,这里我就不详细解释是哪些特征了,这篇文章主要负责搭建模型。

归一化之后发现数据还是非常稀疏的,而且看着可能二值化效果会不错,嫌麻烦,不尝试了,就直接拿这个用也没什么大问题。

是一个4分类任务,结果可能没2分类好看,这跟项目的数据也有关,本身数据也比较脏、噪声也比较多,感觉特征与 label 之间的联系也不是特别紧密。不过问题不大,我们的重心还是放在搭网络上。

数据集缺点还包括样本不平衡,确实会影响结果。

data.csv (36221x24)=> 已经包括 label 了

在这里插入图片描述

2.1 网络架构

torch 输出的架构如下

在这里插入图片描述

架构很简单,这个确实没办法,毕竟 RBF 神经网络就是单层的……

一共是1层隐藏层(32个神经元)。

也尝试过 64 个神经元和一些其他的参数,效果反而没这个好。

2.2 代码

分类任务的相关文件如下

在这里插入图片描述

RBF_clf.py			分类模型架构
data.csv			分类数据集
run.py				主函数
utils.py			相关函数和类

run.py

import os
import numpy
import torch
import random
from utils import Config, CLF_Model, REG_Model
from clf_model.RBF_clf import RBF


seed = 1129
random.seed(seed)
numpy.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
os.environ['PYTHONHASHSEED'] = str(seed)


# 分类
if __name__ == '__main__':
    config = Config(
        data_path="dataset_clf/data.csv",
        name="RBF",
        batch_size=128,
        learning_rate=0.007,
        epoch=10
    )
    clf = RBF(
        input_n=len(config.input_col),
        output_n=len(config.output_class),
        hidden_size=32
    )
    print(clf)
    model = CLF_Model(clf, config)
    model.run()

RBF_clf.py

import sys
import torch
from torch.nn import Linear, ReLU, ModuleList, Sequential, Dropout, Softmax, Tanh
import torch.nn as nn



class RBF(torch.nn.Module):
    # 默认三层隐藏层,分别有128个 64个 16个神经元
    def __init__(self, input_n, output_n, hidden_size=10, kernel="gaussian"):
        """
        :param input_n: int 输入神经元个数
        :param output_n: int 输出神经元个数
        :param hidden_size: int 隐藏层神经元个数
        :param kernel: string 核函数
        """
        super(RBF, self).__init__()
        self.input_n = input_n
        self.output_n = output_n
        # 输入层 -> RBF层
        self.input_layer = rbf_layer(input_n, hidden_size, kernel)
        # RBF层 -> 输出层
        self.output_layer = nn.Linear(hidden_size, output_n)

    def forward(self, x):
        rbf_ret = self.input_layer(x)
        output = self.output_layer(rbf_ret)
        return output



class rbf_layer(torch.nn.Module):
    def __init__(self, input_n, hidden_size=10, kernel="gaussian"):
        """
        :param input_n: int 输入神经元个数
        :param hidden_size: int 隐藏层神经元个数
        :param kernel: string 核函数
        """
        super(rbf_layer, self).__init__()
        self.input_n = input_n
        self.hidden_size = hidden_size
        self.kernel = kernel

        kernel_existed = {
            'gaussian': gaussian,
             'linear': linear,
             'quadratic': quadratic,
             'inverse quadratic': inverse_quadratic,
             'multiquadric': multiquadric,
             'inverse multiquadric': inverse_multiquadric,
             'spline': spline,
             'poisson one': poisson_one,
             'poisson two': poisson_two,
             'matern32': matern32,
             'matern52': matern52
        }
        if self.kernel not in kernel_existed:
            print("Unknown kernel function, plz check and modify RBF.kernel")
            sys.exit(0)

        self.centres = nn.Parameter(torch.Tensor(hidden_size, input_n))
        self.log_sigmas = nn.Parameter(torch.Tensor(hidden_size))
        # 初始化各项参数(聚类中心)
        self.ini_parameters()

    def ini_parameters(self):
        nn.init.normal_(self.centres, 0, 1)
        nn.init.constant_(self.log_sigmas, 0)

    def forward(self, x):
        # x: [batch_size, input_n]
        size = (x.size(0), self.hidden_size, self.input_n)
        # 调整数据shape
        # point: [batch_size, hidden_size, input_n]
        # center: [batch_size, hidden_size, input_n]
        point = x.unsqueeze(1).expand(size)
        centers = self.centres.unsqueeze(0).expand(size)
        distance_p_c = self.distance(point, centers)
        return distance_p_c

    def distance(self, x1, x2):
        distances = (x1 - x2).pow(2).sum(-1).pow(0.5) / torch.exp(self.log_sigmas).unsqueeze(0)
        return distances



# 可能会用到的几个RBF核函数
def gaussian(alpha):
    phi = torch.exp(-1*alpha.pow(2))
    return phi

def linear(alpha):
    phi = alpha
    return phi

def quadratic(alpha):
    phi = alpha.pow(2)
    return phi

def inverse_quadratic(alpha):
    phi = torch.ones_like(alpha) / (torch.ones_like(alpha) + alpha.pow(2))
    return phi

def multiquadric(alpha):
    phi = (torch.ones_like(alpha) + alpha.pow(2)).pow(0.5)
    return phi

def inverse_multiquadric(alpha):
    phi = torch.ones_like(alpha) / (torch.ones_like(alpha) + alpha.pow(2)).pow(0.5)
    return phi

def spline(alpha):
    phi = (alpha.pow(2) * torch.log(alpha + torch.ones_like(alpha)))
    return phi

def poisson_one(alpha):
    phi = (alpha - torch.ones_like(alpha)) * torch.exp(-alpha)
    return phi

def poisson_two(alpha):
    phi = ((alpha - 2*torch.ones_like(alpha)) / 2*torch.ones_like(alpha)) * alpha * torch.exp(-alpha)
    return phi

def matern32(alpha):
    phi = (torch.ones_like(alpha) + 3**0.5*alpha)*torch.exp(-3**0.5*alpha)
    return phi

def matern52(alpha):
    phi = (torch.ones_like(alpha) + 5**0.5*alpha + (5/3) * alpha.pow(2))*torch.exp(-5**0.5*alpha)
    return phi

util.py

import sys

import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from matplotlib import pyplot as plt
from sklearn import metrics
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
import time
from datetime import timedelta


# 一个数据格式。感觉这玩意好多余啊,但是nlp写多了就很习惯的写了一个上去 => 主要是不写没法封装
class Dataset(torch.utils.data.Dataset):
    def __init__(self, data_df):
        self.label = torch.from_numpy(data_df['label'].values)
        self.data = torch.from_numpy(data_df[data_df.columns[:-1]].values).to(torch.float32)

    # 每次迭代取出对应的data和author
    def __getitem__(self, idx):
        batch_data = self.get_batch_data(idx)
        batch_label = self.get_batch_label(idx)
        return batch_data, batch_label

    # 下面的几条没啥用其实,就是为__getitem__服务的
    def classes(self):
        return self.label

    def __len__(self):
        return self.data.size(0)

    def get_batch_label(self, idx):
        return np.array(self.label[idx])

    def get_batch_data(self, idx):
        return self.data[idx]


# 存数据,加载数据用的
class Config:
    def __init__(self, data_path, name, batch_size, learning_rate, epoch):
        """
        :param data_path: string 数据文件路径
        :param name: string 模型名字
        :param batch_size: int 多少条数据组成一个batch
        :param learning_rate: float 学习率
        :param epoch: int 学几轮
        """
        self.name = name
        self.data_path = data_path
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.epoch = epoch
        self.train_loader, self.dev_loader, self.test_loader = self.load_tdt()
        self.input_col, self.output_class = self.get_class()
    
    # 加载train, dev, test,把数据封装成Dataloader类
    def load_tdt(self):
        file = self.read_file()
        train_dev_test = self.cut_data(file)
        tdt_loader = [self.load_data(i) for i in train_dev_test]
        return tdt_loader[0], tdt_loader[1], tdt_loader[2]

    # 读文件
    def read_file(self):
        file = pd.read_csv(self.data_path, encoding="utf-8-sig", index_col=None)
        # 保险起见,确认最后一列列名为label
        file.columns.values[-1] = "label"
        self.if_nan(file)
        return file

    # 切7:1:2 => 训练:验证:测试
    def cut_data(self, data_df):
        try:
            train_df, test_dev_df = train_test_split(data_df, test_size=0.3, random_state=1129, stratify=data_df["label"])
            dev_df, test_df = train_test_split(test_dev_df, test_size=0.66, random_state=1129, stratify=test_dev_df["label"])
        except ValueError:
            train_df, test_dev_df = train_test_split(data_df, test_size=0.3, random_state=1129)
            dev_df, test_df = train_test_split(test_dev_df, test_size=0.66, random_state=1129)
        return [train_df, dev_df, test_df]

    # Dataloader 封装进去
    def load_data(self, data_df):
        dataset = Dataset(data_df)
        return torch.utils.data.DataLoader(dataset, batch_size=self.batch_size)

    # 检验输入输出是否有空值
    def if_nan(self, data):
        if data.isnull().any().any():
            empty = data.isnull().any()
            print(empty[empty].index)
            print("Empty data exists")
            sys.exit(0)

    # 后面输出混淆矩阵用的
    def get_class(self):
        file = self.read_file()
        label = file[file.columns[-1]]
        label = list(set(list(label)))
        return file.columns[:-1], label


# 跑clf用的,里面包含了训练,测试,评价等等的代码
class CLF_Model:
    def __init__(self, model, config):
        self.model = model
        self.config = config

    def run(self):
        self.train(self.model)

    def train(self, model):
        dev_best_loss = float('inf')
        start_time = time.time()
        # 模型为训练模式
        model.train()
        # 定义优化器
        optimizer = torch.optim.Adam(model.parameters(), lr=self.config.learning_rate)
        # 记录训练、验证的准确率和损失
        acc_list = [[], []]
        loss_list = [[], []]
        # 记录损失不下降的epoch数,到达20之后就直接退出 => 训练无效,再训练下去可能过拟合
        break_epoch = 0
        for epoch in range(self.config.epoch):
            print('Epoch [{}/{}]'.format(epoch + 1, self.config.epoch))
            for index, (trains, labels) in enumerate(self.config.train_loader):
                # 归零
                model.zero_grad()
                # 得到预测结果,是一堆概率
                outputs = model(trains)
                # 交叉熵计算要long的类型
                labels = labels.long()
                # 计算交叉熵损失
                loss = F.cross_entropy(outputs, labels)
                # 反向传播loss
                loss.backward()
                # 优化参数
                optimizer.step()
                # 每100个迭代或者跑完一个epoch后,验证一下
                if (index % 100 == 0 and index != 0) or index == len(self.config.train_loader) - 1:
                    true = labels.data.cpu()
                    # 预测类别
                    predict = torch.max(outputs.data, 1)[1].cpu()
                    # 计算训练准确率
                    train_acc = metrics.accuracy_score(true, predict)
                    # 计算验证准确率和loss
                    dev_acc, dev_loss = self.evaluate(model)
                    # 查看验证loss是不是进步了
                    if dev_loss < dev_best_loss:
                        dev_best_loss = dev_loss
                        improve = '*'
                        break_epoch = 0
                    else:
                        improve = ''
                        break_epoch += 1
                    time_dif = self.get_time_dif(start_time)
                    # 输出阶段性结果
                    msg = 'Iter: {0:>6},  Train Loss: {1:>5.3},  Train Acc: {2:>6.3%},  Val Loss: {3:>5.3},  Val Acc: {4:>6.3%},  Time: {5} {6}'
                    print(msg.format(index, loss.item(), train_acc, dev_loss, dev_acc, time_dif, improve))
                    # 为了画图准备的,记录每个epoch的结果
                    if index == len(self.config.train_loader) - 1:
                        acc_list[0].append(train_acc)
                        acc_list[1].append(dev_acc)
                        loss_list[0].append(loss.item())
                        loss_list[1].append(dev_loss)
                    # 验证集评估时模型编程验证模式了,现在变回训练模式
                    model.train()
            # 20个epoch损失不变,直接退出训练
            if break_epoch > 20:
                self.config.epoch = epoch+1
                break
        # 测试
        self.test(model)
        # 画图
        self.draw_curve(acc_list, loss_list, self.config.epoch)

    def test(self, model):
        start_time = time.time()
        # 测试准确率,测试损失,测试分类报告,测试混淆矩阵
        test_acc, test_loss, test_report, test_confusion = self.evaluate(model, test=True)
        msg = 'Test Loss: {0:>5.3},  Test Acc: {1:>6.3%}'
        print(msg.format(test_loss, test_acc))
        print("Precision, Recall and F1-Score...")
        print(test_report)
        print("Confusion Matrix...")
        print(test_confusion)
        time_dif = self.get_time_dif(start_time)
        print("Time usage:", time_dif)

    def evaluate(self, model, test=False):
        # 模型模式变一下
        model.eval()
        loss_total = 0
        predict_all = np.array([], dtype=int)
        labels_all = np.array([], dtype=int)
        # 如果是测试模式(这一段写的不是很好)
        if test:
            with torch.no_grad():
                for (dev, labels) in self.config.test_loader:
                    outputs = model(dev)
                    labels = labels.long()
                    loss = F.cross_entropy(outputs, labels)
                    loss_total += loss
                    labels = labels.data.cpu().numpy()
                    predict = torch.max(outputs.data, 1)[1].cpu().numpy()
                    labels_all = np.append(labels_all, labels)
                    predict_all = np.append(predict_all, predict)
            acc = metrics.accuracy_score(labels_all, predict_all)
            report = metrics.classification_report(labels_all, predict_all, target_names=[str(i) for i in self.config.get_class()[1]], digits=4)
            confusion = metrics.confusion_matrix(labels_all, predict_all)
            return acc, loss_total / len(self.config.dev_loader), report, confusion
        # 不是测试模式
        with torch.no_grad():
            for (dev, labels) in self.config.dev_loader:
                outputs = model(dev)
                labels = labels.long()
                loss = F.cross_entropy(outputs, labels)
                loss_total += loss
                labels = labels.data.cpu().numpy()
                predict = torch.max(outputs.data, 1)[1].cpu().numpy()
                labels_all = np.append(labels_all, labels)
                predict_all = np.append(predict_all, predict)
        acc = metrics.accuracy_score(labels_all, predict_all)
        return acc, loss_total / len(self.config.dev_loader)
    
    # 算时间损耗
    def get_time_dif(self, start_time):
        end_time = time.time()
        time_dif = end_time - start_time
        return timedelta(seconds=int(round(time_dif)))

    # 画图
    def draw_curve(self, acc_list, loss_list, epochs):
        x = range(0, epochs)
        y1 = loss_list[0]
        y2 = loss_list[1]
        y3 = acc_list[0]
        y4 = acc_list[1]
        plt.figure(figsize=(13, 13))
        plt.subplot(2, 1, 1)
        plt.plot(x, y1, color="blue", label="train_loss", linewidth=2)
        plt.plot(x, y2, color="orange", label="val_loss", linewidth=2)
        plt.title("Loss_curve", fontsize=20)
        plt.xlabel(xlabel="Epochs", fontsize=15)
        plt.ylabel(ylabel="Loss", fontsize=15)
        plt.legend()
        plt.subplot(2, 1, 2)
        plt.plot(x, y3, color="blue", label="train_acc", linewidth=2)
        plt.plot(x, y4, color="orange", label="val_acc", linewidth=2)
        plt.title("Acc_curve", fontsize=20)
        plt.xlabel(xlabel="Epochs", fontsize=15)
        plt.ylabel(ylabel="Accuracy", fontsize=15)
        plt.legend()
        plt.savefig("images/"+self.config.name+"_Loss&acc.png")

2.3 结果

训练ing……

在这里插入图片描述

效果还不错。在 MLP 中,神经网络很容易陷入局部最优导致准确率一直卡在76%,但是在 RBF 里面神经网络在第三个epoch就跳出来了,这可能跟它追求局部收敛有关。

第三类的精确率感人,不过其他分类效果挺好的。感觉是数据集的问题,对比之前 MLP 的结果可以说是又快又好了。

虽然没有彻底解决第三类分类不准确的问题,但是在第二类上的精确度确实有所提升,而且收敛的很快。

在这里插入图片描述

曲线如下(看曲线感觉可以适当增加下epoch,虽然 loss 确实趋于平稳了,但是怎么感觉还能跌)

在这里插入图片描述

3 回归

3.0 数据集

用的2023美赛春季赛Y题数据,在原有的数据集上加了很多船的参数,再把两个 dataset 合一起。

预处理的操作简单暴力,重复值的行删掉,部分含空值多的行删掉,方便填充的随机森林下,不方便填充的直接暴力填0。

平时肯定是不能这么干的,但是这里我们只是需要一个数据集而已,重点还是模型。

data.csv(2793x39)=> 已经包括 label 了

在这里插入图片描述

label.csv(2793x1)

在这里插入图片描述

3.1 网络架构

torch 输出的架构如下

在这里插入图片描述

跟分类同理

架构很简单,这个确实没办法,毕竟 RBF 神经网络就是单层的……

一共是1层隐藏层(32个神经元)。

也尝试过多的少的神经元数量和一些其他的参数,都被打爆了。

3.2 代码

回归任务的相关文件如下

在这里插入图片描述

data.csv			回归数据集
RBF_reg.py			回归模型架构
run.py				主函数
utils.py			相关函数和类

run.py

import os
import numpy
import torch
import random
from utils import Config, CLF_Model, REG_Model
from clf_model.RBF_clf import RBF


seed = 1129
random.seed(seed)
numpy.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
os.environ['PYTHONHASHSEED'] = str(seed)


from reg_model.RBF_reg import RBF
# 回归
if __name__ == '__main__':
    config = Config(
        data_path="dataset_reg/data.csv",
        name="RBF",
        batch_size=16,
        learning_rate=0.01,
        epoch=200
    )
    # 看是几输出问题
    reg = RBF(
        input_n=len(config.input_col),
        output_n=1,
        hidden_size=32,
    )
    print(reg)
    model = REG_Model(reg, config)
    model.run()

utils.py

import sys

import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from matplotlib import pyplot as plt
from sklearn import metrics
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
import time
from datetime import timedelta


# 一个数据格式。感觉这玩意好多余啊,但是nlp写多了就很习惯的写了一个上去 => 主要是不写没法封装
class Dataset(torch.utils.data.Dataset):
    def __init__(self, data_df):
        self.label = torch.from_numpy(data_df['label'].values)
        self.data = torch.from_numpy(data_df[data_df.columns[:-1]].values).to(torch.float32)

    # 每次迭代取出对应的data和author
    def __getitem__(self, idx):
        batch_data = self.get_batch_data(idx)
        batch_label = self.get_batch_label(idx)
        return batch_data, batch_label

    # 下面的几条没啥用其实,就是为__getitem__服务的
    def classes(self):
        return self.label

    def __len__(self):
        return self.data.size(0)

    def get_batch_label(self, idx):
        return np.array(self.label[idx])

    def get_batch_data(self, idx):
        return self.data[idx]


# 存数据,加载数据用的
class Config:
    def __init__(self, data_path, name, batch_size, learning_rate, epoch):
        """
        :param data_path: string 数据文件路径
        :param name: string 模型名字
        :param batch_size: int 多少条数据组成一个batch
        :param learning_rate: float 学习率
        :param epoch: int 学几轮
        """
        self.name = name
        self.data_path = data_path
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.epoch = epoch
        self.train_loader, self.dev_loader, self.test_loader = self.load_tdt()
        self.input_col, self.output_class = self.get_class()

    # 加载train, dev, test,把数据封装成Dataloader类
    def load_tdt(self):
        file = self.read_file()
        train_dev_test = self.cut_data(file)
        tdt_loader = [self.load_data(i) for i in train_dev_test]
        return tdt_loader[0], tdt_loader[1], tdt_loader[2]

    # 读文件
    def read_file(self):
        file = pd.read_csv(self.data_path, encoding="utf-8-sig", index_col=None)
        # 保险起见,确认最后一列列名为label
        file.columns.values[-1] = "label"
        self.if_nan(file)
        return file

    # 切7:1:2 => 训练:验证:测试
    def cut_data(self, data_df):
        try:
            train_df, test_dev_df = train_test_split(data_df, test_size=0.3, random_state=1129, stratify=data_df["label"])
            dev_df, test_df = train_test_split(test_dev_df, test_size=0.66, random_state=1129, stratify=test_dev_df["label"])
        except ValueError:
            train_df, test_dev_df = train_test_split(data_df, test_size=0.3, random_state=1129)
            dev_df, test_df = train_test_split(test_dev_df, test_size=0.66, random_state=1129)
        return [train_df, dev_df, test_df]

    # Dataloader 封装进去
    def load_data(self, data_df):
        dataset = Dataset(data_df)
        return torch.utils.data.DataLoader(dataset, batch_size=self.batch_size)

    # 检验输入输出是否有空值
    def if_nan(self, data):
        if data.isnull().any().any():
            empty = data.isnull().any()
            print(empty[empty].index)
            print("Empty data exists")
            sys.exit(0)

    # 后面输出混淆矩阵用的
    def get_class(self):
        file = self.read_file()
        label = file[file.columns[-1]]
        label = list(set(list(label)))
        return file.columns[:-1], label



# 跑reg用的,里面包含了训练,测试,评价等等的代码
class REG_Model:
    def __init__(self, model, config):
        self.model = model
        self.config = config

    def run(self):
        self.train(self.model)

    def train(self, model):
        dev_best_loss = float('inf')
        start_time = time.time()
        # 模型为训练模式
        model.train()
        # 定义优化器
        optimizer = torch.optim.Adam(model.parameters(), lr=self.config.learning_rate)
        acc_list = [[], []]
        loss_list = [[], []]
        # 记录损失不下降的epoch数,到达20之后就直接退出 => 训练无效,再训练下去可能过拟合
        break_epoch = 0
        for epoch in range(self.config.epoch):
            print('Epoch [{}/{}]'.format(epoch + 1, self.config.epoch))
            for index, (trains, labels) in enumerate(self.config.train_loader):
                # 归零
                model.zero_grad()
                # 得到预测结果
                outputs = model(trains)
                # MSE计算要float的类型
                labels = labels.to(torch.float)
                # 计算MSE损失
                loss = torch.nn.MSELoss()(outputs, labels)
                # 反向传播loss
                loss.backward()
                # 优化参数
                optimizer.step()
                # 每100个迭代或者跑完一个epoch后,验证一下
                if (index % 100 == 0 and index != 0) or index == len(self.config.train_loader) - 1:
                    true = labels.data.cpu()
                    # 预测数据
                    predict = outputs.data.cpu()
                    # 计算训练准确度 R2
                    train_acc = r2_score(true, predict)
                    # 计算验证准确度 R2 和 loss
                    dev_acc, dev_loss, dev_mse = self.evaluate(model)
                    # 查看验证loss是不是进步了
                    if dev_loss < dev_best_loss:
                        dev_best_loss = dev_loss
                        improve = '*'
                        break_epoch = 0
                    else:
                        improve = ''
                        break_epoch += 1
                    time_dif = self.get_time_dif(start_time)
                    # 输出阶段性结果
                    msg = 'Iter: {0:>6},  Train Loss: {1:>5.3},  Train R2: {2:>6.3},  Val Loss: {3:>5.3},  Val R2: {4:>6.3},  Val Mse: {5:>6.3},  Time: {6} {7}'
                    print(msg.format(index, loss.item(), train_acc, dev_loss, dev_acc, dev_mse, time_dif, improve))
                    # 为了画图准备的,记录每个epoch的结果
                    if index == len(self.config.train_loader) - 1:
                        acc_list[0].append(train_acc)
                        acc_list[1].append(dev_acc)
                        loss_list[0].append(loss.item())
                        loss_list[1].append(dev_loss)
                    # 验证集评估时模型编程验证模式了,现在变回训练模式
                    model.train()
            # 20个epoch损失不变,直接退出训练
            if break_epoch > 20:
                self.config.epoch = epoch+1
                break
        # 测试
        self.test(model)
        # 画图
        self.draw_curve(acc_list, loss_list, self.config.epoch)

    def test(self, model):
        start_time = time.time()
        # 测试准确度 R2,测试损失,测试MSE
        test_acc, test_loss, mse = self.evaluate(model, test=True)
        msg = 'Test R2: {0:>5.3},  Test loss: {1:>6.3},  Test MSE: {2:>6.3}'
        print(msg.format(test_acc, test_loss, mse))
        time_dif = self.get_time_dif(start_time)
        print("Time usage:", time_dif)

    def evaluate(self, model, test=False):
        # 模型模式变一下
        model.eval()
        loss_total = 0
        predict_all = np.array([], dtype=int)
        labels_all = np.array([], dtype=int)
        # 如果是测试模式(这一段写的不是很好)
        if test:
            with torch.no_grad():
                for (dev, labels) in self.config.test_loader:
                    outputs = model(dev)
                    labels = labels.to(torch.float)
                    loss = torch.nn.MSELoss()(outputs, labels)
                    loss_total += loss
                    labels = labels.data.cpu().numpy()
                    predict = outputs.data.cpu().numpy()
                    labels_all = np.append(labels_all, labels)
                    predict_all = np.append(predict_all, predict)
        # 不是测试模式
        else:
            with torch.no_grad():
                for (dev, labels) in self.config.dev_loader:
                    outputs = model(dev)
                    labels = labels.long()
                    loss = torch.nn.MSELoss()(outputs, labels)
                    loss_total += loss
                    labels = labels.data.cpu().numpy()
                    predict = outputs.data.cpu().numpy()
                    labels_all = np.append(labels_all, labels)
                    predict_all = np.append(predict_all, predict)
        r2 = r2_score(labels_all, predict_all)
        mse = mean_squared_error(labels_all, predict_all)
        if test:
            return r2, loss_total / len(self.config.test_loader), mse
        else:
            return r2, loss_total / len(self.config.dev_loader), mse

    # 算时间损耗
    def get_time_dif(self, start_time):
        end_time = time.time()
        time_dif = end_time - start_time
        return timedelta(seconds=int(round(time_dif)))

    # 画图
    def draw_curve(self, acc_list, loss_list, epochs):
        x = range(0, epochs)
        y1 = loss_list[0]
        y2 = loss_list[1]
        y3 = acc_list[0]
        y4 = acc_list[1]
        plt.figure(figsize=(13, 13))
        plt.subplot(2, 1, 1)
        plt.plot(x, y1, color="blue", label="train_loss", linewidth=2)
        plt.plot(x, y2, color="orange", label="val_loss", linewidth=2)
        plt.title("Loss_curve", fontsize=20)
        plt.xlabel(xlabel="Epochs", fontsize=15)
        plt.ylabel(ylabel="Loss", fontsize=15)
        plt.legend()
        plt.subplot(2, 1, 2)
        plt.plot(x, y3, color="blue", label="train_acc", linewidth=2)
        plt.plot(x, y4, color="orange", label="val_acc", linewidth=2)
        plt.title("Acc_curve", fontsize=20)
        plt.xlabel(xlabel="Epochs", fontsize=15)
        plt.ylabel(ylabel="Accuracy", fontsize=15)
        plt.legend()
        plt.savefig("images/"+self.config.name+"_Loss&acc.png")

RBF_reg.py

import sys
import torch
from torch.nn import Linear, ReLU, ModuleList, Sequential, Dropout, Softmax, Tanh
import torch.nn as nn



class RBF(torch.nn.Module):
    # 默认三层隐藏层,分别有128个 64个 16个神经元
    def __init__(self, input_n, output_n, hidden_size=10, kernel="gaussian"):
        """
        :param input_n: int 输入神经元个数
        :param output_n: int 输出神经元个数
        :param hidden_size: int 隐藏层神经元个数
        :param kernel: string 核函数
        """
        super(RBF, self).__init__()
        self.input_n = input_n
        self.output_n = output_n
        # 输入层 -> RBF层
        self.input_layer = rbf_layer(input_n, hidden_size, kernel)
        # RBF层 -> 输出层
        self.output_layer = nn.Linear(hidden_size, output_n)

    def forward(self, x):
        rbf_ret = self.input_layer(x)
        output = self.output_layer(rbf_ret)
        # 减少多余的一维,方便计算mse,不然warning
        output = output.squeeze(1)
        return output




class rbf_layer(torch.nn.Module):
    def __init__(self, input_n, hidden_size=10, kernel="gaussian"):
        """
        :param input_n: int 输入神经元个数
        :param hidden_size: int 隐藏层神经元个数
        :param kernel: string 核函数
        """
        super(rbf_layer, self).__init__()
        self.input_n = input_n
        self.hidden_size = hidden_size
        self.kernel = kernel

        kernel_existed = {
            'gaussian': gaussian,
             'linear': linear,
             'quadratic': quadratic,
             'inverse quadratic': inverse_quadratic,
             'multiquadric': multiquadric,
             'inverse multiquadric': inverse_multiquadric,
             'spline': spline,
             'poisson one': poisson_one,
             'poisson two': poisson_two,
             'matern32': matern32,
             'matern52': matern52
        }
        if self.kernel not in kernel_existed:
            print("Unknown kernel function, plz check and modify RBF.kernel")
            sys.exit(0)

        self.centres = nn.Parameter(torch.Tensor(hidden_size, input_n))
        self.log_sigmas = nn.Parameter(torch.Tensor(hidden_size))
        # 初始化各项参数(聚类中心)
        self.ini_parameters()

    def ini_parameters(self):
        nn.init.normal_(self.centres, 0, 1)
        nn.init.constant_(self.log_sigmas, 0)

    def forward(self, x):
        # x: [batch_size, input_n]
        size = (x.size(0), self.hidden_size, self.input_n)
        # 调整数据shape
        # point: [batch_size, hidden_size, input_n]
        # center: [batch_size, hidden_size, input_n]
        point = x.unsqueeze(1).expand(size)
        centers = self.centres.unsqueeze(0).expand(size)
        distance_p_c = self.distance(point, centers)
        return distance_p_c

    def distance(self, x1, x2):
        distances = (x1 - x2).pow(2).sum(-1).pow(0.5) / torch.exp(self.log_sigmas).unsqueeze(0)
        return distances



# 可能会用到的几个RBF核函数
def gaussian(alpha):
    phi = torch.exp(-1*alpha.pow(2))
    return phi

def linear(alpha):
    phi = alpha
    return phi

def quadratic(alpha):
    phi = alpha.pow(2)
    return phi

def inverse_quadratic(alpha):
    phi = torch.ones_like(alpha) / (torch.ones_like(alpha) + alpha.pow(2))
    return phi

def multiquadric(alpha):
    phi = (torch.ones_like(alpha) + alpha.pow(2)).pow(0.5)
    return phi

def inverse_multiquadric(alpha):
    phi = torch.ones_like(alpha) / (torch.ones_like(alpha) + alpha.pow(2)).pow(0.5)
    return phi

def spline(alpha):
    phi = (alpha.pow(2) * torch.log(alpha + torch.ones_like(alpha)))
    return phi

def poisson_one(alpha):
    phi = (alpha - torch.ones_like(alpha)) * torch.exp(-alpha)
    return phi

def poisson_two(alpha):
    phi = ((alpha - 2*torch.ones_like(alpha)) / 2*torch.ones_like(alpha)) * alpha * torch.exp(-alpha)
    return phi

def matern32(alpha):
    phi = (torch.ones_like(alpha) + 3**0.5*alpha)*torch.exp(-3**0.5*alpha)
    return phi

def matern52(alpha):
    phi = (torch.ones_like(alpha) + 5**0.5*alpha + (5/3) * alpha.pow(2))*torch.exp(-5**0.5*alpha)
    return phi

3.3 结果

训练ing……

在这里插入图片描述

R2 有 0.712,吊打隔壁全连接 MLP。

在这里插入图片描述

曲线如下(epoch到100的时候应该就差不多了)

在这里插入图片描述

4 代码(可直接食用)

记得一键三连噢~~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/456802.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL基础练习——创建数据库、数据表,并进行修改

目录 题目&#xff1a; 创建库和表&#xff1a; 创建库&#xff1a; 创建表&#xff1a; 将 c_contact 字段插入到 c_birth 字段后面&#xff1a; 将 c_name 字段数据类型改为VARCHAR(70)&#xff1a; 将 c_contact 字段改名为 c_phone&#xff1a; 将表名修改为 customer…

魔兽世界az端和TC端有什么区别 Mangos,TC,SW,AZ,AC的关系

魔兽世界az端和TC端有什么区别 Mangos,TC,SW,AZ,AC的关系 大家好我是艾西&#xff0c;魔兽世界现在很多小伙伴对AZ端和TC端不是很能理解什么意思有什么区别&#xff0c;小编查询了大量的资料简单跟大家说一下&#xff0c;今天是艾西故事会大家全当听故事了&#xff01; &#…

learn C++ NO.2 ——认识引用、auto关键字

1.引用 1.1 引用的概念 引用并不是定义一个新的变量&#xff0c;而是给已经存在的变量起的一个别名。从语言的层面上&#xff0c;编译器并不会为了引用而去开辟新的内存空间。引用和被它引用的变量是共用一块内存空间的。举个生活中引用的例子&#xff0c;西游记中&#xff0…

C++入门(上)

C入门 c是对于c语言的补充而发展的一种面向对象的语言&#xff0c;也能兼容c语言的内容&#xff0c;所以c语言的东西可以在cpp文件中写c语言的内容&#xff0c;也是可以运行的&#xff08;可以混写&#xff09; 文章目录 C入门命名空间命名空间的定义命名空间的使用 C的输入…

22、Tweak原理及部分逆向防护

一、Tweak原理 1.1 Tweak产物.dylib 执行make命令时,在 .theos的隐藏目录中,编译出obj/debug目录,包含 arm64、arm64e两种架构,同时生成readbadges.dylib动态库 在arm64、arm64e目录下,有各自架构的readbadges.dylib,而debug目录下的readbadges.dylib,是一个胖二进制文件 fi…

ShareSDK QQ平台注册

注册开发者账号 1.在QQ互联开放平台首页 QQ互联官网首页 &#xff0c;点击右上角的“登录”按钮&#xff0c;使用QQ帐号登录&#xff0c;如下图所示&#xff1a; 重要提示&#xff1a; 开发者QQ号码一旦注册不能变更&#xff0c;建议使用公司公共QQ号码而不是员工私人号码注册…

软件测试好学习吗?

软件测试好不好学习其实各自的认知都不同&#xff0c;想要知道自己能不能学会&#xff0c;对于自己怎么样&#xff0c;最简单的方法就是找个基础教程先去学习一下了~ 其实软件测试这个行业与其他岗位相比&#xff0c;对零基础的学习者更加友好。即使你不懂互联网&#xff0c;不…

小程序过审失败,怎么解决?

小程序过审失败&#xff0c;怎么解决&#xff1f; 如果你的小程序未能通过审核&#xff0c;可以参考以下步骤解决问题&#xff1a; 1、审核不通过原因&#xff1a;在审核失败的通知中会注明不通过的具体原因和相关文件路径。请先认真阅读并理解不通过的原因&#xff0c;找到问…

存储电路:计算机存储芯片的电路结构是怎样的?

我们把用于存储数据的电路叫做存储器&#xff0c;按照到 CPU 距离的远近&#xff0c;存储器主要分为寄存器、缓存和主存。我们就来重点分析这三种存储器的特点、原理&#xff0c;以及应用场景。 存储器是由基本的存储单元组成的&#xff0c;要想搞清楚存储器原理&#xff0c;我…

【C++关联容器】set的成员函数

目录 set 1. 构造、析构和赋值运算符重载 1.1 构造函数 1.2 析构函数 1.3 赋值运算符重载 2. 迭代器 3. 容量 4. 修改器 5. 观察者 6. 操作 7. 分配器 set set是按照特定顺序存储唯一元素的容器。 在一个set中&#xff0c;一个元素的值也是它的标识&#xff08;值…

插装式两位两通电磁阀DSV-080-2NCP、DDSV-080-2NCP

特性 压力4000 PSI(276 Bar) 持续的电磁。 硬化处理的提升阀和柱塞可获得更长的寿命和低泄漏量。 有效的混式电磁铁结构。 插装阀允许交流电压。可选的线圈电压和端子。 标准的滤网低泄漏量选择 手动关闭选择。 工业化通用阀腔。 紧凑的尺寸。 两位两通常闭式双向电磁…

热门好用的企业网盘工具大盘点

企业网盘作为热门的企业文件管理工具相比于个人网盘&#xff0c;更注重安全性&#xff0c;并增加了协同功能。当下市面上的企业网盘工具可谓是百花齐放&#xff0c;今天就盘点几款热门好用的网盘工具&#xff0c;希望能帮助您挑选到心仪的网盘工具~ 1. Zoho Workdrive Zoho Wo…

#PythonPytorch 2.如何对CTG特征数据建模

系列文章目录 #Python&Pytorch 1.如何入门深度学习模型 #Python&Pytorch 2.如何对CTG特征数据建模 我之前也写过一篇使用GBDT对UCI-CTG特征数据进行建模的博客&#xff0c;不过那是挺早的时候写的&#xff0c;只是简单贴了代码&#xff0c;方便了解流程而已&#xff0…

原神3.2剧情服搭建教程

同步官服所有剧情和交互 优化后电脑16G运行内存也可以完美运行 数据库再次启动报错的,把将redis.service中的Type=forking配置删除或者注释掉即可。 位于:/usrb/systemd/system/redis.service 然后重启服务就不会爆错了。 下面是具体步骤 su root (此处会提示输入密…

相机雷达联合标定cam_lidar_calibration

文章目录 运行环境&#xff1a;1.1 ROS环境配置1&#xff09;工作空间创建和编译2&#xff09;官方数据集测试环境 2.1 在线标定1&#xff09;数据类型2&#xff09;标定板制作3&#xff09;配置文件4&#xff09;开始标定5&#xff09;完整实现步骤 3.1 python版本选择3.2 rvi…

医疗保障信息平台HASF应用系统技术架构名词解释技术选型架构图

下载地址&#xff1a; 医疗保障信息平台HASF应用系统技术架构规范.pdf下载—无极低码 HSAF 医疗保障应用框架&#xff08;Healthcare Security Application Framework&#xff09; IaaS 基础设施即服务&#xff08;Infrastructure-as-a-Service&#xff09; PaaS 平台即服务…

实现了单链表各种功能,并配上详细解读。

单链表 链表的概念及结构链表的分类链表的实现初始化打印申请结点头插尾插头删尾删查找在pos位置之后插入在pos位置之前插入删除pos位置之后的值删除pos位置的值销毁 链表的概念及结构 概念&#xff1a;链表是一种物理存储结构上非连续、非顺序的存储结构&#xff0c;数据元素…

离散数学期末复习第一章 数理逻辑

离散数学 离散数学是研究各种各样的离散量的结构及离散量之间的关系一门学科&#xff0c;是计算机科学中基础理论的核心课程。 什么是连续变量&#xff1f; 在一定区间内可以任意取值的变量叫连续变量&#xff0c;其数值是连续不断的&#xff0c;相邻两个数值可作无限分割&a…

buuctf4

目录 [极客大挑战 2019]LoveSQL [极客大挑战 2019]Http [极客大挑战 2019]Knife qr 镜子里面的世界 ningen 小明的保险箱 爱因斯坦 easycap 隐藏的钥匙 另外一个世界 FLAG [极客大挑战 2019]LoveSQL 1.启动环境&#xff0c; 使用万能密码尝试一下 2.跳转到了check.php…

维度云工业品ERP进销存软件教您如何突破工业品生意的困境?

是困境也是机遇 随着全球化和技术进步的不断推进&#xff0c;工业品贸易正逐渐成为国际贸易的重要组成部分。工业品包含了从原材料、零部件到工业设备、机械以及其他工业用品等范畴的产品&#xff0c;涉及各种制造、加工和组装过程。在全球供应链互联互通之下&#xff0c;工业品…