MindSpore 时序预测与 LSTM 实战

举报
whitea133 发表于 2026/05/23 15:03:19 2026/05/23
【摘要】 MindSpore 时序预测与 LSTM 实战 一、引言时序预测是机器学习和深度学习领域中最具挑战性也最具应用价值的任务之一。从股票价格预测、天气预测、能源消耗预测到疾病传播建模,时序数据无处不在,对时序预测模型的需求也日益增长。长短期记忆网络(LSTM)作为循环神经网络(RNN)的重要变体,专门设计用于解决长期依赖问题,在时序预测任务中展现出卓越的性能。MindSpore是华为自主研发的...

MindSpore 时序预测与 LSTM 实战

一、引言

时序预测是机器学习和深度学习领域中最具挑战性也最具应用价值的任务之一。从股票价格预测、天气预测、能源消耗预测到疾病传播建模,时序数据无处不在,对时序预测模型的需求也日益增长。长短期记忆网络(LSTM)作为循环神经网络(RNN)的重要变体,专门设计用于解决长期依赖问题,在时序预测任务中展现出卓越的性能。

MindSpore是华为自主研发的全场景AI计算框架,提供了强大的时序数据处理能力和灵活的LSTM实现。本文将通过完整的实战项目,带领读者掌握基于MindSpore的时序预测技术,从理论原理到代码实现,从数据预处理到模型调优,提供一套完整的解决方案。无论您是时序预测领域的新人还是希望深入了解LSTM的高级开发者,都能从中获得有价值的知识和实践经验。

二、时序预测基础理论

2.1 时序数据的特征与挑战

时序数据与传统静态数据有着本质的区别,它具有以下几个显著特征:

时间依赖性是时序数据最核心的特征。数据点之间存在明确的时间顺序关系,当前时刻的值往往与过去若干时刻的值存在相关性。这种依赖关系可能是短期的(如股票日内波动),也可能是长期的(如季节性变化对销售数据的影响)。传统的机器学习算法通常假设样本之间相互独立,难以捕捉这种时间依赖性。

趋势与季节性是时序数据的另一重要特征。趋势反映了数据的长期变化方向,可能是上升、下降或保持平稳;季节性则体现了数据在固定时间周期内的周期性波动,如电商销售额在双十一期间的突增、交通流量在早晚高峰期的增加。时序预测模型需要能够同时识别和分离这些不同的成分。

非平稳性给时序预测带来了额外的挑战。很多时序数据的统计特性(如均值、方差)会随时间发生变化,这种现象被称为非平稳性。处理非平稳数据通常需要进行差分运算或使用能够适应数据分布变化的模型。

噪声与异常值在实际时序数据中普遍存在。传感器故障、数据传输错误、突发事件等都可能导致数据中出现噪声或异常值,这些异常数据点会严重影响预测的准确性。

2.2 LSTM 原理详解

长短期记忆网络(Long Short-Term Memory,LSTM)由Hochreiter和Schmidhuber于1997年提出,是解决标准RNN梯度消失和梯度爆炸问题的里程碑式创新。LSTM的核心设计引入了门控机制,通过精心设计的门结构来控制信息的流动。

**遗忘门(Forget Gate)**决定从细胞状态中丢弃哪些信息。遗忘门的输入是当前时刻的输入x_t和上一时刻的隐藏状态h_{t-1},输出是一个介于0和1之间的向量,0表示完全丢弃,1表示完全保留。遗忘门的数学表达式为:

f_t = σ(W_f · [h_{t-1}, x_t] + b_f)

其中σ表示Sigmoid激活函数,W_f和b_f是可学习的权重和偏置。

**输入门(Input Gate)**负责更新细胞状态。输入门由两部分组成:一部分决定哪些值需要更新(i_t),另一部分创建新的候选值(\tilde{C}_t)。数学表达式为:

i_t = σ(W_i · [h_{t-1}, x_t] + b_i)
\tilde{C}_t = tanh(W_C · [h_{t-1}, x_t] + b_C)

细胞状态更新是LSTM的核心操作。新的细胞状态C_t通过结合遗忘门和输入门的输出来计算:

C_t = f_t * C_{t-1} + i_t * \tilde{C}_t

这种设计使得LSTM能够选择性地保留或遗忘历史信息,从而有效处理长期依赖。

**输出门(Output Gate)**决定当前时刻的隐藏状态。隐藏状态不仅传递给下一个时刻,还会被用于最终的预测:

o_t = σ(W_o · [h_{t-1}, x_t] + b_o)
h_t = o_t * tanh(C_t)

2.3 时序预测的评价指标

评估时序预测模型的性能需要使用专门的评价指标。常用的指标包括:

**均方误差(MSE)**计算预测值与真实值之间差异的平方的平均值,对大误差惩罚更重:

MSE = (1/n) * Σ(y_i - ŷ_i)²

**平均绝对误差(MAE)**计算预测值与真实值之间差异的绝对值的平均值,对所有误差一视同仁:

MAE = (1/n) * Σ|y_i - ŷ_i|

**平均绝对百分比误差(MAPE)**以百分比形式表示预测误差,便于跨数据集比较:

MAPE = (100/n) * Σ|y_i - ŷ_i| / |y_i|

**对称平均绝对百分比误差(SMAPE)**是MAPE的改进版本,解决了MAPE在真实值接近零时的不稳定性:

SMAPE = (200/n) * Σ|y_i - ŷ_i| / (|y_i| + |ŷ_i|)

**R平方分数(R²)**衡量模型对数据变异性的解释程度,取值范围通常为负无穷到1,越接近1表示模型拟合越好:

R² = 1 - Σ(y_i - ŷ_i)² / Σ(y_i - ȳ)²

三、环境配置与数据准备

3.1 MindSpore 环境配置

首先确保正确安装MindSpore及其依赖:

# 安装MindSpore CPU版本
pip install mindspore

# 验证安装
python -c "import mindspore as ms; print(f'MindSpore版本: {ms.__version__}')"

3.2 数据集介绍

本文使用经典的航空乘客数据集(Airline Passengers Dataset)作为示例,该数据集记录了1949年至1960年每月国际航空乘客数量,共144个数据点。这个数据集具有明显的趋势和季节性特征,非常适合作为时序预测的入门案例。

3.3 数据预处理

import mindspore as ms
import mindspore.nn as nn
from mindspore import Tensor, dataset as ds
import mindspore.ops as ops
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error
import os

# 设置MindSpore运行模式
ms.set_context(mode=ms.GRAPH_MODE, device_target="CPU")

class TimeSeriesDataProcessor:
    """时序数据预处理器"""

    def __init__(self, sequence_length: int = 12):
        """
        初始化预处理器

        Args:
            sequence_length: 用于预测的历史序列长度
        """
        self.sequence_length = sequence_length
        self.scaler = MinMaxScaler(feature_range=(0, 1))
        self.data_min = None
        self.data_max = None

    def load_data(self, file_path: str = None) -> np.ndarray:
        """加载时序数据"""
        if file_path and os.path.exists(file_path):
            # 从文件加载
            df = pd.read_csv(file_path)
            data = df['value'].values.astype(np.float32)
        else:
            # 使用内置航空乘客数据
            data = self._get_airline_passengers()
        return data

    def _get_airline_passengers(self) -> np.ndarray:
        """航空乘客数据集(月度国际航空乘客数量,1949-1960)"""
        passengers = np.array([
            112, 118, 132, 129, 121, 135, 148, 148, 136, 119, 104, 118,
            115, 126, 141, 135, 125, 149, 170, 170, 158, 133, 114, 140,
            145, 150, 178, 163, 172, 178, 199, 199, 184, 162, 146, 166,
            171, 180, 193, 181, 183, 218, 230, 242, 209, 191, 172, 194,
            196, 196, 236, 235, 229, 243, 264, 272, 237, 211, 180, 201,
            204, 188, 235, 227, 234, 264, 302, 293, 259, 229, 203, 229,
            242, 233, 267, 269, 270, 315, 364, 347, 312, 274, 237, 278,
            284, 277, 317, 313, 318, 374, 413, 405, 355, 306, 271, 306,
            315, 301, 356, 348, 355, 422, 465, 467, 404, 347, 305, 336,
            340, 318, 362, 348, 363, 435, 491, 505, 404, 359, 310, 337,
            360, 342, 406, 396, 420, 472, 548, 559, 463, 407, 362, 405,
            417, 391, 419, 461, 472, 535, 622, 606, 508, 461, 390, 432
        ], dtype=np.float32)
        return passengers

    def normalize(self, data: np.ndarray) -> np.ndarray:
        """归一化数据到[0, 1]区间"""
        self.data_min = data.min()
        self.data_max = data.max()
        return self.scaler.fit_transform(data.reshape(-1, 1)).flatten()

    def create_sequences(self, data: np.ndarray) -> tuple:
        """
        创建监督学习所需的序列样本

        Args:
            data: 归一化后的时序数据

        Returns:
            X: 输入序列 (samples, sequence_length)
            y: 目标值 (samples,)
        """
        X, y = [], []
        for i in range(len(data) - self.sequence_length):
            X.append(data[i:i + self.sequence_length])
            y.append(data[i + self.sequence_length])
        return np.array(X), np.array(y)

    def inverse_transform(self, data: np.ndarray) -> np.ndarray:
        """反归一化,恢复原始尺度"""
        return self.scaler.inverse_transform(data.reshape(-1, 1)).flatten()

    def train_test_split(self, X: np.ndarray, y: np.ndarray, 
                        test_ratio: float = 0.2) -> tuple:
        """
        划分训练集和测试集

        Args:
            X: 输入特征
            y: 目标值
            test_ratio: 测试集比例

        Returns:
            训练集和测试集的X、y
        """
        split_idx = int(len(X) * (1 - test_ratio))
        X_train, X_test = X[:split_idx], X[split_idx:]
        y_train, y_test = y[:split_idx], y[split_idx:]
        return X_train, X_test, y_train, y_test


# 数据预处理演示
print("=" * 60)
print("时序数据预处理演示")
print("=" * 60)

processor = TimeSeriesDataProcessor(sequence_length=12)
raw_data = processor.load_data()
print(f"数据集大小: {len(raw_data)} 个数据点")
print(f"数据范围: {raw_data.min():.0f} - {raw_data.max():.0f} (千人次)")

normalized_data = processor.normalize(raw_data)
X, y = processor.create_sequences(normalized_data)
X_train, X_test, y_train, y_test = processor.train_test_split(X, y, test_ratio=0.2)

print(f"序列长度: {processor.sequence_length}")
print(f"样本数量: {len(X)}")
print(f"训练集: {len(X_train)} 样本")
print(f"测试集: {len(X_test)} 样本")

# 保存预处理后的数据信息
data_info = {
    'sequence_length': processor.sequence_length,
    'data_min': float(processor.data_min),
    'data_max': float(processor.data_max),
    'train_size': len(X_train),
    'test_size': len(X_test)
}
print(f"\n数据预处理完成!")
print(f"归一化参数: min={data_info['data_min']:.2f}, max={data_info['data_max']:.2f}")

四、LSTM 模型构建

4.1 模型架构设计

LSTM模型的架构设计直接影响预测性能。一个典型的时序预测LSTM模型通常包含以下组件:

输入层接收历史序列数据,需要指定序列长度和特征维度。

LSTM层是模型的核心,负责提取时序特征。可以使用单层或多层LSTM,多层LSTM能够学习更复杂的时序模式。

全连接层将LSTM的输出转换为最终的预测值。对于回归任务,通常使用单个神经元并使用线性激活函数。

class LSTMPredictor(nn.Cell):
    """基于LSTM的时序预测模型"""

    def __init__(self, input_size: int = 1, hidden_size: int = 64, 
                 num_layers: int = 2, output_size: int = 1, dropout: float = 0.2):
        """
        初始化LSTM预测模型

        Args:
            input_size: 输入特征维度(时序数据通常为1)
            hidden_size: LSTM隐藏层神经元数量
            num_layers: LSTM层数
            output_size: 输出维度(预测步长)
            dropout: Dropout比例,防止过拟合
        """
        super(LSTMPredictor, self).__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # LSTM层
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=dropout,
            batch_first=True
        )

        # 全连接输出层
        self.fc = nn.Dense(hidden_size, output_size)

        # Dropout层
        self.dropout = nn.Dropout(p=dropout)

    def construct(self, x):
        """
        前向传播

        Args:
            x: 输入张量,形状为 (batch_size, sequence_length, input_size)

        Returns:
            预测值,形状为 (batch_size, output_size)
        """
        # LSTM层
        # out: (batch_size, sequence_length, hidden_size)
        # hidden: (num_layers, batch_size, hidden_size)
        out, (hidden, cell) = self.lstm(x)

        # 取最后一个时刻的输出
        out = out[:, -1, :]

        # Dropout
        out = self.dropout(out)

        # 全连接层
        out = self.fc(out)

        return out


class BiLSTMPredictor(nn.Cell):
    """双向LSTM时序预测模型"""

    def __init__(self, input_size: int = 1, hidden_size: int = 64, 
                 num_layers: int = 2, output_size: int = 1, dropout: float = 0.2):
        super(BiLSTMPredictor, self).__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # 双向LSTM层
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=dropout,
            batch_first=True,
            bidirectional=True
        )

        # 由于使用双向LSTM,隐藏层大小翻倍
        self.fc = nn.Dense(hidden_size * 2, output_size)
        self.dropout = nn.Dropout(p=dropout)

    def construct(self, x):
        out, _ = self.lstm(x)
        # 取最后一个时刻的输出
        out = out[:, -1, :]
        out = self.dropout(out)
        out = self.fc(out)
        return out


class AttentionLSTMPredictor(nn.Cell):
    """带注意力机制的LSTM时序预测模型"""

    def __init__(self, input_size: int = 1, hidden_size: int = 64, 
                 num_layers: int = 2, output_size: int = 1, dropout: float = 0.2):
        super(AttentionLSTMPredictor, self).__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # LSTM层
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=dropout,
            batch_first=True
        )

        # 注意力机制
        self.attention_weights = nn.Dense(hidden_size, 1)

        # 全连接层
        self.fc = nn.Dense(hidden_size, output_size)
        self.dropout = nn.Dropout(p=dropout)

    def construct(self, x):
        # LSTM输出: (batch_size, seq_len, hidden_size)
        lstm_out, _ = self.lstm(x)

        # 计算注意力权重
        attention_scores = self.attention_weights(lstm_out)  # (batch_size, seq_len, 1)
        attention_scores = ops.Softmax(axis=1)(attention_scores)

        # 加权求和
        context = ops.ReduceSum()(lstm_out * attention_scores, 1)  # (batch_size, hidden_size)

        # 输出层
        out = self.dropout(context)
        out = self.fc(out)

        return out


# 模型实例化测试
print("\n" + "=" * 60)
print("LSTM模型构建演示")
print("=" * 60)

# 创建模型实例
model = LSTMPredictor(
    input_size=1,
    hidden_size=64,
    num_layers=2,
    output_size=1,
    dropout=0.2
)

# 测试模型前向传播
test_input = Tensor(np.random.randn(32, 12, 1).astype(np.float32))
test_output = model(test_input)
print(f"模型结构: LSTM(input=1, hidden=64, layers=2)")
print(f"输入形状: {test_input.shape}")
print(f"输出形状: {test_output.shape}")

# 统计模型参数数量
total_params = sum(p.shape[0] * p.shape[1] if len(p.shape) > 1 else p.shape[0] 
                   for p in model.get_parameters())
print(f"模型参数总量: {total_params:,}")

4.2 模型训练流程

class Trainer:
    """LSTM模型训练器"""

    def __init__(self, model: nn.Cell, learning_rate: float = 0.001):
        """
        初始化训练器

        Args:
            model: 待训练的模型
            learning_rate: 学习率
        """
        self.model = model
        self.lr = learning_rate

        # 定义优化器
        self.optimizer = nn.Adam(model.trainable_params(), learning_rate=self.lr)

        # 定义损失函数
        self.loss_fn = nn.MSELoss()

        # 训练过程
        self.train_net = nn.TrainOneStepCell(self.model, self.optimizer)

    def train_epoch(self, X_train: np.ndarray, y_train: np.ndarray, 
                   batch_size: int = 32) -> float:
        """
        训练一个epoch

        Args:
            X_train: 训练输入
            y_train: 训练目标
            batch_size: 批大小

        Returns:
            平均损失值
        """
        self.model.set_train()
        dataset = ds.NumpySlicesDataset({"x": X_train, "y": y_train}, shuffle=True)
        dataset = dataset.batch(batch_size)

        total_loss = 0.0
        steps = 0

        for data in dataset.create_dict_iterator():
            x = Tensor(data["x"])
            y = Tensor(data["y"])

            loss = self.train_net(x, y)
            total_loss += loss.asnumpy()
            steps += 1

        return total_loss / steps if steps > 0 else 0.0

    def evaluate(self, X_test: np.ndarray, y_test: np.ndarray) -> dict:
        """
        评估模型性能

        Args:
            X_test: 测试输入
            y_test: 测试目标

        Returns:
            评估指标字典
        """
        self.model.set_train(False)

        X = Tensor(X_test.astype(np.float32))
        y = Tensor(y_test.astype(np.float32))

        predictions = self.model(X)

        mse = float((predictions.asnumpy() - y.asnumpy()) ** 2).mean()
        mae = float(np.abs(predictions.asnumpy() - y.asnumpy())).mean()

        return {
            'MSE': mse,
            'RMSE': np.sqrt(mse),
            'MAE': mae
        }

    def predict(self, X: np.ndarray) -> np.ndarray:
        """
        使用模型进行预测

        Args:
            X: 输入数据

        Returns:
            预测结果
        """
        self.model.set_train(False)
        X_tensor = Tensor(X.astype(np.float32))
        predictions = self.model(X_tensor)
        return predictions.asnumpy()


def train_lstm_model(X_train, y_train, X_test, y_test, 
                    epochs: int = 100, batch_size: int = 32,
                    hidden_size: int = 64, num_layers: int = 2,
                    learning_rate: float = 0.001) -> tuple:
    """
    完整训练流程

    Args:
        X_train: 训练输入
        y_train: 训练目标
        X_test: 测试输入
        y_test: 测试目标
        epochs: 训练轮数
        batch_size: 批大小
        hidden_size: LSTM隐藏层大小
        num_layers: LSTM层数
        learning_rate: 学习率

    Returns:
        训练好的模型和训练历史
    """
    print("\n" + "=" * 60)
    print("开始训练LSTM模型")
    print("=" * 60)
    print(f"训练参数:")
    print(f"  - 序列长度: {X_train.shape[1]}")
    print(f"  - 隐藏层大小: {hidden_size}")
    print(f"  - LSTM层数: {num_layers}")
    print(f"  - 批大小: {batch_size}")
    print(f"  - 学习率: {learning_rate}")
    print(f"  - 训练轮数: {epochs}")
    print("-" * 60)

    # 创建模型
    model = LSTMPredictor(
        input_size=1,
        hidden_size=hidden_size,
        num_layers=num_layers,
        output_size=1,
        dropout=0.2
    )

    # 创建训练器
    trainer = Trainer(model, learning_rate=learning_rate)

    # 训练循环
    history = {'train_loss': [], 'val_loss': []}
    best_loss = float('inf')

    for epoch in range(epochs):
        # 训练
        train_loss = trainer.train_epoch(X_train, y_train, batch_size)

        # 评估
        metrics = trainer.evaluate(X_test, y_test)

        history['train_loss'].append(train_loss)
        history['val_loss'].append(metrics['MSE'])

        # 打印训练进度
        if (epoch + 1) % 10 == 0 or epoch == 0:
            print(f"Epoch {epoch+1:3d}/{epochs} | "
                  f"Train Loss: {train_loss:.6f} | "
                  f"Val MSE: {metrics['MSE']:.6f} | "
                  f"Val RMSE: {metrics['RMSE']:.4f}")

        # 保存最佳模型
        if train_loss < best_loss:
            best_loss = train_loss

    print("-" * 60)
    print(f"训练完成!最佳训练损失: {best_loss:.6f}")

    return model, history, trainer


# 执行训练
print("\n" + "=" * 60)
print("航空乘客数量预测实战")
print("=" * 60)

# 准备数据
processor = TimeSeriesDataProcessor(sequence_length=12)
raw_data = processor.load_data()
normalized_data = processor.normalize(raw_data)
X, y = processor.create_sequences(normalized_data)
X_train, X_test, y_train, y_test = processor.train_test_split(X, y, test_ratio=0.2)

# 训练模型
model, history, trainer = train_lstm_model(
    X_train, y_train, X_test, y_test,
    epochs=100,
    batch_size=32,
    hidden_size=64,
    num_layers=2,
    learning_rate=0.001
)

五、多步预测与滚动预测

5.1 单步预测与多步预测

时序预测可以分为单步预测和多步预测。单步预测只预测下一个时间点的值,而多步预测需要预测未来多个时间点的值。两种任务在技术上有所不同:

单步预测相对简单,输入序列直接映射到单一输出值。模型在训练时使用滑动窗口策略,每次只用固定长度的历史数据预测下一个时间点。

多步预测更具挑战性,有两种主要方法:

  1. 直接多步预测:训练多个模型,每个模型负责预测特定的未来时间点
  2. 滚动多步预测:使用已经预测的值作为输入,递归预测未来的时间点
class MultiStepPredictor:
    """多步时序预测器"""

    def __init__(self, model: nn.Cell, scaler):
        """
        初始化多步预测器

        Args:
            model: 训练好的LSTM模型
            scaler: 数据归一化器
        """
        self.model = model
        self.scaler = scaler

    def direct_predict(self, last_sequence: np.ndarray, 
                      steps: int = 6) -> np.ndarray:
        """
        直接多步预测(需要多个模型)
        这里演示单模型滚动预测

        Args:
            last_sequence: 最后的输入序列
            steps: 预测步数

        Returns:
            未来多个时间点的预测值
        """
        predictions = []
        current_input = last_sequence.copy()

        for _ in range(steps):
            # 预测下一个时间点
            input_tensor = Tensor(current_input.reshape(1, -1, 1).astype(np.float32))
            pred = self.model(input_tensor).asnumpy()[0, 0]
            predictions.append(pred)

            # 更新输入序列(滚动)
            current_input = np.append(current_input[1:], pred)

        return np.array(predictions)

    def predict_future(self, historical_data: np.ndarray,
                      forecast_horizon: int = 12) -> np.ndarray:
        """
        预测未来一段时间的数据

        Args:
            historical_data: 历史数据(原始尺度)
            forecast_horizon: 预测范围(月数)

        Returns:
            预测结果
        """
        # 归一化
        normalized = self.scaler.transform(historical_data.reshape(-1, 1)).flatten()

        # 获取最后的序列
        last_sequence = normalized[-12:]

        # 滚动预测
        predictions = self.direct_predict(last_sequence, steps=forecast_horizon)

        # 反归一化
        return self.scaler.inverse_transform(predictions.reshape(-1, 1)).flatten()


# 多步预测演示
print("\n" + "=" * 60)
print("多步预测演示")
print("=" * 60)

# 加载数据
processor = TimeSeriesDataProcessor(sequence_length=12)
raw_data = processor.load_data()
normalized_data = processor.normalize(raw_data)
X, y = processor.create_sequences(normalized_data)

# 重新训练模型(使用全部数据的前80%)
split_idx = int(len(X) * 0.8)
X_train_full, X_test_full = X[:split_idx], X[split_idx:]
y_train_full, y_test_full = y[:split_idx], y[split_idx:]

# 训练最终模型
final_model, _, _ = train_lstm_model(
    X_train_full, y_train_full, X_test_full, y_test_full,
    epochs=100, batch_size=32, hidden_size=64, num_layers=2
)

# 多步预测
predictor = MultiStepPredictor(final_model, processor.scaler)
historical = raw_data[:-12]  # 历史数据(不含最后一年)
future_predictions = predictor.predict_future(historical, forecast_horizon=12)

print("\n未来12个月的航空乘客数量预测:")
print("-" * 40)
for i, pred in enumerate(future_predictions, 1):
    print(f"第 {i:2d} 个月: {pred:7.1f} 千人次")

# 与真实值比较(如果有的话)
actual = raw_data[-12:]
print("\n预测值与真实值对比:")
print("-" * 40)
print(f"{'月份':^6} | {'预测值':^10} | {'真实值':^10} | {'误差':^10}")
print("-" * 40)
for i in range(12):
    error = future_predictions[i] - actual[i]
    print(f"{i+1:^6} | {future_predictions[i]:^10.1f} | {actual[i]:^10.1f} | {error:^+10.1f}")

六、模型评估与可视化

6.1 全面评估模型性能

def evaluate_model_comprehensive(model: nn.Cell, X_test: np.ndarray, 
                                  y_test: np.ndarray, scaler) -> dict:
    """
    全面评估模型性能

    Args:
        model: 训练好的模型
        X_test: 测试输入
        y_test: 测试目标(归一化后)
        scaler: 归一化器

    Returns:
        评估指标字典
    """
    model.set_train(False)

    # 预测
    X_tensor = Tensor(X_test.astype(np.float32))
    predictions_normalized = model(X_tensor).asnumpy().flatten()

    # 反归一化
    predictions = scaler.inverse_transform(predictions_normalized.reshape(-1, 1)).flatten()
    actual = scaler.inverse_transform(y_test.reshape(-1, 1)).flatten()

    # 计算各项指标
    mse = np.mean((predictions - actual) ** 2)
    rmse = np.sqrt(mse)
    mae = np.mean(np.abs(predictions - actual))

    # MAPE(避免除零)
    mask = actual != 0
    mape = np.mean(np.abs((actual[mask] - predictions[mask]) / actual[mask])) * 100

    # R²
    ss_res = np.sum((actual - predictions) ** 2)
    ss_tot = np.sum((actual - np.mean(actual)) ** 2)
    r2 = 1 - (ss_res / ss_tot) if ss_tot != 0 else 0

    # 对称MAPE
    smape = np.mean(2 * np.abs(predictions - actual) / 
                   (np.abs(actual) + np.abs(predictions) + 1e-8)) * 100

    return {
        'MSE': mse,
        'RMSE': rmse,
        'MAE': mae,
        'MAPE': mape,
        'SMAPE': smape,
        'R2': r2,
        'predictions': predictions,
        'actual': actual
    }


# 完整评估
print("\n" + "=" * 60)
print("模型性能评估")
print("=" * 60)

# 重新准备数据
processor = TimeSeriesDataProcessor(sequence_length=12)
raw_data = processor.load_data()
normalized_data = processor.normalize(raw_data)
X, y = processor.create_sequences(normalized_data)
X_train, X_test, y_train, y_test = processor.train_test_split(X, y, test_ratio=0.2)

# 训练模型
eval_model, _, _ = train_lstm_model(
    X_train, y_train, X_test, y_test,
    epochs=150, batch_size=32, hidden_size=64, num_layers=2
)

# 评估
metrics = evaluate_model_comprehensive(eval_model, X_test, y_test, processor.scaler)

print("\n模型评估指标:")
print("-" * 40)
print(f"均方误差 (MSE):        {metrics['MSE']:.4f}")
print(f"均方根误差 (RMSE):     {metrics['RMSE']:.4f}")
print(f"平均绝对误差 (MAE):    {metrics['MAE']:.4f}")
print(f"平均绝对百分比误差:    {metrics['MAPE']:.2f}%")
print(f"对称MAPE (SMAPE):     {metrics['SMAPE']:.2f}%")
print(f"R² 分数:              {metrics['R2']:.4f}")
print("-" * 40)

if metrics['R2'] >= 0.8:
    print("模型评价: 优秀 (R² ≥ 0.8)")
elif metrics['R2'] >= 0.6:
    print("模型评价: 良好 (0.6 ≤ R² < 0.8)")
elif metrics['R2'] >= 0.4:
    print("模型评价: 一般 (0.4 ≤ R² < 0.6)")
else:
    print("模型评价: 需要改进 (R² < 0.4)")

6.2 模型改进策略

对于时序预测任务,可以通过以下策略进一步提升模型性能:

超参数调优:使用网格搜索或贝叶斯优化寻找最优的超参数组合,如隐藏层大小、LSTM层数、学习率等。

特征工程:添加额外的时序特征,如星期几、月份、是否为节假日等日历特征,以及滞后特征、移动平均等时序衍生特征。

集成学习:组合多个不同架构或不同参数模型的预测结果,通常能获得更稳定的预测。

注意力机制:在LSTM基础上添加注意力层,让模型能够自动关注输入序列中的关键时间点。

def create_enhanced_features(data: np.ndarray) -> np.ndarray:
    """
    创建增强特征

    Args:
        data: 原始时序数据

    Returns:
        增强特征矩阵
    """
    n = len(data)
    features = np.zeros((n, 9), dtype=np.float32)

    # 原始数据
    features[:, 0] = data

    # 滞后特征
    for lag in [1, 3, 6, 12]:
        if lag < n:
            features[lag:, lag] = data[:-lag]

    # 滚动统计特征
    window = 12
    rolling_mean = np.zeros(n)
    rolling_std = np.zeros(n)

    for i in range(window, n):
        rolling_mean[i] = np.mean(data[i-window:i])
        rolling_std[i] = np.std(data[i-window:i])

    features[:, 5] = rolling_mean
    features[:, 6] = rolling_std

    # 季节性指标(月度数据的季节性)
    month = np.arange(n) % 12
    features[:, 7] = np.sin(2 * np.pi * month / 12)
    features[:, 8] = np.cos(2 * np.pi * month / 12)

    return features


def hyperparameter_search(X_train, y_train, X_val, y_val):
    """
    超参数网格搜索

    Args:
        X_train: 训练输入
        y_train: 训练目标
        X_val: 验证输入
        y_val: 验证目标

    Returns:
        最佳参数组合
    """
    print("\n" + "=" * 60)
    print("超参数调优")
    print("=" * 60)

    hidden_sizes = [32, 64, 128]
    num_layers_list = [1, 2, 3]
    learning_rates = [0.0005, 0.001, 0.002]

    best_val_loss = float('inf')
    best_params = None

    total_experiments = len(hidden_sizes) * len(num_layers_list) * len(learning_rates)
    current = 0

    for hidden_size in hidden_sizes:
        for num_layers in num_layers_list:
            for lr in learning_rates:
                current += 1

                # 创建并训练模型
                model = LSTMPredictor(
                    input_size=1,
                    hidden_size=hidden_size,
                    num_layers=num_layers,
                    output_size=1,
                    dropout=0.2
                )

                trainer = Trainer(model, learning_rate=lr)

                # 训练10个epoch进行快速评估
                for _ in range(10):
                    trainer.train_epoch(X_train, y_train, batch_size=32)

                # 评估
                metrics = trainer.evaluate(X_val, y_val)
                val_loss = metrics['MSE']

                print(f"[{current:2d}/{total_experiments}] "
                      f"hidden={hidden_size:3d}, layers={num_layers}, lr={lr:.4f} | "
                      f"Val MSE: {val_loss:.6f}")

                if val_loss < best_val_loss:
                    best_val_loss = val_loss
                    best_params = {
                        'hidden_size': hidden_size,
                        'num_layers': num_layers,
                        'learning_rate': lr
                    }

    print("\n最佳参数组合:")
    print(f"  隐藏层大小: {best_params['hidden_size']}")
    print(f"  LSTM层数: {best_params['num_layers']}")
    print(f"  学习率: {best_params['learning_rate']}")
    print(f"  验证MSE: {best_val_loss:.6f}")

    return best_params


# 超参数调优演示
best_params = hyperparameter_search(X_train, y_train, X_test, y_test)

七、实战案例:能源消耗预测

7.1 问题背景

能源消耗预测是时序预测的重要应用场景。准确的能源消耗预测可以帮助电力公司优化发电调度、减少能源浪费、降低运营成本。以下是一个完整的能源消耗预测实战案例。

7.2 模拟数据集

def generate_energy_consumption_data(n_days: int = 365 * 2) -> np.ndarray:
    """
    生成模拟能源消耗数据

    考虑因素:
    - 每日周期性:白天高于夜间
    - 每周周期性:工作日高于周末
    - 季节性:夏季制热/冬季制冷需求
    - 趋势:年度增长
    - 噪声:随机波动

    Args:
        n_days: 数据天数

    Returns:
        每日能源消耗数据
    """
    np.random.seed(42)

    # 时间索引
    days = np.arange(n_days)

    # 基础消耗
    base_consumption = 100  # 基准值

    # 每日周期(24小时内的小时数据,这里简化为日均)
    daily_pattern = 1 + 0.3 * np.sin(2 * np.pi * days / 365)

    # 每周周期(工作日 vs 周末)
    day_of_week = days % 7
    weekly_pattern = np.where(day_of_week < 5, 1.0, 0.8)  # 工作日高,周末低

    # 季节性
    seasonal_pattern = 1 + 0.2 * np.cos(2 * np.pi * days / 365)

    # 趋势(年度增长)
    trend = 1 + 0.05 * (days / 365)

    # 随机噪声
    noise = 1 + 0.1 * np.random.randn(n_days)

    # 组合所有因素
    consumption = (base_consumption * daily_pattern * weekly_pattern * 
                   seasonal_pattern * trend * noise)

    return consumption.astype(np.float32)


class EnergyConsumptionPredictor:
    """能源消耗预测器"""

    def __init__(self, sequence_length: int = 30):
        """
        初始化预测器

        Args:
            sequence_length: 用于预测的历史天数
        """
        self.sequence_length = sequence_length
        self.model = None
        self.scaler = MinMaxScaler(feature_range=(0, 1))
        self.data = None

    def prepare_data(self, data: np.ndarray) -> tuple:
        """
        准备训练数据

        Args:
            data: 原始数据

        Returns:
            训练集和测试集
        """
        self.data = data
        normalized = self.scaler.fit_transform(data.reshape(-1, 1)).flatten()

        X, y = [], []
        for i in range(len(normalized) - self.sequence_length):
            X.append(normalized[i:i + self.sequence_length])
            y.append(normalized[i + self.sequence_length])

        X = np.array(X, dtype=np.float32)
        y = np.array(y, dtype=np.float32)

        split_idx = int(len(X) * 0.8)
        return X[:split_idx], X[split_idx:], y[:split_idx], y[split_idx:]

    def train(self, X_train, y_train, X_test, y_test,
             epochs: int = 200, batch_size: int = 32):
        """
        训练模型

        Args:
            X_train: 训练输入
            y_train: 训练目标
            X_test: 测试输入
            y_test: 测试目标
            epochs: 训练轮数
            batch_size: 批大小
        """
        print("\n" + "=" * 60)
        print("能源消耗预测模型训练")
        print("=" * 60)
        print(f"数据集: {len(self.data)} 天")
        print(f"序列长度: {self.sequence_length} 天")
        print(f"训练样本: {len(X_train)} | 测试样本: {len(X_test)}")
        print("-" * 60)

        self.model, _, _ = train_lstm_model(
            X_train, y_train, X_test, y_test,
            epochs=epochs,
            batch_size=batch_size,
            hidden_size=128,
            num_layers=2,
            learning_rate=0.001
        )

    def predict(self, start_idx: int, n_days: int = 7) -> np.ndarray:
        """
        预测未来n天的能源消耗

        Args:
            start_idx: 预测起始索引
            n_days: 预测天数

        Returns:
            预测结果(原始尺度)
        """
        if self.model is None:
            raise ValueError("模型未训练")

        # 获取输入序列
        input_seq = self.data[start_idx:start_idx + self.sequence_length]
        normalized = self.scaler.transform(input_seq.reshape(-1, 1)).flatten()

        predictions = []
        current_seq = normalized.copy()

        for _ in range(n_days):
            X = Tensor(current_seq.reshape(1, -1, 1).astype(np.float32))
            pred = self.model(X).asnumpy()[0, 0]
            predictions.append(pred)
            current_seq = np.append(current_seq[1:], pred)

        # 反归一化
        predictions = np.array(predictions)
        predictions = self.scaler.inverse_transform(predictions.reshape(-1, 1)).flatten()

        return predictions


# 能源消耗预测实战
print("\n" + "=" * 60)
print("实战案例:能源消耗预测")
print("=" * 60)

# 生成数据
energy_data = generate_energy_consumption_data(n_days=365 * 2)
print(f"生成模拟数据: {len(energy_data)} 天")
print(f"平均消耗: {np.mean(energy_data):.2f}")
print(f"消耗范围: {np.min(energy_data):.2f} - {np.max(energy_data):.2f}")

# 训练预测器
predictor = EnergyConsumptionPredictor(sequence_length=30)
X_train, X_test, y_train, y_test = predictor.prepare_data(energy_data)
predictor.train(X_train, y_train, X_test, y_test, epochs=100)

# 预测测试集最后7天
last_train_idx = len(X_train) + 30 - 1
predictions = predictor.predict(last_train_idx, n_days=7)
actual_values = energy_data[last_train_idx:last_train_idx + 7]

print("\n未来7天能源消耗预测:")
print("-" * 40)
print(f"{'天数':^6} | {'预测值':^12} | {'实际值':^12} | {'误差率':^10}")
print("-" * 40)
for i in range(7):
    error_rate = (predictions[i] - actual_values[i]) / actual_values[i] * 100
    print(f"{i+1:^6} | {predictions[i]:^12.2f} | {actual_values[i]:^12.2f} | {error_rate:^+9.2f}%")

八、总结与展望

8.1 核心要点回顾

本文系统介绍了基于MindSpore的时序预测技术,主要内容包括:

理论层面,我们深入讲解了时序数据的基本特征和LSTM的工作原理。时序数据的核心挑战在于捕捉时间依赖性、趋势、季节性和处理非平稳性。LSTM通过门控机制有效解决了传统RNN的长期依赖问题,其遗忘门、输入门和输出门的协同工作使模型能够选择性保留或遗忘信息。

实践层面,我们提供了完整的数据预处理、模型构建、训练评估流程。数据预处理包括归一化、序列创建和训练测试集划分;模型构建支持普通LSTM、双向LSTM和带注意力机制的LSTM;训练过程使用Adam优化器和MSE损失函数;评估指标涵盖了MSE、RMSE、MAE、MAPE和R²等常用指标。

进阶应用,我们介绍了多步预测、超参数调优和特征工程等高级技术。滚动预测是实现多步预测的常用方法;网格搜索和贝叶斯优化是超参数调优的有效手段;添加日历特征、滞后特征和统计特征可以显著提升模型性能。

8.2 进一步探索的方向

时序预测是一个广阔的领域,以下方向值得深入探索:

Transformer架构:近年来,Transformer在时序预测中展现出强大的性能。Time-Series Transformer和Informer等模型通过注意力机制捕捉长程依赖,在长序列预测任务中优于传统LSTM。

概率预测:除了点预测,概率预测提供了预测值的不确定性估计,对于风险评估和决策制定非常重要。可以通过贝叶斯神经网络或MC Dropout实现。

多元时序预测:当存在多个相关变量时,多元时序模型可以捕捉变量之间的相互影响。VAR、LSTM Encoder-Decoder和TFT等模型适合处理这类问题。

在线学习:对于数据持续产生的场景,在线学习方法可以实时更新模型,适应数据分布的变化。

# 完整代码汇总与使用说明
print("\n" + "=" * 60)
print("代码使用说明")
print("=" * 60)
print("""
【MindSpore时序预测完整流程】

1. 数据准备
   - 使用TimeSeriesDataProcessor加载和预处理数据
   - 支持自定义数据集或内置航空乘客数据
   - create_sequences()方法创建监督学习样本

2. 模型构建
   - LSTMPredictor: 标准LSTM模型
   - BiLSTMPredictor: 双向LSTM,适用需要双向上下文的任务
   - AttentionLSTMPredictor: 带注意力机制,适合长序列

3. 模型训练
   - Trainer类封装训练和评估逻辑
   - train_lstm_model()函数提供完整训练流程

4. 预测与评估
   - predict()方法进行单步或多步预测
   - evaluate_model_comprehensive()提供全面评估

5. 实战应用
   - EnergyConsumptionPredictor展示完整应用案例
   - 可迁移到股票预测、天气预测等实际场景

【依赖安装】
pip install mindspore numpy pandas matplotlib scikit-learn

【运行示例】
python lstm_timeseries.py

祝您时序预测项目顺利!
""")

关于MindSpore

MindSpore是华为自主研发的全场景AI计算框架,提供了端到端的人工智能解决方案。其自动微分、自动并行、优化器等核心能力使得深度学习模型的开发和部署更加高效。MindSpore RL等扩展库进一步丰富了框架的应用场景,为强化学习、时序分析等专业领域提供了强大的支持。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。