Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AUTO-3289] General improvements, add DBConnection adapter #32

Merged
merged 10 commits into from
Jan 18, 2022
9 changes: 9 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,12 @@ config :snowflex, Snowflex.ConnectionTest.SnowflakeConnection,
min: 1,
max: 1
]

config :snowflex, Snowflex.DBConnectionTest.SnowflakeDBConnection,
worker: Snowflex.DBConnectionTest.MockWorker,
pool_size: 3,
connection: [
server: "snowflex.us-east-8.snowflakecomputing.com",
role: "DEV",
warehouse: "CUSTOMER_DEV_WH"
]
67 changes: 67 additions & 0 deletions lib/snowflex/db_connection.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
defmodule Snowflex.DBConnection do
@moduledoc """
Defines a Snowflake connection with DBConnection adapter.

## Definition

When used, the connection expects the `:otp_app` option. You may also define a standard timeout.
This will default to 60 seconds.

```
defmodule SnowflakeDBConnection do
use Snowflex.DBConnection,
otp_app: :my_app,
timeout: :timer.seconds(60)
end
```
"""

alias Snowflex.DBConnection.{
Protocol,
Query
}

@doc false
defmacro __using__(opts) do
quote bind_quoted: [opts: opts] do
# setup compile time config
otp_app = Keyword.fetch!(opts, :otp_app)
timeout = Keyword.get(opts, :timeout, :timer.seconds(60))

@otp_app otp_app
@timeout timeout
@name __MODULE__

def child_spec(_) do
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we ignore child spec input options?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

opts should be provided statically when configuring your connection module with use Snowflake.DBConnection, opt1: _, opt2: _, .... Ignoring the opts in child_spec/1 ensures that the static config in the use macro is always correct, and not accidentally overridden in the supervision tree. I like this approach 👍

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gotcha, thanks

config = Application.get_env(@otp_app, __MODULE__, [])
connection = Keyword.get(config, :connection, [])

opts =
Keyword.merge(config,
timeout: @timeout,
connection: connection
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't always override timeout from config with a compile-time @timeout ?


DBConnection.child_spec(Protocol, opts)
end

def start_link(opts) do
opts = Keyword.put(opts, :name, @name)

DBConnection.start_link(Protocol, opts)
end

def execute(statement, params \\ []) when is_binary(statement) and is_list(params) do
case prepare_execute("", statement, params) do
{:ok, _query, result} -> {:ok, result}
{:error, error} -> {:error, error}
end
end

defp prepare_execute(name, statement, params, opts \\ []) do
query = %Query{name: name, statement: statement}
DBConnection.prepare_execute(@name, query, params, opts)
end
end
end
end
30 changes: 30 additions & 0 deletions lib/snowflex/db_connection/error.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
defmodule Snowflex.DBConnection.Error do
@moduledoc """
Defines an error returned from the ODBC adapter.
"""

defexception [:message]

@type t :: %__MODULE__{
message: String.t()
}

@spec exception(term()) :: t()
def exception({odbc_code, native_code, reason}) do
message =
to_string(reason) <>
" - ODBC_CODE: " <>
to_string(odbc_code) <>
" - SNOWFLAKE_CODE: " <> to_string(native_code)

%__MODULE__{
message: message
}
end

def exception(message) do
%__MODULE__{
message: to_string(message)
}
end
end
151 changes: 151 additions & 0 deletions lib/snowflex/db_connection/protocol.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
defmodule Snowflex.DBConnection.Protocol do
use DBConnection

require Logger

alias Snowflex.DBConnection.{
Query,
Result,
Server
}

defstruct pid: nil, status: :idle, conn_opts: [], worker: Server

@type state :: %__MODULE__{
pid: pid(),
status: :idle,
conn_opts: Keyword.t()
}

## DBConnection Callbacks

@impl DBConnection
def connect(opts) do
connection_args = Keyword.fetch!(opts, :connection)
worker = Keyword.get(opts, :worker, Server)

{:ok, pid} = worker.start_link(opts)

state = %__MODULE__{
pid: pid,
status: :idle,
conn_opts: connection_args,
worker: worker
}

{:ok, state}
end

@impl DBConnection
def disconnect(_err, %{pid: pid}), do: Server.disconnect(pid)

@impl DBConnection
def checkout(state), do: {:ok, state}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if there should be any sort of limit on the maximum number of connections that can be checked out at once? Maybe it doesn't matter 🤷


@impl DBConnection
def ping(state) do
query = %Query{name: "ping", statement: "SELECT /* snowflex:heartbeat */ 1;"}

case do_query(query, [], [], state) do
{:ok, _, _, new_state} -> {:ok, new_state}
{:error, reason, new_state} -> {:disconnect, reason, new_state}
end
end

@impl DBConnection
def handle_prepare(query, _opts, state) do
{:ok, query, state}
end

@impl DBConnection
def handle_execute(query, params, opts, state) do
do_query(query, params, opts, state)
end

@impl DBConnection
def handle_status(_, %{status: {status, _}} = state), do: {status, state}
def handle_status(_, %{status: status} = state), do: {status, state}

@impl DBConnection
def handle_close(_query, _opts, state) do
{:ok, %Result{}, state}
end

## Not implemented Callbacks

@impl DBConnection
def handle_begin(_opts, _state) do
throw("not implemented")
end

@impl DBConnection
def handle_commit(_opts, _state) do
throw("not implemented")
end

@impl DBConnection
def handle_rollback(_opts, _state) do
throw("not implemented")
end

@impl DBConnection
def handle_declare(_query, _params, _opts, _state) do
throw("not implemeted")
end

@impl DBConnection
def handle_deallocate(_query, _cursor, _opts, _state) do
throw("not implemeted")
end

@impl DBConnection
def handle_fetch(_query, _cursor, _opts, _state) do
throw("not implemeted")
end

## Helpers

defp do_query(%Query{} = query, [], opts, %{worker: worker} = state) do
case worker.sql_query(state.pid, query.statement, opts) do
{:ok, {:selected, columns, rows}} ->
result = parse_result(columns, rows)
{:ok, query, result, state}

{:ok, {:selected, columns, rows, _}} ->
result = parse_result(columns, rows)
{:ok, query, result, state}

{:ok, result} ->
{:ok, query, result, state}

{:error, reason} ->
{:error, reason, state}
end
end

defp do_query(%Query{} = query, params, opts, %{worker: worker} = state) do
case worker.param_query(state.pid, query.statement, params, opts) do
{:ok, {:selected, columns, rows}} ->
result = parse_result(columns, rows)
{:ok, query, result, state}

{:ok, {:selected, columns, rows, _}} ->
result = parse_result(columns, rows)
{:ok, query, result, state}

{:ok, result} ->
{:ok, query, result, state}

{:error, reason} ->
{:error, reason, state}
end
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could pull this case out into its own function so you don't have to duplicate it between sql_query and param_query.

end

defp parse_result(columns, rows) do
%Result{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should %Result structs always include statement? If not, it'd be nice to add something to the docs describing when it will/won't be set.

columns: Enum.map(columns, &to_string(&1)),
rows: rows,
num_rows: Enum.count(rows)
}
end
end
30 changes: 30 additions & 0 deletions lib/snowflex/db_connection/query.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
defmodule Snowflex.DBConnection.Query do
defstruct [
:ref,
:name,
:statement,
:columns,
:result_oids,
cache: :reference
]

defimpl DBConnection.Query do
def parse(query, _opts), do: query
def describe(query, _opts), do: query
def encode(_query, params, _opts), do: params
def decode(_query, [], _opts), do: []
def decode(_query, result, _opts), do: result
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These can be combined

Suggested change
def decode(_query, [], _opts), do: []
def decode(_query, result, _opts), do: result
def decode(_query, result, _opts), do: result

end

defimpl String.Chars do
alias Snowflex.DBConnection.Query

def to_string(%{statement: statement}) do
case statement do
statement when is_binary(statement) -> IO.iodata_to_binary(statement)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's already a binary, it should be OK to pass through directly.

Suggested change
statement when is_binary(statement) -> IO.iodata_to_binary(statement)
statement when is_binary(statement) -> statement

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Result.t(), statement is typed as String.t() | nil. If it can be iodata(), I would update the typespec to indicate that. A Query.t() typespec would also be a nice addition here.

statement when is_list(statement) -> IO.iodata_to_binary(statement)
%{statement: %Query{} = q} -> String.Chars.to_string(q)
end
end
end
end
19 changes: 19 additions & 0 deletions lib/snowflex/db_connection/result.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
defmodule Snowflex.DBConnection.Result do
defstruct columns: nil,
rows: nil,
num_rows: 0,
metadata: [],
messages: [],
statement: nil,
success: false

@type t :: %__MODULE__{
columns: [String.t()] | nil,
rows: [tuple()] | nil,
num_rows: integer(),
metadata: [map()],
messages: [map()],
statement: String.t() | nil,
success: boolean()
}
end
Loading