Source code for eagpytorch.utils

#!/usr/bin/env python3

__all__ = [
    "array_to_tensor",
    "check_device",
    "load_config",
    "load_data",
    "plot_kernel",
    "set_kernel",
    "tensor_to_array"
]

from attrdict import AttrDict
from os import makedirs
from os.path import (
    basename,
    isdir,
    join,
    splitext
)
from urllib.request import urlretrieve
import csv
import yaml

from gpytorch.distributions import MultivariateNormal
from gpytorch.kernels import (
    CosineKernel,
    LinearKernel,
    MaternKernel,
    PeriodicKernel,
    RBFKernel,
    RQKernel,
    ScaleKernel,
    SpectralMixtureKernel
)
from gpytorch import kernels
from pyro.distributions import Poisson, Bernoulli
import matplotlib.pyplot as plt
import numpy as np
import torch


csv.field_size_limit(1000000000)


[docs]def load_config(config_path): """config(yaml)ファイルを読み込む Parameters ---------- config_path : string config fileのパスを指定する Returns ------- config : attrdict.AttrDict configを読み込んでattrdictにしたもの """ with open(config_path, 'r', encoding='utf-8') as fi_: return AttrDict(yaml.load(fi_, Loader=yaml.SafeLoader))
[docs]def load_data(file_path): """csv or tsvを読み込む関数 Parameters ---------- config_path : string csv or tsv fileのパスを指定する Returns ------- header : list ヘッダーのみを取り出したlist data : list 対象のファイルのdata部分を取り出したlist """ header = [] data = [] with open(file_path, 'r', newline="\n") as fi_: root, ext = splitext(file_path) if ext == '.tsv': reader = csv.reader(fi_, delimiter='\t') elif ext == '.csv': reader = csv.reader(fi_, delimiter=',') header = next(reader) for d in reader: data.append([float(i) for i in d]) return header, data
[docs]def check_device(): """pytorchが認識しているデバイスを返す関数 Returns ------- device : str cudaを使用する場合 `'cuda'` 、cpuで計算する場合は `'cpu'` """ DEVICE = ('cuda' if torch.cuda.is_available() else 'cpu') return DEVICE
[docs]def array_to_tensor(input_data, device=None): """np.array -> torch.tensor変換関数 Parameters ---------- input_data : np.array 変換したいnp.array形式のデータ device : str, default None cudaを使用する場合 `'cuda'` 、cpuで計算する場合は `'cpu'` 指定しない場合はtorchが認識している環境が選ばれるため、特に意思がなければデフォルトで良いはず。 Returns ------- output_data : torch.tensor input_dataをtensor型に変換したもの """ if device is None: device = check_device() if input_data.dtype == float: # np.arrayはdoubleを前提として動いているが、 # torchはdouble(float64)を前提としていない機能があるため、float32に変更する必要がある output_data = torch.tensor(input_data, dtype=torch.float32).contiguous().to(device) elif input_data.dtype == int: # 同様でtorchではlongが標準 output_data = torch.tensor(input_data, dtype=torch.long).contiguous().to(device) return output_data
[docs]def tensor_to_array(input_data): """torch.tensor -> np.array変換関数 Parameters ---------- input_data : torch.tensor 変換したいtorch.tensor形式のデータ Returns ------- output_data : np.array input_dataをarray型に変換したもの """ # numpyがメモリを共有するのを防ぐために以下の処理となる output_data = input_data.to('cpu').detach().numpy().copy() return output_data
def data_downloader(): """the UC Irvine Machine Learning Repositoryからのdata downloader Examples -------- プロジェクトのホームディレクトリから:: $ python -c "from gp.utils.utils import data_downloader;data_downloader()" """ index = { 1: {'url': 'https://archive.ics.uci.edu/ml/machine-learning-databases/00409/Daily_Demand_Forecasting_Orders.csv', 'name': 'Daily Demand Forecasting Orders Data Set', 'page': 'https://archive.ics.uci.edu/ml/datasets/Daily+Demand+Forecasting+Orders'}, 2: {'url': 'https://archive.ics.uci.edu/ml/machine-learning-databases/00247/data_akbilgic.xlsx', 'name': 'ISTANBUL STOCK EXCHANGE Data Set', 'page': 'https://archive.ics.uci.edu/ml/datasets/ISTANBUL+STOCK+EXCHANGE'}, 3: {'url': 'https://archive.ics.uci.edu/ml/machine-learning-databases/00502/online_retail_II.xlsx', 'name': 'Online Retail II Data Set', 'page': 'https://archive.ics.uci.edu/ml/datasets/Online+Retail+II'}, 4: {'url': 'https://archive.ics.uci.edu/ml/machine-learning-databases/00501/PRSA2017_Data_20130301-20170228.zip', 'name': 'Beijing Multi-Site Air-Quality Data Data Set', 'page': 'https://archive.ics.uci.edu/ml/datasets/Beijing+Multi-Site+Air-Quality+Data'}, 5: {'url': 'https://archive.ics.uci.edu/ml/machine-learning-databases/00381/PRSA_data_2010.1.1-2014.12.31.csv', 'name': 'Beijing PM2.5 Data Data Set', 'page': 'https://archive.ics.uci.edu/ml/datasets/Beijing+PM2.5+Data'}, 6: {'url': 'https://archive.ics.uci.edu/ml/machine-learning-databases/00514/Bias_correction_ucl.csv', 'name': 'Bias correction of numerical prediction model temperature forecast Data Set', 'page': 'https://archive.ics.uci.edu/ml/datasets/Bias+correction+of+numerical+prediction+model+temperature+forecast'} } for k, v in index.items(): print(f"number: {k}") print(f"name: {v['name']}") print(f"page: {v['page']}") print(f"url: {v['url']}") print() inp = int(input('欲しいデータの番号を入力して(1~6): ')) if inp in index.keys(): directory = './data' if not isdir(directory): makedirs(directory) urlretrieve(index[inp]['url'], join(directory, basename(index[inp]['url']))) def save_model(file_path, *, epoch, model, likelihood, mll, optimizer, loss): """モデルの保存関数 Parameters ---------- file_path : str モデルの保存先のパスとファイル名 epoch : int 現在のエポック数 model : :obj:`gpytorch.models` 学習済みのモデルのオブジェクト likelihood : :obj:`gpytorch.likelihoods` 学習済みのlikelihoodsのオブジェクト mll : :obj:`gpytorch.mlls` 学習済みのmllsのオブジェクト optimizer : :obj:`torch.optim` 学習済みのoptimのオブジェクト loss : list 現在のエポックまでの経過loss """ torch.save({'epoch': epoch, 'model': model.state_dict(), 'likelihood': likelihood.state_dict(), 'mll': mll.state_dict(), 'optimizer': optimizer.state_dict(), 'loss': loss}, file_path) def load_model(file_path, *, epoch, model, likelihood, mll, optimizer, loss): """モデルの保存関数 Parameters ---------- file_path : str モデルの保存先のパスとファイル名 epoch : int 現在のエポック数 model : :obj:`gpytorch.models` 学習済みのモデルのオブジェクト likelihood : :obj:`gpytorch.likelihoods` 学習済みのlikelihoodsのオブジェクト mll : :obj:`gpytorch.mlls` 学習済みのmllsのオブジェクト optimizer : :obj:`torch.optim` 学習済みのoptimのオブジェクト loss : list 現在のエポックまでの経過loss Returns ------- epoch : int 現在のエポック数 model : :obj:`gpytorch.models` 学習済みのモデルのオブジェクト likelihood : :obj:`gpytorch.likelihoods` 学習済みのlikelihoodsのオブジェクト mll : :obj:`gpytorch.mlls` 学習済みのmllsのオブジェクト optimizer : :obj:`torch.optim` 学習済みのoptimのオブジェクト loss : list 現在のエポックまでの経過loss """ temp = torch.load(file_path) epoch = temp['epoch'] model.load_state_dict(temp['model']) likelihood.load_state_dict(temp['likelihood']) mll.load_state_dict(temp['mll']) optimizer.load_state_dict(temp['optimizer']) loss = temp['loss'] return epoch, model, likelihood, mll, optimizer, loss
[docs]def set_kernel(kernel, **kwargs): """kernelsを指定する Parameters ---------- kernel : str or :obj:`gpytorch.kernels` 使用するカーネル関数を指定する 基本はstrで指定されることを想定しているものの、自作のカーネル関数を入力することも可能 **kwargs : dict カーネル関数に渡す設定 Returns ------- out : :obj:`gpytorch.kernels` カーネル関数のインスタンス """ if isinstance(kernel, str): if kernel in {'CosineKernel'}: return ScaleKernel( CosineKernel(**kwargs) ) elif kernel in {'LinearKernel'}: return ScaleKernel( LinearKernel(**kwargs) ) elif kernel in {'MaternKernel'}: return ScaleKernel( MaternKernel(**kwargs) ) elif kernel in {'PeriodicKernel'}: return ScaleKernel( PeriodicKernel(**kwargs) ) elif kernel in {'RBFKernel'}: return ScaleKernel( RBFKernel(**kwargs) ) elif kernel in {'RQKernel'}: return ScaleKernel( RQKernel(**kwargs) ) elif kernel in {'SpectralMixtureKernel'}: # SpectralMixtureKernelはScaleKernelを使えない return SpectralMixtureKernel(**kwargs) else: raise ValueError elif kernels.__name__ in str(type(kernel)): return kernel
[docs]def plot_kernel(kernel, plot_range=None, **kwargs): """カーネルの概形をプロットする関数 Parameters ---------- kernel : str or :obj:`gpytorch.kernels` 使用するカーネル関数を指定する plot_range : tuple, default None プロットする幅 **kwargs : dict カーネル関数に渡す設定 """ if plot_range is None: plot_range = torch.linspace(-1.5, 1.5) elif isinstance(plot_range, tuple): if len(plot_range) == 2: plot_range = torch.linspace(plot_range[0], plot_range[1]) else: ValueError else: ValueError kernel = set_kernel(kernel, **kwargs) plt.plot(kernel(plot_range).numpy()[50]) plt.xticks( [i for i in np.linspace(0, 100, 5)], [f'{i:.2f}' for i in np.linspace( plot_range.min().item(), plot_range.max().item(), 5 )] ) plt.xlabel('x') plt.ylabel( f'kernel ({((plot_range.max()+plot_range.min())/2).item():.2f}, x)' ) plt.show()
def _predict_obj(input, cl=0.6827, sample_num=None): """predictメソッドで利用するオブジェクトを返す関数 Parameters ---------- input : object likelihoodsの返り値 cl : float default 0.6827(1sigma) 信頼区間[%] sample_num : int default None サンプル数 Returns ------- output : object 予測された目的変数のオブジェクト - output.mean : 予測された目的変数の平均値 - output.upper : 予測された目的変数の信頼区間の上限 - output.lower : 予測された目的変数の信頼区間の下限 - output.samples : 入力説明変数に対する予測サンプル(sample_num個サンプルされる) - output.probs : BernoulliLikelihood を指定した際に、2値分類の予測確率 このとき mean,upper,lower は output に追加されない """ class OutPut: pass output = OutPut if isinstance(input, MultivariateNormal): std = input.stddev mean = input.mean output.mean = tensor_to_array(mean) dist = torch.distributions.Normal(mean, std) output.upper = tensor_to_array(dist.icdf(torch.tensor([(1.+cl)/2.]))) output.lower = tensor_to_array(dist.icdf(torch.tensor([(1.-cl)/2.]))) if sample_num: output.samples = tensor_to_array( input.sample(torch.Size([sample_num])) ) else: output.samples = None elif isinstance(input, Poisson): output.mean = tensor_to_array(input.mean) percentiles = [(1.-cl)/2., (1.+cl)/2.] output.lower, output.upper = _percentiles_from_samples( input.sample(torch.Size([1000])), percentiles ) output.lower = tensor_to_array(output.lower) output.upper = tensor_to_array(output.upper) if sample_num: output.samples = tensor_to_array( input.sample(torch.Size([sample_num])) ) else: output.samples = None elif isinstance(input, Bernoulli): output.probs = tensor_to_array(input.probs) if sample_num: output.samples = tensor_to_array( input.sample(torch.Size([sample_num])) ) else: output.samples = None else: percentiles = [(1.-cl)/2., 0.5, (1.+cl)/2.] output.lower, output.mean, output.upper = _percentiles_from_samples( input.sample(torch.Size([1000])), percentiles ) output.lower = tensor_to_array(output.lower) output.mean = tensor_to_array(output.mean) output.upper = tensor_to_array(output.upper) if sample_num: output.samples = tensor_to_array( input.sample(torch.Size([sample_num])) ) else: output.samples = None return output def _percentiles_from_samples(samples, percentiles=[0.05, 0.5, 0.95]): """サンプルされたデータセットからパーセンタイル点を求め、関数形をスムージングする関数 Parameters ---------- samples : object likelihoodsの返り値 percentiles : list default [0.05, 0.5, 0.95] 知りたいパーセンタイル点 Returns ------- percentiles_from_samples : tensor パーセンタイル点の値 """ num_samples = samples.size(0) samples = samples.sort(dim=0)[0] # Get samples corresponding to percentile percentile_samples = [ samples[int(num_samples * percentile)] for percentile in percentiles ] # Smooth the samples kernel = torch.full((1, 1, 5), fill_value=0.2) percentiles_samples = [ torch.nn.functional.conv1d( percentile_sample.view(1, 1, -1), kernel, padding=2 ).view(-1) for percentile_sample in percentile_samples ] return percentiles_samples def _sample_f(predicts_f, sample_f_num): """関数fのサンプル Parameters ---------- predicts_f : :obj:`MultivariateNormal` or None 事前分布fの予測モデル出力 sample_f_num : int or None サンプル数 Returns ------- out : numpy.array or None サンプルされた関数形 """ if predicts_f is None: return None else: if sample_f_num is not None: return tensor_to_array(predicts_f.sample(torch.Size([sample_f_num]))) else: return None