import os
|
|
import torch
|
import torch.nn as nn
|
import torch.nn.functional as F
|
import torch.optim as optim
|
from torchvision import datasets, transforms
|
from torch.utils.data import Dataset, DataLoader
|
from PIL import Image
|
|
|
def default_loader(path):
    """Open the image at *path* and eagerly decode its pixel data.

    ``Image.open`` is lazy and keeps the underlying file handle open until
    the pixels are actually read; forcing the load here lets PIL release
    the file descriptor immediately, which matters when a DataLoader
    iterates thousands of images.
    """
    img = Image.open(path)
    img.load()  # force decode now so the file handle is closed
    return img
|
|
|
class MyDataset(Dataset):
    """Dataset reading ``<image_path> <label>`` pairs from annotation files.

    Every ``*.txt`` file directly inside *dir_path* is treated as an
    annotation file; each non-blank line holds an image path followed by an
    integer label, separated by whitespace.

    NOTE(review): the original ``__init__`` was syntactically broken (a bare
    ``if`` and an undefined ``txt`` variable). This reconstruction follows
    the original comments, which describe opening a txt file read-only and
    collecting (path, label) pairs — confirm against the intended data
    layout.
    """

    def __init__(self, dir_path, resize, target_transform=None, loader=default_loader):
        super(MyDataset, self).__init__()
        imgs = []
        # Scan dir_path for annotation text files.
        for name in os.listdir(dir_path):
            if not name.endswith('.txt'):
                continue
            txt = os.path.join(dir_path, name)
            # Context manager guarantees the file is closed (the original
            # opened it and never closed it).
            with open(txt, 'r') as fh:
                for line in fh:
                    # strip() removes the trailing newline and any
                    # surrounding whitespace in one call (the original
                    # stripped '\n' twice).
                    line = line.strip()
                    if not line:
                        continue  # skip blank lines
                    # split() with no argument splits on any whitespace.
                    words = line.split()
                    imgs.append((words[0], int(words[1])))
        # List of (image_path, label) tuples.
        self.imgs = imgs
        # Resize each image to (resize, resize) and convert it to a tensor.
        self.transform = transforms.Compose([transforms.Resize(size=(resize, resize)), transforms.ToTensor()])
        self.target_transform = target_transform
        self.loader = loader

    def __getitem__(self, index):
        """Load, transform and return the (img, label) pair at *index*."""
        fn, label = self.imgs[index]  # fn is the image path
        img = self.loader(fn)         # read the image from disk
        if self.transform is not None:
            img = self.transform(img)
        # Whatever is returned here is what each training batch yields.
        return img, label

    def __len__(self):
        """Number of (path, label) pairs read from the annotation files."""
        return len(self.imgs)
|
|
|
class CNN(nn.Module):
    """Two conv/ReLU/pool stages followed by a two-layer classifier head.

    Expects single-channel inputs (grayscale, e.g. 28x28 MNIST). With
    kernel_size=3 and padding=2 each conv grows the spatial size by 2 and
    each pool halves it: 28 -> 30 -> 15 -> 17 -> 8, so the flattened
    feature vector is 8 * 8 * 64 = 4096. Outputs raw logits for 10 classes.
    """

    def __init__(self):
        super(CNN, self).__init__()
        # Stage 1: 1 -> 32 channels (input is a single-channel image).
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32,
                               kernel_size=3, stride=1, padding=2)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        # Stage 2: 32 -> 64 channels.
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64,
                               kernel_size=3, stride=1, padding=2)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        # Classifier head: 4096 -> 256 -> 10 logits.
        self.fc1 = nn.Linear(in_features=8 * 8 * 64, out_features=256)
        self.relu3 = nn.ReLU()
        self.fc2 = nn.Linear(in_features=256, out_features=10)

    def forward(self, x):
        """Apply both conv stages, flatten, then classify; returns logits."""
        out = self.pool1(self.relu1(self.conv1(x)))
        out = self.pool2(self.relu2(self.conv2(out)))
        # Collapse (C, H, W) into one feature dimension per sample.
        out = out.view(-1, 8 * 8 * 64)
        return self.fc2(self.relu3(self.fc1(out)))
|
|
|
# Load the MNIST dataset.
# NOTE: the transform is named mnist_transform instead of `transforms`;
# the original assignment shadowed the imported torchvision.transforms
# module, which would break any later use of transforms.Compose (e.g. in
# MyDataset.__init__).
mnist_transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])

train_dataset = datasets.MNIST('../data', train=True, download=True,
                               transform=mnist_transform)
test_dataset = datasets.MNIST('../data', train=False,
                              transform=mnist_transform)

# Use the already-imported DataLoader name directly (the original spelled
# out torch.utils.data.DataLoader despite the import).
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=1000, shuffle=True)
|
|
|
# 训练过程
|
def train(epoch, model, train_loader, test_loader):
    """Run one training epoch over *train_loader*, then evaluate on
    *test_loader*, printing and returning the epoch metrics.

    Relies on the module-level ``device``, ``loss_fn`` and ``optimizer``.

    Returns:
        (epoch_loss, epoch_acc, test_epoch_loss, test_epoch_acc)
    """
    model.train()  # ensure training-mode behavior for dropout/BN layers
    correct = 0
    total = 0
    running_loss = 0

    for x, y in train_loader:
        # Move the batch to the compute device.
        x, y = x.to(device), y.to(device)
        y_pred = model(x)
        loss = loss_fn(y_pred, y)
        optimizer.zero_grad()  # clear gradients from the previous step
        loss.backward()        # backpropagate
        optimizer.step()

        # Accumulate running metrics without tracking gradients.
        with torch.no_grad():
            y_pred = torch.argmax(y_pred, dim=1)
            correct += (y_pred == y).sum().item()
            total += y.size(0)
            running_loss += loss.item()

    # loss_fn (CrossEntropyLoss) returns the per-batch mean, so average
    # over the number of batches. The original divided by
    # len(train_loader.dataset), understating the loss by roughly the
    # batch size.
    epoch_loss = running_loss / len(train_loader)
    epoch_acc = correct / total

    # Evaluation pass over the test set.
    model.eval()
    test_correct = 0
    test_total = 0
    test_running_loss = 0
    with torch.no_grad():
        for x, y in test_loader:
            x, y = x.to(device), y.to(device)
            y_pred = model(x)
            loss = loss_fn(y_pred, y)
            y_pred = torch.argmax(y_pred, dim=1)
            test_correct += (y_pred == y).sum().item()
            test_total += y.size(0)
            test_running_loss += loss.item()

    # Same per-batch averaging as above.
    test_epoch_loss = test_running_loss / len(test_loader)
    test_epoch_acc = test_correct / test_total

    print('epoch:', epoch,
          'loss:', round(epoch_loss, 3),
          'accuracy:', round(epoch_acc, 3),
          'test_loss:', round(test_epoch_loss, 3),
          'test_accuracy:', round(test_epoch_acc, 3))

    return epoch_loss, epoch_acc, test_epoch_loss, test_epoch_acc
|
|
|
# 定义测试函数
|
def test(model, device, test_loader):
    """Evaluate *model* on *test_loader*; print average loss and accuracy."""
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            # The model emits raw logits (no log_softmax in CNN.forward),
            # so cross_entropy is the correct loss here; the original
            # F.nll_loss on logits produced meaningless loss values.
            test_loss += F.cross_entropy(output, target, reduction='sum').item()
            # Index of the max logit is the predicted class.
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()

    # Per-sample average (losses were summed above).
    test_loss /= len(test_loader.dataset)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))
|
|
|
# Device and model setup: use CUDA when available, otherwise CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CNN().to(device)

# Cross-entropy loss over the 10 digit classes (model outputs raw logits).
loss_fn = torch.nn.CrossEntropyLoss()
# SGD optimizer with momentum.
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Number of training epochs. The original set epochs = 20 but then
# hard-coded range(1, 11); the loop now actually uses this constant,
# kept at 10 to preserve the original run behavior.
epochs = 10

# Per-epoch metric histories.
train_loss = []
train_acc = []
test_loss = []
test_acc = []

# Train the model.
for epoch in range(1, epochs + 1):
    epoch_loss, epoch_acc, test_epoch_loss, test_epoch_acc = train(epoch, model,
                                                                   train_loader, test_loader)
    train_loss.append(epoch_loss)
    train_acc.append(epoch_acc)
    test_loss.append(test_epoch_loss)
    test_acc.append(test_epoch_acc)

# Save the trained weights.
PATH = './mnist_net.pth'
torch.save(model.state_dict(), PATH)
|