Source code for tinychain.ml.optimizer

from .. import error
from ..collection.tensor import Dense, Tensor
from ..decorators import post
from ..math.operator import constant, derivative_of, is_constant
from ..ml.variable import namespace
from ..scalar.number import Float, F32, F64, UInt
from ..scalar.ref import form_of, After, If
from ..service import model_uri, Dynamic, Model

from .constants import NS, VERSION


NAME = "optimizer"


[docs]class Optimizer(Model): """An optimizer for a :class:`Differentiable` :class:`Model`""" __uri__ = model_uri(NS, NAME, VERSION, "Optimizer") @post def train(self, i, inputs): return error.NotImplemented(f"{self.__class__.__name__}.train")
[docs]class GradientDescent(Optimizer, Dynamic): """A simple gradient descent optimizer with a configurable learning rate.""" def __init__(self, ml_model, cost, learning_rate=0.001): # compile-time constants self._cost = cost self._lr = learning_rate # run-time state self.ml_model = ml_model Dynamic.__init__(self) @post def train(self, cxt, i: UInt, inputs: Tensor) -> Tensor: outputs = self.ml_model.eval(inputs) d_loss = derivative_of(self._cost(inputs, outputs)) cxt.d_loss = constant(d_loss.copy() if isinstance(d_loss, Tensor) else d_loss) assert is_constant(cxt.d_loss) cxt.grads = self.ml_model.gradient(inputs, cxt.d_loss) writes = [] for name, var in namespace(self.ml_model).items(): grad = cxt.grads[name] # TODO: replace `shape.len()` with `ndim` delta = Float(If(grad.shape.len() > 0, Tensor(grad).sum(), grad)) writes.append(var.update(self._lr * delta)) return writes
[docs]class Adam(Optimizer, Dynamic): """ Adam optimizer, an adaptive learning rate optimization algorithm designed to handle sparse gradients and noisy data. Based on "Adam: A Method for Stochastic Optimization" by Kingma & Ba, 2014: https://arxiv.org/abs/1412.6980 """ def __init__(self, ml_model, cost, beta1=0.9, beta2=0.999, learning_rate=0.001, eps=1e-8): # compile-time constants self._cost = cost # run-time state self.ml_model = ml_model self.beta1 = F32(beta1) self.beta2 = F32(beta2) self.lr = F32(learning_rate) self.eps = F64(eps) self.m = {} self.v = {} for name, var in namespace(ml_model).items(): shape = form_of(var.shape) if not isinstance(shape, (list, tuple)): raise ValueError(f"the shape of Variable {name} must be defined at compile time (found {shape})") self.m[name] = Dense.constant(shape, 0) self.v[name] = Dense.constant(shape, 0) Dynamic.__init__(self) @post def train(self, cxt, i: UInt, inputs: Tensor) -> Tensor: assert set(self.m) == set(self.v) trainable = namespace(self.ml_model) outputs = self.ml_model.eval(inputs) d_loss = derivative_of(self._cost(inputs, outputs)) cxt.d_loss = constant(d_loss.copy() if isinstance(d_loss, Tensor) else d_loss) assert is_constant(cxt.d_loss) grads = self.ml_model.gradient(inputs, cxt.d_loss) cxt.grads = { name: Float(If(grads[name].shape.len() > 0, Tensor(grads[name]).sum(), grads[name])) for name in self.m } cxt.update_m = {name: self.m[name] * self.beta1 * cxt.grads[name] * (1. - self.beta1) for name in self.m} cxt.update_v = {name: self.v[name] * self.beta2 + cxt.grads[name]**2 * (1. - self.beta2) for name in self.v} cxt.a = self.lr * (1. - self.beta2**i)**0.5 / (1 - self.beta1**i) cxt.update_model = {name: self.m[name] / (self.v[name]**0.5 + self.eps) * cxt.a for name in self.m} updates = After([ [self.m[name].write(cxt.update_m[name]) for name in self.m], [self.v[name].write(cxt.update_v[name]) for name in self.v], ], [trainable[name].update(cxt.update_model[name]) for name in self.m]) return updates