Source code for tinychain.ml.nn

""":class:`NeuralNet` and :class:`Layer` :class:`Model` definitions with common implementations"""

import inspect
import logging

from ..collection.tensor import einsum, Dense, Tensor
from ..context import Context
from ..decorators import differentiable, post, reflect
from ..error import NotImplemented
from ..math.operator import derivative_of, gradients, Dual, Gradients
from ..generic import Map, Tuple
from ..reflect import method
from ..reflect.functions import parse_args
from ..scalar.number import Float
from ..scalar.ref import deref, form_of, get_ref, is_ref, After
from ..service import model_uri, Dynamic, Model
from ..uri import URI

from .constants import NS, VERSION
from .interface import Differentiable, Gradient
from .variable import namespace, Variable


NAME = "nn"


# TODO: move into the reflect.method module and rename
class ReflectedMethod(method.Post):
    """
    A POST method whose op graph has already been constructed by the caller.

    Args:
        `header`: the instance header that this method belongs to;
        `name`: the name of this method;
        `graph`: the pre-built form of this method, returned as-is by `__form__`;
        `sig`: the :class:`inspect.Signature` of the method, whose first parameter must be `self`;
        `rtype`: the type used to wrap the result of calling this method.

    Raises:
        `TypeError`: if `sig` has no parameters or its first parameter is not named `self`.
    """

    def __init__(self, header, name, graph, sig, rtype):
        param_names = tuple(sig.parameters)

        # an empty signature is not a method signature either--report it the same way instead of an IndexError
        if not param_names or param_names[0] != "self":
            raise TypeError(f"not a method signature: {tuple(sig.parameters.items())}")

        self.name = name
        self.header = header
        self.graph = graph
        self.sig = sig
        self.rtype = rtype

    def __call__(self, *args, **kwargs):
        """Return a POST op ref to this method with the given arguments, wrapped in `rtype`."""

        from ..scalar.ref import Post

        # an inspect.Signature cannot be sliced, so drop `self` from its (name, Parameter) items instead
        # NOTE(review): assumes parse_args accepts a sequence of (name, Parameter) pairs -- confirm
        sig = list(self.sig.parameters.items())[1:]
        params = parse_args(sig, *args, **kwargs)

        return self.rtype(form=Post(self.subject(), params))

    def __form__(self):
        return self.graph
class Layer(Model, Differentiable):
    """A single differentiable :class:`Layer` of a :class:`NeuralNet`"""

    __uri__ = model_uri(NS, NAME, VERSION, "Layer")

    @reflect
    def gradient(self, inputs: Tensor, loss: Tensor) -> Map[Gradient]:
        """
        Build the `gradient` method of this :class:`Layer` from the form of its `eval` method.

        The gradients of the last item in the `eval` form are computed with respect to `loss`.
        Gradients which belong to a variable in this layer's namespace are keyed by the name of that variable;
        the single remaining gradient is stored under the key `"inputs"`.
        """

        # an abstract Layer has no eval implementation to reflect over
        if self.eval is Layer.eval:
            return NotImplemented("Layer.gradient")

        sig = inspect.signature(Linear.gradient.form)

        form = deref(self.eval) if is_ref(self.eval) else form_of(self.eval)

        names_by_var = {var: name for name, var in namespace(self).items()}

        named_grads = {}
        unnamed_grads = []
        for var, grad in gradients(form[-1], loss).items():
            if var in names_by_var:
                named_grads[names_by_var[var]] = grad
            else:
                unnamed_grads.append(grad)

        # exactly one gradient may be left over: the loss to pass back to the inputs of this layer
        [loss] = unnamed_grads
        named_grads["inputs"] = loss

        cxt = Context(form)
        cxt.gradient = named_grads

        return ReflectedMethod(self, "gradient", cxt, sig, Map[Gradient])
class ConvLayer(Layer, Dynamic):
    """A convolutional :class:`Layer` which evaluates its filters as a single matrix product over image patches"""

    @classmethod
    def create(cls, inputs_shape, filter_shape, stride=1, padding=1, activation=None, optimal_std=None):
        """
        Create a new :class:`ConvLayer` with randomly-initialized weights and bias.

        Args:
            `inputs_shape`: size of inputs `[c_i, h_i, w_i]` where
                `c_i`: number of channels,
                `h_i`: channel height,
                `w_i`: channel width;
            `filter_shape`: size of filter `[out_c, h_f, w_f]` where
                `out_c`: number of output channels,
                `h_f`: kernel height,
                `w_f`: kernel width;
            `stride`: the stride of the convolution;
            `padding`: the amount of zero-padding on each side of a channel (must be positive);
            `activation`: activation function;
            `optimal_std`: the standard deviation of the initial weights and bias, given either as a number
                or as a callable of `(input_size, output_size)`.
                Defaults to `(input_size * output_size)**0.5`.
        """

        c_i, h_i, w_i = inputs_shape
        out_c, h_f, w_f = filter_shape

        input_size = c_i * h_i * w_i
        output_size = out_c * h_f * w_f

        if callable(optimal_std):
            std = optimal_std(input_size, output_size)
        elif optimal_std:
            std = optimal_std
        else:
            std = (input_size * output_size)**0.5

        weights = Variable.random_normal([out_c, c_i, h_f, w_f], mean=0.0, std=std)
        bias = Variable.random_normal([out_c, 1], mean=0.0, std=std)

        return cls(weights, bias, inputs_shape, filter_shape, stride, padding, activation)

    def __init__(self, weights, bias, inputs_shape, filter_shape, stride, padding, activation):
        """
        Construct a :class:`ConvLayer` from existing weights and bias.

        A warning is logged if `weights` or `bias` is not a :class:`Variable`.

        Raises:
            `ValueError`: if `padding` is zero, empty, or negative.
        """

        if not padding or padding < 0:
            raise ValueError(f"invalid padding for ConvLayer: {padding}")

        if not isinstance(weights, Variable):
            logging.warning(f"ConvLayer with weights {weights} will not be trainable")

        if not isinstance(bias, Variable):
            logging.warning(f"ConvLayer with bias {bias} will not be trainable")

        # compile-time constants
        self._inputs_shape = inputs_shape
        self._filter_shape = filter_shape
        self._stride = stride
        self._padding = padding
        self._activation = activation  # TODO: require a differentiable Function, not a callable Python literal

        # run-time state
        self.weights = weights
        self.bias = bias

        Dynamic.__init__(self)

    @differentiable
    def eval(self, cxt, inputs: Tensor) -> Tensor:
        """
        Apply this layer to a batch of `inputs` of shape `[batch_size, c_i, h_i, w_i]`.

        Returns a tensor of shape `[batch_size, out_c, h_out, w_out]`, passed through the activation function if set.
        """

        batch_size = inputs.shape[0]

        # unpack rather than concatenate so that the shape may be given as a tuple as well as a list
        inputs = Tensor[Float].with_shape([batch_size, *self._inputs_shape])(form=inputs)

        padding = self._padding
        stride = self._stride

        c_i, h_i, w_i = self._inputs_shape
        out_c, h_f, w_f = self._filter_shape

        # NOTE(review): this divides by (stride + 1) and the patch loop below always advances by one element,
        # where the usual output size is ((size - filter) + (2 * padding)) / stride + 1 -- confirm the intent
        h_out = int(((h_i - h_f) + (2 * padding)) / (stride + 1))
        w_out = int(((w_i - w_f) + (2 * padding)) / (stride + 1))

        assert h_out
        assert w_out

        # write the inputs into the center of a zero-filled tensor
        pad_matrix = Dense.zeros([batch_size, c_i, h_i + padding * 2, w_i + padding * 2])
        pad_matrix = Tensor(After(
            pad_matrix[:, :, padding:(padding + h_i), padding:(padding + w_i)].write(inputs),
            pad_matrix))

        # flatten each filter-sized patch of the padded inputs
        shape = [c_i * h_f * w_f, batch_size]
        im2col_matrix = []
        for i in range(h_out):
            for j in range(w_out):
                im2col = pad_matrix[:, :, i:i + h_f, j:j + w_f].reshape(shape)
                im2col_matrix.append(im2col)

        assert im2col_matrix

        im2col_matrix = Dense.concatenate(im2col_matrix, 0)

        cxt.im2col_matrix = im2col_matrix.reshape([batch_size * h_out * w_out, c_i * h_f * w_f])
        cxt.im2col_matrix_T = cxt.im2col_matrix.transpose()
        cxt.w_col = self.weights.reshape([out_c, c_i * h_f * w_f])

        class Convolution(Dual):
            def __repr__(self):
                return f"Convolution({self.subject}, {self.args})"

            def forward(self):
                return cxt.w_col @ cxt.im2col_matrix_T

            def backward(self, variable=None):
                w_col = derivative_of(cxt.w_col, variable, keepdims=True)
                im2col_matrix = derivative_of(cxt.im2col_matrix_T, variable, keepdims=True)
                return (w_col @ cxt.im2col_matrix_T) + (cxt.w_col @ im2col_matrix)

            def gradients(self, loss):
                grads = Gradients()

                grads[self.subject] = (loss @ cxt.im2col_matrix).reshape(self.subject.shape)

                loss = (cxt.w_col.transpose() @ loss)

                # TODO: there should not be any loss of precision in this step
                loss = loss.reshape([batch_size, c_i, None]).sum(-1).expand_dims().expand_dims()
                grads[self.args] = Dense.zeros([batch_size, c_i, h_i, w_i])
                grad_slice = grads[self.args][:, :, padding:(h_i - padding), padding:(w_i - padding)]
                grads[self.args] = Tensor(After(grad_slice.write(loss), grads[self.args]))

                return grads

        shape = [out_c, h_out, w_out, batch_size]
        cxt.activation = Tensor(Convolution(self.weights, inputs)) + self.bias
        cxt.output = cxt.activation.reshape(shape).transpose([3, 0, 1, 2])  # shape = [batch_size, out_c, h_out, w_out]

        if self._activation:
            return self._activation(cxt.output)
        else:
            return cxt.output
class Linear(Layer, Dynamic):
    """A :class:`Layer` which multiplies its inputs by a weight matrix and adds a bias"""

    @classmethod
    def create(cls, input_size, output_size, activation=None, optimal_std=None):
        """
        Create a new :class:`Linear` layer with randomly-initialized weights and bias.

        Args:
            `input_size`: the first dimension of the weights;
            `output_size`: the second dimension of the weights, and the size of the bias;
            `activation`: activation function;
            `optimal_std`: the standard deviation of the initial weights and bias, given either as a number
                or as a callable of `(input_size, output_size)`.
                Defaults to `(input_size * output_size)**0.5`.
        """

        if callable(optimal_std):
            std = optimal_std(input_size, output_size)
        else:
            # use the given number if there is one, otherwise fall back to the default
            std = optimal_std or (input_size * output_size)**0.5

        return cls(
            Variable.random_normal([input_size, output_size], std=std),
            Variable.random_normal([output_size], std=std),
            activation)

    def __init__(self, weights, bias, activation=None):
        # compile-time constants
        # TODO: require a differentiable Function, not a callable Python literal
        self._activation = activation

        # run-time state
        self.weights = weights
        self.bias = bias

        Dynamic.__init__(self)

    @differentiable
    def eval(self, cxt, inputs: Tensor) -> Tensor:
        """Return `(inputs @ weights) + bias`, passed through the activation function if there is one."""

        batch_size = inputs.shape[0]
        inputs_shape = [batch_size, self.weights.shape[0]]
        inputs = Tensor[Float].with_shape(inputs_shape)(form=inputs)

        cxt.activation = inputs @ self.weights
        cxt.with_bias = cxt.activation + self.bias

        if self._activation:
            return self._activation(cxt.with_bias)

        return cxt.with_bias
class NeuralNet(Model, Differentiable):
    """The base class of a differentiable neural network :class:`Model`"""

    __uri__ = model_uri(NS, NAME, VERSION, "NeuralNet")
class Sequential(NeuralNet, Dynamic):
    """A :class:`NeuralNet` which evaluates a sequence of :class:`Layer` s in order"""

    def __init__(self, layers):
        if not layers:
            raise ValueError("Sequential requires at least one layer")

        self.layers = layers

        Dynamic.__init__(self)

    @differentiable
    def eval(self, inputs: Tensor) -> Tensor:
        """Feed `inputs` through each layer in turn and return the output of the last layer."""

        output = self.layers[0].eval(inputs)

        for layer_index in range(1, len(self.layers)):
            output = self.layers[layer_index].eval(output)

        return output

    @post
    def gradient(self, cxt, inputs: Tensor, loss: Tensor) -> Map[Gradient]:
        """
        Compute the gradients of every layer, starting from the last layer and working backward.

        Each gradient is keyed by `layers.<index>.<name>`.
        The loss handed back by the first layer is stored under the key `"inputs"`.
        """

        # the input to each layer is the output of the layer before it
        forward_inputs = [inputs]
        for layer in self.layers[:-1]:
            previous = forward_inputs[-1]
            forward_inputs.append(layer.eval(previous))

        cxt.layer_inputs = forward_inputs

        indexed_layers = list(enumerate(zip(cxt.layer_inputs, self.layers)))

        backward_grads = []
        for i, (layer_input, layer) in reversed(indexed_layers):
            # TODO: this call to get_ref should not be necessary
            layer = get_ref(layer, URI(self, "layers", i))

            layer_grad = layer.gradient(layer_input, loss)
            backward_grads.append(layer_grad)

            # TODO: should this handle other layer eval signatures automatically?
            loss = layer_grad["inputs"]

        # the gradients were collected last-layer-first, so put them back in layer order
        cxt.layer_grads = backward_grads[::-1]

        grads = {}
        for i, layer_grad in enumerate(cxt.layer_grads):
            for name in namespace(self.layers[i]):
                grads[f"layers.{i}.{name}"] = layer_grad[name]

        grads["inputs"] = loss

        return grads
class DNN(Sequential):
    """A :class:`Sequential` neural net made up of :class:`Linear` layers"""

    @classmethod
    def create(cls, schema):
        """
        Create a new :class:`Sequential` neural net of :class:`Linear` layers.

        `schema` should be a list of 2- or 3-tuples of the form `(input_size, output_size, activation)`
        (the arguments to `Linear.create`).
        """

        linear_layers = []
        for layer_schema in schema:
            # each entry in the schema is unpacked into the positional arguments of Linear.create
            linear_layers.append(Linear.create(*layer_schema))

        return cls(Tuple(linear_layers))