Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support Arrow FlightSQL #300

Merged
merged 4 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion .github/workflows/clients-compatibility.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ on:
- main
- compatibility
- test
- support_flightsql
pull_request:
branches: [ "main" ]

Expand Down Expand Up @@ -133,4 +134,39 @@ jobs:

- name: Run the Compatibility Test for Python Data Tools
run: |
bats ./compatibility/pg-pytools/test.bats
bats ./compatibility/pg-pytools/test.bats

test-flightsql:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: '1.23'

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.13'

- name: Install dependencies
run: |
go get .

pip3 install "sqlglot[rs]"
pip3 install "psycopg[binary]" pandas pyarrow polars adbc_driver_flightsql

- name: Build
run: go build -v

- name: Start MyDuck Server
run: |
./myduckserver &
sleep 5

- name: Run the Compatibility Test for FlightSQL
run: |
go test -v ./compatibility/flightsql/go/flightsql_test.go
python3 -m unittest discover ./compatibility/flightsql/python -p "flightsql_test.py"
152 changes: 152 additions & 0 deletions compatibility/flightsql/go/flightsql_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package main

import (
"context"
"reflect"
"testing"

"github.com/apache/arrow-adbc/go/adbc"
"github.com/apache/arrow-adbc/go/adbc/driver/flightsql"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
)

// Set connection options
var options = map[string]string{
adbc.OptionKeyURI: "grpc://localhost:47470",
flightsql.OptionSSLSkipVerify: adbc.OptionValueEnabled,
}

// Create database connection
func createDatabaseConnection(t *testing.T) adbc.Connection {
alloc := memory.NewGoAllocator()
drv := flightsql.NewDriver(alloc)
db, err := drv.NewDatabase(options)
if err != nil {
t.Fatalf("Error creating database: %v", err)
}

cnxn, err := db.Open(context.Background())
if err != nil {
t.Fatalf("Error opening connection: %v", err)
}

return cnxn
}

// Execute SQL statement
func executeSQLStatement(cnxn adbc.Connection, query string, t *testing.T) {
stmt, err := cnxn.NewStatement()
if err != nil {
t.Fatalf("failed to create statement: %v", err)
}
defer stmt.Close()

err = stmt.SetSqlQuery(query)
if err != nil {
t.Fatalf("failed to set SQL query: %v", err)
}

_, err = stmt.ExecuteUpdate(context.Background())
if err != nil {
t.Fatalf("failed to execute SQL statement: %v", err)
}
}

// Execute query and verify results
func executeQueryAndVerify(cnxn adbc.Connection, query string, expectedResults []struct {
id int64
name string
value int64
}, t *testing.T) {
stmt, err := cnxn.NewStatement()
if err != nil {
t.Fatalf("failed to create statement: %v", err)
}
defer stmt.Close()

err = stmt.SetSqlQuery(query)
if err != nil {
t.Fatalf("failed to set SQL query: %v", err)
}

rows, _, err := stmt.ExecuteQuery(context.Background())
if err != nil {
t.Fatalf("failed to execute query: %v", err)
}
defer rows.Release()

var actualResults []struct {
id int64
name string
value int64
}

// Read query results and verify
for rows.Next() {
record := rows.Record()
numRows := record.NumRows()

id := record.Column(0).(*array.Int64)
name := record.Column(1).(*array.String)
value := record.Column(2).(*array.Int64)
for i := 0; i < int(numRows); i++ {
actualResults = append(actualResults, struct {
id int64
name string
value int64
}{
id: id.Value(i),
name: name.Value(i),
value: value.Value(i),
})
}
}

// Verify query results
if len(actualResults) != len(expectedResults) {
t.Errorf("Expected %d rows, but got %d", len(expectedResults), len(actualResults))
}

for i, result := range actualResults {
expected := expectedResults[i]
if !reflect.DeepEqual(result, expected) {
t.Errorf("Row %d: Expected %+v, but got %+v", i, expected, result)
}
}
}

// Go test function
func TestSQLOperations(t *testing.T) {
cnxn := createDatabaseConnection(t)
defer cnxn.Close()

// 1. Execute DROP TABLE IF EXISTS intTable
executeSQLStatement(cnxn, "DROP TABLE IF EXISTS intTable", t)

// 2. Execute CREATE TABLE IF NOT EXISTS intTable
executeSQLStatement(cnxn, `CREATE TABLE IF NOT EXISTS intTable (
id INTEGER PRIMARY KEY,
name VARCHAR(50),
value INT
)`, t)

// 3. Execute INSERT INTO intTable
executeSQLStatement(cnxn, "INSERT INTO intTable (id, name, value) VALUES (1, 'TestName', 100)", t)
executeSQLStatement(cnxn, "INSERT INTO intTable (id, name, value) VALUES (2, 'AnotherName', 200)", t)

// 4. Query data and verify insertion was successful
expectedResults := []struct {
id int64
name string
value int64
}{
{id: 1, name: "TestName", value: 100},
{id: 2, name: "AnotherName", value: 200},
}
query := "SELECT id, name, value FROM intTable"
executeQueryAndVerify(cnxn, query, expectedResults, t)

// 5. Execute DROP TABLE IF EXISTS intTable
executeSQLStatement(cnxn, "DROP TABLE IF EXISTS intTable", t)
}
61 changes: 61 additions & 0 deletions compatibility/flightsql/python/flightsql_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import unittest
from adbc_driver_flightsql import DatabaseOptions
from adbc_driver_flightsql.dbapi import connect

class TestFlightSQLDatabase(unittest.TestCase):

@classmethod
def setUpClass(cls):
"""Runs once before any tests are executed, used to set up the database connection."""
headers = {"foo": "bar"}
cls.conn = connect(
"grpc://localhost:47470", # FlightSQL server address
db_kwargs={
DatabaseOptions.TLS_SKIP_VERIFY.value: "true", # Skip TLS verification
**{f"{DatabaseOptions.RPC_CALL_HEADER_PREFIX.value}{k}": v for k, v in headers.items()}
}
)

@classmethod
def tearDownClass(cls):
"""Runs once after all tests have been executed, used to close the database connection."""
cls.conn.close()

def setUp(self):
"""Runs before each individual test to ensure a clean environment by resetting the database."""
with self.conn.cursor() as cursor:
cursor.execute("DROP TABLE IF EXISTS intTable") # Drop the table if it exists
cursor.execute("""
CREATE TABLE IF NOT EXISTS intTable (
id INTEGER PRIMARY KEY,
name VARCHAR(50),
value INT
)
""") # Create the table

def test_insert_and_select(self):
"""Test inserting data and selecting it back to verify correctness."""
with self.conn.cursor() as cursor:
# Insert sample data
cursor.execute("INSERT INTO intTable (id, name, value) VALUES (1, 'TestName', 100)")
cursor.execute("INSERT INTO intTable (id, name, value) VALUES (2, 'AnotherName', 200)")

# Select data from the table
cursor.execute("SELECT * FROM intTable")
rows = cursor.fetchall()

# Expected result after insertions
expected_rows = [(1, 'TestName', 100), (2, 'AnotherName', 200)]
self.assertEqual(rows, expected_rows, f"Expected rows: {expected_rows}, but got: {rows}")

def test_drop_table(self):
"""Test dropping the table to ensure the table can be deleted successfully."""
with self.conn.cursor() as cursor:
cursor.execute("DROP TABLE IF EXISTS intTable") # Drop the table
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='intTable'") # Check if the table exists
rows = cursor.fetchall()
self.assertEqual(len(rows), 0, "Table 'intTable' should be dropped and not exist in the database.")
cursor.execute("COMMIT;")

if __name__ == "__main__":
unittest.main()
Loading
Loading