diff --git a/pkg/db/db_init.go b/pkg/db/db_init.go index 150b543b82c..b8eac1bdfca 100644 --- a/pkg/db/db_init.go +++ b/pkg/db/db_init.go @@ -6,89 +6,90 @@ import ( func (d *dbConn) DBInit() { db := d.db - _, err := db.Exec("CREATE TABLE IF NOT EXISTS studies" + - "(id CHAR(16) PRIMARY KEY, " + - "name VARCHAR(255), " + - "owner VARCHAR(255), " + - "optimization_type TINYINT, " + - "optimization_goal DOUBLE, " + - "parameter_configs TEXT, " + - "tags TEXT, " + - "objective_value_name VARCHAR(255), " + - "metrics TEXT, " + - "job_id TEXT)") + _, err := db.Exec(`CREATE TABLE IF NOT EXISTS studies + (id CHAR(16) PRIMARY KEY, + name VARCHAR(255), + owner VARCHAR(255), + optimization_type TINYINT, + optimization_goal DOUBLE, + parameter_configs TEXT, + tags TEXT, + objective_value_name VARCHAR(255), + metrics TEXT, + job_id TEXT)`) if err != nil { log.Fatalf("Error creating studies table: %v", err) } - _, err = db.Exec("CREATE TABLE IF NOT EXISTS study_permissions" + - "(study_id CHAR(16) NOT NULL, " + - "access_permission VARCHAR(255), " + - "PRIMARY KEY (study_id, access_permission))") + _, err = db.Exec(`CREATE TABLE IF NOT EXISTS study_permissions + (study_id CHAR(16) NOT NULL, + access_permission VARCHAR(255), + PRIMARY KEY (study_id, access_permission))`) if err != nil { log.Fatalf("Error creating study_permissions table: %v", err) } - _, err = db.Exec("CREATE TABLE IF NOT EXISTS trials" + - "(id CHAR(16) PRIMARY KEY, " + - "study_id CHAR(16), " + - "parameters TEXT, " + - "objective_value VARCHAR(255), " + - "tags TEXT, " + - "FOREIGN KEY(study_id) REFERENCES studies(id))") + _, err = db.Exec(`CREATE TABLE IF NOT EXISTS trials + (id CHAR(16) PRIMARY KEY, + study_id CHAR(16), + parameters TEXT, + objective_value VARCHAR(255), + tags TEXT, + FOREIGN KEY(study_id) REFERENCES studies(id))`) if err != nil { log.Fatalf("Error creating trials table: %v", err) } - _, err = db.Exec("CREATE TABLE IF NOT EXISTS workers" + - "(id CHAR(16) PRIMARY KEY, " + - "study_id CHAR(16), " + - "trial_id CHAR(16), " + - "type VARCHAR(255), " + - "status TINYINT, " + - "template_path TEXT, " + - "tags TEXT, " + - "FOREIGN KEY(study_id) REFERENCES studies(id), " + - "FOREIGN KEY(trial_id) REFERENCES trials(id))") + _, err = db.Exec(`CREATE TABLE IF NOT EXISTS workers + (id CHAR(16) PRIMARY KEY, + study_id CHAR(16), + trial_id CHAR(16), + type VARCHAR(255), + status TINYINT, + template_path TEXT, + tags TEXT, + FOREIGN KEY(study_id) REFERENCES studies(id), + FOREIGN KEY(trial_id) REFERENCES trials(id))`) if err != nil { log.Fatalf("Error creating workers table: %v", err) } - _, err = db.Exec("CREATE TABLE IF NOT EXISTS worker_metrics" + - "(worker_id CHAR(16) NOT NULL, " + - "id INT AUTO_INCREMENT PRIMARY KEY, " + - "time DATETIME(6), " + - "name VARCHAR(255), " + - "value TEXT, " + - "is_objective TINYINT)") + _, err = db.Exec(`CREATE TABLE IF NOT EXISTS worker_metrics + (worker_id CHAR(16) NOT NULL, + id INT AUTO_INCREMENT PRIMARY KEY, + time DATETIME(6), + name VARCHAR(255), + value TEXT, + is_objective TINYINT, + FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE CASCADE)`) if err != nil { log.Fatalf("Error creating worker_metrics table: %v", err) } - _, err = db.Exec("CREATE TABLE IF NOT EXISTS worker_lastlogs" + - "(worker_id CHAR(16) PRIMARY KEY, " + - "time DATETIME(6), " + - "value TEXT)") + _, err = db.Exec(`CREATE TABLE IF NOT EXISTS worker_lastlogs + (worker_id CHAR(16) PRIMARY KEY, + time DATETIME(6), + FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE CASCADE)`) if err != nil { log.Fatalf("Error creating worker_lastlogs table: %v", err) } - _, err = db.Exec("CREATE TABLE IF NOT EXISTS suggestion_param" + - "(id CHAR(16) PRIMARY KEY," + - "suggestion_algo TEXT, " + - "study_id CHAR(16), " + - "parameters TEXT, " + - "FOREIGN KEY(study_id) REFERENCES studies(id))") + _, err = db.Exec(`CREATE TABLE IF NOT EXISTS suggestion_param + (id CHAR(16) PRIMARY KEY, + suggestion_algo TEXT, + study_id CHAR(16), + parameters TEXT, + FOREIGN KEY(study_id) REFERENCES studies(id))`) if err != nil { log.Fatalf("Error creating suggestion_param table: %v", err) } - _, err = db.Exec("CREATE TABLE IF NOT EXISTS earlystop_param" + - "(id CHAR(16) PRIMARY KEY, " + - "earlystop_argo TEXT, " + - "study_id CHAR(16), " + - "parameters TEXT, " + - "FOREIGN KEY(study_id) REFERENCES studies(id))") + _, err = db.Exec(`CREATE TABLE IF NOT EXISTS earlystop_param + (id CHAR(16) PRIMARY KEY, + earlystop_argo TEXT, + study_id CHAR(16), + parameters TEXT, + FOREIGN KEY(study_id) REFERENCES studies(id))`) if err != nil { log.Fatalf("Error creating earlystop_param table: %v", err) } diff --git a/pkg/db/interface.go b/pkg/db/interface.go index 01e554e3ae4..61b27b418ff 100644 --- a/pkg/db/interface.go +++ b/pkg/db/interface.go @@ -455,18 +455,61 @@ func (d *dbConn) GetWorkerLogs(id string, opts *GetWorkerLogOpts) ([]*WorkerLog, return result, nil } -func (d *dbConn) getWorkerLastlog(id string, value *string) (*time.Time, error) { - var lastTimestamp string +func (d *dbConn) getWorkerLastlogs(id string) (time.Time, []*WorkerLog, error) { + var timeStr string + var timeVal time.Time var err error - if value != nil { - row := d.db.QueryRow("SELECT time, value FROM worker_lastlogs WHERE worker_id = ?", id) - err = row.Scan(&lastTimestamp, value) - } else { - row := d.db.QueryRow("SELECT time FROM worker_lastlogs WHERE worker_id = ?", id) - err = row.Scan(&lastTimestamp) + // Use LEFT JOIN to ensure a result even if there's no matching + // in worker_metrics. + rows, err := d.db.Query( + `SELECT worker_lastlogs.time, name, value FROM worker_lastlogs + LEFT JOIN worker_metrics + ON (worker_lastlogs.worker_id = worker_metrics.worker_id AND worker_lastlogs.time = worker_metrics.time) + WHERE worker_lastlogs.worker_id = ?`, id) + if err != nil { + return timeVal, nil, err } + var result []*WorkerLog + for rows.Next() { + log1 := new(WorkerLog) + var thisTime string + var name, value sql.NullString + + err := rows.Scan(&thisTime, &name, &value) + if err != nil { + log.Printf("Error scanning log: %v", err) + continue + } + if timeStr == "" { + timeStr = thisTime + timeVal, err = time.Parse(mysqlTimeFmt, timeStr) + if err != nil { + log.Printf("Error parsing time %s: %v", timeStr, err) + return timeVal, nil, err + } + } else if timeStr != thisTime { + log.Printf("Unexpected query result %s != %s", + timeStr, thisTime) + } + log1.Time = timeVal + if !name.Valid { + continue + } + (*log1).Name = name.String + (*log1).Value = value.String + result = append(result, log1) + } + return timeVal, result, nil +} + +func (d *dbConn) GetWorkerTimestamp(id string) (*time.Time, error) { + var lastTimestamp string + + row := d.db.QueryRow("SELECT time FROM worker_lastlogs WHERE worker_id = ?", id) + err := row.Scan(&lastTimestamp) + switch { case err == sql.ErrNoRows: return nil, nil @@ -483,10 +526,6 @@ func (d *dbConn) getWorkerLastlog(id string, value *string) (*time.Time, error) } } -func (d *dbConn) GetWorkerTimestamp(id string) (*time.Time, error) { - return d.getWorkerLastlog(id, nil) -} - func (d *dbConn) storeWorkerLog(workerID string, time string, metricsName string, metricsValue string, objectiveValueName string) error { isObjective := 0 if metricsName == objectiveValueName { @@ -502,9 +541,8 @@ func (d *dbConn) storeWorkerLog(workerID string, time string, metricsName string func (d *dbConn) StoreWorkerLogs(workerID string, logs []*api.MetricsLog) error { var lasterr error - var lastValue string - dbT, err := d.getWorkerLastlog(workerID, &lastValue) + dbT, lastLogs, err := d.getWorkerLastlogs(workerID) if err != nil { log.Printf("Error getting last log timestamp: %v", err) } @@ -519,10 +557,14 @@ func (d *dbConn) StoreWorkerLogs(workerID string, logs []*api.MetricsLog) error return err } + // Store logs when + // 1. a log is newer than dbT, or, + // 2. a log is not yet in the DB when the timestamps are equal var formattedTime string - var ls []string + var lastTime time.Time for _, mlog := range logs { metricsName := mlog.Name + logLoop: for _, mv := range mlog.Values { t, err := time.Parse(time.RFC3339Nano, mv.Time) if err != nil { @@ -530,48 +572,40 @@ func (d *dbConn) StoreWorkerLogs(workerID string, logs []*api.MetricsLog) error lasterr = err continue } - if dbT != nil && !t.After(*dbT) { + if t.Before(dbT) { // dbT is from mysql and has microsec precision. // This code assumes nanosec fractions are rounded down. continue } // use UTC as mysql DATETIME lacks timezone formattedTime = t.UTC().Format(mysqlTimeFmt) - if dbT != nil { - // Parse again to get rounding effect - //reparsed_time, err := time.Parse(mysqlTimeFmt, formattedTime) - //if reparsed_time == *dbT { - // if mv.Value == lastValue { - // stored_logs are already in DB - // This assignment ensures the remaining - // logs will be stored in DB. - // dbT = nil - // continue - // } - // // We don't know this is necessary or not yet. - // stored_logs = append(stored_logs, &mv.Value) - // continue - //} - // (reparsed_time > *dbT) can be assumed - err = d.storeWorkerLog(workerID, - dbT.UTC().Format(mysqlTimeFmt), - metricsName, mv.Value, - objectiveValueName) + if !dbT.IsZero() { + // Parse again to get rounding effect, otherwise + // the next comparison will be almost always false. + reparsed_time, err := time.Parse(mysqlTimeFmt, formattedTime) if err != nil { - log.Printf("Error storing log %s: %v", mv.Value, err) + log.Printf("Error parsing time %s: %v", formattedTime, err) lasterr = err + continue } - dbT = nil - } else { - err = d.storeWorkerLog(workerID, - formattedTime, - metricsName, mv.Value, - objectiveValueName) - if err != nil { - log.Printf("Error storing log %s: %v", mv.Value, err) - lasterr = err + if reparsed_time == dbT { + for _, l := range lastLogs { + if l.Name == metricsName && l.Value == mv.Value { + continue logLoop + } + } } } + err = d.storeWorkerLog(workerID, + formattedTime, + metricsName, mv.Value, + objectiveValueName) + if err != nil { + log.Printf("Error storing log %s: %v", mv.Value, err) + lasterr = err + } else if t.After(lastTime) { + lastTime = t + } } } if lasterr != nil { @@ -579,9 +613,10 @@ func (d *dbConn) StoreWorkerLogs(workerID string, logs []*api.MetricsLog) error // would be lost. return lasterr } - if len(ls) == 2 { - _, err = d.db.Exec("REPLACE INTO worker_lastlogs VALUES (?, ?, ?)", - workerID, formattedTime, ls[1]) + if !lastTime.IsZero() { + formattedTime = lastTime.UTC().Format(mysqlTimeFmt) + _, err = d.db.Exec("REPLACE INTO worker_lastlogs VALUES (?, ?)", + workerID, formattedTime) } return err }