Skip to content

Commit

Permalink
Merge pull request #32 from pepsico-ecommerce/AUTO-3289-general-impro…
Browse files Browse the repository at this point in the history
…vements

[AUTO-3289] General improvements, add DBConnection adapter
  • Loading branch information
Ch4s3 authored Jan 18, 2022
2 parents 04511ae + 4747536 commit 193c505
Show file tree
Hide file tree
Showing 15 changed files with 884 additions and 57 deletions.
51 changes: 51 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,54 @@ def deps do
]
end
```

## DBConnection Support

[DBConnection](https://github.com/elixir-ecto/db_connection) support is currently in experimental phase, setting it up is very similar to current implementation with the expection of configuration options and obtaining the same results will require an extra step:

### Configuration:

Setting a Module to hold the connection is very similar, but instead you'll use `Snowflex.DBConnection`:

Example:

```elixir
defmodule MyApp.SnowflakeConnection do
use Snowflex.DBConnection,
otp_app: :my_app,
timeout: :timer.minutes(5)
end
```

```elixir
config :my_app, MyApp.SnowflakeConnection,
pool_size: 5, # the connection pool size
worker: MyApp.CustomWorker, # defaults to Snowflex.DBConnection.Server
connection: [
role: "PROD",
warehouse: System.get_env("SNOWFLAKE_POS_WH"),
uid: System.get_env("SNOWFLAKE_POS_UID"),
pwd: System.get_env("SNOWFLAKE_POS_PWD")
]
```

### Usage:

After setup, you can use your connection to query:

```elixir
alias Snowflex.DBConnection.Result

{:ok, %Result{} = result} = MyApp.SnowflakeConnection.execute("my query")
{:ok, %Result{} = result} = MyApp.SnowflakeConnection.execute("my query", ["my params"])
```

As you can see we now receive an `{:ok, result}` tuple, to get results as expected with current implementation, we need to call `process_result/1`:

```elixir
alias Snowflex.DBConnection.Result

{:ok, %Result{} = result} = MyApp.SnowflakeConnection.execute("my query")

[%{"col" => 1}, %{"col" => 2}] = SnowflakeDBConnection.process_result(result)
```
9 changes: 9 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,12 @@ config :snowflex, Snowflex.ConnectionTest.SnowflakeConnection,
min: 1,
max: 1
]

config :snowflex, Snowflex.DBConnectionTest.SnowflakeDBConnection,
worker: Snowflex.DBConnectionTest.MockWorker,
pool_size: 3,
connection: [
server: "snowflex.us-east-8.snowflakecomputing.com",
role: "DEV",
warehouse: "CUSTOMER_DEV_WH"
]
65 changes: 65 additions & 0 deletions lib/snowflex/db_connection.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
defmodule Snowflex.DBConnection do
@moduledoc """
Defines a Snowflake connection with DBConnection adapter.
## Definition
When used, the connection expects the `:otp_app` option. You may also define a standard timeout.
This will default to 60 seconds.
```
defmodule SnowflakeDBConnection do
use Snowflex.DBConnection,
otp_app: :my_app,
timeout: :timer.seconds(60)
end
```
"""

alias Snowflex.DBConnection.{
Protocol,
Query,
Result
}

@doc false
defmacro __using__(opts) do
quote bind_quoted: [opts: opts] do
# setup compile time config
otp_app = Keyword.fetch!(opts, :otp_app)
timeout = Keyword.get(opts, :timeout, :timer.seconds(60))

@otp_app otp_app
@timeout timeout
@name __MODULE__

def child_spec(_) do
config = Application.get_env(@otp_app, __MODULE__, [])
connection = Keyword.get(config, :connection, [])

opts =
Keyword.merge(config,
name: @name,
timeout: @timeout,
connection: connection
)

DBConnection.child_spec(Protocol, opts)
end

def execute(statement, params \\ []) when is_binary(statement) and is_list(params) do
case prepare_execute("", statement, params) do
{:ok, _query, result} -> {:ok, result}
{:error, error} -> {:error, error}
end
end

defdelegate process_result(result, opts \\ [map_nulls_to_nil?: true]), to: Result

defp prepare_execute(name, statement, params, opts \\ []) do
query = %Query{name: name, statement: statement}
DBConnection.prepare_execute(@name, query, params, opts)
end
end
end
end
30 changes: 30 additions & 0 deletions lib/snowflex/db_connection/error.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
defmodule Snowflex.DBConnection.Error do
@moduledoc """
Defines an error returned from the ODBC adapter.
"""

defexception [:message]

@type t :: %__MODULE__{
message: String.t()
}

@spec exception(term()) :: t()
def exception({odbc_code, native_code, reason}) do
message =
to_string(reason) <>
" - ODBC_CODE: " <>
to_string(odbc_code) <>
" - SNOWFLAKE_CODE: " <> to_string(native_code)

%__MODULE__{
message: message
}
end

def exception(message) do
%__MODULE__{
message: to_string(message)
}
end
end
149 changes: 149 additions & 0 deletions lib/snowflex/db_connection/protocol.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
defmodule Snowflex.DBConnection.Protocol do
use DBConnection

require Logger

alias Snowflex.DBConnection.{
Query,
Result,
Server
}

defstruct pid: nil, status: :idle, conn_opts: [], worker: Server

@type state :: %__MODULE__{
pid: pid(),
status: :idle,
conn_opts: Keyword.t(),
worker: Server | any()
}

## DBConnection Callbacks

@impl DBConnection
def connect(opts) do
connection_args = Keyword.fetch!(opts, :connection)
worker = Keyword.get(opts, :worker, Server)

{:ok, pid} = worker.start_link(opts)

state = %__MODULE__{
pid: pid,
status: :idle,
conn_opts: connection_args,
worker: worker
}

{:ok, state}
end

@impl DBConnection
def disconnect(_err, %{pid: pid}), do: Server.disconnect(pid)

@impl DBConnection
def checkout(state), do: {:ok, state}

@impl DBConnection
def ping(state) do
query = %Query{name: "ping", statement: "SELECT /* snowflex:heartbeat */ 1;"}

case do_query(query, [], [], state) do
{:ok, _, _, new_state} -> {:ok, new_state}
{:error, reason, new_state} -> {:disconnect, reason, new_state}
end
end

@impl DBConnection
def handle_prepare(query, _opts, state) do
{:ok, query, state}
end

@impl DBConnection
def handle_execute(query, params, opts, state) do
do_query(query, params, opts, state)
end

@impl DBConnection
def handle_status(_, %{status: {status, _}} = state), do: {status, state}
def handle_status(_, %{status: status} = state), do: {status, state}

@impl DBConnection
def handle_close(_query, _opts, state) do
{:ok, %Result{}, state}
end

## Not implemented Callbacks

@impl DBConnection
def handle_begin(_opts, _state) do
throw("not implemented")
end

@impl DBConnection
def handle_commit(_opts, _state) do
throw("not implemented")
end

@impl DBConnection
def handle_rollback(_opts, _state) do
throw("not implemented")
end

@impl DBConnection
def handle_declare(_query, _params, _opts, _state) do
throw("not implemeted")
end

@impl DBConnection
def handle_deallocate(_query, _cursor, _opts, _state) do
throw("not implemeted")
end

@impl DBConnection
def handle_fetch(_query, _cursor, _opts, _state) do
throw("not implemeted")
end

## Helpers

defp do_query(%Query{} = query, [], opts, %{worker: worker} = state) do
case worker.sql_query(state.pid, query.statement, opts) do
{:ok, result} ->
result = parse_result(result, query)
{:ok, query, result, state}

{:error, reason} ->
{:error, reason, state}
end
end

defp do_query(%Query{} = query, params, opts, %{worker: worker} = state) do
case worker.param_query(state.pid, query.statement, params, opts) do
{:ok, result} ->
result = parse_result(result, query)
{:ok, query, result, state}

{:error, reason} ->
{:error, reason, state}
end
end

defp parse_result({:selected, columns, rows, _}, query),
do: parse_result({:selected, columns, rows}, query)

defp parse_result({:selected, columns, rows}, query) do
parse_result(columns, rows, query)
end

defp parse_result(result, _query), do: result

defp parse_result(columns, rows, query) do
%Result{
columns: Enum.map(columns, &to_string(&1)),
rows: rows,
num_rows: Enum.count(rows),
success: true,
statement: query.statement
}
end
end
29 changes: 29 additions & 0 deletions lib/snowflex/db_connection/query.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
defmodule Snowflex.DBConnection.Query do
defstruct [
:ref,
:name,
:statement,
:columns,
:result_oids,
cache: :reference
]

defimpl DBConnection.Query do
def parse(query, _opts), do: query
def describe(query, _opts), do: query
def encode(_query, params, _opts), do: params
def decode(_query, result, _opts), do: result
end

defimpl String.Chars do
alias Snowflex.DBConnection.Query

def to_string(%{statement: statement}) do
case statement do
statement when is_binary(statement) -> IO.iodata_to_binary(statement)
statement when is_list(statement) -> IO.iodata_to_binary(statement)
%{statement: %Query{} = q} -> String.Chars.to_string(q)
end
end
end
end
Loading

0 comments on commit 193c505

Please sign in to comment.