Implementing a PyTorch-style deep learning framework in NumPy: similartorch

https://github.com/kaszperro/slick-dnn

Project git repository: https://github.com/leeguandong/SimilarWork

similartorch is a NumPy-based deep learning framework written in the style of PyTorch. Whether it is TensorFlow or PyTorch, the principles behind these frameworks are much the same; they mainly differ in the static-graph versus dynamic-graph design, while backpropagation itself works alike. Following PyTorch, similartorch has four main parts: autograd, nn, utils and tensor. autograd implements automatic differentiation, i.e. backpropagation, but unlike a static-graph framework it does not first look up the operations on a graph (through a session) and then have an optimizer compute the gradients. nn consists of functional and modules: modules defines containers built on Module, which allow a model to be assembled in different ways (for example sequentially), and functional exposes the classes in modules in function form. When defining such a class, the forward and backward methods must both be implemented; both are invoked automatically during differentiation. tensor is the carrier of all data entering the framework; it converts directly to and from NumPy arrays and does not track gradients by default. utils mainly defines the data iterators, of which there are two kinds: dataset and dataloader.
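To make the structure concrete, here is a minimal usage sketch of how these pieces fit together. It only relies on the Tensor, MSELoss and SGD code listed further down in this post; the import paths (similartorch.tensor, similartorch.nn, similartorch.optim) are assumptions and may differ in the actual repository.

import numpy as np

# Hypothetical end-to-end sketch; import paths are assumed, the classes are
# the ones shown later in this post.
from similartorch.tensor import Tensor
from similartorch.nn import MSELoss
from similartorch.optim import SGD

w = Tensor(np.random.randn(3, 1), requires_grad=True)   # learnable parameter
x = Tensor(np.random.randn(8, 3))                       # inputs, no grad by default
y = Tensor(np.random.randn(8, 1))                       # targets

optimizer = SGD([w], learning_rate=0.1)
for _ in range(100):
    optimizer.zero_grad()
    pred = x @ w                  # MatMul dispatched through Tensor.__matmul__
    loss = MSELoss()(y, pred)     # forward(ctx, target, input)
    loss.backward()               # walks backward_function / backward_tensor
    optimizer.step()              # w.data -= lr * w.grad (momentum = 0)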

The Tensor definition. A tensor is the carrier of data and the core data structure in deep learning; the key part is its backward machinery. The Tensor class itself is fairly involved and has quite a few attributes.

import numpy as np
from typing import Type

from .nn import Add, Subtract, Multiply, Divide, Power, Positive, Negative, MatMul, SwapAxes
from .autograd import Autograd


class Tensor(object):
    def __init__(self, data: np.array, requires_grad=False):
        self.data = data
        self.requires_grad = requires_grad
        self.grad = None
        if requires_grad:
            self.grad = np.zeros_like(self.data, dtype=np.float32)
        self.backward_function = None
        self.backward_tensor = []

    def backward(self, grad=np.array([1])):
        if self.requires_grad:
            # accumulate, then sum out any leading axes introduced by broadcasting
            self.grad = grad + self.grad
            sum_ax = tuple(range(len(self.grad.shape) - len(self.data.shape)))
            self.grad = np.sum(self.grad, sum_ax)
        if self.backward_function is not None:
            accumulated = self.backward_function(grad)
            if len(self.backward_tensor) == 1:
                accumulated = (accumulated,)
            for bv, ac in zip(self.backward_tensor, accumulated):
                bv.backward(ac)

    @classmethod
    def _op(cls, Op: Type[Autograd], *input_vars):
        f = Op()
        return f(*input_vars)

    def __str__(self):
        return "\n" + self.data.__str__()

    def __add__(self, other):
        return self._op(Add, self, other)

    def __radd__(self, other):
        return self._op(Add, other, self)

    def __sub__(self, other):
        return self._op(Subtract, self, other)

    def __rsub__(self, other):
        return self._op(Subtract, other, self)

    def __matmul__(self, other):
        return self._op(MatMul, self, other)

    def __rmatmul__(self, other):
        return self._op(MatMul, other, self)

    def __mul__(self, other):
        return self._op(Multiply, self, other)

    def __rmul__(self, other):
        return self._op(Multiply, other, self)

    def __copy__(self):
        """Copy data, requires_grad and grad of the current Tensor; grad stays None if absent."""
        copy = Tensor(np.copy(self.data), requires_grad=self.requires_grad)
        if self.grad is not None and copy.grad is not None:
            copy.grad[:] = self.grad[:]
        return copy

    def copy(self):
        return self.__copy__()

    def numpy(self):
        return self.data.copy()

    def __len__(self):
        return len(self.data)

    @property
    def size(self):
        return self.data.size

    @property
    def ndim(self):
        return self.data.ndim

    @property
    def shape(self):
        return self.data.shape

    @property
    def T(self):
        pass

    def swapaxes(self, axis1, axis2):
        return SwapAxes(axis1, axis2)(self)
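One detail worth noticing in backward is the sum over sum_ax: when a gradient arrives with extra leading axes introduced by broadcasting, it is summed back down to the shape of data. A small check (import path assumed, behavior as in the listing above):

import numpy as np
from similartorch.tensor import Tensor   # path assumed

a = Tensor(np.ones((2, 3)), requires_grad=True)
b = Tensor(np.ones(3), requires_grad=True)       # broadcast over axis 0 in the Add

c = a + b
c.backward(np.ones((2, 3)))

print(a.grad.shape)   # (2, 3)
print(b.grad)         # [2. 2. 2.] -- the broadcast axis was summed out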

The nn module in similartorch consists of two parts: modules, and functional, which is simply a function-style wrapper around the classes in modules. modules contains activation, the sequential container, conv, flatten, img2col, init, linear, loss, pooling and the base class Module, plus basic helpers such as similartorch.ones and elementary operators like add and matmul.

mathematical: defines the common arithmetic operations such as add and mul. Each operation is a class that inherits from Autograd and implements the forward and backward methods. NumPy of course already provides these operations, but once they are re-wrapped in the framework with an explicit backward, models can be built from the wrapped versions and backpropagation can chain through their derivatives. Written this way, the design resembles the deeply decoupled operator layer of TF1: you can compose the functions you want from these primitives yourself, or define the function you want directly in nn and write its backward by hand, in which case the operator granularity becomes coarser and less flexible.

import numpy as np
from similartorch.autograd import Autograd


class Add(Autograd):
    def forward(self, ctx, x, y):
        return x + y

    def backward(self, ctx, grad):
        return grad, grad


class Subtract(Autograd):
    def forward(self, ctx, x, y):
        return x - y

    def backward(self, ctx, grad):
        return grad, -grad


class MatMul(Autograd):
    def forward(self, ctx, x, y):
        ctx.save_for_back(x, y)
        return x @ y

    def backward(self, ctx, grad: np.array):
        t1, t2 = ctx.data_for_back
        grad1 = grad @ np.swapaxes(t2, -1, -2)
        grad2 = np.swapaxes(t1, -1, -2) @ grad
        return grad1, grad2


class Multiply(Autograd):
    def forward(self, ctx, x, y):
        ctx.save_for_back(x, y)
        return x * y

    def backward(self, ctx, grad: np.array):
        t1, t2 = ctx.data_for_back
        return grad * t2, grad * t1


class Assign(Autograd):
    def forward(self, ctx, x):
        return x

    def backward(self, ctx, grad):
        return None


class Divide(Autograd):
    def forward(self, ctx, x, y):
        ctx.save_for_back(x, y)
        return x / y

    def backward(self, ctx, grad):
        t1, t2 = ctx.data_for_back
        grad1 = grad / t2
        grad2 = -grad1 * (t1 / t2)
        return grad1, grad2


class Negative(Autograd):
    def forward(self, ctx, x):
        return -x

    def backward(self, ctx, grad):
        return -grad


class Positive(Autograd):
    def forward(self, ctx, x):
        return np.positive(x)

    def backward(self, ctx, grad):
        return np.positive(grad)


class Power(Autograd):
    def forward(self, ctx, x, y):
        ctx.save_for_back(x, y)
        return x ** y

    def backward(self, ctx, grad):
        t1, t2 = ctx.data_for_back
        grad1 = grad * t2 * (t1 ** np.where(t2, (t2 - 1), 1))
        grad2 = grad * (t1 ** t2) * np.log(np.where(t1, t1, 1))
        return grad1, grad2


class Exp(Autograd):
    def forward(self, ctx, x):
        ctx.save_for_back(x)
        return np.exp(x)

    def backward(self, ctx, grad):
        t1, = ctx.data_for_back
        return grad * np.exp(t1)


class Log(Autograd):
    def forward(self, ctx, x):
        ctx.save_for_back(x)   # stash x so backward can compute grad / x
        return np.log(x)

    def backward(self, ctx, grad):
        t1, = ctx.data_for_back
        return grad / t1
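Since every operator records its inputs in backward_tensor, composing them through the Tensor operator overloads chains the derivatives automatically. A small example (import path assumed):

import numpy as np
from similartorch.tensor import Tensor   # path assumed

a = Tensor(np.array([2.0, 3.0]), requires_grad=True)
b = Tensor(np.array([4.0, 5.0]), requires_grad=True)

y = a * b + a            # Multiply then Add, both dispatched via Tensor._op
y.backward(np.ones(2))

print(a.grad)            # dy/da = b + 1 -> [5. 6.]
print(b.grad)            # dy/db = a     -> [2. 3.]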

activation: the split between classes and functions here differs a little from mathematical above. The methods in mathematical are mostly also exposed as Tensor methods, since every piece of data in the framework is a Tensor and can use them directly as attributes. The activation classes in modules correspond to the functions in functional; in PyTorch the functions in functional are what the classes call inside forward, whereas in similartorch the logic lives in each class's forward method and functional merely instantiates and calls those classes.

import numpy as np
from similartorch.autograd import Autograd


class ReLU(Autograd):
    def forward(self, ctx, x):
        ctx.save_for_back(x)
        return np.clip(x, a_min=0, a_max=None)

    def backward(self, ctx, grad):
        t, = ctx.data_for_back
        return np.where(t < 0, 0, grad)


class Sigmoid(Autograd):
    def forward(self, ctx, x):
        sig = 1 / (1 + np.exp(-x))
        ctx.save_for_back(sig)
        return sig

    def backward(self, ctx, grad):
        sig, = ctx.data_for_back
        return sig * (1 - sig) * grad


class Softmax(Autograd):
    def forward(self, ctx, x):
        softm = np.exp(x) / np.sum(np.exp(x), axis=-1, keepdims=True)
        ctx.save_for_back(softm)
        return softm

    def backward(self, ctx, grad):
        softm, = ctx.data_for_back
        return grad * softm * (1 - softm)


class Softplus(Autograd):
    def forward(self, ctx, x):
        ctx.save_for_back(1 + np.exp(-x))
        # softplus(x) = log(1 + e^x); its derivative is 1 / (1 + e^-x), which backward uses
        return np.log(1 + np.exp(x))

    def backward(self, ctx, grad):
        softp, = ctx.data_for_back
        return grad / softp


class Softsign(Autograd):
    def forward(self, ctx, x):
        ctx.save_for_back(1 + np.abs(x))
        return x / (1 + np.abs(x))

    def backward(self, ctx, grad):
        softs, = ctx.data_for_back
        # d/dx softsign(x) = 1 / (1 + |x|)^2
        return grad / (softs * softs)


class ArcTan(Autograd):
    def forward(self, ctx, x):
        ctx.save_for_back(x)
        return np.arctan(x)

    def backward(self, ctx, grad):
        t, = ctx.data_for_back
        return grad / (t * t + 1)


class Tanh(Autograd):
    def forward(self, ctx, x):
        tanh = np.tanh(x)
        ctx.save_for_back(tanh)
        return tanh

    def backward(self, ctx, grad):
        tanh, = ctx.data_for_back
        return (1 - tanh * tanh) * grad
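Note that Softmax.backward above only keeps the diagonal term of the softmax Jacobian (grad * s * (1 - s)) rather than the full Jacobian-vector product, which is a simplification in the listed code. A quick sanity check of the Sigmoid gradient (import paths assumed):

import numpy as np
from similartorch.tensor import Tensor   # paths assumed
from similartorch.nn import Sigmoid

x = Tensor(np.array([-1.0, 0.0, 1.0]), requires_grad=True)
y = Sigmoid()(x)
y.backward(np.ones(3))

# sigmoid'(x) = sigmoid(x) * (1 - sigmoid(x)); at x = 0 this equals 0.25
print(x.grad)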

The loss module:

import numpy as np
from similartorch.autograd import Autograd


class MSELoss(Autograd):
    def forward(self, ctx, target, input):
        if target.shape != input.shape:
            raise ValueError("wrong shape")
        ctx.save_for_back(target, input)
        return ((target - input) ** 2).mean()

    def backward(self, ctx, grad):
        target, input = ctx.data_for_back
        batch = target.shape[0]
        grad1 = grad * 2 * (target - input) / batch
        grad2 = grad * 2 * (input - target) / batch
        return grad1, grad2


class CrossEntropyLoss(Autograd):
    def forward(self, ctx, target, input):
        ctx.save_for_back(target, input)
        input = np.clip(input, 1e-15, 1 - 1e-15)
        return -target * np.log(input) - (1 - target) * np.log(1 - input)

    def backward(self, ctx, grad):
        target, input = ctx.data_for_back
        batch = target.shape[0]
        input = np.clip(input, 1e-15, 1 - 1e-15)
        grad1 = grad * (np.log(1 - input) - np.log(input)) / batch
        grad2 = grad * (-target / input + (1 - target) / (1 - input)) / batch
        return grad1, grad2
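Worth noting: CrossEntropyLoss here is the element-wise binary cross-entropy and does not reduce to a scalar, so the backward seed must have the same shape as its output. A small sketch (import paths assumed):

import numpy as np
from similartorch.tensor import Tensor   # paths assumed
from similartorch.nn import CrossEntropyLoss

target = Tensor(np.array([[1.0, 0.0], [0.0, 1.0]]))
probs = Tensor(np.array([[0.8, 0.2], [0.3, 0.7]]), requires_grad=True)

loss = CrossEntropyLoss()(target, probs)    # element-wise BCE, shape (2, 2)
loss.backward(np.ones_like(loss.data))      # seed with ones since no mean is taken

print(loss.data)
print(probs.grad)                           # (-t/p + (1-t)/(1-p)) / batch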

The pooling module. The backward pass for pooling is simple: for max pooling, a mask remembers where the maximum of each window was, and for average pooling the gradient is simply spread evenly over the window (illustrated in plain NumPy after the listing).

import numpy as np
from abc import ABC

from similartorch.autograd import Autograd, Context
from .img2col import Img2Col


class BasePool(Autograd, ABC):
    def __init__(self, kernel_size, stride=1):
        if isinstance(kernel_size, int):
            kernel_size = (kernel_size, kernel_size)
        if isinstance(stride, int):
            stride = (stride, stride)
        self.kernel_size = kernel_size
        self.stride = stride

    @staticmethod
    def _fill_col(to_fill, new_shape):
        repeats = new_shape[-2]
        ret = np.repeat(to_fill, repeats, -2)
        ret = np.reshape(ret, new_shape)
        return ret


class MaxPool2d(BasePool):
    def forward(self, ctx: Context, input):
        img_w = input.shape[-1]
        img_h = input.shape[-2]
        channels = input.shape[-3]
        new_w = (img_w - self.kernel_size[0]) // self.stride[0] + 1
        new_h = (img_h - self.kernel_size[1]) // self.stride[1] + 1
        img_out = Img2Col.img2col_forward(self.kernel_size, self.stride, False, input)
        maxed = np.max(img_out, -2)
        ctx.save_for_back(img_out, input.shape, maxed.shape)
        return np.reshape(maxed, (-1, channels, new_h, new_w))

    def backward(self, ctx: Context, grad: np.array = None):
        """Split into small patches, route the gradient to each patch maximum, then fold the shape back."""
        reshaped_image, back_shape, maxed_shape = ctx.data_for_back
        grad = np.reshape(grad, maxed_shape)
        mask = (reshaped_image == np.max(reshaped_image, -2, keepdims=True))
        new_grad = self._fill_col(grad, reshaped_image.shape)
        new_grad = np.where(mask, new_grad, 0)
        return Img2Col.img2col_backward(self.kernel_size, self.stride, back_shape, new_grad)


class AvgPool2d(BasePool):
    def forward(self, ctx: Context, input):
        img_w = input.shape[-1]
        img_h = input.shape[-2]
        channels = input.shape[-3]
        new_w = (img_w - self.kernel_size[0]) // self.stride[0] + 1
        new_h = (img_h - self.kernel_size[1]) // self.stride[1] + 1
        img_out = Img2Col.img2col_forward(self.kernel_size, self.stride, False, input)
        averaged = np.average(img_out, -2)
        ctx.save_for_back(img_out, input.shape, averaged.shape)
        return np.reshape(averaged, (-1, channels, new_h, new_w))

    def backward(self, ctx, grad):
        reshaped_image, back_shape, averaged_shape = ctx.data_for_back
        grad = np.reshape(grad, averaged_shape)
        new_grad = self._fill_col(grad, reshaped_image.shape) / (self.kernel_size[0] * self.kernel_size[1])
        return Img2Col.img2col_backward(self.kernel_size, self.stride, back_shape, new_grad)
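The mask trick used by MaxPool2d.backward can be shown in plain NumPy on a single flattened pooling window, independent of Img2Col (an illustrative sketch, not code from the repository):

import numpy as np

# One 2x2 pooling window flattened into a column, as img2col would lay it out.
window = np.array([[0.1], [0.9], [0.4], [0.2]])   # maximum sits at index 1
upstream = np.array([[5.0]])                      # gradient flowing into the pooled value

# Max pooling: route the gradient only to the arg-max position.
mask = (window == np.max(window, axis=0, keepdims=True))
max_grad = np.where(mask, np.repeat(upstream, window.shape[0], axis=0), 0)
print(max_grad.ravel())    # [0. 5. 0. 0.]

# Average pooling: spread the gradient uniformly over the window.
avg_grad = np.repeat(upstream, window.shape[0], axis=0) / window.shape[0]
print(avg_grad.ravel())    # [1.25 1.25 1.25 1.25]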

The conv layer and the base class Module it inherits from. Module has no backward method, and neither do the Module subclasses linear, sequential and conv. These are higher-level operators: their backward pass can usually be assembled from the lower-level operators they are built from, so no explicit backward is written for them.

import math
import numpy as np
import similartorch
from similartorch import Tensor
from .img2col import Img2Col
from .module import Module
from . import init


class Conv2d(Module):
    def __init__(self, in_channels, out_channels, kernel_size, stride, padding=0, add_bias=True):
        super(Conv2d, self).__init__()
        if isinstance(kernel_size, int):
            kernel_size = (kernel_size, kernel_size)
        if isinstance(stride, int):
            stride = (stride, stride)
        if isinstance(padding, int):
            padding = (padding, padding)
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size
        self.padding = padding
        self.stride = stride
        self.add_bias = add_bias

        self.weight = similartorch.rands([0, 0.05, (self.out_channels, self.in_channels,
                                                    self.kernel_size[0], self.kernel_size[1])],
                                         requires_grad=True)
        if add_bias:
            self.bias = similartorch.zeros(out_channels, np.float32, requires_grad=True)
            self.register_parameter(("weight", self.weight), ("bias", self.bias))
        else:
            self.register_parameter(("weight", self.weight))

        self.img2col = Img2Col(self.kernel_size, self.stride)

    def reset_parameters(self):
        init.kaiming_uniform_(self.weight, a=math.sqrt(5))
        if self.bias is not None:
            fan_in, _ = init._calculate_fan_in_and_fan_out(self.weight)
            bound = 1 / math.sqrt(fan_in)
            init.uniform_(self.bias, -bound, bound)

    def forward(self, input: Tensor) -> Tensor:
        img2col = self.img2col(input)
        output = self.weight.reshape(self.weight.shape[0], -1) @ img2col

        img_w = input.shape[-1]
        img_h = input.shape[-2]
        new_w = (img_w - self.kernel_size[0]) // self.stride[0] + 1
        new_h = (img_h - self.kernel_size[1]) // self.stride[1] + 1

        batch_input = len(input.shape) == 4
        if batch_input:
            output_shape = (input.shape[0], self.out_channels, new_h, new_w)
        else:
            output_shape = (self.out_channels, new_h, new_w)

        if self.add_bias:
            output = (output.swapaxes(-1, -2) + self.bias).swapaxes(-1, -2)
        return output.reshape(*output_shape)

The Module base class that Conv2d and the other layers inherit from:

import numpy as np
from abc import ABC, abstractmethod
from collections import OrderedDict

from similartorch.tensor import Tensor


class Module(ABC):
    def __init__(self):
        self._parameters = OrderedDict([])

    def register_parameter(self, *var_iterable):
        for var_name, var in var_iterable:
            self._parameters.update({var_name: var})

    def parameters(self) -> list:
        return list(self._parameters.values())

    def get_state_dict(self) -> OrderedDict:
        return self._parameters

    def load_state_dict(self, state_dict: OrderedDict):
        for k, val in state_dict.items():
            self._parameters[k].data = np.array(val)

    @abstractmethod
    def forward(self, *input) -> Tensor:
        raise NotImplementedError

    def __call__(self, *input) -> Tensor:
        return self.forward(*input)
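This is exactly why the Module subclasses need no backward of their own: as long as forward is written in terms of Tensor operations, the operator-level backward methods do all the work. A hypothetical minimal module as a sketch (the Scale class and the import paths are assumptions, not code from the repository):

import numpy as np
from similartorch.tensor import Tensor    # paths assumed
from similartorch.nn import Module


class Scale(Module):
    """Toy module with one learnable per-feature scale; no backward is needed,
    because forward is expressed entirely through Tensor operators."""

    def __init__(self, features):
        super().__init__()
        self.weight = Tensor(np.ones(features, dtype=np.float32), requires_grad=True)
        self.register_parameter(("weight", self.weight))

    def forward(self, x: Tensor) -> Tensor:
        return x * self.weight        # Multiply supplies the backward


model = Scale(4)
out = model(Tensor(np.random.randn(2, 4)))
out.backward(np.ones((2, 4)))
print(model.parameters()[0].grad)     # gradient summed over the batch axis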

The autograd automatic-differentiation module:

from abc import ABC, abstractmethod

from similartorch.tensor import Tensor


class Context:
    def __init__(self):
        self.data_for_back = None

    def save_for_back(self, *data):
        self.data_for_back = tuple(data)


class Autograd(ABC):
    def apply(self, *tensor_list):
        ctx = Context()
        forward_tensor = self.forward(ctx, *map(lambda v: v.data, tensor_list))

        output_tensor = Tensor(forward_tensor, requires_grad=False)
        output_tensor.backward_function = lambda x: self.backward(ctx, x)
        output_tensor.backward_tensor = list(tensor_list)
        return output_tensor

    @abstractmethod
    def forward(self, ctx, *tensor_list):
        raise NotImplementedError

    @abstractmethod
    def backward(self, ctx, grad):
        raise NotImplementedError

    def __call__(self, *tensor_list):
        return self.apply(*tensor_list)
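Adding a new operator therefore means subclassing Autograd, stashing whatever the backward pass needs into the Context, and returning one gradient per input. A hypothetical Square op as an example (the op itself and the import paths are assumptions):

import numpy as np
from similartorch.autograd import Autograd
from similartorch.tensor import Tensor    # path assumed


class Square(Autograd):
    """Hypothetical example op: y = x ** 2, with ctx carrying x to the backward pass."""

    def forward(self, ctx, x):
        ctx.save_for_back(x)
        return x ** 2

    def backward(self, ctx, grad):
        x, = ctx.data_for_back
        return 2 * x * grad           # dy/dx = 2x, chained with the upstream grad


t = Tensor(np.array([1.0, -2.0, 3.0]), requires_grad=True)
out = Square()(t)                     # __call__ -> apply: forward + graph bookkeeping
out.backward(np.ones(3))
print(t.grad)                         # [ 2. -4.  6.]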

The optimizer module:

from abc import ABC, abstractmethod


class Optimizer(ABC):
    def __init__(self, param_list: list):
        self.param_list = param_list
        self.state = {}

    def zero_grad(self):
        for param in self.param_list:
            param.grad.fill(0)

    @abstractmethod
    def step(self):
        raise NotImplementedError

The Adam optimizer:

import numpy as np
from .optimizer import Optimizer


class Adam(Optimizer):
    def __init__(self, param_list: list, learning_rate=0.01, beta1=0.9, beta2=0.999, epsilon=1e-8):
        super(Adam, self).__init__(param_list)
        self.lr = learning_rate
        self.beta1 = beta1
        self.beta2 = beta2
        self.eps = epsilon

    @staticmethod
    def initialize_state(state, param):
        state["step"] = 0
        state["m"] = np.zeros(param.grad.shape)
        state["v"] = np.zeros(param.grad.shape)

    def step(self):
        for param in self.param_list:
            if param.grad is None:
                continue
            if param not in self.state:
                self.state[param] = {}
            state = self.state[param]
            if len(state) == 0:
                self.initialize_state(state, param)

            state["step"] += 1
            state["m"] = self.beta1 * state["m"] + (1 - self.beta1) * param.grad
            # the second moment uses the squared gradient
            state["v"] = self.beta2 * state["v"] + (1 - self.beta2) * param.grad ** 2
            m_hat = state["m"] / (1 - self.beta1 ** state["step"])
            v_hat = state["v"] / (1 - self.beta2 ** state["step"])
            param.data -= self.lr * m_hat / (np.sqrt(v_hat) + self.eps)

And SGD with momentum and learning-rate decay:

import numpy as np
from .optimizer import Optimizer


class SGD(Optimizer):
    def __init__(self, param_list: list, learning_rate=0.01, momentum=0., decay=0.):
        super(SGD, self).__init__(param_list)
        self.lr = learning_rate
        self.decay = decay
        self.momentum = momentum

    @staticmethod
    def initialize_state(state, param):
        state["v"] = np.zeros_like(param.grad)

    def step(self):
        for param in self.param_list:
            if param.grad is None:
                continue
            if param not in self.state:
                self.state[param] = {}
            state = self.state[param]
            if len(state) == 0:
                self.initialize_state(state, param)

            state["v"] = self.momentum * state["v"] - self.lr * param.grad
            param.data += state["v"]

        self.lr = self.lr / (1 + self.decay)
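A tiny example of the optimizer interface, minimizing w² with Adam (the similartorch.optim import path is an assumption; zero_grad, step and the constructor signature come from the listing above):

import numpy as np
from similartorch.tensor import Tensor   # path assumed
from similartorch.optim import Adam      # path assumed

w = Tensor(np.array([5.0]), requires_grad=True)
opt = Adam([w], learning_rate=0.1)

for _ in range(200):
    opt.zero_grad()
    loss = w * w          # gradient 2w is produced by Multiply.backward
    loss.backward()
    opt.step()

print(w.data)             # w has moved from 5.0 to (approximately) 0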

The utils module contains the data-loading code: the dataset and dataloader modules. The dataloader implements the __iter__ and __next__ iterator protocol, and there are concrete dataset classes for specific data such as MNIST.
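The actual Dataset/DataLoader classes live in similartorch.utils and may differ in detail, but the iterator pattern they implement looks roughly like this sketch (class and parameter names here are illustrative, not from the repository):

import numpy as np


class ArrayDataset:
    """Illustrative dataset wrapper over two aligned arrays."""

    def __init__(self, x, y):
        self.x, self.y = x, y

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        return self.x[idx], self.y[idx]


class SimpleDataLoader:
    """Illustrative loader: shuffles indices and yields stacked mini-batches."""

    def __init__(self, dataset, batch_size=32, shuffle=True):
        self.dataset = dataset
        self.batch_size = batch_size
        self.shuffle = shuffle

    def __iter__(self):
        n = len(self.dataset)
        self.order = np.random.permutation(n) if self.shuffle else np.arange(n)
        self.cursor = 0
        return self

    def __next__(self):
        if self.cursor >= len(self.dataset):
            raise StopIteration
        idx = self.order[self.cursor:self.cursor + self.batch_size]
        self.cursor += self.batch_size
        batch = [self.dataset[i] for i in idx]
        xs, ys = zip(*batch)
        return np.stack(xs), np.stack(ys)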

Taking a step back, this is a small PyTorch-like deep learning framework implemented in pure NumPy. The autograd module defines the core forward/backward interface as an abstract base class; the backward pass runs through each tensor's backward_function, which ultimately calls the backward method of the operator that produced it, and the operators themselves mostly inherit from Autograd and all implement forward and backward. On top of that, the modules in nn are the functions and classes needed to build models, optim provides the optimizers, and utils provides the data-loading utilities.

