A PyTorch Walkthrough

Background

OpenAI, Tesla, and other companies now use PyTorch for both model development and production deployment, so PyTorch looks increasingly mature for engineering work. I have been a TensorFlow user until now, and I am about to rewrite a recommendation project entirely in PyTorch, so I am taking the opportunity to organize my PyTorch knowledge.

Tips: everything below was run on Google Colab (with GPU)

Contents

  • Basics
    • Common imports
    • Tensor creation
    • Tensor operations
  • Data
    • NumPy
    • Data loading
    • Custom datasets
    • Public datasets
  • Models
    • Building
    • Training
    • Inference
    • Saving
    • Deployment

Basics

Common imports

import torch
from torch import nn
import torch.nn.functional as F

Tensor creation

Basic creation:

torch.tensor(7)
torch.tensor([[1., -1.], [1., -1.]])
# reference: https://pytorch.org/docs/stable/torch.html#tensor-creation-ops 
# more:  sparse_coo_tensor , sparse_csr_tensor ...

Common constructors:

zeros = torch.zeros(size=(3, 4)) # size: can be a variable number of arguments or a collection like a list or tuple.
ones = torch.ones(size=(3, 4))
random_tensor = torch.rand(size=(3, 4))
zero_to_ten = torch.arange(start=0, end=10, step=1)
torch.rand(4)

Specifying the data type

torch.zeros([2, 4], dtype=torch.int32)
# torch.float , torch.double , torch.int ...
# reference: https://pytorch.org/docs/stable/tensors.html#data-types

Changing the data type

x = torch.arange(0, 100, 10)
x_float = x.type(torch.float)

Inspecting a tensor's attributes:

# Create a tensor
some_tensor = torch.rand(3, 4)

# Find out details about it
print(some_tensor)
print(f"Shape of tensor: {some_tensor.shape}")
print(f"Datatype of tensor: {some_tensor.dtype}")
print(f"Device tensor is stored on: {some_tensor.device}") # will default to CPU

Autograd (automatic differentiation)

torch.randn(1, dtype=torch.float, requires_grad=True)
# requires_grad=True records operations on the tensor so gradients can be computed and the parameter updated
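
A minimal sketch of the full autograd round trip (the names here are illustrative):

x = torch.randn(1, dtype=torch.float, requires_grad=True)
y = (x ** 2).sum()  # any scalar computed from x
y.backward()        # fills x.grad with dy/dx
print(x.grad)       # equals 2 * x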

Tensor operations

Basic arithmetic

tensor = torch.tensor([1, 2, 3])
tensor + 10
tensor - 10
tensor * 10
tensor / 10
tensor * tensor

Matrix operations (matrix multiplication is all you need)

tensor = torch.tensor([1, 2, 3])
torch.matmul(tensor, tensor)

tensor1 = torch.rand(2,4)
tensor2 = torch.rand(4,5)
tensor1.matmul(tensor2)
torch.matmul(tensor1, tensor2) 
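
The @ operator is shorthand for matmul; note the inner dimensions must match:

tensor1 @ tensor2                 # same as torch.matmul(tensor1, tensor2), shape (2, 5)
# torch.matmul(tensor2, tensor1)  # error: (4, 5) x (2, 4) has mismatched inner dimensions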

Min, max, mean and sum

x = torch.arange(0, 100, 10)

print(f"Minimum: {x.min()}")
print(f"Maximum: {x.max()}")
# print(f"Mean: {x.mean()}") # this will error
print(f"Mean: {x.type(torch.float32).mean()}") # won't work without float datatype
print(f"Sum: {x.sum()}")

# Returns index of max and min values
print(f"Index where max value occurs: {x.argmax()}")
print(f"Index where min value occurs: {x.argmin()}")

Shape transformations

x = torch.arange(1., 9.) # torch.Size([8])

# reshape
print(x.reshape(1,8))
print(x.reshape(8,1))
print(x.reshape(2,4))

# stack
torch.stack([x,x],dim=0) # torch.Size([2, 8])
torch.stack([x,x],dim=1) # torch.Size([8, 2])

# squeeze
x.reshape(1,8,1).squeeze() # torch.Size([8])

# unsqueeze

x.unsqueeze(dim=1) # torch.Size([8, 1])

# permute
x_original = torch.rand(size=(224, 224, 3))
x_permuted = x_original.permute(2, 0, 1) # shifts axis 0->1, 1->2, 2->0  /  torch.Size([3, 224, 224])

Indexing (same as NumPy)

x = torch.tensor([[1, 2, 3], [4, 5, 6]])
x[1][2]
x[1][:2]
x[0][1] = 8
# reference: https://pytorch.org/docs/stable/torch.html#indexing-slicing-joining

Extracting values from a tensor

x = torch.tensor([[1, 2, 3], [4, 5, 6]])
x[0][0].item()
x.numpy()
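
Note: on CPU, x.numpy() shares memory with the tensor, so an in-place edit is visible on both sides:

arr = x.numpy()
x[0][0] = 9
print(arr[0][0])  # 9 (the array reflects the change)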

Random seeds

import torch
import random

# Set the random seed
RANDOM_SEED=42

torch.manual_seed(seed=RANDOM_SEED) 
random_tensor_A = torch.rand(3, 4)

# the seed must be set again before every generation to reproduce the same values
torch.manual_seed(seed=RANDOM_SEED) 
random_tensor_B = torch.rand(3, 4)

GPU

import torch
# check whether a GPU is available
torch.cuda.is_available()

# set the device
device = "cuda" if torch.cuda.is_available() else "cpu"

# move a tensor onto the GPU
tensor = torch.tensor([1, 2, 3])
print(tensor, tensor.device)
tensor_on_gpu = tensor.to(device)
tensor_on_gpu

# move it from the GPU back to the CPU
tensor_on_gpu.cpu()

Data

NumPy

Creating tensors from NumPy arrays

from sklearn.datasets import make_circles
# Make 1000 samples 
n_samples = 1000
# Create circles
X, y = make_circles(n_samples,noise=0.03, random_state=42) 

import torch
X = torch.from_numpy(X).type(torch.float)
y = torch.from_numpy(y).type(torch.float)

Data loading

DataLoader turns a Dataset into a Python iterable that yields one batch at a time.

torch.utils.data.DataLoader

Using DataLoader

from torch.utils.data import DataLoader

# Setup the batch size hyperparameter
BATCH_SIZE = 32

# Turn datasets into iterables (batches)
train_dataloader = DataLoader(train_data, # dataset to turn into iterable
    batch_size=BATCH_SIZE, # how many samples per batch? 
    shuffle=True # shuffle data every epoch?
)


train_features_batch, train_labels_batch = next(iter(train_dataloader))
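
Here train_data is any Dataset (for example, the FashionMNIST dataset created in the Public datasets section below). Iterating the batches looks like:

for batch, (X, y) in enumerate(train_dataloader):
    print(batch, X.shape, y.shape)  # e.g. 0 torch.Size([32, 1, 28, 28]) torch.Size([32])
    break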

Custom datasets

Reading batches of CSV files (including datasets too large to fit in memory)

# -*- coding: utf-8 -*-

import os
import numpy as np
from torch.utils.data import IterableDataset
from torch.utils.data import DataLoader
from itertools import cycle
import glob

# ref https://pytorch.org/tutorials/beginner/basics/data_tutorial.html


class CustomCsvIterableDataset(IterableDataset):
    def __init__(self, file_path, schema, data_type, field_delim=",", repeat=False):
        self.file_path = file_path
        self.field_delim = field_delim
        self.schema = schema
        self.repeat = repeat
        self.feature_names = [f for f in schema if not f.startswith("label")]
        self.feature_names_set = set(self.feature_names)
        self.dtype_dict = dict(zip(schema, data_type))

    def _parse_file(self, file_path):
        # streams (features_dict, label) pairs line by line; file_path may be a single file or a directory
        def process_line(line):
            line_split = line.strip().split(self.field_delim)
            features = {}
            label = None
            for data, feature_name in zip(line_split, self.schema):
                if feature_name == 'label':
                    # only the column named exactly 'label' becomes the target (label2 is dropped)
                    data = self._auto_convert_dtype(data, feature_name)
                    label = data
                if feature_name in self.feature_names_set:
                    data = self._auto_convert_dtype(data, feature_name)
                    features[feature_name] = data
            return features, label

        if os.path.isfile(file_path):
            with open(file_path, 'r') as f:
                yield from (process_line(line) for line in f)

        if os.path.isdir(file_path):
            files = glob.glob(os.path.join(file_path, '*'))
            np.random.shuffle(files)
            for file in files:
                with open(file, 'r') as f:
                    yield from (process_line(line) for line in f)

    def _auto_convert_dtype(self, data, feature_name):
        if self.dtype_dict[feature_name] == 'int':
            return int(data)
        elif self.dtype_dict[feature_name] in ('float', 'double'):
            return float(data)
        else:
            return data

    def __iter__(self):
        if not self.repeat:
            return self._parse_file(self.file_path)
        else:
            # like TensorFlow's dataset.repeat(None): repeat indefinitely
            return cycle(self._parse_file(self.file_path))


if __name__ == '__main__':
    file_path = '../data/example/train/train_data'
    # file_path = '../data/example/train'
    schema = ["col1", "col2", "col3", "col4", "col5",
              "col6", "col7", "col8", "label", "label2"]
    data_type = ["float", "float", "float", "string", "float",
                 "string", "string", "string", "float", "float"]

    raw_dataset = CustomCsvIterableDataset(
        file_path=file_path, schema=schema, data_type=data_type, field_delim=",")

    dataset = DataLoader(raw_dataset, batch_size=5)

    for x, y in dataset:
        for k, v in x.items():
            print(k, v)
        print('-' * 80)
        print(y)
        break


# col1 tensor([ 2.,  2., 10., 10.,  2.], dtype=torch.float64)
# col2 tensor([ 3.,  3., 13., 13.,  3.], dtype=torch.float64)
# col3 tensor([ 5.,  5., 16., 16.,  5.], dtype=torch.float64)
# col4 ['a', 'a', 'b', 'b', 'a']
# col5 tensor([3., 3., 9., 9., 3.], dtype=torch.float64)
# col6 ['r#w#k', 'r#w#k', 'g#d#a', 'g#d#a', 'r#w#k']
# col7 ['f', 'f', 'e', 'e', 'f']
# col8 ['e', 'e', 'w', 'w', 'e']
# --------------------------------------------------------------------------------
# tensor([0., 0., 1., 1., 0.], dtype=torch.float64)
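
One caveat: with num_workers > 0, every DataLoader worker runs the full iterator, so each sample would be yielded once per worker. A sketch of sharding files across workers (assuming file_path is a directory and process_line is factored out as a method):

import torch.utils.data

def __iter__(self):
    worker_info = torch.utils.data.get_worker_info()
    files = sorted(glob.glob(os.path.join(self.file_path, '*')))
    if worker_info is not None:
        files = files[worker_info.id::worker_info.num_workers]  # disjoint shard per worker
    for file in files:
        with open(file, 'r') as f:
            yield from (self.process_line(line) for line in f)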

Adding some preprocessing (vocabulary lookups)

# -*- coding: utf-8 -*-

import os
from torchtext.vocab import vocab
from collections import Counter, OrderedDict
import torch


class VocabProcess:
    def __init__(self, schema, data_type, vocab_path, sequence_sep):
        self.schema = schema
        self.string_feature_names = [f for f, t in zip(
            schema, data_type) if t == 'string']
        self.string_feature_names_set = set(self.string_feature_names)
        self.vocab_path = vocab_path
        self.sequence_sep_dict = dict(zip(schema, sequence_sep))
        self.vocabs_dict = dict()
        self._build_vocabs()

    def _build_vocabs(self):
        for feature_name in self.string_feature_names:
            feature_name, feature_vocab = self._build_vocab_one(feature_name,
                                                                os.path.join(self.vocab_path, feature_name))
            self.vocabs_dict.update({feature_name: feature_vocab})

    def _build_vocab_one(self, feature_name, vocab_path):
        # ref : https://pytorch.org/text/stable/vocab.html
        vocab_list = [' ']  # reserve index 0 for the default / out-of-vocabulary token
        with open(vocab_path, 'r') as f:
            for line in f:
                vocab_list.append(line.strip())
        vocab_dict = OrderedDict(Counter(vocab_list))
        feature_vocab = vocab(vocab_dict)
        default_index = 0
        feature_vocab.set_default_index(default_index)
        return [feature_name, feature_vocab]

    def do_vocab_index(self, example):
        for feature_name in example.keys():
            if feature_name in self.string_feature_names_set:
                if len(self.sequence_sep_dict[feature_name]) == 0:
                    example[feature_name] = torch.tensor(
                        self.vocabs_dict[feature_name](example[feature_name]))
                else:
                    example[feature_name] = torch.tensor(
                        [self.vocabs_dict[feature_name](e.split(self.sequence_sep_dict[feature_name]))
                         for e in example[feature_name]])
        return example


if __name__ == '__main__':
    schema = ["col1", "col2", "col3", "col4", "col5",
              "col6", "col7", "col8", "label", "label2"]
    data_type = ["float", "float", "float", "string", "float",
                 "string", "string", "string", "float", "float"]
    sequence_sep = ["", "", "", "", "", "#", "", "", "", ""]
    vocab_path = '../data/example/vocabulary/'

    VP = VocabProcess(schema=schema,
                      data_type=data_type,
                      vocab_path=vocab_path,
                      sequence_sep=sequence_sep)

    # print(VP.do_vocab_index({"col4": ["a", "b", 'c'], "col6": ["g#d", "d#a", 'g#k']}))

    from torch.utils.data import DataLoader
    from read_data_from_csv_iterable import CustomCsvIterableDataset

    file_path = '../data/example/train/train_data'

    raw_dataset = CustomCsvIterableDataset(file_path=file_path,
                                           schema=schema,
                                           data_type=data_type,
                                           field_delim=",")

    dataset = DataLoader(raw_dataset, batch_size=5)

    for x, y in dataset:
        for k, v in x.items():
            print(k, v)
        print('-' * 80)
        for k, v in VP.do_vocab_index(x).items():
            print(k, v)
        print('-' * 80)
        print(y)
        break

# col1 tensor([ 2.,  2., 10., 10.,  2.], dtype=torch.float64)
# col2 tensor([ 3.,  3., 13., 13.,  3.], dtype=torch.float64)
# col3 tensor([ 5.,  5., 16., 16.,  5.], dtype=torch.float64)
# col4 ['a', 'a', 'b', 'b', 'a']
# col5 tensor([3., 3., 9., 9., 3.], dtype=torch.float64)
# col6 ['r#w#k', 'r#w#k', 'g#d#a', 'g#d#a', 'r#w#k']
# col7 ['f', 'f', 'e', 'e', 'f']
# col8 ['e', 'e', 'w', 'w', 'e']
# --------------------------------------------------------------------------------
# col1 tensor([ 2.,  2., 10., 10.,  2.], dtype=torch.float64)
# col2 tensor([ 3.,  3., 13., 13.,  3.], dtype=torch.float64)
# col3 tensor([ 5.,  5., 16., 16.,  5.], dtype=torch.float64)
# col4 tensor([1, 1, 2, 2, 1])
# col5 tensor([3., 3., 9., 9., 3.], dtype=torch.float64)
# col6 tensor([[1, 2, 3],
#         [1, 2, 3],
#         [4, 5, 6],
#         [4, 5, 6],
#         [1, 2, 3]])
# col7 tensor([0, 0, 1, 1, 0])
# col8 tensor([1, 1, 2, 2, 1])
# --------------------------------------------------------------------------------
# tensor([0., 0., 1., 1., 0.], dtype=torch.float64)

Public datasets

torchvision.datasets ships many public datasets

  • Examples
    • torchvision.datasets.FashionMNIST()

from torchvision import datasets
from torchvision.transforms import ToTensor

# Setup training data
train_data = datasets.FashionMNIST(
    root="data", # where to download data to?
    train=True, # get training data
    download=True, # download data if it doesn't exist on disk
    transform=ToTensor(), # images come as PIL format, we want to turn into Torch tensors
    target_transform=None # you can transform labels as well
)
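
The test split is created the same way with train=False:

test_data = datasets.FashionMNIST(
    root="data",
    train=False, # get test data
    download=True,
    transform=ToTensor()
)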

Models

Building

Every model is essentially a flow of data: X goes in, Y comes out.

Below is the most basic model

  • nn.Module style
# Create a Linear Regression model class
class LinearRegressionModel(nn.Module): # <- almost everything in PyTorch is a nn.Module (think of this as neural network lego blocks)
    def __init__(self):
        super().__init__() 
        self.weights = nn.Parameter(torch.randn(1, # <- start with random weights (this will get adjusted as the model learns)
                                                dtype=torch.float), # <- PyTorch loves float32 by default
                                   requires_grad=True) # <- can we update this value with gradient descent?

        self.bias = nn.Parameter(torch.randn(1, # <- start with random bias (this will get adjusted as the model learns)
                                            dtype=torch.float), # <- PyTorch loves float32 by default
                                requires_grad=True) # <- can we update this value with gradient descent?

    # Forward defines the computation in the model
    def forward(self, x: torch.Tensor) -> torch.Tensor: # <- "x" is the input data (e.g. training/testing features)
        return self.weights * x + self.bias # <- this is the linear regression formula (y = m*x + b)

  • Sequential style (nn.Sequential)
model = nn.Sequential(
    nn.Linear(in_features=2, out_features=5),
    nn.Linear(in_features=5, out_features=1)
).to(device)

A model is built from a handful of basic components; almost any architecture can be assembled from the following:

  • torch.nn
  • nn.Module
  • nn.Parameter
  • forward()
  • torch.optim

Commonly used layers

  • nn.Linear(in_features=2, out_features=5, bias=True)
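
A quick sketch of applying it to a batch (the batch dimension comes first):

layer = nn.Linear(in_features=2, out_features=5)
x = torch.rand(10, 2)  # batch of 10 samples, 2 features each
layer(x).shape         # torch.Size([10, 5])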

Accessing model parameters

model = LinearRegressionModel()

# used for parameter updates (pass this to the optimizer)
model.parameters()

# used for inspection, saving, and so on
model.state_dict()

state_dict is simply a Python dictionary object that maps each layer to its parameter tensor.
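
For the LinearRegressionModel above it would look something like:

print(model.state_dict())
# OrderedDict([('weights', tensor([...])), ('bias', tensor([...]))])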

Training

Training is the process of updating the parameters. It relies on:

  • Loss function (how wrong is the model?)
    • Mean Absolute Error (MAE) for regression problems | torch.nn.L1Loss()
    • Mean squared error (MSE) or L2 Loss for regression problems | torch.nn.MSELoss()
    • Cross Entropy for multiclass classification problems | torch.nn.CrossEntropyLoss()
    • Binary Cross Entropy for binary classification problems | torch.nn.BCELoss()
      • torch.nn.BCEWithLogitsLoss() - This is the same as above except it has a sigmoid layer (nn.Sigmoid) built-in
    • more detailed usage notes at the end of this post ...
  • Optimizer (how the parameters get updated)
    • Stochastic gradient descent | torch.optim.SGD()
    • Adam optimizer | torch.optim.Adam()
    • ...

Training setup

# Create the loss function
loss_fn = nn.L1Loss() # MAE loss is same as L1Loss

# Create the optimizer
optimizer = torch.optim.SGD(params=model.parameters(), # parameters of target model to optimize
                            lr=0.01) # learning rate (how much the optimizer should change parameters at each step, higher=more (less stable), lower=less (might take a long time))

The basic training loop

  1. Forward pass
  2. Calculate the loss
  3. Zero gradients
  4. Perform backpropagation on the loss
  5. Step the optimizer (gradient descent)

for epoch in range(epochs):
    # set training mode
    model.train()
    # forward pass
    y_pred = model(X_train)
    # compute the loss
    loss = loss_fn(y_pred, y_train)
    # zero the gradients so they don't accumulate across steps
    optimizer.zero_grad()
    # backpropagation
    loss.backward()
    # update the parameters
    optimizer.step()

The basic evaluation loop

for epoch in range(epochs):

    # training code goes here #

    # === evaluation code below ===

    # set evaluation mode
    model.eval()

    with torch.inference_mode():
        # predict
        test_pred = model(X_test)
        # compute the loss; sometimes the two dtypes need to be made consistent first
        test_loss = loss_fn(test_pred, y_test)

        # add any custom evaluation logic here

    # log progress (assumes epoch_count, train_loss_values, test_loss_values are pre-initialized lists)
    if epoch % 10 == 0:
        epoch_count.append(epoch)
        train_loss_values.append(loss)
        test_loss_values.append(test_loss)

If a GPU is available, consider moving both the model and the data onto it

# check whether a GPU is available
torch.cuda.is_available()

# set the device
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)

# check which device the model currently lives on
next(model.parameters()).device

# move the data onto the GPU as well
X_train = X_train.to(device)
X_test = X_test.to(device)
y_train = y_train.to(device)
y_test = y_test.to(device)

Inference

Plain inference (the same as the evaluation step); be mindful of which device inference runs on

# 1. Set the model in evaluation mode
model.eval()

# 2. Setup the inference mode context manager
with torch.inference_mode():
  # 3. Make sure the calculations are done with the model and data on the same device
  # in our case, we haven't setup device-agnostic code yet so our data and model are
  # on the CPU by default.
  # model.to(device)
  # X_test = X_test.to(device)
  y_preds = model(X_test)
y_preds

Saving

There are two ways to save a model:

  • pickle mode (saves the whole model object)
    • torch.save
    • torch.load
  • state_dict mode (recommended)
    • model.state_dict() & torch.save(obj, f)
    • torch.nn.Module.load_state_dict

The state_dict mode in more detail:

The following saves only the model's parameters; the model object itself is not saved

from pathlib import Path

# 1. Create models directory 
MODEL_PATH = Path("models")
MODEL_PATH.mkdir(parents=True, exist_ok=True)

# 2. Create model save path 
MODEL_NAME = "example_model.pth"
MODEL_SAVE_PATH = MODEL_PATH / MODEL_NAME

# 3. Save the model state dict 
print(f"Saving model to: {MODEL_SAVE_PATH}")
# saving the state_dict() only saves the model's learned parameters
torch.save(obj=model.state_dict(), f=MODEL_SAVE_PATH)

Loading the model

# re-initialize a model; its parameters start out random
loaded_model = LinearRegressionModel()

# load the saved parameters, overwriting the freshly initialized ones
loaded_model.load_state_dict(torch.load(f=MODEL_SAVE_PATH))
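
If the parameters were saved from a GPU model, map_location controls where they get loaded:

loaded_model.load_state_dict(torch.load(f=MODEL_SAVE_PATH, map_location=torch.device("cpu")))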

Using the loaded model works exactly like the inference section above

# set evaluation mode
loaded_model.eval()

# predict under inference mode
with torch.inference_mode():
    loaded_model_preds = loaded_model(X_test)

# the model can also be moved to the GPU for inference (the data must be on the GPU too)

Deployment

The simplest option is to serve the model with FastAPI, as sketched below.
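
A minimal sketch, assuming FastAPI and uvicorn are installed and reusing the state_dict from the Saving section; the /predict route and file layout are illustrative, not a fixed convention:

from fastapi import FastAPI
import torch

app = FastAPI()

# rebuild the architecture, then load the saved parameters
model = LinearRegressionModel()
model.load_state_dict(torch.load(f="models/example_model.pth"))
model.eval()

@app.post("/predict")
def predict(features: list[float]):
    with torch.inference_mode():
        x = torch.tensor(features, dtype=torch.float)
        y = model(x)
    return {"prediction": y.tolist()}

# run with: uvicorn main:app --host 0.0.0.0 --port 8000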

Further reading

Some common classification problems

  • Binary classification (e.g. spam detection)
  • Multi-class classification (e.g. object categories)
  • Multi-label classification (e.g. which tags apply to an article)

Some common vision tasks

  • Binary classification (cat or dog)
  • Multi-class classification (cat, dog, or chicken)
  • Object detection
  • Semantic segmentation

Some handy functions

  • torch.eq() calculates where two tensors are equal (see the accuracy sketch below)
    • correct = torch.eq(y_true, y_pred).sum().item()
  • nn.ReLU() non-linearity (activation function)
  • nn.Flatten() flattens each sample into a 1-D vector
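
For example, a simple accuracy helper built on torch.eq() (a sketch; the name accuracy_fn is illustrative):

def accuracy_fn(y_true, y_pred):
    correct = torch.eq(y_true, y_pred).sum().item()
    return (correct / len(y_pred)) * 100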

Loss function usage

Regression: L1Loss

loss_fn = nn.L1Loss() # or nn.MSELoss() for L2

y_pred = model(X_train)

loss = loss_fn(y_pred, y_train) # out_features = 1 (1,)

Binary classification: BCEWithLogitsLoss

loss_fn = nn.BCEWithLogitsLoss()

y_logits = model(X_train).squeeze() # out_features = 1 (1,)

loss = loss_fn(y_logits, y_train) # BCEWithLogitsLoss calculates loss using logits

# y_pred = torch.round(torch.sigmoid(y_logits)) # logits -> prediction probabilities -> prediction labels

Multi-class classification: CrossEntropyLoss

loss_fn = nn.CrossEntropyLoss()

y_logits = model(X_train) # model outputs raw logits / output_features = NUM_CLASSES
y_pred = torch.softmax(y_logits, dim=1).argmax(dim=1) # go from logits -> prediction probabilities -> prediction labels

loss = loss_fn(y_logits, y_train) 

Synthetic data

Binary classification

# Make and plot data
from sklearn.datasets import make_circles

n_samples = 1000

X, y = make_circles(n_samples=n_samples,
    noise=0.03,
    random_state=42,
)

Multi-class classification

from sklearn.datasets import make_blobs

# Set the hyperparameters for data creation
NUM_CLASSES = 4
NUM_FEATURES = 2
RANDOM_SEED = 42

# Create multi-class data
X_blob, y_blob = make_blobs(n_samples=1000,
    n_features=NUM_FEATURES, # X features
    centers=NUM_CLASSES, # y labels 
    cluster_std=1.5, # give the clusters a little shake up (try changing this to 1.0, the default)
    random_state=RANDOM_SEED
)
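
A sketch of wiring this synthetic data into the training loop above (train_test_split is from scikit-learn):

from sklearn.model_selection import train_test_split

# turn the NumPy arrays into tensors; CrossEntropyLoss expects integer class labels
X_blob = torch.from_numpy(X_blob).type(torch.float)
y_blob = torch.from_numpy(y_blob).type(torch.LongTensor)

X_train, X_test, y_train, y_test = train_test_split(
    X_blob, y_blob, test_size=0.2, random_state=RANDOM_SEED)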

References