Skip to content

Commit

Permalink
Bugfixes, added features, code cleaning
Browse files Browse the repository at this point in the history
  • Loading branch information
anttttti committed Sep 1, 2019
1 parent 01012a6 commit f96d967
Show file tree
Hide file tree
Showing 19 changed files with 195 additions and 129 deletions.
9 changes: 7 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
=============
Wordbatch 1.4
Wordbatch 1.4.1
=============

Overview
Expand All @@ -10,7 +10,7 @@ Python library for distributed AI processing pipelines, using swappable schedule
Wordbatch parallelizes task pipelines as minibatches processed by a chosen scheduler backend. This allows
the user to develop AI programs on a local workstation or laptop, and scale the same
solution on a cluster or the cloud, simply by changing the pipeline backend to a distributed scheduler such as Spark,
Dask and Ray. A backend can be chosen based on performance characteristics on a particular task, and swapped for
Dask or Ray. A backend can be chosen based on performance characteristics on a particular task, and swapped for
different situations. For example, an AI model can be trained using a distributed backend, and then debugged or
deployed using a single serial process.

Expand All @@ -30,6 +30,11 @@ pip install wordbatch

macOS: compile using GCC-7 (https://github.com/anttttti/Wordbatch/issues/1)

linux: make sure GCC and its required libraries are installed before installing Wordbatch
| sudo apt install gcc
| sudo apt-get update
| sudo apt-get install --reinstall build-essential
Getting started
===============

Expand Down
3 changes: 1 addition & 2 deletions scripts/backends_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ def normalize_text(text):
if task=="WordBag":
wb = WordBatch(normalize_text=normalize_text,
dictionary=Dictionary(min_df=10, max_words=1000000, verbose=0),
tokenizer= Tokenizer(spellcor_count=2, spellcor_dist=2, raw_min_df= 2,
stemmer= stemmer),
tokenizer= Tokenizer(spellcor_count=2, spellcor_dist=2, stemmer= stemmer),
extractor=WordBag(hash_ngrams=0, norm= 'l2', tf= 'binary', idf= 50.0),
batcher= batcher,
verbose= 0)
Expand Down
1 change: 0 additions & 1 deletion scripts/classify_airline_sentiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ def sentiment_to_label(sentiment):
d_sentiment_to_label= {"neutral":0, "negative":-1, "positive":1}
df['airline_sentiment_confidence']= df['airline_sentiment_confidence'].astype('str')
df['sentiment'] = (df['airline_sentiment']).map(d_sentiment_to_label)
#df['sentiment']= (df['airline_sentiment']).apply(lambda x: sentiment_to_label(x))
df= df[['text','sentiment']]

re_attags= re.compile(" @[^ ]* ")
Expand Down
4 changes: 2 additions & 2 deletions scripts/wordvec_regressor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from wordbatch.pipelines import WordBatch
from wordbatch.models import FTRL
from wordbatch.extractors import WordVec, Hstack
from wordbatch.data_utils import shuffle
from sklearn.utils import shuffle
import threading
import sys
if sys.version_info.major == 3:
Expand Down Expand Up @@ -53,7 +53,7 @@ def __init__(self, pickle_model="", datadir=None, batcher=None):
else: self.train(datadir, pickle_model)

def fit_batch(self, texts, labels, rcount):
texts, labels = shuffle(texts, labels, seed=rcount)
texts, labels = shuffle(texts, labels)
print("Transforming", rcount)
#texts= self.wb.fit_transform(texts, tn__batcher=self.batcher, dct__reset= False, dct__batcher= self.batcher)
texts = self.wb.fit_transform(texts)
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

setup(
name='Wordbatch',
version='1.4.0',
version='1.4.1',
description='Python library for distributed AI processing pipelines, using swappable scheduler backends',
url='https://github.com/anttttti/Wordbatch',
author='Antti Puurula',
Expand All @@ -38,7 +38,7 @@
"Topic :: Software Development :: Libraries :: Python Modules",
],
install_requires=['cython', 'scikit-learn', 'python-Levenshtein', 'py-lz4framed', 'randomgen', 'numpy', 'scipy',
'pandas'],
'pandas', 'wheel>=0.33.4'],
extras_require={'dev': ['nltk', 'textblob', 'keras', 'pyspark', 'dask', 'distributed', 'ray']},


Expand Down
2 changes: 1 addition & 1 deletion wordbatch/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import os
PACKAGE_DIR = os.path.dirname(os.path.abspath(__file__))
__version__ = '1.4.0'
__version__ = '1.4.1'

8 changes: 5 additions & 3 deletions wordbatch/batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,11 @@ def process_batches(self, task, data, args, backend=None, backend_handle=None, i
"""
if procs is None: procs= self.procs
if backend is None: backend= self.backend
if backend_handle is None: backend_handle = self.backend
if backend_handle is None: backend_handle = self.backend_handle
if verbose is None: verbose= self.verbose
if verbose > 1:
print("Parallel task:", task, " backend:", backend, " procs:", self.procs,
" input_split:", input_split, " merge_output:", merge_output)
print("Task:", task, " backend:", backend, " backend_handle:", backend_handle, " procs:",
self.procs, " input_split:", input_split, " merge_output:", merge_output)

if verbose> 10:
print("len(data):", len(data), "len(args):", len(args), "[type(x) for x in data]:",
Expand Down Expand Up @@ -241,6 +241,8 @@ def f_ray(f, data):
# jobs = [self.backend_handle.submit(task, (x,), (), ()) for x in paral_params]
# results = [x() for x in jobs]
if merge_output: return self.merge_batches(self.collect_batches(results, backend=backend))
if verbose > 2:
print("Task:", task, " backend:", backend, " backend_handle:", backend_handle, " completed")
return results

def shuffle_batch(self, texts, labels= None, seed= None):
Expand Down
50 changes: 25 additions & 25 deletions wordbatch/data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,31 @@
import itertools
import scipy.sparse as ssp

@contextmanager
def timer(name):
t0 = time.time()
yield
print(name + " done in " + str(time.time() - t0) + "s")

def shuffle(*objects, seed=0):
#Faster than inplace, but uses more memory
if isinstance(objects[0], ssp.base.spmatrix): lenn= objects[0].shape[0]
else: lenn= len(objects[0])
shuffled= randomgen.xoroshiro128.Xoroshiro128(seed).generator.permutation(lenn)
return [[x[z] for z in shuffled] if type(x)==list else x[shuffled] for x in objects]

def inplace_shuffle(*objects, seed=0):
#Slower than shuffle, but uses no extra memory
rand = randomgen.xoroshiro128.Xoroshiro128(seed).generator
for x in objects:
rand.seed(seed)
rand.shuffle(x)

def inplace_shuffle_threaded(*objects, threads= 0, seed=0):
#Faster than inplace for very large array sizes, > 10000000
if threads== 0: threads= min(len(objects), multiprocessing.cpu_count())
with ThreadPool(processes=threads) as pool:
pool.map(partial(inplace_shuffle, seed=seed), objects)
# @contextmanager
# def timer(name):
# t0 = time.time()
# yield
# print(name + " done in " + str(time.time() - t0) + "s")
#
# def shuffle(*objects, seed=0):
# #Faster than inplace, but uses more memory
# if isinstance(objects[0], ssp.base.spmatrix): lenn= objects[0].shape[0]
# else: lenn= len(objects[0])
# shuffled= randomgen.xoroshiro128.Xoroshiro128(seed).generator.permutation(lenn)
# return [[x[z] for z in shuffled] if type(x)==list else x[shuffled] for x in objects]
#
# def inplace_shuffle(*objects, seed=0):
# #Slower than shuffle, but uses no extra memory
# rand = randomgen.xoroshiro128.Xoroshiro128(seed).generator
# for x in objects:
# rand.seed(seed)
# rand.shuffle(x)
#
# def inplace_shuffle_threaded(*objects, threads= 0, seed=0):
# #Faster than inplace for very large array sizes, > 10000000
# if threads== 0: threads= min(len(objects), multiprocessing.cpu_count())
# with ThreadPool(processes=threads) as pool:
# pool.map(partial(inplace_shuffle, seed=seed), objects)

def indlist2csrmatrix(indlist, datalist= None, shape= None):
#Convert a list of indicator lists to a scipy.sparse.csr_matrix
Expand Down
15 changes: 15 additions & 0 deletions wordbatch/extractors/extractors.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -389,3 +389,18 @@ class PandasHash:
def fit_transform(self, texts, y=None):
return self.transform(texts, y)


class CategoricalEncoder:
def __init__(self, *args, **kwargs):
self.dictionary= kwargs.get('dictionary', None)

def transform(self, data, y= None):
return [self.dictionary.word2id.get(x, self.dictionary.max_words) for x in data]

def fit(self, data, y=None):
self.dictionary.prune_dictionary(re_encode=True, prune_dfs=False)
return self

def fit_transform(self, data, y=None):
self.fit(data)
return self.transform(data, y)
17 changes: 12 additions & 5 deletions wordbatch/models/fm_ftrl.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ cdef class FM_FTRL:
double beta=0.01, # ~ alpha/2
double L1=0.0001,
double L2=0.1,
unsigned int D=2**25,
unsigned int D=0,
double alpha_fm=0.03,
double L2_fm= 0.005,
double init_fm= 0.01,
Expand Down Expand Up @@ -175,8 +175,11 @@ cdef class FM_FTRL:
def predict(self, X, int threads= 0):
if threads==0: threads= self.threads
if type(X) != ssp.csr.csr_matrix: X= ssp.csr_matrix(X, dtype=np.float64)
return self.predict_f(np.ascontiguousarray(X.data), np.ascontiguousarray(X.indices),
np.ascontiguousarray(X.indptr), threads)
if X.shape[1] != self.D:
print("Dimension mismatch! self.D=", self.D, "X.shape[1]=", X.shape[1])
# return self.predict_f(np.ascontiguousarray(X.data), np.ascontiguousarray(X.indices),
# np.ascontiguousarray(X.indptr), threads)
return self.predict_f(X.data, X.indices, X.indptr, threads)

def predict_f(self, np.ndarray[double, ndim=1, mode='c'] X_data,
np.ndarray[int, ndim=1, mode='c'] X_indices,
Expand Down Expand Up @@ -213,9 +216,13 @@ cdef class FM_FTRL:
return self.fit(X, y, sample_weight= sample_weight, threads = threads, seed = seed, reset= False)

def fit(self, X, y, sample_weight= None, int threads= 0, int seed= 0, reset= True):
if reset: self.reset()
if threads == 0: threads= self.threads
if type(X) != ssp.csr.csr_matrix: X = ssp.csr_matrix(X, dtype=np.float64)
if reset or self.D==0:
self.D= X.shape[1]
self.reset()
elif X.shape[1] != self.D:
print("Dimension mismatch! self.D=", self.D, "X.shape[1]=", X.shape[1])
#if type(y) != np.array: y = np.array(y, dtype=np.float64)
y= np.ascontiguousarray(y, dtype=np.float64)
if sample_weight is not None and type(sample_weight) != np.array:
Expand All @@ -233,7 +240,7 @@ cdef class FM_FTRL:
e_noise= self.e_noise, e_clip= self.e_clip, abs_e
cdef double *w= &self.w[0], *z= &self.z[0], *n= &self.n[0], *n_fm= &self.n_fm[0], \
*z_fm= &self.z_fm[0], *w_fm= &self.w_fm[0], *ys= <double*> y.data
cdef unsigned int D_fm= self.D_fm, lenn, ptr, row_count= X_indptr.shape[0]-1, row, inv_link= self.inv_link
cdef int D_fm= self.D_fm, lenn, ptr, row_count= X_indptr.shape[0]-1, row, inv_link= self.inv_link
cdef bint bias_term= self.bias_term
cdef int* inds, indptr
cdef double* vals
Expand Down
19 changes: 12 additions & 7 deletions wordbatch/models/ftrl.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ cdef double inv_link_f(double e, int inv_link) nogil:
return e

cdef double predict_single(int* inds, double* vals, int lenn, double L1, double baL2, double ialpha, double beta,
double* w, double* z, double* n, bint bias_term, int threads):# nogil:
double* w, double* z, double* n, bint bias_term, int threads) nogil:
cdef int i, ii
cdef double sign, zi, wi
cdef double e= 0.0
Expand Down Expand Up @@ -83,7 +83,7 @@ cdef class FTRL:
double beta=1.0,
double L1=1.0,
double L2=1.0,
unsigned int D=2**25,
unsigned int D=0,
double init= 0.0,
unsigned int iters=10,
double e_clip= 1.0,
Expand Down Expand Up @@ -123,6 +123,8 @@ cdef class FTRL:
def predict(self, X, int threads= 0):
if threads==0: threads= self.threads
if type(X) != ssp.csr.csr_matrix: X= ssp.csr_matrix(X, dtype=np.float64)
if X.shape[1] != self.D:
print("Dimension mismatch! self.D=", self.D, "X.shape[1]=", X.shape[1])
# return self.predict_f(X, np.ascontiguousarray(X.data), np.ascontiguousarray(X.indices),
# np.ascontiguousarray(X.indptr), threads)
return self.predict_f(X.data, X.indices, X.indptr, threads)
Expand All @@ -134,7 +136,7 @@ cdef class FTRL:
p= np.zeros(X_indptr.shape[0]-1, dtype= np.float64)
cdef double *w= &self.w[0], *z= &self.z[0], *n= &self.n[0]
cdef double[:] pp= p
cdef int lenn, row_count= X_indptr.shape[0]-1, row, ptr
cdef unsigned lenn, row_count= X_indptr.shape[0]-1, row, ptr
cdef bint bias_term= self.bias_term
for row in range(row_count):
ptr= X_indptr[row]
Expand All @@ -149,10 +151,13 @@ cdef class FTRL:
return self.fit(X, y, sample_weight= sample_weight, threads = threads, reset= False)

def fit(self, X, y, sample_weight= None, int threads= 0, reset= True):
if reset: self.reset()
if threads == 0: threads= self.threads
if type(X) != ssp.csr.csr_matrix:
X = ssp.csr_matrix(X, dtype=np.float64)
if type(X) != ssp.csr.csr_matrix: X = ssp.csr_matrix(X, dtype=np.float64)
if reset or self.D==0:
self.D= X.shape[1]
self.reset()
elif X.shape[1] != self.D:
print("Dimension mismatch! self.D=", self.D, "X.shape[1]=", X.shape[1])
#if type(y) != np.array: y = np.array(y, dtype=np.float64)
y= np.ascontiguousarray(y, dtype=np.float64)
if sample_weight is not None and type(sample_weight) != np.array:
Expand All @@ -169,7 +174,7 @@ cdef class FTRL:
cdef double ialpha= 1.0/self.alpha, L1= self.L1, beta= self.beta, baL2= beta * ialpha + self.L2, e, e_total= 0,\
e_clip= self.e_clip, abs_e
cdef double *w= &self.w[0], *z= &self.z[0], *n= &self.n[0], *ys= <double*> y.data
cdef unsigned int lenn, ptr, row_count= X_indptr.shape[0]-1, row, inv_link= self.inv_link, j=0, jj
cdef int lenn, ptr, row_count= X_indptr.shape[0]-1, row, inv_link= self.inv_link, j=0, jj
cdef bint bias_term= self.bias_term
cdef int* inds, indptr
cdef double* vals
Expand Down
10 changes: 8 additions & 2 deletions wordbatch/models/nn_relu_h1.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ cdef class NN_ReLU_H1:
def __init__(self,
double alpha=0.1,
double L2=0.001,
int D=2**25,
int D=0,
int D_nn=30,
double init_nn=0.01,
double e_noise=0.0001,
Expand Down Expand Up @@ -118,6 +118,8 @@ cdef class NN_ReLU_H1:
def predict(self, X, int threads= 0):
if threads==0: threads= self.threads
if type(X) != ssp.csr.csr_matrix: X= ssp.csr_matrix(X, dtype=np.float64)
if X.shape[1] != self.D:
print("Dimension mismatch! self.D=", self.D, "X.shape[1]=", X.shape[1])
return self.predict_f(X.data, X.indices, X.indptr, threads)

def predict_f(self, np.ndarray[double, ndim=1, mode='c'] X_data,
Expand All @@ -140,9 +142,13 @@ cdef class NN_ReLU_H1:
return self.fit(X, y, sample_weight= sample_weight, threads = threads, seed = seed, reset= False)

def fit(self, X, y, sample_weight= None, int threads= 0, int seed= 0, reset= True):
if reset: self.reset()
if threads == 0: threads= self.threads
if type(X) != ssp.csr.csr_matrix: X = ssp.csr_matrix(X, dtype=np.float64)
if reset or self.D==0:
self.D= X.shape[1]
self.reset()
elif X.shape[1] != self.D:
print("Dimension mismatch! self.D=", self.D, "X.shape[1]=", X.shape[1])
if type(y) != np.array: y = np.array(y, dtype=np.float64)
# self.fit_f(X, np.ascontiguousarray(X.data), np.ascontiguousarray(X.indices),
# np.ascontiguousarray(X.indptr), y, threads)
Expand Down
10 changes: 8 additions & 2 deletions wordbatch/models/nn_relu_h2.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ cdef class NN_ReLU_H2:
def __init__(self,
double alpha=0.1,
double L2=0.00001,
int D=2**25,
int D=0,
int D_nn=12,
int D_nn2=4,
double init_nn=0.01,
Expand Down Expand Up @@ -140,6 +140,8 @@ cdef class NN_ReLU_H2:
def predict(self, X, int threads= 0):
if threads==0: threads= self.threads
if type(X) != ssp.csr.csr_matrix: X= ssp.csr_matrix(X, dtype=np.float64)
if X.shape[1] != self.D:
print("Dimension mismatch! self.D=", self.D, "X.shape[1]=", X.shape[1])
return self.predict_f(X.data, X.indices, X.indptr, threads)

def predict_f(self, np.ndarray[double, ndim=1, mode='c'] X_data,
Expand All @@ -163,9 +165,13 @@ cdef class NN_ReLU_H2:
return self.fit(X, y, threads=threads, seed=seed, reset=False)

def fit(self, X, y, int threads= 0, int seed= 0, reset=True):
if reset: self.reset()
if threads == 0: threads= self.threads
if type(X) != ssp.csr.csr_matrix: X = ssp.csr_matrix(X, dtype=np.float64)
if reset or self.D==0:
self.D= X.shape[1]
self.reset()
elif X.shape[1] != self.D:
print("Dimension mismatch! self.D=", self.D, "X.shape[1]=", X.shape[1])
if type(y) != np.array: y = np.array(y, dtype=np.float64)
return self.fit_f(X.data, X.indices, X.indptr, y, threads, seed)

Expand Down
4 changes: 2 additions & 2 deletions wordbatch/pipelines/apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ def fit_transform(self, data, input_split= False, merge_output= True, minibatch_
def transform(self, data, input_split= False, merge_output= True, minibatch_size= None, batcher= None):
if batcher is None: batcher = self.batcher
return batcher.process_batches(batch_transform, data, [self.function] + self.args + self.kwargs,
input_split=input_split, merge_output=merge_output,
minibatch_size= minibatch_size)
input_split=input_split, merge_output=merge_output,
minibatch_size= minibatch_size)
# import wordbatch.batcher as batcher
# b= batcher.Batcher(minibatch_size=2)#, method="serial")
# import numpy as np
Expand Down
Loading

0 comments on commit f96d967

Please sign in to comment.