Merge pull request #20 from MarcusElwin/post-chunking
feat(post): Add BQ chunk post
MarcusElwin authored Oct 25, 2023
2 parents 75a7cb9 + 4880486 commit 8b3e43b
Showing 2 changed files with 203 additions and 0 deletions.
11 changes: 11 additions & 0 deletions README.md
@@ -14,4 +14,15 @@ typos ./ds-with-mac/content/
```sh
# correct typos in folder
typos ./ds-with-mac/content/ -w
```

### Creating posts
```sh
# create new post
hugo new posts/<name>/index.md
```

```sh
# serve website locally
hugo server
```
192 changes: 192 additions & 0 deletions ds-with-mac/content/posts/chunk-data/index.md
@@ -0,0 +1,192 @@
---
title: Chunking of Data the BQ Way
seo_title: Chunk Data
summary:
description:
slug: chunk-data
author: Marcus Elwin

draft: false
date: 2023-10-25T21:53:32+02:00
lastmod:
expiryDate:
publishDate:

feature_image:
feature_image_alt:

categories:
- Data Engineering
tags:
- BigQuery
- BQ
- Data
- SQL
series:

toc: true
related: true
social_share: true
newsletter: true
disable_comments: false
---

Recently I had to work with a large client dataset of 100+ million rows and do quite a bit of data cleaning and plumbing :wrench: to prepare it for a parallel batch job. What it boiled down to was creating `chunks` of `1000` users to use in the batch job. In this post I'll share one nice method of doing so in `BigQuery`, reducing chunking time from *hours* down to *minutes*!

## Chunking
When processing data that contains a large number of records, handling each record one by one can be quite slow. Often the data also has to be fetched from external sources such as APIs. Whilst processing data in memory tends to be fast, there are natural memory limitations. By *chunking* the data, the processing job can be sped up multifold. A *chunk* is simply a group of records, grouped according to some key and size, e.g. chunks of `1000` users per file, so that everything fits in memory.
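
To get a feel for the numbers, here is a quick back-of-the-envelope query (a hypothetical sketch, using the placeholder `project.dataset.transactions` table introduced below) showing how many chunks of `1000` users we would need:
{{< highlight sql "linenos=inline, style=monokai" >}}
-- Hypothetical sketch: number of chunks needed for a chunk size of 1000 users
-- e.g. 50k distinct users / 1000 users per chunk -> 50 chunks
SELECT
  CAST(CEIL(COUNT(DISTINCT userid) / 1000) AS INT64) AS num_chunks
FROM
  `project.dataset.transactions`;
{{< / highlight >}}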

## Chunking the "naive" pythonic way
Let's say that you have a table called `transactions` with the schema below, containing `100` million transactions and `50` thousand unique users:
{{< highlight json "linenos=inline, style=monokai" >}}
[
    {
        "name": "transactionid",
        "mode": "REQUIRED",
        "type": "STRING",
        "description": "Unique ID of transaction",
        "fields": []
    },
    {
        "name": "userid",
        "mode": "REQUIRED",
        "type": "STRING",
        "description": "Unique ID of user",
        "fields": []
    },
    {
        "name": "accountid",
        "mode": "REQUIRED",
        "type": "STRING",
        "description": "Unique ID of users account",
        "fields": []
    },
    {
        "name": "date",
        "mode": "NULLABLE",
        "type": "DATE",
        "description": "Date of transaction",
        "fields": []
    },
    {
        "name": "amount",
        "mode": "NULLABLE",
        "type": "FLOAT64",
        "description": "Amount of transaction, debit or credit based on sign",
        "fields": []
    },
    {
        "name": "description",
        "mode": "NULLABLE",
        "type": "STRING",
        "description": "Transaction description",
        "fields": []
    }
]
{{< / highlight >}}
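
If you want to reproduce the setup, a minimal sketch of the corresponding DDL (the `project.dataset` names are placeholders, as elsewhere in this post) could look like:
{{< highlight sql "linenos=inline, style=monokai" >}}
-- Hypothetical DDL matching the schema above
CREATE TABLE `project.dataset.transactions` (
  transactionid STRING NOT NULL OPTIONS (description = 'Unique ID of transaction'),
  userid        STRING NOT NULL OPTIONS (description = 'Unique ID of user'),
  accountid     STRING NOT NULL OPTIONS (description = 'Unique ID of users account'),
  date          DATE            OPTIONS (description = 'Date of transaction'),
  amount        FLOAT64         OPTIONS (description = 'Amount of transaction, debit or credit based on sign'),
  description   STRING          OPTIONS (description = 'Transaction description')
);
{{< / highlight >}}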

The pythonic way shown below would be to:
* Create chunks based on all unique user IDs (`user_id_list`)
* Only query users for a given chunk
* Export data to a file e.g. `.csv`

{{< highlight python "linenos=inline, style=monokai" >}}
import time

import pandas as pd

def chunk_list(x: list, chunk_size: int) -> list:
    """Split a list into consecutive chunks of at most `chunk_size` items."""
    return [x[i : i + chunk_size] for i in range(0, len(x), chunk_size)]

# df_userids (unique user IDs) and datalake (query client) are assumed to be defined earlier
for i, user_id_list in enumerate(chunk_list(df_userids.userid.tolist(), 1000)):
    # start timer for this chunk
    start_time = time.time()

    # only query the transactions belonging to the users in the current chunk
    query = f"""SELECT *, _FILE_NAME
    FROM `project.dataset.transactions`
    WHERE userid IN UNNEST({user_id_list})
    """
    result = datalake.query(query)
    df_batch = pd.read_csv(result)
{{< / highlight >}}

Although the code looks easy to understand, testing this on `100` million transactions takes roughly `~7+` hours. This is way too slow, and we can do much better :brain:.

## Chunking using BigQuery
As BigQuery is a distributed, massively parallel query engine under the hood (think Spark, but serverless), we can use `partitioning` and in particular two built-in functions: [NTILE](https://cloud.google.com/bigquery/docs/reference/standard-sql/numbering_functions#ntile) and [RANGE_BUCKET](https://cloud.google.com/bigquery/docs/reference/standard-sql/mathematical_functions#range_bucket).

What the `NTILE` function is doing:
{{< notice info >}}
`NTILE` *divides the rows into **constant_integer_expression** buckets based on row ordering and returns the 1-based bucket number that is assigned to each row. The number of rows in the buckets can differ by at most 1. The remainder values (the remainder of number of rows divided by buckets) are distributed one for each bucket, starting with bucket 1. If constant_integer_expression evaluates to NULL, 0 or negative, an error is provided.*
{{< /notice >}}
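
To make this a bit more concrete, here is a toy example (not from the client dataset) of `NTILE` splitting six users into three buckets:
{{< highlight sql "linenos=inline, style=monokai" >}}
-- Toy example: 6 users into 3 buckets -> 2 users per bucket
SELECT
  userid,
  NTILE(3) OVER (ORDER BY userid) AS bucket
FROM
  UNNEST(['u1', 'u2', 'u3', 'u4', 'u5', 'u6']) AS userid;
-- u1, u2 -> 1 | u3, u4 -> 2 | u5, u6 -> 3
{{< / highlight >}}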

And what the `RANGE_BUCKET` function is doing:
{{< notice info >}}
`RANGE_BUCKET` *scans through a sorted array and returns the 0-based position of the point's upper bound. This can be useful if you need to group your data to build partitions, histograms, business-defined rules, and more.*
{{< /notice >}}
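
And a toy example of `RANGE_BUCKET` with a handful of points against a sorted boundary array:
{{< highlight sql "linenos=inline, style=monokai" >}}
-- Toy example: 0-based position of each point's upper bound in [0, 10, 20, 30]
SELECT
  x,
  RANGE_BUCKET(x, [0, 10, 20, 30]) AS bucket
FROM
  UNNEST([5, 10, 25, 42]) AS x;
-- 5 -> 1, 10 -> 2, 25 -> 3, 42 -> 4 (a point past the last boundary gets the array length)
{{< / highlight >}}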

In short, `NTILE`, used in a `window function`, creates the `chunk` groups we want, whilst `RANGE_BUCKET` takes care of creating the partitions.

In our previous example, combining the two would look like this:
{{< highlight sql "linenos=inline, style=monokai" >}}
-- we have 50k users and want 1000 users in each bucket, i.e. 50k / 1000 -> 50 buckets
DECLARE num_buckets INT64;
SET num_buckets = 50;

CREATE TABLE `project.dataset.trx_user_batches`
PARTITION BY RANGE_BUCKET(id, GENERATE_ARRAY(1, num_buckets + 1, 1)) AS
-- get transactions
WITH transactions AS (
  SELECT *
  FROM
    `project.dataset.transactions`
),
-- create num_buckets chunks, each containing ~1000 distinct users
user_rank AS (
  SELECT
    userid,
    NTILE(num_buckets) OVER(ORDER BY userid) AS id
  FROM (
    SELECT DISTINCT userid FROM transactions
  )
),
-- notice that we need num_buckets + 1 boundaries here
user_bucket AS (
  SELECT
    *,
    RANGE_BUCKET(id, GENERATE_ARRAY(1, num_buckets + 1, 1)) AS bucket
  FROM
    user_rank
)
SELECT
  trx.userid,
  trx.accountid,
  trx.transactionid,
  trx.date,
  trx.amount,
  trx.description,
  ub.id
FROM
  transactions AS trx
INNER JOIN
  user_bucket AS ub
ON
  trx.userid = ub.userid
{{< / highlight >}}
The nice thing with this query, apart from it being fast, is that we get `50` partitions with `~1000` users in each, or around `2` million transactions per partition. This is a much smaller dataset that we can fit in memory compared to the `100` million rows we started with. For instance, if you want to export the partitions as files for another job or workflow, you could use:

{{< highlight sql "linenos=inline, style=monokai" >}}
EXPORT DATA
  OPTIONS (
    uri = 'gs://trx_user_batches/user_batch_*.csv',
    format = 'CSV',
    overwrite = true,
    header = true,
    field_delimiter = ';')
AS (
  SELECT
    *
  FROM
    `project.dataset.trx_user_batches`
  ORDER BY
    userid ASC,
    accountid ASC,
    date ASC
);
{{< / highlight >}}

The nice thing with the partitioning and `EXPORT DATA` approach is that it is much faster than the pythonic one. Exporting the 50 partitions as files takes roughly `~30-40` minutes instead of `7+` hours :rocket:.
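
As a final note, if a downstream worker only needs its own chunk, it can (assuming the `trx_user_batches` table created above) filter on the partitioning column `id` so that BigQuery only scans that single partition:
{{< highlight sql "linenos=inline, style=monokai" >}}
-- Read a single chunk: partition pruning means only bucket 1 is scanned
SELECT
  *
FROM
  `project.dataset.trx_user_batches`
WHERE
  id = 1;
{{< / highlight >}}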
