Battery Remaining Useful Life (RUL) Estimation Based on a Variational Autoencoder (VAE)


Importing the modules

import os
import math
import itertools
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
from keras import layers
from sklearn.svm import SVR
from tensorflow import keras
from keras import backend as K
import matplotlib.pyplot as plt
from keras.regularizers import l2
from keras.regularizers import l1_l2
from keras.models import load_model
from keras.callbacks import ReduceLROnPlateau
from sklearn.model_selection import KFold, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import GroupShuffleSplit
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split, cross_val_score, cross_validate
from keras.callbacks import Callback, EarlyStopping, ModelCheckpoint, TensorBoard, LambdaCallback
from keras.layers import Input, Dense, Lambda, LSTM, RepeatVector, Bidirectional, Masking, Dropout, BatchNormalization

Reading the data

# Data reading
data_dir = './original_data/'

# Specify the file path and column names
file_path = os.path.join(data_dir, 'battery_RUL.txt')

# Specify the column names
column_names = [
    'unit_nr',
    'time_cycles',
    's_discharge_t',
    's_decrement_3.6-3.4V',
    's_max_voltage_discharge',
    's_min_voltage_charge',
    'Time_at_4.15V_s',
    's_time_constant_current',
    's_charging_time',
    'RUL'
]

# Read the text file into a DataFrame
df = pd.read_csv(file_path, sep=r'\s+', header=None, skiprows=0, names=column_names)

# Calculate the unique number of batteries
unique_batteries = df['unit_nr'].nunique()
print("\n Unique number of batteries:", unique_batteries)

# Check for missing values
if df.isnull().any().any():
    print("\n There are missing values in the DataFrame.")

# Print the shape of the DataFrame
print("\n DataFrame shape:", df.shape)

# print df info
print(df.info())

# Print the first few rows of the DataFrame
print("\n First few rows of the DataFrame:")
df.head()

Unique number of batteries: 14

DataFrame shape: (15064, 10)
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 15064 entries, 0 to 15063
Data columns (total 10 columns):
 #   Column                   Non-Null Count  Dtype
---  ------                   --------------  -----
 0   unit_nr                  15064 non-null  float64
 1   time_cycles              15064 non-null  float64
 2   s_discharge_t            15064 non-null  float64
 3   s_decrement_3.6-3.4V     15064 non-null  float64
 4   s_max_voltage_discharge  15064 non-null  float64
 5   s_min_voltage_charge     15064 non-null  float64
 6   Time_at_4.15V_s          15064 non-null  float64
 7   s_time_constant_current  15064 non-null  float64
 8   s_charging_time          15064 non-null  float64
 9   RUL                      15064 non-null  int64
dtypes: float64(9), int64(1)
memory usage: 1.1 MB
None

First few rows of the DataFrame:

[Figure: df.head() output]

Data preprocessing

# Filter rows where 'time_cycles' is equal to 1
df_at_cycle_1 = df[df['time_cycles'] == 1]

# Create a bar chart with all battery units on the X-axis
plt.figure(figsize=(12, 6))
plt.bar(df_at_cycle_1['unit_nr'], df_at_cycle_1['RUL'])
plt.xlabel('Battery Number')
plt.ylabel('RUL at Cycle 1')
plt.title('RUL Values at the Beginning of Cycle 1 for Each Battery Unit')
plt.xticks(df_at_cycle_1['unit_nr'])  # Set X-axis ticks explicitly
plt.tight_layout()
plt.show()

[Figure: bar chart of RUL at cycle 1 for each battery unit]

plt.figure(figsize = (8,8))
sns.heatmap(df.corr(),annot=True, cbar=False, cmap='Blues', fmt='.1f')

[Figure: correlation heatmap of all features]

Correlations between RUL and the other features:

Max. Voltage Dischar. (V): 0.8

Min. Voltage Charg. (V): -0.8

Time at 4.15V (s): 0.2

Cycle index: -1.0

Discharge Time (s), Decrement 3.6-3.4V (s), Time constant current (s) and Charging time (s): roughly 0.

The correlations between Time at 4.15V and these four weakly correlated features are 0.8, 0.5, 0.6 and 0.7 respectively, so they add little independent information and are dropped below.
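As a quick check, these coefficients can be read directly off the correlation matrix; a minimal sketch using the DataFrame loaded above:

print(df.corr()['RUL'].round(1))
print(df.corr()['Time_at_4.15V_s'].round(1))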

df1=df.drop(['s_discharge_t', 's_decrement_3.6-3.4V', 's_time_constant_current','s_charging_time'], axis=1)
plt.figure(figsize = (4,4))
sns.heatmap(df1.corr(),annot=True, cbar=False, cmap='Blues', fmt='.1f')

[Figure: correlation heatmap of the reduced feature set df1]

df1.head()

[Figure: df1.head() output]

def exponential_smoothing(df, sensors, n_samples, alpha=0.4):
    df = df.copy()
    # first, take the exponential weighted mean
    df[sensors] = df.groupby('unit_nr', group_keys=True)[sensors].apply(lambda x: x.ewm(alpha=alpha).mean()).reset_index(level=0, drop=True)

    # second, drop first n_samples of each unit_nr to reduce filter delay
    def create_mask(data, samples):
        result = np.ones_like(data)
        result[0:samples] = 0
        return result

    mask = df.groupby('unit_nr')['unit_nr'].transform(create_mask, samples=n_samples).astype(bool)
    df = df[mask]

    return df
#-----------------------------------------------------------------------------------------------------------------------
def data_standardization(df, sensors):
    df = df.copy()
    # Apply StandardScaler to the sensor data
    scaler = StandardScaler()
    df[sensors] = scaler.fit_transform(df[sensors])

    return df

    # MMS_X = MinMaxScaler()
    # mms_y = MinMaxScaler()
#-----------------------------------------------------------------------------------------------------------------------
def gen_train_data(df, sequence_length, columns):
    data = df[columns].values
    num_elements = data.shape[0]

    # -1 and +1 because of Python indexing
    for start, stop in zip(range(0, num_elements-(sequence_length-1)), range(sequence_length, num_elements+1)):
        yield data[start:stop, :]
#-----------------------------------------------------------------------------------------------------------------------
def gen_data_wrapper(df, sequence_length, columns, unit_nrs=np.array([])):
    if unit_nrs.size <= 0:
        unit_nrs = df['unit_nr'].unique()

    data_gen = (list(gen_train_data(df[df['unit_nr']==unit_nr], sequence_length, columns))
               for unit_nr in unit_nrs)
    data_array = np.concatenate(list(data_gen)).astype(np.float32)
    return data_array
#-----------------------------------------------------------------------------------------------------------------------
def gen_labels(df, sequence_length, label):
    data_matrix = df[label].values
    num_elements = data_matrix.shape[0]

    # -1 because I want to predict the rul of that last row in the sequence, not the next row
    return data_matrix[sequence_length-1:num_elements, :]
#-----------------------------------------------------------------------------------------------------------------------
def gen_label_wrapper(df, sequence_length, label, unit_nrs=np.array([])):
    if unit_nrs.size <= 0:
        unit_nrs = df['unit_nr'].unique()

    label_gen = [gen_labels(df[df['unit_nr']==unit_nr], sequence_length, label)
                for unit_nr in unit_nrs]
    label_array = np.concatenate(label_gen).astype(np.float32)
    return label_array
#---------Original code------------------------------------------------------------------------------------------------------
def gen_test_data(df, sequence_length, columns, mask_value):
    if df.shape[0] < sequence_length:
        data_matrix = np.full(shape=(sequence_length, len(columns)), fill_value=mask_value) # pad
        idx = data_matrix.shape[0] - df.shape[0]
        data_matrix[idx:,:] = df[columns].values  # fill with available data
    else:
        data_matrix = df[columns].values

    # specifically yield the last possible sequence
    stop = data_matrix.shape[0]
    start = stop - sequence_length
    for i in list(range(1)):
        yield data_matrix[start:stop, :]
#---------------------------------------------------------------------------------------------------------------------------
def change_test_index(df, initial_unit_number, start_index, end_index, min_rows, max_rows):
    df.reset_index(drop=True, inplace=True)
    y_test = []  # Initialize an empty list to store y_test values
    y_test.append(df.loc[0, 'RUL'])

    while end_index < len(df):
        # Calculate the number of rows to be assigned to the current unit_nr
        num_rows = min_rows + (end_index % (max_rows - min_rows + 1))
        end_index = end_index + num_rows

        # Update the unit_nr for the current block of rows
        df.loc[start_index:end_index, 'unit_nr'] = initial_unit_number

        # Update the "time_cycles" column starting from the first row with "time_cycles" == 1
        time_cycle_to_change = end_index + 1
        df.loc[time_cycle_to_change, 'time_cycles'] = 1

        # Append the values to y_test (replace 'column_name' with the actual column name you want to append)
        y_test.append(df.loc[time_cycle_to_change, 'RUL'])

        # Update the starting and ending index for the next block of rows
        start_index = end_index + 1
        initial_unit_number += 1

    # Drop rows with NaN values at the end of DataFrame 'df'
    df.dropna(axis=0, how='any', inplace=True)

    # Drop any NaN values at the end of list 'y_test'
    while len(y_test) > 0 and pd.isnull(y_test[-1]):
        y_test.pop()

    return df, y_test
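To make the windowing explicit, here is a minimal sketch on a toy DataFrame (the values and the column name 's_a' are made up for illustration): with a sequence length of 3, gen_train_data yields overlapping windows and gen_labels returns the RUL of the last row of each window, so features and labels stay aligned.

demo = pd.DataFrame({
    'unit_nr': [1, 1, 1, 1, 1],
    's_a': [0.1, 0.2, 0.3, 0.4, 0.5],   # hypothetical sensor column
    'RUL': [5, 4, 3, 2, 1],
})
demo_windows = list(gen_train_data(demo, 3, ['s_a']))
demo_labels = gen_labels(demo, 3, ['RUL'])
print(len(demo_windows), demo_windows[0].shape)  # 3 windows, each of shape (3, 1)
print(demo_labels.ravel())                       # [3 2 1]: RUL of the last row of each window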

Getting the data

def get_data(df, sensors, sequence_length, alpha):
    # List of battery units
    battery_units = range(1, 15)
    sensor_names = ['s_{}'.format(i+1) for i in range(0,10)]
    # Define the number of batteries for training and testing
    train_batteries = 12
    test_batteries = 2

    # Extract the batteries for training and testing
    train_units = battery_units[:train_batteries]
    test_units = battery_units[train_batteries:train_batteries + test_batteries]

    # Create the training and testing datasets
    train = df[df['unit_nr'].isin(train_units)].copy()  # Use copy to avoid the SettingWithCopyWarning
    test = df[df['unit_nr'].isin(test_units)].copy()  # Use copy to avoid the SettingWithCopyWarning

    X_test_pre, y_test = change_test_index(test, 13, 0, 0, 230, 240)

    # y_test = pd.Series(y_test)  # Convert y_test list to a pandas Series

    # remove unused sensors
    drop_sensors = [element for element in sensor_names if element not in sensors]

    # Apply standardization to the training and testing data using data_standardization function
    standard_train = data_standardization(train, sensors)
    standard_test = data_standardization(test, sensors)

    # Exponential smoothing of training and testing data
    X_train_pre = exponential_smoothing(standard_train, sensors, 0, alpha)
    X_test_pre = exponential_smoothing(standard_test, sensors, 0, alpha)

    # Train-validation split
    gss = GroupShuffleSplit(n_splits=1, train_size=0.85, random_state=42)
    # Generate the train/val for each sample by iterating over the train and val units
    for train_unit, val_unit in gss.split(X_train_pre['unit_nr'].unique(), groups=X_train_pre['unit_nr'].unique()):
        train_unit = X_train_pre['unit_nr'].unique()[train_unit]  # gss returns positional indices into the array of unique unit numbers
        val_unit = X_train_pre['unit_nr'].unique()[val_unit]

        x_train = gen_data_wrapper(X_train_pre, sequence_length, sensors, train_unit)
        y_train = gen_label_wrapper(X_train_pre, sequence_length, ['RUL'], train_unit)

        x_val = gen_data_wrapper(X_train_pre, sequence_length, sensors, val_unit)
        y_val = gen_label_wrapper(X_train_pre, sequence_length, ['RUL'], val_unit)

    # create sequences for test
    test_gen = (list(gen_test_data(X_test_pre[X_test_pre['unit_nr']==unit_nr], sequence_length, sensors, -99.))
                for unit_nr in X_test_pre['unit_nr'].unique())

    x_test = np.concatenate(list(test_gen)).astype(np.float32)

    return x_train, y_train, x_val, y_val, x_test, y_test

Training callbacks

# --------------------------------------- TRAINING CALLBACKS  ---------------------------------------
class save_latent_space_viz(Callback):
    def __init__(self, model, data, target):
        super().__init__()
        self.model = model
        self.data = data
        self.target = target

    def on_train_begin(self, logs={}):
        self.best_val_loss = 100000

    def on_epoch_end(self, epoch, logs=None):
        encoder = self.model.layers[0]
        if logs.get('val_loss') < self.best_val_loss:
            self.best_val_loss = logs.get('val_loss')
            viz_latent_space(encoder, self.data, self.target, epoch, True, False)

def get_callbacks(model, data, target):
    model_callbacks = [
        EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=30),
        ModelCheckpoint(filepath='./checkpoints/checkpoint', monitor='val_loss', mode='min', verbose=1, save_best_only=True, save_weights_only=True),
        TensorBoard(log_dir='./logs'),
        save_latent_space_viz(model, data, target)
    ]
    return model_callbacks

def viz_latent_space(encoder, data, targets=[], epoch='Final', save=False, show=True):
    z, _, _  = encoder.predict(data)
    plt.figure(figsize=(3, 3))  # Smaller figsize value to reduce the plot size
    if len(targets) > 0:
        plt.scatter(z[:, 1], z[:, 0],  c=targets)
    else:
        plt.scatter(z[:, 1], z[:, 0])
    plt.xlabel('z - dim 1')
    plt.ylabel('z - dim 2')
    plt.colorbar()
    # save before showing, so the written file is not blank on non-interactive backends
    if save:
        plt.savefig('./images/latent_space_epoch' + str(epoch) + '.png')
    if show:
        plt.show()
    return z
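Note that get_callbacks is not wired into the training call used later in this post, which only passes an EarlyStopping callback. A hedged sketch of how the full callback list could be used instead (it assumes the rve model, batch_size and the data splits defined further below, and that the ./checkpoints, ./logs and ./images directories exist):

model_callbacks = get_callbacks(rve, x_val, y_val)
rve.fit(x_train, y_train, epochs=500, batch_size=batch_size,
        validation_data=(x_val, y_val), callbacks=model_callbacks)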

Finding the optimal learning rate

# ----------------------------------------- FIND OPTIMAL LR  ----------------------------------------
class LRFinder:
    """
    Cyclical LR, code tailored from:
    https://towardsdatascience.com/estimating-optimal-learning-rate-for-a-deep-neural-network-ce32f2556ce0
    """

    def __init__(self, model):
        self.model = model
        self.losses = []
        self.lrs = []
        self.best_loss = 1e9

    def on_batch_end(self, batch, logs):
        # Log the learning rate
        lr = K.get_value(self.model.optimizer.lr)
        self.lrs.append(lr)

        # Log the loss
        loss = logs['loss']
        self.losses.append(loss)

        # Check whether the loss got too large or NaN
        if batch > 5 and (math.isnan(loss) or loss > self.best_loss * 4):
            self.model.stop_training = True
            return

        if loss < self.best_loss:
            self.best_loss = loss

        # Increase the learning rate for the next batch
        lr *= self.lr_mult
        K.set_value(self.model.optimizer.lr, lr)

    def find(self, x_train, y_train, start_lr, end_lr, batch_size=64, epochs=1, **kw_fit):
        # If x_train contains data for multiple inputs, use length of the first input.
        # Assumption: the first element in the list is single input; NOT a list of inputs.
        N = x_train[0].shape[0] if isinstance(x_train, list) else x_train.shape[0]

        # Compute number of batches and LR multiplier
        num_batches = epochs * N / batch_size
        self.lr_mult = (float(end_lr) / float(start_lr)) ** (float(1) / float(num_batches))
        # Save weights into a file
        initial_weights = self.model.get_weights()

        # Remember the original learning rate
        original_lr = K.get_value(self.model.optimizer.lr)

        # Set the initial learning rate
        K.set_value(self.model.optimizer.lr, start_lr)

        callback = LambdaCallback(on_batch_end=lambda batch, logs: self.on_batch_end(batch, logs))

        self.model.fit(x_train, y_train,
                       batch_size=batch_size, epochs=epochs,
                       callbacks=[callback],
                       **kw_fit)

        # Restore the weights to the state before model fitting
        self.model.set_weights(initial_weights)

        # Restore the original learning rate
        K.set_value(self.model.optimizer.lr, original_lr)

    def find_generator(self, generator, start_lr, end_lr, epochs=1, steps_per_epoch=None, **kw_fit):
        if steps_per_epoch is None:
            try:
                steps_per_epoch = len(generator)
            except (TypeError, ValueError, NotImplementedError) as e:
                raise ValueError('`steps_per_epoch=None` is only valid for a'
                                 ' generator based on the '
                                 '`keras.utils.Sequence`'
                                 ' class. Please specify `steps_per_epoch` '
                                 'or use the `keras.utils.Sequence` class.') from e
        self.lr_mult = (float(end_lr) / float(start_lr)) ** (float(1) / float(epochs * steps_per_epoch))

        # Save weights into a file
        initial_weights = self.model.get_weights()

        # Remember the original learning rate
        original_lr = K.get_value(self.model.optimizer.lr)

        # Set the initial learning rate
        K.set_value(self.model.optimizer.lr, start_lr)

        callback = LambdaCallback(on_batch_end=lambda batch,
                                                      logs: self.on_batch_end(batch, logs))

        self.model.fit(generator,
                       epochs=epochs,
                       steps_per_epoch=steps_per_epoch,
                       callbacks=[callback],
                       **kw_fit)

        # Restore the weights to the state before model fitting
        self.model.set_weights(initial_weights)

        # Restore the original learning rate
        K.set_value(self.model.optimizer.lr, original_lr)

    def plot_loss(self, n_skip_beginning=10, n_skip_end=5, x_scale='log'):
        """
        Plots the loss.
        Parameters:
            n_skip_beginning - number of batches to skip on the left.
            n_skip_end - number of batches to skip on the right.
        """
        plt.ylabel("loss")
        plt.xlabel("learning rate (log scale)")
        plt.plot(self.lrs[n_skip_beginning:-n_skip_end], self.losses[n_skip_beginning:-n_skip_end])
        plt.xscale(x_scale)
        plt.show()

    def plot_loss_change(self, sma=1, n_skip_beginning=10, n_skip_end=5, y_lim=(-0.01, 0.01)):
        """
        Plots rate of change of the loss function.
        Parameters:
            sma - number of batches for simple moving average to smooth out the curve.
            n_skip_beginning - number of batches to skip on the left.
            n_skip_end - number of batches to skip on the right.
            y_lim - limits for the y axis.
        """
        derivatives = self.get_derivatives(sma)[n_skip_beginning:-n_skip_end]
        lrs = self.lrs[n_skip_beginning:-n_skip_end]
        plt.ylabel("rate of loss change")
        plt.xlabel("learning rate (log scale)")
        plt.plot(lrs, derivatives)
        plt.xscale('log')
        plt.ylim(y_lim)
        plt.show()

    def get_derivatives(self, sma):
        assert sma >= 1
        derivatives = [0] * sma
        for i in range(sma, len(self.lrs)):
            derivatives.append((self.losses[i] - self.losses[i - sma]) / sma)
        return derivatives

    def get_best_lr(self, sma, n_skip_beginning=10, n_skip_end=5):
        derivatives = self.get_derivatives(sma)
        best_der_idx = np.argmin(derivatives[n_skip_beginning:-n_skip_end])
        return self.lrs[n_skip_beginning:-n_skip_end][best_der_idx]

Results

# --------------------------------------------- RESULTS  --------------------------------------------
def get_model(path):
    saved_VRAE_model = load_model(path, compile=False)

    # return encoder, regressor
    return saved_VRAE_model.layers[1], saved_VRAE_model.layers[2]

def evaluate(y_true, y_hat, label='test'):
    mse = mean_squared_error(y_true, y_hat)
    rmse = np.sqrt(mse)
    variance = r2_score(y_true, y_hat)
    print('{} set RMSE:{}, R2:{}'.format(label, rmse, variance))
    return rmse, variance

def score(y_true, y_hat):
  res = 0
  for true, hat in zip(y_true, y_hat):
    subs = hat - true
    if subs < 0:
      res = res + np.exp(-subs/10)[0]-1
    else:
      res = res + np.exp(subs/13)[0]-1
  print("score: ", res)

def results(path, x_train, y_train, x_test, y_test):
    # Get model
    encoder, regressor = get_model(path)
    # Latent space
    train_mu = viz_latent_space(encoder, x_train, y_train)
    test_mu = viz_latent_space(encoder, x_test, y_test)
    # Evaluate
    y_hat_train = regressor.predict(train_mu)
    y_hat_test = regressor.predict(test_mu)

    evaluate(y_train, y_hat_train, 'train')
    evaluate(y_test, y_hat_test, 'test')
    score(y_test, y_hat_test)

Model architecture

class Sampling(keras.layers.Layer):
  """Uses (z_mean, sigma) to sample z, the vector encoding an engine trajetory."""
  def call(self, inputs):
    mu, sigma = inputs
    batch = tf.shape(mu)[0]
    dim = tf.shape(mu)[1]
    epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
    return mu + tf.exp(0.5 * sigma) * epsilon
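# A minimal sanity check of the reparameterization trick above (toy values, not
# from the original post): with a constant mean of 3 and a log-variance of 0,
# the sampled z should have a mean close to 3 and a standard deviation close to 1.
demo_mu = tf.fill((10000, 2), 3.0)
demo_log_var = tf.fill((10000, 2), 0.0)
demo_z = Sampling()([demo_mu, demo_log_var])
print(tf.reduce_mean(demo_z).numpy(), tf.math.reduce_std(demo_z).numpy())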

class RVE(keras.Model):
    def __init__(self, encoder, regressor, decoder=None, **kwargs):
        super(RVE, self).__init__(**kwargs)
        self.encoder = encoder
        self.regressor = regressor
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")
        self.reg_loss_tracker = keras.metrics.Mean(name="reg_loss")
        self.decoder = decoder
        if self.decoder!=None:
          self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")


    @property
    def metrics(self):
      if self.decoder!=None:
        return [
            self.total_loss_tracker,
            self.kl_loss_tracker,
            self.reg_loss_tracker,
            self.reconstruction_loss_tracker
        ]
      else:
        return [
            self.total_loss_tracker,
            self.kl_loss_tracker,
            self.reg_loss_tracker,
        ]

    def train_step(self, data):
        x, target_x = data
        with tf.GradientTape() as tape:
            # kl loss
            mu, sigma, z = self.encoder(x)
            kl_loss = -0.5 * (1 + sigma - tf.square(mu) - tf.exp(sigma))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
            # Regressor
            reg_prediction = self.regressor(z)
            reg_loss = tf.reduce_mean(
                keras.losses.mse(target_x, reg_prediction)
            )
            # Reconstruction
            if self.decoder!=None:
              reconstruction = self.decoder(z)
              reconstruction_loss = tf.reduce_mean(
                  keras.losses.mse(x, reconstruction)
              )
              total_loss = kl_loss + reg_loss + reconstruction_loss
              self.reconstruction_loss_tracker.update_state(reconstruction_loss)
            else:
              total_loss = kl_loss + reg_loss
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.total_loss_tracker.update_state(total_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        self.reg_loss_tracker.update_state(reg_loss)
        return {
            "loss": self.total_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
            "reg_loss": self.reg_loss_tracker.result(),
        }


    def test_step(self, data):
        x, target_x = data

        # kl loss
        mu, sigma, z = self.encoder(x)
        kl_loss = -0.5 * (1 + sigma - tf.square(mu) - tf.exp(sigma))
        kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
        # Regressor
        reg_prediction = self.regressor(z)
        reg_loss = tf.reduce_mean(
            keras.losses.mse(target_x, reg_prediction)
        )
        # Reconstruction
        if self.decoder!=None:
          reconstruction = self.decoder(z)
          reconstruction_loss = tf.reduce_mean(
              keras.losses.mse(x, reconstruction)
          )

          total_loss = kl_loss + reg_loss + reconstruction_loss
        else:
          total_loss = kl_loss + reg_loss

        return {
            "loss": total_loss,
            "kl_loss": kl_loss,
            "reg_loss": reg_loss,
        }

# Set hyperparameters
sequence_length = 200
alpha = 0.2

# Load and preprocess data using the reduced feature set df1
sensors = ['s_max_voltage_discharge', 's_min_voltage_charge', 'Time_at_4.15V_s'] # Define the sensors

# Call get_data to build the training, validation and test sets
x_train, y_train, x_val, y_val, x_test, y_test = get_data(df1, sensors, sequence_length, alpha)

# from scipy.signal import savgol_filter
# # Apply Savitzky-Golay filter
# x_val_smoothed = savgol_filter(x_val, window_length=4, polyorder=2, axis=0)

# Setup the network parameters:
timesteps = x_train.shape[1]
input_dim = x_train.shape[2]
intermediate_dim = 32
batch_size = 256
latent_dim = 2
masking_value = -99 # used to pad and mask sequences shorter than sequence_length (200) cycles
kernel_regularizer=l1_l2(l1=0.001, l2=0.001)
dropout_rate = 0.1
# --------------------------------- Encoder --------------------------------------
inputs = Input(shape=(timesteps, input_dim,), name='encoder_input')
mask = Masking(mask_value=masking_value)(inputs)
h = Bidirectional(LSTM(intermediate_dim))(mask) # LSTM encoding

mu = Dense(latent_dim, kernel_regularizer = kernel_regularizer)(h) # VAE Z layer
mu = Dropout(dropout_rate)(mu)

sigma = Dense(latent_dim, kernel_regularizer = kernel_regularizer)(h)
sigma = Dropout(dropout_rate)(sigma)

z = Sampling()([mu, sigma])

# Instantiate the encoder model:
encoder = keras.Model(inputs, [mu, sigma, z], name='encoder')
# ------------------------------- Regressor --------------------------------------
reg_latent_inputs = Input(shape=(latent_dim,), name='z_sampling_reg')

reg_intermediate = Dense(16, activation='tanh', kernel_regularizer = kernel_regularizer)(reg_latent_inputs)
reg_intermediate = BatchNormalization()(reg_intermediate)
reg_intermediate = Dropout(dropout_rate)(reg_intermediate)

reg_outputs = Dense(1, name='reg_output', kernel_regularizer = kernel_regularizer)(reg_intermediate)
reg_outputs = Dropout(dropout_rate)(reg_outputs)

# Instantiate the regressor model:
regressor = keras.Model(reg_latent_inputs, reg_outputs, name='regressor')

print("Shape of x_train:", x_train.shape)
print("Shape of y_train:", y_train.shape)
print("Shape of x_val:", x_val.shape)
print("Shape of y_val:", y_val.shape)
print("Shape of x_test:", x_test.shape)
print("Shape of y_test:", len(y_test))

Shape of x_train: (8795, 200, 3)
Shape of y_train: (8795, 1)
Shape of x_val: (1758, 200, 3)
Shape of y_val: (1758, 1)
Shape of x_test: (10, 200, 3)
Shape of y_test: 10

rve = RVE(encoder, regressor)
lr_finder = LRFinder(rve)

rve.compile(optimizer=keras.optimizers.Adam(learning_rate=0.0000001))

# with learning rate growing exponentially from 0.0000001 to 1
lr_finder.find(x_train, y_train, start_lr=0.0000001, end_lr=1, batch_size=batch_size, epochs=10)

# Plot the loss
lr_finder.plot_loss()

[Figure]

[Figure: output of lr_finder.plot_loss()]
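Instead of reading the learning rate off the plot, the LRFinder class above also exposes get_best_lr; a minimal sketch (the 5-batch moving-average window is an assumption, not a value from the original post):

best_lr = lr_finder.get_best_lr(sma=5)
print("Suggested learning rate:", best_lr)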

Training the model

# Instantiate the RVE model
rve = RVE(encoder, regressor)

# Compile the RVE model with the Adam optimizer
rve.compile(optimizer=keras.optimizers.Adam(0.01))

# Define the early stopping callback
early_stopping = EarlyStopping(monitor='loss', min_delta=3, patience=5,  verbose=1, mode='min', restore_best_weights=True)

# Rebuild the training, validation and test sets with get_data
x_train, y_train, x_val, y_val, x_test, y_test = get_data(df1, sensors, sequence_length, alpha)

# Fit the RVE model with the callbacks
rve.fit(x_train, y_train, epochs=500, batch_size=batch_size, validation_data=(x_val, y_val), callbacks=[early_stopping])
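The results helper defined earlier expects a model loaded from disk, whereas training here keeps everything in memory. As a hedged alternative sketch, the trained encoder and regressor (both plain functional models) could be persisted separately after fitting; the paths below are assumptions, and reloading the encoder requires passing the custom Sampling layer via custom_objects:

os.makedirs('./saved_models', exist_ok=True)
rve.encoder.save('./saved_models/encoder.h5')
rve.regressor.save('./saved_models/regressor.h5')
# reloading later (sketch):
# encoder = load_model('./saved_models/encoder.h5', compile=False, custom_objects={'Sampling': Sampling})
# regressor = load_model('./saved_models/regressor.h5', compile=False)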

RUL estimation

train_mu = viz_latent_space(rve.encoder, np.concatenate((x_train, x_val)), np.concatenate((y_train, y_val)))
test_mu = viz_latent_space(rve.encoder, x_test, y_test)

# Evaluate
y_hat_train = rve.regressor.predict(train_mu)
y_hat_test = rve.regressor.predict(test_mu)

evaluate(np.concatenate((y_train, y_val)), y_hat_train, 'train')
evaluate(y_test, y_hat_test, 'test')

[Figure: latent space of the training data]

[Figure: latent space of the test data]

330/330 [==============================] - 0s 1ms/step
1/1 [==============================] - 0s 20ms/step
train set RMSE:26.346721649169922, R2:0.9899616368349312
test set RMSE:246.51723899515346, R2:0.5192274671464132


(246.51723899515346, 0.5192274671464132)
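A quick visual comparison of predicted and true RUL on the test sequences can help interpret the large gap between train and test RMSE; a minimal sketch using the arrays computed above:

plt.figure(figsize=(6, 4))
plt.plot(y_test, 'o-', label='True RUL')
plt.plot(y_hat_test.flatten(), 's--', label='Predicted RUL')
plt.xlabel('Test sequence index')
plt.ylabel('RUL')
plt.legend()
plt.tight_layout()
plt.show()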

The author holds a PhD in engineering and serves as a reviewer for journals such as Mechanical Systems and Signal Processing, Proceedings of the CSEE, and Control and Decision. Areas of expertise: modern signal processing, machine learning, deep learning, digital twins, time-series analysis, equipment defect detection, anomaly detection, and intelligent fault diagnosis and prognostics and health management (PHM).
