Skip to content

Commit

Permalink
Merge pull request #738 from redHJ/diskSize
Browse files Browse the repository at this point in the history
1. 增加选项:磁盘使用总大小限制和写入磁盘的单条数据大小限制 2. 调整reader顺序
  • Loading branch information
wonderflow authored Sep 4, 2018
2 parents bfd0e08 + 3f9cc1c commit 28be2d7
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 19 deletions.
21 changes: 21 additions & 0 deletions conf/map_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const (
StringType = "string"
IntType = "int"
Int64Type = "int64"
Int32Type = "int32"
BoolType = "bool"
StringListType = "[]string"
AliasMapType = "[string string, string]"
Expand Down Expand Up @@ -80,6 +81,26 @@ func (conf MapConf) GetInt(key string) (int, error) {
return v, nil
}

func (conf MapConf) GetInt32Or(key string, deft int32) (int32, error) {
ret, err := conf.GetInt32(key)
if err != nil {
return deft, err
}
return ret, nil
}

func (conf MapConf) GetInt32(key string) (int32, error) {
value, exist := conf[key]
if !exist {
return 0, ErrConfMissingKey(key, Int32Type)
}
v, err := strconv.ParseInt(value, 10, 32)
if err != nil {
return 0, ErrConfKeyType(key, Int32Type)
}
return int32(v), nil
}

func (conf MapConf) GetInt64Or(key string, deft int64) (int64, error) {
ret, err := conf.GetInt64(key)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion reader/rest_reader_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,9 @@ const (
var (
ModeUsages = KeyValueSlice{
{ModeFileAuto, "从文件读取( fileauto 模式)", ""},
{ModeDir, "从文件读取( dir 模式)", ""},
{ModeFile, "从文件读取( file 模式)", ""},
{ModeTailx, "从文件读取( tailx 模式)", ""},
{ModeDir, "从文件读取( dir 模式)", ""},
{ModeDirx, "从文件读取( dirx 模式)", ""},
{ModeMySQL, "从 MySQL 读取", ""},
{ModeMSSQL, "从 MSSQL 读取", ""},
Expand Down
46 changes: 28 additions & 18 deletions sender/fault_tolerant.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
mb = 1024 * 1024 // 1MB
defaultWriteLimit = 10 // 默认写速限制为10MB
maxBytesPerFile = 100 * mb
maxDiskUsedBytes = 32 * GB
qNameSuffix = "_local_save"
directSuffix = "_direct"
defaultMaxProcs = 1 // 默认没有并发
Expand Down Expand Up @@ -64,6 +65,8 @@ type FtOption struct {
longDataDiscard bool
innerSenderType string
pandoraSenderType string
maxDiskUsedBytes int64
maxSizePerFile int32
}

type datasContext struct {
Expand All @@ -88,6 +91,8 @@ func NewFtSender(innerSender Sender, conf conf.MapConf, ftSaveLogPath string) (*
}
procs, _ := conf.GetIntOr(KeyFtProcs, defaultMaxProcs)
runnerName, _ := conf.GetStringOr(KeyRunnerName, UnderfinedRunnerName)
maxDiskUsedBytes, _ := conf.GetInt64Or(KeyMaxDiskUsedBytes, maxDiskUsedBytes)
maxSizePerFile, _ := conf.GetInt32Or(KeyMaxSizePerFile, maxBytesPerFile)

opt := &FtOption{
saveLogPath: logPath,
Expand All @@ -100,6 +105,8 @@ func NewFtSender(innerSender Sender, conf conf.MapConf, ftSaveLogPath string) (*
longDataDiscard: longDataDiscard,
innerSenderType: senderType,
pandoraSenderType: pandoraSendType,
maxDiskUsedBytes: maxDiskUsedBytes,
maxSizePerFile: maxSizePerFile,
}

return newFtSender(innerSender, runnerName, opt)
Expand All @@ -115,38 +122,41 @@ func newFtSender(innerSender Sender, runnerName string, opt *FtOption) (*FtSende
lq = queue.NewDirectQueue("stream" + directSuffix)
} else if !opt.memoryChannel {
lq = queue.NewDiskQueue(queue.NewDiskQueueOptions{
Name: "stream" + qNameSuffix,
DataPath: opt.saveLogPath,
MaxBytesPerFile: maxBytesPerFile,
MaxMsgSize: maxBytesPerFile,
SyncEveryWrite: opt.syncEvery,
SyncEveryRead: opt.syncEvery,
SyncTimeout: 2 * time.Second,
WriteRateLimit: opt.writeLimit * mb,
Name: "stream" + qNameSuffix,
DataPath: opt.saveLogPath,
MaxBytesPerFile: int64(opt.maxSizePerFile),
MaxMsgSize: opt.maxSizePerFile,
SyncEveryWrite: opt.syncEvery,
SyncEveryRead: opt.syncEvery,
SyncTimeout: 2 * time.Second,
WriteRateLimit: opt.writeLimit * mb,
MaxDiskUsedBytes: opt.maxDiskUsedBytes,
})
} else {
lq = queue.NewDiskQueue(queue.NewDiskQueueOptions{
Name: "stream" + qNameSuffix,
DataPath: opt.saveLogPath,
MaxBytesPerFile: maxBytesPerFile,
MaxMsgSize: maxBytesPerFile,
MaxBytesPerFile: int64(opt.maxSizePerFile),
MaxMsgSize: opt.maxSizePerFile,
SyncEveryWrite: opt.syncEvery,
SyncEveryRead: opt.syncEvery,
SyncTimeout: 2 * time.Second,
WriteRateLimit: opt.writeLimit * mb,
EnableMemoryQueue: true,
MemoryQueueSize: int64(opt.memoryChannelSize),
MaxDiskUsedBytes: opt.maxDiskUsedBytes,
})
}
bq = queue.NewDiskQueue(queue.NewDiskQueueOptions{
Name: "backup" + qNameSuffix,
DataPath: opt.saveLogPath,
MaxBytesPerFile: maxBytesPerFile,
MaxMsgSize: maxBytesPerFile,
SyncEveryWrite: opt.syncEvery,
SyncEveryRead: opt.syncEvery,
SyncTimeout: 2 * time.Second,
WriteRateLimit: opt.writeLimit * mb,
Name: "backup" + qNameSuffix,
DataPath: opt.saveLogPath,
MaxBytesPerFile: int64(opt.maxSizePerFile),
MaxMsgSize: opt.maxSizePerFile,
SyncEveryWrite: opt.syncEvery,
SyncEveryRead: opt.syncEvery,
SyncTimeout: 2 * time.Second,
WriteRateLimit: opt.writeLimit * mb,
MaxDiskUsedBytes: opt.maxDiskUsedBytes,
})
ftSender := FtSender{
exitChan: make(chan struct{}),
Expand Down
32 changes: 32 additions & 0 deletions sender/rest_senders_models.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package sender

import (
"strconv"

. "github.com/qiniu/logkit/utils/models"
"github.com/qiniu/pandora-go-sdk/base/config"
)
Expand Down Expand Up @@ -90,6 +92,24 @@ var (
Advance: true,
ToolTip: `丢弃大于2M的数据`,
}
OptionMaxDiskUsedBytes = Option{
KeyName: KeyMaxDiskUsedBytes,
ChooseOnly: false,
Default: strconv.Itoa(maxDiskUsedBytes),
DefaultNoUse: false,
Description: "磁盘使用总大小限制(max_disk_used_bytes)",
Advance: true,
ToolTip: `磁盘使用总大小限制`,
}
OptionMaxSizePerSize = Option{
KeyName: KeyMaxSizePerFile,
ChooseOnly: false,
Default: strconv.Itoa(maxBytesPerFile),
DefaultNoUse: false,
Description: "磁盘队列单个文件最大字节(max_size_per_file)",
Advance: true,
ToolTip: `磁盘队列单个文件最大字节, 也是单个消息的最大字节。最大不超过2GB`,
}
OptionLogkitSendTime = Option{
KeyName: KeyLogkitSendTime,
Element: Radio,
Expand Down Expand Up @@ -473,6 +493,8 @@ var ModeKeyOptions = map[string][]Option{
OptionFtMemoryChannel,
OptionFtMemoryChannelSize,
OptionKeyFtLongDataDiscard,
OptionMaxDiskUsedBytes,
OptionMaxSizePerSize,
{
KeyName: KeyForceMicrosecond,
Element: Radio,
Expand Down Expand Up @@ -604,6 +626,8 @@ var ModeKeyOptions = map[string][]Option{
OptionFtMemoryChannel,
OptionFtMemoryChannelSize,
OptionKeyFtLongDataDiscard,
OptionMaxDiskUsedBytes,
OptionMaxSizePerSize,
},
TypeInfluxdb: {
{
Expand Down Expand Up @@ -708,6 +732,8 @@ var ModeKeyOptions = map[string][]Option{
OptionFtMemoryChannel,
OptionFtMemoryChannelSize,
OptionKeyFtLongDataDiscard,
OptionMaxDiskUsedBytes,
OptionMaxSizePerSize,
},
TypeDiscard: {},
TypeElastic: {
Expand Down Expand Up @@ -771,6 +797,8 @@ var ModeKeyOptions = map[string][]Option{
OptionFtMemoryChannel,
OptionFtMemoryChannelSize,
OptionKeyFtLongDataDiscard,
OptionMaxDiskUsedBytes,
OptionMaxSizePerSize,
},
TypeKafka: {
{
Expand Down Expand Up @@ -839,6 +867,8 @@ var ModeKeyOptions = map[string][]Option{
OptionFtMemoryChannel,
OptionFtMemoryChannelSize,
OptionKeyFtLongDataDiscard,
OptionMaxDiskUsedBytes,
OptionMaxSizePerSize,
},
TypeHttp: {
{
Expand Down Expand Up @@ -882,5 +912,7 @@ var ModeKeyOptions = map[string][]Option{
OptionFtMemoryChannel,
OptionFtMemoryChannelSize,
OptionKeyFtLongDataDiscard,
OptionMaxDiskUsedBytes,
OptionMaxSizePerSize,
},
}
4 changes: 4 additions & 0 deletions sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ const (
KeyFtMemoryChannelSize = "ft_memory_channel_size"
KeyFtLongDataDiscard = "ft_long_data_discard"

// queue
KeyMaxDiskUsedBytes = "max_disk_used_bytes"
KeyMaxSizePerFile = "max_size_per_file"

// ft 策略
// KeyFtStrategyBackupOnly 只在失败的时候进行容错
KeyFtStrategyBackupOnly = "backup_only"
Expand Down

0 comments on commit 28be2d7

Please sign in to comment.