refactor(robot-server): Store Pydantic objects as JSON instead of pickles, take 2 #14355

Merged · 4 commits · Jan 30, 2024
74 changes: 62 additions & 12 deletions robot-server/robot_server/persistence/_migrations/up_to_3.py
@@ -2,18 +2,30 @@

Summary of changes from schema 2:

-- Run commands were formerly stored as monolithic blobs in the run table,
+- Run commands were formerly stored as monolithic blobs in the `run` table,
   with each row storing an entire list. This has been split out into a new
-  run_command table, where each individual command gets its own row.
+  `run_command` table, where each individual command gets its own row.

+- All columns that were storing binary pickles have been converted to storing
+  JSON strings:
+  - `analysis.completed_analysis`
+  - `run.state_summary`
+  - `run_commands.command` (formerly `run.commands`; see above)
+
+- `analysis.completed_analysis_as_document` has been removed,
+  since the updated `analysis.completed_analysis` (see above) replaces it.
"""


from contextlib import ExitStack
from pathlib import Path
-from typing import Any, Dict, List
+from typing import Any, Dict, Iterable, List

+from opentrons.protocol_engine import Command, StateSummary
+import pydantic
import sqlalchemy

+from ..pydantic import pydantic_to_json
from .._database import (
    create_schema_2_sql_engine,
    create_schema_3_sql_engine,
@@ -64,12 +76,9 @@ def _migrate_db(
        order_by_rowid=True,
    )

-    copy_rows_unmodified(
-        schema_2.analysis_table,
-        schema_3.analysis_table,
+    _migrate_analysis_table(
        source_transaction,
        dest_transaction,
-        order_by_rowid=True,
    )

    _migrate_run_table(
@@ -95,25 +104,40 @@ def _migrate_run_table(
    insert_new_command = sqlalchemy.insert(schema_3.run_command_table)

    for old_run_row in source_transaction.execute(select_old_runs).all():
+        old_state_summary = old_run_row.state_summary
+        new_state_summary = (
+            None
+            if old_run_row.state_summary is None
+            else pydantic_to_json(
+                pydantic.parse_obj_as(StateSummary, old_state_summary)
+            )
+        )
Comment on lines +107 to +114

@SyntaxColoring (Contributor, Author) commented on Jan 26, 2024:

    We could add a try: ... except: pass around each run's migration to drop it
    if something goes wrong, instead of making the whole server unavailable.

@SyntaxColoring:

    Yeah I've decided I should do this. Dropping runs silently is bad, but it's
    significantly better than failing the boot.

@SyntaxColoring:

    I haven't gotten to this yet. We should do it in a follow-up.
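For illustration, a minimal sketch of the fallback discussed in this thread, assuming a hypothetical `_migrate_one_run` helper that wraps the per-run body shown below and a module-level `_log` logger; none of this is in the PR itself:

for old_run_row in source_transaction.execute(select_old_runs).all():
    try:
        _migrate_one_run(old_run_row, dest_transaction)  # hypothetical helper
    except Exception:
        # Losing one corrupt run is better than failing the whole boot,
        # but log it so the drop is not completely silent.
        _log.exception("Skipping run %s: migration failed.", old_run_row.id)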

        dest_transaction.execute(
            insert_new_run,
            id=old_run_row.id,
            created_at=old_run_row.created_at,
            protocol_id=old_run_row.protocol_id,
-            state_summary=old_run_row.state_summary,
+            state_summary=new_state_summary,
            engine_status=old_run_row.engine_status,
            _updated_at=old_run_row._updated_at,
        )

-        commands: List[Dict[str, Any]] = old_run_row.commands or []
+        old_commands: List[Dict[str, Any]] = old_run_row.commands or []
+        pydantic_old_commands: Iterable[Command] = (
+            pydantic.parse_obj_as(
+                Command,  # type: ignore[arg-type]
+                c,
+            )
+            for c in old_commands
+        )
        new_command_rows = [
            {
                "run_id": old_run_row.id,
                "index_in_run": index_in_run,
-                "command_id": command["id"],
-                "command": command,
+                "command_id": pydantic_command.id,
+                "command": pydantic_to_json(pydantic_command),
            }
-            for index_in_run, command in enumerate(commands)
+            for index_in_run, pydantic_command in enumerate(pydantic_old_commands)
        ]
        # Insert all the commands for this run in one go, to avoid the overhead of
        # separate statements, and since we had to bring them all into memory at once
@@ -123,3 +147,29 @@ def _migrate_run_table(
        # SQLAlchemy misinterprets this as inserting a single row with all default
        # values.
        dest_transaction.execute(insert_new_command, new_command_rows)
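The collapsed lines above evidently begin the sentence that ends in the comment about SQLAlchemy; the pitfall it describes is usually avoided with a guard like this hypothetical reconstruction (an assumption, not lines from the PR):

# Hypothetical: calling execute() with an empty list would make SQLAlchemy
# insert a single row of all default values, so skip the insert entirely
# for a run that has no commands.
if new_command_rows:
    dest_transaction.execute(insert_new_command, new_command_rows)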


+def _migrate_analysis_table(
+    source_connection: sqlalchemy.engine.Connection,
+    dest_connection: sqlalchemy.engine.Connection,
+) -> None:
+    select_old_analyses = sqlalchemy.select(schema_2.analysis_table).order_by(
+        sqlite_rowid
+    )
+    insert_new_analysis = sqlalchemy.insert(schema_3.analysis_table)
+    for row in (
+        # The table is missing an explicit sequence number column, so we need
+        # sqlite_rowid to retain order across this copy.
+        source_connection.execute(select_old_analyses).all()
+    ):
+        dest_connection.execute(
+            insert_new_analysis,
+            # The new `completed_analysis` column has the data that used to be in
+            # `completed_analysis_as_document`. The separate
+            # `completed_analysis_as_document` column is dropped.
+            completed_analysis=row.completed_analysis_as_document,
+            # The remaining columns are unchanged:
+            id=row.id,
+            protocol_id=row.protocol_id,
+            analyzer_version=row.analyzer_version,
+        )
25 changes: 4 additions & 21 deletions robot-server/robot_server/persistence/_tables/schema_3.py
@@ -2,8 +2,6 @@

import sqlalchemy

-from robot_server.persistence import legacy_pickle
-from robot_server.persistence.pickle_protocol_version import PICKLE_PROTOCOL_VERSION
from robot_server.persistence._utc_datetime import UTCDateTime

metadata = sqlalchemy.MetaData()
@@ -46,18 +44,9 @@
    ),
    sqlalchemy.Column(
        "completed_analysis",
-        # Stores a pickled dict. See CompletedAnalysisStore.
-        # TODO(mm, 2023-08-30): Remove this. See https://opentrons.atlassian.net/browse/RSS-98.
-        sqlalchemy.LargeBinary,
-        nullable=False,
-    ),
-    sqlalchemy.Column(
-        "completed_analysis_as_document",
-        # Stores the same data as completed_analysis, but serialized as a JSON string.
+        # Stores a JSON string. See CompletedAnalysisStore.
        sqlalchemy.String,
-        # This column should never be NULL in practice.
-        # It needs to be nullable=True because of limitations in SQLite and our migration code.
-        nullable=True,
+        nullable=False,
    ),
)

@@ -83,7 +72,7 @@
    # column added in schema v1
    sqlalchemy.Column(
        "state_summary",
-        sqlalchemy.PickleType(pickler=legacy_pickle, protocol=PICKLE_PROTOCOL_VERSION),
+        sqlalchemy.String,
        nullable=True,
    ),
    # column added in schema v1
@@ -119,13 +108,7 @@
    ),
    sqlalchemy.Column("index_in_run", sqlalchemy.Integer, nullable=False),
    sqlalchemy.Column("command_id", sqlalchemy.String, nullable=False),
-    sqlalchemy.Column(
-        "command",
-        # TODO(mm, 2024-01-25): This should be JSON instead of a pickle. See:
-        # https://opentrons.atlassian.net/browse/RSS-98.
-        sqlalchemy.PickleType(pickler=legacy_pickle, protocol=PICKLE_PROTOCOL_VERSION),
-        nullable=False,
-    ),
+    sqlalchemy.Column("command", sqlalchemy.String, nullable=False),
    sqlalchemy.Index(
        "ix_run_run_id_command_id",  # An arbitrary name for the index.
        "run_id",
22 changes: 22 additions & 0 deletions robot-server/robot_server/persistence/pydantic.py
@@ -0,0 +1,22 @@
"""Store Pydantic objects in the SQL database."""

from typing import Type, TypeVar
from pydantic import BaseModel, parse_raw_as


_BaseModelT = TypeVar("_BaseModelT", bound=BaseModel)


def pydantic_to_json(obj: BaseModel) -> str:
"""Serialize a Pydantic object for storing in the SQL database."""
return obj.json(
# by_alias and exclude_none should match how
# FastAPI + Pydantic + our customizations serialize these objects
by_alias=True,
exclude_none=True,
)


def json_to_pydantic(model: Type[_BaseModelT], json: str) -> _BaseModelT:
"""Parse a Pydantic object stored in the SQL database."""
return parse_raw_as(model, json)
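A quick round-trip sketch of these two helpers (the `Pet` model is made up for illustration):

from pydantic import BaseModel

from robot_server.persistence.pydantic import json_to_pydantic, pydantic_to_json


class Pet(BaseModel):
    name: str


stored = pydantic_to_json(Pet(name="Fido"))  # '{"name": "Fido"}'
restored = json_to_pydantic(Pet, stored)
assert restored == Pet(name="Fido")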
54 changes: 11 additions & 43 deletions robot-server/robot_server/protocols/completed_analysis_store.py
@@ -2,16 +2,15 @@
from __future__ import annotations

import asyncio
-from typing import Dict, List, Optional, Tuple
+from typing import Dict, List, Optional
from logging import getLogger
from dataclasses import dataclass

import sqlalchemy
import anyio

from robot_server.persistence import analysis_table, sqlite_rowid
-from robot_server.persistence import legacy_pickle
-from robot_server.persistence.pickle_protocol_version import PICKLE_PROTOCOL_VERSION
+from robot_server.persistence.pydantic import json_to_pydantic, pydantic_to_json

from .analysis_models import CompletedAnalysis
from .analysis_memcache import MemoryCache
@@ -43,16 +42,10 @@ async def to_sql_values(self) -> Dict[str, object]:
        Avoid calling this from inside a SQL transaction, since it might be slow.
        """

-        def serialize_completed_analysis() -> Tuple[bytes, str]:
-            serialized_pickle = _serialize_completed_analysis_to_pickle(
-                self.completed_analysis
-            )
-            serialized_json = _serialize_completed_analysis_to_json(
-                self.completed_analysis
-            )
-            return serialized_pickle, serialized_json
+        def serialize_completed_analysis() -> str:
+            return pydantic_to_json(self.completed_analysis)

-        serialized_pickle, serialized_json = await anyio.to_thread.run_sync(
+        serialized_json = await anyio.to_thread.run_sync(
            serialize_completed_analysis,
            # Cancellation may orphan the worker thread,
            # but that should be harmless in this case.
@@ -63,8 +56,7 @@ def serialize_completed_analysis() -> Tuple[bytes, str]:
"id": self.id,
"protocol_id": self.protocol_id,
"analyzer_version": self.analyzer_version,
"completed_analysis": serialized_pickle,
"completed_analysis_as_document": serialized_json,
"completed_analysis": serialized_json,
}

@classmethod
@@ -93,9 +85,7 @@ async def from_sql_row(
        assert isinstance(protocol_id, str)

        def parse_completed_analysis() -> CompletedAnalysis:
-            return CompletedAnalysis.parse_obj(
-                legacy_pickle.loads(sql_row.completed_analysis)
-            )
+            return json_to_pydantic(CompletedAnalysis, sql_row.completed_analysis)

        completed_analysis = await anyio.to_thread.run_sync(
            parse_completed_analysis,
@@ -181,21 +171,17 @@ async def get_by_id_as_document(self, analysis_id: str) -> Optional[str]:
        This is like `get_by_id()`, except it returns the analysis as a pre-serialized JSON
        document.
        """
-        statement = sqlalchemy.select(
-            analysis_table.c.completed_analysis_as_document
-        ).where(analysis_table.c.id == analysis_id)
+        statement = sqlalchemy.select(analysis_table.c.completed_analysis).where(
+            analysis_table.c.id == analysis_id
+        )

        with self._sql_engine.begin() as transaction:
            try:
-                document: Optional[str] = transaction.execute(statement).scalar_one()
+                document: str = transaction.execute(statement).scalar_one()
            except sqlalchemy.exc.NoResultFound:
                # No analysis with this ID.
                return None

-        # Although the completed_analysis_as_document column is nullable,
-        # our migration code is supposed to ensure that it's never NULL in practice.
-        assert document is not None

        return document

async def get_by_protocol(
@@ -285,21 +271,3 @@ async def add(self, completed_analysis_resource: CompletedAnalysisResource) -> N
        self._memcache.insert(
            completed_analysis_resource.id, completed_analysis_resource
        )


-def _serialize_completed_analysis_to_pickle(
-    completed_analysis: CompletedAnalysis,
-) -> bytes:
-    return legacy_pickle.dumps(
-        completed_analysis.dict(), protocol=PICKLE_PROTOCOL_VERSION
-    )
-
-
-def _serialize_completed_analysis_to_json(completed_analysis: CompletedAnalysis) -> str:
-    return completed_analysis.json(
-        # by_alias and exclude_none should match how
-        # FastAPI + Pydantic + our customizations serialize these objects
-        # over the `GET /protocols/:id/analyses/:id` endpoint.
-        by_alias=True,
-        exclude_none=True,
-    )
13 changes: 7 additions & 6 deletions robot-server/robot_server/runs/run_store.py
@@ -7,7 +7,7 @@
from typing import Dict, List, Optional

import sqlalchemy
-from pydantic import parse_obj_as, ValidationError
+from pydantic import ValidationError

from opentrons.util.helpers import utc_now
from opentrons.protocol_engine import StateSummary, CommandSlice
@@ -19,6 +19,7 @@
    action_table,
    sqlite_rowid,
)
+from robot_server.persistence.pydantic import json_to_pydantic, pydantic_to_json
from robot_server.protocols import ProtocolNotFoundError

from .action_models import RunAction, RunActionType
@@ -114,7 +115,7 @@ def update_run_state(
"run_id": run_id,
"index_in_run": command_index,
"command_id": command.id,
"command": command.dict(),
"command": pydantic_to_json(command),
},
)

@@ -278,7 +279,7 @@ def get_state_summary(self, run_id: str) -> Optional[StateSummary]:

        try:
            return (
-                StateSummary.parse_obj(row.state_summary)
+                json_to_pydantic(StateSummary, row.state_summary)
                if row.state_summary is not None
                else None
            )
@@ -336,7 +337,7 @@ def get_commands_slice(
            slice_result = transaction.execute(select_slice).all()

        sliced_commands: List[Command] = [
-            parse_obj_as(Command, row.command)  # type: ignore[arg-type]
+            json_to_pydantic(Command, row.command)  # type: ignore[arg-type]
            for row in slice_result
        ]

@@ -374,7 +375,7 @@ def get_command(self, run_id: str, command_id: str) -> Command:
        if command is None:
            raise CommandNotFoundError(command_id=command_id)

-        return parse_obj_as(Command, command)  # type: ignore[arg-type]
+        return json_to_pydantic(Command, command)  # type: ignore[arg-type]

    def remove(self, run_id: str) -> None:
        """Remove a run by its unique identifier.
@@ -473,7 +474,7 @@ def _convert_state_to_sql_values(
    engine_status: str,
) -> Dict[str, object]:
    return {
-        "state_summary": state_summary.dict(),
+        "state_summary": pydantic_to_json(state_summary),
        "engine_status": engine_status,
        "_updated_at": utc_now(),
    }
7 changes: 3 additions & 4 deletions robot-server/tests/persistence/test_tables.py
@@ -38,8 +38,7 @@
    id VARCHAR NOT NULL,
    protocol_id VARCHAR NOT NULL,
    analyzer_version VARCHAR NOT NULL,
-    completed_analysis BLOB NOT NULL,
-    completed_analysis_as_document VARCHAR,
+    completed_analysis VARCHAR NOT NULL,
    PRIMARY KEY (id),
    FOREIGN KEY(protocol_id) REFERENCES protocol (id)
)
@@ -52,7 +51,7 @@
    id VARCHAR NOT NULL,
    created_at DATETIME NOT NULL,
    protocol_id VARCHAR,
-    state_summary BLOB,
+    state_summary VARCHAR,
    engine_status VARCHAR,
    _updated_at DATETIME,
    PRIMARY KEY (id),
@@ -75,7 +74,7 @@
    run_id VARCHAR NOT NULL,
    index_in_run INTEGER NOT NULL,
    command_id VARCHAR NOT NULL,
-    command BLOB NOT NULL,
+    command VARCHAR NOT NULL,
    PRIMARY KEY (row_id),
    FOREIGN KEY(run_id) REFERENCES run (id)
)