PSD - Public Dataset

PSD decomposition
```python
import mne
import numpy as np
import os

def eeg_power_band(epochs):
    """
    Create EEG features from the relative power in specific frequency bands of the epochs.
    """
    FREQ_BANDS = {"delta": [0.5, 4],
                  "theta": [4, 8],
                  "alpha": [8, 13],
                  "sigma": [13, 25],
                  "beta": [25, 45]}
    spectrum = epochs.compute_psd(method='welch', fmin=0.5, fmax=45., n_fft=256, n_overlap=10)
    psds, freqs = spectrum.get_data(return_freqs=True)
    # Normalize each PSD so the band powers below are relative powers
    psds /= np.sum(psds, axis=-1, keepdims=True)
    X = []
    for fmin, fmax in FREQ_BANDS.values():
        psds_band = psds[:, :, (freqs >= fmin) & (freqs < fmax)].mean(axis=-1)
        X.append(psds_band.reshape(len(psds), -1))
    # Shape: (n_epochs, n_channels * 5 bands)
    return np.concatenate(X, axis=1)

def process_eeg_data(input_folders, output_folders):
    """
    Process all .set files in the input folders and save the extracted features
    to the corresponding output folders.
    """
    for input_folder, output_folder in zip(input_folders, output_folders):
        if not os.path.exists(output_folder):
            os.makedirs(output_folder)
        for filename in os.listdir(input_folder):
            if filename.endswith(".set"):
                set_file_path = os.path.join(input_folder, filename)
                # Load the .set file using MNE
                raw = mne.io.read_raw_eeglab(set_file_path, preload=True)
                # Set the EEG reference
                raw.set_eeg_reference('average', projection=True)
                # Band-pass filter the data
                raw.filter(1., 45., fir_design='firwin')
                # Create fixed-length events (modify this to match your data)
                events = mne.make_fixed_length_events(raw, start=0, duration=5.0)
                # Create the Epochs object
                epochs = mne.Epochs(raw, events, tmin=0, tmax=4.0, baseline=None, preload=True)
                # Extract the band-power features
                features = eeg_power_band(epochs)
                # Save the features to the output folder
                output_file_path = os.path.join(output_folder, filename.replace(".set", "_features.npy"))
                print(features.shape)
                np.save(output_file_path, features)
                print(f"Processed {filename} and saved features to {output_file_path}")

# Example usage
input_folders = ["公开数据集/CN", "公开数据集/AD", "公开数据集/FDT"]
output_folders = ["psd/CN", "psd/AD", "psd/FDT"]
process_eeg_data(input_folders, output_folders)
```

Splitting into training and test sets
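The split script itself did not survive on this page. Below is a minimal sketch of the step, assuming the per-recording feature files produced above sit in `psd/<class>/` and an 80/20 train/test split; the ratio, the seed, and copying rather than moving are all assumptions. The later mixed-dataset section uses AD/CN/MCI class folders instead of FDT, so adjust the class list as needed.

```python
import os
import random
import shutil

random.seed(0)  # assumed seed, for a reproducible split
for cls in ["CN", "AD", "FDT"]:
    files = sorted(f for f in os.listdir(os.path.join("psd", cls)) if f.endswith(".npy"))
    random.shuffle(files)
    n_train = int(0.8 * len(files))  # assumed 80/20 ratio
    for subset, names in [("train", files[:n_train]), ("test", files[n_train:])]:
        os.makedirs(os.path.join("psd", subset, cls), exist_ok=True)
        for name in names:
            shutil.copy(os.path.join("psd", cls, name),
                        os.path.join("psd", subset, cls, name))
```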
2. Dataset split (debug version)
3. Build the file lists
- Build the file lists
- Train the model
- Test code
Problems encountered in practice

The accuracy (ACC) differs for every tried combination of learning rate, activation function, and model architecture, so it is best to optimize one factor at a time; a runnable sketch of such a sweep follows.
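One way to make those comparisons systematic is to grid over the factors and log each configuration's accuracy. A minimal sketch; `train_and_evaluate` is a hypothetical stand-in for the real training loop shown later in this post, and both value grids are assumptions.

```python
import itertools
import random

def train_and_evaluate(lr, activation):
    """Hypothetical stand-in: swap in the real training loop from below.
    Returns a fake accuracy so that this sketch runs end to end."""
    return random.uniform(0.5, 0.9)

learning_rates = [1e-3, 1e-4, 1e-5]   # assumed grid
activations = ["relu", "starrelu"]    # assumed grid

results = {(lr, act): train_and_evaluate(lr, act)
           for lr, act in itertools.product(learning_rates, activations)}
for (lr, act), acc in sorted(results.items(), key=lambda kv: -kv[1]):
    print(f"lr={lr:g}  activation={act}  acc={acc:.4f}")
```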
Mixed-dataset processing pipeline

PSD+CNN

- Run the PSD+CNN processing (sketched below)
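This block is empty on the page; presumably it reruns `process_eeg_data` from the first script on the mixed dataset. A hedged sketch: the input folder names are placeholders, while the output folders match the paths the cutting step below reads from.

```python
# Hypothetical reuse of process_eeg_data from the first script.
# The 混合数据集/... input paths are placeholders; the psd/... outputs
# are the folders the next step expects.
input_folders = ["混合数据集/train/AD", "混合数据集/train/CN", "混合数据集/train/MCI",
                 "混合数据集/test/AD", "混合数据集/test/CN", "混合数据集/test/MCI"]
output_folders = ["psd/train/AD", "psd/train/CN", "psd/train/MCI",
                  "psd/test/AD", "psd/test/CN", "psd/test/MCI"]
process_eeg_data(input_folders, output_folders)
```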
Cutting the data as required
```python
import os
import numpy as np

# List of input folder paths
input_folder_paths = ["psd/test/AD", "psd/test/CN", "psd/test/MCI",
                      "psd/train/AD", "psd/train/CN", "psd/train/MCI"]
# List of output folder paths
output_folder_paths = ["psd_data_cut/test/AD", "psd_data_cut/test/CN", "psd_data_cut/test/MCI",
                       "psd_data_cut/train/AD", "psd_data_cut/train/CN", "psd_data_cut/train/MCI"]

# Create the output folders if they do not exist
for output_folder_path in output_folder_paths:
    if not os.path.exists(output_folder_path):
        os.makedirs(output_folder_path)

# Walk through each input folder
for input_folder_path, output_folder_path in zip(input_folder_paths, output_folder_paths):
    # Collect all .npy files in the input folder
    npy_files = [f for f in os.listdir(input_folder_path) if f.endswith('.npy')]
    for npy_file in npy_files:
        # Full file path
        file_path = os.path.join(input_folder_path, npy_file)
        # Load the data
        data = np.load(file_path)
        # Each feature file has shape (n_epochs, n_channels * 5 bands);
        # make sure the second dimension is the expected 55
        if data.shape[1] != 55:
            print(f"File {npy_file} does not have the expected shape: {data.shape}")
            continue
        # Save each row (one epoch's length-55 feature vector) to its own .npy file
        base_filename = os.path.splitext(npy_file)[0]
        for i in range(data.shape[0]):
            output_filename = f"{base_filename}_channel_{i}.npy"
            output_path = os.path.join(output_folder_path, output_filename)
            np.save(output_path, data[i])
            print(f"Saved {output_path}!")
print("All files processed!")
```
Build the file lists
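The list-building code is not preserved here either. Below is a sketch that writes `train_data.csv` and `test_data.csv` in the two-column (file path, integer label) layout that the `CsvDataset` class in the next script reads; the AD/CN/MCI label mapping is an assumption.

```python
import os
import pandas as pd

LABELS = {"AD": 0, "CN": 1, "MCI": 2}  # assumed label mapping

for subset in ["train", "test"]:
    rows = []
    for cls, label in LABELS.items():
        folder = os.path.join("psd_data_cut", subset, cls)
        for name in sorted(os.listdir(folder)):
            if name.endswith(".npy"):
                rows.append((os.path.join(folder, name), label))
    # One CSV per subset: column 0 is the file path, column 1 the label
    pd.DataFrame(rows, columns=["file_path", "label"]).to_csv(
        f"{subset}_data.csv", index=False)
```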
Training the CNN
```python
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np
import pandas as pd
from tqdm import tqdm

class StarReLU(nn.Module):
    """
    StarReLU: s * relu(x) ** 2 + b
    """
    def __init__(self, scale_value=1.0, bias_value=0.0,
                 scale_learnable=True, bias_learnable=True,
                 mode=None, inplace=False):
        super().__init__()
        self.inplace = inplace
        self.relu = nn.ReLU(inplace=inplace)
        self.scale = nn.Parameter(scale_value * torch.ones(1),
                                  requires_grad=scale_learnable)
        self.bias = nn.Parameter(bias_value * torch.ones(1),
                                 requires_grad=bias_learnable)

    def forward(self, x):
        return self.scale * self.relu(x)**2 + self.bias

# Custom dataset class that loads the samples listed in a CSV file
class CsvDataset(Dataset):
    def __init__(self, csv_file):
        self.data = pd.read_csv(csv_file)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        file_path = self.data.iloc[idx, 0]
        label = self.data.iloc[idx, 1]
        data = np.load(file_path)
        return torch.tensor(data, dtype=torch.float32), torch.tensor(label, dtype=torch.long)

# A simple 1-D convolutional neural network
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=512, kernel_size=3)
        self.conv2 = nn.Conv1d(in_channels=512, out_channels=32, kernel_size=3)
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(32 * 12, 128)  # adjust to the flattened size of your input
        self.fc2 = nn.Linear(128, 3)        # three classes (AD / CN / MCI here)
        self.futool = StarReLU()

    def forward(self, x):
        x = self.conv1(x)
        x = self.futool(x)
        x = self.pool(x)
        x = self.conv2(x)
        x = self.futool(x)
        x = self.pool(x)
        x = x.view(-1, 32 * 12)
        x = self.fc1(x)
        x = self.futool(x)
        x = self.fc2(x)
        return x

def train(model, train_loader, val_loader, criterion, optimizer, num_epochs=5, save_path='best_model.pth'):
    total_samples = len(train_loader.dataset)
    best_val_acc = 0.0
    for epoch in range(num_epochs):
        model.train()  # back to training mode (validation below switches to eval)
        running_loss = 0.0
        correct_predictions = 0
        progress_bar = tqdm(train_loader, desc=f'Epoch {epoch + 1}/{num_epochs}', leave=False)
        for data, labels in progress_bar:
            optimizer.zero_grad()
            outputs = model(data.unsqueeze(1))
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item() * data.size(0)
            _, predicted = torch.max(outputs, 1)
            correct_predictions += (predicted == labels).sum().item()
            progress_bar.set_postfix({'loss': running_loss / total_samples,
                                      'accuracy': correct_predictions / total_samples})
        epoch_loss = running_loss / total_samples
        epoch_accuracy = correct_predictions / total_samples
        print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}")

        # Validate the model
        model.eval()
        total_samples_val = len(val_loader.dataset)
        running_loss_val = 0.0
        correct_predictions_val = 0
        with torch.no_grad():
            for data, labels in val_loader:
                outputs = model(data.unsqueeze(1))
                loss_val = criterion(outputs, labels)
                running_loss_val += loss_val.item() * data.size(0)
                _, predicted = torch.max(outputs.data, 1)
                correct_predictions_val += (predicted == labels).sum().item()
        epoch_loss_val = running_loss_val / total_samples_val
        epoch_accuracy_val = correct_predictions_val / total_samples_val
        print(f'Validation Loss: {epoch_loss_val:.4f}, Accuracy: {epoch_accuracy_val:.4f}')
        # Keep the checkpoint with the best validation accuracy
        if epoch_accuracy_val > best_val_acc:
            best_val_acc = epoch_accuracy_val
            torch.save(model.state_dict(), save_path)
            print(f'Saved the model with the best validation accuracy: {best_val_acc:.4f}')
    print('Training finished')

# Paths to the training and test CSV files
train_csv_file_path = 'train_data.csv'
test_csv_file_path = 'test_data.csv'

# Build the datasets and data loaders
train_dataset = CsvDataset(train_csv_file_path)
test_dataset = CsvDataset(test_csv_file_path)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, drop_last=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, drop_last=True)

# Initialize the model, loss function, and optimizer
model = CNN()
criterion = nn.CrossEntropyLoss()
optimizer = optim.RMSprop(model.parameters(), lr=0.0001)

# Train the model
train(model, train_loader, test_loader, criterion, optimizer, num_epochs=100)
```
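Where does the magic number `32 * 12` in `fc1` come from? With a length-55 input vector (the width the cutting step enforces), each kernel-3 "valid" convolution trims 2 samples and each pooling halves the length: 55 → 53 → 26 → 24 → 12, so the flattened size is 32 × 12 = 384. The test script below uses `32 * 22` instead, which corresponds to length-95 inputs (95 → 93 → 46 → 44 → 22). A quick shape check, appended to the script above:

```python
# Smoke test (assumes the CNN class above is in scope): a batch of four
# length-55 vectors should come out as four rows of 3-class logits.
dummy = torch.randn(4, 1, 55)   # (batch, in_channels, length)
print(CNN()(dummy).shape)       # torch.Size([4, 3])
```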
Testing the CNN
```python
import os
import numpy as np
import pandas as pd
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from sklearn.metrics import recall_score, f1_score, precision_score, confusion_matrix, accuracy_score
import matplotlib.pyplot as plt
from matplotlib import rcParams

class StarReLU(nn.Module):
    """
    StarReLU: s * relu(x) ** 2 + b
    """
    def __init__(self, scale_value=1.0, bias_value=0.0,
                 scale_learnable=True, bias_learnable=True,
                 mode=None, inplace=False):
        super().__init__()
        self.inplace = inplace
        self.relu = nn.ReLU(inplace=inplace)
        self.scale = nn.Parameter(scale_value * torch.ones(1),
                                  requires_grad=scale_learnable)
        self.bias = nn.Parameter(bias_value * torch.ones(1),
                                 requires_grad=bias_learnable)

    def forward(self, x):
        return self.scale * self.relu(x)**2 + self.bias

# Custom dataset class that loads the samples listed in a CSV file
class CsvDataset(Dataset):
    def __init__(self, csv_file):
        self.data = pd.read_csv(csv_file)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        file_path = self.data.iloc[idx, 0]
        label = self.data.iloc[idx, 1]
        data = np.load(file_path)
        return torch.tensor(data, dtype=torch.float32), torch.tensor(label, dtype=torch.long)

# The same simple 1-D CNN, parameterized by the number of classes
class CNN(nn.Module):
    def __init__(self, num_classes=2):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=512, kernel_size=3)
        self.conv2 = nn.Conv1d(in_channels=512, out_channels=32, kernel_size=3)
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(32 * 22, 128)  # matches length-95 inputs; adjust to your data
        self.fc2 = nn.Linear(128, num_classes)
        self.futool = StarReLU()

    def forward(self, x):
        x = self.conv1(x)
        x = self.futool(x)
        x = self.pool(x)
        x = self.conv2(x)
        x = self.futool(x)
        x = self.pool(x)
        x = x.view(-1, 32 * 22)
        x = self.fc1(x)
        x = self.futool(x)
        x = self.fc2(x)
        return x

# Font setup kept from the original post (needed there for Chinese axis labels)
rcParams['font.family'] = 'SimHei'
labels = ['CN', 'FDT']
softmax = nn.Softmax(dim=1)

def val(batch_size=16):
    # Dataset and data loader
    val_dataset = CsvDataset(csv_file='test_data.csv')
    val_data_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = CNN(num_classes=2).to(device)
    model.load_state_dict(torch.load("87.12.pth", map_location=device))
    model.eval()  # evaluation mode
    arr_y = []
    arr_y_pred = []
    with torch.no_grad():  # no gradients needed during evaluation
        for val_x, val_y in val_data_loader:
            val_x = val_x.to(device)
            val_y = val_y.to(device)
            val_x = val_x.unsqueeze(1)
            val_y_pred = model(val_x)
            arr_y.extend(val_y.cpu().numpy())
            pred_result = softmax(val_y_pred).max(dim=1)[1]
            arr_y_pred.extend(pred_result.cpu().numpy())
    accuracy = accuracy_score(arr_y, arr_y_pred)
    precision = precision_score(arr_y, arr_y_pred, average="macro")
    recall = recall_score(arr_y, arr_y_pred, average="macro")
    f1 = f1_score(arr_y, arr_y_pred, average="macro")
    # Compute the per-class specificity from the confusion matrix
    cm = confusion_matrix(arr_y, arr_y_pred)
    specificity = []
    for i in range(len(labels)):
        TN = cm.sum() - (cm[i, :].sum() + cm[:, i].sum() - cm[i, i])
        FP = cm[:, i].sum() - cm[i, i]
        specificity.append(TN / (TN + FP))
    avg_specificity = np.mean(specificity)
    print(f"Accuracy: {accuracy:.5f}, Precision: {precision:.5f}, Recall: {recall:.5f}, "
          f"F1: {f1:.5f}, Specificity: {avg_specificity:.5f}")
    # Plot the confusion matrix
    plt.imshow(cm, cmap="Blues")
    plt.xticks(range(len(labels)), labels=labels)
    plt.yticks(range(len(labels)), labels=labels)
    plt.colorbar()
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    thresh = cm.mean()
    for i in range(len(labels)):
        for j in range(len(labels)):
            info = cm[j, i]
            prob = info / np.sum(cm[j])
            plt.text(i, j, f"{info}\n({prob*100:.2f}%)",
                     color="white" if info > thresh else "black",
                     ha='center', va='center')
    plt.savefig("confusion_matrix.jpg")
    plt.show()

if __name__ == "__main__":
    val()
```
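The per-class specificity above is TN / (TN + FP), read off the confusion matrix column by column. A hand-checkable toy example with invented numbers:

```python
import numpy as np

# Toy 2-class confusion matrix: rows = true labels, columns = predictions
cm = np.array([[8, 2],    # class 0: 8 correct, 2 misclassified as class 1
               [1, 9]])   # class 1: 1 misclassified as class 0, 9 correct
for i in range(2):
    TN = cm.sum() - (cm[i, :].sum() + cm[:, i].sum() - cm[i, i])
    FP = cm[:, i].sum() - cm[i, i]
    print(f"class {i}: specificity = {TN / (TN + FP):.3f}")
# class 0: TN=9, FP=1 -> 0.900; class 1: TN=8, FP=2 -> 0.800
```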
Source: PSD-公开数据集, https://chenlidbk.xyz/2024/06/11/paper-idear6/