Source code for yall.activelearning

import numpy as np
from sklearn import model_selection, metrics

from .containers import Data
from .querystrategies import QueryStrategy, SimpleMargin
from .initializations import LDSCentrality as LDS


class ActiveLearningModel:
    """
    Active learning loop built around a classifier and a query strategy.

    The training data is divided into a labeled set ``L`` and an unlabeled
    set ``U``. On every iteration of :meth:`run` the classifier is trained on
    ``L``, scored on the test set ``T``, and the query strategy picks one
    sample that is moved from ``U`` to ``L``.
    """

    # Values of ``init_L`` understood by :meth:`prepare_data`.
    _INIT_METHODS = ("random", "LDS")

    def __init__(self, classifier, query_strategy, eval_metric="auc",
                 U_proportion=0.9, init_L="random", random_state=None):
        """
        :param sklearn.base.BaseEstimator classifier: Classifier to build the
            model.
        :param QueryStrategy query_strategy: QueryStrategy instance to use.
        :param str eval_metric: One of "auc", "accuracy".
        :param float U_proportion: Proportion of training data to be assigned
            to the unlabeled set.
        :param str init_L: How to initialize L: "random" or "LDS".
        :param int random_state: Sets the random_state parameter of
            train_test_split.
        :raises ValueError: If ``query_strategy`` is not a QueryStrategy, if
            ``U_proportion`` is outside (0, 1), or if a SimpleMargin strategy
            is paired with a classifier lacking ``decision_function``.
        """
        self.__check_args(classifier, query_strategy, U_proportion)
        self.classifier = classifier
        self.query_strategy = query_strategy
        self.eval_metric = eval_metric
        self.U_proportion = U_proportion
        self.init_L = init_L
        self.random_state = random_state
        self.L = Data()  # Labeled data.
        self.U = Data()  # Unlabeled data.
        self.T = Data()  # Test data.
        # Label set handed to partial_fit; computed lazily by partial_train().
        self.classes = None

    def __check_args(self, classifier, query_strategy, U_proportion):
        """
        Validate the constructor arguments.

        :raises ValueError: On any invalid argument (see ``__init__``).
        """
        if not isinstance(query_strategy, QueryStrategy):
            raise ValueError("query_strategy must be an instance of QueryStrategy.")  # noqa
        if not 0 < U_proportion < 1:
            raise ValueError("U_proportion must be in range (0,1) exclusive. Got {}."  # noqa
                             .format(U_proportion))
        if isinstance(query_strategy, SimpleMargin):
            # SimpleMargin is only accepted with classifiers that expose
            # decision_function.
            if not hasattr(classifier, "decision_function"):
                raise ValueError("{} compatible only with discriminative models."  # noqa
                                 .format(str(query_strategy)))

    def _random_init(self, X, y, U_size):
        """
        Initialize the labeled set at random.

        :param np.array X: feature matrix
        :param np.array y: label vector
        :param int U_size: The number of samples to keep unlabeled.
        :returns: labeled X, unlabeled X, labeled y, unlabeled y
        :rtype: the 4-element sequence returned by train_test_split
        """
        # train_test_split returns (X_train, X_test, y_train, y_test); the
        # "test" part of the split plays the role of the unlabeled set here.
        return model_selection.train_test_split(
            X, y, test_size=U_size, random_state=self.random_state)

    def _LDS_init(self, X, y, U_size):
        """
        Initialize the labeled set using local density score (LDS) sampling.

        The samples at the indices returned by ``LDS.find_centers`` become the
        labeled set; every other sample stays unlabeled.

        :param np.array X: feature matrix
        :param np.array y: label vector
        :param int U_size: Accepted for signature parity with
            ``_random_init`` but not used: the size of the labeled set is
            whatever ``find_centers`` returns.
        :returns: labeled X, unlabeled X, labeled y, unlabeled y
        :rtype: tuple(np.array, np.array, np.array, np.array)
        """
        k = 10
        center_idxs = LDS(k=k, threshold="auto").find_centers(X, y)
        is_labeled = np.zeros(X.shape[0], dtype=bool)
        is_labeled[center_idxs] = True
        is_unlabeled = np.logical_not(is_labeled)
        return X[is_labeled], X[is_unlabeled], y[is_labeled], y[is_unlabeled]

    def prepare_data(self, train_X, test_X, train_y, test_y):
        """
        Splits data into unlabeled, labeled, and test sets according to
        self.U_proportion and self.init_L.

        :param np.array train_X: Training data features.
        :param np.array test_X: Test data features.
        :param np.array train_y: Training data labels.
        :param np.array test_y: Test data labels.
        :raises ValueError: If U_proportion yields an empty unlabeled set or a
            labeled set with fewer than two members, or if self.init_L is not
            "random" or "LDS".
        """
        n_train = train_X.shape[0]
        U_size = int(np.ceil(self.U_proportion * n_train))
        if not 0 < U_size < n_train:
            raise ValueError("U_proportion must result in non-empty labeled and unlabeled sets.")  # noqa
        if n_train - U_size <= 1:
            raise ValueError("U_proportion must result in a labeled set with > 1 members.")  # noqa

        if self.init_L == "random":
            split = self._random_init(train_X, train_y, U_size)
        elif self.init_L == "LDS":
            split = self._LDS_init(train_X, train_y, U_size)
        else:
            # Without this branch an unknown init_L left ``split`` unbound and
            # surfaced as an UnboundLocalError on the assignment below.
            raise ValueError("init_L must be one of {}. Got {!r}."
                             .format(self._INIT_METHODS, self.init_L))

        self.L.X, self.U.X, self.L.y, self.U.y = split
        self.T.X = test_X
        self.T.y = test_y

    def update_labels(self):
        """
        Gets the chosen index from the query strategy, adds the corresponding
        data point to L and removes it from U.

        :returns: chosen x (reshaped to a single row) and y, for use with
            partial_train()
        :rtype: tuple(numpy.ndarray, numpy.ndarray)
        """
        index = self.query_strategy.query(self.U, self.L, self.classifier)
        chosen_x = self.U.X[index]
        chosen_y = np.array([self.U.y[index]])
        # New samples are appended at the end of L; _get_choice_order()
        # relies on this ordering.
        self.L.y = np.append(self.L.y, chosen_y, axis=0)
        self.L.X = np.vstack((self.L.X, chosen_x))
        self.U.X = np.delete(self.U.X, index, axis=0)
        self.U.y = np.delete(self.U.y, index, axis=0)
        return chosen_x.reshape(1, -1), chosen_y

    def train(self):
        """
        Trains the classifier on L.
        """
        self.classifier.fit(self.L.X, self.L.y)

    def partial_train(self, new_x, new_y):
        """
        Given a subset of training examples, calls partial_fit.

        The ``classes`` argument passed to partial_fit is computed once, on
        the first call, and cached in ``self.classes``.

        :param numpy.ndarray new_x: Feature array.
        :param numpy.ndarray new_y: Label array.
        """
        if self.classes is None:
            # Use the labels of both L and U: once update_labels() has moved
            # samples out of U, a class may survive only in L, and the label
            # set derived from U alone would be incomplete.
            known_labels = np.concatenate(
                (np.ravel(self.L.y), np.ravel(self.U.y)))
            self.classes = np.unique(known_labels)
        self.classifier.partial_fit(new_x, new_y, classes=self.classes)

    def score(self):
        """
        Computes the performance of the current classifier on the test set
        according to self.eval_metric.

        :returns: performance score
        :rtype: float
        :raises AttributeError: If self.eval_metric is neither "auc" nor
            "accuracy".
        """
        if self.eval_metric == "auc":
            try:
                # If the classifier is probabilistic, use the second
                # predict_proba column as the score.
                res = self.classifier.predict_proba(self.T.X)[:, 1]
            except AttributeError:
                res = self.classifier.decision_function(self.T.X)
            return metrics.roc_auc_score(self.T.y, res)
        if self.eval_metric == "accuracy":
            res = self.classifier.predict(self.T.X)
            return metrics.accuracy_score(self.T.y, res)
        # AttributeError is kept (rather than ValueError) so that existing
        # callers catching it continue to work.
        raise AttributeError("Metric '{}' is not supported."
                             .format(self.eval_metric))

    def _get_choice_order(self, ndraws):
        """
        Finds the members of the labeled set in the order in which they
        were chosen by the query strategy.

        :param int ndraws: The number of draws made.
        :returns: labeled X and labeled y according to their choice order
        :rtype: dict({'X': np.array, 'y': np.array})
        """
        # update_labels() appends to the end of L, so the queried samples are
        # the trailing ``ndraws`` entries; everything before them is the
        # initial labeled set.
        chosen = np.ones(self.L.y.shape, dtype=bool)
        n_initial = self.L.y.shape[0] - ndraws
        chosen[:n_initial] = False
        return {'X': self.L.X[chosen], 'y': self.L.y[chosen]}

    def run(self, train_X, test_X, train_y, test_y, ndraws=None, verbose=0):
        """
        Run the active learning model. Saves the score (as selected by
        self.eval_metric) for each sampling iteration.

        :param np.array train_X: Training data features.
        :param np.array test_X: Test data features.
        :param np.array train_y: Training data labels.
        :param np.array test_y: Test data labels.
        :param int ndraws: Number of times to query the unlabeled set.
            If None, query entire unlabeled set.
        :param int verbose: If > 0, print the iteration counter.
        :returns: the scores for each sampling iteration, and the queried
            samples in the order they were chosen (see _get_choice_order).
        :rtype: tuple(numpy.ndarray(shape=(ndraws, )), dict)
        :raises ValueError: If ndraws is negative or exceeds the size of the
            unlabeled set.
        """
        # Populate L, U, and T
        self.prepare_data(train_X, test_X, train_y, test_y)
        pool_size = self.U.X.shape[0]
        if ndraws is None:
            ndraws = pool_size
        if not 0 <= ndraws <= pool_size:
            # Fail before any training happens instead of part-way through
            # the loop, when the unlabeled set runs empty.
            raise ValueError("ndraws must be in range [0, {}]. Got {}."
                             .format(pool_size, ndraws))

        scores = np.zeros(ndraws, dtype=np.float32)
        for i in range(ndraws):
            if verbose > 0:
                print(f"{i}\r", end='')
            self.train()
            scores[i] = self.score()
            self.update_labels()
        choice_order = self._get_choice_order(ndraws)
        return scores, choice_order