From a285334b239b41c4f95886e78f85273fe52c8530 Mon Sep 17 00:00:00 2001 From: Breeze0806 <53822089+Breeze0806@users.noreply.github.com> Date: Sat, 28 Sep 2024 15:08:14 +0800 Subject: [PATCH] add Sqlite3 to v0.2.x (#53) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: add database sqlite3 reader/writer * sqlite3: reset (#43) * none: some env tidy up (#35) 1. Makefile: local may have 'DB2_HOME' already 2. .gitignore: adding ignores for vim and tags Signed-off-by: shane.xb.qian * fix: fix split bug when setting is not set (#37) * feat & fix & docs: Add column name to error message when there's an error in reading or writing a column, fix the issue where ignoreOneByOneError does not work when set to false, and add F&Q for the database reader and writer. (#38) * none: some tiny code tidy-up (#34) Signed-off-by: shane.xb.qian * docs: Update README.md * feat & docs: improve go1.16 to go 1.20, change package sqlserver to github.com/microsoft/go-mssqldb,and Update all packages to the latest version. * fix: fix some complime error * fix:fix complie error * scripts: do lint for all go version * fix: 修复mysql的问题 * docs: fix go version * feat: in package element add NewBigIntColumnValueFromUint64 and NewDecimalColumnValueFromFloat32, remove Float64 and add Uint64. --------- Signed-off-by: shane.xb.qian Co-authored-by: Shane-XB-Qian Co-authored-by: 励码万言 <48382427+limawanyan@users.noreply.github.com> * feat: add sqlite3 data read * add Sqlite3 (#52) * none: some env tidy up (#35) 1. Makefile: local may have 'DB2_HOME' already 2. 
.gitignore: adding ignores for vim and tags Signed-off-by: shane.xb.qian * fix: fix split bug when setting is not set (#37) * feat:add sqlite3 --------- Signed-off-by: shane.xb.qian Co-authored-by: Shane-XB-Qian Co-authored-by: Breeze0806 <53822089+Breeze0806@users.noreply.github.com> * fix: 修复sqlite3测试的问题 * fix: fit go1.20 * fix: makefile --------- Signed-off-by: shane.xb.qian Co-authored-by: Shane-XB-Qian Co-authored-by: 励码万言 <48382427+limawanyan@users.noreply.github.com> --- README.md | 1 + README_USER.md | 1 + README_USER_zh-CN.md | 1 + README_zh-CN.md | 1 + cmd/datax/examples/sqlite3/config.json | 47 ++ cmd/datax/examples/sqlite3/init.sql | 15 + cmd/datax/tools/testData/sqlite3.json | 47 ++ datax/README.md | 2 +- datax/README_zh-CN.md | 2 +- datax/plugin/reader/sqlite3/README.md | 145 ++++++ datax/plugin/reader/sqlite3/README_zh-CN.md | 147 ++++++ datax/plugin/reader/sqlite3/job.go | 22 + datax/plugin/reader/sqlite3/reader.go | 63 +++ .../reader/sqlite3/resources/plugin.json | 6 + .../resources/plugin_job_template.json | 14 + datax/plugin/reader/sqlite3/task.go | 31 ++ datax/plugin/writer/sqlite3/README.md | 137 +++++ datax/plugin/writer/sqlite3/README_zh-CN.md | 132 +++++ datax/plugin/writer/sqlite3/job.go | 22 + .../writer/sqlite3/resources/plugin.json | 6 + .../resources/plugin_job_template.json | 16 + datax/plugin/writer/sqlite3/task.go | 48 ++ datax/plugin/writer/sqlite3/writer.go | 62 +++ go.mod | 1 + go.sum | 2 + storage/database/sqlite3/config.go | 40 ++ storage/database/sqlite3/config_test.go | 95 ++++ storage/database/sqlite3/example_test.go | 77 +++ storage/database/sqlite3/field.go | 162 ++++++ storage/database/sqlite3/field_test.go | 473 ++++++++++++++++++ storage/database/sqlite3/source.go | 85 ++++ storage/database/sqlite3/source_test.go | 214 ++++++++ storage/database/sqlite3/table.go | 77 +++ storage/database/sqlite3/table_test.go | 166 ++++++ 34 files changed, 2358 insertions(+), 2 deletions(-) create mode 100644 
cmd/datax/examples/sqlite3/config.json create mode 100644 cmd/datax/examples/sqlite3/init.sql create mode 100644 cmd/datax/tools/testData/sqlite3.json create mode 100644 datax/plugin/reader/sqlite3/README.md create mode 100644 datax/plugin/reader/sqlite3/README_zh-CN.md create mode 100644 datax/plugin/reader/sqlite3/job.go create mode 100644 datax/plugin/reader/sqlite3/reader.go create mode 100644 datax/plugin/reader/sqlite3/resources/plugin.json create mode 100644 datax/plugin/reader/sqlite3/resources/plugin_job_template.json create mode 100644 datax/plugin/reader/sqlite3/task.go create mode 100644 datax/plugin/writer/sqlite3/README.md create mode 100644 datax/plugin/writer/sqlite3/README_zh-CN.md create mode 100644 datax/plugin/writer/sqlite3/job.go create mode 100644 datax/plugin/writer/sqlite3/resources/plugin.json create mode 100644 datax/plugin/writer/sqlite3/resources/plugin_job_template.json create mode 100644 datax/plugin/writer/sqlite3/task.go create mode 100644 datax/plugin/writer/sqlite3/writer.go create mode 100644 storage/database/sqlite3/config.go create mode 100644 storage/database/sqlite3/config_test.go create mode 100644 storage/database/sqlite3/example_test.go create mode 100644 storage/database/sqlite3/field.go create mode 100644 storage/database/sqlite3/field_test.go create mode 100644 storage/database/sqlite3/source.go create mode 100644 storage/database/sqlite3/source_test.go create mode 100644 storage/database/sqlite3/table.go create mode 100644 storage/database/sqlite3/table_test.go diff --git a/README.md b/README.md index 3ebfd64..f0a4df1 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,7 @@ This data synchronization tool has the synchronization capability for the follow | | DB2 LUW | √ | √ | [Read](datax/plugin/reader/db2/README.md)、[Write](datax/plugin/writer/db2/README.md) | | | SQL Server | √ | √ | [Read](datax/plugin/reader/sqlserver/README.md)、[Write](datax/plugin/writer/sqlserver/README.md) | | | Oracle | √ | √ | 
[Read](datax/plugin/reader/oracle/README.md)、[Write](datax/plugin/writer/oracle/README.md) | +| | Sqlite3 | √ | √ | [Read](datax/plugin/reader/sqlite3/README.md)、[Write](datax/plugin/writer/sqlite3/README.md) | | Unstructured Data Stream | CSV | √ | √ | [Read](datax/plugin/reader/csv/README.md)、[Write](datax/plugin/writer/csv/README.md) | | | XLSX(excel) | √ | √ | [Read](datax/plugin/reader/xlsx/README.md)、[Write](datax/plugin/writer/xlsx/README.md) | diff --git a/README_USER.md b/README_USER.md index 86d69de..6f177ea 100644 --- a/README_USER.md +++ b/README_USER.md @@ -92,6 +92,7 @@ The configurations for `reader` and `writer` are as follows: | | DB2 LUW | √ | √ | [Read](datax/plugin/reader/db2/README.md), [Write](datax/plugin/writer/db2/README.md) | | | SQL Server | √ | √ | [Read](datax/plugin/reader/sqlserver/README.md), [Write](datax/plugin/writer/sqlserver/README.md) | | | Oracle | √ | √ | [Read](datax/plugin/reader/oracle/README.md), [Write](datax/plugin/writer/oracle/README.md) | +| | Sqlite3 | √ | √ | [Read](datax/plugin/reader/sqlite3/README.md)、[Write](datax/plugin/writer/sqlite3/README.md) | | Unstructured Stream | CSV | √ | √ | [Read](datax/plugin/reader/csv/README.md), [Write](datax/plugin/writer/csv/README.md) | | | XLSX (excel) | √ | √ | [Read](datax/plugin/reader/xlsx/README.md), [Write](datax/plugin/writer/xlsx/README.md) | diff --git a/README_USER_zh-CN.md b/README_USER_zh-CN.md index 3402f46..774d0bc 100644 --- a/README_USER_zh-CN.md +++ b/README_USER_zh-CN.md @@ -92,6 +92,7 @@ data -c config.json | | DB2 LUW | √ | √ | [读](datax/plugin/reader/db2/README_zh-CN.md)、[写](datax/plugin/writer/db2/README_zh-CN.md) | | | SQL Server | √ | √ | [读](datax/plugin/reader/sqlserver/README_zh-CN.md)、[写](datax/plugin/writer/sqlserver/README_zh-CN.md) | | | Oracle | √ | √ | [读](datax/plugin/reader/oracle/README_zh-CN.md)、[写](datax/plugin/writer/oracle/README_zh-CN.md) | +| | Sqlite3 | √ | √ | 
[读](datax/plugin/reader/sqlite3/README_zh-CN.md)、[写](datax/plugin/writer/sqlite3/README_zh-CN.md) | | 无结构流 | CSV | √ | √ | [读](datax/plugin/reader/csv/README_zh-CN.md)、[写](datax/plugin/writer/csv/README_zh-CN.md) | | | XLSX(excel) | √ | √ | [读](datax/plugin/reader/xlsx/README_zh-CN.md)、[写](datax/plugin/writer/xlsx/README_zh-CN.md) | diff --git a/README_zh-CN.md b/README_zh-CN.md index 3f93f89..7a5c237 100644 --- a/README_zh-CN.md +++ b/README_zh-CN.md @@ -31,6 +31,7 @@ go-etl将提供的etl能力如下: | | DB2 LUW | √ | √ | [读](datax/plugin/reader/db2/README_zh-CN.md)、[写](datax/plugin/writer/db2/README_zh-CN.md) | | | SQL Server | √ | √ | [读](datax/plugin/reader/sqlserver/README_zh-CN.md)、[写](datax/plugin/writer/sqlserver/README_zh-CN.md) | | | Oracle | √ | √ | [读](datax/plugin/reader/oracle/README_zh-CN.md)、[写](datax/plugin/writer/oracle/README_zh-CN.md) | +| | Sqlite3 | √ | √ | [读](datax/plugin/reader/sqlite3/README_zh-CN.md)、[写](datax/plugin/writer/sqlite3/README_zh-CN.md) | | 无结构流 | CSV | √ | √ | [读](datax/plugin/reader/csv/README_zh-CN.md)、[写](datax/plugin/writer/csv/README_zh-CN.md) | | | XLSX(excel) | √ | √ | [读](datax/plugin/reader/xlsx/README_zh-CN.md)、[写](datax/plugin/writer/xlsx/README_zh-CN.md) | diff --git a/cmd/datax/examples/sqlite3/config.json b/cmd/datax/examples/sqlite3/config.json new file mode 100644 index 0000000..8087deb --- /dev/null +++ b/cmd/datax/examples/sqlite3/config.json @@ -0,0 +1,47 @@ +{ + "core" : { + "container": { + "job":{ + "id": 1, + "sleepInterval":100 + } + } + }, + "job":{ + "content":[ + { + "reader":{ + "name": "sqlite3reader", + "parameter": { + "column": ["*"], + "connection": { + "url": "E:\\Sqlite3\\test.db", + "table": { + "db":"main", + "name":"type_table" + } + }, + "where": "" + } + }, + "writer":{ + "name": "sqlite3writer", + "parameter": { + "writeMode": "insert", + "column": ["*"], + "connection": { + "url": "E:\\Sqlite3\\test.db", + "table": { + "db":"main", + "name":"type_table_copy" + } + }, + "batchTimeout": "1s", + "batchSize":1000 + } + }, + 
"transformer":[] + } + ] + } +} \ No newline at end of file diff --git a/cmd/datax/examples/sqlite3/init.sql b/cmd/datax/examples/sqlite3/init.sql new file mode 100644 index 0000000..0d295d9 --- /dev/null +++ b/cmd/datax/examples/sqlite3/init.sql @@ -0,0 +1,15 @@ +drop table if exists "type_table"; +create table "type_table" ( + "t_integer" integer, + "t_real" real, + "t_text" text +); + +insert into "type_table" values (1, 1.01, 123456); + +drop table if exists "type_table_copy"; +create table "type_table_copy" ( + "t_integer" integer, + "t_real" real, + "t_text" text +); diff --git a/cmd/datax/tools/testData/sqlite3.json b/cmd/datax/tools/testData/sqlite3.json new file mode 100644 index 0000000..8087deb --- /dev/null +++ b/cmd/datax/tools/testData/sqlite3.json @@ -0,0 +1,47 @@ +{ + "core" : { + "container": { + "job":{ + "id": 1, + "sleepInterval":100 + } + } + }, + "job":{ + "content":[ + { + "reader":{ + "name": "sqlite3reader", + "parameter": { + "column": ["*"], + "connection": { + "url": "E:\\Sqlite3\\test.db", + "table": { + "db":"main", + "name":"type_table" + } + }, + "where": "" + } + }, + "writer":{ + "name": "sqlite3writer", + "parameter": { + "writeMode": "insert", + "column": ["*"], + "connection": { + "url": "E:\\Sqlite3\\test.db", + "table": { + "db":"main", + "name":"type_table_copy" + } + }, + "batchTimeout": "1s", + "batchSize":1000 + } + }, + "transformer":[] + } + ] + } +} \ No newline at end of file diff --git a/datax/README.md b/datax/README.md index 6bec448..2de98a8 100644 --- a/datax/README.md +++ b/datax/README.md @@ -105,7 +105,7 @@ The Task combines *plugin.BaseTask and implements the following methods: #### 3.1.4 Command Generation ```bash -cd tools/go-etl/plugin +cd tools/datax/plugin # Adds a new Reader named Mysql. The -p command can be in any case and is used to specify the name of the Reader. If -d is added, it means the original template will be deleted. 
go run main.go -t reader -p Mysql ``` diff --git a/datax/README_zh-CN.md b/datax/README_zh-CN.md index c1f0cb2..0fc6e09 100644 --- a/datax/README_zh-CN.md +++ b/datax/README_zh-CN.md @@ -111,7 +111,7 @@ Task组合*plugin.BaseTask,实现方法 #### 3.1.4 命令生成 ```bash -cd tools/go-etl/plugin +cd tools/datax/plugin #新增一个名为Mysql的reader -p命令可以时任意大小写,用于指定reader的名字,如果新增-d 代表会删除原来的模板 go run main.go -t reader -p Mysql ``` diff --git a/datax/plugin/reader/sqlite3/README.md b/datax/plugin/reader/sqlite3/README.md new file mode 100644 index 0000000..8f347c8 --- /dev/null +++ b/datax/plugin/reader/sqlite3/README.md @@ -0,0 +1,145 @@ +# Sqlite3Reader Plugin Documentation + +## Quick Introduction + +The Sqlite3Reader plugin enables data reading from Sqlite3 databases. Under the hood, Sqlite3Reader connects to remote Sqlite3 databases using `github.com/mattn/go-sqlite3` and executes corresponding SQL statements to query data from the database. + +## Implementation Principles + +Sqlite3Reader connects to remote Sqlite3 databases using `github.com/mattn/go-sqlite3` and generates SQL queries based on user-provided configuration information. These queries are then sent to the remote Sqlite3 database, and the returned results are assembled into an abstract dataset using go-etl's custom data types. This dataset is then passed to downstream Writer processing. +Sqlite3Reader implements specific queries by following the query process defined in dbmsreader, which calls go-etl's custom `storage/database` DBWrapper. DBWrapper encapsulates many interfaces of `database/sql` and abstracts the database dialect, Dialect. For sqlite3, the implementation of Dialect provided by `storage/database/sqlite3` is used. 
+ +## Functionality Description + +### Configuration Example + +Configuring a job to synchronize data from a Sqlite3 database to a local system: + +```json +{ + "job": { + "content": [ + { + "reader": { + "name": "sqlite3reader", + "parameter": { + "column": [ + "*" + ], + "connection": { + "url": "E:\\Sqlite3\\test.db", + "table": { + "db": "main", + "name": "type_table" + } + }, + "where": "" + } + } + } + ] + } +} +``` + +### Parameter Explanation + +#### url + +- Description: It is mainly used to configure the path of sqlite3 database files +- Required: Yes +- Default: None + +#### table + +Describes the sqlite3 table information. + +##### name + +- Description: Mainly used to configure the table name of the sqlite3 table. +- Required: Yes +- Default: None + +#### column + +- Description: The set of column names that need to be synchronized from the configured table. JSON array syntax is used to describe the column information. Using "*" represents that all columns are used by default, for example, `["*"]`. + + Supports column pruning, which means users can select specific columns for export. + + Supports column reordering, meaning the columns can be exported in an order different from the table schema. + + Supports constant configuration. Users need to follow the sqlite3 syntax format. + +- Required: Yes +- Default: None + +#### split + +##### key + +- Description: Mainly used to configure the splitting key for the sqlite3 table. The splitting key must be of type bigInt/string/time, assuming that the data is evenly distributed based on the splitting key. +- Required: No +- Default: None + +##### timeAccuracy + +- Description: Mainly used to configure the time splitting key for the sqlite3 table, mainly to describe the smallest unit of time, such as day (for dates), min (for minutes), s (for seconds), ms (for milliseconds), us (for microseconds), ns (for nanoseconds). 
+- Required: No +- Default: None + +##### range + +###### type +- Description: Mainly used to configure the default value type of the splitting key for the sqlite3 table, with values being bigInt/string/time. Here, it will check the type of the splitting key in the table, so please make sure the type is correct. +- Required: No +- Default: None + +###### left +- Description: Mainly used to configure the default maximum value of the splitting key for the sqlite3 table. +- Required: No +- Default: None + +###### right +- Description: Mainly used to configure the default minimum value of the splitting key for the sqlite3 table. +- Required: No +- Default: None + +#### where + +- Description: Mainly used to configure the where condition for the select statement. +- Required: No +- Default: None + +#### querySql + +- Description: In some business scenarios, the `where` configuration item is not sufficient to describe the filtering conditions, so users can use this configuration item to customize the filtering SQL. When users configure this item, the DataX system will ignore the `table`, `column`, and other configuration items, and directly use the content of this configuration item for data filtering. For example, if you need to perform a join operation on multiple tables before synchronizing the data, you can use `select a,b from table_a join table_b on table_a.id = table_b.id`. +When the user configures `querySql`, Sqlite3Reader directly ignores the configuration of `table`, `column`, and `where` conditions. The priority of `querySql` is higher than that of `table`, `column`, and `where` options. +- Required: No +- Default: None + +#### trimChar + +- Description: Whether to remove leading and trailing spaces for the char type in sqlite3. +- Required: No +- Default: false + +### Type Conversion + +Currently, Sqlite3Reader supports most sqlite3 types, but there are still some individual types that are not supported. Please check your types carefully. 
+ +Below is a list of type conversions that Sqlite3Reader performs for sqlite3 types: + +| go-etl Type | sqlite3 Data Type | +| ------------ |--------------------| +| string | INTEGER, TEXT, REAL, BLOB | + +## Performance Report + +To be tested. + +## Constraints and Limitations + +### Database Encoding Issues +Currently, only the utf8 character set is supported. + +## FAQ diff --git a/datax/plugin/reader/sqlite3/README_zh-CN.md b/datax/plugin/reader/sqlite3/README_zh-CN.md new file mode 100644 index 0000000..d4377dd --- /dev/null +++ b/datax/plugin/reader/sqlite3/README_zh-CN.md @@ -0,0 +1,147 @@ +# Sqlite3Reader插件文档 + +## 快速介绍 + +Sqlite3Reader插件实现了从Sqlite3数据库读取数据。在底层实现上,Sqlite3Reader通过github.com/mattn/go-sqlite3连接远程Sqlite3数据库,并执行相应的sql语句将数据从数据库中查询出来。 + +## 实现原理 + +Sqlite3Reader通过github.com/mattn/go-sqlite3连接远程Sqlite3数据库,并根据用户配置的信息生成查询SQL语句,然后发送到远程Sqlite3数据库,并将该SQL执行返回结果使用go-etl自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。 + +Sqlite3Reader通过使用dbmsreader中定义的查询流程调用go-etl自定义的storage/database的DBWrapper来实现具体的查询。DBWrapper封装了database/sql的众多接口,并且抽象出了数据库方言Dialect。其中sqlite3采取了storage/database/sqlite3实现的Dialect。 + +## 功能说明 + +### 配置样例 + +配置一个从Sqlite3数据库同步抽取数据到本地的作业: + +```json +{ + "job": { + "content": [ + { + "reader": { + "name": "sqlite3reader", + "parameter": { + "column": [ + "*" + ], + "connection": { + "url": "E:\\Sqlite3\\test.db", + "table": { + "db": "main", + "name": "type_table" + } + }, + "where": "" + } + } + } + ] + } +} +``` + +### 参数说明 + +#### url + +- 描述 主要用于配置sqlite3数据库文件路径。 +- 必选:是 +- 默认值: 无 + +#### table + +描述sqlite3表信息 + +##### name + +- 描述 主要用于配置sqlite3表的表名 +- 必选:是 +- 默认值: 无 + +#### column + +- 描述:所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。用户使用*代表默认使用所有列配置,例如["\*"]。 + + 支持列裁剪,即列可以挑选部分列进行导出。 + + 支持列换序,即列可以不按照表schema信息进行导出。 + + 支持常量配置,用户需要按照sqlite3语法格式。 + +- 必选:是 + +- 默认值: 无 + +#### split + +##### key + +- 描述 主要用于配置sqlite3表的切分键,切分键必须为bigInt/string/time类型,假设数据按切分键分布是均匀的 +- 必选:否 +- 默认值: 无 + +##### timeAccuracy + +- 描述 
主要用于配置sqlite3表的时间切分键,主要用于描述时间最小单位,day(日),min(分钟),s(秒),ms(毫秒),us(微秒),ns(纳秒) +- 必选:否 +- 默认值: 无 + +##### range + +###### type +- 描述 主要用于配置sqlite3表的切分键默认值类型,值为bigInt/string/time,这里会检查表切分键中的类型,请务必确保类型正确。 +- 必选:否 +- 默认值: 无 + +###### left +- 描述 主要用于配置sqlite3表的切分键默认最大值 +- 必选:否 +- 默认值: 无 + +###### right +- 描述 主要用于配置sqlite3表的切分键默认最小值 +- 必选:否 +- 默认值: 无 + +#### where + +- 描述 主要用于配置select的where条件 +- 必选:否 +- 默认值: 无 + +#### querySql + +- 描述:在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户可以通过该配置项来自定义筛选SQL。当用户配置了这一项之后,DataX系统就会忽略table,column这些配置项,直接使用这个配置项的内容对数据进行筛选,例如需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id +当用户配置querySql时,Sqlite3Reader直接忽略table、column、where条件的配置,querySql优先级大于table、column、where选项。 +- 必选:否 +- 默认值:无 + +#### trimChar + +- 描述:对于sqlite3的char类型是否去掉其前后的空格 +- 必选:否 +- 默认值:false + +### 类型转换 + +目前Sqlite3Reader支持大部分sqlite3类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。 + +下面列出Sqlite3Reader针对sqlite3类型转换列表: + +| go-etl的类型 | sqlite3数据类型 | +| ------------ |--------------------| +| string | INTEGER、TEXT、REAL、BLOB | + +## 性能报告 + +待测试 + +## 约束限制 + +### 数据库编码问题 +目前仅支持utf8字符集 + +## FAQ diff --git a/datax/plugin/reader/sqlite3/job.go b/datax/plugin/reader/sqlite3/job.go new file mode 100644 index 0000000..b7c8a19 --- /dev/null +++ b/datax/plugin/reader/sqlite3/job.go @@ -0,0 +1,22 @@ +// Copyright 2020 the go-etl Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package sqlite3 + +import "github.com/Breeze0806/go-etl/datax/plugin/reader/dbms" + +// Job is the sqlite3 reader job; it embeds the generic dbms.Job. +type Job struct { + *dbms.Job +} diff --git a/datax/plugin/reader/sqlite3/reader.go b/datax/plugin/reader/sqlite3/reader.go new file mode 100644 index 0000000..381a656 --- /dev/null +++ b/datax/plugin/reader/sqlite3/reader.go @@ -0,0 +1,63 @@ +// Copyright 2020 the go-etl Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "github.com/Breeze0806/go-etl/config" + spireader "github.com/Breeze0806/go-etl/datax/common/spi/reader" + "github.com/Breeze0806/go-etl/datax/plugin/reader/dbms" + "github.com/Breeze0806/go-etl/storage/database" + + // sqlite3 storage - Sqlite3 database storage + _ "github.com/Breeze0806/go-etl/storage/database/sqlite3" +) + +// Reader is used to extract data from data source +type Reader struct { + pluginConf *config.JSON +} + +// ResourcesConfig returns the configuration of the data source to initiate the reader. +func (r *Reader) ResourcesConfig() *config.JSON { + return r.pluginConf +} + +// Job returns a description of how the reader extracts data from the data source. 
+func (r *Reader) Job() spireader.Job { + job := &Job{ + Job: dbms.NewJob(dbms.NewBaseDbHandler(func(name string, conf *config.JSON) (q dbms.Querier, err error) { + if q, err = database.Open(name, conf); err != nil { + return nil, err + } + return + }, nil)), + } + job.SetPluginConf(r.pluginConf) + return job +} + +// Task returns the smallest execution unit obtained by maximizing the split of a Job +func (r *Reader) Task() spireader.Task { + task := &Task{ + Task: dbms.NewTask(dbms.NewBaseDbHandler(func(name string, conf *config.JSON) (q dbms.Querier, err error) { + if q, err = database.Open(name, conf); err != nil { + return nil, err + } + return + }, nil)), + } + task.SetPluginConf(r.pluginConf) + return task +} diff --git a/datax/plugin/reader/sqlite3/resources/plugin.json b/datax/plugin/reader/sqlite3/resources/plugin.json new file mode 100644 index 0000000..cff193a --- /dev/null +++ b/datax/plugin/reader/sqlite3/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name" : "sqlite3reader", + "developer":"LiMaWanYan", + "dialect":"sqlite3", + "description":"use github.com/mattn/go-sqlite3" +} \ No newline at end of file diff --git a/datax/plugin/reader/sqlite3/resources/plugin_job_template.json b/datax/plugin/reader/sqlite3/resources/plugin_job_template.json new file mode 100644 index 0000000..7d6ce5f --- /dev/null +++ b/datax/plugin/reader/sqlite3/resources/plugin_job_template.json @@ -0,0 +1,14 @@ +{ + "name": "sqlite3reader", + "parameter": { + "column": ["*"], + "connection": { + "url": "", + "table": { + "db":"", + "name":"" + } + }, + "where": "" + } +} \ No newline at end of file diff --git a/datax/plugin/reader/sqlite3/task.go b/datax/plugin/reader/sqlite3/task.go new file mode 100644 index 0000000..24838ba --- /dev/null +++ b/datax/plugin/reader/sqlite3/task.go @@ -0,0 +1,31 @@ +// Copyright 2020 the go-etl Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "github.com/Breeze0806/go-etl/datax/common/plugin" + "github.com/Breeze0806/go-etl/datax/plugin/reader/dbms" +) + +// Task +type Task struct { + *dbms.Task +} + +// StartRead +func (t *Task) StartRead(ctx context.Context, sender plugin.RecordSender) (err error) { + return dbms.StartRead(ctx, dbms.NewBaseBatchReader(t.Task, "", nil), sender) +} diff --git a/datax/plugin/writer/sqlite3/README.md b/datax/plugin/writer/sqlite3/README.md new file mode 100644 index 0000000..0b88a61 --- /dev/null +++ b/datax/plugin/writer/sqlite3/README.md @@ -0,0 +1,137 @@ +# Sqlite3Writer Plugin Documentation + +## Quick Introduction + +The Sqlite3Writer plugin enables writing data to Sqlite3 databases. Under the hood, Sqlite3Writer connects to remote Sqlite3 databases using github.com/mattn/go-sqlite3, executing corresponding SQL statements to write data into the Sqlite3 database. + +## Implementation Principles + +Sqlite3Writer connects to remote Sqlite3 databases via github.com/mattn/go-sqlite3. It generates SQL statements for writing based on user-provided configuration information and go-etl's custom data types from the Reader. These statements are then sent to the remote Sqlite3 database for execution. + +Sqlite3 implements specific queries by utilizing the query process defined in dbmswriter, calling go-etl's custom storage/database DBWrapper. DBWrapper encapsulates numerous interfaces from database/sql and abstracts the database dialect, Dialect. 
For Sqlite3, it adopts the Dialect implemented in storage/database/sqlite3. + +Based on your configured `writeMode`, it generates either: + +- `insert into...` (which may fail to insert conflicting rows in case of primary key/unique index conflicts) + + + +## Functionality Description + +### Configuration Example + +Configuring a job to synchronously write data to a Sqlite3 database: + +```json +{ + "content": [ + { + "writer": { + "name": "sqlite3writer", + "parameter": { + "writeMode": "insert", + "column": [ + "*" + ], + "connection": { + "url": "E:\\Sqlite3\\test.db", + "table": { + "db": "main", + "name": "type_table_copy" + } + }, + "preSql": ["create table a like b"], + "postSql": ["drop table a"], + "batchTimeout": "1s", + "batchSize": 1000 + } + } + } + ] +} +``` + +### Parameter Description + +#### url + +- Description: Primarily used to configure the path of the sqlite3 database file. +- Required: Yes +- Default: None + +#### table + +Describes the Sqlite3 table information. + +##### name + +- Description: Primarily used to configure the table name of the Sqlite3 table. +- Required: Yes +- Default: None + +#### column + +- Description: A set of column names from the configured table that need to be synchronized, described using a JSON array. Users can use * to indicate that all columns should be used by default, for example, ["*"]. + + Supports column pruning, allowing only selected columns to be exported. + + Supports column reordering, meaning columns can be exported in an order different from the table schema. + + Supports constant configuration. Users need to follow the sqlite3 syntax format. + +- Required: Yes +- Default: None + +#### writeMode + +- Description: Write mode. "insert" represents writing data using the insert into method, while "copyIn" represents writing data using the copy in method. 
+- Required: No +- Default: insert + +#### batchTimeout + +- Description: Primarily used to configure the timeout interval for each batch write operation. The format is: number + unit, where the unit can be s for seconds, ms for milliseconds, or us for microseconds. If the specified time interval is exceeded, the data will be written directly. This parameter, along with batchSize, can be adjusted for optimal write performance. +- Required: No +- Default: 1s + +#### batchSize + +- Description: Primarily used to configure the size of each batch write operation. If the specified size is exceeded, the data will be written directly. This parameter, along with batchTimeout, can be adjusted for optimal write performance. +- Required: No +- Default: 1000 + +#### preSql + +- Description: Primarily used for SQL statement groups executed before writing data. Do not use select statements as they will result in an error. +- Required: No +- Default: None + +#### postSql + +- Description: Primarily used for SQL statement groups executed after writing data. Do not use select statements as they will result in an error. +- Required: No +- Default: None + +### Type Conversion + +Currently, Sqlite3Writer supports most Sqlite3 types, but there may be some individual types that are not supported. Please check your types accordingly. + +Below is a conversion table for Sqlite3Writer with regards to Sqlite3 types: + +| go-etl的类型 | sqlite3数据类型 | +| ------------ | -------------------------------------------------------- | +| string | INTEGER、TEXT、REAL、BLOB | + +## Performance Report + +Pending testing. + +## Constraints and Limitations + +### Database Encoding Issues + +Currently, only the utf8 character set is supported. + +## FAQ + +(Frequently Asked Questions section to be added if applicable.) 
\ No newline at end of file diff --git a/datax/plugin/writer/sqlite3/README_zh-CN.md b/datax/plugin/writer/sqlite3/README_zh-CN.md new file mode 100644 index 0000000..595606a --- /dev/null +++ b/datax/plugin/writer/sqlite3/README_zh-CN.md @@ -0,0 +1,132 @@ +# Sqlite3Writer插件文档 + +## 快速介绍 + +Sqlite3Writer插件实现了向sqlite3数据库写入数据。在底层实现上,Sqlite3Writer通过github.com/mattn/go-sqlite3以及database/sql连接远程sqlite3数据库,并执行相应的sql语句将数据写入sqlite3数据库。 + +## 实现原理 + +Sqlite3Writer通过github.com/mattn/go-sqlite3连接远程sqlite3数据库,并根据用户配置的信息和来自Reader的go-etl自定义的数据类型生成写入SQL语句,然后发送到远程sqlite3数据库执行。 + +sqlite3通过使用dbmswriter中定义的查询流程调用go-etl自定义的storage/database的DBWrapper来实现具体的查询。DBWrapper封装了database/sql的众多接口,并且抽象出了数据库方言Dialect。其中sqlite3采取了storage/database/sqlite3实现的Dialect。 + + +**或者** + +- `copy in ...` 与 insert into 行为一致,速度比insert into方式迅速。出于性能考虑,将数据缓冲到内存 中,当 内存累计到预定阈值时,才发起写入请求。 + +## 功能说明 + +### 配置样例 + +配置一个向sqlite3数据库同步写入数据的作业: + +```json +{ + "content": [ + { + "writer": { + "name": "sqlite3writer", + "parameter": { + "writeMode": "insert", + "column": [ + "*" + ], + "connection": { + "url": "E:\\Sqlite3\\test.db", + "table": { + "db": "main", + "name": "type_table_copy" + } + }, + "batchTimeout": "1s", + "batchSize": 1000 + } + } + } + ] +} +``` + +### 参数说明 + +#### url + +- 描述 主要用于配置sqlite3数据库文件路径。 +- 必选:是 +- 默认值: 无 + +#### table + +描述sqlite3表信息 + +##### name + +- 描述 主要用于配置sqlite3表的表名 +- 必选:是 +- 默认值: 无 + +#### column + +- 描述:所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。用户使用*代表默认使用所有列配置,例如["\*"]。 + + 支持列裁剪,即列可以挑选部分列进行导出。 + + 支持列换序,即列可以不按照表schema信息进行导出。 + + 支持常量配置,用户需要按照sqlite3语法格式。 + +- 必选:是 + +- 默认值: 无 + +#### writeMode + +- 描述:写入模式,insert代表insert into方式写入数据,copyIn代表copy in方式写入数据。 +- 必选:否 +- 默认值: insert + +#### batchTimeout + +- 描述 主要用于配置每次批量写入超时时间间隔,格式:数字+单位, 单位:s代表秒,ms代表毫秒,us代表微秒。如果超过该时间间隔就直接写入,和batchSize一起调节写入性能。 +- 必选:否 +- 默认值: 1s + +#### batchSize + +- 描述 主要用于配置每次批量写入大小,如果超过该大小就直接写入,和batchTimeout一起调节写入性能。 +- 必选:否 +- 默认值: 1000 + +#### preSql + +- 描述 主要用于在写入数据前的sql语句组,不要使用select语句,否则会报错。 +- 必选:否 +- 默认值: 无 + +#### 
postSql + +- 描述 主要用于在写入数据后的sql语句组,不要使用select语句,否则会报错。 +- 必选:否 +- 默认值: 无 + +### 类型转换 + +目前Sqlite3Writer支持大部分sqlite3类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。 + +下面列出Sqlite3Writer针对sqlite3类型转换列表: + +| go-etl的类型 | sqlite3数据类型 | +| ------------ | -------------------------------------------------------- | +| string | INTEGER、TEXT、REAL、BLOB | + +## 性能报告 + +待测试 + +## 约束限制 + +### 数据库编码问题 +目前仅支持utf8字符集 + +## FAQ diff --git a/datax/plugin/writer/sqlite3/job.go b/datax/plugin/writer/sqlite3/job.go new file mode 100644 index 0000000..12858d0 --- /dev/null +++ b/datax/plugin/writer/sqlite3/job.go @@ -0,0 +1,22 @@ +// Copyright 2020 the go-etl Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package sqlite3 + +import "github.com/Breeze0806/go-etl/datax/plugin/writer/dbms" + +// Job is the sqlite3 writer job; it embeds the generic dbms.Job +type Job struct { + *dbms.Job +} diff --git a/datax/plugin/writer/sqlite3/resources/plugin.json b/datax/plugin/writer/sqlite3/resources/plugin.json new file mode 100644 index 0000000..71384c1 --- /dev/null +++ b/datax/plugin/writer/sqlite3/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name" : "sqlite3writer", + "developer":"LiMaWanYan", + "dialect":"sqlite3", + "description":"use github.com/mattn/go-sqlite3" +} \ No newline at end of file diff --git a/datax/plugin/writer/sqlite3/resources/plugin_job_template.json b/datax/plugin/writer/sqlite3/resources/plugin_job_template.json new file mode 100644 index 0000000..5d55505 --- /dev/null +++ b/datax/plugin/writer/sqlite3/resources/plugin_job_template.json @@ -0,0 +1,16 @@ +{ + "name": "sqlite3writer", + "parameter": { + "writeMode": "insert", + "column": ["*"], + "connection": { + "url": "", + "table": { + "db":"", + "name":"" + } + }, + "batchTimeout": "1s", + "batchSize":1000 + } +} \ No newline at end of file diff --git a/datax/plugin/writer/sqlite3/task.go b/datax/plugin/writer/sqlite3/task.go new file mode 100644 index 0000000..585bb55 --- /dev/null +++ b/datax/plugin/writer/sqlite3/task.go @@ -0,0 +1,48 @@ +// Copyright 2020 the go-etl Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package sqlite3 + +import ( + "context" + + "github.com/Breeze0806/go-etl/datax/common/plugin" + "github.com/Breeze0806/go-etl/datax/plugin/writer/dbms" + "github.com/Breeze0806/go-etl/storage/database" +) + +var execModeMap = map[string]string{ + database.WriteModeInsert: dbms.ExecModeNormal, +} + +func execMode(writeMode string) string { + if mode, ok := execModeMap[writeMode]; ok { + return mode + } + return dbms.ExecModeNormal +} + +// Task is the sqlite3 writer task; it embeds the generic dbms.Task +type Task struct { + *dbms.Task +} + +type batchWriter struct { + *dbms.BaseBatchWriter +} + +// StartWrite passes the receiver to dbms.StartWrite with a batch writer whose exec mode is derived from the configured write mode +func (t *Task) StartWrite(ctx context.Context, receiver plugin.RecordReceiver) (err error) { + return dbms.StartWrite(ctx, &batchWriter{BaseBatchWriter: dbms.NewBaseBatchWriter(t.Task, execMode(t.Config.GetWriteMode()), nil)}, receiver) +} diff --git a/datax/plugin/writer/sqlite3/writer.go b/datax/plugin/writer/sqlite3/writer.go new file mode 100644 index 0000000..d99786c --- /dev/null +++ b/datax/plugin/writer/sqlite3/writer.go @@ -0,0 +1,62 @@ +// Copyright 2020 the go-etl Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package sqlite3 + +import ( + "github.com/Breeze0806/go-etl/config" + spiwriter "github.com/Breeze0806/go-etl/datax/common/spi/writer" + "github.com/Breeze0806/go-etl/datax/plugin/writer/dbms" + "github.com/Breeze0806/go-etl/storage/database" +) + +// Writer is used to load data into data source +type Writer struct { + pluginConf *config.JSON +} + +// ResourcesConfig returns the configuration of the data source to initiate the writer. +func (w *Writer) ResourcesConfig() *config.JSON { + return w.pluginConf +} + +// Job returns a description of how the writer loads data into the data source. +func (w *Writer) Job() spiwriter.Job { + job := &Job{ + Job: dbms.NewJob(dbms.NewBaseDbHandler( + func(name string, conf *config.JSON) (e dbms.Execer, err error) { + if e, err = database.Open(name, conf); err != nil { + return nil, err + } + return + }, nil)), + } + job.SetPluginConf(w.pluginConf) + return job +} + +// Task returns the smallest execution unit obtained by maximizing the split of a Job +func (w *Writer) Task() spiwriter.Task { + task := &Task{ + Task: dbms.NewTask(dbms.NewBaseDbHandler( + func(name string, conf *config.JSON) (e dbms.Execer, err error) { + if e, err = database.Open(name, conf); err != nil { + return nil, err + } + return + }, nil)), + } + task.SetPluginConf(w.pluginConf) + return task +} diff --git a/go.mod b/go.mod index a6c53e9..6515a4a 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/gorilla/handlers v1.5.2 github.com/ibmdb/go_ibm_db v0.4.5 github.com/lib/pq v1.10.9 + github.com/mattn/go-sqlite3 v1.14.22 github.com/microsoft/go-mssqldb v1.7.2 github.com/pingcap/errors v0.11.4 github.com/shopspring/decimal v1.4.0 diff --git a/go.sum b/go.sum index 3fc0285..1dfa422 100644 --- a/go.sum +++ b/go.sum @@ -38,6 +38,8 @@ github.com/ibmdb/go_ibm_db v0.4.5/go.mod h1:nl5aUh1IzBVExcqYXaZLApaq8RUvTEph3VP4 github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/lib/pq v1.10.9 
h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/microsoft/go-mssqldb v1.7.2 h1:CHkFJiObW7ItKTJfHo1QX7QBBD1iV+mn1eOyRP3b/PA= github.com/microsoft/go-mssqldb v1.7.2/go.mod h1:kOvZKUdrhhFQmxLZqbwUV0rHkNkZpthMITIb2Ko1IoA= github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 h1:RWengNIwukTxcDr9M+97sNutRR1RKhG96O6jWumTTnw= diff --git a/storage/database/sqlite3/config.go b/storage/database/sqlite3/config.go new file mode 100644 index 0000000..60ed2e0 --- /dev/null +++ b/storage/database/sqlite3/config.go @@ -0,0 +1,40 @@ +// Copyright 2020 the go-etl Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package sqlite3 + +import ( + "encoding/json" + "github.com/Breeze0806/go-etl/config" +) + +// Config is the Sqlite3 configuration +type Config struct { + URL string `json:"url"` // Database URL, returned unchanged by FormatDSN as the data source name +} + +// NewConfig creates a Sqlite3 configuration and will report an error if the format does not meet the requirements +func NewConfig(conf *config.JSON) (c *Config, err error) { + c = &Config{} + err = json.Unmarshal([]byte(conf.String()), c) + if err != nil { + return nil, err + } + return +} + +// FormatDSN returns the configured URL unchanged as the data source connection information; the returned error is always nil +func (c *Config) FormatDSN() (dsn string, err error) { + return c.URL, nil +} diff --git a/storage/database/sqlite3/config_test.go b/storage/database/sqlite3/config_test.go new file mode 100644 index 0000000..816961a --- /dev/null +++ b/storage/database/sqlite3/config_test.go @@ -0,0 +1,95 @@ +// Copyright 2020 the go-etl Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package sqlite3 + +import ( + "github.com/Breeze0806/go-etl/config" + "reflect" + "testing" +) + +func TestNewConfig(t *testing.T) { + type args struct { + conf *config.JSON + } + tests := []struct { + name string + args args + wantC *Config + wantErr bool + }{ + { + name: "1", + args: args{ + conf: testJSONFromString(`{ + "url" : "E:\\Sqlite3\\test.db" + }`), + }, + wantC: &Config{ + URL: "E:\\Sqlite3\\test.db", + }, + }, + { + name: "2", + args: args{ + conf: testJSONFromString(`{ + "url" : 1 + }`), + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotC, err := NewConfig(tt.args.conf) + if (err != nil) != tt.wantErr { + t.Errorf("NewConfig() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(gotC, tt.wantC) { + t.Errorf("NewConfig() = %v, want %v", gotC, tt.wantC) + } + }) + } +} + +func TestConfig_FormatDSN(t *testing.T) { + tests := []struct { + name string + c *Config + wantDsn string + wantErr bool + }{ + { + name: "1", + c: &Config{ + URL: "E:\\Sqlite3\\test.db", + }, + wantDsn: "E:\\Sqlite3\\test.db", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotDsn, err := tt.c.FormatDSN() + if (err != nil) != tt.wantErr { + t.Errorf("Config.FormatDSN() error = %v, wantErr %v", err, tt.wantErr) + return + } + if gotDsn != tt.wantDsn { + t.Errorf("Config.FormatDSN() = %v, want %v", gotDsn, tt.wantDsn) + } + }) + } +} diff --git a/storage/database/sqlite3/example_test.go b/storage/database/sqlite3/example_test.go new file mode 100644 index 0000000..50b3ae3 --- /dev/null +++ b/storage/database/sqlite3/example_test.go @@ -0,0 +1,77 @@ +// Copyright 2020 the go-etl Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3_test + +import ( + "context" + "fmt" + "testing" + + "github.com/Breeze0806/go-etl/config" + "github.com/Breeze0806/go-etl/element" + "github.com/Breeze0806/go-etl/storage/database" + "github.com/Breeze0806/go-etl/storage/database/sqlite3" +) + +func testJSONFromString(s string) *config.JSON { + json, err := config.NewJSONFromString(s) + if err != nil { + panic(err) + } + return json +} + +type TableParam struct { + *database.BaseParam +} + +func NewTableParam() *TableParam { + return &TableParam{ + BaseParam: database.NewBaseParam(sqlite3.NewTable(database.NewBaseTable("", "", "test")), nil), + } +} + +func (t *TableParam) Query(_ []element.Record) (string, error) { + return "select * from test", nil +} + +func (t *TableParam) Agrs(_ []element.Record) ([]interface{}, error) { + return nil, nil +} + +type FetchHandler struct { +} + +func (f *FetchHandler) OnRecord(r element.Record) error { + fmt.Println(r) + return nil +} + +func (f *FetchHandler) CreateRecord() (element.Record, error) { + return element.NewDefaultRecord(), nil +} + +func ExampleSqlite3(t *testing.T) { + t.Log("strat") + db, err := database.Open("sqlite3", testJSONFromString(`{"url":"E:\\projects\\sqlite3\\test.db"}`)) + if err != nil { + t.Errorf("open fail. err: %v", err) + } + defer db.Close() + err = db.FetchRecord(context.TODO(), NewTableParam(), &FetchHandler{}) + if err != nil { + t.Errorf("fetchRecord fail. 
err: %v", err) + } +} diff --git a/storage/database/sqlite3/field.go b/storage/database/sqlite3/field.go new file mode 100644 index 0000000..12f4160 --- /dev/null +++ b/storage/database/sqlite3/field.go @@ -0,0 +1,162 @@ +// Copyright 2020 the go-etl Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "fmt" + + "github.com/Breeze0806/go-etl/element" + "github.com/Breeze0806/go-etl/storage/database" +) + +var ( + dateLayout = element.DefaultTimeFormat[:10] + timestampLayout = element.DefaultTimeFormat[:26] +) + +// Field - Represents a field in a database table. +type Field struct { + *database.BaseField + database.BaseConfigSetter +} + +// NewField - Generates a field based on basic column attributes. +func NewField(bf *database.BaseField) *Field { + return &Field{ + BaseField: bf, + } +} + +// Quoted - Used for quoting in SQL statements. +func (f *Field) Quoted() string { + return Quoted(f.Name()) +} + +// BindVar - SQL placeholder used in SQL statements. +func (f *Field) BindVar(i int) string { + return "?" +} + +// Select - Represents a field for querying purposes in SQL query statements. +func (f *Field) Select() string { + return f.Quoted() +} + +// Type - Represents the type of the field. +func (f *Field) Type() database.FieldType { + return NewFieldType(f.FieldType()) +} + +// Scanner - Used for reading data from a field. 
+func (f *Field) Scanner() database.Scanner { + return NewScanner(f) +} + +// Valuer - Handles data processing using GoValuer. +func (f *Field) Valuer(c element.Column) database.Valuer { + return database.NewGoValuer(f, c) +} + +// FieldType - Represents the type of a field. +type FieldType struct { + *database.BaseFieldType + + goType database.GoType +} + +// NewFieldType - Creates a new field type. +func NewFieldType(typ database.ColumnType) *FieldType { + f := &FieldType{ + BaseFieldType: database.NewBaseFieldType(typ), + } + f.goType = database.GoTypeString + return f +} + +// IsSupported - Indicates whether parsing is supported for a specific type. +func (f *FieldType) IsSupported() bool { + return true + //return f.GoType() != database.GoTypeUnknown +} + +// GoType - Returns the Golang type used when processing numerical values. +func (f *FieldType) GoType() database.GoType { + return f.goType +} + +// Scanner - A scanner used for reading data based on the column type. +type Scanner struct { + database.BaseScanner + + f *Field +} + +// NewScanner - Generates a scanner based on the column type. +func NewScanner(f *Field) *Scanner { + return &Scanner{ + f: f, + } +} + +// Scan - Reads data from a column based on its type. 
+func (s *Scanner) Scan(src any) (err error) { + defer s.f.SetError(&err) + var cv element.ColumnValue + byteSize := element.ByteSize(src) + switch s.f.Type().DatabaseTypeName() { + case "INTEGER": + switch data := src.(type) { + case nil: + cv = element.NewNilBigIntColumnValue() + case int64: + cv = element.NewBigIntColumnValueFromInt64(data) + default: + return fmt.Errorf("src is %v(%T), but not %v", src, src, element.TypeBigInt) + } + case "BLOB": + switch data := src.(type) { + case nil: + cv = element.NewNilBytesColumnValue() + case []byte: + cv = element.NewBytesColumnValue(data) + default: + return fmt.Errorf("src is %v(%T),but not %v", src, src, element.TypeBytes) + } + case "NUMERIC", "REAL": + switch data := src.(type) { + case nil: + cv = element.NewNilTimeColumnValue() + case int64: + cv = element.NewBigIntColumnValueFromInt64(data) + case float64: + cv = element.NewDecimalColumnValueFromFloat(data) + default: + return fmt.Errorf("src is %v(%T), but not %v", src, src, element.TypeDecimal) + } + case "TEXT": + switch data := src.(type) { + case nil: + cv = element.NewNilStringColumnValue() + case string: + cv = element.NewStringColumnValue(data) + default: + return fmt.Errorf("src is %v(%T), but not %v", src, src, element.TypeString) + } + default: + return fmt.Errorf("src is %v(%T), but db type is %v", src, src, s.f.Type().DatabaseTypeName()) + } + s.SetColumn(element.NewDefaultColumn(cv, s.f.Name(), byteSize)) + return +} diff --git a/storage/database/sqlite3/field_test.go b/storage/database/sqlite3/field_test.go new file mode 100644 index 0000000..f64780a --- /dev/null +++ b/storage/database/sqlite3/field_test.go @@ -0,0 +1,473 @@ +// Copyright 2020 the go-etl Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "database/sql" + "reflect" + "testing" + + "github.com/Breeze0806/go-etl/config" + "github.com/Breeze0806/go-etl/element" + "github.com/Breeze0806/go-etl/storage/database" +) + +type mockColumnType struct { + name string +} + +func newMockColumnType(name string) *mockColumnType { + return &mockColumnType{ + name: name, + } +} + +func (m *mockColumnType) Name() string { + return "" +} + +func (m *mockColumnType) ScanType() reflect.Type { + return nil +} + +func (m *mockColumnType) Length() (length int64, ok bool) { + return +} + +func (m *mockColumnType) DecimalSize() (precision, scale int64, ok bool) { + return +} + +func (m *mockColumnType) Nullable() (nullable, ok bool) { + return +} + +func (m *mockColumnType) DatabaseTypeName() string { + return m.name +} + +func TestField_Quoted(t *testing.T) { + tests := []struct { + name string + f *Field + want string + }{ + { + name: "1", + f: NewField(database.NewBaseField(0, "f1", NewFieldType(&sql.ColumnType{}))), + want: `"f1"`, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.f.Quoted(); got != tt.want { + t.Errorf("Field.Quoted() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestField_BindVar(t *testing.T) { + type args struct { + i int + } + tests := []struct { + name string + f *Field + args args + want string + }{ + { + name: "1", + f: NewField(database.NewBaseField(0, "f1", NewFieldType(&sql.ColumnType{}))), + args: args{ + i: 22, + }, + want: "?", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t 
*testing.T) { + if got := tt.f.BindVar(tt.args.i); got != tt.want { + t.Errorf("Field.BindVar() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestField_Select(t *testing.T) { + tests := []struct { + name string + f *Field + want string + }{ + { + name: "1", + f: NewField(database.NewBaseField(0, "f1", NewFieldType(&sql.ColumnType{}))), + want: `"f1"`, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.f.Select(); got != tt.want { + t.Errorf("Field.Select() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestField_Type(t *testing.T) { + tests := []struct { + name string + f *Field + want database.FieldType + }{ + { + name: "1", + f: NewField(database.NewBaseField(0, "f1", NewFieldType(newMockColumnType("1")))), + want: NewFieldType(&mockColumnType{ + name: "1", + }), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.f.Type(); !reflect.DeepEqual(got.DatabaseTypeName(), tt.want.DatabaseTypeName()) { + t.Errorf("Field.Type() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestField_Scanner(t *testing.T) { + tests := []struct { + name string + f *Field + want database.Scanner + }{ + { + name: "1", + f: NewField(database.NewBaseField(0, "f1", NewFieldType(newMockColumnType("1")))), + want: NewScanner(NewField(database.NewBaseField(0, "f1", NewFieldType(newMockColumnType("1"))))), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.f.Scanner(); !reflect.DeepEqual(got, tt.want) { + t.Errorf("Field.Scanner() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestField_Valuer(t *testing.T) { + type args struct { + c element.Column + } + tests := []struct { + name string + f *Field + args args + want database.Valuer + }{ + { + name: "1", + f: NewField(database.NewBaseField(0, "f1", NewFieldType(newMockColumnType("1")))), + args: args{ + c: element.NewDefaultColumn(element.NewBigIntColumnValueFromInt64(1), "f1", 0), + }, + want: 
database.NewGoValuer(NewField(database.NewBaseField(0, "f1", NewFieldType(newMockColumnType("1")))), element.NewDefaultColumn(element.NewBigIntColumnValueFromInt64(1), "f1", 0)), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.f.Valuer(tt.args.c); !reflect.DeepEqual(got, tt.want) { + t.Errorf("Field.Valuer() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestFieldType_GoType(t *testing.T) { + tests := []struct { + name string + f *FieldType + want database.GoType + }{ + //INTEGER + { + name: "1", + f: NewFieldType(newMockColumnType("INTEGER")), + want: database.GoTypeString, + }, + + //BLOB + { + name: "2", + f: NewFieldType(newMockColumnType("BLOB")), + want: database.GoTypeString, + }, + //NUMERIC + { + name: "3", + f: NewFieldType(newMockColumnType("NUMERIC")), + want: database.GoTypeString, + }, + //REAL + { + name: "4", + f: NewFieldType(newMockColumnType("REAL")), + want: database.GoTypeString, + }, + //TEXT + { + name: "5", + f: NewFieldType(newMockColumnType("TEXT")), + want: database.GoTypeString, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.f.GoType(); !reflect.DeepEqual(got, tt.want) { + t.Errorf("FieldType.GoType() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestFieldType_IsSupportted(t *testing.T) { + tests := []struct { + name string + f *FieldType + want bool + }{ + { + name: "1", + f: NewFieldType(newMockColumnType("INTEGER")), + want: true, + }, + { + name: "2", + f: NewFieldType(newMockColumnType("BLOB")), + want: true, + }, + { + name: "3", + f: NewFieldType(newMockColumnType("NUMERIC")), + want: true, + }, + { + name: "4", + f: NewFieldType(newMockColumnType("REAL")), + want: true, + }, + { + name: "5", + f: NewFieldType(newMockColumnType("TEXT")), + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.f.IsSupported(); got != tt.want { + t.Errorf("FieldType.IsSupportted() = %v, want %v", got, 
tt.want) + } + }) + } +} + +func TestScanner_Scan(t *testing.T) { + type args struct { + src any + } + tests := []struct { + name string + s *Scanner + conf *config.JSON + args args + want element.Column + wantErr bool + }{ + { + name: "1", + s: NewScanner(NewField(database.NewBaseField(0, + "f1", NewFieldType(newMockColumnType("INTEGER"))))), + args: args{ + src: nil, + }, + want: element.NewDefaultColumn(element.NewNilBigIntColumnValue(), "f1", 0), + }, + { + name: "2", + s: NewScanner(NewField(database.NewBaseField(0, + "f1", NewFieldType(newMockColumnType("INTEGER"))))), + args: args{ + src: int64(2 ^ 63 - 1), + }, + want: element.NewDefaultColumn(element.NewBigIntColumnValueFromInt64(int64(2^63-1)), "f1", element.ByteSize(int64(2^63-1))), + }, + { + name: "3", + s: NewScanner(NewField(database.NewBaseField(0, + "f1", NewFieldType(newMockColumnType("BLOB"))))), + args: args{ + src: nil, + }, + want: element.NewDefaultColumn(element.NewNilBytesColumnValue(), "f1", 0), + }, + { + name: "4", + s: NewScanner(NewField(database.NewBaseField(0, + "f1", NewFieldType(newMockColumnType("BLOB"))))), + args: args{ + src: []byte("123"), + }, + want: element.NewDefaultColumn(element.NewBytesColumnValue([]byte("123")), "f1", element.ByteSize([]byte("123"))), + }, + { + name: "5", + s: NewScanner(NewField(database.NewBaseField(0, + "f1", NewFieldType(newMockColumnType("NUMERIC"))))), + args: args{ + src: nil, + }, + want: element.NewDefaultColumn(element.NewNilTimeColumnValue(), "f1", 0), + }, + { + name: "6", + s: NewScanner(NewField(database.NewBaseField(0, + "f1", NewFieldType(newMockColumnType("NUMERIC"))))), + args: args{ + src: int64(2 ^ 63 - 1), + }, + want: element.NewDefaultColumn(element.NewBigIntColumnValueFromInt64(int64(2^63-1)), "f1", element.ByteSize(int64(2^63-1))), + }, + { + name: "7", + s: NewScanner(NewField(database.NewBaseField(0, + "f1", NewFieldType(newMockColumnType("NUMERIC"))))), + args: args{ + src: 1.23456789, + }, + want: 
element.NewDefaultColumn(element.NewDecimalColumnValueFromFloat(1.23456789), "f1", element.ByteSize(1.23456789)), + }, + { + name: "8", + s: NewScanner(NewField(database.NewBaseField(0, + "f1", NewFieldType(newMockColumnType("REAL"))))), + args: args{ + src: nil, + }, + want: element.NewDefaultColumn(element.NewNilTimeColumnValue(), "f1", 0), + }, + { + name: "9", + s: NewScanner(NewField(database.NewBaseField(0, + "f1", NewFieldType(newMockColumnType("REAL"))))), + args: args{ + src: int64(2 ^ 63 - 1), + }, + want: element.NewDefaultColumn(element.NewBigIntColumnValueFromInt64(int64(2^63-1)), "f1", element.ByteSize(int64(2^63-1))), + }, + { + name: "10", + s: NewScanner(NewField(database.NewBaseField(0, + "f1", NewFieldType(newMockColumnType("REAL"))))), + args: args{ + src: 1.23456789, + }, + want: element.NewDefaultColumn(element.NewDecimalColumnValueFromFloat(1.23456789), "f1", element.ByteSize(1.23456789)), + }, + { + name: "11", + s: NewScanner(NewField(database.NewBaseField(0, + "f1", NewFieldType(newMockColumnType("TEXT"))))), + args: args{ + src: nil, + }, + want: element.NewDefaultColumn(element.NewNilStringColumnValue(), "f1", 0), + }, + { + name: "12", + s: NewScanner(NewField(database.NewBaseField(0, + "f1", NewFieldType(newMockColumnType("TEXT"))))), + args: args{ + src: "123", + }, + want: element.NewDefaultColumn(element.NewStringColumnValue("123"), "f1", element.ByteSize("123")), + }, + { + name: "13", + s: NewScanner(NewField(database.NewBaseField(0, + "f1", NewFieldType(newMockColumnType("INTEGER"))))), + args: args{ + src: "123", + }, + wantErr: true, + }, + { + name: "14", + s: NewScanner(NewField(database.NewBaseField(0, + "f1", NewFieldType(newMockColumnType("BLOB"))))), + args: args{ + src: 123, + }, + wantErr: true, + }, + { + name: "15", + s: NewScanner(NewField(database.NewBaseField(0, + "f1", NewFieldType(newMockColumnType("NUMERIC"))))), + args: args{ + src: 123, + }, + wantErr: true, + }, + { + name: "16", + s: 
NewScanner(NewField(database.NewBaseField(0, + "f1", NewFieldType(newMockColumnType("REAL"))))), + args: args{ + src: 123, + }, + wantErr: true, + }, + { + name: "17", + s: NewScanner(NewField(database.NewBaseField(0, + "f1", NewFieldType(newMockColumnType("TEXT"))))), + args: args{ + src: 123, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.conf != nil { + tt.s.f.SetConfig(tt.conf) + } + + if err := tt.s.Scan(tt.args.src); (err != nil) != tt.wantErr { + t.Errorf("Scanner.Scan() error = %v, wantErr %v", err, tt.wantErr) + return + } + + if !reflect.DeepEqual(tt.s.Column(), tt.want) { + t.Errorf("Column() = %v %v, want %v", tt.s.Column().ByteSize(), tt.s.Column(), tt.want) + } + }) + } +} diff --git a/storage/database/sqlite3/source.go b/storage/database/sqlite3/source.go new file mode 100644 index 0000000..196a517 --- /dev/null +++ b/storage/database/sqlite3/source.go @@ -0,0 +1,85 @@ +// Copyright 2020 the go-etl Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package sqlite3 + +import ( + "github.com/Breeze0806/go-etl/storage/database" +) + +func init() { + var d Dialect + database.RegisterDialect(d.Name(), d) +} + +// Dialect represents the database dialect for Sqlite3 +type Dialect struct{} + +// Source generates a Sqlite3 data source +func (d Dialect) Source(bs *database.BaseSource) (database.Source, error) { + return NewSource(bs) +} + +// Name is the registered name of the database dialect +func (d Dialect) Name() string { + return "sqlite3" +} + +// Source refers to the Sqlite3 data source +type Source struct { + *database.BaseSource // Basic data source + + dsn string +} + +// NewSource generates a Sqlite3 data source and will report an error if there's an issue with the configuration file +func NewSource(bs *database.BaseSource) (s database.Source, err error) { + source := &Source{ + BaseSource: bs, + } + var c *Config + if c, err = NewConfig(source.Config()); err != nil { + return + } + + if source.dsn, err = c.FormatDSN(); err != nil { + return + } + return source, nil +} + +// DriverName is the driver name +func (s *Source) DriverName() string { + return "sqlite3" +} + +// ConnectName is the connection information for the Sqlite3 data source +func (s *Source) ConnectName() string { + return s.dsn +} + +// Key is a keyword for the data source, used for reuse by DBWrapper +func (s *Source) Key() string { + return s.dsn +} + +// Table generates a table for Sqlite3 
+func (s *Source) Table(b *database.BaseTable) database.Table { + return NewTable(b) +} + +// Quoted is the quoting function for Sqlite3 +func Quoted(s string) string { + return `"` + s + `"` +} diff --git a/storage/database/sqlite3/source_test.go b/storage/database/sqlite3/source_test.go new file mode 100644 index 0000000..c5391a5 --- /dev/null +++ b/storage/database/sqlite3/source_test.go @@ -0,0 +1,214 @@ +// Copyright 2020 the go-etl Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package sqlite3 + +import ( + "reflect" + "testing" + + "github.com/Breeze0806/go-etl/config" + "github.com/Breeze0806/go-etl/storage/database" +) + +func testJSONFromString(s string) *config.JSON { + json, err := config.NewJSONFromString(s) + if err != nil { + panic(err) + } + return json +} + +func TestDialect_Name(t *testing.T) { + tests := []struct { + name string + d Dialect + want string + }{ + { + name: "1", + d: Dialect{}, + want: "sqlite3", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.d.Name(); got != tt.want { + t.Errorf("Dialect.Name() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestDialect_Source(t *testing.T) { + type args struct { + bs *database.BaseSource + } + tests := []struct { + name string + d Dialect + args args + want database.Source + wantErr bool + }{ + { + name: "1", + d: Dialect{}, + args: args{ + bs: database.NewBaseSource(testJSONFromString(`{ + "url":"E:\\projects\\sqlite3\\test.db" + }`)), + }, + want: &Source{ + BaseSource: database.NewBaseSource(testJSONFromString(`{ + "url":"E:\\projects\\sqlite3\\test.db" + }`)), + dsn: "E:\\projects\\sqlite3\\test.db", + }, + }, + { + name: "2", + d: Dialect{}, + args: args{ + bs: database.NewBaseSource(testJSONFromString(`{ + "url": 123 + }`)), + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := tt.d.Source(tt.args.bs) + if (err != nil) != tt.wantErr { + t.Errorf("Dialect.Source() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("Dialect.Source() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestSource_DriverName(t *testing.T) { + tests := []struct { + name string + s *Source + want string + }{ + { + name: "1", + s: &Source{ + BaseSource: database.NewBaseSource(testJSONFromString(`{ + "url":"E:\\projects\\sqlite3\\test.db" + }`)), + dsn: "E:\\projects\\sqlite3\\test.db", + }, + want: "sqlite3", + }, + } + for _, tt 
:= range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.s.DriverName(); got != tt.want { + t.Errorf("Source.DriverName() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestSource_ConnectName(t *testing.T) { + tests := []struct { + name string + s *Source + want string + }{ + { + name: "1", + s: &Source{ + BaseSource: database.NewBaseSource(testJSONFromString(`{ + "url":"E:\\projects\\sqlite3\\test.db" + }`)), + dsn: "E:\\projects\\sqlite3\\test.db", + }, + want: "E:\\projects\\sqlite3\\test.db", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.s.ConnectName(); got != tt.want { + t.Errorf("Source.ConnectName() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestSource_Key(t *testing.T) { + tests := []struct { + name string + s *Source + want string + }{ + { + name: "1", + s: &Source{ + BaseSource: database.NewBaseSource(testJSONFromString(`{ + "url":"E:\\projects\\sqlite3\\test.db" + }`)), + dsn: "E:\\projects\\sqlite3\\test.db", + }, + want: "E:\\projects\\sqlite3\\test.db", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.s.Key(); got != tt.want { + t.Errorf("Source.Key() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestSource_Table(t *testing.T) { + type args struct { + b *database.BaseTable + } + tests := []struct { + name string + s *Source + args args + want database.Table + }{ + { + name: "1", + s: &Source{ + BaseSource: database.NewBaseSource(testJSONFromString(`{ + "url":"E:\\projects\\sqlite3\\test.db" + }`)), + dsn: "E:\\projects\\sqlite3\\test.db", + }, + args: args{ + b: database.NewBaseTable("db", "schema", "table"), + }, + want: NewTable(database.NewBaseTable("db", "schema", "table")), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.s.Table(tt.args.b); !reflect.DeepEqual(got, tt.want) { + t.Errorf("Source.Table() = %v, want %v", got, tt.want) + } + }) + } +} diff --git 
a/storage/database/sqlite3/table.go b/storage/database/sqlite3/table.go new file mode 100644 index 0000000..802a128 --- /dev/null +++ b/storage/database/sqlite3/table.go @@ -0,0 +1,77 @@ +// Copyright 2020 the go-etl Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "database/sql" + "database/sql/driver" + + "github.com/Breeze0806/go-etl/storage/database" + sqlite3 "github.com/mattn/go-sqlite3" + "github.com/pingcap/errors" +) + +// Table represents a Sqlite3 table. +type Table struct { + *database.BaseTable + database.BaseConfigSetter +} + +// NewTable creates a new Sqlite3 table. Note that at this point, the schema parameter in BaseTable refers to the schema name, instance is the database name, and name is the table name. +func NewTable(b *database.BaseTable) *Table { + return &Table{ + BaseTable: b, + } +} + +// Quoted refers to the fully qualified name of the table. +func (t *Table) Quoted() string { + return Quoted(t.Name()) +} + +func (t *Table) String() string { + return t.Quoted() +} + +// AddField adds a new column to the table. +func (t *Table) AddField(baseField *database.BaseField) { + f := NewField(baseField) + f.SetConfig(t.Config()) + t.AppendField(f) +} + +// ExecParam retrieves execution parameters, where the copy in parameter mode has been registered. 
+func (t *Table) ExecParam(mode string, txOpts *sql.TxOptions) (database.Parameter, bool) { + return nil, false +} + +// ShouldRetry determines whether a retry is necessary. +func (t *Table) ShouldRetry(err error) bool { + switch cause := errors.Cause(err).(type) { + case sqlite3.Error: + return true + default: + return cause == driver.ErrBadConn + } +} + +// ShouldOneByOne specifies whether to retry one operation at a time. +func (t *Table) ShouldOneByOne(err error) bool { + switch errors.Cause(err).(type) { + case sqlite3.Error: + return true + } + return false +} diff --git a/storage/database/sqlite3/table_test.go b/storage/database/sqlite3/table_test.go new file mode 100644 index 0000000..cf58792 --- /dev/null +++ b/storage/database/sqlite3/table_test.go @@ -0,0 +1,166 @@ +// Copyright 2020 the go-etl Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package sqlite3 + +import ( + "database/sql/driver" + "net" + "testing" + + "github.com/Breeze0806/go-etl/storage/database" + "github.com/mattn/go-sqlite3" +) + +func TestTable_Quoted(t *testing.T) { + tests := []struct { + name string + tr *Table + want string + }{ + { + name: "1", + tr: NewTable(database.NewBaseTable("", "", "table")), + want: `"table"`, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.tr.Quoted(); got != tt.want { + t.Errorf("Table.Quoted() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestTable_String(t *testing.T) { + tests := []struct { + name string + tr *Table + want string + }{ + { + name: "1", + tr: NewTable(database.NewBaseTable("", "", "table")), + want: `"table"`, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.tr.String(); got != tt.want { + t.Errorf("Table.String() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestTable_ShouldRetry(t *testing.T) { + type args struct { + err error + } + tests := []struct { + name string + tr *Table + args args + want bool + }{ + { + name: "1", + tr: NewTable(database.NewBaseTable("", "", "table")), + args: args{ + err: nil, + }, + }, + { + name: "2", + tr: NewTable(database.NewBaseTable("", "", "table")), + args: args{ + err: &net.AddrError{}, + }, + want: false, + }, + { + name: "3", + tr: NewTable(database.NewBaseTable("", "", "table")), + args: args{ + err: driver.ErrBadConn, + }, + want: true, + }, + { + name: "4", + tr: NewTable(database.NewBaseTable("", "", "table")), + args: args{ + err: &sqlite3.Error{}, + }, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.tr.ShouldRetry(tt.args.err); got != tt.want { + t.Errorf("Table.ShouldRetry() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestTable_ShouldOneByOne(t *testing.T) { + type args struct { + err error + } + tests := []struct { + name string + tr *Table + args args + want bool + 
}{ + { + name: "1", + tr: NewTable(database.NewBaseTable("", "", "table")), + args: args{ + err: nil, + }, + }, + { + name: "2", + tr: NewTable(database.NewBaseTable("", "", "table")), + args: args{ + err: &net.AddrError{}, + }, + }, + { + name: "3", + tr: NewTable(database.NewBaseTable("", "", "table")), + args: args{ + err: driver.ErrBadConn, + }, + }, + { + name: "4", + tr: NewTable(database.NewBaseTable("", "", "table")), + args: args{ + err: sqlite3.Error{}, + }, + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.tr.ShouldOneByOne(tt.args.err); got != tt.want { + t.Errorf("Table.ShouldOneByOne() = %v, want %v", got, tt.want) + } + }) + } +}