diff --git a/Godeps b/Godeps index aa9ace1ab1f3c..3be8d57dd7467 100644 --- a/Godeps +++ b/Godeps @@ -30,6 +30,8 @@ github.com/kardianos/osext c2c54e542fb797ad986b31721e1baedf214ca413 github.com/kardianos/service 6d3a0ee7d3425d9d835debc51a0ca1ffa28f4893 github.com/kballard/go-shellquote d8ec1a69a250a17bb0e419c386eac1f3711dc142 github.com/klauspost/crc32 cb6bfca970f6908083f26f39a79009d608efd5cd +github.com/lib/pq e182dc4027e2ded4b19396d638610f2653295f36 +github.com/mattn/go-sqlite3 cf7286f069c3ef596efcc87781a4653a2e7607bd github.com/matttproud/golang_protobuf_extensions c12348ce28de40eed0136aa2b644d0ee0650e56c github.com/miekg/dns 99f84ae56e75126dd77e5de4fae2ea034a468ca1 github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b diff --git a/README.md b/README.md index 2c7d40b28139e..6f81dc3ded7c2 100644 --- a/README.md +++ b/README.md @@ -166,6 +166,7 @@ configuration options. * [sensors](./plugins/inputs/sensors) * [snmp](./plugins/inputs/snmp) * [snmp_legacy](./plugins/inputs/snmp_legacy) +* [sql](./plugins/inputs/sql) (sql generic) * [sql server](./plugins/inputs/sqlserver) (microsoft) * [twemproxy](./plugins/inputs/twemproxy) * [varnish](./plugins/inputs/varnish) diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 10af14864f74d..4023eba89d7e0 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -71,6 +71,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/snmp" _ "github.com/influxdata/telegraf/plugins/inputs/snmp_legacy" _ "github.com/influxdata/telegraf/plugins/inputs/socket_listener" + _ "github.com/influxdata/telegraf/plugins/inputs/sql" _ "github.com/influxdata/telegraf/plugins/inputs/sqlserver" _ "github.com/influxdata/telegraf/plugins/inputs/statsd" _ "github.com/influxdata/telegraf/plugins/inputs/sysstat" diff --git a/plugins/inputs/sql/README.md b/plugins/inputs/sql/README.md new file mode 100644 index 0000000000000..372648395e761 --- /dev/null +++ b/plugins/inputs/sql/README.md @@ -0,0 
+1,191 @@ +# SQL plugin + +The plugin executes simple queries or query scripts on multiple servers. +It allows you to select the tags and the fields to export; if needed, fields can be forced to a chosen datatype. +Supported/integrated drivers are mssql (SQLServer), mysql (MySQL), postgres (Postgres) +Activatable drivers (read below) are all golang SQL compliant drivers (see https://github.com/golang/go/wiki/SQLDrivers): for instance oci8 for Oracle or sqlite3 (SQLite) + +## Getting started: +First you need to grant read/select privileges on queried tables to the database user you use for the connection + +### Non pure go drivers +For some non pure go drivers you may need external shared libraries and environment variables: look at the sql driver implementation site +Currently the dependencies to all those drivers (oracle, db2, sap) are commented out in the sql.go source. You can enable them: just remove the comment, perform a 'go get' and recompile telegraf. As an alternative you can use the 'golang 1.8 plugins feature' as described below + +### Oracle driver with golang 1.8 plugins feature +Follow the documentation at https://github.com/mattn/go-oci8 to build the oci8 driver. +If all goes well the golang oci8 driver is now compiled and linked against the oracle shared libs, but not yet linked into telegraf. + +To let telegraf use it, do the following: +create a file plugin.go with this content: + +``` +package main + +import "C" + +import ( + "log" + // .. here you can add import to other drivers + _ "github.com/mattn/go-oci8" // requires external proprietary libs + // _ "bitbucket.org/phiggins/db2cli" // requires external proprietary libs + // _ "github.com/mattn/go-sqlite3" // does not compile on windows +) +func main() { + log.Printf("I! Loaded plugin of shared libs") +} +``` +build it with +``` +mkdir $GOPATH/lib +go build -buildmode=plugin -o $GOPATH/lib/oci8_go.so plugin.go +``` +in the input plugin configuration specify the path of the created shared lib +``` +[[inputs.sql]] + ...
+ driver = "oci8" + shared_lib = "/home/luca/.gocode/lib/oci8_go.so" + ... +``` + +The steps of above can be reused in a similar way for other proprietary and non proprietary drivers + + +## Configuration: + +``` + # debug=false # Enables very verbose output + + ## Database Driver + driver = "mysql" # required. Valid options: mssql (SQLServer), mysql (MySQL), postgres (Postgres), sqlite3 (SQLite), [oci8 ora.v4 (Oracle)] + # shared_lib = "/home/luca/.gocode/lib/oci8_go.so" # optional: path to the golang 1.8 plugin shared lib + # keep_connection = false # optional: if true keeps the connection with database instead to reconnect at each poll and uses prepared statements (false: reconnection at each poll, no prepared statements) + + ## Server DSNs + servers = ["readuser:sEcReT@tcp(neteye.wp.lan:3307)/rue", "readuser:sEcReT@tcp(hostmysql.wp.lan:3307)/monitoring"] # required. Connection DSN to pass to the DB driver + #hosts=["neteye", "hostmysql"] # optional: for each server a relative host entry should be specified and will be added as host tag + #db_names=["rue", "monitoring"] # optional: for each server a relative db name entry should be specified and will be added as dbname tag + + ## Queries to perform (block below can be repeated) + [[inputs.sql.query]] + # query has precedence on query_script, if both query and query_script are defined only query is executed + query="SELECT avg_application_latency,avg_bytes,act_throughput FROM Baselines WHERE application>0" + # query_script = "/path/to/sql/script.sql" # if query is empty and a valid file is provided, the query will be read from file + # + measurement="connection_errors" # destination measurement + tag_cols=["application"] # colums used as tags + field_cols=["avg_application_latency","avg_bytes","act_throughput"] # select fields and use the database driver automatic datatype conversion + # + # bool_fields=["ON"] # adds fields and forces his value as bool + # int_fields=["MEMBERS",".*BYTES"] # adds fields and forces 
its value as integer + # float_fields=["TEMPERATURE"] # adds fields and forces its value as float + # time_fields=[".*_TIME"] # adds fields and forces its value as time + # + # field_measurement = "CLASS" # the column that contains the name of the measurement + # field_host = "DBHOST" # the column that contains the name of the database host used for host tag value + # field_database = "DBHOST" # the column that contains the name of the database used for dbname tag value + # field_name = "counter_name" # the column that contains the name of the counter + # field_value = "counter_value" # the column that contains the value of the counter + # + # field_timestamp = "sample_time" # the column where the sample time is found (should be a date datatype) + # + # ignore_other_fields = false # optional: if query returns columns not defined, they are automatically added (true: ignore columns) + # sanitize = false # optional: will perform some character substitutions (false: use value as is) + # null_as_zero = false # optional: converts null values into zero or empty strings (false: ignore fields) + # ignore_row_errors # optional: if an error is raised while parsing a row then the row is skipped and parsing continues on the next row (false: fatal error) +``` +sql_script is read only once, if you change the script you need to reload telegraf + +## Field names +Field names are the same as the corresponding column name or are taken from the value of a column. If you need to rename the fields, just do it in the sql using ' AS '. + +## Datatypes: +Using the field_cols list, the values are converted by the go database driver implementation. +In some cases this automatic conversion is not what we expect, therefore you can force the destination datatypes by specifying the columns in the bool/int/float/time_fields lists, then if possible the plugin converts the data. +All field lists can contain a regex for column name matching.
+If an error in conversion occurs then telegraf exits, therefore a --test run is suggested. + +## Tested Databases +Actually I run the plugin using oci8,mysql,mssql,postgres,sqlite3 + + +## Example for collect multiple counters defined as COLUMNS in a table (vertical counter structure): +Here we read a table where each counter is on a different row. Each row contains a column with the name of the counter (counter_name) and a column with his value (cntr_value) and some other columns that we use as tags (instance_name,object_name) + +###Config +``` +[[inputs.sql]] + interval = "60s" + driver = "mssql" + servers = [ + "Server=mssqlserver1.my.lan;Port=1433;User Id=telegraf;Password=secret;app name=telegraf" + "Server=mssqlserver2.my.lan;Port=1433;User Id=telegraf;Password=secret;app name=telegraf" + ] + hosts=["mssqlserver_cluster_1","mssqlserver_cluster_2"] + + [[inputs.sql.query]] + measurement = "os_performance_counters" + ignore_other_fields=true + sanitize=true + query="SELECT * FROM sys.dm_os_performance_counters WHERE object_name NOT LIKE '%Deprecated%' ORDER BY counter_name" + tag_cols=["instance_name","object_name"] + field_name = "counter_name" + field_value = "cntr_value" +``` +### Result: +``` +> os_performance_counters,host=mssqlserver_cluster_1,object_name=MSSQL$TESTSQL2014:Broker_Statistics Activation_Errors_Total=0i 1494496261000000000 +> os_performance_counters,host=mssqlserver_cluster_1,object_name=MSSQL$TESTSQL2014:Cursor_Manager_by_Type,instance_name=TSQL_Local_Cursor Active_cursors=0i 1494496261000000000 +> os_performance_counters,instance_name=TSQL_Global_Cursor,host=mssqlserver_cluster_1,object_name=MSSQL$TESTSQL2014:Cursor_Manager_by_Type Active_cursors=0i 1494496261000000000 +> os_performance_counters,host=mssqlserver_cluster_1,object_name=MSSQL$TESTSQL2014:Cursor_Manager_by_Type,instance_name=API_Cursor Active_cursors=0i 1494496261000000000 +> 
os_performance_counters,host=mssqlserver_cluster_1,object_name=MSSQL$TESTSQL2014:Cursor_Manager_by_Type,instance_name=_Total Active_cursors=0i 1494496261000000000 +... + +``` +## Example for collect multiple counters defined as ROWS in a table (horizontal counter structure): +Here we read multiple counters defined on same row where the counter name is the name of his column. +In this example we force some counters datatypes: "MEMBERS","FIRST_CHANGE#" as integer, "BYTES" as float, "FIRST_TIME" as time. The field "UNIT" is used with the automatic driver datatype conversion. +The column "ARCHIVED" is ignored + +###Config +``` +[[inputs.sql]] + interval = "20s" + + driver = "oci8" + keep_connection=true + servers = ["telegraf/monitor@10.62.6.1:1522/tunapit"] + hosts=["oraclehost.my.lan"] + ## Queries to perform + [[inputs.sql.query]] + query="select GROUP#,MEMBERS,STATUS,FIRST_TIME,FIRST_CHANGE#,BYTES,ARCHIVED from v$log" + measurement="log" + tag_cols=["GROUP#","STATUS","NAME"] + field_cols=["UNIT"] + int_fields=["MEMBERS","FIRST_CHANGE#"] + float_fields=["BYTES"] + time_fields=["FIRST_TIME"] + ignore_other_fields=true +``` +### Result: +``` +> log,host=pbzasplx001.wp.lan,GROUP#=1,STATUS=INACTIVE MEMBERS=1i,FIRST_TIME="2017-05-10 22:08:38 +0200 CEST",FIRST_CHANGE#=368234811i,BYTES=52428800 1494496874000000000 +> log,host=pbzasplx001.wp.lan,GROUP#=2,STATUS=CURRENT MEMBERS=1i,FIRST_TIME="2017-05-10 22:08:38 +0200 CEST",FIRST_CHANGE#=368234816i,BYTES=52428800 1494496874000000000 +> log,host=pbzasplx001.wp.lan,GROUP#=3,STATUS=INACTIVE MEMBERS=1i,FIRST_TIME="2017-05-10 16:00:55 +0200 CEST",FIRST_CHANGE#=368220858i,BYTES=52428800 1494496874000000000 + + +``` + +## TODO +1) Implement tests +2) Keep trace of timestamp of last poll for use in the where statement +3) Group by serie if timestamp and measurement are the same within a query for perform single insert in db instead of multiple +4) Give the possibility to define parameters to pass to the prepared statement +5) Get 
the host and database tag value automatically parsing the connection DSN string +6) Add option for parse tags once and reuse it for all rows in a query +X) Add your needs here ..... + +## ENJOY +Luca + diff --git a/plugins/inputs/sql/sql.go b/plugins/inputs/sql/sql.go new file mode 100644 index 0000000000000..a36433a29a4dd --- /dev/null +++ b/plugins/inputs/sql/sql.go @@ -0,0 +1,701 @@ +// The MIT License (MIT) +// +// Copyright (c) 2016 Luca Di Stefano (luca@distefano.bz.it) +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+ +package sql + +import ( + "bytes" + "database/sql" + "errors" + "fmt" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + "log" + "os" + "plugin" + "reflect" + "regexp" + "strconv" + "strings" + "time" + // database drivers here: + _ "github.com/go-sql-driver/mysql" + _ "github.com/jackc/pgx/stdlib" + // _ "github.com/lib/pq" + _ "github.com/mattn/go-sqlite3" // builds only on linux + _ "github.com/zensqlmonitor/go-mssqldb" +) + +const TYPE_STRING = 1 +const TYPE_BOOL = 2 +const TYPE_INT = 3 +const TYPE_FLOAT = 4 +const TYPE_TIME = 5 +const TYPE_AUTO = 0 + +type Query struct { + Query string + QueryScript string + Measurement string + // + FieldTimestamp string + TimestampUnit string + // + TagCols []string + + // Vertical structure + FieldHost string + FieldName string + FieldValue string + FieldDatabase string + FieldMeasurement string + // + Sanitize bool + // + + // -------- internal data ----------- + statement *sql.Stmt + + column_name []string + cell_refs []interface{} + cells []interface{} + + // Horizontal structure + field_count int + field_idx []int // Column indexes of fields + field_type []int // Column types of fields + + tag_count int + tag_idx []int // Column indexes of tags (strings) + + // Vertical structure + field_host_idx int + field_database_idx int + field_measurement_idx int + field_name_idx int + field_timestamp_idx int + + // Data Conversion + field_value_idx int + field_value_type int +} + +type Sql struct { + Driver string + SharedLib string + + //KeepConnection bool + MaxLifetime time.Duration + + Source struct { + Dsn string + } + + Query []Query + + // internal + connection_ts time.Time + connection *sql.DB + initialized bool +} + +var sanitizedChars = strings.NewReplacer("/sec", "_persec", "/Sec", "_persec", + " ", "_", "%", "Percent", `\`, "") + +func trimSuffix(s, suffix string) string { + for strings.HasSuffix(s, suffix) { + s = s[:len(s)-len(suffix)] + } + return s +} + +func sanitize(text 
string) string { + text = sanitizedChars.Replace(text) + text = trimSuffix(text, "_") + return text +} + +func match_str(key string, str_array []string) bool { + for _, pattern := range str_array { + if pattern == key { + return true + } + matched, _ := regexp.MatchString(pattern, key) + if matched { + return true + } + } + return false +} + +func (s *Sql) SampleConfig() string { + return ` +[[inputs.sql]] + ## Database Driver, required. + ## Valid options: mssql (SQLServer), mysql (MySQL), postgres (Postgres), sqlite3 (SQLite), [oci8 ora.v4 (Oracle)] + driver = "mysql" + + ## optional: path to the golang 1.8 plugin shared lib where additional sql drivers are linked + # shared_lib = "/home/luca/.gocode/lib/oci8_go.so" + + ## optional: + ## if true keeps the connection with database instead to reconnect at each poll and uses prepared statements + ## if false reconnection at each poll, no prepared statements + # keep_connection = false + + ## Maximum lifetime of a connection. + max_lifetime = "0s" + + ## Connection information for data source. Table can be repeated to define multiple sources. + [[inputs.sql.source]] + ## Data source name for connecting. Syntax depends on selected driver. 
+ dsn = "readuser:sEcReT@tcp(neteye.wp.lan:3307)/rue" + + ## Queries to perform (block below can be repeated) + [[inputs.sql.query]] + ## query has precedence on query_script, if both query and query_script are defined only query is executed + query="SELECT avg_application_latency,avg_bytes,act_throughput FROM Baselines WHERE application>0" + # query_script = "/path/to/sql/script.sql" # if query is empty and a valid file is provided, the query will be read from file + ## destination measurement + measurement="connection_errors" + + ## Horizontal srtucture + ## colums used as tags + tag_cols=["application"] + ## select fields and use the database driver automatic datatype conversion + field_cols=["avg_application_latency","avg_bytes","act_throughput"] + + ## Vertical srtucture + ## optional: the column that contains the name of the measurement, if not specified the value of the option measurement is used + # field_measurement = "CLASS" + ## the column that contains the name of the database host used for host tag value + # field_host = "DBHOST" + ## the column that contains the name of the database used for dbname tag value + # field_database = "DBHOST" + ## required if vertical: the column that contains the name of the counter + # field_name = "counter_name" + ## required if vertical: the column that contains the value of the counter + # field_value = "counter_value" + ## optional: the column where is to find the time of sample (should be a date datatype) + # field_timestamp = "sample_time" +` +} + +func (_ *Sql) Description() string { + return "SQL Plugin" +} + +func (s *Sql) Init() error { + log.Printf("D! Init %s servers %d queries, driver %s", s.Source.Dsn, len(s.Query), s.Driver) + + if len(s.SharedLib) > 0 { + _, err := plugin.Open(s.SharedLib) + if err != nil { + panic(err) + } + log.Printf("D! Loaded shared lib '%s'", s.SharedLib) + } + + return nil +} + +func (s *Query) Init(cols []string) error { + log.Printf("D! 
Init Query with %d columns", len(cols)) + + // Define index of tags and fields and keep it for reuse + s.column_name = cols + + // init the arrays for store row data + col_count := len(s.column_name) + s.cells = make([]interface{}, col_count) + s.cell_refs = make([]interface{}, col_count) + + // because of regex, now we must assume the max cols + expected_field_count := col_count + expected_tag_count := col_count + + s.tag_idx = make([]int, expected_tag_count) + s.field_idx = make([]int, expected_field_count) + s.field_type = make([]int, expected_field_count) + s.tag_count = 0 + s.field_count = 0 + + // Vertical structure + // prepare vars for vertical counter parsing + s.field_name_idx = -1 + s.field_value_idx = -1 + s.field_timestamp_idx = -1 + s.field_measurement_idx = -1 + s.field_database_idx = -1 + s.field_host_idx = -1 + + if len(s.FieldHost) > 0 && !match_str(s.FieldHost, s.column_name) { + return fmt.Errorf("Missing column %s for given field_host", s.FieldHost) + } + if len(s.FieldDatabase) > 0 && !match_str(s.FieldDatabase, s.column_name) { + return fmt.Errorf("Missing column %s for given field_database", s.FieldDatabase) + } + if len(s.FieldMeasurement) > 0 && !match_str(s.FieldMeasurement, s.column_name) { + return fmt.Errorf("Missing column %s for given field_measurement", s.FieldMeasurement) + } + if len(s.FieldTimestamp) > 0 && !match_str(s.FieldTimestamp, s.column_name) { + return fmt.Errorf("Missing column %s for given field_timestamp", s.FieldTimestamp) + } + if len(s.FieldName) > 0 && !match_str(s.FieldName, s.column_name) { + return fmt.Errorf("Missing column %s for given field_name", s.FieldName) + } + if len(s.FieldValue) > 0 && !match_str(s.FieldValue, s.column_name) { + return fmt.Errorf("Missing column %s for given field_value", s.FieldValue) + } + if (len(s.FieldValue) > 0 && len(s.FieldName) == 0) || (len(s.FieldName) > 0 && len(s.FieldValue) == 0) { + return fmt.Errorf("Both field_name and field_value should be set") + } + //------------ 
+ + // fill columns info + var cell interface{} + for i := 0; i < col_count; i++ { + dest_type := TYPE_AUTO + field_matched := true // is horizontal field + + if match_str(s.column_name[i], s.TagCols) { + field_matched = false + s.tag_idx[s.tag_count] = i + s.tag_count++ + cell = new(string) + } else { + dest_type = TYPE_AUTO + cell = new(sql.RawBytes) + } + + // Vertical structure + if s.column_name[i] == s.FieldHost { + s.field_host_idx = i + field_matched = false + } else if s.column_name[i] == s.FieldDatabase { + s.field_database_idx = i + field_matched = false + } else if s.column_name[i] == s.FieldMeasurement { + s.field_measurement_idx = i + field_matched = false + } else if s.column_name[i] == s.FieldName { + s.field_name_idx = i + field_matched = false + } else if s.column_name[i] == s.FieldValue { + s.field_value_idx = i + s.field_value_type = dest_type + field_matched = false + } else if s.column_name[i] == s.FieldTimestamp { + s.field_timestamp_idx = i + field_matched = false + } + + // Horizontal + if field_matched { + s.field_type[s.field_count] = dest_type + s.field_idx[s.field_count] = i + s.field_count++ + } + + // + s.cells[i] = cell + s.cell_refs[i] = &s.cells[i] + } + + log.Printf("D! Query structure with %d tags and %d fields on %d columns...", s.tag_count, s.field_count, col_count) + + return nil +} + +func ConvertString(name string, cell interface{}) (string, bool) { + value, ok := cell.(string) + if !ok { + var barr []byte + barr, ok = cell.([]byte) + if !ok { + var ivalue int64 + ivalue, ok = cell.(int64) + if !ok { + value = fmt.Sprintf("%v", cell) + ok = true + log.Printf("W! 
converting '%s' type %s raw data '%s'", name, reflect.TypeOf(cell).Kind(), fmt.Sprintf("%v", cell)) + } else { + value = strconv.FormatInt(ivalue, 10) + } + } else { + value = string(barr) + } + } + return value, ok +} + +func (s *Query) ConvertField(name string, cell interface{}, field_type int) (interface{}, error) { + var value interface{} + var ok bool + var str string + var err error + + ok = true + if cell != nil { + switch field_type { + case TYPE_INT: + str, ok = cell.(string) + if ok { + value, err = strconv.ParseInt(str, 10, 64) + } + case TYPE_FLOAT: + str, ok = cell.(string) + if ok { + value, err = strconv.ParseFloat(str, 64) + } + case TYPE_BOOL: + str, ok = cell.(string) + if ok { + value, err = strconv.ParseBool(str) + } + case TYPE_TIME: + value, ok = cell.(time.Time) + if !ok { + var intvalue int64 + intvalue, ok = value.(int64) + if ok { + // TODO convert to s/ms/us/ns?? + value = time.Unix(intvalue, 0) + } + } + case TYPE_STRING: + value, ok = ConvertString(name, cell) + default: + value = cell + } + } else { + value = nil + } + if !ok { + err = fmt.Errorf("Error by converting field %s", name) + } + if err != nil { + log.Printf("E! 
converting name '%s' type %s into type %d, raw data '%s'", name, reflect.TypeOf(cell).Kind(), field_type, fmt.Sprintf("%v", cell)) + return nil, err + } + return value, nil +} + +func (s *Query) GetStringFieldValue(index int) (string, error) { + cell := s.cells[index] + if cell == nil { + return "", fmt.Errorf("Error converting name '%s' is nil", s.column_name[index]) + } + + value, ok := ConvertString(s.column_name[index], cell) + if !ok { + return "", fmt.Errorf("Error converting name '%s' type %s, raw data '%s'", s.column_name[index], reflect.TypeOf(cell).Kind(), fmt.Sprintf("%v", cell)) + } + + // if s.Sanitize { + // value = sanitize(value) + // } + return value, nil +} + +func (s *Query) ParseRow(timestamp time.Time, measurement string, tags map[string]string, fields map[string]interface{}) (time.Time, string, error) { + // Vertical structure + + // get timestamp from row + if s.field_timestamp_idx >= 0 { + // get the value of timestamp field + value, err := s.ConvertField(s.column_name[s.field_timestamp_idx], s.cells[s.field_timestamp_idx], TYPE_TIME) + if err != nil { + return timestamp, measurement, errors.New("Cannot convert timestamp") + } + timestamp, _ = value.(time.Time) + } + // get measurement from row + if s.field_measurement_idx >= 0 { + var err error + measurement, err = s.GetStringFieldValue(s.field_measurement_idx) + if err != nil { + log.Printf("E! converting field measurement '%s'", s.column_name[s.field_measurement_idx]) + //cannot put data in correct measurement, skip line + return timestamp, measurement, err + } + } + // get dbname from row + if s.field_database_idx >= 0 { + dbname, err := s.GetStringFieldValue(s.field_database_idx) + if err != nil { + log.Printf("E! 
converting field dbname '%s'", s.column_name[s.field_database_idx]) + //cannot put data in correct, skip line + return timestamp, measurement, err + } else { + tags["dbname"] = dbname + } + } + // get server from row + if s.field_host_idx >= 0 { + server, err := s.GetStringFieldValue(s.field_host_idx) + if err != nil { + log.Printf("E! converting field host '%s'", s.column_name[s.field_host_idx]) + //cannot put data in correct, skip line + return timestamp, measurement, err + } else { + tags["server"] = server + } + } + // vertical counter + if s.field_name_idx >= 0 { + // get the name of the field + name, err := s.GetStringFieldValue(s.field_name_idx) + if err != nil { + log.Printf("E! converting field name '%s'", s.column_name[s.field_name_idx]) + // cannot get name of field, skip line + return timestamp, measurement, err + } + + // get the value of field + var value interface{} + value, err = s.ConvertField(s.column_name[s.field_value_idx], s.cells[s.field_value_idx], s.field_value_type) + if err != nil { + // cannot get value of column with expected datatype, skip line + return timestamp, measurement, err + } + + // fill the field + fields[name] = value + } + // --------------- + + // fill tags + for i := 0; i < s.tag_count; i++ { + index := s.tag_idx[i] + name := s.column_name[index] + value, err := s.GetStringFieldValue(index) + if err != nil { + log.Printf("E! ignored tag %s", name) + // cannot put data in correct series, skip line + return timestamp, measurement, err + } else { + tags[name] = value + } + } + + // horizontal counters + // fill fields from column values + for i := 0; i < s.field_count; i++ { + name := s.column_name[s.field_idx[i]] + // get the value of field + value, err := s.ConvertField(name, s.cells[s.field_idx[i]], s.field_type[i]) + if err != nil { + // cannot get value of column with expected datatype, warning and continue + log.Printf("W! 
converting value of field '%s'", name) + } else { + fields[name] = value + } + } + + return timestamp, measurement, nil +} + +func (p *Sql) Connect() (*sql.DB, error) { + var err error + + // create connection to db server if not already done + var db *sql.DB + if p.MaxLifetime > 0 && time.Since(p.connection_ts) < p.MaxLifetime { + db = p.connection + } else { + db = nil + } + + if db == nil { + log.Printf("D! Setting up DB %s %s ...", p.Driver, p.Source.Dsn) + db, err = sql.Open(p.Driver, p.Source.Dsn) + if err != nil { + return nil, err + } + p.connection_ts = time.Now() + } else { + log.Printf("D! Reusing connection to %s ...", p.Source.Dsn) + } + + log.Printf("D! Connecting to DB %s ...", p.Source.Dsn) + err = db.Ping() + if err != nil { + return nil, err + } + + if p.MaxLifetime > 0 { + p.connection = db + } + return db, nil +} + +func (q *Query) Execute(db *sql.DB, KeepConnection bool) (*sql.Rows, error) { + var err error + var rows *sql.Rows + // read query from sql script and put it in query string + if len(q.QueryScript) > 0 && len(q.Query) == 0 { + if _, err := os.Stat(q.QueryScript); os.IsNotExist(err) { + log.Printf("E! SQL script file not exists '%s'...", q.QueryScript) + return nil, err + } + filerc, err := os.Open(q.QueryScript) + if err != nil { + log.Fatal(err) + return nil, err + } + defer filerc.Close() + + buf := new(bytes.Buffer) + buf.ReadFrom(filerc) + q.Query = buf.String() + log.Printf("D! Read %d bytes SQL script from '%s' for query ...", len(q.Query), q.QueryScript) + } + if len(q.Query) > 0 { + if KeepConnection { + // prepare statement if not already done + if q.statement == nil { + log.Printf("D! Preparing statement query ...") + q.statement, err = db.Prepare(q.Query) + if err != nil { + return nil, err + } + } + + // execute prepared statement + log.Printf("D! Performing query:\n\t\t%s\n...", q.Query) + rows, err = q.statement.Query() + } else { + // execute query + log.Printf("D! 
Performing query '%s'...", q.Query) + rows, err = db.Query(q.Query) + } + } else { + log.Printf("W! No query to execute") + return nil, nil + } + + return rows, err +} + +func (p *Sql) Gather(acc telegraf.Accumulator) error { + var err error + + start_time := time.Now() + + if !p.initialized { + err = p.Init() + if err != nil { + return err + } + p.initialized = true + } + + log.Printf("D! Starting poll") + + var db *sql.DB + var query_time time.Time + + db, err = p.Connect() + query_time = time.Now() + duration := time.Since(query_time) + log.Printf("D! Server %s connection time: %s", p.Source.Dsn, duration) + + if err != nil { + return err + } + if p.MaxLifetime == 0 { + defer db.Close() + } + + // execute queries + for qi := 0; qi < len(p.Query); qi++ { + var rows *sql.Rows + q := &p.Query[qi] + + query_time = time.Now() + rows, err = q.Execute(db, p.MaxLifetime > 0) + log.Printf("D! Query exectution time: %s", time.Since(query_time)) + + query_time = time.Now() + + if err != nil { + return err + } + if rows == nil { + continue + } + defer rows.Close() + + if q.field_count == 0 { + // initialize once the structure of query + var cols []string + cols, err = rows.Columns() + if err != nil { + return err + } + err = q.Init(cols) + if err != nil { + return err + } + } + + row_count := 0 + + for rows.Next() { + var timestamp time.Time + + if err = rows.Err(); err != nil { + return err + } + // database driver datatype conversion + err := rows.Scan(q.cell_refs...) + if err != nil { + return err + } + + // collect tags and fields + tags := map[string]string{} + fields := map[string]interface{}{} + var measurement string + + timestamp, measurement, err = q.ParseRow(query_time, q.Measurement, tags, fields) + if err != nil { + log.Printf("W! Ignored error on row %d: %s", row_count, err) + } else { + acc.AddFields(measurement, fields, tags, timestamp) + } + row_count += 1 + } + log.Printf("D! 
Query found %d rows written, processing duration %s", row_count, time.Since(query_time)) + } + + log.Printf("D! Poll done, duration %s", time.Since(start_time)) + + return nil +} + +func init() { + inputs.Add("sql", func() telegraf.Input { + return &Sql{} + }) +}