mirror of
https://github.com/donnemartin/data-science-ipython-notebooks.git
synced 2024-03-22 13:30:56 +08:00
235 lines
8.1 KiB
Python
235 lines
8.1 KiB
Python
"""This file is only here to speed up the execution of notebooks.
|
|
|
|
It contains a subset of the code defined in simple_rnn.ipynb and
|
|
lstm_text.ipynb, in particular the code compiling Theano function.
|
|
Executing this script first will populate the cache of compiled C code,
|
|
which will make subsequent compilations faster.
|
|
|
|
The use case is to run this script in the background when a demo VM
|
|
such as the one for NVIDIA's qwikLABS, so that the compilation phase
|
|
started from the notebooks is faster.
|
|
|
|
"""
|
|
import numpy
|
|
|
|
import theano
|
|
import theano.tensor as T
|
|
|
|
from theano import config
|
|
from theano.tensor.nnet import categorical_crossentropy
|
|
|
|
|
|
floatX = theano.config.floatX
|
|
|
|
|
|
# simple_rnn.ipynb
|
|
|
|
class SimpleRNN(object):
|
|
def __init__(self, input_dim, recurrent_dim):
|
|
w_xh = numpy.random.normal(0, .01, (input_dim, recurrent_dim))
|
|
w_hh = numpy.random.normal(0, .02, (recurrent_dim, recurrent_dim))
|
|
self.w_xh = theano.shared(numpy.asarray(w_xh, dtype=floatX), name='w_xh')
|
|
self.w_hh = theano.shared(numpy.asarray(w_hh, dtype=floatX), name='w_hh')
|
|
self.b_h = theano.shared(numpy.zeros((recurrent_dim,), dtype=floatX), name='b_h')
|
|
self.parameters = [self.w_xh, self.w_hh, self.b_h]
|
|
|
|
def _step(self, input_t, previous):
|
|
return T.tanh(T.dot(previous, self.w_hh) + input_t)
|
|
|
|
def __call__(self, x):
|
|
x_w_xh = T.dot(x, self.w_xh) + self.b_h
|
|
result, updates = theano.scan(self._step,
|
|
sequences=[x_w_xh],
|
|
outputs_info=[T.zeros_like(self.b_h)])
|
|
return result
|
|
|
|
|
|
w_ho_np = numpy.random.normal(0, .01, (15, 1))
|
|
w_ho = theano.shared(numpy.asarray(w_ho_np, dtype=floatX), name='w_ho')
|
|
b_o = theano.shared(numpy.zeros((1,), dtype=floatX), name='b_o')
|
|
|
|
x = T.matrix('x')
|
|
my_rnn = SimpleRNN(1, 15)
|
|
hidden = my_rnn(x)
|
|
prediction = T.dot(hidden, w_ho) + b_o
|
|
parameters = my_rnn.parameters + [w_ho, b_o]
|
|
l2 = sum((p**2).sum() for p in parameters)
|
|
mse = T.mean((prediction[:-1] - x[1:])**2)
|
|
cost = mse + .0001 * l2
|
|
gradient = T.grad(cost, wrt=parameters)
|
|
|
|
lr = .3
|
|
updates = [(par, par - lr * gra) for par, gra in zip(parameters, gradient)]
|
|
update_model = theano.function([x], cost, updates=updates)
|
|
get_cost = theano.function([x], mse)
|
|
predict = theano.function([x], prediction)
|
|
get_hidden = theano.function([x], hidden)
|
|
get_gradient = theano.function([x], gradient)
|
|
|
|
predict = theano.function([x], prediction)
|
|
|
|
# Generating sequences
|
|
|
|
x_t = T.vector()
|
|
h_p = T.vector()
|
|
preactivation = T.dot(x_t, my_rnn.w_xh) + my_rnn.b_h
|
|
h_t = my_rnn._step(preactivation, h_p)
|
|
o_t = T.dot(h_t, w_ho) + b_o
|
|
|
|
single_step = theano.function([x_t, h_p], [o_t, h_t])
|
|
|
|
# lstm_text.ipynb
|
|
|
|
def gauss_weight(rng, ndim_in, ndim_out=None, sd=.005):
|
|
if ndim_out is None:
|
|
ndim_out = ndim_in
|
|
W = rng.randn(ndim_in, ndim_out) * sd
|
|
return numpy.asarray(W, dtype=config.floatX)
|
|
|
|
|
|
def index_dot(indices, w):
|
|
return w[indices.flatten()]
|
|
|
|
|
|
class LstmLayer:
|
|
|
|
def __init__(self, rng, input, mask, n_in, n_h):
|
|
|
|
# Init params
|
|
self.W_i = theano.shared(gauss_weight(rng, n_in, n_h), 'W_i', borrow=True)
|
|
self.W_f = theano.shared(gauss_weight(rng, n_in, n_h), 'W_f', borrow=True)
|
|
self.W_c = theano.shared(gauss_weight(rng, n_in, n_h), 'W_c', borrow=True)
|
|
self.W_o = theano.shared(gauss_weight(rng, n_in, n_h), 'W_o', borrow=True)
|
|
|
|
self.U_i = theano.shared(gauss_weight(rng, n_h), 'U_i', borrow=True)
|
|
self.U_f = theano.shared(gauss_weight(rng, n_h), 'U_f', borrow=True)
|
|
self.U_c = theano.shared(gauss_weight(rng, n_h), 'U_c', borrow=True)
|
|
self.U_o = theano.shared(gauss_weight(rng, n_h), 'U_o', borrow=True)
|
|
|
|
self.b_i = theano.shared(numpy.zeros((n_h,), dtype=config.floatX),
|
|
'b_i', borrow=True)
|
|
self.b_f = theano.shared(numpy.zeros((n_h,), dtype=config.floatX),
|
|
'b_f', borrow=True)
|
|
self.b_c = theano.shared(numpy.zeros((n_h,), dtype=config.floatX),
|
|
'b_c', borrow=True)
|
|
self.b_o = theano.shared(numpy.zeros((n_h,), dtype=config.floatX),
|
|
'b_o', borrow=True)
|
|
|
|
self.params = [self.W_i, self.W_f, self.W_c, self.W_o,
|
|
self.U_i, self.U_f, self.U_c, self.U_o,
|
|
self.b_i, self.b_f, self.b_c, self.b_o]
|
|
|
|
outputs_info = [T.zeros((input.shape[1], n_h)),
|
|
T.zeros((input.shape[1], n_h))]
|
|
|
|
rval, updates = theano.scan(self._step,
|
|
sequences=[mask, input],
|
|
outputs_info=outputs_info)
|
|
|
|
# self.output is in the format (length, batchsize, n_h)
|
|
self.output = rval[0]
|
|
|
|
def _step(self, m_, x_, h_, c_):
|
|
|
|
i_preact = (index_dot(x_, self.W_i) +
|
|
T.dot(h_, self.U_i) + self.b_i)
|
|
i = T.nnet.sigmoid(i_preact)
|
|
|
|
f_preact = (index_dot(x_, self.W_f) +
|
|
T.dot(h_, self.U_f) + self.b_f)
|
|
f = T.nnet.sigmoid(f_preact)
|
|
|
|
o_preact = (index_dot(x_, self.W_o) +
|
|
T.dot(h_, self.U_o) + self.b_o)
|
|
o = T.nnet.sigmoid(o_preact)
|
|
|
|
c_preact = (index_dot(x_, self.W_c) +
|
|
T.dot(h_, self.U_c) + self.b_c)
|
|
c = T.tanh(c_preact)
|
|
|
|
c = f * c_ + i * c
|
|
c = m_[:, None] * c + (1. - m_)[:, None] * c_
|
|
|
|
h = o * T.tanh(c)
|
|
h = m_[:, None] * h + (1. - m_)[:, None] * h_
|
|
|
|
return h, c
|
|
|
|
|
|
def sequence_categorical_crossentropy(prediction, targets, mask):
|
|
prediction_flat = prediction.reshape(((prediction.shape[0] *
|
|
prediction.shape[1]),
|
|
prediction.shape[2]), ndim=2)
|
|
targets_flat = targets.flatten()
|
|
mask_flat = mask.flatten()
|
|
ce = categorical_crossentropy(prediction_flat, targets_flat)
|
|
return T.sum(ce * mask_flat)
|
|
|
|
|
|
class LogisticRegression(object):
|
|
|
|
def __init__(self, rng, input, n_in, n_out):
|
|
|
|
W = gauss_weight(rng, n_in, n_out)
|
|
self.W = theano.shared(value=numpy.asarray(W, dtype=theano.config.floatX),
|
|
name='W', borrow=True)
|
|
# initialize the biases b as a vector of n_out 0s
|
|
self.b = theano.shared(value=numpy.zeros((n_out,),
|
|
dtype=theano.config.floatX),
|
|
name='b', borrow=True)
|
|
|
|
# compute vector of class-membership probabilities in symbolic form
|
|
energy = T.dot(input, self.W) + self.b
|
|
energy_exp = T.exp(energy - T.max(energy, axis=2, keepdims=True))
|
|
pmf = energy_exp / energy_exp.sum(axis=2, keepdims=True)
|
|
self.p_y_given_x = pmf
|
|
self.params = [self.W, self.b]
|
|
|
|
batch_size = 100
|
|
n_h = 50
|
|
|
|
# The Theano graph
|
|
# Set the random number generator' seeds for consistency
|
|
rng = numpy.random.RandomState(12345)
|
|
|
|
x = T.lmatrix('x')
|
|
mask = T.matrix('mask')
|
|
|
|
# Construct an LSTM layer and a logistic regression layer
|
|
recurrent_layer = LstmLayer(rng=rng, input=x, mask=mask, n_in=111, n_h=n_h)
|
|
logreg_layer = LogisticRegression(rng=rng, input=recurrent_layer.output[:-1],
|
|
n_in=n_h, n_out=111)
|
|
|
|
# define a cost variable to optimize
|
|
cost = sequence_categorical_crossentropy(logreg_layer.p_y_given_x,
|
|
x[1:],
|
|
mask[1:]) / batch_size
|
|
|
|
# create a list of all model parameters to be fit by gradient descent
|
|
params = logreg_layer.params + recurrent_layer.params
|
|
|
|
# create a list of gradients for all model parameters
|
|
grads = T.grad(cost, params)
|
|
|
|
learning_rate = 0.1
|
|
updates = [
|
|
(param_i, param_i - learning_rate * grad_i)
|
|
for param_i, grad_i in zip(params, grads)
|
|
]
|
|
|
|
update_model = theano.function([x, mask], cost, updates=updates)
|
|
|
|
evaluate_model = theano.function([x, mask], cost)
|
|
|
|
# Generating Sequences
|
|
x_t = T.iscalar()
|
|
h_p = T.vector()
|
|
c_p = T.vector()
|
|
h_t, c_t = recurrent_layer._step(T.ones(1), x_t, h_p, c_p)
|
|
energy = T.dot(h_t, logreg_layer.W) + logreg_layer.b
|
|
|
|
energy_exp = T.exp(energy - T.max(energy, axis=1, keepdims=True))
|
|
|
|
output = energy_exp / energy_exp.sum(axis=1, keepdims=True)
|
|
single_step = theano.function([x_t, h_p, c_p], [output, h_t, c_t])
|