diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py
index 759c7d9e56..465667254e 100644
--- a/sdk/python/feast/infra/gcp.py
+++ b/sdk/python/feast/infra/gcp.py
@@ -1,26 +1,22 @@
 import itertools
-import time
-from dataclasses import asdict, dataclass
-from datetime import datetime, timedelta
+from datetime import datetime
 from multiprocessing.pool import ThreadPool
 from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union
 
 import mmh3
 import pandas
 import pyarrow
-from google.cloud import bigquery
-from jinja2 import BaseLoader, Environment
 
 from feast import FeatureTable, utils
 from feast.data_source import BigQuerySource
 from feast.feature_view import FeatureView
 from feast.infra.key_encoding_utils import serialize_entity_key
+from feast.infra.offline_stores.helpers import get_offline_store_from_sources
 from feast.infra.provider import (
     Provider,
     RetrievalJob,
     _convert_arrow_to_proto,
     _get_column_names,
-    _get_requested_feature_views_to_features_dict,
     _run_field_mapping,
 )
 from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
@@ -29,7 +25,7 @@
 from feast.repo_config import DatastoreOnlineStoreConfig, RepoConfig
 
 
-class Gcp(Provider):
+class GcpProvider(Provider):
     _gcp_project_id: Optional[str]
 
     def __init__(self, config: Optional[DatastoreOnlineStoreConfig]):
@@ -153,31 +149,16 @@ def materialize_single_feature_view(
         start_date = utils.make_tzaware(start_date)
         end_date = utils.make_tzaware(end_date)
 
-        from_expression = feature_view.input.get_table_query_string()
-
-        partition_by_join_key_string = ", ".join(join_key_columns)
-        if partition_by_join_key_string != "":
-            partition_by_join_key_string = (
-                "PARTITION BY " + partition_by_join_key_string
-            )
-        timestamps = [event_timestamp_column]
-        if created_timestamp_column is not None:
-            timestamps.append(created_timestamp_column)
-        timestamp_desc_string = " DESC, ".join(timestamps) + " DESC"
-        field_string = ", ".join(join_key_columns + feature_name_columns + timestamps)
-
-        query = f"""
-            SELECT {field_string}
-            FROM (
-                SELECT {field_string},
-                ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row
-                FROM {from_expression}
-                WHERE {event_timestamp_column} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}')
-            )
-            WHERE _feast_row = 1
-            """
-
-        table = self._pull_query(query)
+        offline_store = get_offline_store_from_sources([feature_view.input])
+        table = offline_store.pull_latest_from_table_or_query(
+            data_source=feature_view.input,
+            join_key_columns=join_key_columns,
+            feature_name_columns=feature_name_columns,
+            event_timestamp_column=event_timestamp_column,
+            created_timestamp_column=created_timestamp_column,
+            start_date=start_date,
+            end_date=end_date,
+        )
 
         if feature_view.input.field_mapping is not None:
             table = _run_field_mapping(table, feature_view.input.field_mapping)
@@ -205,31 +186,15 @@ def get_historical_features(
         feature_refs: List[str],
         entity_df: Union[pandas.DataFrame, str],
     ) -> RetrievalJob:
-        # TODO: Add entity_df validation in order to fail before interacting with BigQuery
-
-        if type(entity_df) is str:
-            entity_df_sql_table = f"({entity_df})"
-        elif isinstance(entity_df, pandas.DataFrame):
-            table_id = _upload_entity_df_into_bigquery(config.project, entity_df)
-            entity_df_sql_table = f"`{table_id}`"
-        else:
-            raise ValueError(
-                f"The entity dataframe you have provided must be a Pandas DataFrame or BigQuery SQL query, "
-                f"but we found: {type(entity_df)} "
-            )
-
-        # Build a query context containing all information required to template the BigQuery SQL query
-        query_context = get_feature_view_query_context(feature_refs, feature_views)
-
-        # TODO: Infer min_timestamp and max_timestamp from entity_df
-        # Generate the BigQuery SQL query from the query context
-        query = build_point_in_time_query(
-            query_context,
-            min_timestamp=datetime.now() - timedelta(days=365),
-            max_timestamp=datetime.now() + timedelta(days=1),
-            left_table_query_string=entity_df_sql_table,
+        offline_store = get_offline_store_from_sources(
+            [feature_view.input for feature_view in feature_views]
+        )
+        job = offline_store.get_historical_features(
+            config=config,
+            feature_views=feature_views,
+            feature_refs=feature_refs,
+            entity_df=entity_df,
         )
-        job = BigQueryRetrievalJob(query=query)
         return job
 
 
@@ -342,236 +307,3 @@ def compute_datastore_entity_id(entity_key: EntityKeyProto) -> str:
     do with the Entity concept we have in Feast.
     """
     return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex()
-
-
-class BigQueryRetrievalJob(RetrievalJob):
-    def __init__(self, query):
-        self.query = query
-
-    def to_df(self):
-        # TODO: Ideally only start this job when the user runs "get_historical_features", not when they run to_df()
-        client = bigquery.Client()
-        df = client.query(self.query).to_dataframe(create_bqstorage_client=True)
-        return df
-
-
-@dataclass(frozen=True)
-class FeatureViewQueryContext:
-    """Context object used to template a BigQuery point-in-time SQL query"""
-
-    name: str
-    ttl: int
-    entities: List[str]
-    features: List[str]  # feature reference format
-    table_ref: str
-    event_timestamp_column: str
-    created_timestamp_column: str
-    field_mapping: Dict[str, str]
-    query: str
-    table_subquery: str
-
-
-def _upload_entity_df_into_bigquery(project, entity_df) -> str:
-    """Uploads a Pandas entity dataframe into a BigQuery table and returns a reference to the resulting table"""
-    client = bigquery.Client()
-
-    # First create the BigQuery dataset if it doesn't exist
-    dataset = bigquery.Dataset(f"{client.project}.feast_{project}")
-    dataset.location = "US"
-    client.create_dataset(
-        dataset, exists_ok=True
-    )  # TODO: Consider moving this to apply or BigQueryOfflineStore
-
-    # Drop the index so that we dont have unnecessary columns
-    entity_df.reset_index(drop=True, inplace=True)
-
-    # Upload the dataframe into BigQuery, creating a temporary table
-    job_config = bigquery.LoadJobConfig()
-    table_id = f"{client.project}.feast_{project}.entity_df_{int(time.time())}"
-    job = client.load_table_from_dataframe(entity_df, table_id, job_config=job_config,)
-    job.result()
-
-    # Ensure that the table expires after some time
-    table = client.get_table(table=table_id)
-    table.expires = datetime.utcnow() + timedelta(minutes=30)
-    client.update_table(table, ["expires"])
-
-    return table_id
-
-
-def get_feature_view_query_context(
-    feature_refs: List[str], feature_views: List[FeatureView]
-) -> List[FeatureViewQueryContext]:
-    """Build a query context containing all information required to template a BigQuery point-in-time SQL query"""
-
-    feature_views_to_feature_map = _get_requested_feature_views_to_features_dict(
-        feature_refs, feature_views
-    )
-
-    query_context = []
-    for feature_view, features in feature_views_to_feature_map.items():
-        entity_names = [entity for entity in feature_view.entities]
-
-        if isinstance(feature_view.ttl, timedelta):
-            ttl_seconds = int(feature_view.ttl.total_seconds())
-        else:
-            ttl_seconds = 0
-
-        assert isinstance(feature_view.input, BigQuerySource)
-
-        context = FeatureViewQueryContext(
-            name=feature_view.name,
-            ttl=ttl_seconds,
-            entities=entity_names,
-            features=features,
-            table_ref=feature_view.input.table_ref,
-            event_timestamp_column=feature_view.input.event_timestamp_column,
-            created_timestamp_column=feature_view.input.created_timestamp_column,
-            # TODO: Make created column optional and not hardcoded
-            field_mapping=feature_view.input.field_mapping,
-            query=feature_view.input.query,
-            table_subquery=feature_view.input.get_table_query_string(),
-        )
-        query_context.append(context)
-    return query_context
-
-
-def build_point_in_time_query(
-    feature_view_query_contexts: List[FeatureViewQueryContext],
-    min_timestamp: datetime,
-    max_timestamp: datetime,
-    left_table_query_string: str,
-):
-    """Build point-in-time query between each feature view table and the entity dataframe"""
-    template = Environment(loader=BaseLoader()).from_string(
-        source=SINGLE_FEATURE_VIEW_POINT_IN_TIME_JOIN
-    )
-
-    # Add additional fields to dict
-    template_context = {
-        "min_timestamp": min_timestamp,
-        "max_timestamp": max_timestamp,
-        "left_table_query_string": left_table_query_string,
-        "featureviews": [asdict(context) for context in feature_view_query_contexts],
-    }
-
-    query = template.render(template_context)
-    return query
-
-
-# TODO: Optimizations
-#   * Use GENERATE_UUID() instead of ROW_NUMBER(), or join on entity columns directly
-#   * Precompute ROW_NUMBER() so that it doesn't have to be recomputed for every query on entity_dataframe
-#   * Create temporary tables instead of keeping all tables in memory
-
-SINGLE_FEATURE_VIEW_POINT_IN_TIME_JOIN = """
-WITH entity_dataframe AS (
-    SELECT ROW_NUMBER() OVER() AS row_number, edf.* FROM {{ left_table_query_string }} as edf
-),
-{% for featureview in featureviews %}
-/*
- This query template performs the point-in-time correctness join for a single feature set table
- to the provided entity table.
- 1. Concatenate the timestamp and entities from the feature set table with the entity dataset.
- Feature values are joined to this table later for improved efficiency.
- featureview_timestamp is equal to null in rows from the entity dataset.
- */
-{{ featureview.name }}__union_features AS (
-SELECT
-  -- unique identifier for each row in the entity dataset.
-  row_number,
-  -- event_timestamp contains the timestamps to join onto
-  event_timestamp,
-  -- the feature_timestamp, i.e. the latest occurrence of the requested feature relative to the entity_dataset timestamp
-  NULL as {{ featureview.name }}_feature_timestamp,
-  -- created timestamp of the feature at the corresponding feature_timestamp
-  NULL as created_timestamp,
-  -- select only entities belonging to this feature set
-  {{ featureview.entities | join(', ')}},
-  -- boolean for filtering the dataset later
-  true AS is_entity_table
-FROM entity_dataframe
-UNION ALL
-SELECT
-  NULL as row_number,
-  {{ featureview.event_timestamp_column }} as event_timestamp,
-  {{ featureview.event_timestamp_column }} as {{ featureview.name }}_feature_timestamp,
-  {{ featureview.created_timestamp_column }} as created_timestamp,
-  {{ featureview.entities | join(', ')}},
-  false AS is_entity_table
-FROM {{ featureview.table_subquery }} WHERE {{ featureview.event_timestamp_column }} <= '{{ max_timestamp }}'
-{% if featureview.ttl == 0 %}{% else %}AND {{ featureview.event_timestamp_column }} >= Timestamp_sub(TIMESTAMP '{{ min_timestamp }}', interval {{ featureview.ttl }} second){% endif %}
-),
-/*
- 2. Window the data in the unioned dataset, partitioning by entity and ordering by event_timestamp, as
- well as is_entity_table.
- Within each window, back-fill the feature_timestamp - as a result of this, the null feature_timestamps
- in the rows from the entity table should now contain the latest timestamps relative to the row's
- event_timestamp.
- For rows where event_timestamp(provided datetime) - feature_timestamp > max age, set the
- feature_timestamp to null.
- */
-{{ featureview.name }}__joined AS (
-SELECT
-  row_number,
-  event_timestamp,
-  {{ featureview.entities | join(', ')}},
-  {% for feature in featureview.features %}
-  IF(event_timestamp >= {{ featureview.name }}_feature_timestamp {% if featureview.ttl == 0 %}{% else %}AND Timestamp_sub(event_timestamp, interval {{ featureview.ttl }} second) < {{ featureview.name }}_feature_timestamp{% endif %}, {{ featureview.name }}__{{ feature }}, NULL) as {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %}
-  {% endfor %}
-FROM (
-SELECT
-  row_number,
-  event_timestamp,
-  {{ featureview.entities | join(', ')}},
-  FIRST_VALUE(created_timestamp IGNORE NULLS) over w AS created_timestamp,
-  FIRST_VALUE({{ featureview.name }}_feature_timestamp IGNORE NULLS) over w AS {{ featureview.name }}_feature_timestamp,
-  is_entity_table
-FROM {{ featureview.name }}__union_features
-WINDOW w AS (PARTITION BY {{ featureview.entities | join(', ') }} ORDER BY event_timestamp DESC, is_entity_table DESC, created_timestamp DESC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
-)
-/*
- 3. Select only the rows from the entity table, and join the features from the original feature set table
- to the dataset using the entity values, feature_timestamp, and created_timestamps.
- */
-LEFT JOIN (
-SELECT
-  {{ featureview.event_timestamp_column }} as {{ featureview.name }}_feature_timestamp,
-  {{ featureview.created_timestamp_column }} as created_timestamp,
-  {{ featureview.entities | join(', ')}},
-  {% for feature in featureview.features %}
-  {{ feature }} as {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %}
-  {% endfor %}
-FROM {{ featureview.table_subquery }} WHERE {{ featureview.event_timestamp_column }} <= '{{ max_timestamp }}'
-{% if featureview.ttl == 0 %}{% else %}AND {{ featureview.event_timestamp_column }} >= Timestamp_sub(TIMESTAMP '{{ min_timestamp }}', interval {{ featureview.ttl }} second){% endif %}
-) USING ({{ featureview.name }}_feature_timestamp, created_timestamp, {{ featureview.entities | join(', ')}})
-WHERE is_entity_table
-),
-/*
- 4. Finally, deduplicate the rows by selecting the first occurrence of each entity table row_number.
- */
-{{ featureview.name }}__deduped AS (SELECT
-  k.*
-FROM (
-  SELECT ARRAY_AGG(row LIMIT 1)[OFFSET(0)] k
-  FROM {{ featureview.name }}__joined row
-  GROUP BY row_number
-)){% if loop.last %}{% else %}, {% endif %}
-
-{% endfor %}
-/*
- Joins the outputs of multiple time travel joins to a single table.
- */
-SELECT edf.event_timestamp as event_timestamp, * EXCEPT (row_number, event_timestamp) FROM entity_dataframe edf
-{% for featureview in featureviews %}
-LEFT JOIN (
-    SELECT
-    row_number,
-    {% for feature in featureview.features %}
-    {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %}
-    {% endfor %}
-    FROM {{ featureview.name }}__deduped
-) USING (row_number)
-{% endfor %}
-ORDER BY event_timestamp
-"""
diff --git a/sdk/python/feast/infra/local_sqlite.py b/sdk/python/feast/infra/local.py
similarity index 51%
rename from sdk/python/feast/infra/local_sqlite.py
rename to sdk/python/feast/infra/local.py
index 38bd77b85a..bf2b3aa231 100644
--- a/sdk/python/feast/infra/local_sqlite.py
+++ b/sdk/python/feast/infra/local.py
@@ -4,20 +4,17 @@
 from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
 
 import pandas as pd
-import pyarrow
 import pytz
 
 from feast import FeatureTable, utils
-from feast.data_source import FileSource
 from feast.feature_view import FeatureView
 from feast.infra.key_encoding_utils import serialize_entity_key
+from feast.infra.offline_stores.helpers import get_offline_store_from_sources
 from feast.infra.provider import (
-    ENTITY_DF_EVENT_TIMESTAMP_COL,
     Provider,
     RetrievalJob,
     _convert_arrow_to_proto,
     _get_column_names,
-    _get_requested_feature_views_to_features_dict,
     _run_field_mapping,
 )
 from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
@@ -26,20 +23,7 @@
 from feast.repo_config import LocalOnlineStoreConfig, RepoConfig
 
 
-class FileRetrievalJob(RetrievalJob):
-    def __init__(self, evaluation_function: Callable):
-        """Initialize a lazy historical retrieval job"""
-
-        # The evaluation function executes a stored procedure to compute a historical retrieval.
-        self.evaluation_function = evaluation_function
-
-    def to_df(self):
-        # Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
-        df = self.evaluation_function()
-        return df
-
-
-class LocalSqlite(Provider):
+class LocalProvider(Provider):
     _db_path: str
 
     def __init__(self, config: LocalOnlineStoreConfig):
@@ -170,7 +154,6 @@ def materialize_single_feature_view(
         registry: Registry,
         project: str,
     ) -> None:
-        assert isinstance(feature_view.input, FileSource)
         entities = []
         for entity_name in feature_view.entities:
             entities.append(registry.get_entity(entity_name, project))
@@ -185,34 +168,15 @@ def materialize_single_feature_view(
         start_date = utils.make_tzaware(start_date)
         end_date = utils.make_tzaware(end_date)
 
-        source_df = pd.read_parquet(feature_view.input.path)
-        # Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC
-        source_df[event_timestamp_column] = source_df[event_timestamp_column].apply(
-            lambda x: x if x.tz is not None else x.replace(tzinfo=pytz.utc)
-        )
-        source_df[created_timestamp_column] = source_df[created_timestamp_column].apply(
-            lambda x: x if x.tz is not None else x.replace(tzinfo=pytz.utc)
-        )
-
-        ts_columns = (
-            [event_timestamp_column, created_timestamp_column]
-            if created_timestamp_column is not None
-            else [event_timestamp_column]
-        )
-
-        source_df.sort_values(by=ts_columns, inplace=True)
-
-        filtered_df = source_df[
-            (source_df[event_timestamp_column] >= start_date)
-            & (source_df[event_timestamp_column] < end_date)
-        ]
-        last_values_df = filtered_df.groupby(by=join_key_columns).last()
-
-        # make driver_id a normal column again
-        last_values_df.reset_index(inplace=True)
-
-        table = pyarrow.Table.from_pandas(
-            last_values_df[join_key_columns + feature_name_columns + ts_columns]
+        offline_store = get_offline_store_from_sources([feature_view.input])
+        table = offline_store.pull_latest_from_table_or_query(
+            data_source=feature_view.input,
+            join_key_columns=join_key_columns,
+            feature_name_columns=feature_name_columns,
+            event_timestamp_column=event_timestamp_column,
+            created_timestamp_column=created_timestamp_column,
+            start_date=start_date,
+            end_date=end_date,
         )
 
         if feature_view.input.field_mapping is not None:
@@ -232,101 +196,16 @@ def get_historical_features(
         feature_views: List[FeatureView],
         feature_refs: List[str],
         entity_df: Union[pd.DataFrame, str],
-    ) -> FileRetrievalJob:
-        if not isinstance(entity_df, pd.DataFrame):
-            raise ValueError(
-                f"Please provide an entity_df of type {type(pd.DataFrame)} instead of type {type(entity_df)}"
-            )
-
-        feature_views_to_features = _get_requested_feature_views_to_features_dict(
-            feature_refs, feature_views
+    ) -> RetrievalJob:
+        offline_store = get_offline_store_from_sources(
+            [feature_view.input for feature_view in feature_views]
+        )
+        return offline_store.get_historical_features(
+            config=config,
+            feature_views=feature_views,
+            feature_refs=feature_refs,
+            entity_df=entity_df,
         )
-
-        # Create lazy function that is only called from the RetrievalJob object
-        def evaluate_historical_retrieval():
-
-            # Sort entity dataframe prior to join, and create a copy to prevent modifying the original
-            entity_df_with_features = entity_df.sort_values(
-                ENTITY_DF_EVENT_TIMESTAMP_COL
-            ).copy()
-
-            # Load feature view data from sources and join them incrementally
-            for feature_view, features in feature_views_to_features.items():
-                event_timestamp_column = feature_view.input.event_timestamp_column
-                created_timestamp_column = feature_view.input.created_timestamp_column
-
-                # Read dataframe to join to entity dataframe
-                df_to_join = pd.read_parquet(feature_view.input.path).sort_values(
-                    event_timestamp_column
-                )
-
-                # Build a list of all the features we should select from this source
-                feature_names = []
-                for feature in features:
-                    # Modify the separator for feature refs in column names to double underscore. We are using
-                    # double underscore as separator for consistency with other databases like BigQuery,
-                    # where there are very few characters available for use as separators
-                    prefixed_feature_name = f"{feature_view.name}__{feature}"
-
-                    # Add the feature name to the list of columns
-                    feature_names.append(prefixed_feature_name)
-
-                    # Ensure that the source dataframe feature column includes the feature view name as a prefix
-                    df_to_join.rename(
-                        columns={feature: prefixed_feature_name}, inplace=True,
-                    )
-
-                # Build a list of entity columns to join on (from the right table)
-                right_entity_columns = [entity for entity in feature_view.entities]
-                right_entity_key_columns = [
-                    event_timestamp_column
-                ] + right_entity_columns
-
-                # Remove all duplicate entity keys (using created timestamp)
-                right_entity_key_sort_columns = right_entity_key_columns
-                if created_timestamp_column:
-                    # If created_timestamp is available, use it to dedupe deterministically
-                    right_entity_key_sort_columns = right_entity_key_sort_columns + [
-                        created_timestamp_column
-                    ]
-
-                df_to_join.sort_values(by=right_entity_key_sort_columns, inplace=True)
-                df_to_join = df_to_join.groupby(by=right_entity_key_columns).last()
-                df_to_join.reset_index(inplace=True)
-
-                # Select only the columns we need to join from the feature dataframe
-                df_to_join = df_to_join[right_entity_key_columns + feature_names]
-
-                # Do point in-time-join between entity_df and feature dataframe
-                entity_df_with_features = pd.merge_asof(
-                    entity_df_with_features,
-                    df_to_join,
-                    left_on=ENTITY_DF_EVENT_TIMESTAMP_COL,
-                    right_on=event_timestamp_column,
-                    by=right_entity_columns,
-                    tolerance=feature_view.ttl,
-                )
-
-                # Remove right (feature table/view) event_timestamp column.
-                if event_timestamp_column != ENTITY_DF_EVENT_TIMESTAMP_COL:
-                    entity_df_with_features.drop(
-                        columns=[event_timestamp_column], inplace=True
-                    )
-
-                # Ensure that we delete dataframes to free up memory
-                del df_to_join
-
-            # Move "datetime" column to front
-            current_cols = entity_df_with_features.columns.tolist()
-            current_cols.remove(ENTITY_DF_EVENT_TIMESTAMP_COL)
-            entity_df_with_features = entity_df_with_features[
-                [ENTITY_DF_EVENT_TIMESTAMP_COL] + current_cols
-            ]
-
-            return entity_df_with_features
-
-        job = FileRetrievalJob(evaluation_function=evaluate_historical_retrieval)
-        return job
 
 
 def _table_id(project: str, table: Union[FeatureTable, FeatureView]) -> str:
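
With the LocalSqlite -> LocalProvider rename here and the matching get_provider changes in provider.py further down, provider resolution from a repo config keeps the same shape. A minimal sketch, following the RepoConfig fields used in the test changes at the end of this patch (the import location of OnlineStoreConfig is an assumption):

    from feast.infra.local import LocalProvider
    from feast.infra.provider import get_provider
    from feast.repo_config import (  # OnlineStoreConfig import path is assumed here
        LocalOnlineStoreConfig,
        OnlineStoreConfig,
        RepoConfig,
    )

    config = RepoConfig(
        registry="/tmp/registry.db",
        project="default",
        provider="local",
        online_store=OnlineStoreConfig(
            local=LocalOnlineStoreConfig(path="/tmp/online_store.db"),
        ),
    )

    # "local" resolves to LocalProvider; "gcp" would resolve to GcpProvider instead.
    provider = get_provider(config)
    assert isinstance(provider, LocalProvider)
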
diff --git a/sdk/python/feast/infra/offline_stores/__init__.py b/sdk/python/feast/infra/offline_stores/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py
new file mode 100644
index 0000000000..499bf0ed91
--- /dev/null
+++ b/sdk/python/feast/infra/offline_stores/bigquery.py
@@ -0,0 +1,332 @@
+import time
+from dataclasses import asdict, dataclass
+from datetime import datetime, timedelta
+from typing import Dict, List, Optional, Union
+
+import pandas
+import pyarrow
+from google.cloud import bigquery
+from jinja2 import BaseLoader, Environment
+
+from feast.data_source import BigQuerySource, DataSource
+from feast.feature_view import FeatureView
+from feast.infra.offline_stores.offline_store import OfflineStore
+from feast.infra.provider import (
+    RetrievalJob,
+    _get_requested_feature_views_to_features_dict,
+)
+from feast.repo_config import RepoConfig
+
+
+class BigQueryOfflineStore(OfflineStore):
+    @staticmethod
+    def pull_latest_from_table_or_query(
+        data_source: DataSource,
+        join_key_columns: List[str],
+        feature_name_columns: List[str],
+        event_timestamp_column: str,
+        created_timestamp_column: Optional[str],
+        start_date: datetime,
+        end_date: datetime,
+    ) -> pyarrow.Table:
+        assert isinstance(data_source, BigQuerySource)
+        from_expression = data_source.get_table_query_string()
+
+        partition_by_join_key_string = ", ".join(join_key_columns)
+        if partition_by_join_key_string != "":
+            partition_by_join_key_string = (
+                "PARTITION BY " + partition_by_join_key_string
+            )
+        timestamps = [event_timestamp_column]
+        if created_timestamp_column is not None:
+            timestamps.append(created_timestamp_column)
+        timestamp_desc_string = " DESC, ".join(timestamps) + " DESC"
+        field_string = ", ".join(join_key_columns + feature_name_columns + timestamps)
+
+        query = f"""
+            SELECT {field_string}
+            FROM (
+                SELECT {field_string},
+                ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row
+                FROM {from_expression}
+                WHERE {event_timestamp_column} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}')
+            )
+            WHERE _feast_row = 1
+            """
+
+        return BigQueryOfflineStore._pull_query(query)
+
+    @staticmethod
+    def _pull_query(query: str) -> pyarrow.Table:
+        from google.cloud import bigquery
+
+        client = bigquery.Client()
+        query_job = client.query(query)
+        return query_job.to_arrow()
+
+    @staticmethod
+    def get_historical_features(
+        config: RepoConfig,
+        feature_views: List[FeatureView],
+        feature_refs: List[str],
+        entity_df: Union[pandas.DataFrame, str],
+    ) -> RetrievalJob:
+        # TODO: Add entity_df validation in order to fail before interacting with BigQuery
+
+        if type(entity_df) is str:
+            entity_df_sql_table = f"({entity_df})"
+        elif isinstance(entity_df, pandas.DataFrame):
+            table_id = _upload_entity_df_into_bigquery(config.project, entity_df)
+            entity_df_sql_table = f"`{table_id}`"
+        else:
+            raise ValueError(
+                f"The entity dataframe you have provided must be a Pandas DataFrame or BigQuery SQL query, "
+                f"but we found: {type(entity_df)} "
+            )
+
+        # Build a query context containing all information required to template the BigQuery SQL query
+        query_context = get_feature_view_query_context(feature_refs, feature_views)
+
+        # TODO: Infer min_timestamp and max_timestamp from entity_df
+        # Generate the BigQuery SQL query from the query context
+        query = build_point_in_time_query(
+            query_context,
+            min_timestamp=datetime.now() - timedelta(days=365),
+            max_timestamp=datetime.now() + timedelta(days=1),
+            left_table_query_string=entity_df_sql_table,
+        )
+        job = BigQueryRetrievalJob(query=query)
+        return job
+
+
+class BigQueryRetrievalJob(RetrievalJob):
+    def __init__(self, query):
+        self.query = query
+
+    def to_df(self):
+        # TODO: Ideally only start this job when the user runs "get_historical_features", not when they run to_df()
+        client = bigquery.Client()
+        df = client.query(self.query).to_dataframe(create_bqstorage_client=True)
+        return df
+
+
+@dataclass(frozen=True)
+class FeatureViewQueryContext:
+    """Context object used to template a BigQuery point-in-time SQL query"""
+
+    name: str
+    ttl: int
+    entities: List[str]
+    features: List[str]  # feature reference format
+    table_ref: str
+    event_timestamp_column: str
+    created_timestamp_column: str
+    field_mapping: Dict[str, str]
+    query: str
+    table_subquery: str
+
+
+def _upload_entity_df_into_bigquery(project, entity_df) -> str:
+    """Uploads a Pandas entity dataframe into a BigQuery table and returns a reference to the resulting table"""
+    client = bigquery.Client()
+
+    # First create the BigQuery dataset if it doesn't exist
+    dataset = bigquery.Dataset(f"{client.project}.feast_{project}")
+    dataset.location = "US"
+    client.create_dataset(
+        dataset, exists_ok=True
+    )  # TODO: Consider moving this to apply or BigQueryOfflineStore
+
+    # Drop the index so that we don't have unnecessary columns
+    entity_df.reset_index(drop=True, inplace=True)
+
+    # Upload the dataframe into BigQuery, creating a temporary table
+    job_config = bigquery.LoadJobConfig()
+    table_id = f"{client.project}.feast_{project}.entity_df_{int(time.time())}"
+    job = client.load_table_from_dataframe(entity_df, table_id, job_config=job_config,)
+    job.result()
+
+    # Ensure that the table expires after some time
+    table = client.get_table(table=table_id)
+    table.expires = datetime.utcnow() + timedelta(minutes=30)
+    client.update_table(table, ["expires"])
+
+    return table_id
+
+
+def get_feature_view_query_context(
+    feature_refs: List[str], feature_views: List[FeatureView]
+) -> List[FeatureViewQueryContext]:
+    """Build a query context containing all information required to template a BigQuery point-in-time SQL query"""
+
+    feature_views_to_feature_map = _get_requested_feature_views_to_features_dict(
+        feature_refs, feature_views
+    )
+
+    query_context = []
+    for feature_view, features in feature_views_to_feature_map.items():
+        entity_names = [entity for entity in feature_view.entities]
+
+        if isinstance(feature_view.ttl, timedelta):
+            ttl_seconds = int(feature_view.ttl.total_seconds())
+        else:
+            ttl_seconds = 0
+
+        assert isinstance(feature_view.input, BigQuerySource)
+
+        context = FeatureViewQueryContext(
+            name=feature_view.name,
+            ttl=ttl_seconds,
+            entities=entity_names,
+            features=features,
+            table_ref=feature_view.input.table_ref,
+            event_timestamp_column=feature_view.input.event_timestamp_column,
+            created_timestamp_column=feature_view.input.created_timestamp_column,
+            # TODO: Make created column optional and not hardcoded
+            field_mapping=feature_view.input.field_mapping,
+            query=feature_view.input.query,
+            table_subquery=feature_view.input.get_table_query_string(),
+        )
+        query_context.append(context)
+    return query_context
+
+
+def build_point_in_time_query(
+    feature_view_query_contexts: List[FeatureViewQueryContext],
+    min_timestamp: datetime,
+    max_timestamp: datetime,
+    left_table_query_string: str,
+):
+    """Build point-in-time query between each feature view table and the entity dataframe"""
+    template = Environment(loader=BaseLoader()).from_string(
+        source=SINGLE_FEATURE_VIEW_POINT_IN_TIME_JOIN
+    )
+
+    # Add additional fields to dict
+    template_context = {
+        "min_timestamp": min_timestamp,
+        "max_timestamp": max_timestamp,
+        "left_table_query_string": left_table_query_string,
+        "featureviews": [asdict(context) for context in feature_view_query_contexts],
+    }
+
+    query = template.render(template_context)
+    return query
+
+
+# TODO: Optimizations
+#   * Use GENERATE_UUID() instead of ROW_NUMBER(), or join on entity columns directly
+#   * Precompute ROW_NUMBER() so that it doesn't have to be recomputed for every query on entity_dataframe
+#   * Create temporary tables instead of keeping all tables in memory
+
+SINGLE_FEATURE_VIEW_POINT_IN_TIME_JOIN = """
+WITH entity_dataframe AS (
+    SELECT ROW_NUMBER() OVER() AS row_number, edf.* FROM {{ left_table_query_string }} as edf
+),
+{% for featureview in featureviews %}
+/*
+ This query template performs the point-in-time correctness join for a single feature set table
+ to the provided entity table.
+ 1. Concatenate the timestamp and entities from the feature set table with the entity dataset.
+ Feature values are joined to this table later for improved efficiency.
+ featureview_timestamp is equal to null in rows from the entity dataset.
+ */
+{{ featureview.name }}__union_features AS (
+SELECT
+  -- unique identifier for each row in the entity dataset.
+  row_number,
+  -- event_timestamp contains the timestamps to join onto
+  event_timestamp,
+  -- the feature_timestamp, i.e. the latest occurrence of the requested feature relative to the entity_dataset timestamp
+  NULL as {{ featureview.name }}_feature_timestamp,
+  -- created timestamp of the feature at the corresponding feature_timestamp
+  NULL as created_timestamp,
+  -- select only entities belonging to this feature set
+  {{ featureview.entities | join(', ')}},
+  -- boolean for filtering the dataset later
+  true AS is_entity_table
+FROM entity_dataframe
+UNION ALL
+SELECT
+  NULL as row_number,
+  {{ featureview.event_timestamp_column }} as event_timestamp,
+  {{ featureview.event_timestamp_column }} as {{ featureview.name }}_feature_timestamp,
+  {{ featureview.created_timestamp_column }} as created_timestamp,
+  {{ featureview.entities | join(', ')}},
+  false AS is_entity_table
+FROM {{ featureview.table_subquery }} WHERE {{ featureview.event_timestamp_column }} <= '{{ max_timestamp }}'
+{% if featureview.ttl == 0 %}{% else %}AND {{ featureview.event_timestamp_column }} >= Timestamp_sub(TIMESTAMP '{{ min_timestamp }}', interval {{ featureview.ttl }} second){% endif %}
+),
+/*
+ 2. Window the data in the unioned dataset, partitioning by entity and ordering by event_timestamp, as
+ well as is_entity_table.
+ Within each window, back-fill the feature_timestamp - as a result of this, the null feature_timestamps
+ in the rows from the entity table should now contain the latest timestamps relative to the row's
+ event_timestamp.
+ For rows where event_timestamp(provided datetime) - feature_timestamp > max age, set the
+ feature_timestamp to null.
+ */
+{{ featureview.name }}__joined AS (
+SELECT
+  row_number,
+  event_timestamp,
+  {{ featureview.entities | join(', ')}},
+  {% for feature in featureview.features %}
+  IF(event_timestamp >= {{ featureview.name }}_feature_timestamp {% if featureview.ttl == 0 %}{% else %}AND Timestamp_sub(event_timestamp, interval {{ featureview.ttl }} second) < {{ featureview.name }}_feature_timestamp{% endif %}, {{ featureview.name }}__{{ feature }}, NULL) as {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %}
+  {% endfor %}
+FROM (
+SELECT
+  row_number,
+  event_timestamp,
+  {{ featureview.entities | join(', ')}},
+  FIRST_VALUE(created_timestamp IGNORE NULLS) over w AS created_timestamp,
+  FIRST_VALUE({{ featureview.name }}_feature_timestamp IGNORE NULLS) over w AS {{ featureview.name }}_feature_timestamp,
+  is_entity_table
+FROM {{ featureview.name }}__union_features
+WINDOW w AS (PARTITION BY {{ featureview.entities | join(', ') }} ORDER BY event_timestamp DESC, is_entity_table DESC, created_timestamp DESC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
+)
+/*
+ 3. Select only the rows from the entity table, and join the features from the original feature set table
+ to the dataset using the entity values, feature_timestamp, and created_timestamps.
+ */
+LEFT JOIN (
+SELECT
+  {{ featureview.event_timestamp_column }} as {{ featureview.name }}_feature_timestamp,
+  {{ featureview.created_timestamp_column }} as created_timestamp,
+  {{ featureview.entities | join(', ')}},
+  {% for feature in featureview.features %}
+  {{ feature }} as {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %}
+  {% endfor %}
+FROM {{ featureview.table_subquery }} WHERE {{ featureview.event_timestamp_column }} <= '{{ max_timestamp }}'
+{% if featureview.ttl == 0 %}{% else %}AND {{ featureview.event_timestamp_column }} >= Timestamp_sub(TIMESTAMP '{{ min_timestamp }}', interval {{ featureview.ttl }} second){% endif %}
+) USING ({{ featureview.name }}_feature_timestamp, created_timestamp, {{ featureview.entities | join(', ')}})
+WHERE is_entity_table
+),
+/*
+ 4. Finally, deduplicate the rows by selecting the first occurrence of each entity table row_number.
+ */
+{{ featureview.name }}__deduped AS (SELECT
+  k.*
+FROM (
+  SELECT ARRAY_AGG(row LIMIT 1)[OFFSET(0)] k
+  FROM {{ featureview.name }}__joined row
+  GROUP BY row_number
+)){% if loop.last %}{% else %}, {% endif %}
+
+{% endfor %}
+/*
+ Joins the outputs of multiple time travel joins to a single table.
+ */
+SELECT edf.event_timestamp as event_timestamp, * EXCEPT (row_number, event_timestamp) FROM entity_dataframe edf
+{% for featureview in featureviews %}
+LEFT JOIN (
+    SELECT
+    row_number,
+    {% for feature in featureview.features %}
+    {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %}
+    {% endfor %}
+    FROM {{ featureview.name }}__deduped
+) USING (row_number)
+{% endfor %}
+ORDER BY event_timestamp
+"""
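
For reference, pull_latest_from_table_or_query above boils down to a ROW_NUMBER() dedup per join key over the requested time window. A sketch of the generated SQL for hypothetical inputs (the table and column names below are illustrative only; this is plain string formatting and does not touch BigQuery):

    # Mirror the query construction in BigQueryOfflineStore.pull_latest_from_table_or_query
    # for made-up inputs, to show the shape of the SQL it emits.
    join_key_columns = ["driver_id"]
    feature_name_columns = ["conv_rate"]
    event_timestamp_column = "event_timestamp"
    created_timestamp_column = "created"
    from_expression = "`my_project.my_dataset.driver_stats`"  # hypothetical table
    start_date, end_date = "2021-04-01 00:00:00+00:00", "2021-04-08 00:00:00+00:00"

    timestamps = [event_timestamp_column, created_timestamp_column]
    partition_by_join_key_string = "PARTITION BY " + ", ".join(join_key_columns)
    timestamp_desc_string = " DESC, ".join(timestamps) + " DESC"
    field_string = ", ".join(join_key_columns + feature_name_columns + timestamps)

    query = f"""
        SELECT {field_string}
        FROM (
            SELECT {field_string},
            ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row
            FROM {from_expression}
            WHERE {event_timestamp_column} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}')
        )
        WHERE _feast_row = 1
        """
    print(query)  # one surviving row per driver_id: the latest value inside the window
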
diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py
new file mode 100644
index 0000000000..469f6dc0f8
--- /dev/null
+++ b/sdk/python/feast/infra/offline_stores/file.py
@@ -0,0 +1,176 @@
+from datetime import datetime
+from typing import Callable, List, Optional, Union
+
+import pandas as pd
+import pyarrow
+import pytz
+
+from feast.data_source import DataSource, FileSource
+from feast.feature_view import FeatureView
+from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
+from feast.infra.provider import (
+    ENTITY_DF_EVENT_TIMESTAMP_COL,
+    _get_requested_feature_views_to_features_dict,
+)
+from feast.repo_config import RepoConfig
+
+
+class FileRetrievalJob(RetrievalJob):
+    def __init__(self, evaluation_function: Callable):
+        """Initialize a lazy historical retrieval job"""
+
+        # The evaluation function executes a stored procedure to compute a historical retrieval.
+        self.evaluation_function = evaluation_function
+
+    def to_df(self):
+        # Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
+        df = self.evaluation_function()
+        return df
+
+
+class FileOfflineStore(OfflineStore):
+    @staticmethod
+    def get_historical_features(
+        config: RepoConfig,
+        feature_views: List[FeatureView],
+        feature_refs: List[str],
+        entity_df: Union[pd.DataFrame, str],
+    ) -> FileRetrievalJob:
+        if not isinstance(entity_df, pd.DataFrame):
+            raise ValueError(
+                f"Please provide an entity_df of type {type(pd.DataFrame)} instead of type {type(entity_df)}"
+            )
+
+        feature_views_to_features = _get_requested_feature_views_to_features_dict(
+            feature_refs, feature_views
+        )
+
+        # Create lazy function that is only called from the RetrievalJob object
+        def evaluate_historical_retrieval():
+
+            # Sort entity dataframe prior to join, and create a copy to prevent modifying the original
+            entity_df_with_features = entity_df.sort_values(
+                ENTITY_DF_EVENT_TIMESTAMP_COL
+            ).copy()
+
+            # Load feature view data from sources and join them incrementally
+            for feature_view, features in feature_views_to_features.items():
+                event_timestamp_column = feature_view.input.event_timestamp_column
+                created_timestamp_column = feature_view.input.created_timestamp_column
+
+                # Read dataframe to join to entity dataframe
+                df_to_join = pd.read_parquet(feature_view.input.path).sort_values(
+                    event_timestamp_column
+                )
+
+                # Build a list of all the features we should select from this source
+                feature_names = []
+                for feature in features:
+                    # Modify the separator for feature refs in column names to double underscore. We are using
+                    # double underscore as separator for consistency with other databases like BigQuery,
+                    # where there are very few characters available for use as separators
+                    prefixed_feature_name = f"{feature_view.name}__{feature}"
+
+                    # Add the feature name to the list of columns
+                    feature_names.append(prefixed_feature_name)
+
+                    # Ensure that the source dataframe feature column includes the feature view name as a prefix
+                    df_to_join.rename(
+                        columns={feature: prefixed_feature_name}, inplace=True,
+                    )
+
+                # Build a list of entity columns to join on (from the right table)
+                right_entity_columns = [entity for entity in feature_view.entities]
+                right_entity_key_columns = [
+                    event_timestamp_column
+                ] + right_entity_columns
+
+                # Remove all duplicate entity keys (using created timestamp)
+                right_entity_key_sort_columns = right_entity_key_columns
+                if created_timestamp_column:
+                    # If created_timestamp is available, use it to dedupe deterministically
+                    right_entity_key_sort_columns = right_entity_key_sort_columns + [
+                        created_timestamp_column
+                    ]
+
+                df_to_join.sort_values(by=right_entity_key_sort_columns, inplace=True)
+                df_to_join = df_to_join.groupby(by=right_entity_key_columns).last()
+                df_to_join.reset_index(inplace=True)
+
+                # Select only the columns we need to join from the feature dataframe
+                df_to_join = df_to_join[right_entity_key_columns + feature_names]
+
+                # Do a point-in-time join between entity_df and the feature dataframe
+                entity_df_with_features = pd.merge_asof(
+                    entity_df_with_features,
+                    df_to_join,
+                    left_on=ENTITY_DF_EVENT_TIMESTAMP_COL,
+                    right_on=event_timestamp_column,
+                    by=right_entity_columns,
+                    tolerance=feature_view.ttl,
+                )
+
+                # Remove right (feature table/view) event_timestamp column.
+                if event_timestamp_column != ENTITY_DF_EVENT_TIMESTAMP_COL:
+                    entity_df_with_features.drop(
+                        columns=[event_timestamp_column], inplace=True
+                    )
+
+                # Ensure that we delete dataframes to free up memory
+                del df_to_join
+
+            # Move "datetime" column to front
+            current_cols = entity_df_with_features.columns.tolist()
+            current_cols.remove(ENTITY_DF_EVENT_TIMESTAMP_COL)
+            entity_df_with_features = entity_df_with_features[
+                [ENTITY_DF_EVENT_TIMESTAMP_COL] + current_cols
+            ]
+
+            return entity_df_with_features
+
+        job = FileRetrievalJob(evaluation_function=evaluate_historical_retrieval)
+        return job
+
+    @staticmethod
+    def pull_latest_from_table_or_query(
+        data_source: DataSource,
+        join_key_columns: List[str],
+        feature_name_columns: List[str],
+        event_timestamp_column: str,
+        created_timestamp_column: Optional[str],
+        start_date: datetime,
+        end_date: datetime,
+    ) -> pyarrow.Table:
+        assert isinstance(data_source, FileSource)
+
+        source_df = pd.read_parquet(data_source.path)
+        # Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC
+        source_df[event_timestamp_column] = source_df[event_timestamp_column].apply(
+            lambda x: x if x.tz is not None else x.replace(tzinfo=pytz.utc)
+        )
+        source_df[created_timestamp_column] = source_df[created_timestamp_column].apply(
+            lambda x: x if x.tz is not None else x.replace(tzinfo=pytz.utc)
+        )
+
+        ts_columns = (
+            [event_timestamp_column, created_timestamp_column]
+            if created_timestamp_column is not None
+            else [event_timestamp_column]
+        )
+
+        source_df.sort_values(by=ts_columns, inplace=True)
+
+        filtered_df = source_df[
+            (source_df[event_timestamp_column] >= start_date)
+            & (source_df[event_timestamp_column] < end_date)
+        ]
+        last_values_df = filtered_df.groupby(by=join_key_columns).last()
+
+        # Make the join keys normal columns again
+        last_values_df.reset_index(inplace=True)
+
+        table = pyarrow.Table.from_pandas(
+            last_values_df[join_key_columns + feature_name_columns + ts_columns]
+        )
+
+        return table
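
The heart of FileOfflineStore.get_historical_features is the backward pd.merge_asof above. A self-contained sketch of that point-in-time join on made-up driver data (frame and column names are illustrative; both frames share the timestamp column name here, so on= is used instead of left_on/right_on):

    import pandas as pd

    # Feature rows: two observations for one driver.
    features = pd.DataFrame(
        {
            "event_timestamp": pd.to_datetime(["2021-04-10 10:00", "2021-04-12 08:00"], utc=True),
            "driver_id": [1001, 1001],
            "driver_stats__conv_rate": [0.2, 0.5],
        }
    )

    # Entity rows: the timestamps we want feature values "as of".
    entity_df = pd.DataFrame(
        {
            "event_timestamp": pd.to_datetime(["2021-04-11 00:00", "2021-04-13 00:00"], utc=True),
            "driver_id": [1001, 1001],
        }
    )

    # Backward as-of join: each entity row picks the latest feature row at or before its
    # timestamp, per driver, within a ttl-like tolerance window.
    joined = pd.merge_asof(
        entity_df.sort_values("event_timestamp"),
        features.sort_values("event_timestamp"),
        on="event_timestamp",
        by="driver_id",
        tolerance=pd.Timedelta(days=7),
    )
    print(joined)  # first row gets conv_rate 0.2, second row gets 0.5
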
diff --git a/sdk/python/feast/infra/offline_stores/helpers.py b/sdk/python/feast/infra/offline_stores/helpers.py
new file mode 100644
index 0000000000..24188d0ca5
--- /dev/null
+++ b/sdk/python/feast/infra/offline_stores/helpers.py
@@ -0,0 +1,26 @@
+from typing import List
+
+from feast.data_source import BigQuerySource, DataSource, FileSource
+from feast.infra.offline_stores.bigquery import BigQueryOfflineStore
+from feast.infra.offline_stores.file import FileOfflineStore
+from feast.infra.offline_stores.offline_store import OfflineStore
+
+
+def get_offline_store_from_sources(sources: List[DataSource]) -> OfflineStore:
+    """Detect which offline store should be used for retrieving historical features"""
+
+    source_types = [type(source) for source in sources]
+
+    # Retrieve features from FileOfflineStore
+    if all(source_type == FileSource for source_type in source_types):
+        return FileOfflineStore()
+
+    # Retrieve features from BigQueryOfflineStore
+    if all(source_type == BigQuerySource for source_type in source_types):
+        return BigQueryOfflineStore()
+
+    # Could not map inputs to an OfflineStore implementation
+    raise NotImplementedError(
+        "Unsupported combination of feature view input source types. Please ensure that all source types are "
+        "consistent and available in the same offline store."
+    )
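
A quick usage sketch for the helper above. The FileSource/BigQuerySource keyword arguments are assumptions inferred from how the sources are read elsewhere in this diff (path, table_ref, event_timestamp_column); check the actual constructors before relying on them:

    from feast.data_source import BigQuerySource, FileSource
    from feast.infra.offline_stores.bigquery import BigQueryOfflineStore
    from feast.infra.offline_stores.file import FileOfflineStore
    from feast.infra.offline_stores.helpers import get_offline_store_from_sources

    # Constructor kwargs below are assumed, as noted above.
    file_source = FileSource(
        path="/tmp/driver_stats.parquet", event_timestamp_column="event_timestamp"
    )
    bq_source = BigQuerySource(
        table_ref="my_project.my_dataset.driver_stats",
        event_timestamp_column="event_timestamp",
    )

    # Homogeneous source lists resolve to a single offline store...
    assert isinstance(get_offline_store_from_sources([file_source]), FileOfflineStore)
    assert isinstance(get_offline_store_from_sources([bq_source]), BigQueryOfflineStore)

    # ...while mixing source types in one request is rejected.
    try:
        get_offline_store_from_sources([file_source, bq_source])
    except NotImplementedError as e:
        print(e)
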
diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py
new file mode 100644
index 0000000000..76c7301f86
--- /dev/null
+++ b/sdk/python/feast/infra/offline_stores/offline_store.py
@@ -0,0 +1,67 @@
+# Copyright 2019 The Feast Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from abc import ABC, abstractmethod
+from datetime import datetime
+from typing import List, Optional, Union
+
+import pandas as pd
+import pyarrow
+
+from feast.data_source import DataSource
+from feast.feature_view import FeatureView
+from feast.repo_config import RepoConfig
+
+
+class RetrievalJob(ABC):
+    """RetrievalJob is used to manage the execution of a historical feature retrieval"""
+
+    @abstractmethod
+    def to_df(self):
+        """Return dataset as Pandas DataFrame synchronously"""
+        pass
+
+
+class OfflineStore(ABC):
+    """
+    OfflineStore is an object used for all interaction between Feast and the service used for offline storage of
+    features. Currently BigQuery and local Parquet files are supported.
+    """
+
+    @staticmethod
+    @abstractmethod
+    def pull_latest_from_table_or_query(
+        data_source: DataSource,
+        join_key_columns: List[str],
+        feature_name_columns: List[str],
+        event_timestamp_column: str,
+        created_timestamp_column: Optional[str],
+        start_date: datetime,
+        end_date: datetime,
+    ) -> pyarrow.Table:
+        """
+        Note that join_key_columns, feature_name_columns, event_timestamp_column, and created_timestamp_column
+        have already been mapped to the column names of the source table; it is those mapped column names that
+        are passed into this function.
+        """
+        pass
+
+    @staticmethod
+    @abstractmethod
+    def get_historical_features(
+        config: RepoConfig,
+        feature_views: List[FeatureView],
+        feature_refs: List[str],
+        entity_df: Union[pd.DataFrame, str],
+    ) -> RetrievalJob:
+        pass
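
The two static abstract methods above are the entire OfflineStore contract. As a sketch of what an implementation has to provide (the class below is hypothetical and not part of this change; both existing implementations return a pyarrow.Table from the first method and a lazy RetrievalJob from the second):

    from datetime import datetime
    from typing import List, Optional, Union

    import pandas as pd
    import pyarrow

    from feast.data_source import DataSource
    from feast.feature_view import FeatureView
    from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
    from feast.repo_config import RepoConfig


    class InMemoryRetrievalJob(RetrievalJob):
        def __init__(self, df: pd.DataFrame):
            self._df = df

        def to_df(self) -> pd.DataFrame:
            return self._df


    class MyOfflineStore(OfflineStore):
        """Hypothetical skeleton showing the methods a new offline store must implement."""

        @staticmethod
        def pull_latest_from_table_or_query(
            data_source: DataSource,
            join_key_columns: List[str],
            feature_name_columns: List[str],
            event_timestamp_column: str,
            created_timestamp_column: Optional[str],
            start_date: datetime,
            end_date: datetime,
        ) -> pyarrow.Table:
            # Return the latest row per join key within the requested time window.
            raise NotImplementedError

        @staticmethod
        def get_historical_features(
            config: RepoConfig,
            feature_views: List[FeatureView],
            feature_refs: List[str],
            entity_df: Union[pd.DataFrame, str],
        ) -> RetrievalJob:
            # Perform the point-in-time join and hand back a lazy RetrievalJob.
            return InMemoryRetrievalJob(pd.DataFrame())
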
diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py
index d20d4b9e84..0f2997123f 100644
--- a/sdk/python/feast/infra/provider.py
+++ b/sdk/python/feast/infra/provider.py
@@ -8,6 +8,7 @@
 from feast.entity import Entity
 from feast.feature_table import FeatureTable
 from feast.feature_view import FeatureView
+from feast.infra.offline_stores.offline_store import RetrievalJob
 from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
 from feast.protos.feast.types.Value_pb2 import Value as ValueProto
 from feast.registry import Registry
@@ -17,15 +18,6 @@
 ENTITY_DF_EVENT_TIMESTAMP_COL = "event_timestamp"
 
 
-class RetrievalJob(abc.ABC):
-    """RetrievalJob is used to manage the execution of a historical feature retrieval"""
-
-    @abc.abstractmethod
-    def to_df(self):
-        """Return dataset as Pandas DataFrame synchronously"""
-        pass
-
-
 class Provider(abc.ABC):
     @abc.abstractmethod
     def update_infra(
@@ -131,15 +123,17 @@ def online_read(
 
 def get_provider(config: RepoConfig) -> Provider:
     if config.provider == "gcp":
-        from feast.infra.gcp import Gcp
+        from feast.infra.gcp import GcpProvider
 
-        return Gcp(config.online_store.datastore if config.online_store else None)
+        return GcpProvider(
+            config.online_store.datastore if config.online_store else None
+        )
     elif config.provider == "local":
-        from feast.infra.local_sqlite import LocalSqlite
+        from feast.infra.local import LocalProvider
 
         assert config.online_store is not None
         assert config.online_store.local is not None
-        return LocalSqlite(config.online_store.local)
+        return LocalProvider(config.online_store.local)
     else:
         raise ValueError(config)
 
diff --git a/sdk/python/tests/test_historical_retrieval.py b/sdk/python/tests/test_historical_retrieval.py
index ac4856e3fe..64735b56b9 100644
--- a/sdk/python/tests/test_historical_retrieval.py
+++ b/sdk/python/tests/test_historical_retrieval.py
@@ -1,4 +1,6 @@
 import os
+import random
+import string
 import time
 from datetime import datetime, timedelta
 from tempfile import TemporaryDirectory
@@ -278,7 +280,10 @@ def test_historical_features_from_parquet_sources():
 
 
 @pytest.mark.integration
-def test_historical_features_from_bigquery_sources():
+@pytest.mark.parametrize(
+    "provider_type", ["local", "gcp"],
+)
+def test_historical_features_from_bigquery_sources(provider_type):
     start_date = datetime.now().replace(microsecond=0, second=0, minute=0)
     (
         customer_entities,
@@ -329,18 +334,32 @@ def test_historical_features_from_bigquery_sources():
         driver = Entity(name="driver", value_type=ValueType.INT64)
         customer = Entity(name="customer", value_type=ValueType.INT64)
 
-        store = FeatureStore(
-            config=RepoConfig(
-                registry=os.path.join(temp_dir, "registry.db"),
-                project="default",
-                provider="gcp",
-                online_store=OnlineStoreConfig(
-                    local=LocalOnlineStoreConfig(
-                        path=os.path.join(temp_dir, "online_store.db"),
-                    )
-                ),
+        if provider_type == "local":
+            store = FeatureStore(
+                config=RepoConfig(
+                    registry=os.path.join(temp_dir, "registry.db"),
+                    project="default",
+                    provider="local",
+                    online_store=OnlineStoreConfig(
+                        local=LocalOnlineStoreConfig(
+                            path=os.path.join(temp_dir, "online_store.db"),
+                        )
+                    ),
+                )
             )
-        )
+        elif provider_type == "gcp":
+            store = FeatureStore(
+                config=RepoConfig(
+                    registry=os.path.join(temp_dir, "registry.db"),
+                    project="".join(
+                        random.choices(string.ascii_uppercase + string.digits, k=10)
+                    ),
+                    provider="gcp",
+                )
+            )
+        else:
+            raise Exception("Invalid provider used as part of test configuration")
+
         store.apply([driver, customer, driver_fv, customer_fv])
 
         expected_df = get_expected_training_df(