Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race in log #16490

Merged
merged 7 commits into from
Jul 20, 2021
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 30 additions & 26 deletions modules/log/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ type MultiChannelledLog struct {
name string
bufferLength int64
queue chan *Event
mutex sync.Mutex
rwmutex sync.RWMutex
loggers map[string]EventLogger
flush chan bool
close chan bool
Expand Down Expand Up @@ -173,10 +173,10 @@ func NewMultiChannelledLog(name string, bufferLength int64) *MultiChannelledLog

// AddLogger adds a logger to this MultiChannelledLog
func (m *MultiChannelledLog) AddLogger(logger EventLogger) error {
m.mutex.Lock()
m.rwmutex.Lock()
name := logger.GetName()
if _, has := m.loggers[name]; has {
m.mutex.Unlock()
m.rwmutex.Unlock()
return ErrDuplicateName{name}
}
m.loggers[name] = logger
Expand All @@ -186,7 +186,7 @@ func (m *MultiChannelledLog) AddLogger(logger EventLogger) error {
if logger.GetStacktraceLevel() < m.stacktraceLevel {
m.stacktraceLevel = logger.GetStacktraceLevel()
}
m.mutex.Unlock()
m.rwmutex.Unlock()
go m.Start()
return nil
}
Expand All @@ -195,31 +195,31 @@ func (m *MultiChannelledLog) AddLogger(logger EventLogger) error {
// NB: If you delete the last sublogger this logger will simply drop
// log events
func (m *MultiChannelledLog) DelLogger(name string) bool {
m.mutex.Lock()
m.rwmutex.Lock()
logger, has := m.loggers[name]
if !has {
m.mutex.Unlock()
m.rwmutex.Unlock()
return false
}
delete(m.loggers, name)
m.internalResetLevel()
m.mutex.Unlock()
m.rwmutex.Unlock()
logger.Flush()
logger.Close()
return true
}

// GetEventLogger returns a sub logger from this MultiChannelledLog
func (m *MultiChannelledLog) GetEventLogger(name string) EventLogger {
m.mutex.Lock()
defer m.mutex.Unlock()
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
return m.loggers[name]
}

// GetEventLoggerNames returns a list of names
func (m *MultiChannelledLog) GetEventLoggerNames() []string {
m.mutex.Lock()
defer m.mutex.Unlock()
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
var keys []string
for k := range m.loggers {
keys = append(keys, k)
Expand All @@ -228,12 +228,12 @@ func (m *MultiChannelledLog) GetEventLoggerNames() []string {
}

func (m *MultiChannelledLog) closeLoggers() {
m.mutex.Lock()
m.rwmutex.Lock()
for _, logger := range m.loggers {
logger.Flush()
logger.Close()
}
m.mutex.Unlock()
m.rwmutex.Unlock()
m.closed <- true
}

Expand All @@ -249,8 +249,8 @@ func (m *MultiChannelledLog) Resume() {

// ReleaseReopen causes this logger to tell its subloggers to release and reopen
func (m *MultiChannelledLog) ReleaseReopen() error {
m.mutex.Lock()
defer m.mutex.Unlock()
m.rwmutex.Lock()
defer m.rwmutex.Unlock()
var accumulatedErr error
for _, logger := range m.loggers {
if err := logger.ReleaseReopen(); err != nil {
Expand All @@ -266,13 +266,13 @@ func (m *MultiChannelledLog) ReleaseReopen() error {

// Start processing the MultiChannelledLog
func (m *MultiChannelledLog) Start() {
m.mutex.Lock()
m.rwmutex.Lock()
if m.started {
m.mutex.Unlock()
m.rwmutex.Unlock()
return
}
m.started = true
m.mutex.Unlock()
m.rwmutex.Unlock()
paused := false
for {
if paused {
Expand All @@ -286,11 +286,11 @@ func (m *MultiChannelledLog) Start() {
m.closeLoggers()
return
}
m.mutex.Lock()
m.rwmutex.RLock()
for _, logger := range m.loggers {
logger.Flush()
}
m.mutex.Unlock()
m.rwmutex.RUnlock()
case <-m.close:
m.closeLoggers()
return
Expand All @@ -307,24 +307,24 @@ func (m *MultiChannelledLog) Start() {
m.closeLoggers()
return
}
m.mutex.Lock()
m.rwmutex.RLock()
for _, logger := range m.loggers {
err := logger.LogEvent(event)
if err != nil {
fmt.Println(err)
}
}
m.mutex.Unlock()
m.rwmutex.RUnlock()
case _, ok := <-m.flush:
if !ok {
m.closeLoggers()
return
}
m.mutex.Lock()
m.rwmutex.RLock()
for _, logger := range m.loggers {
logger.Flush()
}
m.mutex.Unlock()
m.rwmutex.RUnlock()
case <-m.close:
m.closeLoggers()
return
Expand Down Expand Up @@ -359,11 +359,15 @@ func (m *MultiChannelledLog) Flush() {

// GetLevel gets the level of this MultiChannelledLog
func (m *MultiChannelledLog) GetLevel() Level {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
return m.level
}

// GetStacktraceLevel gets the level of this MultiChannelledLog
func (m *MultiChannelledLog) GetStacktraceLevel() Level {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
return m.stacktraceLevel
}

Expand All @@ -384,8 +388,8 @@ func (m *MultiChannelledLog) internalResetLevel() Level {

// ResetLevel will reset the level of this MultiChannelledLog
func (m *MultiChannelledLog) ResetLevel() Level {
m.mutex.Lock()
defer m.mutex.Unlock()
m.rwmutex.Lock()
defer m.rwmutex.Unlock()
return m.internalResetLevel()
}

Expand Down