diff --git a/clickhouse_storage.go b/clickhouse_storage.go index 166eac5..9ac3370 100644 --- a/clickhouse_storage.go +++ b/clickhouse_storage.go @@ -18,9 +18,8 @@ const ( rawdata String, source String, date Date DEFAULT today() - ) ENGINE = MergeTree - PARTITION BY toYYYYMM(date) - ORDER BY (domain, date) + ) ENGINE = MergeTree(date, (domain, source), 8192) + TTL date + toIntervalDay(%d) ` insertStmt = ` INSERT INTO %s (rank, domain, rawdata, source) VALUES (?, ?, ?, ?) @@ -36,9 +35,9 @@ type ClickhouseStorage struct { // NewClickhouseStorage bootstraps ClickhouseStorage. func NewClickhouseStorage(location, table string, batch int) (*ClickhouseStorage, error) { - db, err := prepareDB(location, table) + db, err := prepareDB(location, table, DefaultTTL) if err != nil { - return nil, err + return nil, fmt.Errorf("prepare: %w", err) } return &ClickhouseStorage{ @@ -48,19 +47,19 @@ func NewClickhouseStorage(location, table string, batch int) (*ClickhouseStorage }, nil } -func prepareDB(location, table string) (*sql.DB, error) { +func prepareDB(location, table string, ttl int) (*sql.DB, error) { conn, err := sql.Open("clickhouse", location) if err != nil { - return nil, err + return nil, fmt.Errorf("sql.Open: %w", err) } if err := conn.Ping(); err != nil { - return nil, err + return nil, fmt.Errorf("ping: %w", err) } - _, err = conn.Exec(fmt.Sprintf(createStmt, table)) + _, err = conn.Exec(fmt.Sprintf(createStmt, table, ttl)) if err != nil { - return conn, err + return conn, fmt.Errorf("create table: %w", err) } return conn, nil diff --git a/docker-compose.yml b/docker-compose.yml index 3db54fc..61dca41 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -27,5 +27,7 @@ services: clickhouse: restart: always image: yandex/clickhouse-server:latest + environment: + TZ: UTC volumes: - ./storage:/var/lib/clickhouse diff --git a/dor.go b/dor.go index 70d8514..b7b716d 100644 --- a/dor.go +++ b/dor.go @@ -12,11 +12,8 @@ const ( tblName = "ranks" ) -// Ingester fetches data and uploads it to the Storage -type Ingester interface { - Do() (chan *Entry, error) // returns a channel for consumers - GetDesc() string // simple getter for the source -} +// DefaultTTL for records in days. +var DefaultTTL = 30 // Storage represents an interface to store and query ranks. type Storage interface { @@ -60,14 +57,14 @@ func New(stn string, stl string, keep bool) (*App, error) { case "clickhouse": s, err = NewClickhouseStorage(stl, tblName, batchSize) if err != nil { - return nil, err + return nil, fmt.Errorf("new clickhouse storage: %w", err) } case "memory": s = &MemoryStorage{make(map[string]*memoryCollection)} case "mongodb": s, err = NewMongoStorage(stl, "dor", tblName, batchSize, 5, keep) if err != nil { - return nil, err + return nil, fmt.Errorf("new mongo storage: %w", err) } default: return nil, fmt.Errorf("%s storage is not implemented", stn) diff --git a/ingester.go b/ingester.go index f9337ed..c48165d 100644 --- a/ingester.go +++ b/ingester.go @@ -5,6 +5,12 @@ import ( "time" ) +// Ingester fetches data and uploads it to the Storage +type Ingester interface { + Do() (chan *Entry, error) // returns a channel for consumers + GetDesc() string // simple getter for the source +} + // ingesters is a slice of implemented Ingester structs var ingesters = []Ingester{ NewAlexa(), diff --git a/service/dor-http/dor.go b/service/dor-http/dor.go index a7f4c58..dfe8307 100644 --- a/service/dor-http/dor.go +++ b/service/dor-http/dor.go @@ -16,9 +16,12 @@ func main() { storage = fs.String("storage", "clickhouse", "storage type") location = fs.String("storage-url", "tcp://clickhouse:9000", "url of the storage") listen = fs.String("listen-addr", ":8080", "listen address") + ttl = fs.Int("ttl", 30, "records TTL in days") ) ff.Parse(fs, os.Args[1:], ff.WithEnvVarPrefix("DOR")) + dor.DefaultTTL = *ttl + d, err := dor.New(*storage, *location, false) if err != nil { panic(err)