Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[filelogreceiver]: Add ability to sort by mtime #28850

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/filelogreceiver_sort-by-mtime-rework.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: filelogreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add the ability to order files by mtime, to only read the most recently modified files

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27812]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
8 changes: 7 additions & 1 deletion pkg/stanza/fileconsumer/matcher/internal/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

type Option interface {
// Returned error is for explanitory purposes only.
// Returned error is for explanatory purposes only.
// All options will be called regardless of error.
apply([]*item) ([]*item, error)
}
Expand Down Expand Up @@ -49,6 +49,12 @@ type item struct {
}

func newItem(value string, regex *regexp.Regexp) (*item, error) {
if regex == nil {
return &item{
value: value,
}, nil
}

match := regex.FindStringSubmatch(value)
if match == nil {
return nil, fmt.Errorf("'%s' does not match regex", value)
Expand Down
58 changes: 51 additions & 7 deletions pkg/stanza/fileconsumer/matcher/internal/filter/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package filter // import "github.com/open-telemetry/opentelemetry-collector-cont

import (
"fmt"
"os"
"sort"
"strconv"
"time"
Expand All @@ -18,24 +19,24 @@ type parseFunc func(string) (any, error)

type compareFunc func(a, b any) bool

type sortOption struct {
type regexSortOption struct {
regexKey string
parseFunc
compareFunc
}

func newSortOption(regexKey string, parseFunc parseFunc, compareFunc compareFunc) (Option, error) {
func newRegexSortOption(regexKey string, parseFunc parseFunc, compareFunc compareFunc) (Option, error) {
if regexKey == "" {
return nil, fmt.Errorf("regex key must be specified")
}
return sortOption{
return regexSortOption{
regexKey: regexKey,
parseFunc: parseFunc,
compareFunc: compareFunc,
}, nil
}

func (o sortOption) apply(items []*item) ([]*item, error) {
func (o regexSortOption) apply(items []*item) ([]*item, error) {
// Special case where sort.Slice will not run the 'less' func.
// We still need to ensure it parses in order to ensure the file should be included.
if len(items) == 1 {
Expand Down Expand Up @@ -80,7 +81,7 @@ func (o sortOption) apply(items []*item) ([]*item, error) {
}

func SortNumeric(regexKey string, ascending bool) (Option, error) {
return newSortOption(regexKey,
return newRegexSortOption(regexKey,
func(s string) (any, error) {
return strconv.Atoi(s)
},
Expand All @@ -94,7 +95,7 @@ func SortNumeric(regexKey string, ascending bool) (Option, error) {
}

func SortAlphabetical(regexKey string, ascending bool) (Option, error) {
return newSortOption(regexKey,
return newRegexSortOption(regexKey,
func(s string) (any, error) {
return s, nil
},
Expand All @@ -118,7 +119,7 @@ func SortTemporal(regexKey string, ascending bool, layout string, location strin
if err != nil {
return nil, fmt.Errorf("load location %s: %w", loc, err)
}
return newSortOption(regexKey,
return newRegexSortOption(regexKey,
func(s string) (any, error) {
return timeutils.ParseStrptime(layout, s, loc)
},
Expand All @@ -130,3 +131,46 @@ func SortTemporal(regexKey string, ascending bool, layout string, location strin
},
)
}

// mtimeSortOption is the Option implementation that orders items by the
// modification time of the file each item refers to. It carries no state.
type mtimeSortOption struct{}

// mtimeItem pairs an item with the modification time read for it, so that
// a slice of them can be sorted by mtime.
type mtimeItem struct {
// mtime is the modification time reported by os.Stat for path.
mtime time.Time
// path is the item's value, which is used as the file path given to os.Stat.
path string
// item is the original item carried through the sort.
item *item
}

// apply orders items from most to least recently modified, using the
// modification time that os.Stat reports for each item's value (its path).
//
// Items that cannot be stat'ed are left out of the result; their errors are
// accumulated and returned alongside the items that could be ordered.
func (m mtimeSortOption) apply(items []*item) ([]*item, error) {
	var (
		statErrs error
		entries  = make([]mtimeItem, 0, len(items))
	)

	for _, it := range items {
		info, err := os.Stat(it.value)
		if err != nil {
			statErrs = multierr.Append(statErrs, err)
			continue
		}

		entries = append(entries, mtimeItem{
			mtime: info.ModTime(),
			path:  it.value,
			item:  it,
		})
	}

	// Newest first: entry i sorts ahead of entry j when it was modified later.
	// The stable sort keeps the incoming order for entries with equal mtimes.
	sort.SliceStable(entries, func(i, j int) bool {
		return entries[i].mtime.After(entries[j].mtime)
	})

	sorted := make([]*item, 0, len(items))
	for i := range entries {
		sorted = append(sorted, entries[i].item)
	}

	return sorted, statErrs
}

// SortMtime returns an Option that orders items by file modification time,
// with the most recently modified file first.
func SortMtime() Option {
return mtimeSortOption{}
}
71 changes: 71 additions & 0 deletions pkg/stanza/fileconsumer/matcher/internal/filter/sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ package filter

import (
"fmt"
"os"
"path/filepath"
"regexp"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -169,3 +172,71 @@ func TestSort(t *testing.T) {
}
}
}

// TestMTimeFilter verifies that the mtime sort option returns items ordered
// from most to least recently modified. Each case creates its files in a
// fresh temporary directory and stamps them with the requested mtimes.
func TestMTimeFilter(t *testing.T) {
	epoch := time.Unix(0, 0)

	type testCase struct {
		name        string
		files       []string
		fileMTimes  []time.Time
		expectedErr string
		expect      []string
	}

	cases := []testCase{
		{
			name:       "No files",
			files:      []string{},
			fileMTimes: []time.Time{},
			expect:     []string{},
		},
		{
			name:       "Single file",
			files:      []string{"a.log"},
			fileMTimes: []time.Time{epoch},
			expect:     []string{"a.log"},
		},
		{
			name:       "Multiple files",
			files:      []string{"a.log", "b.log"},
			fileMTimes: []time.Time{epoch, epoch.Add(time.Hour)},
			expect:     []string{"b.log", "a.log"},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			dir := t.TempDir()

			// Materialize every file on disk and stamp it with its mtime.
			items := make([]*item, 0, len(tc.files))
			for idx, name := range tc.files {
				modTime := tc.fileMTimes[idx]
				fullPath := filepath.Join(dir, name)

				fh, err := os.Create(fullPath)
				require.NoError(t, err)
				require.NoError(t, fh.Close())
				require.NoError(t, os.Chtimes(fullPath, epoch, modTime))

				// A nil regex yields an item that only carries the path.
				entry, err := newItem(fullPath, nil)
				require.NoError(t, err)

				items = append(items, entry)
			}

			sorted, err := SortMtime().apply(items)
			if tc.expectedErr == "" {
				require.NoError(t, err)
			} else {
				require.EqualError(t, err, tc.expectedErr)
			}

			// Compare paths relative to the temp dir so expectations stay stable.
			got := make([]string, 0, len(sorted))
			for _, s := range sorted {
				rel, err := filepath.Rel(dir, s.value)
				require.NoError(t, err)
				got = append(got, rel)
			}

			require.Equal(t, tc.expect, got)
		})
	}
}
44 changes: 37 additions & 7 deletions pkg/stanza/fileconsumer/matcher/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"fmt"
"regexp"

"go.opentelemetry.io/collector/featuregate"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher/internal/filter"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher/internal/finder"
)
Expand All @@ -16,12 +18,20 @@ const (
sortTypeNumeric = "numeric"
sortTypeTimestamp = "timestamp"
sortTypeAlphabetical = "alphabetical"
sortTypeMtime = "mtime"
)

const (
// defaultOrderingCriteriaTopN is the value New assigns to
// OrderingCriteria.TopN as its default.
defaultOrderingCriteriaTopN = 1
)

// mtimeSortTypeFeatureGate is the alpha-stage feature gate, registered in the
// global registry under the ID "filelog.mtimeSortType", that must be enabled
// before the `mtime` sort type is accepted.
var mtimeSortTypeFeatureGate = featuregate.GlobalRegistry().MustRegister(
"filelog.mtimeSortType",
featuregate.StageAlpha,
featuregate.WithRegisterDescription("When enabled, allows usage of `ordering_criteria.mode` = `mtime`."),
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/27812"),
)

type Criteria struct {
Include []string `mapstructure:"include,omitempty"`
Exclude []string `mapstructure:"exclude,omitempty"`
Expand Down Expand Up @@ -63,10 +73,6 @@ func New(c Criteria) (*Matcher, error) {
}, nil
}

if c.OrderingCriteria.Regex == "" {
return nil, fmt.Errorf("'regex' must be specified when 'sort_by' is specified")
}

if c.OrderingCriteria.TopN < 0 {
return nil, fmt.Errorf("'top_n' must be a positive integer")
}
Expand All @@ -75,9 +81,17 @@ func New(c Criteria) (*Matcher, error) {
c.OrderingCriteria.TopN = defaultOrderingCriteriaTopN
}

regex, err := regexp.Compile(c.OrderingCriteria.Regex)
if err != nil {
return nil, fmt.Errorf("compile regex: %w", err)
var regex *regexp.Regexp
if orderingCriteriaNeedsRegex(c.OrderingCriteria.SortBy) {
if c.OrderingCriteria.Regex == "" {
return nil, fmt.Errorf("'regex' must be specified when 'sort_by' is specified")
}

var err error
regex, err = regexp.Compile(c.OrderingCriteria.Regex)
if err != nil {
return nil, fmt.Errorf("compile regex: %w", err)
}
}

var filterOpts []filter.Option
Expand All @@ -101,6 +115,11 @@ func New(c Criteria) (*Matcher, error) {
return nil, fmt.Errorf("timestamp sort: %w", err)
}
filterOpts = append(filterOpts, f)
case sortTypeMtime:
if !mtimeSortTypeFeatureGate.IsEnabled() {
return nil, fmt.Errorf("the %q feature gate must be enabled to use %q sort type", mtimeSortTypeFeatureGate.ID(), sortTypeMtime)
}
filterOpts = append(filterOpts, filter.SortMtime())
default:
return nil, fmt.Errorf("'sort_type' must be specified")
}
Expand All @@ -115,6 +134,17 @@ func New(c Criteria) (*Matcher, error) {
}, nil
}

// orderingCriteriaNeedsRegex reports whether at least one of the given sort
// options has a sort type (numeric, alphabetical, or timestamp) that requires
// a regex to be set.
func orderingCriteriaNeedsRegex(sorts []Sort) bool {
	for i := range sorts {
		sortType := sorts[i].SortType
		if sortType == sortTypeNumeric || sortType == sortTypeAlphabetical || sortType == sortTypeTimestamp {
			return true
		}
	}
	return false
}

type Matcher struct {
include []string
exclude []string
Expand Down
Loading