
Support loading datasets of 10GB in BQ in less than 5 min #429

Closed
1 of 5 tasks
tatiana opened this issue Jun 6, 2022 · 4 comments · Fixed by #489


tatiana commented Jun 6, 2022

Dependencies

Acceptance criteria

  • Run the bq load command to establish the quickest way to load the file from GCS -> BQ (and, based on that, decide how long the same operation may acceptably take in the Astro SDK).
  • Make changes to how we load data into BigQuery.
  • Re-run the benchmark and identify the performance improvements (?)
  • The changes must work for all supported file types (CSV, JSON, NDJSON, Parquet).
  • The changes should also work on S3 and GCS.
@kaxil kaxil added this to the 1.0.0 milestone on Jun 6, 2022
@kaxil kaxil added the "improvement" (Enhancement or improvement in an existing feature) label on Jun 7, 2022
@tatiana tatiana changed the title from "Support loading datasets of 10GB in BQ in less than 2 min" to "Support loading datasets of 10GB in BQ in less than 5 min" on Jun 7, 2022
@utkarsharma2

Results of bq load:

| Dataset | Size | Duration (h:m:s) |
| --- | --- | --- |
| covid_overview/covid_overview_10kb.csv | 10 KB | 0:00:02 |
| tate_britain/artist_data_100kb.csv | 100 KB | 0:00:02 |
| imdb/title_ratings_10mb.csv | 10 MB | 0:00:05 |
| stackoverflow/stackoverflow_posts_1g.ndjson | 1 GB | 0:00:50 |
| trimmed/pypi/* | 5 GB | 0:00:41 |
| github/github-archive/* | 10 GB | 0:01:09 |

@utkarsharma2

Below are the results of profiling the load_file operator with the gs://astro-sdk/benchmark/trimmed/imdb/title_ratings_10mb.csv dataset.

```
454728 function calls (452486 primitive calls) in 69.968 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     29/1    0.000    0.000   69.969   69.969 {built-in method builtins.exec}
        1    0.000    0.000   69.969   69.969 load_file.py:63(load_data)
        1    0.000    0.000   69.969   69.969 load_file.py:71(load_data_to_table)
     8273    0.021    0.000   59.489    0.007 socket.py:655(readinto)
     8273    0.020    0.000   59.451    0.007 ssl.py:1230(recv_into)
     8273    0.014    0.000   59.428    0.007 ssl.py:1090(read)
     8273   59.412    0.007   59.412    0.007 {method 'read' of '_ssl._SSLSocket' objects}
       41    0.000    0.000   54.168    1.321 _request_helpers.py:109(wait_and_retry)
        1    0.000    0.000   52.174   52.174 base.py:74(export_to_dataframe)
       54    0.001    0.000   51.561    0.955 sessions.py:457(request)
       51    0.002    0.000   51.237    1.005 requests.py:421(request)
       54    0.003    0.000   51.036    0.945 sessions.py:613(send)
       54    0.003    0.000   51.016    0.945 adapters.py:395(send)
       54    0.005    0.000   50.953    0.944 connectionpool.py:522(urlopen)
       54    0.004    0.000   50.900    0.943 connectionpool.py:361(_make_request)
        4    0.000    0.000   49.615   12.404 _decorators.py:302(wrapper)
        1    0.000    0.000   49.588   49.588 csv.py:12(export_to_dataframe)
        1    0.000    0.000   49.588   49.588 readers.py:491(read_csv)
        1    0.001    0.001   49.588   49.588 readers.py:467(_read)
       42    0.000    0.000   49.220    1.172 gcs.py:327(read1)
       42    0.000    0.000   49.219    1.172 gcs.py:301(read)
       ...(trimmed results)
```

tottime: the total time spent in the given function, excluding time spent in calls to sub-functions.

cumtime: the cumulative time spent in this function and all sub-functions (from invocation until exit). This figure is accurate even for recursive functions.

Further investigation showed that pd.read_csv() alone was taking 49.5 seconds:

```
1    0.000    0.000   49.588   49.588 readers.py:491(read_csv)
```
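For reference, a profile like the one above can be produced with Python's built-in cProfile and pstats modules. A minimal sketch, assuming a hypothetical run_load_file() wrapper around whatever actually invokes the load_file operator:

```python
import cProfile
import pstats


def run_load_file():
    # Hypothetical placeholder: call whatever runs the load_file operator here.
    ...


profiler = cProfile.Profile()
profiler.enable()
run_load_file()
profiler.disable()

# Sort by cumulative time, matching the listing above, and print the top entries.
stats = pstats.Stats(profiler).sort_stats("cumulative")
stats.print_stats(30)
```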

Solution

We can leverage multiprocessing: create a pool of worker processes, each of which reads part of the file, processes the rows, and ingests the data into the upstream database.

Since this approach is overkill for smaller files, we can apply it selectively based on file size.

Assumptions and limitations

  1. Let’s say we create 4 workers and assume network bandwidth is not the bottleneck; this should improve performance by roughly a factor of 4.
  2. We don’t guarantee that the order of rows in the database will match their order in the file.
  3. This approach only works for file types where each record is self-contained, such as CSV, NDJSON, and Parquet.

Results

| Dataset | Astro SDK (s) | Standalone script without optimization (s) | Standalone script with optimization (s) |
| --- | --- | --- | --- |
| gs://astro-sdk/benchmark/trimmed/imdb/title_ratings_10mb.csv | 72.99 | 65.38 | 34.02 |

Standalone Python script:

```python
import time

import pandas as pd
import multiprocessing as mp
import smart_open
import random
import string
from tenacity import retry, stop_after_attempt

MAX_TABLE_NAME_LENGTH = 62
DEFAULT_CHUNK_SIZE = 1000000  # number of rows processed at a time
# csv rows - 600000


def _create_unique_table_name(prefix: str = "") -> str:
    """
    If a table is instantiated without a name, create a unique table name for it.
    This new name should be compatible with all supported databases.
    """
    schema_length = 10 + 1
    prefix_length = len(prefix)

    unique_id = random.choice(string.ascii_lowercase) + "".join(
        random.choice(string.ascii_lowercase + string.digits)
        for _ in range(MAX_TABLE_NAME_LENGTH - schema_length - prefix_length)
    )
    if prefix:
        unique_id = f"{prefix}{unique_id}"

    return unique_id


@retry(stop=stop_after_attempt(5))
def process_frame(df, table):
    # Append one chunk of rows to the BigQuery table; retried up to 5 times on failure.
    df.to_gbq(
        f'Benchmarking.{table}',
        if_exists='append',
        chunksize=DEFAULT_CHUNK_SIZE,
        project_id='astronomer-dag-authoring',
    )


def dummy_load_file():
    path = "gs://astro-sdk/benchmark/trimmed/imdb/title_ratings_10mb.csv"
    # path = "gs://astro-sdk/benchmark/trimmed/github/github_timeline.csv"
    # path = "/Users/utkarsharma/Desktop/title_ratings_10mb.csv"
    with smart_open.open(path, mode='r') as stream:
        reader = pd.read_csv(stream, chunksize=DEFAULT_CHUNK_SIZE)

        # reader = pd.read_csv(path, chunksize=DEFAULT_CHUNK_SIZE)
        pool = mp.Pool(4)  # use 4 worker processes

        funclist = []
        count = 0
        table = _create_unique_table_name(prefix="_tmp_")
        print("table : ", table)
        for df in reader:
            print("count : ", count)
            # hand each chunk to a worker process
            f = pool.apply_async(process_frame, [df, table])
            funclist.append(f)
            count = count + 1

        for f in funclist:
            f.get()  # wait for the worker and re-raise any exception it hit

        pool.close()
        pool.join()


if __name__ == '__main__':
    t1 = time.time()
    dummy_load_file()
    t2 = time.time()
    print("time taken : ", t2 - t1)
```


kaxil commented Jul 4, 2022

Waiting for Kaxil's review before we merge.


kaxil commented Jul 4, 2022

https://cloud.google.com/bigquery-transfer/docs/s3-transfer - BigQuery might be able to support loading files from S3
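A rough sketch of what that could look like with the google-cloud-bigquery-datatransfer client and the amazon_s3 data source described in the linked docs; the project, dataset, table, bucket path, and credentials below are placeholders, and the parameter names should be verified against that documentation:

```python
from google.cloud import bigquery_datatransfer

client = bigquery_datatransfer.DataTransferServiceClient()

transfer_config = bigquery_datatransfer.TransferConfig(
    destination_dataset_id="Benchmarking",  # placeholder dataset
    display_name="s3-to-bigquery-benchmark",
    data_source_id="amazon_s3",
    params={
        "destination_table_name_template": "title_ratings",  # placeholder table
        "data_path": "s3://my-benchmark-bucket/title_ratings_10mb.csv",  # placeholder path
        "access_key_id": "<AWS_ACCESS_KEY_ID>",
        "secret_access_key": "<AWS_SECRET_ACCESS_KEY>",
        "file_format": "CSV",
    },
)

# Create the transfer config; the service then runs the S3 -> BigQuery transfer.
transfer_config = client.create_transfer_config(
    parent=client.common_project_path("astronomer-dag-authoring"),
    transfer_config=transfer_config,
)
print("Created transfer config:", transfer_config.name)
```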

utkarsharma2 added a commit that referenced this issue Jul 6, 2022
What is the current behavior?

When we run the load_file operator for a table in BigQuery and a file in GCS, the data is first downloaded from GCS to the local worker and then uploaded to BigQuery. Because of this, we require both memory and network bandwidth on the worker node.

partially closes: #429
What is the new behavior?

We now use an optimized path that ingests data directly from GCS into BigQuery, removing both the memory and bandwidth requirements on the worker node. As a result, both the transfer speed and the amount of data that can be transferred increase.
Does this introduce a breaking change?

No
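For illustration, a minimal sketch of such a direct GCS-to-BigQuery load using the google-cloud-bigquery client; the destination table name is a placeholder, and the actual job configuration used by the SDK may differ:

```python
from google.cloud import bigquery

client = bigquery.Client(project="astronomer-dag-authoring")

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    autodetect=True,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)

# BigQuery pulls the file straight from GCS, so nothing is downloaded to the worker.
load_job = client.load_table_from_uri(
    "gs://astro-sdk/benchmark/trimmed/imdb/title_ratings_10mb.csv",
    "astronomer-dag-authoring.Benchmarking.title_ratings",  # placeholder destination table
    job_config=job_config,
)
load_job.result()  # wait for the load job to finish
```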
utkarsharma2 added a commit that referenced this issue Jul 19, 2022
What is the current behavior?

Currently, we transfer data from local files to BigQuery by first creating a dataframe and then using pandas.to_sql() to ingest it into BigQuery.

closes: #534
related: #429
What is the new behavior?

We intend to use Google's Python SDK to ingest the data directly, saving the time currently spent converting files into dataframes.
Does this introduce a breaking change?

Nope
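A minimal sketch of ingesting a local file straight into BigQuery with the Python client, skipping the intermediate dataframe; the file path and destination table below are placeholders:

```python
from google.cloud import bigquery

client = bigquery.Client(project="astronomer-dag-authoring")

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    autodetect=True,
)

# Stream the file into a load job instead of building a dataframe and calling to_sql().
with open("/tmp/title_ratings_10mb.csv", "rb") as source_file:  # placeholder path
    job = client.load_table_from_file(
        source_file,
        "astronomer-dag-authoring.Benchmarking.title_ratings",  # placeholder table
        job_config=job_config,
    )
job.result()  # wait for the load job to finish
```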
utkarsharma2 added a commit that referenced this issue Jul 26, 2022
Description
What is the current behavior?

Benchmarking results for S3 to BigQuery are missing.

related: #429
What is the new behavior?

Added benchmarking results for S3 to BigQuery.
Does this introduce a breaking change?

No