Source code for eagpytorch._base_ExactGP

#!/usr/bin/env python3

__all__ = [
    "ExactGPModel",
    "RunExactGP"
]

from random import seed

from gpytorch.distributions import MultivariateNormal
from gpytorch.means import ConstantMean
from gpytorch.mlls import ExactMarginalLogLikelihood
from gpytorch.models import ExactGP
from numpy.random import seed as nseed
from numpy import (
    mean,
    ndarray
)
from torch import (
    manual_seed,
    no_grad
)
from torch.optim import (
    Adadelta,
    Adagrad,
    Adam,
    RMSprop,
    SGD
)

from ._const import (
    # likelihoods
    gaussianlikelihood,
    # mlls
    exactmarginalloglikelihood,
    # optimizers
    adadelta,
    adagrad,
    adam,
    rmsprop,
    sgd
)
from ._likelihoods import GaussianLikelihood
from .utils import (
    array_to_tensor,
    check_device,
    load_model,
    plot_kernel,
    save_model,
    set_kernel,
    _predict_obj,
    _sample_f
)


[docs]class ExactGPModel(ExactGP): """ExactGP用のモデル定義クラス Parameters ---------- train_x : torch.tensor 学習用データセットの説明変数 train_y : torch.tensor 学習用データセットの目的変数 likelihood : :obj:`gpytorch.likelihoods` likelihoodのインスタンス ex_var_dim : int ARDカーネルのパラメータの個数。説明変数の数に対応する。 `ex_var_dim=None` を指定するとRBFカーネルになる。 kernel : str or :obj:`gpytorch.kernels` 使用するカーネル関数を指定する 基本はstrで指定されることを想定しているものの、自作のカーネル関数を入力することも可能 **ker_conf : dict カーネル関数に渡す設定 """ def __init__(self, train_x, train_y, likelihood, ex_var_dim, kernel, **ker_conf): super(ExactGPModel, self).__init__(train_x, train_y, likelihood) self.mean_module = ConstantMean() _ker_conf = {'ard_num_dims': ex_var_dim} _ker_conf.update(ker_conf) self.covar_module = set_kernel(kernel, **_ker_conf)
[docs] def forward(self, x): """ExactGPModelのforwardメソッド """ mean_x = self.mean_module(x) covar_x = self.covar_module(x) return MultivariateNormal(mean_x, covar_x)
[docs]class RunExactGP(object): """ExactGPModelの実行クラス Parameters ---------- kernel : str or :obj:`gpytorch.kernels`, default :obj:`'RBFKernel'` 使用するカーネル関数を指定する。下記から選択する。 - :obj:`'CosineKernel'` - :obj:`'LinearKernel'` - :obj:`'MaternKernel'` - :obj:`'PeriodicKernel'` - :obj:`'RBFKernel'` - :obj:`'RQKernel'` - :obj:`'SpectralMixtureKernel'` 基本はstrで指定されることを想定しているものの、 :obj:`gpytorch.kernels` を用いた自作のカーネル関数を入力することも可能 likelihood : str, default :obj:`'GaussianLikelihood'` likelihoodとして使用するクラス名が指定される。 - :obj:`'GaussianLikelihood'`, :obj:`'GL'` : likelihoodにガウス分布を仮定したガウス過程を行う場合 optimizer : str, default :obj:`'Adam'` optimizerとして使用するクラス名が指定される。下記から選択する。 - :obj:`'Adam'` - :obj:`'sgd'` - :obj:`'RMSprop'` - :obj:`'Adadelta'` - :obj:`'Adagrad'` mll : str, default :obj:`'ExactMarginalLogLikelihood'` 確率分布の周辺化の方法のクラス名が指定される。 - :obj:`'ExactMarginalLogLikelihood'` ard_option : bool, default True ARDカーネルを利用するかが指定される もし :obj:`kernel_coeff` を利用する場合 `ard_option=True` を選択する ker_conf : dict, default dict() カーネル関数に渡す設定 mll_conf : dict, default dict() mllに渡す設定 opt_conf : dict, default dict() optimizerに渡す設定 random_state : int, default None seedの固定 """ def __init__(self, *, kernel='RBFKernel', likelihood='GaussianLikelihood', optimizer='Adam', mll='ExactMarginalLogLikelihood', ard_option=True, ker_conf=dict(), opt_conf=dict(), mll_conf=dict(), random_state=None): if isinstance(random_state, int): seed(random_state) nseed(random_state) manual_seed(random_state) self.device = check_device() self._kernel = kernel self._likelihood = likelihood self._optimizer = optimizer self._mll = mll self.ard_option = ard_option self.epoch = 0 self.model = None # 空のmodelを作成しないとloadできない self.mll = None # 空のmodelを作成しないとloadできない self.optimizer = None # 空のmodelを作成しないとloadできない self._ker_conf = ker_conf self._opt_conf = opt_conf self._mll_conf = mll_conf self.loss = [] def _set_likelihood(self): """likelihoodとしてself._likelihoodの指示の元、インスタンスを立てるメソッド """ if self._likelihood in gaussianlikelihood: return GaussianLikelihood().to(self.device) else: raise ValueError(f'likelihood={self._likelihood}は用意されていません') def _set_mll(self, mll_conf): """mllとしてself._mllの指示の元、インスタンスを立てるメソッド """ # mllのインスタンスを立てる if self._mll in exactmarginalloglikelihood: return ExactMarginalLogLikelihood( self.likelihood, self.model ) else: raise ValueError(f'mll={self._mll}は用意されていません') def _set_optimizer(self, lr, opt_conf): """optimizerとしてself._optimizerの指示の元、インスタンスを立てるメソッド """ if self._optimizer in adam: return Adam([ {'params': self.model.parameters()} ], lr=lr, **opt_conf) elif self._optimizer in sgd: return SGD([ {'params': self.model.parameters()} ], lr=lr, **opt_conf) elif self._optimizer in rmsprop: return RMSprop([ {'params': self.model.parameters()} ], lr=lr, **opt_conf) elif self._optimizer in adadelta: return Adadelta([ {'params': self.model.parameters()} ], lr=lr, **opt_conf) elif self._optimizer in adagrad: return Adagrad([ {'params': self.model.parameters()} ], lr=lr, **opt_conf) else: raise ValueError(f'optimizer={self._optimizer}は用意されていません')
[docs] def set_model(self, train_x, train_y, *, lr=1e-3): """使用するモデルのインスタンスを立てるメソッド Parameters ---------- train_x : np.array or torch.tensor 学習用データセットの説明変数 train_y : np.array or torch.tensor 学習用データセットの目的変数 lr : float 学習率 """ if type(train_x) == ndarray: train_x = (array_to_tensor(train_x),) train_x = tuple(tri.unsqueeze(-1) if tri.ndimension() == 1 else tri for tri in train_x)[0] if type(train_y) == ndarray: train_y = array_to_tensor(train_y) ard_option = self.ard_option ex_var_dim = train_x.shape[1] kernel = self._kernel ker_conf = self._ker_conf # likelihoodのインスタンスを立てる self.likelihood = self._set_likelihood() # ここで上記モデルのインスタンスを立てる if ard_option: self.model = ExactGPModel( train_x, train_y, self.likelihood, ex_var_dim=ex_var_dim, kernel=kernel, **ker_conf ).to(self.device) else: self.model = ExactGPModel( train_x, train_y, self.likelihood, ex_var_dim=None, kernel=kernel, **ker_conf ).to(self.device) # mllのインスタンスを立てる self.mll = self._set_mll(self._mll_conf) # optimizerのインスタンスを立てる self.optimizer = self._set_optimizer(lr, opt_conf=self._opt_conf)
[docs] def fit(self, epochs, *, test_x=None, test_y=None, verbose=True): """学習用メソッド Parameters ---------- epochs : int エポック数 test_x : np.ndarray or torch.tensor, default None 説明変数のテストデータ test_y : np.ndarray or torch.tensor, default None 目的変数のテストデータ もし test_x, test_y を設定している場合エポックごとにテストデータに対するlossも表示されるように設定される verbose : bool, default True 表示形式 """ if type(test_x) == ndarray: test_x = (array_to_tensor(test_x),) test_x = tuple(tri.unsqueeze(-1) if tri.ndimension() == 1 else tri for tri in test_x)[0] if type(test_y) == ndarray: test_y = array_to_tensor(test_y) for epoch in range(epochs): train_loss = [] test_loss = [] self.model.train() self.likelihood.train() self.optimizer.zero_grad() output = self.model(self.model.train_inputs[0]) loss = - self.mll(output, self.model.train_targets) loss.backward() self.optimizer.step() self.loss.append(loss.item()) train_loss.append(loss.item()) self.model.eval() self.likelihood.eval() if test_x is not None and test_y is not None: with no_grad(): output = self.model(test_x) loss = - self.mll(output, test_y) test_loss.append(loss.item()) is_display_timing = epoch % (epochs // 10) == 0 if epochs >= 10 else True if is_display_timing or epoch == epochs - 1 and verbose: if test_loss: print(f'Epoch {epoch + 1}/{epochs}' + f' - Train Loss: {mean(train_loss):.5f} /' + f' Test Loss: {mean(test_loss):.5f}') else: print(f'Epoch {epoch + 1}/{epochs}' + f' - Train Loss: {mean(train_loss):.5f}') # TODO: 追加学習のために再学習の際、self.epochを利用する形にする self.epoch = epoch + 1
[docs] def predict(self, X, *, cl=0.6827, sample_num=None, sample_f_num=None): """予測用メソッド Parameters ---------- X : np.array or torch.tensor 入力説明変数 cl : float default 0.6827(1sigma) 信頼区間 sample_num : int default None yのサンプル数 sample_f_num : int default None fのサンプル数 Returns ------- output : object 予測された目的変数のオブジェクト。下記の属性が用意されている。 - output.mean : 予測された目的変数の平均値 - output.upper : 予測された目的変数の信頼区間の上限 - output.lower : 予測された目的変数の信頼区間の下限 - output.samples : 入力説明変数に対する予測値yのサンプル(sample_num個サンプルされる) - output.samples_f : 入力説明変数に対する予測関数fのサンプル(sample_f_num個サンプルされる) """ if type(X) == ndarray: X = array_to_tensor(X) if not 0.5 < cl < 1.0: raise ValueError(f'cl={cl}が正しい値ではありません') self.model.eval() self.likelihood.eval() with no_grad(): predicts = self.likelihood(self.model(X)) if self._likelihood in {'GaussianLikelihood', 'GL'}: predicts_f = self.model(X) else: predicts_f = None output = _predict_obj(predicts, cl, sample_num) output.samples_f = _sample_f(predicts_f, sample_f_num) return output
[docs] def save(self, file_path): """モデルのsaveメソッド Parameters ---------- file_path : str モデルの保存先のパスとファイル名 """ data = dict( epoch=self.epoch, model=self.model, likelihood=self.likelihood, mll=self.mll, optimizer=self.optimizer, loss=self.loss ) save_model(file_path, **data)
[docs] def load(self, file_path): """モデルのloadメソッド Parameters ---------- file_path : str モデルの保存先のパスとファイル名 """ data = dict( epoch=self.epoch, model=self.model, likelihood=self.likelihood, mll=self.mll, optimizer=self.optimizer, loss=self.loss ) (self.epoch, self.model, self.likelihood, self.mll, self.optimizer, self.loss) = load_model(file_path, **data)
[docs] def kernel_coeff(self): """kernelの係数を出力するメソッド Returns ------- output_dict : dict カーネル関数の係数 `ard_option=True` の場合、 $\Theta$ が各々の説明変数ごとに重みを変えて更新され、出力される Warning -------- RBFKernelの場合、各説明変数の重要度 $\eta$ は出力される `'base_kernel.raw_lengthscale'` の逆数の2乗に対応する """ # TODO: kernel関数をスイッチさせ、それに応じてわかりやすい形に変形する output_dict = self.model.covar_module.state_dict() return output_dict
[docs] def plot_kernel(self, *, kernel=None, plot_range=None, **kwargs): """カーネル関数のプロット Parameters ---------- kernel : str or :obj:`gpytorch.kernels`, default None 使用するカーネル関数を指定する plot_range : tuple, default None プロットする幅 **kwargs : dict カーネル関数に渡す設定 """ if kernel is None: if kwargs: temp_kernel = set_kernel(self._kernel, **self._ker_conf) else: temp_kernel = set_kernel(self._kernel, **kwargs) else: if kwargs: temp_kernel = set_kernel(kernel, **self._ker_conf) else: temp_kernel = set_kernel(kernel, **kwargs) plot_kernel(temp_kernel, plot_range, **kwargs)