Skip to content
This repository has been archived by the owner on Jan 23, 2022. It is now read-only.

Commit

Permalink
Add TTL for clickhouse storage
Browse files Browse the repository at this point in the history
Could be changed only by global variable dor.DefaultTTL. Will change on
refactor. Closes #16.
  • Loading branch information
ilyaglow committed Sep 7, 2019
1 parent 31a1610 commit d045812
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 17 deletions.
19 changes: 9 additions & 10 deletions clickhouse_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ const (
rawdata String,
source String,
date Date DEFAULT today()
) ENGINE = MergeTree
PARTITION BY toYYYYMM(date)
ORDER BY (domain, date)
) ENGINE = MergeTree(date, (domain, source), 8192)
TTL date + toIntervalDay(%d)
`
insertStmt = `
INSERT INTO %s (rank, domain, rawdata, source) VALUES (?, ?, ?, ?)
Expand All @@ -36,9 +35,9 @@ type ClickhouseStorage struct {

// NewClickhouseStorage bootstraps ClickhouseStorage.
func NewClickhouseStorage(location, table string, batch int) (*ClickhouseStorage, error) {
db, err := prepareDB(location, table)
db, err := prepareDB(location, table, DefaultTTL)
if err != nil {
return nil, err
return nil, fmt.Errorf("prepare: %w", err)
}

return &ClickhouseStorage{
Expand All @@ -48,19 +47,19 @@ func NewClickhouseStorage(location, table string, batch int) (*ClickhouseStorage
}, nil
}

func prepareDB(location, table string) (*sql.DB, error) {
func prepareDB(location, table string, ttl int) (*sql.DB, error) {
conn, err := sql.Open("clickhouse", location)
if err != nil {
return nil, err
return nil, fmt.Errorf("sql.Open: %w", err)
}

if err := conn.Ping(); err != nil {
return nil, err
return nil, fmt.Errorf("ping: %w", err)
}

_, err = conn.Exec(fmt.Sprintf(createStmt, table))
_, err = conn.Exec(fmt.Sprintf(createStmt, table, ttl))
if err != nil {
return conn, err
return conn, fmt.Errorf("create table: %w", err)
}

return conn, nil
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,7 @@ services:
clickhouse:
restart: always
image: yandex/clickhouse-server:latest
environment:
TZ: UTC
volumes:
- ./storage:/var/lib/clickhouse
11 changes: 4 additions & 7 deletions dor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,8 @@ const (
tblName = "ranks"
)

// Ingester fetches data and uploads it to the Storage
type Ingester interface {
Do() (chan *Entry, error) // returns a channel for consumers
GetDesc() string // simple getter for the source
}
// DefaultTTL for records in days.
var DefaultTTL = 30

// Storage represents an interface to store and query ranks.
type Storage interface {
Expand Down Expand Up @@ -60,14 +57,14 @@ func New(stn string, stl string, keep bool) (*App, error) {
case "clickhouse":
s, err = NewClickhouseStorage(stl, tblName, batchSize)
if err != nil {
return nil, err
return nil, fmt.Errorf("new clickhouse storage: %w", err)
}
case "memory":
s = &MemoryStorage{make(map[string]*memoryCollection)}
case "mongodb":
s, err = NewMongoStorage(stl, "dor", tblName, batchSize, 5, keep)
if err != nil {
return nil, err
return nil, fmt.Errorf("new mongo storage: %w", err)
}
default:
return nil, fmt.Errorf("%s storage is not implemented", stn)
Expand Down
6 changes: 6 additions & 0 deletions ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ import (
"time"
)

// Ingester fetches data and uploads it to the Storage
type Ingester interface {
Do() (chan *Entry, error) // returns a channel for consumers
GetDesc() string // simple getter for the source
}

// ingesters is a slice of implemented Ingester structs
var ingesters = []Ingester{
NewAlexa(),
Expand Down
3 changes: 3 additions & 0 deletions service/dor-http/dor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ func main() {
storage = fs.String("storage", "clickhouse", "storage type")
location = fs.String("storage-url", "tcp://clickhouse:9000", "url of the storage")
listen = fs.String("listen-addr", ":8080", "listen address")
ttl = fs.Int("ttl", 30, "records TTL in days")
)
ff.Parse(fs, os.Args[1:], ff.WithEnvVarPrefix("DOR"))

dor.DefaultTTL = *ttl

d, err := dor.New(*storage, *location, false)
if err != nil {
panic(err)
Expand Down

0 comments on commit d045812

Please sign in to comment.