2802 lines
67 KiB
Python

"""This file contains code for use with "Think Stats" and
"Think Bayes", both by Allen B. Downey, available from greenteapress.com
Copyright 2014 Allen B. Downey
License: GNU GPLv3 http://www.gnu.org/licenses/gpl.html
"""
from __future__ import print_function, division
"""This file contains class definitions for:
Hist: represents a histogram (map from values to integer frequencies).
Pmf: represents a probability mass function (map from values to probs).
_DictWrapper: private parent class for Hist and Pmf.
Cdf: represents a discrete cumulative distribution function
Pdf: represents a continuous probability density function
"""
import bisect
import copy
import logging
import math
import random
import re
from collections import Counter
from operator import itemgetter
import thinkplot
import numpy as np
import pandas
import scipy
from scipy import stats
from scipy import special
from scipy import ndimage
from io import open
ROOT2 = math.sqrt(2)
def RandomSeed(x):
"""Initialize the random and np.random generators.
x: int seed
"""
random.seed(x)
np.random.seed(x)
def Odds(p):
"""Computes odds for a given probability.
Example: p=0.75 means 75 for and 25 against, or 3:1 odds in favor.
Note: when p=1, the formula for odds divides by zero, which is
normally undefined. But I think it is reasonable to define Odds(1)
to be infinity, so that's what this function does.
p: float 0-1
Returns: float odds
"""
if p == 1:
return float('inf')
return p / (1 - p)
def Probability(o):
"""Computes the probability corresponding to given odds.
Example: o=2 means 2:1 odds in favor, or 2/3 probability
o: float odds, strictly positive
Returns: float probability
"""
return o / (o + 1)
def Probability2(yes, no):
"""Computes the probability corresponding to given odds.
Example: yes=2, no=1 means 2:1 odds in favor, or 2/3 probability.
yes, no: int or float odds in favor
"""
return yes / (yes + no)
class Interpolator(object):
"""Represents a mapping between sorted sequences; performs linear interp.
Attributes:
xs: sorted list
ys: sorted list
"""
def __init__(self, xs, ys):
self.xs = xs
self.ys = ys
def Lookup(self, x):
"""Looks up x and returns the corresponding value of y."""
return self._Bisect(x, self.xs, self.ys)
def Reverse(self, y):
"""Looks up y and returns the corresponding value of x."""
return self._Bisect(y, self.ys, self.xs)
def _Bisect(self, x, xs, ys):
"""Helper function."""
if x <= xs[0]:
return ys[0]
if x >= xs[-1]:
return ys[-1]
i = bisect.bisect(xs, x)
frac = 1.0 * (x - xs[i - 1]) / (xs[i] - xs[i - 1])
y = ys[i - 1] + frac * 1.0 * (ys[i] - ys[i - 1])
return y
class _DictWrapper(object):
"""An object that contains a dictionary."""
def __init__(self, obj=None, label=None):
"""Initializes the distribution.
obj: Hist, Pmf, Cdf, Pdf, dict, pandas Series, list of pairs
label: string label
"""
self.label = label if label is not None else '_nolegend_'
self.d = {}
# flag whether the distribution is under a log transform
self.log = False
if obj is None:
return
if isinstance(obj, (_DictWrapper, Cdf, Pdf)):
self.label = label if label is not None else obj.label
if isinstance(obj, dict):
self.d.update(obj.items())
elif isinstance(obj, (_DictWrapper, Cdf, Pdf)):
self.d.update(obj.Items())
elif isinstance(obj, pandas.Series):
self.d.update(obj.value_counts().iteritems())
else:
# finally, treat it like a list
self.d.update(Counter(obj))
if len(self) > 0 and isinstance(self, Pmf):
self.Normalize()
def __hash__(self):
return id(self)
def __str__(self):
cls = self.__class__.__name__
return '%s(%s)' % (cls, str(self.d))
__repr__ = __str__
def __eq__(self, other):
return self.d == other.d
def __len__(self):
return len(self.d)
def __iter__(self):
return iter(self.d)
def iterkeys(self):
"""Returns an iterator over keys."""
return iter(self.d)
def __contains__(self, value):
return value in self.d
def __getitem__(self, value):
return self.d.get(value, 0)
def __setitem__(self, value, prob):
self.d[value] = prob
def __delitem__(self, value):
del self.d[value]
def Copy(self, label=None):
"""Returns a copy.
Make a shallow copy of d. If you want a deep copy of d,
use copy.deepcopy on the whole object.
label: string label for the new Hist
returns: new _DictWrapper with the same type
"""
new = copy.copy(self)
new.d = copy.copy(self.d)
new.label = label if label is not None else self.label
return new
def Scale(self, factor):
"""Multiplies the values by a factor.
factor: what to multiply by
Returns: new object
"""
new = self.Copy()
new.d.clear()
for val, prob in self.Items():
new.Set(val * factor, prob)
return new
def Log(self, m=None):
"""Log transforms the probabilities.
Removes values with probability 0.
Normalizes so that the largest logprob is 0.
"""
if self.log:
raise ValueError("Pmf/Hist already under a log transform")
self.log = True
if m is None:
m = self.MaxLike()
for x, p in self.d.items():
if p:
self.Set(x, math.log(p / m))
else:
self.Remove(x)
def Exp(self, m=None):
"""Exponentiates the probabilities.
m: how much to shift the ps before exponentiating
If m is None, normalizes so that the largest prob is 1.
"""
if not self.log:
raise ValueError("Pmf/Hist not under a log transform")
self.log = False
if m is None:
m = self.MaxLike()
for x, p in self.d.items():
self.Set(x, math.exp(p - m))
def GetDict(self):
"""Gets the dictionary."""
return self.d
def SetDict(self, d):
"""Sets the dictionary."""
self.d = d
def Values(self):
"""Gets an unsorted sequence of values.
Note: one source of confusion is that the keys of this
dictionary are the values of the Hist/Pmf, and the
values of the dictionary are frequencies/probabilities.
"""
return self.d.keys()
def Items(self):
"""Gets an unsorted sequence of (value, freq/prob) pairs."""
return self.d.items()
def Render(self, **options):
"""Generates a sequence of points suitable for plotting.
Note: options are ignored
Returns:
tuple of (sorted value sequence, freq/prob sequence)
"""
if min(self.d.keys()) is np.nan:
logging.warning('Hist: contains NaN, may not render correctly.')
return zip(*sorted(self.Items()))
def MakeCdf(self, label=None):
"""Makes a Cdf."""
label = label if label is not None else self.label
return Cdf(self, label=label)
def Print(self):
"""Prints the values and freqs/probs in ascending order."""
for val, prob in sorted(self.d.items()):
print(val, prob)
def Set(self, x, y=0):
"""Sets the freq/prob associated with the value x.
Args:
x: number value
y: number freq or prob
"""
self.d[x] = y
def Incr(self, x, term=1):
"""Increments the freq/prob associated with the value x.
Args:
x: number value
term: how much to increment by
"""
self.d[x] = self.d.get(x, 0) + term
def Mult(self, x, factor):
"""Scales the freq/prob associated with the value x.
Args:
x: number value
factor: how much to multiply by
"""
self.d[x] = self.d.get(x, 0) * factor
def Remove(self, x):
"""Removes a value.
Throws an exception if the value is not there.
Args:
x: value to remove
"""
del self.d[x]
def Total(self):
"""Returns the total of the frequencies/probabilities in the map."""
total = sum(self.d.values())
return total
def MaxLike(self):
"""Returns the largest frequency/probability in the map."""
return max(self.d.values())
def Largest(self, n=10):
"""Returns the largest n values, with frequency/probability.
n: number of items to return
"""
return sorted(self.d.items(), reverse=True)[:n]
def Smallest(self, n=10):
"""Returns the smallest n values, with frequency/probability.
n: number of items to return
"""
return sorted(self.d.items(), reverse=False)[:n]
class Hist(_DictWrapper):
"""Represents a histogram, which is a map from values to frequencies.
Values can be any hashable type; frequencies are integer counters.
"""
def Freq(self, x):
"""Gets the frequency associated with the value x.
Args:
x: number value
Returns:
int frequency
"""
return self.d.get(x, 0)
def Freqs(self, xs):
"""Gets frequencies for a sequence of values."""
return [self.Freq(x) for x in xs]
def IsSubset(self, other):
"""Checks whether the values in this histogram are a subset of
the values in the given histogram."""
for val, freq in self.Items():
if freq > other.Freq(val):
return False
return True
def Subtract(self, other):
"""Subtracts the values in the given histogram from this histogram."""
for val, freq in other.Items():
self.Incr(val, -freq)
class Pmf(_DictWrapper):
"""Represents a probability mass function.
Values can be any hashable type; probabilities are floating-point.
Pmfs are not necessarily normalized.
"""
def Prob(self, x, default=0):
"""Gets the probability associated with the value x.
Args:
x: number value
default: value to return if the key is not there
Returns:
float probability
"""
return self.d.get(x, default)
def Probs(self, xs):
"""Gets probabilities for a sequence of values."""
return [self.Prob(x) for x in xs]
def Percentile(self, percentage):
"""Computes a percentile of a given Pmf.
Note: this is not super efficient. If you are planning
to compute more than a few percentiles, compute the Cdf.
percentage: float 0-100
returns: value from the Pmf
"""
p = percentage / 100.0
total = 0
for val, prob in sorted(self.Items()):
total += prob
if total >= p:
return val
def ProbGreater(self, x):
"""Probability that a sample from this Pmf exceeds x.
x: number
returns: float probability
"""
if isinstance(x, _DictWrapper):
return PmfProbGreater(self, x)
else:
t = [prob for (val, prob) in self.d.items() if val > x]
return sum(t)
def ProbLess(self, x):
"""Probability that a sample from this Pmf is less than x.
x: number
returns: float probability
"""
if isinstance(x, _DictWrapper):
return PmfProbLess(self, x)
else:
t = [prob for (val, prob) in self.d.items() if val < x]
return sum(t)
def __lt__(self, obj):
"""Less than.
obj: number or _DictWrapper
returns: float probability
"""
return self.ProbLess(obj)
def __gt__(self, obj):
"""Greater than.
obj: number or _DictWrapper
returns: float probability
"""
return self.ProbGreater(obj)
def __ge__(self, obj):
"""Greater than or equal.
obj: number or _DictWrapper
returns: float probability
"""
return 1 - (self < obj)
def __le__(self, obj):
"""Less than or equal.
obj: number or _DictWrapper
returns: float probability
"""
return 1 - (self > obj)
def Normalize(self, fraction=1.0):
"""Normalizes this PMF so the sum of all probs is fraction.
Args:
fraction: what the total should be after normalization
Returns: the total probability before normalizing
"""
if self.log:
raise ValueError("Normalize: Pmf is under a log transform")
total = self.Total()
if total == 0.0:
raise ValueError('Normalize: total probability is zero.')
#logging.warning('Normalize: total probability is zero.')
#return total
factor = fraction / total
for x in self.d:
self.d[x] *= factor
return total
def Random(self):
"""Chooses a random element from this PMF.
Note: this is not very efficient. If you plan to call
this more than a few times, consider converting to a CDF.
Returns:
float value from the Pmf
"""
target = random.random()
total = 0.0
for x, p in self.d.items():
total += p
if total >= target:
return x
# we shouldn't get here
raise ValueError('Random: Pmf might not be normalized.')
def Mean(self):
"""Computes the mean of a PMF.
Returns:
float mean
"""
mean = 0.0
for x, p in self.d.items():
mean += p * x
return mean
def Var(self, mu=None):
"""Computes the variance of a PMF.
mu: the point around which the variance is computed;
if omitted, computes the mean
returns: float variance
"""
if mu is None:
mu = self.Mean()
var = 0.0
for x, p in self.d.items():
var += p * (x - mu) ** 2
return var
def Std(self, mu=None):
"""Computes the standard deviation of a PMF.
mu: the point around which the variance is computed;
if omitted, computes the mean
returns: float standard deviation
"""
var = self.Var(mu)
return math.sqrt(var)
def MaximumLikelihood(self):
"""Returns the value with the highest probability.
Returns: float probability
"""
_, val = max((prob, val) for val, prob in self.Items())
return val
def CredibleInterval(self, percentage=90):
"""Computes the central credible interval.
If percentage=90, computes the 90% CI.
Args:
percentage: float between 0 and 100
Returns:
sequence of two floats, low and high
"""
cdf = self.MakeCdf()
return cdf.CredibleInterval(percentage)
def __add__(self, other):
"""Computes the Pmf of the sum of values drawn from self and other.
other: another Pmf or a scalar
returns: new Pmf
"""
try:
return self.AddPmf(other)
except AttributeError:
return self.AddConstant(other)
def AddPmf(self, other):
"""Computes the Pmf of the sum of values drawn from self and other.
other: another Pmf
returns: new Pmf
"""
pmf = Pmf()
for v1, p1 in self.Items():
for v2, p2 in other.Items():
pmf.Incr(v1 + v2, p1 * p2)
return pmf
def AddConstant(self, other):
"""Computes the Pmf of the sum a constant and values from self.
other: a number
returns: new Pmf
"""
pmf = Pmf()
for v1, p1 in self.Items():
pmf.Set(v1 + other, p1)
return pmf
def __sub__(self, other):
"""Computes the Pmf of the diff of values drawn from self and other.
other: another Pmf
returns: new Pmf
"""
try:
return self.SubPmf(other)
except AttributeError:
return self.AddConstant(-other)
def SubPmf(self, other):
"""Computes the Pmf of the diff of values drawn from self and other.
other: another Pmf
returns: new Pmf
"""
pmf = Pmf()
for v1, p1 in self.Items():
for v2, p2 in other.Items():
pmf.Incr(v1 - v2, p1 * p2)
return pmf
def __mul__(self, other):
"""Computes the Pmf of the product of values drawn from self and other.
other: another Pmf
returns: new Pmf
"""
try:
return self.MulPmf(other)
except AttributeError:
return self.MulConstant(other)
def MulPmf(self, other):
"""Computes the Pmf of the diff of values drawn from self and other.
other: another Pmf
returns: new Pmf
"""
pmf = Pmf()
for v1, p1 in self.Items():
for v2, p2 in other.Items():
pmf.Incr(v1 * v2, p1 * p2)
return pmf
def MulConstant(self, other):
"""Computes the Pmf of the product of a constant and values from self.
other: a number
returns: new Pmf
"""
pmf = Pmf()
for v1, p1 in self.Items():
pmf.Set(v1 * other, p1)
return pmf
def __div__(self, other):
"""Computes the Pmf of the ratio of values drawn from self and other.
other: another Pmf
returns: new Pmf
"""
try:
return self.DivPmf(other)
except AttributeError:
return self.MulConstant(1/other)
__truediv__ = __div__
def DivPmf(self, other):
"""Computes the Pmf of the ratio of values drawn from self and other.
other: another Pmf
returns: new Pmf
"""
pmf = Pmf()
for v1, p1 in self.Items():
for v2, p2 in other.Items():
pmf.Incr(v1 / v2, p1 * p2)
return pmf
def Max(self, k):
"""Computes the CDF of the maximum of k selections from this dist.
k: int
returns: new Cdf
"""
cdf = self.MakeCdf()
return cdf.Max(k)
class Joint(Pmf):
"""Represents a joint distribution.
The values are sequences (usually tuples)
"""
def Marginal(self, i, label=None):
"""Gets the marginal distribution of the indicated variable.
i: index of the variable we want
Returns: Pmf
"""
pmf = Pmf(label=label)
for vs, prob in self.Items():
pmf.Incr(vs[i], prob)
return pmf
def Conditional(self, i, j, val, label=None):
"""Gets the conditional distribution of the indicated variable.
Distribution of vs[i], conditioned on vs[j] = val.
i: index of the variable we want
j: which variable is conditioned on
val: the value the jth variable has to have
Returns: Pmf
"""
pmf = Pmf(label=label)
for vs, prob in self.Items():
if vs[j] != val:
continue
pmf.Incr(vs[i], prob)
pmf.Normalize()
return pmf
def MaxLikeInterval(self, percentage=90):
"""Returns the maximum-likelihood credible interval.
If percentage=90, computes a 90% CI containing the values
with the highest likelihoods.
percentage: float between 0 and 100
Returns: list of values from the suite
"""
interval = []
total = 0
t = [(prob, val) for val, prob in self.Items()]
t.sort(reverse=True)
for prob, val in t:
interval.append(val)
total += prob
if total >= percentage / 100.0:
break
return interval
def MakeJoint(pmf1, pmf2):
"""Joint distribution of values from pmf1 and pmf2.
Assumes that the PMFs represent independent random variables.
Args:
pmf1: Pmf object
pmf2: Pmf object
Returns:
Joint pmf of value pairs
"""
joint = Joint()
for v1, p1 in pmf1.Items():
for v2, p2 in pmf2.Items():
joint.Set((v1, v2), p1 * p2)
return joint
def MakeHistFromList(t, label=None):
"""Makes a histogram from an unsorted sequence of values.
Args:
t: sequence of numbers
label: string label for this histogram
Returns:
Hist object
"""
return Hist(t, label=label)
def MakeHistFromDict(d, label=None):
"""Makes a histogram from a map from values to frequencies.
Args:
d: dictionary that maps values to frequencies
label: string label for this histogram
Returns:
Hist object
"""
return Hist(d, label)
def MakePmfFromList(t, label=None):
"""Makes a PMF from an unsorted sequence of values.
Args:
t: sequence of numbers
label: string label for this PMF
Returns:
Pmf object
"""
return Pmf(t, label=label)
def MakePmfFromDict(d, label=None):
"""Makes a PMF from a map from values to probabilities.
Args:
d: dictionary that maps values to probabilities
label: string label for this PMF
Returns:
Pmf object
"""
return Pmf(d, label=label)
def MakePmfFromItems(t, label=None):
"""Makes a PMF from a sequence of value-probability pairs
Args:
t: sequence of value-probability pairs
label: string label for this PMF
Returns:
Pmf object
"""
return Pmf(dict(t), label=label)
def MakePmfFromHist(hist, label=None):
"""Makes a normalized PMF from a Hist object.
Args:
hist: Hist object
label: string label
Returns:
Pmf object
"""
if label is None:
label = hist.label
return Pmf(hist, label=label)
def MakeMixture(metapmf, label='mix'):
"""Make a mixture distribution.
Args:
metapmf: Pmf that maps from Pmfs to probs.
label: string label for the new Pmf.
Returns: Pmf object.
"""
mix = Pmf(label=label)
for pmf, p1 in metapmf.Items():
for x, p2 in pmf.Items():
mix.Incr(x, p1 * p2)
return mix
def MakeUniformPmf(low, high, n):
"""Make a uniform Pmf.
low: lowest value (inclusive)
high: highest value (inclusize)
n: number of values
"""
pmf = Pmf()
for x in np.linspace(low, high, n):
pmf.Set(x, 1)
pmf.Normalize()
return pmf
class Cdf(object):
"""Represents a cumulative distribution function.
Attributes:
xs: sequence of values
ps: sequence of probabilities
label: string used as a graph label.
"""
def __init__(self, obj=None, ps=None, label=None):
"""Initializes.
If ps is provided, obj must be the corresponding list of values.
obj: Hist, Pmf, Cdf, Pdf, dict, pandas Series, list of pairs
ps: list of cumulative probabilities
label: string label
"""
self.label = label if label is not None else '_nolegend_'
if isinstance(obj, (_DictWrapper, Cdf, Pdf)):
if not label:
self.label = label if label is not None else obj.label
if obj is None:
# caller does not provide obj, make an empty Cdf
self.xs = np.asarray([])
self.ps = np.asarray([])
if ps is not None:
logging.warning("Cdf: can't pass ps without also passing xs.")
return
else:
# if the caller provides xs and ps, just store them
if ps is not None:
if isinstance(ps, str):
logging.warning("Cdf: ps can't be a string")
self.xs = np.asarray(obj)
self.ps = np.asarray(ps)
return
# caller has provided just obj, not ps
if isinstance(obj, Cdf):
self.xs = copy.copy(obj.xs)
self.ps = copy.copy(obj.ps)
return
if isinstance(obj, _DictWrapper):
dw = obj
else:
dw = Hist(obj)
if len(dw) == 0:
self.xs = np.asarray([])
self.ps = np.asarray([])
return
xs, freqs = zip(*sorted(dw.Items()))
self.xs = np.asarray(xs)
self.ps = np.cumsum(freqs, dtype=np.float)
self.ps /= self.ps[-1]
def __str__(self):
return 'Cdf(%s, %s)' % (str(self.xs), str(self.ps))
__repr__ = __str__
def __len__(self):
return len(self.xs)
def __getitem__(self, x):
return self.Prob(x)
def __setitem__(self):
raise UnimplementedMethodException()
def __delitem__(self):
raise UnimplementedMethodException()
def __eq__(self, other):
return np.all(self.xs == other.xs) and np.all(self.ps == other.ps)
def Copy(self, label=None):
"""Returns a copy of this Cdf.
label: string label for the new Cdf
"""
if label is None:
label = self.label
return Cdf(list(self.xs), list(self.ps), label=label)
def MakePmf(self, label=None):
"""Makes a Pmf."""
if label is None:
label = self.label
return Pmf(self, label=label)
def Values(self):
"""Returns a sorted list of values.
"""
return self.xs
def Items(self):
"""Returns a sorted sequence of (value, probability) pairs.
Note: in Python3, returns an iterator.
"""
a = self.ps
b = np.roll(a, 1)
b[0] = 0
return zip(self.xs, a-b)
def Shift(self, term):
"""Adds a term to the xs.
term: how much to add
"""
new = self.Copy()
# don't use +=, or else an int array + float yields int array
new.xs = new.xs + term
return new
def Scale(self, factor):
"""Multiplies the xs by a factor.
factor: what to multiply by
"""
new = self.Copy()
# don't use *=, or else an int array * float yields int array
new.xs = new.xs * factor
return new
def Prob(self, x):
"""Returns CDF(x), the probability that corresponds to value x.
Args:
x: number
Returns:
float probability
"""
if x < self.xs[0]:
return 0.0
index = bisect.bisect(self.xs, x)
p = self.ps[index-1]
return p
def Probs(self, xs):
"""Gets probabilities for a sequence of values.
xs: any sequence that can be converted to NumPy array
returns: NumPy array of cumulative probabilities
"""
xs = np.asarray(xs)
index = np.searchsorted(self.xs, xs, side='right')
ps = self.ps[index-1]
ps[xs < self.xs[0]] = 0.0
return ps
ProbArray = Probs
def Value(self, p):
"""Returns InverseCDF(p), the value that corresponds to probability p.
Args:
p: number in the range [0, 1]
Returns:
number value
"""
if p < 0 or p > 1:
raise ValueError('Probability p must be in range [0, 1]')
index = bisect.bisect_left(self.ps, p)
return self.xs[index]
def ValueArray(self, ps):
"""Returns InverseCDF(p), the value that corresponds to probability p.
Args:
ps: NumPy array of numbers in the range [0, 1]
Returns:
NumPy array of values
"""
ps = np.asarray(ps)
if np.any(ps < 0) or np.any(ps > 1):
raise ValueError('Probability p must be in range [0, 1]')
index = np.searchsorted(self.ps, ps, side='left')
return self.xs[index]
def Percentile(self, p):
"""Returns the value that corresponds to percentile p.
Args:
p: number in the range [0, 100]
Returns:
number value
"""
return self.Value(p / 100.0)
def PercentileRank(self, x):
"""Returns the percentile rank of the value x.
x: potential value in the CDF
returns: percentile rank in the range 0 to 100
"""
return self.Prob(x) * 100.0
def Random(self):
"""Chooses a random value from this distribution."""
return self.Value(random.random())
def Sample(self, n):
"""Generates a random sample from this distribution.
n: int length of the sample
returns: NumPy array
"""
ps = np.random.random(n)
return self.ValueArray(ps)
def Mean(self):
"""Computes the mean of a CDF.
Returns:
float mean
"""
old_p = 0
total = 0.0
for x, new_p in zip(self.xs, self.ps):
p = new_p - old_p
total += p * x
old_p = new_p
return total
def CredibleInterval(self, percentage=90):
"""Computes the central credible interval.
If percentage=90, computes the 90% CI.
Args:
percentage: float between 0 and 100
Returns:
sequence of two floats, low and high
"""
prob = (1 - percentage / 100.0) / 2
interval = self.Value(prob), self.Value(1 - prob)
return interval
ConfidenceInterval = CredibleInterval
def _Round(self, multiplier=1000.0):
"""
An entry is added to the cdf only if the percentile differs
from the previous value in a significant digit, where the number
of significant digits is determined by multiplier. The
default is 1000, which keeps log10(1000) = 3 significant digits.
"""
# TODO(write this method)
raise UnimplementedMethodException()
def Render(self, **options):
"""Generates a sequence of points suitable for plotting.
An empirical CDF is a step function; linear interpolation
can be misleading.
Note: options are ignored
Returns:
tuple of (xs, ps)
"""
def interleave(a, b):
c = np.empty(a.shape[0] + b.shape[0])
c[::2] = a
c[1::2] = b
return c
a = np.array(self.xs)
xs = interleave(a, a)
shift_ps = np.roll(self.ps, 1)
shift_ps[0] = 0
ps = interleave(shift_ps, self.ps)
return xs, ps
def Max(self, k):
"""Computes the CDF of the maximum of k selections from this dist.
k: int
returns: new Cdf
"""
cdf = self.Copy()
cdf.ps **= k
return cdf
def MakeCdfFromItems(items, label=None):
"""Makes a cdf from an unsorted sequence of (value, frequency) pairs.
Args:
items: unsorted sequence of (value, frequency) pairs
label: string label for this CDF
Returns:
cdf: list of (value, fraction) pairs
"""
return Cdf(dict(items), label=label)
def MakeCdfFromDict(d, label=None):
"""Makes a CDF from a dictionary that maps values to frequencies.
Args:
d: dictionary that maps values to frequencies.
label: string label for the data.
Returns:
Cdf object
"""
return Cdf(d, label=label)
def MakeCdfFromList(seq, label=None):
"""Creates a CDF from an unsorted sequence.
Args:
seq: unsorted sequence of sortable values
label: string label for the cdf
Returns:
Cdf object
"""
return Cdf(seq, label=label)
def MakeCdfFromHist(hist, label=None):
"""Makes a CDF from a Hist object.
Args:
hist: Pmf.Hist object
label: string label for the data.
Returns:
Cdf object
"""
if label is None:
label = hist.label
return Cdf(hist, label=label)
def MakeCdfFromPmf(pmf, label=None):
"""Makes a CDF from a Pmf object.
Args:
pmf: Pmf.Pmf object
label: string label for the data.
Returns:
Cdf object
"""
if label is None:
label = pmf.label
return Cdf(pmf, label=label)
class UnimplementedMethodException(Exception):
"""Exception if someone calls a method that should be overridden."""
class Suite(Pmf):
"""Represents a suite of hypotheses and their probabilities."""
def Update(self, data):
"""Updates each hypothesis based on the data.
data: any representation of the data
returns: the normalizing constant
"""
for hypo in self.Values():
like = self.Likelihood(data, hypo)
self.Mult(hypo, like)
return self.Normalize()
def LogUpdate(self, data):
"""Updates a suite of hypotheses based on new data.
Modifies the suite directly; if you want to keep the original, make
a copy.
Note: unlike Update, LogUpdate does not normalize.
Args:
data: any representation of the data
"""
for hypo in self.Values():
like = self.LogLikelihood(data, hypo)
self.Incr(hypo, like)
def UpdateSet(self, dataset):
"""Updates each hypothesis based on the dataset.
This is more efficient than calling Update repeatedly because
it waits until the end to Normalize.
Modifies the suite directly; if you want to keep the original, make
a copy.
dataset: a sequence of data
returns: the normalizing constant
"""
for data in dataset:
for hypo in self.Values():
like = self.Likelihood(data, hypo)
self.Mult(hypo, like)
return self.Normalize()
def LogUpdateSet(self, dataset):
"""Updates each hypothesis based on the dataset.
Modifies the suite directly; if you want to keep the original, make
a copy.
dataset: a sequence of data
returns: None
"""
for data in dataset:
self.LogUpdate(data)
def Likelihood(self, data, hypo):
"""Computes the likelihood of the data under the hypothesis.
hypo: some representation of the hypothesis
data: some representation of the data
"""
raise UnimplementedMethodException()
def LogLikelihood(self, data, hypo):
"""Computes the log likelihood of the data under the hypothesis.
hypo: some representation of the hypothesis
data: some representation of the data
"""
raise UnimplementedMethodException()
def Print(self):
"""Prints the hypotheses and their probabilities."""
for hypo, prob in sorted(self.Items()):
print(hypo, prob)
def MakeOdds(self):
"""Transforms from probabilities to odds.
Values with prob=0 are removed.
"""
for hypo, prob in self.Items():
if prob:
self.Set(hypo, Odds(prob))
else:
self.Remove(hypo)
def MakeProbs(self):
"""Transforms from odds to probabilities."""
for hypo, odds in self.Items():
self.Set(hypo, Probability(odds))
def MakeSuiteFromList(t, label=None):
"""Makes a suite from an unsorted sequence of values.
Args:
t: sequence of numbers
label: string label for this suite
Returns:
Suite object
"""
hist = MakeHistFromList(t, label=label)
d = hist.GetDict()
return MakeSuiteFromDict(d)
def MakeSuiteFromHist(hist, label=None):
"""Makes a normalized suite from a Hist object.
Args:
hist: Hist object
label: string label
Returns:
Suite object
"""
if label is None:
label = hist.label
# make a copy of the dictionary
d = dict(hist.GetDict())
return MakeSuiteFromDict(d, label)
def MakeSuiteFromDict(d, label=None):
"""Makes a suite from a map from values to probabilities.
Args:
d: dictionary that maps values to probabilities
label: string label for this suite
Returns:
Suite object
"""
suite = Suite(label=label)
suite.SetDict(d)
suite.Normalize()
return suite
class Pdf(object):
"""Represents a probability density function (PDF)."""
def Density(self, x):
"""Evaluates this Pdf at x.
Returns: float or NumPy array of probability density
"""
raise UnimplementedMethodException()
def GetLinspace(self):
"""Get a linspace for plotting.
Not all subclasses of Pdf implement this.
Returns: numpy array
"""
raise UnimplementedMethodException()
def MakePmf(self, **options):
"""Makes a discrete version of this Pdf.
options can include
label: string
low: low end of range
high: high end of range
n: number of places to evaluate
Returns: new Pmf
"""
label = options.pop('label', '')
xs, ds = self.Render(**options)
return Pmf(dict(zip(xs, ds)), label=label)
def Render(self, **options):
"""Generates a sequence of points suitable for plotting.
If options includes low and high, it must also include n;
in that case the density is evaluated an n locations between
low and high, including both.
If options includes xs, the density is evaluate at those location.
Otherwise, self.GetLinspace is invoked to provide the locations.
Returns:
tuple of (xs, densities)
"""
low, high = options.pop('low', None), options.pop('high', None)
if low is not None and high is not None:
n = options.pop('n', 101)
xs = np.linspace(low, high, n)
else:
xs = options.pop('xs', None)
if xs is None:
xs = self.GetLinspace()
ds = self.Density(xs)
return xs, ds
def Items(self):
"""Generates a sequence of (value, probability) pairs.
"""
return zip(*self.Render())
class NormalPdf(Pdf):
"""Represents the PDF of a Normal distribution."""
def __init__(self, mu=0, sigma=1, label=None):
"""Constructs a Normal Pdf with given mu and sigma.
mu: mean
sigma: standard deviation
label: string
"""
self.mu = mu
self.sigma = sigma
self.label = label if label is not None else '_nolegend_'
def __str__(self):
return 'NormalPdf(%f, %f)' % (self.mu, self.sigma)
def GetLinspace(self):
"""Get a linspace for plotting.
Returns: numpy array
"""
low, high = self.mu-3*self.sigma, self.mu+3*self.sigma
return np.linspace(low, high, 101)
def Density(self, xs):
"""Evaluates this Pdf at xs.
xs: scalar or sequence of floats
returns: float or NumPy array of probability density
"""
return stats.norm.pdf(xs, self.mu, self.sigma)
class ExponentialPdf(Pdf):
"""Represents the PDF of an exponential distribution."""
def __init__(self, lam=1, label=None):
"""Constructs an exponential Pdf with given parameter.
lam: rate parameter
label: string
"""
self.lam = lam
self.label = label if label is not None else '_nolegend_'
def __str__(self):
return 'ExponentialPdf(%f)' % (self.lam)
def GetLinspace(self):
"""Get a linspace for plotting.
Returns: numpy array
"""
low, high = 0, 5.0/self.lam
return np.linspace(low, high, 101)
def Density(self, xs):
"""Evaluates this Pdf at xs.
xs: scalar or sequence of floats
returns: float or NumPy array of probability density
"""
return stats.expon.pdf(xs, scale=1.0/self.lam)
class EstimatedPdf(Pdf):
"""Represents a PDF estimated by KDE."""
def __init__(self, sample, label=None):
"""Estimates the density function based on a sample.
sample: sequence of data
label: string
"""
self.label = label if label is not None else '_nolegend_'
self.kde = stats.gaussian_kde(sample)
low = min(sample)
high = max(sample)
self.linspace = np.linspace(low, high, 101)
def __str__(self):
return 'EstimatedPdf(label=%s)' % str(self.label)
def GetLinspace(self):
"""Get a linspace for plotting.
Returns: numpy array
"""
return self.linspace
def Density(self, xs):
"""Evaluates this Pdf at xs.
returns: float or NumPy array of probability density
"""
return self.kde.evaluate(xs)
def CredibleInterval(pmf, percentage=90):
"""Computes a credible interval for a given distribution.
If percentage=90, computes the 90% CI.
Args:
pmf: Pmf object representing a posterior distribution
percentage: float between 0 and 100
Returns:
sequence of two floats, low and high
"""
cdf = pmf.MakeCdf()
prob = (1 - percentage / 100.0) / 2
interval = cdf.Value(prob), cdf.Value(1 - prob)
return interval
def PmfProbLess(pmf1, pmf2):
"""Probability that a value from pmf1 is less than a value from pmf2.
Args:
pmf1: Pmf object
pmf2: Pmf object
Returns:
float probability
"""
total = 0.0
for v1, p1 in pmf1.Items():
for v2, p2 in pmf2.Items():
if v1 < v2:
total += p1 * p2
return total
def PmfProbGreater(pmf1, pmf2):
"""Probability that a value from pmf1 is less than a value from pmf2.
Args:
pmf1: Pmf object
pmf2: Pmf object
Returns:
float probability
"""
total = 0.0
for v1, p1 in pmf1.Items():
for v2, p2 in pmf2.Items():
if v1 > v2:
total += p1 * p2
return total
def PmfProbEqual(pmf1, pmf2):
"""Probability that a value from pmf1 equals a value from pmf2.
Args:
pmf1: Pmf object
pmf2: Pmf object
Returns:
float probability
"""
total = 0.0
for v1, p1 in pmf1.Items():
for v2, p2 in pmf2.Items():
if v1 == v2:
total += p1 * p2
return total
def RandomSum(dists):
"""Chooses a random value from each dist and returns the sum.
dists: sequence of Pmf or Cdf objects
returns: numerical sum
"""
total = sum(dist.Random() for dist in dists)
return total
def SampleSum(dists, n):
"""Draws a sample of sums from a list of distributions.
dists: sequence of Pmf or Cdf objects
n: sample size
returns: new Pmf of sums
"""
pmf = Pmf(RandomSum(dists) for i in range(n))
return pmf
def EvalNormalPdf(x, mu, sigma):
"""Computes the unnormalized PDF of the normal distribution.
x: value
mu: mean
sigma: standard deviation
returns: float probability density
"""
return stats.norm.pdf(x, mu, sigma)
def MakeNormalPmf(mu, sigma, num_sigmas, n=201):
"""Makes a PMF discrete approx to a Normal distribution.
mu: float mean
sigma: float standard deviation
num_sigmas: how many sigmas to extend in each direction
n: number of values in the Pmf
returns: normalized Pmf
"""
pmf = Pmf()
low = mu - num_sigmas * sigma
high = mu + num_sigmas * sigma
for x in np.linspace(low, high, n):
p = EvalNormalPdf(x, mu, sigma)
pmf.Set(x, p)
pmf.Normalize()
return pmf
def EvalBinomialPmf(k, n, p):
"""Evaluates the binomial PMF.
Returns the probabily of k successes in n trials with probability p.
"""
return stats.binom.pmf(k, n, p)
def EvalHypergeomPmf(k, N, K, n):
"""Evaluates the hypergeometric PMF.
Returns the probabily of k successes in n trials from a population
N with K successes in it.
"""
return stats.hypergeom.pmf(k, N, K, n)
def EvalPoissonPmf(k, lam):
"""Computes the Poisson PMF.
k: number of events
lam: parameter lambda in events per unit time
returns: float probability
"""
# don't use the scipy function (yet). for lam=0 it returns NaN;
# should be 0.0
# return stats.poisson.pmf(k, lam)
return lam ** k * math.exp(-lam) / special.gamma(k+1)
def MakePoissonPmf(lam, high, step=1):
"""Makes a PMF discrete approx to a Poisson distribution.
lam: parameter lambda in events per unit time
high: upper bound of the Pmf
returns: normalized Pmf
"""
pmf = Pmf()
for k in range(0, high + 1, step):
p = EvalPoissonPmf(k, lam)
pmf.Set(k, p)
pmf.Normalize()
return pmf
def EvalExponentialPdf(x, lam):
"""Computes the exponential PDF.
x: value
lam: parameter lambda in events per unit time
returns: float probability density
"""
return lam * math.exp(-lam * x)
def EvalExponentialCdf(x, lam):
"""Evaluates CDF of the exponential distribution with parameter lam."""
return 1 - math.exp(-lam * x)
def MakeExponentialPmf(lam, high, n=200):
"""Makes a PMF discrete approx to an exponential distribution.
lam: parameter lambda in events per unit time
high: upper bound
n: number of values in the Pmf
returns: normalized Pmf
"""
pmf = Pmf()
for x in np.linspace(0, high, n):
p = EvalExponentialPdf(x, lam)
pmf.Set(x, p)
pmf.Normalize()
return pmf
def StandardNormalCdf(x):
"""Evaluates the CDF of the standard Normal distribution.
See http://en.wikipedia.org/wiki/Normal_distribution
#Cumulative_distribution_function
Args:
x: float
Returns:
float
"""
return (math.erf(x / ROOT2) + 1) / 2
def EvalNormalCdf(x, mu=0, sigma=1):
"""Evaluates the CDF of the normal distribution.
Args:
x: float
mu: mean parameter
sigma: standard deviation parameter
Returns:
float
"""
return stats.norm.cdf(x, loc=mu, scale=sigma)
def EvalNormalCdfInverse(p, mu=0, sigma=1):
"""Evaluates the inverse CDF of the normal distribution.
See http://en.wikipedia.org/wiki/Normal_distribution#Quantile_function
Args:
p: float
mu: mean parameter
sigma: standard deviation parameter
Returns:
float
"""
return stats.norm.ppf(p, loc=mu, scale=sigma)
def EvalLognormalCdf(x, mu=0, sigma=1):
"""Evaluates the CDF of the lognormal distribution.
x: float or sequence
mu: mean parameter
sigma: standard deviation parameter
Returns: float or sequence
"""
return stats.lognorm.cdf(x, loc=mu, scale=sigma)
def RenderExpoCdf(lam, low, high, n=101):
"""Generates sequences of xs and ps for an exponential CDF.
lam: parameter
low: float
high: float
n: number of points to render
returns: numpy arrays (xs, ps)
"""
xs = np.linspace(low, high, n)
ps = 1 - np.exp(-lam * xs)
#ps = stats.expon.cdf(xs, scale=1.0/lam)
return xs, ps
def RenderNormalCdf(mu, sigma, low, high, n=101):
"""Generates sequences of xs and ps for a Normal CDF.
mu: parameter
sigma: parameter
low: float
high: float
n: number of points to render
returns: numpy arrays (xs, ps)
"""
xs = np.linspace(low, high, n)
ps = stats.norm.cdf(xs, mu, sigma)
return xs, ps
def RenderParetoCdf(xmin, alpha, low, high, n=50):
"""Generates sequences of xs and ps for a Pareto CDF.
xmin: parameter
alpha: parameter
low: float
high: float
n: number of points to render
returns: numpy arrays (xs, ps)
"""
if low < xmin:
low = xmin
xs = np.linspace(low, high, n)
ps = 1 - (xs / xmin) ** -alpha
#ps = stats.pareto.cdf(xs, scale=xmin, b=alpha)
return xs, ps
class Beta(object):
"""Represents a Beta distribution.
See http://en.wikipedia.org/wiki/Beta_distribution
"""
def __init__(self, alpha=1, beta=1, label=None):
"""Initializes a Beta distribution."""
self.alpha = alpha
self.beta = beta
self.label = label if label is not None else '_nolegend_'
def Update(self, data):
"""Updates a Beta distribution.
data: pair of int (heads, tails)
"""
heads, tails = data
self.alpha += heads
self.beta += tails
def Mean(self):
"""Computes the mean of this distribution."""
return self.alpha / (self.alpha + self.beta)
def Random(self):
"""Generates a random variate from this distribution."""
return random.betavariate(self.alpha, self.beta)
def Sample(self, n):
"""Generates a random sample from this distribution.
n: int sample size
"""
size = n,
return np.random.beta(self.alpha, self.beta, size)
def EvalPdf(self, x):
"""Evaluates the PDF at x."""
return x ** (self.alpha - 1) * (1 - x) ** (self.beta - 1)
def MakePmf(self, steps=101, label=None):
"""Returns a Pmf of this distribution.
Note: Normally, we just evaluate the PDF at a sequence
of points and treat the probability density as a probability
mass.
But if alpha or beta is less than one, we have to be
more careful because the PDF goes to infinity at x=0
and x=1. In that case we evaluate the CDF and compute
differences.
"""
if self.alpha < 1 or self.beta < 1:
cdf = self.MakeCdf()
pmf = cdf.MakePmf()
return pmf
xs = [i / (steps - 1.0) for i in range(steps)]
probs = [self.EvalPdf(x) for x in xs]
pmf = Pmf(dict(zip(xs, probs)), label=label)
return pmf
def MakeCdf(self, steps=101):
"""Returns the CDF of this distribution."""
xs = [i / (steps - 1.0) for i in range(steps)]
ps = [special.betainc(self.alpha, self.beta, x) for x in xs]
cdf = Cdf(xs, ps)
return cdf
class Dirichlet(object):
"""Represents a Dirichlet distribution.
See http://en.wikipedia.org/wiki/Dirichlet_distribution
"""
def __init__(self, n, conc=1, label=None):
"""Initializes a Dirichlet distribution.
n: number of dimensions
conc: concentration parameter (smaller yields more concentration)
label: string label
"""
if n < 2:
raise ValueError('A Dirichlet distribution with '
'n<2 makes no sense')
self.n = n
self.params = np.ones(n, dtype=np.float) * conc
self.label = label if label is not None else '_nolegend_'
def Update(self, data):
"""Updates a Dirichlet distribution.
data: sequence of observations, in order corresponding to params
"""
m = len(data)
self.params[:m] += data
def Random(self):
"""Generates a random variate from this distribution.
Returns: normalized vector of fractions
"""
p = np.random.gamma(self.params)
return p / p.sum()
def Likelihood(self, data):
"""Computes the likelihood of the data.
Selects a random vector of probabilities from this distribution.
Returns: float probability
"""
m = len(data)
if self.n < m:
return 0
x = data
p = self.Random()
q = p[:m] ** x
return q.prod()
def LogLikelihood(self, data):
"""Computes the log likelihood of the data.
Selects a random vector of probabilities from this distribution.
Returns: float log probability
"""
m = len(data)
if self.n < m:
return float('-inf')
x = self.Random()
y = np.log(x[:m]) * data
return y.sum()
def MarginalBeta(self, i):
"""Computes the marginal distribution of the ith element.
See http://en.wikipedia.org/wiki/Dirichlet_distribution
#Marginal_distributions
i: int
Returns: Beta object
"""
alpha0 = self.params.sum()
alpha = self.params[i]
return Beta(alpha, alpha0 - alpha)
def PredictivePmf(self, xs, label=None):
"""Makes a predictive distribution.
xs: values to go into the Pmf
Returns: Pmf that maps from x to the mean prevalence of x
"""
alpha0 = self.params.sum()
ps = self.params / alpha0
return Pmf(zip(xs, ps), label=label)
def BinomialCoef(n, k):
"""Compute the binomial coefficient "n choose k".
n: number of trials
k: number of successes
Returns: float
"""
return scipy.misc.comb(n, k)
def LogBinomialCoef(n, k):
"""Computes the log of the binomial coefficient.
http://math.stackexchange.com/questions/64716/
approximating-the-logarithm-of-the-binomial-coefficient
n: number of trials
k: number of successes
Returns: float
"""
return n * math.log(n) - k * math.log(k) - (n - k) * math.log(n - k)
def NormalProbability(ys, jitter=0.0):
"""Generates data for a normal probability plot.
ys: sequence of values
jitter: float magnitude of jitter added to the ys
returns: numpy arrays xs, ys
"""
n = len(ys)
xs = np.random.normal(0, 1, n)
xs.sort()
if jitter:
ys = Jitter(ys, jitter)
else:
ys = np.array(ys)
ys.sort()
return xs, ys
def Jitter(values, jitter=0.5):
"""Jitters the values by adding a uniform variate in (-jitter, jitter).
values: sequence
jitter: scalar magnitude of jitter
returns: new numpy array
"""
n = len(values)
return np.random.uniform(-jitter, +jitter, n) + values
def NormalProbabilityPlot(sample, fit_color='0.8', **options):
"""Makes a normal probability plot with a fitted line.
sample: sequence of numbers
fit_color: color string for the fitted line
options: passed along to Plot
"""
xs, ys = NormalProbability(sample)
mean, var = MeanVar(sample)
std = math.sqrt(var)
fit = FitLine(xs, mean, std)
thinkplot.Plot(*fit, color=fit_color, label='model')
xs, ys = NormalProbability(sample)
thinkplot.Plot(xs, ys, **options)
def Mean(xs):
"""Computes mean.
xs: sequence of values
returns: float mean
"""
return np.mean(xs)
def Var(xs, mu=None, ddof=0):
"""Computes variance.
xs: sequence of values
mu: option known mean
ddof: delta degrees of freedom
returns: float
"""
xs = np.asarray(xs)
if mu is None:
mu = xs.mean()
ds = xs - mu
return np.dot(ds, ds) / (len(xs) - ddof)
def Std(xs, mu=None, ddof=0):
"""Computes standard deviation.
xs: sequence of values
mu: option known mean
ddof: delta degrees of freedom
returns: float
"""
var = Var(xs, mu, ddof)
return math.sqrt(var)
def MeanVar(xs, ddof=0):
"""Computes mean and variance.
Based on http://stackoverflow.com/questions/19391149/
numpy-mean-and-variance-from-single-function
xs: sequence of values
ddof: delta degrees of freedom
returns: pair of float, mean and var
"""
xs = np.asarray(xs)
mean = xs.mean()
s2 = Var(xs, mean, ddof)
return mean, s2
def Trim(t, p=0.01):
"""Trims the largest and smallest elements of t.
Args:
t: sequence of numbers
p: fraction of values to trim off each end
Returns:
sequence of values
"""
n = int(p * len(t))
t = sorted(t)[n:-n]
return t
def TrimmedMean(t, p=0.01):
"""Computes the trimmed mean of a sequence of numbers.
Args:
t: sequence of numbers
p: fraction of values to trim off each end
Returns:
float
"""
t = Trim(t, p)
return Mean(t)
def TrimmedMeanVar(t, p=0.01):
"""Computes the trimmed mean and variance of a sequence of numbers.
Side effect: sorts the list.
Args:
t: sequence of numbers
p: fraction of values to trim off each end
Returns:
float
"""
t = Trim(t, p)
mu, var = MeanVar(t)
return mu, var
def CohenEffectSize(group1, group2):
"""Compute Cohen's d.
group1: Series or NumPy array
group2: Series or NumPy array
returns: float
"""
diff = group1.mean() - group2.mean()
n1, n2 = len(group1), len(group2)
var1 = group1.var()
var2 = group2.var()
pooled_var = (n1 * var1 + n2 * var2) / (n1 + n2)
d = diff / math.sqrt(pooled_var)
return d
def Cov(xs, ys, meanx=None, meany=None):
"""Computes Cov(X, Y).
Args:
xs: sequence of values
ys: sequence of values
meanx: optional float mean of xs
meany: optional float mean of ys
Returns:
Cov(X, Y)
"""
xs = np.asarray(xs)
ys = np.asarray(ys)
if meanx is None:
meanx = np.mean(xs)
if meany is None:
meany = np.mean(ys)
cov = np.dot(xs-meanx, ys-meany) / len(xs)
return cov
def Corr(xs, ys):
"""Computes Corr(X, Y).
Args:
xs: sequence of values
ys: sequence of values
Returns:
Corr(X, Y)
"""
xs = np.asarray(xs)
ys = np.asarray(ys)
meanx, varx = MeanVar(xs)
meany, vary = MeanVar(ys)
corr = Cov(xs, ys, meanx, meany) / math.sqrt(varx * vary)
return corr
def SerialCorr(series, lag=1):
"""Computes the serial correlation of a series.
series: Series
lag: integer number of intervals to shift
returns: float correlation
"""
xs = series[lag:]
ys = series.shift(lag)[lag:]
corr = Corr(xs, ys)
return corr
def SpearmanCorr(xs, ys):
"""Computes Spearman's rank correlation.
Args:
xs: sequence of values
ys: sequence of values
Returns:
float Spearman's correlation
"""
xranks = pandas.Series(xs).rank()
yranks = pandas.Series(ys).rank()
return Corr(xranks, yranks)
def MapToRanks(t):
"""Returns a list of ranks corresponding to the elements in t.
Args:
t: sequence of numbers
Returns:
list of integer ranks, starting at 1
"""
# pair up each value with its index
pairs = enumerate(t)
# sort by value
sorted_pairs = sorted(pairs, key=itemgetter(1))
# pair up each pair with its rank
ranked = enumerate(sorted_pairs)
# sort by index
resorted = sorted(ranked, key=lambda trip: trip[1][0])
# extract the ranks
ranks = [trip[0]+1 for trip in resorted]
return ranks
def LeastSquares(xs, ys):
"""Computes a linear least squares fit for ys as a function of xs.
Args:
xs: sequence of values
ys: sequence of values
Returns:
tuple of (intercept, slope)
"""
meanx, varx = MeanVar(xs)
meany = Mean(ys)
slope = Cov(xs, ys, meanx, meany) / varx
inter = meany - slope * meanx
return inter, slope
def FitLine(xs, inter, slope):
"""Fits a line to the given data.
xs: sequence of x
returns: tuple of numpy arrays (sorted xs, fit ys)
"""
fit_xs = np.sort(xs)
fit_ys = inter + slope * fit_xs
return fit_xs, fit_ys
def Residuals(xs, ys, inter, slope):
"""Computes residuals for a linear fit with parameters inter and slope.
Args:
xs: independent variable
ys: dependent variable
inter: float intercept
slope: float slope
Returns:
list of residuals
"""
xs = np.asarray(xs)
ys = np.asarray(ys)
res = ys - (inter + slope * xs)
return res
def CoefDetermination(ys, res):
"""Computes the coefficient of determination (R^2) for given residuals.
Args:
ys: dependent variable
res: residuals
Returns:
float coefficient of determination
"""
return 1 - Var(res) / Var(ys)
def CorrelatedGenerator(rho):
"""Generates standard normal variates with serial correlation.
rho: target coefficient of correlation
Returns: iterable
"""
x = random.gauss(0, 1)
yield x
sigma = math.sqrt(1 - rho**2)
while True:
x = random.gauss(x * rho, sigma)
yield x
def CorrelatedNormalGenerator(mu, sigma, rho):
"""Generates normal variates with serial correlation.
mu: mean of variate
sigma: standard deviation of variate
rho: target coefficient of correlation
Returns: iterable
"""
for x in CorrelatedGenerator(rho):
yield x * sigma + mu
def RawMoment(xs, k):
"""Computes the kth raw moment of xs.
"""
return sum(x**k for x in xs) / len(xs)
def CentralMoment(xs, k):
"""Computes the kth central moment of xs.
"""
mean = RawMoment(xs, 1)
return sum((x - mean)**k for x in xs) / len(xs)
def StandardizedMoment(xs, k):
"""Computes the kth standardized moment of xs.
"""
var = CentralMoment(xs, 2)
std = math.sqrt(var)
return CentralMoment(xs, k) / std**k
def Skewness(xs):
"""Computes skewness.
"""
return StandardizedMoment(xs, 3)
def Median(xs):
"""Computes the median (50th percentile) of a sequence.
xs: sequence or anything else that can initialize a Cdf
returns: float
"""
cdf = Cdf(xs)
return cdf.Value(0.5)
def IQR(xs):
"""Computes the interquartile of a sequence.
xs: sequence or anything else that can initialize a Cdf
returns: pair of floats
"""
cdf = Cdf(xs)
return cdf.Value(0.25), cdf.Value(0.75)
def PearsonMedianSkewness(xs):
"""Computes the Pearson median skewness.
"""
median = Median(xs)
mean = RawMoment(xs, 1)
var = CentralMoment(xs, 2)
std = math.sqrt(var)
gp = 3 * (mean - median) / std
return gp
class FixedWidthVariables(object):
"""Represents a set of variables in a fixed width file."""
def __init__(self, variables, index_base=0):
"""Initializes.
variables: DataFrame
index_base: are the indices 0 or 1 based?
Attributes:
colspecs: list of (start, end) index tuples
names: list of string variable names
"""
self.variables = variables
# note: by default, subtract 1 from colspecs
self.colspecs = variables[['start', 'end']] - index_base
# convert colspecs to a list of pair of int
self.colspecs = self.colspecs.astype(np.int).values.tolist()
self.names = variables['name']
def ReadFixedWidth(self, filename, **options):
"""Reads a fixed width ASCII file.
filename: string filename
returns: DataFrame
"""
df = pandas.read_fwf(filename,
colspecs=self.colspecs,
names=self.names,
**options)
return df
def ReadStataDct(dct_file, **options):
"""Reads a Stata dictionary file.
dct_file: string filename
options: dict of options passed to open()
returns: FixedWidthVariables object
"""
type_map = dict(byte=int, int=int, long=int, float=float, double=float)
var_info = []
for line in open(dct_file, **options):
match = re.search( r'_column\(([^)]*)\)', line)
if match:
start = int(match.group(1))
t = line.split()
vtype, name, fstring = t[1:4]
name = name.lower()
if vtype.startswith('str'):
vtype = str
else:
vtype = type_map[vtype]
long_desc = ' '.join(t[4:]).strip('"')
var_info.append((start, vtype, name, fstring, long_desc))
columns = ['start', 'type', 'name', 'fstring', 'desc']
variables = pandas.DataFrame(var_info, columns=columns)
# fill in the end column by shifting the start column
variables['end'] = variables.start.shift(-1)
variables.loc[len(variables)-1, 'end'] = 0
dct = FixedWidthVariables(variables, index_base=1)
return dct
def Resample(xs, n=None):
"""Draw a sample from xs with the same length as xs.
xs: sequence
n: sample size (default: len(xs))
returns: NumPy array
"""
if n is None:
n = len(xs)
return np.random.choice(xs, n, replace=True)
def SampleRows(df, nrows, replace=False):
"""Choose a sample of rows from a DataFrame.
df: DataFrame
nrows: number of rows
replace: whether to sample with replacement
returns: DataDf
"""
indices = np.random.choice(df.index, nrows, replace=replace)
sample = df.loc[indices]
return sample
def ResampleRows(df):
"""Resamples rows from a DataFrame.
df: DataFrame
returns: DataFrame
"""
return SampleRows(df, len(df), replace=True)
def ResampleRowsWeighted(df, column='finalwgt'):
"""Resamples a DataFrame using probabilities proportional to given column.
df: DataFrame
column: string column name to use as weights
returns: DataFrame
"""
weights = df[column]
cdf = Cdf(dict(weights))
indices = cdf.Sample(len(weights))
sample = df.loc[indices]
return sample
def PercentileRow(array, p):
"""Selects the row from a sorted array that maps to percentile p.
p: float 0--100
returns: NumPy array (one row)
"""
rows, cols = array.shape
index = int(rows * p / 100)
return array[index,]
def PercentileRows(ys_seq, percents):
"""Given a collection of lines, selects percentiles along vertical axis.
For example, if ys_seq contains simulation results like ys as a
function of time, and percents contains (5, 95), the result would
be a 90% CI for each vertical slice of the simulation results.
ys_seq: sequence of lines (y values)
percents: list of percentiles (0-100) to select
returns: list of NumPy arrays, one for each percentile
"""
nrows = len(ys_seq)
ncols = len(ys_seq[0])
array = np.zeros((nrows, ncols))
for i, ys in enumerate(ys_seq):
array[i,] = ys
array = np.sort(array, axis=0)
rows = [PercentileRow(array, p) for p in percents]
return rows
def Smooth(xs, sigma=2, **options):
"""Smooths a NumPy array with a Gaussian filter.
xs: sequence
sigma: standard deviation of the filter
"""
return ndimage.filters.gaussian_filter1d(xs, sigma, **options)
class HypothesisTest(object):
"""Represents a hypothesis test."""
def __init__(self, data):
"""Initializes.
data: data in whatever form is relevant
"""
self.data = data
self.MakeModel()
self.actual = self.TestStatistic(data)
self.test_stats = None
self.test_cdf = None
def PValue(self, iters=1000):
"""Computes the distribution of the test statistic and p-value.
iters: number of iterations
returns: float p-value
"""
self.test_stats = [self.TestStatistic(self.RunModel())
for _ in range(iters)]
self.test_cdf = Cdf(self.test_stats)
count = sum(1 for x in self.test_stats if x >= self.actual)
return count / iters
def MaxTestStat(self):
"""Returns the largest test statistic seen during simulations.
"""
return max(self.test_stats)
def PlotCdf(self, label=None):
"""Draws a Cdf with vertical lines at the observed test stat.
"""
def VertLine(x):
"""Draws a vertical line at x."""
thinkplot.Plot([x, x], [0, 1], color='0.8')
VertLine(self.actual)
thinkplot.Cdf(self.test_cdf, label=label)
def TestStatistic(self, data):
"""Computes the test statistic.
data: data in whatever form is relevant
"""
raise UnimplementedMethodException()
def MakeModel(self):
"""Build a model of the null hypothesis.
"""
pass
def RunModel(self):
"""Run the model of the null hypothesis.
returns: simulated data
"""
raise UnimplementedMethodException()
def main():
pass
if __name__ == '__main__':
main()