Skip to content

Commit

Permalink
1.4.0 Release. Support for several distributed backends: Ray, Dask, S…
Browse files Browse the repository at this point in the history
…park. Major refactoring and API change.
  • Loading branch information
anttttti committed Jun 8, 2019
1 parent ef57b5c commit 01012a6
Show file tree
Hide file tree
Showing 27 changed files with 1,475 additions and 1,077 deletions.
2 changes: 1 addition & 1 deletion MANIFEST.in
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ include scripts/wordhash_regressor.py
include scripts/wordseq_regressor.py
include scripts/wordvec_regressor.py
include scripts/classify_airline_sentiment.py
include scripts/wordbag_regressor_spark.py
include scripts/backends_benchmark.py

include data/Tweets.csv
168 changes: 119 additions & 49 deletions README.rst
Original file line number Diff line number Diff line change
@@ -1,85 +1,155 @@
Wordbatch
=========
=============
Wordbatch 1.4
=============

Parallel text feature extraction for machine learning.
Overview
========

Wordbatch produces parallel feature extraction from raw text data for uses such as deep learning and text analytics. The most basic use for Wordbatch is as a drop-in replacement for the more basic non-parallelized extraction available in toolkits such as Scikit-learn, TfLearn and Keras. Wordbatch additionally provides customizable preprocessing and feature extractors that improve predictive performance.
Python library for distributed AI processing pipelines, using swappable scheduler backends.

Unlike text preprocessing in these toolkits that either deal with text as a single batch or as a stream, Wordbatch works best with large minibatches of text data. Wordbatch internally stores per-batch statistics of the data, and applies these for uses such as dictionary selection, spelling correction, and online IDF weighting. The larger the batches, the better choices Wordbatch can make in extracting features.
Wordbatch parallelizes task pipelines as minibatches processed by a chosen scheduler backend. This allows
the user to develop AI programs on a local workstation or laptop, and scale the same
solution on a cluster or the cloud, simply by changing the pipeline backend to a distributed scheduler such as Spark,
Dask and Ray. A backend can be chosen based on performance characteristics on a particular task, and swapped for
different situations. For example, an AI model can be trained using a distributed backend, and then debugged or
deployed using a single serial process.

The current text preprocessing options include passing any function as text normalization to be parallelized, a constant-time adaptive version of Norvig spelling correction, and passing any function for parallel stemming.

Currently four basic feature extractor classes are provided:

- WordHash is simply the Scikit-learn HashingVectorizer wrapped with the Wordbatch parallelization, providing multiplied processing speeds
- WordBag is a flexible alternative to Wordhash, providing cababilities missing from Scikit-learn, such as IDF and per n-gram order weighting of hashed features, windowed and distance-weighted polynomial interactions, and more transforms for counts.
- WordSeq provides sequences of word integers, as used by the deep learning toolkits for input into LSTM models.
- WordVec provides embedding transforms from words into wordvectors

A list of extractors can be defined. For example, word vector sequences can be projected into per-document vectors, and concatenated with the vectors from other word vector embeddings.

Four basic OpenMP-parallelized L1&L2-regularized online learning models are provided, for single-label regression and classification:

- FTRL : Linear model Proximal-FTRL that has become the most popular algorithm for online learning of linear models in Kaggle competions. The Cython-optimized implementation should be the fastest available version of FTRL.
- FM_FTRL : Factorization Machines. Linear effects estimated with FTRL and factor effects estimated with adaptive SGD. Prediction and estimation multithreaded across factors.
- NN_Relu_H1 : Neural Network with 1 hidden layer and Rectified Linear Unit activations, estimated with adaptive SGD. Prediction and estimation multithreaded across hidden layer.
- NN_Relu_H2: Neural Network with 2 hidden layers and Rectified Linear Unit activations, estimated with adaptive SGD. Prediction multithreaded across 2nd hidden layer, estimation across 1st hidden layer outputs.

The adaptive SGD optimizer works like Adagrad, but pools the adaptive learning rates across hidden nodes using the same feature. This makes learning more robust and requires less memory.

Wordbatch is written with Cython, and uses concurrent threading, multiprocessing and OpenMP parallelization for circumventing the Python GIL. License is GNU GPL 2.0, and less restrictive licenses are available on request.
The library is organized around the orchestrator class Batcher, and Sklearn-compatible components,
split into Pipelines, Transformers, Extractors and Models. These extend the Scikit-learn API with a
fit_partial()-method, that enables transformers and models to be used in a streaming fashion.
The current set of components has been developed mostly for text processing tasks, but components for other domains
can be developed based on the available classes.

Requirements
============
Linux/Windows. Python 2.7 / 3.6 / 3.7
Linux / Windows / macOS. Python 3.6 / 3.7

Installation
============
pip install wordbatch

macOS: compile using GCC-7 (https://github.com/anttttti/Wordbatch/issues/1)

Getting started
===============

| #from sklearn.feature_extraction.text import HashingVectorizer
| #from sklearn.linear_model import *
| #vct= HashingVectorizer()
| #clf= SGDRegressor()
|
| import wordbatch
| from wordbatch.models import FTRL
| from wordbatch.extractors import WordBag
| wb= wordbatch.WordBatch(extractor=(WordBag, {"hash_ngrams":2, "hash_ngrams_weights":[0.5, -1.0], "hash_size":2**23, "norm":'l2', "tf":'log', "idf":50.0}))
| from wordbatch.pipelines import WordBatch
| from wordbatch.batcher import Batcher
|
| wb= WordBatch(extractor=WordBag(hash_ngrams=0, norm= 'l2', tf= 'binary', idf= 50.0),
| batcher=Batcher(backend="multiprocessing"))
|
| clf= FTRL(alpha=1.0, beta=1.0, L1=0.00001, L2=1.0, D=2 ** 25, iters=1)
|
| train_texts= ["Cut down a tree with a herring? It can't be done.", "Don't say that word.", "How can we not say the word if you don't tell us what it is?"]
| train_texts= ["Cut down a tree with a herring? It can't be done.",
| "Don't say that word.",
| "How can we not say the word if you don't tell us what it is?"]
| train_labels= [1, 0, 1]
| test_texts= ["Wait! I said it! I said it! Ooh! I said it again!"]
|
| clf.fit(wb.transform(train_texts), train_labels)
| preds= clf.predict(wb.transform(test_texts))
| clf.fit(wb.fit_transform(train_texts), train_labels)
| print(clf.predict(wb.transform(test_texts)))
|
| import ray
| ray.init()
| wb.batcher.backend= "ray"
| wb.batcher.backend_handle= ray
|
| clf.fit(wb.fit_transform(train_texts), train_labels)
| print(clf.predict(wb.transform(test_texts)))

Components
==========

Batcher
-------
Batcher orchestrates MapReduce processing of tasks using a backend, by splitting input data into separately processed
minibatches. Currently three local (serial, multiprocessing, Loky) and three distributed backends (Spark, Dask,
Ray) are supported. Some distributed backends will process the tasks concurrently as a graph of lazily evaluated
futures, with Batcher dynamically sending the graph for the backend to process. All three supported distributed
backends allow real time monitoring of the processing pipeline using the backend's own GUI.


Pipelines
---------
Pipelines are classes that send functions, methods and classes to Batcher for processing. Unlike other components in
Wordbatch, pipelines contain a reference to Batcher, and are never referenced themselves in the calls sent to Batcher.
This prevents trying to serialize and send the backend handle itself. The simplest pipeline is Apply,
which processes a function or method over the input data row-by-row. WordBatch is a full complex pipeline for text
processing, with optional steps such as text normalization, spelling correction, stemming, feature extraction, and
LZ4-caching of results.


Transformers
------------
Transformers are transformer classes extending the Scikit-learn API, by accepting a Batcher instance as argument
of fit and transform methods. Transformers won't store Batcher references, allowing the transformer objects to be sent
to distributed workers. This allows transformers to do MapReduce operations as part of its methods, for example
gathering a dictionary of words from data when fitting a Dictionary. The current set of transformers are
text-specific classes, such as Dictionary, Tokenizer and TextNormalizer.


Extractors
----------
Extractors are transformer classes which don't directly call Batcher. Since extractors can't call Batcher directly,
they are mostly immutable and used for their transform() method calls distributed using a pipeline. The current set of
extractors is Cython-optimized, and aside from PandasHash intended for text feature extraction. These are:

- WordHash is wrapper for Scikit-learn HashingVectorizer, extended with option for LZ4-caching
- WordBag is a flexible alternative to Wordhash, with options such as IDF and per n-gram order weighting
- WordSeq provides sequences of word integers, as used by deep learning language models
- WordVec embeds words into word vector representations
- PandasHash extracts hashed features from a Pandas DataFrame, similar to VowpalWabbit's feature extraction


Models
------
Models are predictive models such as classifiers. Similar to extractors, they don't directly call Batcher, but are
Scikit-learn compatible and distributed using a pipeline if needed. Currently four
OpenMP-multithreaded L1&L2-regularized online learning models are provided, for single-label regression and
classification:

- FTRL : Linear model Proximal-FTRL that has become the most popular algorithm for online learning of linear models in Kaggle competions. The Cython-optimized implementation should be the fastest available version of FTRL.
- FM_FTRL : Factorization Machines. Linear effects estimated with FTRL and factor effects estimated with adaptive SGD. Prediction and estimation multithreaded across factors.
- NN_Relu_H1 : Neural Network with 1 hidden layer and Rectified Linear Unit activations, estimated with adaptive SGD. Prediction and estimation multithreaded across hidden layer.
- NN_Relu_H2: Neural Network with 2 hidden layers and Rectified Linear Unit activations, estimated with adaptive SGD. Prediction multithreaded across 2nd hidden layer, estimation across 1st hidden layer outputs.

The adaptive SGD optimizer works like Adagrad, but pools the adaptive learning rates across hidden nodes using the same
feature. This makes learning more robust and requires less memory. FM_FTRL uses AVX2-optimization, so that processors
supporting AVX2 will run the factorization model up to four times faster.

Example scripts
===============

The directory /scripts/ contains four scripts for demonstrating the basic extractors, and a Scikit-learn ensemble model to combine predictions. To run the scripts you should first install the dependencies: Keras, NLTK, TextBlob and Pandas. The scripts also use the TripAdvisor dataset (http://times.cs.uiuc.edu/~wang296/Data/) for training models, and the precomputed word embeddings glove.twitter.27B.100d and glove.6B.50d (http://nlp.stanford.edu/projects/glove/). The test data from Crowdflower Open data & Kaggle is provided in the /data directory.

- wordhash_regressor.py shows wordbatch.extractors.WordHash, and feature extraction concurrent with file reading
- wordbag_regressor.py shows wordbatch.extractors.WordBag, and online feature extraction and parallel FTRL training
- wordseq_regressor.py shows wordbatch.extractors.WordSeq, and training a 1D-convnet regression model
- wordvec_regressor.py shows wordbatch.extractors.WordVec, and combining word vector embeddings for FTRL training
- classify_airline_sentiment.py show how to combine predictions from the four scripts using a Random Forest Regressor on the airline sentiment data
The directory /scripts/ contains scripts for demonstrating and testing basic uses of the toolkit. To run the scripts
one should first install the dependencies: Keras, NLTK, TextBlob, Pandas, Ray, Dask Distributed and PySpark.
The scripts also use the TripAdvisor dataset (http://times.cs.uiuc.edu/~wang296/Data/), and the
precomputed word embeddings glove.twitter.27B.100d and glove.6B.50d (http://nlp.stanford.edu/projects/glove/). Test
data from Crowdflower Open data & Kaggle is provided in the /data directory.

Spark integration
=================
Starting from 1.2, Wordbatch has full Spark integration. All processing steps will be parallelized by Spark, simply by setting wb.use_sc=True and providing data in the RDD format produced by wb.lists2rddbatches(texts).
Airline Classification Example
------------------------------
classify_airline_sentiment.py shows training and combining predictions with four classifier scripts that use the
Wordbatch extractors and models: wordhash_regressor.py, wordbag_regressor.py, wordseq_regressor.py and
wordvec_regressor.py. The header part of the script can be modified to choose the backend. By default Ray is used and
passed to the other scripts.

A basic script using this is wordbag_regressor_spark.py, which is the wordbag_regressor.py script modified to run on Spark. This converts each minibatch of training data into an RDD, does feature extraction on the RDD, and collects the resulting features for local FTRL model training. A more practical script should read the data from parallelized storage, and implement model training on the RDD as well.
Backends Benchmark Example
--------------------------
backends_benchmark.py shows how to benchmark different backends on two simple pipeline tasks:
using ApplyBatch with Scikit-learn HashingVectorizer, and running WordBatch Pipeline with most of its possible
processing steps. Dask and Spark are commented out by default, as these need command-line configuration.
All three distributed backends can be configured to run across a distributed cluster, as done in the
commented-out code.

Parallel prediction is also demonstrated in wordbag_regressor_spark.py. By calling the class with predict_parallel(), it will parallelize prediction either locally or on Spark, depending on whether a SparkContext has been set for the class.

Contributors
============
Antti Puurula

Anders Topper

Cheng-Tsung Liu
116 changes: 116 additions & 0 deletions scripts/backends_benchmark.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import re
from contextlib import closing, contextmanager
import time
from wordbatch.pipelines import WordBatch, Apply, ApplyBatch
from wordbatch.extractors import WordHash, WordBag
from wordbatch.transformers import Tokenizer, Dictionary
from wordbatch.batcher import Batcher
import os
import json
from sklearn.feature_extraction.text import HashingVectorizer
import warnings
import pandas as pd

tripadvisor_dir= "../data/tripadvisor/json"

#Configure below to allow Dask / Spark
# scheduler_ip= "169.254.93.14"
# from dask.distributed import Client
# #dask-scheduler --host 169.254.93.14
# #dask-worker 169.254.93.14:8786 --nprocs 16
# dask_client = Client(scheduler_ip+":8786")
#
# from pyspark import SparkContext, SparkConf
# # conf= SparkConf().setAll([('spark.executor.memory', '4g'), ('spark.driver.memory', '30g'),
# # ('spark.driver.maxResultSize', '10g')])
# import os
# os.environ['PYSPARK_PYTHON'] = '/home/USERNAME/anaconda3/envs/ENV_NAME/bin/python'
# conf= SparkConf().setAll([('spark.executor.memory', '4g'), ('spark.driver.memory', '30g'),
# ('spark.driver.maxResultSize', '10g')]).setMaster("spark://169.254.93.14:7077")
# spark_context = SparkContext(conf=conf)

import ray
#ray start --head --node-ip-address 169.254.93.14
#ray.init(redis_address=scheduler_ip+":57113") #Change port accordingly
ray.init()

@contextmanager
def timer(name):
t0 = time.time()
yield
print(name + " done in " + str(time.time() - t0) + "s")

if 1==1:
texts= []
for jsonfile in os.listdir(tripadvisor_dir):
with open(tripadvisor_dir + "/" + jsonfile, 'r') as inputfile:
for line in inputfile:
try:
line = json.loads(line.strip())
except:
continue
for review in line["Reviews"]:
texts.append(review["Content"])
# pd.to_pickle(texts, "tripadvisor_data.pkl")
# else:
# texts= pd.read_pickle("tripadvisor_data.pkl")

non_alphanums = re.compile('[\W+]')
nums_re= re.compile("\W*[0-9]+\W*")
triples_re= re.compile(r"(\w)\1{2,}")
trash_re= [re.compile("<[^>]*>"), re.compile("[^a-z0-9' -]+"), re.compile(" [.0-9'-]+ "), re.compile("[-']{2,}"),
re.compile(" '"),re.compile(" +")]
from nltk.stem.porter import PorterStemmer
stemmer= PorterStemmer()

def normalize_text(text):
text= text.lower()
text= nums_re.sub(" NUM ", text)
text= " ".join([word for word in non_alphanums.sub(" ",text).strip().split() if len(word)>1])
return text

print(len(texts))
backends= [
['serial', ""],
['multiprocessing', ""],
['loky', ""],
#['dask', dask_client], #Uncomment once configured
#['spark', spark_context], #Uncomment once configured
['ray', ray]
]

tasks= [
"ApplyBatch",
"WordBag",
]

data_sizes= [40000, 80000, 160000, 320000, 640000, 1280000]

for task in tasks:
for data_size in data_sizes:
texts_chunk = texts[:data_size]
print("Task:", task, "Data size:", data_size)
for backend in backends:
batcher = Batcher(procs=16, minibatch_size=5000, backend=backend[0], backend_handle=backend[1])
#try:
with timer("Completed: ["+task+","+str(len(texts_chunk))+","+backend[0]+"]"), warnings.catch_warnings():
warnings.simplefilter("ignore")
if task=="ApplyBatch":
hv = HashingVectorizer(decode_error='ignore', n_features=2 ** 25, preprocessor=normalize_text,
ngram_range=(1, 2), norm='l2')
t= ApplyBatch(hv.transform, batcher=batcher).transform(texts_chunk)
print(t.shape, t.data[:5])

if task=="WordBag":
wb = WordBatch(normalize_text=normalize_text,
dictionary=Dictionary(min_df=10, max_words=1000000, verbose=0),
tokenizer= Tokenizer(spellcor_count=2, spellcor_dist=2, raw_min_df= 2,
stemmer= stemmer),
extractor=WordBag(hash_ngrams=0, norm= 'l2', tf= 'binary', idf= 50.0),
batcher= batcher,
verbose= 0)
t = wb.fit_transform(texts_chunk)
print(t.shape, t.data[:5])
# except:
# print("Failed ["+task+","+str(len(texts_chunk))+","+backend[0]+"]")
print("")
Loading

0 comments on commit 01012a6

Please sign in to comment.