# -*- coding: utf-8 -*-
"""
Created on Sun May 28 10:44:14 2023
@author: dell
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import numpy as np
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.optim import Adam
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
from sklearn.preprocessing import MinMaxScaler
from tqdm import tqdm
# Configure matplotlib so Chinese axis labels and minus signs render correctly
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# Sliding-window setup: feed the past `look_back` steps, predict the next T steps
look_back = 10
T = 1
epochs = 30  # number of training epochs
num_features = 6  # number of input features
embed_dim = 32  # embedding dimension
dense_dim = 128  # hidden-layer width
num_heads = 4  # number of attention heads
dropout_rate = 0.1  # dropout rate
num_blocks = 3  # number of encoder/decoder blocks
learn_rate = 0.0001  # learning rate
batch_size = 32  # batch size
# Load the raw data (first six columns of the spreadsheet)
# NOTE(review): assumes '2019feng.xlsx' exists in the working directory — confirm
dataset = pd.read_excel('2019feng.xlsx', usecols=[0, 1, 2, 3, 4, 5])
dataX = dataset.values
dataY = dataset['gonglv'].values  # target column; 'gonglv' (功率) presumably means power output — TODO confirm
# Scale features and target independently to [0, 1]
scaler1 = MinMaxScaler(feature_range=(0, 1))
scaler2 = MinMaxScaler(feature_range=(0, 1))
data_X = scaler1.fit_transform(dataX)
data_Y = scaler2.fit_transform(dataY.reshape(-1, 1))
# Chronological split. NOTE(review): the original comment claimed 70% train /
# 20% validation / 10% test, but the code actually allocates 70% train,
# 10% validation, and the remaining ~20% to test.
train_size = int(len(data_X) * 0.7)
val_size = int(len(data_X) * 0.1)
test_size = len(data_X) - train_size - val_size
train_X, train_Y = data_X[0:train_size], data_Y[0:train_size]
val_X, val_Y = data_X[train_size:train_size + val_size], data_Y[train_size:train_size + val_size]
test_X, test_Y = data_X[train_size + val_size:], data_Y[train_size + val_size:]
# Build supervised learning samples: each input is a (look_back, num_features)
# window and each label is the following T target step(s), stacked to 3-D / 2-D.
def create_dataset(datasetX, datasetY, look_back=1, T=1):
    """Slice aligned feature/target series into sliding windows.

    Parameters
    ----------
    datasetX : np.ndarray, shape (n, num_features)
        Feature matrix.
    datasetY : np.ndarray
        Target series; must be 2-D (n, 1) when T > 1 because the multi-step
        branch indexes column 0.
    look_back : int
        Number of past steps fed to the model.
    T : int
        Number of future steps predicted; windows advance by T steps so
        successive targets do not overlap.

    Returns
    -------
    (np.ndarray, np.ndarray)
        dataX of shape (num_samples, look_back, num_features) and dataY of
        shape (num_samples, 1) when T == 1, or (num_samples, T) otherwise.
    """
    dataX, dataY = [], []
    # Fix: the original upper bound `len - look_back - T` dropped the final
    # valid window. With `+ 1`, the largest i satisfies
    # i + look_back + T - 1 == len(datasetX) - 1, so every sample is used.
    for i in range(0, len(datasetX) - look_back - T + 1, T):
        dataX.append(datasetX[i:(i + look_back), :])
        if T == 1:
            dataY.append(datasetY[i + look_back])
        else:
            # assumes datasetY is 2-D here — TODO confirm for T > 1 callers
            dataY.append(datasetY[i + look_back:i + look_back + T, 0])
    return np.array(dataX), np.array(dataY)
# Build windowed datasets for train / validation / test
trainX, trainY = create_dataset(train_X, train_Y, look_back, T)
valX, valY = create_dataset(val_X, val_Y, look_back, T)
testX, testY = create_dataset(test_X, test_Y, look_back, T)
# Convert the numpy arrays to float32 PyTorch tensors
trainX = torch.Tensor(trainX)
trainY = torch.Tensor(trainY)
valX = torch.Tensor(valX)
valY = torch.Tensor(valY)
testX = torch.Tensor(testX)
testY = torch.Tensor(testY)
import torch
import torch.nn as nn
import einops
import pywt
import torch
from torch import nn
from functools import partial
import torch.nn.functional as F
def create_wavelet_filter_1d(wave, in_size, type=torch.float):
    """Build depthwise 1-D wavelet analysis/synthesis filter banks.

    Returns (dec_filters, rec_filters), each of shape
    (in_size * 2, 1, filter_length): per input channel a [low, high] filter
    pair, laid out for grouped conv1d / conv_transpose1d with
    groups == in_size.
    """
    wavelet = pywt.Wavelet(wave)

    def _bank(lo, hi):
        # Time-reverse each filter (conv1d computes cross-correlation, so the
        # reversal makes it a true convolution), stack low over high, then
        # replicate the pair once per input channel.
        pair = torch.stack(
            [torch.tensor(lo[::-1], dtype=type),
             torch.tensor(hi[::-1], dtype=type)],
            dim=0,
        )
        return pair.unsqueeze(1).repeat(in_size, 1, 1)

    return _bank(wavelet.dec_lo, wavelet.dec_hi), _bank(wavelet.rec_lo, wavelet.rec_hi)
def wavelet_transform_1d(x, filters):
    """One-level 1-D wavelet analysis via a grouped, stride-2 convolution.

    x: (batch, channels, length) -> (batch, channels, 2, length // 2),
    where index 0 along dim 2 is the low-pass band and index 1 the
    high-pass band.
    """
    batch, channels, length = x.shape
    # Padding that keeps the stride-2 output at exactly length // 2 for the
    # (even-length) wavelet filter.
    half_pad = filters.shape[2] // 2 - 1
    bands = F.conv1d(x, filters, stride=2, groups=channels, padding=half_pad)
    return bands.reshape(batch, channels, 2, length // 2)
def inverse_wavelet_transform_1d(x, filters):
    """One-level 1-D wavelet synthesis; inverse of wavelet_transform_1d.

    x: (batch, channels, 2, length // 2) -> (batch, channels, length),
    merging the low/high bands back via a grouped transposed convolution.
    """
    batch, channels, _, half_len = x.shape
    half_pad = filters.shape[2] // 2 - 1
    # Fold the band axis into the channel axis so each channel's [low, high]
    # pair lines up with its filter group.
    stacked = x.reshape(batch, channels * 2, half_len)
    return F.conv_transpose1d(stacked, filters, stride=2, groups=channels, padding=half_pad)
# The original WTConv paper operates in 2-D on images; this file adapts it to
# 1-D so the same idea can be applied to time-series data.
class WTConv1d(nn.Module):
    """1-D convolution augmented with multi-level wavelet-domain convolutions.

    The forward pass combines a standard conv path on the raw signal with a
    wavelet path: the input is decomposed `wt_levels` times, each level's
    (low, high) bands are convolved and rescaled, and the results are
    reconstructed back to full resolution and added to the conv path.
    """

    def __init__(self, in_channels, out_channels, kernel_size=5, stride=1, bias=True, wt_levels=1, wt_type='db1'):
        super(WTConv1d, self).__init__()
        # The module is written for equal channel counts (the wavelet path
        # never changes the channel dimension).
        assert in_channels == out_channels
        self.in_channels = in_channels
        self.wt_levels = wt_levels
        self.stride = stride
        # Fixed (non-trainable) analysis/synthesis wavelet filter banks,
        # stored as frozen Parameters so they move with the module's device.
        self.wt_filter, self.iwt_filter = create_wavelet_filter_1d(wt_type, in_channels, torch.float)
        self.wt_filter = nn.Parameter(self.wt_filter, requires_grad=False)
        self.iwt_filter = nn.Parameter(self.iwt_filter, requires_grad=False)
        self.wt_function = partial(wavelet_transform_1d, filters=self.wt_filter)
        self.iwt_function = partial(inverse_wavelet_transform_1d, filters=self.iwt_filter)
        # Plain conv path at the original resolution (note: no `groups`
        # argument, so this is a full convolution, not depthwise).
        self.base_conv = nn.Conv1d(in_channels, in_channels, kernel_size, padding='same', stride=1, bias=bias)
        self.base_scale = _ScaleModule([1, in_channels, 1])
        # One conv + scale per wavelet level, acting on the stacked
        # (low, high) bands, hence in_channels * 2.
        self.wavelet_convs = nn.ModuleList(
            [nn.Conv1d(in_channels * 2, in_channels * 2, kernel_size, padding='same', stride=1, bias=False)
             for _ in range(self.wt_levels)]
        )
        self.wavelet_scale = nn.ModuleList(
            [_ScaleModule([1, in_channels * 2, 1], init_scale=0.1) for _ in range(self.wt_levels)]
        )
        if self.stride > 1:
            # Strided output implemented as a fixed depthwise conv with a
            # length-1 all-ones kernel (i.e. subsampling every `stride` steps).
            self.stride_filter = nn.Parameter(torch.ones(in_channels, 1, 1), requires_grad=False)
            self.do_stride = lambda x_in: F.conv1d(x_in, self.stride_filter, bias=None, stride=self.stride, groups=in_channels)
        else:
            self.do_stride = None

    def forward(self, x):
        # Stage 1 (analysis): walk down `wt_levels` levels; at each level,
        # transform, convolve the stacked bands, and cache both processed
        # bands plus the pre-pad shape for later trimming.
        x_ll_in_levels = []
        x_h_in_levels = []
        shapes_in_levels = []
        curr_x_ll = x
        for i in range(self.wt_levels):
            curr_shape = curr_x_ll.shape
            shapes_in_levels.append(curr_shape)
            if curr_shape[2] % 2 > 0:
                # Right-pad odd lengths so the stride-2 transform halves cleanly.
                curr_pads = (0, curr_shape[2] % 2)
                curr_x_ll = F.pad(curr_x_ll, curr_pads)
            curr_x = self.wt_function(curr_x_ll)  # (b, c, 2, l/2): [low, high]
            curr_x_ll = curr_x[:, :, 0, :]  # unprocessed low band feeds the next level
            shape_x = curr_x.shape
            # Fold the band axis into channels, convolve + scale, unfold back.
            curr_x_tag = curr_x.reshape(shape_x[0], shape_x[1] * 2, shape_x[3])
            curr_x_tag = self.wavelet_scale[i](self.wavelet_convs[i](curr_x_tag))
            curr_x_tag = curr_x_tag.reshape(shape_x)
            x_ll_in_levels.append(curr_x_tag[:, :, 0, :])
            x_h_in_levels.append(curr_x_tag[:, :, 1, :])
        # Stage 2 (synthesis): reconstruct from the deepest level upward,
        # adding the signal coming up from below to each level's low band.
        next_x_ll = 0
        for i in range(self.wt_levels - 1, -1, -1):
            curr_x_ll = x_ll_in_levels.pop()
            curr_x_h = x_h_in_levels.pop()
            curr_shape = shapes_in_levels.pop()
            curr_x_ll = curr_x_ll + next_x_ll
            curr_x = torch.cat([curr_x_ll.unsqueeze(2), curr_x_h.unsqueeze(2)], dim=2)
            next_x_ll = self.iwt_function(curr_x)
            next_x_ll = next_x_ll[:, :, :curr_shape[2]]  # trim any analysis padding
        x_tag = next_x_ll
        assert len(x_ll_in_levels) == 0
        # Stage 3: combine the plain conv path with the wavelet path.
        x = self.base_scale(self.base_conv(x))
        x = x + x_tag
        if self.do_stride is not None:
            x = self.do_stride(x)
        return x
class _ScaleModule(nn.Module):
def __init__(self, dims, init_scale=1.0, init_bias=0):
super(_ScaleModule, self).__init__()
self.dims = dims
self.weight = nn.Parameter(torch.ones(*dims) * init_scale)
self.bias = None
def forward(self, x):
return torch.mul(self.weight, x)
class DepthwiseSeparableConvWithWTConv1d(nn.Module):
def __init__(self, in_channels, out_channels, kernel_size=3):
super(DepthwiseSeparableConvWithWTConv1d, self).__init__()

# --- NOTE(review): the lines below are non-code residue from the page this
# --- file was downloaded from (author banner / resource listing). They are
# --- not Python and broke the file; commented out verbatim to keep it parseable.
# 机器学习之心
# - 粉丝: 3w+
# 最新资源
# - excel电子表格模板批量自动化-动态销量看板.zip
# - excel电子表格模板批量自动化-店铺数据运营监控表1.1.zip
# - excel电子表格模板批量自动化-SMJ成绩统计工具Ⅲ(E)-班主任.zip
# - excel电子表格模板批量自动化-SMJ成绩统计工具Ⅲ(E)-年级段.zip
# - excel电子表格模板批量自动化-SMJ成绩统计工具Ⅲ(E)-课任教师.zip
# - excel电子表格模板批量自动化-靓采化妆品销售情况表.zip
# - excel电子表格模板批量自动化-员工外勤费用报销单.zip
# - excel电子表格模板批量自动化-神奇万年历.zip
# - excel电子表格模板批量自动化-产品销售数据半年度业绩分析表1.zip
# - excel电子表格模板批量自动化-销售报表-销售管理明细表1.zip
# - excel电子表格模板批量自动化-顾客档案管理表.zip
# - excel电子表格模板批量自动化-【双11】1111推进内容-.zip
# - excel电子表格模板批量自动化-销售数据透视图.zip
# - excel电子表格模板批量自动化-销售收入成本利润表1.zip
# - excel电子表格模板批量自动化-成都95平米报价表.zip
# - excel电子表格模板批量自动化-8 采购管理系统.zip
# 资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
# 点击此处反馈