PyTorch Overview
Background
Companies such as OpenAI and Tesla now use PyTorch for both model development and production deployment, so PyTorch has clearly matured as an engineering tool. I had been using TensorFlow, but I am about to refactor a recommendation project and plan to rewrite it entirely in PyTorch, so this is a good opportunity to organize my PyTorch knowledge.
Tip: everything below was run on Google Colab (with GPU).
Table of Contents
- Basics
  - Common imports
  - Creating tensors
  - Tensor operations
- Data
  - NumPy
  - Data loading
  - Custom datasets
  - Public datasets
- Models
  - Building
  - Training
  - Inference
  - Saving
- Deployment
Basics
Common imports
import torch
from torch import nn
import torch.nn.functional as F
Creating tensors
Basic creation:
torch.tensor(7)
torch.tensor([[1., -1.], [1., -1.]])
# reference: https://pytorch.org/docs/stable/torch.html#tensor-creation-ops
# more: sparse_coo_tensor , sparse_csr_tensor ...
Common creation helpers:
zeros = torch.zeros(size=(3, 4)) # size: can be a variable number of arguments or a collection like a list or tuple.
ones = torch.ones(size=(3, 4))
random_tensor = torch.rand(size=(3, 4))
zero_to_ten = torch.arange(start=0, end=10, step=1)
torch.rand(4)
Specifying the data type
torch.zeros([2, 4], dtype=torch.int32)
# torch.float , torch.double , torch.int ...
# reference: https://pytorch.org/docs/stable/tensors.html#data-types
Changing the data type
x = torch.arange(0, 100, 10)
x_float = x.type(torch.float)
Getting tensor information:
# Create a tensor
some_tensor = torch.rand(3, 4)
# Find out details about it
print(some_tensor)
print(f"Shape of tensor: {some_tensor.shape}")
print(f"Datatype of tensor: {some_tensor.dtype}")
print(f"Device tensor is stored on: {some_tensor.device}") # will default to CPU
Autograd (automatic differentiation)
torch.randn(1, dtype=torch.float, requires_grad=True)
# requires_grad=True records gradients so the parameter can be updated
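A minimal sketch of what requires_grad enables (a toy example, not from the original notes): calling backward() on a scalar result fills in .grad for every tracked tensor.
w = torch.randn(1, dtype=torch.float, requires_grad=True)
x = torch.tensor([2.0])
loss = (w * x).sum()  # any scalar computed from w
loss.backward()       # autograd computes d(loss)/dw
print(w.grad)         # tensor([2.]) because d(2w)/dw = 2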
Tensor operations
Basic math operations
tensor = torch.tensor([1, 2, 3])
tensor + 10
tensor - 10
tensor * 10
tensor / 10
tensor * tensor
Matrix operations (matrix multiplication is all you need.)
tensor = torch.tensor([1, 2, 3])
torch.matmul(tensor, tensor)
tensor1 = torch.rand(2,4)
tensor2 = torch.rand(4,5)
tensor1.matmul(tensor2)
torch.matmul(tensor1, tensor2)
Aggregation operations (min, max, mean, sum)
x = torch.arange(0, 100, 10)
print(f"Minimum: {x.min()}")
print(f"Maximum: {x.max()}")
# print(f"Mean: {x.mean()}") # this will error
print(f"Mean: {x.type(torch.float32).mean()}") # won't work without float datatype
print(f"Sum: {x.sum()}")
# Returns index of max and min values
print(f"Index where max value occurs: {x.argmax()}")
print(f"Index where min value occurs: {x.argmin()}")
Reshaping operations
x = torch.arange(1., 9.) # torch.Size([8])
# reshape
print(x.reshape(1,8))
print(x.reshape(8,1))
print(x.reshape(2,4))
# stack
torch.stack([x,x],dim=0) # torch.Size([2, 8])
torch.stack([x,x],dim=1) # torch.Size([8, 2])
# squeeze
x.reshape(1,8,1).squeeze() # torch.Size([8])
# unsqueeze
x.unsqueeze(dim=1) # torch.Size([8, 1])
# permute
x_original = torch.rand(size=(224, 224, 3))
x_permuted = x_original.permute(2, 0, 1) # shifts axis 0->1, 1->2, 2->0 / torch.Size([3, 224, 224])
Indexing (same as NumPy)
x = torch.tensor([[1, 2, 3], [4, 5, 6]])
x[1][2]
x[1][:2]
x[0][1] = 8
# reference: https://pytorch.org/docs/stable/torch.html#indexing-slicing-joining
Extracting values from a tensor
x = torch.tensor([[1, 2, 3], [4, 5, 6]])
x[0][0].item()
x.numpy()
Random seeds
import torch
# Set the random seed
RANDOM_SEED = 42
torch.manual_seed(seed=RANDOM_SEED)
random_tensor_A = torch.rand(3, 4)
# the seed must be set again before each generation to reproduce the same values
torch.manual_seed(seed=RANDOM_SEED)
random_tensor_B = torch.rand(3, 4)
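With the seed reset before each call, the two tensors come out identical:
print(torch.equal(random_tensor_A, random_tensor_B))  # True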
GPU
import torch
# check whether a GPU is available
torch.cuda.is_available()
# pick the device type
device = "cuda" if torch.cuda.is_available() else "cpu"
# move a tensor to the GPU
tensor = torch.tensor([1, 2, 3])
print(tensor, tensor.device)
tensor_on_gpu = tensor.to(device)
tensor_on_gpu
# move it back from the GPU to the CPU
tensor_on_gpu.cpu()
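The trip back to the CPU matters because NumPy cannot read GPU memory; converting a CUDA tensor requires copying it to the CPU first:
# tensor_on_gpu.numpy()  # raises TypeError when the tensor lives on CUDA
tensor_back_on_cpu = tensor_on_gpu.cpu().numpy()  # works: copy to CPU, then convert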
Data
NumPy
Getting data out of NumPy arrays
from sklearn.datasets import make_circles
# Make 1000 samples
n_samples = 1000
# Create circles
X, y = make_circles(n_samples, noise=0.03, random_state=42)
import torch
X = torch.from_numpy(X).type(torch.float)
y = torch.from_numpy(y).type(torch.float)
Data loading
DataLoader turns a Dataset into a Python iterable that yields one batch at a time
torch.utils.data.DataLoader
Using DataLoader
from torch.utils.data import DataLoader
# Setup the batch size hyperparameter
BATCH_SIZE = 32
# Turn datasets into iterables (batches)
train_dataloader = DataLoader(train_data,            # dataset to turn into an iterable
                              batch_size=BATCH_SIZE, # how many samples per batch?
                              shuffle=True)          # shuffle data every epoch?
train_features_batch, train_labels_batch = next(iter(train_dataloader))
Custom datasets
Reading CSV files in batches (including datasets too large to fit in memory)
# -*- coding: utf-8 -*-
import os
import numpy as np
from torch.utils.data import IterableDataset
from torch.utils.data import DataLoader
from itertools import cycle
import glob

# ref https://pytorch.org/tutorials/beginner/basics/data_tutorial.html
class CustomCsvIterableDataset(IterableDataset):
    def __init__(self, file_path, schema, data_type, field_delim=",", repeat=False):
        self.file_path = file_path
        self.field_delim = field_delim
        self.schema = schema
        self.repeat = repeat
        self.feature_names = [f for f in schema if not f.startswith("label")]
        self.feature_names_set = set(self.feature_names)
        self.dtype_dict = dict(zip(schema, data_type))

    def _parse_file(self, file_path):
        def process_line(line):
            line_split = line.strip().split(self.field_delim)
            features = {}
            label = None
            for data, feature_name in zip(line_split, self.schema):
                if feature_name == 'label':
                    data = self._auto_convert_dtype(data, feature_name)
                    label = data
                if feature_name in self.feature_names_set:
                    data = self._auto_convert_dtype(data, feature_name)
                    features[feature_name] = data
            return features, label

        if os.path.isfile(file_path):
            with open(file_path, 'r') as f:
                yield from (process_line(line) for line in f)
        if os.path.isdir(file_path):
            files = glob.glob(os.path.join(file_path, '*'))
            np.random.shuffle(files)
            for file in files:
                with open(file, 'r') as f:
                    yield from (process_line(line) for line in f)

    def _auto_convert_dtype(self, data, feature_name):
        if self.dtype_dict[feature_name] == 'int':
            return int(data)
        elif self.dtype_dict[feature_name] in ('float', 'double'):
            return float(data)
        else:
            return data

    def __iter__(self):
        if not self.repeat:
            return self._parse_file(self.file_path)
        else:
            # like dataset.repeat(None) in TensorFlow: repeat indefinitely
            return cycle(self._parse_file(self.file_path))


if __name__ == '__main__':
    file_path = '../data/example/train/train_data'
    # file_path = '../data/example/train'
    schema = ["col1", "col2", "col3", "col4", "col5",
              "col6", "col7", "col8", "label", "label2"]
    data_type = ["float", "float", "float", "string", "float",
                 "string", "string", "string", "float", "float"]
    raw_dataset = CustomCsvIterableDataset(
        file_path=file_path, schema=schema, data_type=data_type, field_delim=",")
    dataset = DataLoader(raw_dataset, batch_size=5)
    for x, y in dataset:
        for k, v in x.items():
            print(k, v)
        print('-' * 80)
        print(y)
        break
# col1 tensor([ 2.,  2., 10., 10.,  2.], dtype=torch.float64)
# col2 tensor([ 3.,  3., 13., 13.,  3.], dtype=torch.float64)
# col3 tensor([ 5.,  5., 16., 16.,  5.], dtype=torch.float64)
# col4 ['a', 'a', 'b', 'b', 'a']
# col5 tensor([3., 3., 9., 9., 3.], dtype=torch.float64)
# col6 ['r#w#k', 'r#w#k', 'g#d#a', 'g#d#a', 'r#w#k']
# col7 ['f', 'f', 'e', 'e', 'f']
# col8 ['e', 'e', 'w', 'w', 'e']
# --------------------------------------------------------------------------------
# tensor([0., 0., 1., 1., 0.], dtype=torch.float64)
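One caveat worth knowing about IterableDataset: with num_workers > 0, each DataLoader worker process gets a full copy of the dataset and would yield duplicate rows. A sketch of per-worker sharding with torch.utils.data.get_worker_info() (the helper below is mine, not part of the class above):
import torch

def worker_shard(iterable):
    # each worker keeps every num_workers-th item, offset by its worker id
    info = torch.utils.data.get_worker_info()
    if info is None:  # single-process loading: keep everything
        yield from iterable
    else:             # multi-process: skip items owned by other workers
        for i, item in enumerate(iterable):
            if i % info.num_workers == info.id:
                yield item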
Some preprocessing (building vocabularies for the string features)
# -*- coding: utf-8 -*-
import os
from torchtext.vocab import vocab
from collections import Counter, OrderedDict
import torch

class VocabProcess:
    def __init__(self, schema, data_type, vocab_path, sequence_sep):
        self.schema = schema
        self.string_feature_names = [f for f, t in zip(
            schema, data_type) if t == 'string']
        self.string_feature_names_set = set(self.string_feature_names)
        self.vocab_path = vocab_path
        self.sequence_sep_dict = dict(zip(schema, sequence_sep))
        self.vocabs_dict = dict()
        self._build_vocabs()

    def _build_vocabs(self):
        for feature_name in self.string_feature_names:
            feature_name, feature_vocab = self._build_vocab_one(
                feature_name, os.path.join(self.vocab_path, feature_name))
            self.vocabs_dict.update({feature_name: feature_vocab})

    def _build_vocab_one(self, feature_name, vocab_path):
        # ref : https://pytorch.org/text/stable/vocab.html
        vocab_list = [' ']
        with open(vocab_path, 'r') as f:
            for line in f:
                vocab_list.append(line.strip())
        vocab_dict = OrderedDict(Counter(vocab_list))
        feature_vocab = vocab(vocab_dict)
        default_index = 0
        feature_vocab.set_default_index(default_index)
        return [feature_name, feature_vocab]

    def do_vocab_index(self, example):
        for feature_name in example.keys():
            if feature_name in self.string_feature_names_set:
                if len(self.sequence_sep_dict[feature_name]) == 0:
                    example[feature_name] = torch.tensor(
                        self.vocabs_dict[feature_name](example[feature_name]))
                else:
                    example[feature_name] = torch.tensor(
                        [self.vocabs_dict[feature_name](e.split(self.sequence_sep_dict[feature_name]))
                         for e in example[feature_name]])
        return example


if __name__ == '__main__':
    schema = ["col1", "col2", "col3", "col4", "col5",
              "col6", "col7", "col8", "label", "label2"]
    data_type = ["float", "float", "float", "string", "float",
                 "string", "string", "string", "float", "float"]
    sequence_sep = ["", "", "", "", "", "#", "", "", "", ""]
    vocab_path = '../data/example/vocabulary/'
    VP = VocabProcess(schema=schema,
                      data_type=data_type,
                      vocab_path=vocab_path,
                      sequence_sep=sequence_sep)
    # print(VP.do_vocab_index({"col4": ["a", "b", 'c'], "col6": ["g#d", "d#a", 'g#k']}))
    from torch.utils.data import DataLoader
    from read_data_from_csv_iterable import CustomCsvIterableDataset
    file_path = '../data/example/train/train_data'
    raw_dataset = CustomCsvIterableDataset(file_path=file_path,
                                           schema=schema,
                                           data_type=data_type,
                                           field_delim=",")
    dataset = DataLoader(raw_dataset, batch_size=5)
    for x, y in dataset:
        for k, v in x.items():
            print(k, v)
        print('-' * 80)
        for k, v in VP.do_vocab_index(x).items():
            print(k, v)
        print('-' * 80)
        print(y)
        break
# col1 tensor([ 2.,  2., 10., 10.,  2.], dtype=torch.float64)
# col2 tensor([ 3.,  3., 13., 13.,  3.], dtype=torch.float64)
# col3 tensor([ 5.,  5., 16., 16.,  5.], dtype=torch.float64)
# col4 ['a', 'a', 'b', 'b', 'a']
# col5 tensor([3., 3., 9., 9., 3.], dtype=torch.float64)
# col6 ['r#w#k', 'r#w#k', 'g#d#a', 'g#d#a', 'r#w#k']
# col7 ['f', 'f', 'e', 'e', 'f']
# col8 ['e', 'e', 'w', 'w', 'e']
# --------------------------------------------------------------------------------
# col1 tensor([ 2.,  2., 10., 10.,  2.], dtype=torch.float64)
# col2 tensor([ 3.,  3., 13., 13.,  3.], dtype=torch.float64)
# col3 tensor([ 5.,  5., 16., 16.,  5.], dtype=torch.float64)
# col4 tensor([1, 1, 2, 2, 1])
# col5 tensor([3., 3., 9., 9., 3.], dtype=torch.float64)
# col6 tensor([[1, 2, 3],
#              [1, 2, 3],
#              [4, 5, 6],
#              [4, 5, 6],
#              [1, 2, 3]])
# col7 tensor([0, 0, 1, 1, 0])
# col8 tensor([1, 1, 2, 2, 1])
# --------------------------------------------------------------------------------
# tensor([0., 0., 1., 1., 0.], dtype=torch.float64)
Public datasets
There are many public datasets available in torchvision.datasets
- Some examples
- torchvision.datasets.FashionMNIST()
from torchvision import datasets
from torchvision.transforms import ToTensor

# Setup training data
train_data = datasets.FashionMNIST(
    root="data",           # where to download data to?
    train=True,            # get training data
    download=True,         # download data if it doesn't exist on disk
    transform=ToTensor(),  # images come as PIL format, we want to turn them into Torch tensors
    target_transform=None  # you can transform labels as well
)
Models
Building
Fundamentally, every model is a flow of data: X goes in and Y comes out.
Below is the most basic of models.
- nn.Module style
# Create a Linear Regression model class
class LinearRegressionModel(nn.Module):  # <- almost everything in PyTorch is a nn.Module (think of this as neural network lego blocks)
    def __init__(self):
        super().__init__()
        self.weights = nn.Parameter(torch.randn(1,                  # <- start with random weights (this will get adjusted as the model learns)
                                                dtype=torch.float), # <- PyTorch loves float32 by default
                                    requires_grad=True)             # <- can we update this value with gradient descent?
        self.bias = nn.Parameter(torch.randn(1,                  # <- start with a random bias (this will get adjusted as the model learns)
                                             dtype=torch.float), # <- PyTorch loves float32 by default
                                 requires_grad=True)             # <- can we update this value with gradient descent?

    # Forward defines the computation in the model
    def forward(self, x: torch.Tensor) -> torch.Tensor:  # <- "x" is the input data (e.g. training/testing features)
        return self.weights * x + self.bias  # <- this is the linear regression formula (y = m*x + b)
- Sequential style
model = nn.Sequential(
    nn.Linear(in_features=2, out_features=5),
    nn.Linear(in_features=5, out_features=1)
).to(device)
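A quick sanity check of the data flow through this stack (a sketch, reusing the device variable from earlier):
x = torch.rand(8, 2).to(device)  # a batch of 8 samples with 2 features each
print(model(x).shape)            # torch.Size([8, 1])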
These are the basic building blocks for models; with the components below you can assemble virtually any architecture:
- torch.nn
- nn.Module
- nn.Parameter
- forward()
- torch.optim
Common components (a small usage sketch follows the list)
- nn.Linear(in_features=2, out_features=5, bias=True)
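For example, nn.Linear maps the last dimension of its input from in_features to out_features (a minimal sketch):
layer = nn.Linear(in_features=2, out_features=5, bias=True)
x = torch.rand(3, 2)   # a batch of 3 samples with 2 features each
print(layer(x).shape)  # torch.Size([3, 5])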
Accessing model parameters
model = LinearRegressionModel()
# used for parameter updates (this is what you hand to the optimizer)
model.parameters()
# used for inspection, saving, and so on
model.state_dict()
state_dict is simply a Python dictionary object that maps each layer to its parameter tensor.
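Printing it for the linear regression model above makes that concrete (the values are random at initialization):
print(model.state_dict())
# OrderedDict([('weights', tensor([...])), ('bias', tensor([...]))])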
Training
Training is the process of updating the parameters, and it relies on:
- Loss function (how far off are we?)
  - Mean absolute error (MAE) for regression problems | torch.nn.L1Loss()
  - Mean squared error (MSE, or L2 loss) for regression problems | torch.nn.MSELoss()
  - Cross entropy for multi-class classification problems | torch.nn.CrossEntropyLoss()
  - Binary cross entropy for binary classification problems | torch.nn.BCELoss()
  - torch.nn.BCEWithLogitsLoss() - the same as above except it has a sigmoid layer (nn.Sigmoid) built in
  - more detailed usage notes near the end of this article ...
- Optimizer (how should the parameters be updated?)
  - Stochastic gradient descent | torch.optim.SGD()
  - Adam optimizer | torch.optim.Adam()
  - ...
Training setup
# Create the loss function
loss_fn = nn.L1Loss()  # MAE loss is the same as L1Loss
# Create the optimizer
optimizer = torch.optim.SGD(params=model.parameters(),  # parameters of the target model to optimize
                            lr=0.01)  # learning rate (how much the optimizer should change parameters at each step; higher=more (less stable), lower=less (might take a long time))
The basic training loop
- Forward pass
- Calculate the loss
- Zero gradients
- Perform backpropagation on the loss
- Step the optimizer (gradient descent)
for epoch in range(epochs):
    # training mode
    model.train()
    # forward pass
    y_pred = model(X_train)
    # compute the loss
    loss = loss_fn(y_pred, y_train)
    # zero the gradients so they don't accumulate across steps
    optimizer.zero_grad()
    # backpropagation
    loss.backward()
    # update the parameters
    optimizer.step()
The basic testing loop
for epoch in range(epochs):
    # training code goes here #
    # === testing code below ===
    # switch to evaluation mode
    model.eval()
    with torch.inference_mode():
        # predict
        test_pred = model(X_test)
        # compute the loss (sometimes the two dtypes need to be made consistent first)
        test_loss = loss_fn(test_pred, y_test)
        # add any custom evaluation logic here
    # print progress
    if epoch % 10 == 0:
        epoch_count.append(epoch)
        train_loss_values.append(loss)
        test_loss_values.append(test_loss)
If a GPU is available, consider putting both the data and the model on it
# check whether a GPU is available
torch.cuda.is_available()
# pick the device type
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)
# use this to check which device the model currently lives on
next(model.parameters()).device
# put the data on the GPU as well
X_train = X_train.to(device)
X_test = X_test.to(device)
y_train = y_train.to(device)
y_test = y_test.to(device)
Inference
Simple inference works just like the testing phase; think about which device the inference should run on
# 1. Set the model in evaluation mode
model.eval()
# 2. Setup the inference mode context manager
with torch.inference_mode():
    # 3. Make sure the calculations are done with the model and data on the same device
    # in our case, we haven't setup device-agnostic code yet so our data and model are
    # on the CPU by default.
    # model.to(device)
    # X_test = X_test.to(device)
    y_preds = model(X_test)
y_preds
Saving
There are two ways to save a model
- pickle mode (see the sketch after this list)
  - torch.save
  - torch.load
- state_dict mode (recommended)
  - model.state_dict() & torch.save(obj, f)
  - torch.nn.Module.load_state_dict
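A quick sketch of pickle mode, which serializes the whole model object (the path below is illustrative):
torch.save(model, "models/example_full_model.pth")
loaded = torch.load("models/example_full_model.pth")  # the class definition must still be importable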
The state_dict approach is expanded on below.
Note that the following saves only the model's parameters; it does not save the model class itself.
from pathlib import Path
# 1. Create models directory
MODEL_PATH = Path("models")
MODEL_PATH.mkdir(parents=True, exist_ok=True)
# 2. Create model save path
MODEL_NAME = "example_model.pth"
MODEL_SAVE_PATH = MODEL_PATH / MODEL_NAME
# 3. Save the model state dict
print(f"Saving model to: {MODEL_SAVE_PATH}")
# saving the state_dict() saves only the model's learned parameters
torch.save(obj=model.state_dict(), f=MODEL_SAVE_PATH)
Loading the model
# instantiate a fresh model, whose parameters are random
loaded_model = LinearRegressionModel()
# load the saved state dict, overwriting the freshly initialized parameters
loaded_model.load_state_dict(torch.load(f=MODEL_SAVE_PATH))
Using the loaded model works exactly like the inference section above
# set evaluation mode
loaded_model.eval()
# predict in inference mode
with torch.inference_mode():
    loaded_model_preds = loaded_model(X_test)
# you can also run inference on the GPU (the data must be on the GPU too)
Deployment
The simplest option is to serve the model with FastAPI, as in the sketch below.
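A minimal serving sketch, assuming the LinearRegressionModel and saved state dict from the previous section (the file name and the /predict route are illustrative, not from the original):
import torch
from fastapi import FastAPI

app = FastAPI()
model = LinearRegressionModel()
model.load_state_dict(torch.load("models/example_model.pth"))
model.eval()

@app.post("/predict")
def predict(x: float):
    with torch.inference_mode():
        y = model(torch.tensor([x]))
    return {"prediction": y.item()}
# run with: uvicorn main:app --port 8000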
Further reading
Some common classification problems
- Binary classification (e.g. spam or not spam)
- Multi-class classification (e.g. object classification)
- Multi-label classification (e.g. which tags fit an article)
Some common vision tasks
- Binary classification (cat or dog?)
- Multi-class classification (cat, dog, or chicken?)
- Object detection
- Semantic segmentation
Some useful functions
- torch.eq() calculates where two tensors are equal (see the helper sketch after this list)
  - correct = torch.eq(y_true, y_pred).sum().item()
- nn.ReLU() non-linearity (activation function)
- nn.Flatten() flattens each sample into a single vector
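A tiny accuracy helper built from torch.eq (a sketch, not part of the original list):
def accuracy_fn(y_true, y_pred):
    correct = torch.eq(y_true, y_pred).sum().item()  # count matching elements
    return (correct / len(y_pred)) * 100             # percentage correct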
Using the loss functions
Regression: L1Loss
loss_fn = nn.L1Loss()  # or use nn.MSELoss for L2
y_pred = model(X_train)
loss = loss_fn(y_pred, y_train)  # out_features = 1 (1,)
Binary classification: BCEWithLogitsLoss
loss_fn = nn.BCEWithLogitsLoss()
y_logits = model(X_train).squeeze() # out_features = 1 (1,)
loss = loss_fn(y_logits, y_train) # BCEWithLogitsLoss calculates loss using logits
# y_pred = torch.round(torch.sigmoid(y_logits)) # logits -> prediction probabilities -> prediction labels
Multi-class classification: CrossEntropyLoss
loss_fn = nn.CrossEntropyLoss()
y_logits = model(X_train) # model outputs raw logits / output_features = NUM_CLASSES
y_pred = torch.softmax(y_logits, dim=1).argmax(dim=1) # go from logits -> prediction probabilities -> prediction labels
loss = loss_fn(y_logits, y_train)
Simulated data
Binary classification
# Make and plot data
from sklearn.datasets import make_circles
n_samples = 1000
X, y = make_circles(n_samples=n_samples,
                    noise=0.03,
                    random_state=42)
Multi-class classification
from sklearn.datasets import make_blobs
# Set the hyperparameters for data creation
NUM_CLASSES = 4
NUM_FEATURES = 2
RANDOM_SEED = 42
# Create multi-class data
X_blob, y_blob = make_blobs(n_samples=1000,
                            n_features=NUM_FEATURES,  # X features
                            centers=NUM_CLASSES,      # y labels
                            cluster_std=1.5,          # give the clusters a little shake up (try changing this to 1.0, the default)
                            random_state=RANDOM_SEED)
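As with the circles data, make_blobs returns NumPy arrays; turn them into tensors the same way (labels become long integers, as CrossEntropyLoss expects):
X_blob = torch.from_numpy(X_blob).type(torch.float)
y_blob = torch.from_numpy(y_blob).type(torch.LongTensor)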