"""
This module provides implementations of RKHS Instrumental Variable (IV) estimators.
Classes:
_BaseRKHSIV: Base class for RKHS IV methods.
RKHSIV: RKHS IV estimator.
RKHSIVCV: RKHS IV estimator with cross-validation.
RKHSIVL2: RKHS IV estimator with L2 regularization.
RKHSIVL2CV: RKHS IV estimator with L2 regularization and cross-validation.
ApproxRKHSIV: Approximate RKHS IV estimator using kernel approximations.
ApproxRKHSIVCV: Approximate RKHS IV estimator with cross-validation using kernel approximations.
"""
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from sklearn.metrics.pairwise import pairwise_kernels
from sklearn.model_selection import KFold
from sklearn.kernel_approximation import Nystroem, RBFSampler
import numpy as np
import scipy
[docs]def _check_auto(param):
return (isinstance(param, str) and (param == 'auto'))
[docs]class _BaseRKHSIV:
"""
Base class for RKHS IV methods.
This class provides common functionality for RKHS IV estimators.
Parameters:
kernel (str or callable): Kernel function or string identifier.
gamma (float): Gamma parameter for the kernel.
degree (int): Degree for polynomial kernels.
coef0 (float): Zero coefficient for polynomial kernels.
delta_scale (str or float): Scale of the critical radius.
delta_exp (str or float): Exponent of the critical radius.
alpha_scale (str or float): Scale of the regularization parameter.
kernel_params (dict): Additional parameters for the kernel.
"""
def __init__(self, *args, **kwargs):
return
[docs] def _get_delta(self, n):
"""
Compute the critical radius.
Parameters:
n (int): Number of samples.
Returns:
float: Critical radius.
"""
delta_scale = 5 if _check_auto(self.delta_scale) else self.delta_scale
delta_exp = .4 if _check_auto(self.delta_exp) else self.delta_exp
return delta_scale / (n**(delta_exp))
[docs] def _get_alpha_scale(self):
return 60 if _check_auto(self.alpha_scale) else self.alpha_scale
[docs] def _get_alpha_scales(self):
return ([c for c in np.geomspace(0.1, 1e4, self.n_alphas)]
if _check_auto(self.alpha_scales) else self.alpha_scales)
[docs] def _get_alpha(self, delta, alpha_scale):
return alpha_scale * (delta**4)
[docs] def _get_kernel(self, X, Y=None):
if callable(self.kernel):
params = self.kernel_params or {}
else:
params = {"gamma": self.gamma,
"degree": self.degree,
"coef0": self.coef0}
return pairwise_kernels(X, Y, metric=self.kernel,
filter_params=True, **params)
[docs]class RKHSIV(_BaseRKHSIV):
"""
RKHS IV estimator.
This class implements an RKHS IV estimator.
Parameters:
kernel (str or callable): Kernel function or string identifier.
gamma (float): Gamma parameter for the kernel.
degree (int): Degree for polynomial kernels.
coef0 (float): Zero coefficient for polynomial kernels.
delta_scale (str or float): Scale of the critical radius.
delta_exp (str or float): Exponent of the critical radius.
alpha_scale (str or float): Scale of the regularization parameter.
kernel_params (dict): Additional parameters for the kernel.
"""
def __init__(self, kernel='rbf', gamma=2, degree=3, coef0=1,
delta_scale='auto', delta_exp='auto', alpha_scale='auto',
kernel_params=None):
self.kernel = kernel
self.degree = degree
self.coef0 = coef0
self.gamma = gamma
self.kernel_params = kernel_params
self.delta_scale = delta_scale
self.delta_exp = delta_exp
self.alpha_scale = alpha_scale
[docs] def fit(self, Z, T, Y):
"""
Fit the RKHS IV estimator.
Parameters:
Z (array-like): Instrumental variables.
T (array-like): Treatments.
Y (array-like): Outcomes.
Returns:
self: Fitted estimator.
"""
n = Y.shape[0]
delta = self._get_delta(n)
alpha = self._get_alpha(delta, self._get_alpha_scale())
Kh = self._get_kernel(T)
Kf = self._get_kernel(Z)
RootKf = scipy.linalg.sqrtm(Kf).astype(float)
M = RootKf @ np.linalg.inv(
Kf / (2 * n * delta**2) + np.eye(n) / 2) @ RootKf
self.T = T.copy()
self.a = np.linalg.pinv(Kh @ M @ Kh + alpha * Kh) @ Kh @ M @ Y
return self
[docs] def predict(self, T_test):
"""
Predict outcomes for new treatments.
Parameters:
T_test (array-like): New treatments.
Returns:
array-like: Predicted outcomes.
"""
return self._get_kernel(T_test, Y=self.T) @ self.a
[docs] def score(self, Z, T, Y, delta='auto'):
"""
Compute the score of the fitted estimator.
Parameters:
Z (array-like): Instrumental variables.
T (array-like): Treatments.
Y (array-like): Outcomes.
delta (str or float): Critical radius.
Returns:
float: Score.
"""
n = Y.shape[0]
delta = self._get_delta(n)
Kf = self._get_kernel(Z)
RootKf = scipy.linalg.sqrtm(Kf).astype(float)
M = RootKf @ np.linalg.inv(
Kf / (2 * n * delta**2) + np.eye(n) / 2) @ RootKf
Y_pred = self.predict(T)
return ((Y - Y_pred).T @ M @ (Y - Y_pred))[0, 0] / n**2
[docs]class RKHSIVCV(RKHSIV):
"""
RKHS IV estimator with cross-validation.
This class implements an RKHS IV estimator with cross-validation.
Parameters:
kernel (str or callable): Kernel function or string identifier.
gamma (float): Gamma parameter for the kernel.
degree (int): Degree for polynomial kernels.
coef0 (float): Zero coefficient for polynomial kernels.
delta_scale (str or float): Scale of the critical radius.
delta_exp (str or float): Exponent of the critical radius.
alpha_scales (str or array-like): Scale of the regularization parameter.
n_alphas (int): Number of alpha scales to try.
cv (int): Number of folds for cross-validation.
kernel_params (dict): Additional parameters for the kernel.
"""
def __init__(self, kernel='rbf', gamma=2, degree=3, coef0=1, kernel_params=None,
delta_scale='auto', delta_exp='auto', alpha_scales='auto', n_alphas=30, cv=6):
self.kernel = kernel
self.degree = degree
self.coef0 = coef0
self.gamma = gamma
self.kernel_params = kernel_params
self.delta_scale = delta_scale
self.delta_exp = delta_exp
self.alpha_scales = alpha_scales
self.n_alphas = n_alphas
self.cv = cv
[docs] def fit(self, Z, T, Y):
"""
Fit the RKHS IV estimator with cross-validation.
Parameters:
Z (array-like): Instrumental variables.
T (array-like): Treatments.
Y (array-like): Outcomes.
Returns:
self: Fitted estimator.
"""
n = Y.shape[0]
Kh = self._get_kernel(T)
Kf = self._get_kernel(Z)
RootKf = scipy.linalg.sqrtm(Kf).astype(float)
alpha_scales = self._get_alpha_scales()
n_train = n * (self.cv - 1) / self.cv
n_test = n / self.cv
delta_train = self._get_delta(n_train)
delta_test = self._get_delta(n_test)
delta = self._get_delta(n)
scores = []
for it, (train, test) in enumerate(KFold(n_splits=self.cv).split(Z)):
M_train = RootKf[np.ix_(train, train)] @ np.linalg.inv(
Kf[np.ix_(train, train)] / (2 * n_train * (delta_train**2)) + np.eye(len(train)) / 2) @ RootKf[np.ix_(train, train)]
M_test = RootKf[np.ix_(test, test)] @ np.linalg.inv(
Kf[np.ix_(test, test)] / (2 * n_test * (delta_test**2)) + np.eye(len(test)) / 2) @ RootKf[np.ix_(test, test)]
Kh_train = Kh[np.ix_(train, train)]
KMK_train = Kh_train @ M_train @ Kh_train
B_train = Kh_train @ M_train @ Y[train]
scores.append([])
for alpha_scale in alpha_scales:
alpha = self._get_alpha(delta_train, alpha_scale)
a = np.linalg.pinv(KMK_train + alpha * Kh_train) @ B_train
res = Y[test] - Kh[np.ix_(test, train)] @ a
scores[it].append((res.T @ M_test @ res)[
0, 0] / (res.shape[0]**2))
self.alpha_scales = alpha_scales
self.avg_scores = np.mean(np.array(scores), axis=0)
self.best_alpha_scale = alpha_scales[np.argmin(self.avg_scores)]
delta = self._get_delta(n)
self.best_alpha = self._get_alpha(delta, self.best_alpha_scale)
M = RootKf @ np.linalg.inv(
Kf / (2 * n * delta**2) + np.eye(n) / 2) @ RootKf
self.T = T.copy()
self.a = np.linalg.pinv(
Kh @ M @ Kh + self.best_alpha * Kh) @ Kh @ M @ Y
return self
[docs]class RKHSIVL2(_BaseRKHSIV):
"""
RKHS IV estimator with L2 regularization.
This class implements an RKHS IV estimator with L2 regularization.
Parameters:
kernel (str or callable): Kernel function or string identifier.
gamma (float): Gamma parameter for the kernel.
degree (int): Degree for polynomial kernels.
coef0 (float): Zero coefficient for polynomial kernels.
delta_scale (str or float): Scale of the critical radius.
delta_exp (str or float): Exponent of the critical radius.
kernel_params (dict): Additional parameters for the kernel.
"""
def __init__(self, kernel='rbf', gamma=2, degree=3, coef0=1,
delta_scale='auto', delta_exp='auto', kernel_params=None):
self.kernel = kernel
self.degree = degree
self.coef0 = coef0
self.gamma = gamma
self.kernel_params = kernel_params
self.delta_scale = delta_scale
self.delta_exp = delta_exp
[docs] def fit(self, Z, T, Y):
"""
Fit the RKHS IV estimator with L2 regularization.
Parameters:
Z (array-like): Instrumental variables.
T (array-like): Treatments.
Y (array-like): Outcomes.
Returns:
self: Fitted estimator.
"""
n = Y.shape[0]
delta = self._get_delta(n)
alpha = delta**4
Kh = self._get_kernel(T)
Kf = self._get_kernel(Z)
M = np.linalg.pinv(Kf) @ Kf
self.T = T.copy()
self.a = np.linalg.pinv(Kh @ M @ Kh + alpha * Kh @ Kh) @ Kh @ M @ Y
return self
[docs] def predict(self, T_test):
"""
Predict outcomes for new treatments.
Parameters:
T_test (array-like): New treatments.
Returns:
array-like: Predicted outcomes.
"""
return self._get_kernel(T_test, Y=self.T) @ self.a
[docs]class RKHSIVL2CV(RKHSIVL2):
"""
RKHS IV estimator with L2 regularization and cross-validation.
This class implements an RKHS IV estimator with L2 regularization and cross-validation.
Parameters:
kernel (str or callable): Kernel function or string identifier.
gamma (float): Gamma parameter for the kernel.
degree (int): Degree for polynomial kernels.
coef0 (float): Zero coefficient for polynomial kernels.
delta_scale (str or float): Scale of the critical radius.
delta_exp (str or float): Exponent of the critical radius.
alpha_scales (str or array-like): Scale of the regularization parameter.
n_alphas (int): Number of alpha scales to try.
cv (int): Number of folds for cross-validation.
kernel_params (dict): Additional parameters for the kernel.
"""
def __init__(self, kernel='rbf', gamma=2, degree=3, coef0=1, kernel_params=None,
delta_scale='auto', delta_exp='auto', alpha_scales='auto', n_alphas=30, cv=6):
self.kernel = kernel
self.degree = degree
self.coef0 = coef0
self.gamma = gamma
self.kernel_params = kernel_params
self.delta_scale = delta_scale
self.delta_exp = delta_exp
self.alpha_scales = alpha_scales
self.n_alphas = n_alphas
self.cv = cv
[docs] def fit(self, Z, T, Y):
"""
Fit the RKHS IV estimator with L2 regularization and cross-validation.
Parameters:
Z (array-like): Instrumental variables.
T (array-like): Treatments.
Y (array-like): Outcomes.
Returns:
self: Fitted estimator.
"""
n = Y.shape[0]
Kh = self._get_kernel(T)
Kf = self._get_kernel(Z)
alpha_scales = self._get_alpha_scales()
n_train = n * (self.cv - 1) / self.cv
n_test = n / self.cv
delta_train = self._get_delta(n_train)
delta = self._get_delta(n)
scores = []
for it, (train, test) in enumerate(KFold(n_splits=self.cv).split(Z)):
M_train = np.linalg.pinv(Kf[np.ix_(train, train)]) @ Kf[np.ix_(train, train)]
M_test = np.linalg.pinv(Kf[np.ix_(test, test)]) @ Kf[np.ix_(test, test)]
Kh_train = Kh[np.ix_(train, train)]
KMK_train = Kh_train @ M_train @ Kh_train
B_train = Kh_train @ M_train @ Y[train]
scores.append([])
for alpha_scale in alpha_scales:
alpha = alpha_scale * delta_train**4
a = np.linalg.pinv(KMK_train + alpha * Kh_train @ Kh_train) @ B_train
res = Y[test] - Kh[np.ix_(test, train)] @ a
scores[it].append((res.T @ M_test @ res)[
0, 0] / (res.shape[0]**2))
self.alpha_scales = alpha_scales
self.avg_scores = np.mean(np.array(scores), axis=0)
self.best_alpha_scale = alpha_scales[np.argmin(self.avg_scores)]
delta = self._get_delta(n)
self.best_alpha = self.best_alpha_scale * delta**4
M = np.linalg.pinv(Kf) @ Kf
self.T = T.copy()
self.a = np.linalg.pinv(
Kh @ M @ Kh + self.best_alpha * Kh @ Kh) @ Kh @ M @ Y
return self
[docs]class ApproxRKHSIV(_BaseRKHSIV):
"""
Approximate RKHS IV estimator using kernel approximations.
This class implements an approximate RKHS IV estimator using kernel approximations.
Parameters:
kernel_approx (str): Kernel approximation method ('nystrom' or 'rbfsampler').
n_components (int): Number of approximation components.
kernel (str or callable): Kernel function or string identifier.
gamma (float): Gamma parameter for the kernel.
degree (int): Degree for polynomial kernels.
coef0 (float): Zero coefficient for polynomial kernels.
delta_scale (str or float): Scale of the critical radius.
delta_exp (str or float): Exponent of the critical radius.
alpha_scale (str or float): Scale of the regularization parameter.
kernel_params (dict): Additional parameters for the kernel.
"""
def __init__(self, kernel_approx='nystrom', n_components=10,
kernel='rbf', gamma=2, degree=3, coef0=1, kernel_params=None,
delta_scale='auto', delta_exp='auto', alpha_scale='auto'):
self.kernel_approx = kernel_approx
self.n_components = n_components
self.kernel = kernel
self.degree = degree
self.coef0 = coef0
self.gamma = gamma
self.kernel_params = kernel_params
self.delta_scale = delta_scale
self.delta_exp = delta_exp
self.alpha_scale = alpha_scale
[docs] def _get_new_approx_instance(self):
"""
Create a new kernel approximation instance.
Returns:
object: Kernel approximation instance.
"""
if (self.kernel_approx == 'rbfsampler') and (self.kernel == 'rbf'):
return RBFSampler(gamma=self.gamma, n_components=self.n_components, random_state=1)
elif self.kernel_approx == 'nystrom':
return Nystroem(kernel=self.kernel, gamma=self.gamma, coef0=self.coef0, degree=self.degree, kernel_params=self.kernel_params,
random_state=1, n_components=self.n_components)
else:
raise AttributeError("Invalid kernel approximator")
[docs] def fit(self, Z, T, Y):
"""
Fit the approximate RKHS IV estimator.
Parameters:
Z (array-like): Instrumental variables.
T (array-like): Treatments.
Y (array-like): Outcomes.
Returns:
self: Fitted estimator.
"""
n = Y.shape[0]
delta = self._get_delta(n)
alpha = self._get_alpha(delta, self._get_alpha_scale())
self.featZ = self._get_new_approx_instance()
RootKf = self.featZ.fit_transform(Z)
self.featT = self._get_new_approx_instance()
RootKh = self.featT.fit_transform(T)
Q = np.linalg.pinv(RootKf.T @ RootKf /
(2 * n * delta**2) + np.eye(self.n_components) / 2)
A = RootKh.T @ RootKf
W = (A @ Q @ A.T + alpha * np.eye(self.n_components))
B = A @ Q @ RootKf.T @ Y
self.a = np.linalg.pinv(W) @ B
self.fitted_delta = delta
return self
[docs] def predict(self, T):
"""
Predict outcomes for new treatments.
Parameters:
T (array-like): New treatments.
Returns:
array-like: Predicted outcomes.
"""
return self.featT.transform(T) @ self.a
[docs] def score(self, Z, T, Y, delta='auto'):
"""
Compute the score of the fitted estimator.
Parameters:
Z (array-like): Instrumental variables.
T (array-like): Treatments.
Y (array-like): Outcomes.
delta (str or float): Critical radius.
Returns:
float: Score.
"""
n = Y.shape[0]
delta = self._get_delta(n)
featZ = self._get_new_approx_instance()
RootKf = featZ.fit_transform(Z)
RootKh = self.featT.fit_transform(T)
Q = np.linalg.pinv(RootKf.T @ RootKf /
(2 * n * delta**2) + np.eye(self.n_components) / 2)
Y_pred = self.predict(T)
res = RootKf.T @ (Y - Y_pred)
return (res.T @ Q @ res)[0, 0] / n**2
[docs]class ApproxRKHSIVCV(ApproxRKHSIV):
"""
Approximate RKHS IV estimator with cross-validation using kernel approximations.
This class implements an approximate RKHS IV estimator with cross-validation using kernel approximations.
Parameters:
kernel_approx (str): Kernel approximation method ('nystrom' or 'rbfsampler').
n_components (int): Number of approximation components.
kernel (str or callable): Kernel function or string identifier.
gamma (float): Gamma parameter for the kernel.
degree (int): Degree for polynomial kernels.
coef0 (float): Zero coefficient for polynomial kernels.
delta_scale (str or float): Scale of the critical radius.
delta_exp (str or float): Exponent of the critical radius.
alpha_scales (str or array-like): Scale of the regularization parameter.
n_alphas (int): Number of alpha scales to try.
cv (int): Number of folds for cross-validation.
kernel_params (dict): Additional parameters for the kernel.
"""
def __init__(self, kernel_approx='nystrom', n_components=10,
kernel='rbf', gamma=2, degree=3, coef0=1, kernel_params=None,
delta_scale='auto', delta_exp='auto', alpha_scales='auto', n_alphas=30, cv=6):
self.kernel_approx = kernel_approx
self.n_components = n_components
self.kernel = kernel
self.degree = degree
self.coef0 = coef0
self.gamma = gamma
self.kernel_params = kernel_params
self.delta_scale = delta_scale
self.delta_exp = delta_exp
self.alpha_scales = alpha_scales
self.n_alphas = n_alphas
self.cv = cv
[docs] def fit(self, Z, T, Y):
"""
Fit the approximate RKHS IV estimator with cross-validation.
Parameters:
Z (array-like): Instrumental variables.
T (array-like): Treatments.
Y (array-like): Outcomes.
Returns:
self: Fitted estimator.
"""
n = Y.shape[0]
self.featZ = self._get_new_approx_instance()
RootKf = self.featZ.fit_transform(Z)
self.featT = self._get_new_approx_instance()
RootKh = self.featT.fit_transform(T)
alpha_scales = self._get_alpha_scales()
n_train = n * (self.cv - 1) / self.cv
n_test = n / self.cv
delta_train = self._get_delta(n_train)
delta_test = self._get_delta(n_test)
delta = self._get_delta(n)
scores = []
for it, (train, test) in enumerate(KFold(n_splits=self.cv).split(Z)):
RootKf_train, RootKf_test = RootKf[train], RootKf[test]
RootKh_train, RootKh_test = RootKh[train], RootKh[test]
Q_train = np.linalg.pinv(
RootKf_train.T @ RootKf_train / (2 * n_train * (delta_train**2)) + np.eye(self.n_components) / 2)
Q_test = np.linalg.pinv(
RootKf_test.T @ RootKf_test / (2 * n_test * (delta_test**2)) + np.eye(self.n_components) / 2)
A_train = RootKh_train.T @ RootKf_train
AQA_train = A_train @ Q_train @ A_train.T
B_train = A_train @ Q_train @ RootKf_train.T @ Y[train]
scores.append([])
for alpha_scale in alpha_scales:
alpha = self._get_alpha(delta_train, alpha_scale)
a = np.linalg.pinv(AQA_train + alpha *
np.eye(self.n_components)) @ B_train
res = RootKf_test.T @ (Y[test] - RootKh_test @ a)
scores[it].append((res.T @ Q_test @ res)[
0, 0] / (len(test)**2))
self.alpha_scales = alpha_scales
self.avg_scores = np.mean(np.array(scores), axis=0)
self.best_alpha_scale = alpha_scales[np.argmin(self.avg_scores)]
delta = self._get_delta(n)
self.best_alpha = self._get_alpha(delta, self.best_alpha_scale)
Q = np.linalg.pinv(RootKf.T @ RootKf /
(2 * n * delta**2) + np.eye(self.n_components) / 2)
A = RootKh.T @ RootKf
W = (A @ Q @ A.T + self.best_alpha * np.eye(self.n_components))
B = A @ Q @ RootKf.T @ Y
self.a = np.linalg.pinv(W) @ B
self.fitted_delta = delta
return self