feat(examples) Add Tabular Dataset Example #3568

Merged
merged 26 commits into from
Jun 20, 2024
1 change: 1 addition & 0 deletions README.md
@@ -153,6 +153,7 @@ Other [examples](https://github.com/adap/flower/tree/main/examples):
- [Flower with KaplanMeierFitter from the lifelines library](https://github.com/adap/flower/tree/main/examples/federated-kaplan-meier-fitter)
- [Sample Level Privacy with Opacus](https://github.com/adap/flower/tree/main/examples/opacus)
- [Sample Level Privacy with TensorFlow-Privacy](https://github.com/adap/flower/tree/main/examples/tensorflow-privacy)
- [Flower with a Tabular Dataset](https://github.com/adap/flower/tree/main/examples/fl-tabular)

## Community

39 changes: 39 additions & 0 deletions examples/fl-tabular/README.md
@@ -0,0 +1,39 @@
# Flower Example on Adult Census Income Tabular Dataset

This example demonstrates a federated learning setup using the Flower framework on the ["Adult Census Income"](https://huggingface.co/datasets/scikit-learn/adult-census-income) tabular dataset. The dataset contains demographic information such as age, education, and occupation, with income level (\<=50K or >50K) as the target attribute. It is partitioned into 5 subsets, one per simulated client, so that each client holds a distinct portion of the data. On each client, categorical variables are ordinal-encoded, the partition is split 80/20 into training and test sets, and the features are standardized. Federated learning runs for 5 rounds using the FedAvg strategy.
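
To make the aggregation step concrete, the following is a minimal sketch of the weighted averaging at the heart of FedAvg, written in plain Python. It is not Flower's implementation: the `fedavg` function, the flat parameter lists, and the client numbers are all hypothetical and chosen only for illustration.

```python
from typing import List, Tuple


def fedavg(updates: List[Tuple[List[float], int]]) -> List[float]:
    """Average flat parameter vectors, weighting each client by its example count."""
    total_examples = sum(num_examples for _, num_examples in updates)
    num_params = len(updates[0][0])
    averaged = [0.0] * num_params
    for params, num_examples in updates:
        # A client's share of the average is proportional to its data size
        weight = num_examples / total_examples
        for i, value in enumerate(params):
            averaged[i] += weight * value
    return averaged


# Hypothetical updates from three clients: (parameters, number of local examples)
client_updates = [
    ([1.0, 2.0], 100),
    ([3.0, 4.0], 300),
    ([5.0, 6.0], 100),
]
global_params = fedavg(client_updates)
print(global_params)
```

Clients with more local examples pull the global parameters more strongly toward their own update, which is why each client reports an example count along with its weights.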

This example uses [Flower Datasets](https://flower.ai/docs/datasets/) to download and partition the dataset. Preprocessing is done on each client with scikit-learn.
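
The preprocessing in `task.py` relies on scikit-learn's `OrdinalEncoder` and `StandardScaler`. As a rough illustration of what those two steps do to a column, here is a plain-Python sketch. The helper names `ordinal_encode` and `standardize` and the sample values are made up for this illustration; the example itself uses the scikit-learn classes.

```python
from statistics import mean, pstdev
from typing import Dict, List


def ordinal_encode(values: List[str]) -> List[float]:
    """Map each distinct category to an integer code, assigned in sorted order."""
    codes: Dict[str, int] = {cat: i for i, cat in enumerate(sorted(set(values)))}
    return [float(codes[v]) for v in values]


def standardize(values: List[float]) -> List[float]:
    """Rescale to zero mean and unit (population) standard deviation."""
    mu, sigma = mean(values), pstdev(values)
    if sigma == 0:
        # A constant column carries no information; map it to zeros
        return [0.0 for _ in values]
    return [(v - mu) / sigma for v in values]


# Made-up column values, for illustration only
workclass = ["Private", "State-gov", "Private", "Self-emp"]
age = [25.0, 35.0, 45.0, 55.0]

encoded = ordinal_encode(workclass)
scaled_age = standardize(age)
print(encoded)
print(scaled_age)
```

Unlike one-hot encoding, ordinal encoding keeps one column per categorical feature, which is why the model's input dimension stays equal to the number of original features.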

## Environment Setup

Start by cloning the example. The following single-line command, which you can copy into your shell, checks out the example for you:

```shell
git clone --depth=1 https://github.com/adap/flower.git && mv flower/examples/fl-tabular . && rm -rf flower && cd fl-tabular
```

This will create a new directory called `fl-tabular` containing the following files:

```shell
-- pyproject.toml
-- client.py
-- server.py
-- task.py
-- README.md
```

### Installing dependencies

Project dependencies are defined in `pyproject.toml`. Install them with:

```shell
pip install .
```

## Running the Code

### Federated Using Flower Simulation

```bash
flower-simulation --server-app server:app --client-app client:app --num-supernodes 5
```
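
After each round, the server combines the accuracy values reported by the clients using the `weighted_average` function defined in `server.py`. The snippet below reproduces that function so it can be tried on its own. The type hints and the per-client numbers are additions for illustration and are not results from an actual run.

```python
from typing import Dict, List, Tuple


def weighted_average(metrics: List[Tuple[int, Dict[str, float]]]) -> Dict[str, float]:
    """Same logic as in server.py: weight each accuracy by the client's example count."""
    accuracies = [num_examples * m["accuracy"] for num_examples, m in metrics]
    examples = [num_examples for num_examples, _ in metrics]
    return {"accuracy": sum(accuracies) / sum(examples)}


# Hypothetical per-client results: (number of evaluation examples, metrics)
round_metrics = [
    (100, {"accuracy": 0.80}),
    (300, {"accuracy": 0.90}),
]
aggregated = weighted_average(round_metrics)
print(aggregated)
```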
38 changes: 38 additions & 0 deletions examples/fl-tabular/client.py
@@ -0,0 +1,38 @@
from flwr.client import Client, ClientApp, NumPyClient
from flwr_datasets import FederatedDataset
from task import set_weights, get_weights, train, evaluate, IncomeClassifier, load_data

NUMBER_OF_CLIENTS = 5


class FlowerClient(NumPyClient):
    def __init__(self, net, trainloader, testloader):
        self.net = net
        self.trainloader = trainloader
        self.testloader = testloader

    def fit(self, parameters, config):
        # Load the global weights, train locally, and return the updated weights
        set_weights(self.net, parameters)
        train(self.net, self.trainloader)
        # Report the number of examples (not batches) so FedAvg weights clients by data size
        return get_weights(self.net), len(self.trainloader.dataset), {}

    def evaluate(self, parameters, config):
        # Evaluate the global weights on this client's local test split
        set_weights(self.net, parameters)
        loss, accuracy = evaluate(self.net, self.testloader)
        return loss, len(self.testloader.dataset), {"accuracy": accuracy}


def get_client_fn(dataset: FederatedDataset):
    def client_fn(cid: str) -> Client:
        # Each client loads the partition that matches its ID
        train_loader, test_loader = load_data(partition_id=int(cid), fds=dataset)
        net = IncomeClassifier(14)  # 14 input features
        return FlowerClient(net, train_loader, test_loader).to_client()

    return client_fn


fds = FederatedDataset(
    dataset="scikit-learn/adult-census-income",
    partitioners={"train": NUMBER_OF_CLIENTS},
)
app = ClientApp(client_fn=get_client_fn(fds))
20 changes: 20 additions & 0 deletions examples/fl-tabular/pyproject.toml
@@ -0,0 +1,20 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "fl-tabular"
version = "0.1.0"
description = "Adult Census Income Tabular Dataset and Federated Learning in Flower"
authors = [
    { name = "The Flower Authors", email = "hello@flower.ai" },
]
dependencies = [
    "flwr[simulation]>=1.9.0,<2.0",
    "flwr-datasets>=0.1.0,<1.0.0",
    "torch==2.1.1",
    "scikit-learn==1.4.2",
]

[tool.hatch.build.targets.wheel]
packages = ["."]
24 changes: 24 additions & 0 deletions examples/fl-tabular/server.py
@@ -0,0 +1,24 @@
from flwr.common import ndarrays_to_parameters
from flwr.server import ServerApp, ServerConfig
from flwr.server.strategy import FedAvg
from task import IncomeClassifier, get_weights

# Initialize the global model on the server so all clients start from the same weights
net = IncomeClassifier(input_dim=14)
params = ndarrays_to_parameters(get_weights(net))


def weighted_average(metrics):
    # Weight each client's accuracy by the number of examples it evaluated
    accuracies = [num_examples * m["accuracy"] for num_examples, m in metrics]
    examples = [num_examples for num_examples, _ in metrics]

    return {"accuracy": sum(accuracies) / sum(examples)}


strategy = FedAvg(
    initial_parameters=params,
    evaluate_metrics_aggregation_fn=weighted_average,
)
app = ServerApp(
    strategy=strategy,
    config=ServerConfig(num_rounds=5),
)
108 changes: 108 additions & 0 deletions examples/fl-tabular/task.py
@@ -0,0 +1,108 @@
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OrdinalEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from collections import OrderedDict
from flwr_datasets import FederatedDataset


def load_data(partition_id: int, fds: FederatedDataset):
    # Load this client's partition as a pandas DataFrame
    dataset = fds.load_partition(partition_id, "train").with_format("pandas")[:]

    dataset.dropna(inplace=True)

    # Ordinal-encode every string column, including the "income" target
    categorical_cols = dataset.select_dtypes(include=["object"]).columns
    ordinal_encoder = OrdinalEncoder()
    dataset[categorical_cols] = ordinal_encoder.fit_transform(dataset[categorical_cols])

    X = dataset.drop("income", axis=1)
    y = dataset["income"]

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    # Standardize the features; the scaler is fit on the training split only
    numeric_features = X.select_dtypes(include=["float64", "int64"]).columns
    numeric_transformer = Pipeline(steps=[("scaler", StandardScaler())])

    preprocessor = ColumnTransformer(
        transformers=[("num", numeric_transformer, numeric_features)]
    )

    X_train = preprocessor.fit_transform(X_train)
    X_test = preprocessor.transform(X_test)

    X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
    X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train.values, dtype=torch.float32).view(-1, 1)
    y_test_tensor = torch.tensor(y_test.values, dtype=torch.float32).view(-1, 1)

    train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
    test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

    return train_loader, test_loader


class IncomeClassifier(nn.Module):
    def __init__(self, input_dim: int):
        super().__init__()
        self.layer1 = nn.Linear(input_dim, 128)
        self.layer2 = nn.Linear(128, 64)
        self.output = nn.Linear(64, 1)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.relu(self.layer1(x))
        x = self.relu(self.layer2(x))
        # Sigmoid output is the predicted probability of the positive class
        x = self.sigmoid(self.output(x))
        return x


def train(model, train_loader, num_epochs=1):
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    model.train()
    for _ in range(num_epochs):
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()


def evaluate(model, test_loader):
    model.eval()
    criterion = nn.BCELoss()
    loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            outputs = model(X_batch)
            batch_loss = criterion(outputs, y_batch)
            loss += batch_loss.item()
            # Threshold the predicted probability at 0.5
            predicted = (outputs > 0.5).float()
            total += y_batch.size(0)
            correct += (predicted == y_batch).sum().item()
    accuracy = correct / total
    # Average the per-batch losses
    loss = loss / len(test_loader)
    return loss, accuracy


def set_weights(net, parameters):
    # Pair each NumPy array with its state_dict key and load it into the model
    params_dict = zip(net.state_dict().keys(), parameters)
    state_dict = OrderedDict({k: torch.tensor(v) for k, v in params_dict})
    net.load_state_dict(state_dict, strict=True)


def get_weights(net):
    # Export the model's state_dict values as a list of NumPy arrays
    ndarrays = [val.cpu().numpy() for _, val in net.state_dict().items()]
    return ndarrays