import numpy as np
import scipy.spatial.distance as spd
from collections import namedtuple
# Names exported by `from <module> import *`. The QueryStrategy base class
# defined below is not in this list, so a star-import does not expose it.
__all__ = ['UncertaintySampler',
'CombinedSampler',
'DistDivSampler',
'Random',
'SimpleMargin',
'Margin',
'Entropy',
'LeastConfidence',
'LeastConfidenceBias',
'LeastConfidenceDynamicBias',
'DistanceToCenter',
'MinMax',
'Density']
class QueryStrategy(object):
    '''
    Base query strategy class. In general, a query strategy
    consists of a scoring function, which assigns scores to
    each unlabeled instance, and a query function, which chooses
    an instance from these scores.

    Subclasses implement score() and choose(); query() chains the two.
    '''

    # Unlabeled data, labeled data, classifier.
    # Defined once on the class instead of being rebuilt for every
    # instance; `self.Args` still resolves to it through normal
    # attribute lookup, and the generated type no longer lives in each
    # instance's __dict__.
    Args = namedtuple('Args', ['U', 'L', 'clf'])

    def __init__(self):
        '''
        No per-instance state is needed by the base class. The method is
        kept so that subclasses can keep chaining to it with
        super().__init__().
        '''

    def get_args(self, *args):
        '''
        Creates a namedtuple instance containing arguments to score().

        *args is:

        :param Data unlabeled: Unlabeled set.
        :param Data labeled: Labeled set.
        :param sklearn.base.BaseEstimator classifier: Classifier to use.
        :returns: Arguments, accessible as .U, .L and .clf
        :rtype: namedtuple
        :raises ValueError: If the number of positional arguments is not 3.
        '''
        if len(args) != 3:
            raise ValueError("Number of arguments must be 3")
        return self.Args(*args)

    def score(self, *args, **kwargs):
        '''
        Computes an array of scores for members of U from which
        to choose. Must be implemented by subclasses.

        :raises NotImplementedError: Always, in the base class.
        '''
        raise NotImplementedError()

    def choose(self, scores):
        '''
        Picks the most informative example according to its score.
        Must be implemented by subclasses.

        :param scores: Output of self.score()
        :raises NotImplementedError: Always, in the base class.
        '''
        raise NotImplementedError()

    def query(self, *args, **kwargs):
        '''
        A simple interface to self.score() and self.choose().

        :returns: Whatever self.choose() returns for the computed scores.
        '''
        scores = self.score(*args, **kwargs)
        return self.choose(scores)
class UncertaintySampler(QueryStrategy):
    '''
    Base class for uncertainty sampling strategies. Subclasses supply a
    private __score() method; choose() picks the example with the highest
    score. Scores can optionally be adjusted by "model change" (see
    model_change_wrapper()).
    '''

    def __init__(self, model_change=False):
        '''
        :param bool model_change: If True, subclasses wrap their scoring
                                  function with model_change_wrapper().
        '''
        super().__init__()
        self.model_change = model_change
        if self.model_change is True:
            # previous_scores is maintained by the wrapped score method,
            # chosen_index is recorded by choose().
            self.previous_scores = None
            self.chosen_index = None

    def __score(self):
        '''
        In uncertainty sampling it is possible to use model change,
        which is implemented as a wrapper around the scoring function.
        See model_change_wrapper() below. The __init__() method for a
        child of this class should define the score() method in the
        following manner:

        if self.model_change is True:
            self.score = self.model_change_wrapper(self.__score)
        else:
            self.score = self.__score
        '''
        raise NotImplementedError()

    def model_change_wrapper(self, score_func):
        '''
        Model change wrapper around the scoring function. See doc
        for __score() above for usage instructions.

        :math:`score_{mc}(X) = score(X; t) - w_o score(X; t-1)`
        :math:`score(X, t)`: The score at time t
        :math:`w_o = \\frac{1}{\\mid L \\mid}`

        The scores saved for the next call are the adjusted scores
        returned by the wrapper.

        :param function score_func: Scoring function to wrap.
        :returns: Wrapped scoring function.
        :rtype: function
        '''
        def wrapper(*args):
            args = self.get_args(*args)
            scores = score_func(*args)
            w_o = 1 / args.L.y.shape[0]
            previous = None
            if (self.chosen_index is not None
                    and self.previous_scores is not None):
                # If we've chosen and thus removed an example, we have to
                # remove it from the previous scores to make sure we're
                # comparing the same examples across iterations.
                previous = np.delete(self.previous_scores,
                                     self.chosen_index, axis=0)
            if previous is None or previous.shape != scores.shape:
                # First run, or the unlabeled set did not shrink by exactly
                # the chosen example: there is nothing comparable to
                # subtract, so start again from a zero baseline.
                previous = np.zeros(shape=scores.shape)
            scores = scores - (w_o * previous)
            self.previous_scores = scores  # Save these scores for next time.
            return scores
        return wrapper

    def choose(self, scores):
        '''
        Picks the highest-scoring example. When model change is enabled
        the chosen index is also remembered so the wrapped scoring
        function can drop that example from the previous scores.

        :param numpy.ndarray scores: Output of self.score()
        :returns: Index of chosen example.
        :rtype: int
        '''
        index = np.argmax(scores)
        if self.model_change is True:
            # Without this nothing sets chosen_index, and the wrapper
            # would treat every call as the first run.
            self.chosen_index = index
        return index
class CombinedSampler(QueryStrategy):
    '''
    Allows one sampler's scores to be weighted by another's according
    to the equation:

    :math:`score(x) = score_{qs1}(x) \\times score_{qs2}(x)^{\\beta}`

    Assumes :math:`x^* = argmax(score)`
    '''
    # TODO: test if choice_metric matches choose() of qs1 and qs2.

    def __init__(self, qs1=None, qs2=None, beta=1, choice_metric=np.argmax):
        '''
        :param QueryStrategy qs1: Main query strategy.
        :param QueryStrategy qs2: Query strategy to use as weight.
        :param float beta: Scale factor for score_qs2, or the string
                           'dynamic' to recompute it on every score() call.
        :param function choice_metric: Function that takes a 1d np.array
                                       and returns a chosen index.
        :raises ValueError: If qs1 or qs2 is missing.
        '''
        if qs1 is None or qs2 is None:
            raise ValueError("Must supply both qs1 and qs2")
        super().__init__()
        self.qs1 = qs1
        self.qs2 = qs2
        if beta == 'dynamic':
            self.beta = beta
        else:
            self.beta = float(beta)
        self.choice_metric = choice_metric
        self.U_0_size = None

    def __str__(self):
        return f"Combined Sampler: qs1: {self.qs1}; qs2 {self.qs2}; beta={self.beta}"  # noqa

    def __repr__(self):
        return "CombinedSampler"

    def _compute_beta(self, *args):
        '''
        Dynamic beta is computed according to the ratio of the number of
        unlabeled to labeled samples.

        :math:`\\beta = 2|U|/|L|`

        :returns: beta
        :rtype: float
        '''
        args = self.get_args(*args)
        return 2 * (args.U.X.shape[0] / args.L.X.shape[0])

    def _normalize_scores(self, scores):
        '''
        Computes minmax normalization to map scores to the [0,1] interval.

        When every score is equal minmax is undefined (0/0); the scores
        are then returned unchanged if the common value is below 1, and
        replaced by its reciprocal otherwise.

        :param numpy.ndarray scores: Raw scores.
        :returns: Normalized scores, always an array shaped like the input.
        :rtype: numpy.ndarray
        '''
        scores = np.asarray(scores, dtype=float)
        if scores.size == 0:
            # Nothing to normalize.
            return scores
        first = scores.flat[0]
        if np.all(scores == first):  # If all scores are equal.
            if first < 1:
                return scores
            # Keep the array shape: a bare scalar here would collapse the
            # combined score to a scalar when both strategies are constant.
            return np.full(scores.shape, 1 / first)
        lowest = np.min(scores)
        return (scores - lowest) / (np.max(scores) - lowest)

    def score(self, *args):
        '''
        Computes the combined scores from qs1 and qs2.

        :returns: scores
        :rtype: numpy.ndarray
        '''
        if self.beta == 'dynamic':
            beta = self._compute_beta(*args)
        else:
            beta = self.beta
        qs1_scores = self._normalize_scores(self.qs1.score(*args))
        qs2_scores = self._normalize_scores(self.qs2.score(*args))
        return qs1_scores * (qs2_scores ** beta)

    def choose(self, scores):
        '''
        Returns the example with the "best" score
        according to self.choice_metric.

        :param numpy.ndarray scores: Output of self.score()
        :returns: Index of chosen example.
        '''
        return self.choice_metric(scores)
# TODO: Write unit tests.
class DistDivSampler(QueryStrategy):
    '''
    Combined sampling method as in
    "Active learning for clinical text classification:
    is it better than random sampling?"

    :math:`score(x) = \\lambda score_{qs1}(x) +
                      (1 - \\lambda) score_{qs2}(x)`

    The example is picked from these scores by choice_metric, which
    defaults to np.argmax.
    '''

    def __init__(self, qs1=None, qs2=None, lam=0.5, choice_metric=np.argmax):
        '''
        :param QueryStrategy qs1: Uncertainty sampling query strategy.
        :param QueryStrategy qs2: Representative sampling query strategy.
        :param float lam: Query strategy weight [0,1] or "dynamic".
        :param function choice_metric: Function that takes a 1d np.array
                                       and returns a chosen index.
        :raises ValueError: If qs1 or qs2 is missing.
        '''
        if qs1 is None or qs2 is None:
            raise ValueError("Must supply both qs1 and qs2")
        super().__init__()
        self.qs1 = qs1
        self.qs2 = qs2
        if lam == "dynamic":
            self.lam = lam
        else:
            self.lam = float(lam)
        self.choice_metric = choice_metric

    def __str__(self):
        return f"DistDiv Sampler: qs1={self.qs1}; qs2={self.qs2}; lambda={self.lam}"  # noqa

    def __repr__(self):
        return "DistDivSampler"

    def _compute_lambda(self, *args):
        '''
        Dynamic lambda is computed from the ratio of the number of
        labeled samples to the total number of samples,

        r = |L| / (|L|+|U|)

        which is then rescaled exponentially:

        lambda = (a**r - 1) / (a - 1), with a = 20000

        This maps r=0 to 0 and r=1 to 1 and stays small until r gets
        close to 1.

        :returns: lambda
        :rtype: float
        '''
        args = self.get_args(*args)
        num_labeled = args.L.X.shape[0]
        ratio = num_labeled / (num_labeled + args.U.X.shape[0])
        a = 20000  # Base of the exponential rescaling.
        return (a ** ratio - 1) / (a - 1)

    def _normalize_scores(self, scores):
        '''
        Computes minmax normalization to map scores to the [0,1] interval.

        When every score is equal minmax is undefined (0/0); the scores
        are then returned unchanged if the common value is below 1, and
        replaced by its reciprocal otherwise.

        :param numpy.ndarray scores: Raw scores.
        :returns: Normalized scores, always an array shaped like the input.
        :rtype: numpy.ndarray
        '''
        scores = np.asarray(scores, dtype=float)
        if scores.size == 0:
            # Nothing to normalize.
            return scores
        first = scores.flat[0]
        if np.all(scores == first):  # If all scores are equal.
            if first < 1:
                return scores
            # Keep the array shape: a bare scalar here would collapse the
            # combined score to a scalar when both strategies are constant.
            return np.full(scores.shape, 1 / first)
        lowest = np.min(scores)
        return (scores - lowest) / (np.max(scores) - lowest)

    def score(self, *args):
        '''
        Computes the combined scores from qs1 and qs2.

        :returns: scores
        :rtype: numpy.ndarray
        '''
        if self.lam == "dynamic":
            lam = self._compute_lambda(*args)
        else:
            lam = self.lam
        qs1_scores = self._normalize_scores(self.qs1.score(*args))
        qs2_scores = self._normalize_scores(self.qs2.score(*args))
        return (lam * qs1_scores) + ((1 - lam) * qs2_scores)

    def choose(self, scores):
        '''
        Returns the example with the "best" score
        according to self.choice_metric.

        :param numpy.ndarray scores: Output of self.score()
        :returns: Index of chosen example.
        '''
        return self.choice_metric(scores)
class Random(QueryStrategy):
    '''
    Random query strategy: every unlabeled example is equally likely to
    be picked. Equivalent to passive learning.
    '''

    def __init__(self):
        super().__init__()

    def __str__(self):
        return "Random Sampler"

    def score(self, *args):
        '''
        There is nothing to rank in the random case, so the "scores" are
        simply the row indices 0..|U|-1 of the unlabeled set.

        :returns: Indices of the unlabeled examples.
        :rtype: numpy.ndarray
        '''
        unlabeled = self.get_args(*args).U
        num_unlabeled = unlabeled.X.shape[0]
        return np.arange(num_unlabeled)

    def choose(self, scores):
        '''
        Draws one of the given indices uniformly at random.

        :param numpy.ndarray scores: Output of self.score()
        :returns: Index of chosen example.
        :rtype: int
        '''
        picked = np.random.choice(scores)
        return picked
class SimpleMargin(QueryStrategy):
    '''
    Picks the example x lying closest to the separating hyperplane.

    :math:`x^* = argmin_x |f(x)|`
    '''

    def __init__(self):
        super().__init__()

    def __str__(self):
        return "Simple Margin Sampler"

    def score(self, *args):
        '''
        Absolute value of the classifier's decision function for each
        member of the unlabeled set.

        :returns: Absolute distances to the hyperplane(s).
        :rtype: numpy.ndarray
        '''
        parsed = self.get_args(*args)
        return np.abs(parsed.clf.decision_function(parsed.U.X))

    def choose(self, scores):
        '''
        Returns the row of the smallest entry in scores, i.e. the example
        with the shortest distance to the hyperplane. When scores has one
        column per hyperplane (multiclass), this is the example with the
        smallest absolute distance to any hyperplane. Could be modified to
        choose the smallest average distance to all hyperplanes.

        :param numpy.ndarray scores: Output of self.score()
        :returns: Index of chosen example.
        :rtype: int
        '''
        # A 1-d score vector (binary case) is viewed as a single column
        # so both cases go through the same row lookup.
        table = scores.reshape(-1, 1) if len(scores.shape) == 1 else scores
        flat_position = np.argmin(table)
        position = np.unravel_index(flat_position, table.shape, order='C')
        return position[0]
class Margin(QueryStrategy):
    '''
    Margin Sampler. Picks the unlabeled example whose two most probable
    class labels have the closest posterior probabilities.

    :math:`x^* = argmin_x P(\\hat{y_1}|x) - P(\\hat{y_2}|x)`

    where :math:`\\hat{y_1}` is the most probable label
    and :math:`\\hat{y_2}` is the second most probable label.
    '''

    def __init__(self):
        super().__init__()

    def __str__(self):
        return "Margin Sampler"

    def score(self, *args):
        '''
        For each unlabeled example, the gap between the largest and the
        second largest posterior probability estimate.

        :returns: Posterior probability differences.
        :rtype: numpy.ndarray
        '''
        parsed = self.get_args(*args)
        posteriors = parsed.clf.predict_proba(parsed.U.X)
        # np.sort is ascending only: sorting the negated values and
        # negating back yields each row ordered from high to low.
        descending = -np.sort(-posteriors, axis=1)
        best = descending[:, 0]
        runner_up = descending[:, 1]
        return best - runner_up

    def choose(self, scores):
        '''
        Picks the example with the smallest gap between its two most
        probable class labels.

        :param numpy.ndarray scores: Output of self.score()
        :returns: Index of chosen example.
        :rtype: int
        '''
        smallest_gap = np.argmin(scores)
        return smallest_gap
class Entropy(UncertaintySampler):
    '''
    Entropy Sampler. Picks the unlabeled example whose predicted label
    distribution has the greatest entropy.

    :math:`x^* = argmax_x -\\sum_i P(y_i|x) \\times log_2(P(y_i|x))`
    '''

    def __init__(self, model_change=False):
        '''
        :param bool model_change: Wrap the scoring function with
                                  model change if True.
        '''
        super().__init__(model_change=model_change)
        # Install self.score(), wrapped with model change when requested.
        scorer = self.__score
        if self.model_change is True:
            scorer = self.model_change_wrapper(scorer)
        self.score = scorer

    def __str__(self):
        return "Entropy Sampler"

    def __score(self, *args):
        '''
        Entropy (base 2) of the predicted class probabilities of each
        member of the unlabeled set.

        :returns: Entropies.
        :rtype: numpy.ndarray
        '''
        parsed = self.get_args(*args)
        posteriors = parsed.clf.predict_proba(parsed.U.X)
        # TODO: Catch warning when 0 in probs and only display it once.
        # log2(0) is -inf, so exact zeros are swapped for a tiny value.
        zero_mask = posteriors == 0
        if zero_mask.any():
            posteriors[zero_mask] = 1e-16
        weighted_logs = np.multiply(posteriors, np.log2(posteriors))
        return -np.sum(weighted_logs, axis=1)
class LeastConfidence(UncertaintySampler):
    '''
    Least confidence (uncertainty sampling). Picks the unlabeled example
    with the greatest uncertainty, i.e. the greatest posterior
    probability mass on all labels other than the most likely one.

    :math:`x^* = argmax_x 1 - P(\\hat{y}|x)`

    where :math:`\\hat{y} = argmax_y P(y|x)`
    '''

    def __init__(self, model_change=False):
        '''
        :param bool model_change: Wrap the scoring function with
                                  model change if True.
        '''
        super().__init__(model_change=model_change)
        # Install self.score(), wrapped with model change when requested.
        scorer = self.__score
        if self.model_change is True:
            scorer = self.model_change_wrapper(scorer)
        self.score = scorer

    def __str__(self):
        return "Least Confidence"

    def __score(self, *args):
        '''
        One minus the highest predicted class probability, for each
        member of the unlabeled set.

        :returns: Leftover probabilities.
        :rtype: numpy.ndarray
        '''
        parsed = self.get_args(*args)
        posteriors = parsed.clf.predict_proba(parsed.U.X)
        top_probability = np.max(posteriors, axis=1)
        return 1 - top_probability
class LeastConfidenceBias(UncertaintySampler):
    '''
    Least confidence with bias. Like least confidence, but the decision
    boundary is moved according to the current class distribution of
    the labeled set.

    .. math::

        x^* =
        \\Biggl \\lbrace
        {
        \\frac{P(\\hat{y}|x)}{P_{max}}, \\text{ if } {P(\\hat{y}|x) < P_{max}}
        \\atop
        \\frac{1 - P(\\hat{y}|x)}{P_{max}}, \\text{ otherwise }
        }

    where

    :math:`P_{max} = mean(0.5, 1 - pp)` and
    :math:`pp` is the percentage of positive examples in the labeled set.
    '''

    def __init__(self, model_change=False):
        '''
        :param bool model_change: Wrap the scoring function with
                                  model change if True.
        '''
        super().__init__(model_change)
        # Install self.score(), wrapped with model change when requested.
        scorer = self.__score
        if self.model_change is True:
            scorer = self.model_change_wrapper(scorer)
        self.score = scorer

    def __str__(self):
        return "Least Confidence with Bias"

    def __score(self, *args):
        '''
        Scores each member of the unlabeled set from its highest predicted
        class probability, adjusted for the current class distribution.

        :returns: scores
        :rtype: numpy.ndarray
        '''
        parsed = self.get_args(*args)
        labels = parsed.L.y
        # Fraction of positives; presumably labels are 0/1 - TODO confirm.
        positive_rate = sum(labels) / labels.shape[0]
        p_max = np.mean([0.5, 1 - positive_rate])
        confidence = np.max(parsed.clf.predict_proba(parsed.U.X), axis=1)
        # NOTE(review): both branches divide by p_max, so the score jumps
        # from (almost) 1 to (1 - p_max) / p_max at confidence == p_max
        # unless p_max == 0.5 - confirm this is intended.
        below_boundary = confidence / p_max
        at_or_above_boundary = (1 - confidence) / p_max
        return np.where(confidence < p_max,
                        below_boundary,
                        at_or_above_boundary)
class LeastConfidenceDynamicBias(UncertaintySampler):
    '''
    Least confidence with dynamic bias. Like least confidence with bias,
    but the bias additionally accounts for the size of the labeled set
    relative to the initial unlabeled set.

    .. math::

        x^* =
        \\Biggl \\lbrace
        {
        \\frac{P(\\hat{y}|x)}{P_{max}}, \\text{ if } {P(\\hat{y}|x) < P_{max}}
        \\atop
        \\frac{1 - P(\\hat{y}|x)}{P_{max}}, \\text{ otherwise }
        }

    where

    :math:`P_{max} = (1 - pp)w_b + 0.5w_u`
    :math:`pp` is the percentage of positive examples in the labeled set.
    :math:`w_u = \\frac{|L|}{U_0}` and
    :math:`U_0` is the size of the initial unlabeled set.
    :math:`w_b = 1 - w_u`
    '''

    def __init__(self, model_change=False):
        '''
        :param bool model_change: Wrap the scoring function with
                                  model change if True.
        '''
        super().__init__(model_change)
        # -1 marks "not seen yet"; the first scoring call fills it in.
        self.U_0_size = -1
        # Install self.score(), wrapped with model change when requested.
        scorer = self.__score
        if self.model_change is True:
            scorer = self.model_change_wrapper(scorer)
        self.score = scorer

    def __str__(self):
        return "Least Confidence with Dynamic Bias"

    def __score(self, *args):
        '''
        Scores each member of the unlabeled set from column 1 of the
        classifier's predicted probabilities, with the boundary P_max
        recomputed on every call.

        :returns: scores
        :rtype: numpy.ndarray
        '''
        parsed = self.get_args(*args)
        if self.U_0_size < 0:  # Set U_0_size if unset (-1)
            self.U_0_size = parsed.U.X.shape[0]
        labels = parsed.L.y
        num_labeled = labels.shape[0]
        # Fraction of positives; presumably labels are 0/1 - TODO confirm.
        positive_rate = sum(labels) / num_labeled
        # NOTE(review): w_u exceeds 1 (and w_b goes negative) once L holds
        # more examples than the initial U did - confirm this cannot happen.
        w_u = num_labeled / self.U_0_size
        w_b = 1 - w_u
        p_max = w_b * (1 - positive_rate) + w_u * 0.5
        # Column 1 is presumably the positive class - TODO confirm.
        positive_probability = parsed.clf.predict_proba(parsed.U.X)[:, 1]
        below_boundary = positive_probability / p_max
        at_or_above_boundary = (1 - positive_probability) / p_max
        return np.where(positive_probability < p_max,
                        below_boundary,
                        at_or_above_boundary)
class DistanceToCenter(QueryStrategy):
    '''
    Distance to Center sampling. Measures the distance of each point
    to the average x (center) in the labeled data set and computes
    the similarity using the equation below.

    :math:`x* = argmin_x \\frac{1}{1 + dist(x, x_L)}`

    where dist(A, B) is the distance between vectors A and B.
    :math:`x_L` is the mean vector in L (i.e. L's center).
    '''

    def __init__(self, metric='euclidean'):
        '''
        :param str metric: Distance metric to use. See spd.cdist doc for
                           available metrics.
        '''
        super().__init__()
        self.distance_metric = metric
        # Inverse covariance matrix for the Mahalanobis metric; computed
        # on the first score() call that needs it and then reused.
        self.VI = None

    def __str__(self):
        return "Distance to Center Sampler"

    def score(self, *args):
        '''
        Computes the similarity 1 / (1 + distance) between the mean of
        the labeled set and each member of the unlabeled set.

        :returns: Similarities to the center of L, one per member of U.
        :rtype: numpy.ndarray
        '''
        args = self.get_args(*args)
        mean_labeled_x = np.mean(args.L.X, axis=0)
        if self.distance_metric == 'mahalanobis' and self.VI is None:
            full_matrix = np.vstack([args.U.X, args.L.X]).T
            # Use pseudo inverse because features are sparse.
            self.VI = np.linalg.pinv(np.cov(full_matrix)).T
        # Only hand VI to cdist when there is one: metrics other than
        # Mahalanobis do not take it, and passing VI=None to them is
        # rejected by current SciPy releases.
        metric_kwargs = {} if self.VI is None else {'VI': self.VI}
        distances = spd.cdist([mean_labeled_x], args.U.X,
                              metric=self.distance_metric, **metric_kwargs)
        similarities = 1 / (1 + distances)
        # cdist returns a 1 x |U| matrix; return its single row.
        return similarities[0]

    def choose(self, scores):
        '''
        Returns the example with the lowest similarity to the average x in L.

        :param numpy.ndarray scores: Output of self.score()
        :returns: Index of chosen example.
        :rtype: int
        '''
        return np.argmin(scores)
class Density(QueryStrategy):
    '''
    Finds the example x in U that has the greatest average distance to
    every other point in U.

    :math:`x^* = argmin_x \\frac{1}{U} \\sum_{u=1} \\frac{1}{1 + dist(x, x_u)}`
    '''

    def __init__(self, metric='euclidean'):
        '''
        :param str metric: Distance metric to use. See spd.cdist doc for
                           available metrics.
        '''
        super().__init__()
        self.distance_metric = metric
        # Inverse covariance matrix for the Mahalanobis metric; computed
        # on the first score() call that needs it and then reused.
        self.VI = None

    def __str__(self):
        return "Density Sampler"

    def score(self, *args):
        '''
        Computes, for each member of U, the mean similarity
        1 / (1 + distance) to every other member of U.

        :returns: Mean similarity of each point in U to all other points.
        :rtype: numpy.ndarray
        :raises ValueError: If the pairwise distances contain NaN.
        '''
        args = self.get_args(*args)
        num_x = args.U.X.shape[0]
        # With fewer than two points there is no "other" point to compare
        # against. Return well-defined zeros (one per point) instead of
        # uninitialised memory.
        if num_x <= 1:
            return np.zeros(num_x)
        if self.distance_metric == 'mahalanobis' and self.VI is None:
            full_matrix = np.vstack([args.U.X, args.L.X]).T
            # Use pseudo inverse because features are sparse.
            self.VI = np.linalg.pinv(np.cov(full_matrix)).T
        # Only hand VI to cdist when there is one: metrics other than
        # Mahalanobis do not take it, and passing VI=None to them is
        # rejected by current SciPy releases.
        metric_kwargs = {} if self.VI is None else {'VI': self.VI}
        distances = spd.cdist(args.U.X, args.U.X,
                              metric=self.distance_metric, **metric_kwargs)
        if np.isnan(distances).any():
            raise ValueError("Distances contain NaN values. Check that input vectors != 0.")  # noqa
        # Drop the diagonal b/c we want distance from every OTHER point.
        # Boolean-mask selection is row-major, so each row keeps its
        # num_x - 1 off-diagonal entries in order.
        off_diagonal = ~np.eye(num_x, dtype=bool)
        distances = distances[off_diagonal].reshape(num_x, num_x - 1)
        similarities = 1 / (1 + distances)
        return np.mean(similarities, axis=1)

    def choose(self, scores):
        '''
        Returns the example with the lowest mean similarity to the other
        examples in U.

        :param numpy.ndarray scores: Output of self.score()
        :returns: Index of chosen example.
        :rtype: int
        '''
        return np.argmin(scores)
class MinMax(QueryStrategy):
    '''
    Finds the example x in U that has the maximum smallest distance
    to every point in L. Ensures representative coverage of the dataset.

    :math:`x^* = argmax_{x_i} ( min_{x_j} dist(x_i, x_j) )`

    where :math:`x_i \\in U`, :math:`x_j \\in L`, dist(.) is the
    given distance metric.
    '''

    def __init__(self, metric='euclidean'):
        '''
        :param str metric: Distance metric to use. See the spd.cdist doc for
                           available metrics.
        '''
        super().__init__()
        self.distance_metric = str(metric)
        # Inverse covariance matrix for the Mahalanobis metric; computed
        # on the first score() call that needs it and then reused.
        self.VI = None

    def __str__(self):
        return "Min Max Sampler"

    # TODO: Precompute distances to avoid redundant computation.
    def score(self, *args):
        '''
        Computes minimum distance between each member of unlabeled_x
        and each member of labeled_x.

        :returns: Minimum distances from each unlabeled_x to each labeled_x.
        :rtype: numpy.ndarray
        '''
        args = self.get_args(*args)
        if self.distance_metric == 'mahalanobis' and self.VI is None:
            full_matrix = np.vstack([args.U.X, args.L.X]).T
            # Use pseudo inverse because features are sparse.
            self.VI = np.linalg.pinv(np.cov(full_matrix)).T
        # Only hand VI to cdist when there is one: metrics other than
        # Mahalanobis do not take it, and passing VI=None to them is
        # rejected by current SciPy releases.
        metric_kwargs = {} if self.VI is None else {'VI': self.VI}
        distances = spd.cdist(args.U.X, args.L.X,
                              metric=self.distance_metric, **metric_kwargs)
        # Rows are members of U, columns members of L.
        return np.min(distances, axis=1)

    def choose(self, scores):
        '''
        Returns the example with the greatest minimum distance to
        every other x in L.

        :param numpy.ndarray scores: Output of self.score()
        :returns: Index of chosen example.
        :rtype: int
        '''
        return np.argmax(scores)