Skip to content

Commit

Permalink
Merge branch 'main' into fix/vcenter-tls
Browse files Browse the repository at this point in the history
  • Loading branch information
dehaansa authored Nov 21, 2024
2 parents b7a14fd + 83f1fc6 commit 41f299a
Show file tree
Hide file tree
Showing 34 changed files with 163 additions and 97 deletions.
31 changes: 31 additions & 0 deletions .chloggen/elasticsearchexporter_sync-bulk-indexer-flush-bytes.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Respect `flush::bytes` in sync bulk indexer, `flush::bytes` measures uncompressed size, change default `batcher::max_size_items` to `0`

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36163]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
Limit the bulk request size to roughly `flush::bytes` for sync bulk indexer.
Sync bulk indexer is used when `batcher::enabled` is either true or false. In order words, sync bulk indexer is not used when batcher config is undefined.
Change `flush::bytes` to always measure in uncompressed bytes.
Change default `batcher::max_size_items` to `0` as bulk request size limit is now more effectively enforced by `flush::bytes`.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
6 changes: 3 additions & 3 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ exporter/lokiexporter/ @open-telemetry/collector-cont
exporter/mezmoexporter/ @open-telemetry/collector-contrib-approvers @dashpole @billmeyer @gjanco
exporter/opencensusexporter/ @open-telemetry/collector-contrib-approvers @open-telemetry/collector-approvers
exporter/otelarrowexporter/ @open-telemetry/collector-contrib-approvers @jmacd @moh-osman3 @lquerel
exporter/prometheusexporter/ @open-telemetry/collector-contrib-approvers @Aneurysm9 @dashpole
exporter/prometheusremotewriteexporter/ @open-telemetry/collector-contrib-approvers @Aneurysm9 @rapphil @dashpole
exporter/prometheusexporter/ @open-telemetry/collector-contrib-approvers @Aneurysm9 @dashpole @ArthurSens
exporter/prometheusremotewriteexporter/ @open-telemetry/collector-contrib-approvers @Aneurysm9 @rapphil @dashpole @ArthurSens
exporter/pulsarexporter/ @open-telemetry/collector-contrib-approvers @dmitryax @dao-jun
exporter/rabbitmqexporter/ @open-telemetry/collector-contrib-approvers @swar8080 @atoulme
exporter/sapmexporter/ @open-telemetry/collector-contrib-approvers @dmitryax @atoulme
Expand Down Expand Up @@ -161,7 +161,7 @@ pkg/translator/azurelogs/ @open-telemetry/collector-cont
pkg/translator/jaeger/ @open-telemetry/collector-contrib-approvers @open-telemetry/collector-approvers @frzifus
pkg/translator/loki/ @open-telemetry/collector-contrib-approvers @gouthamve @jpkrohling @mar4uk
pkg/translator/opencensus/ @open-telemetry/collector-contrib-approvers @open-telemetry/collector-approvers
pkg/translator/prometheus/ @open-telemetry/collector-contrib-approvers @dashpole @bertysentry
pkg/translator/prometheus/ @open-telemetry/collector-contrib-approvers @dashpole @bertysentry @ArthurSens
pkg/translator/prometheusremotewrite/ @open-telemetry/collector-contrib-approvers @Aneurysm9 @dashpole
pkg/translator/signalfx/ @open-telemetry/collector-contrib-approvers @dmitryax
pkg/translator/skywalking/ @open-telemetry/collector-contrib-approvers @JaredTan95
Expand Down
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ linters:
- exhaustive
- gci
- gocritic
- gofmt
- gofumpt
- goimports
- gosec
- govet
Expand Down
5 changes: 3 additions & 2 deletions Makefile.Common
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ CHECKFILE := $(TOOLS_BIN_DIR)/checkfile
CROSSLINK := $(TOOLS_BIN_DIR)/crosslink
GOJUNIT := $(TOOLS_BIN_DIR)/go-junit-report
BUILDER := $(TOOLS_BIN_DIR)/builder
GOFUMPT := $(TOOLS_BIN_DIR)/gofumpt
GOVULNCHECK := $(TOOLS_BIN_DIR)/govulncheck
GCI := $(TOOLS_BIN_DIR)/gci
GOTESTSUM := $(TOOLS_BIN_DIR)/gotestsum
Expand Down Expand Up @@ -207,8 +208,8 @@ checklinks:
$(MDLINKCHECK) -q -c $(SRC_ROOT)/.github/workflows/check_links_config.json || true

.PHONY: fmt
fmt: $(GOIMPORTS)
gofmt -w -s ./
fmt: $(GOFUMPT) $(GOIMPORTS)
$(GOFUMPT) -l -w .
$(GOIMPORTS) -w -local github.com/open-telemetry/opentelemetry-collector-contrib ./

.PHONY: lint
Expand Down
4 changes: 2 additions & 2 deletions cmd/githubgen/codeowners.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,11 @@ LOOP:
codeowners += fmt.Sprintf("reports/distributions/%s.yaml%s @open-telemetry/collector-contrib-approvers %s\n", dist.Name, strings.Repeat(" ", longestName-len(dist.Name)), strings.Join(maintainers, " "))
}

err = os.WriteFile(filepath.Join(".github", "CODEOWNERS"), []byte(codeowners+unmaintainedCodeowners), 0600)
err = os.WriteFile(filepath.Join(".github", "CODEOWNERS"), []byte(codeowners+unmaintainedCodeowners), 0o600)
if err != nil {
return err
}
err = os.WriteFile(filepath.Join(".github", "ALLOWLIST"), []byte(allowlistHeader+deprecatedList+unmaintainedList), 0600)
err = os.WriteFile(filepath.Join(".github", "ALLOWLIST"), []byte(allowlistHeader+deprecatedList+unmaintainedList), 0o600)
if err != nil {
return err
}
Expand Down
5 changes: 2 additions & 3 deletions cmd/githubgen/distributions.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ import (
"gopkg.in/yaml.v3"
)

type distributionsGenerator struct {
}
type distributionsGenerator struct{}

type distOutput struct {
Name string `yaml:"name"`
Expand Down Expand Up @@ -54,7 +53,7 @@ func (cg distributionsGenerator) generate(data *githubData) error {
if err != nil {
return nil
}
err = os.WriteFile(filepath.Join("reports", "distributions", fmt.Sprintf("%s.yaml", dist.Name)), b, 0600)
err = os.WriteFile(filepath.Join("reports", "distributions", fmt.Sprintf("%s.yaml", dist.Name)), b, 0o600)
if err != nil {
return nil
}
Expand Down
5 changes: 2 additions & 3 deletions cmd/githubgen/issuetemplates.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ func folderToShortName(folder string) string {
return strings.Join(path, "/")
}

type issueTemplatesGenerator struct {
}
type issueTemplatesGenerator struct{}

func (itg issueTemplatesGenerator) generate(data *githubData) error {
keys := map[string]struct{}{}
Expand All @@ -59,7 +58,7 @@ func (itg issueTemplatesGenerator) generate(data *githubData) error {
oldContent := matchOldContent.FindSubmatch(templateContents)
if len(oldContent) > 0 {
templateContents = bytes.ReplaceAll(templateContents, oldContent[0], replacement)
err = os.WriteFile(filepath.Join(issuesFolder, e.Name()), templateContents, 0600)
err = os.WriteFile(filepath.Join(issuesFolder, e.Name()), templateContents, 0o600)
if err != nil {
return err
}
Expand Down
11 changes: 2 additions & 9 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func TestSupervisorStartsCollectorWithNoOpAMPServer(t *testing.T) {
marshalledRemoteConfig, err := proto.Marshal(remoteConfigProto)
require.NoError(t, err)

require.NoError(t, os.WriteFile(remoteConfigFilePath, marshalledRemoteConfig, 0600))
require.NoError(t, os.WriteFile(remoteConfigFilePath, marshalledRemoteConfig, 0o600))

connected := atomic.Bool{}
server := newUnstartedOpAMPServer(t, defaultConnectingHandler, server.ConnectionCallbacksStruct{
Expand Down Expand Up @@ -407,7 +407,6 @@ func TestSupervisorStartsWithNoOpAMPServer(t *testing.T) {

return n != 0
}, 10*time.Second, 500*time.Millisecond, "Log never appeared in output")

}

func TestSupervisorRestartsCollectorAfterBadConfig(t *testing.T) {
Expand Down Expand Up @@ -1077,7 +1076,6 @@ func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) {

return strings.Contains(loadedConfig, "filelog")
}, 10*time.Second, 500*time.Millisecond, "Collector was not started with the last received remote config")

}

func TestSupervisorPersistsInstanceID(t *testing.T) {
Expand All @@ -1091,7 +1089,6 @@ func TestSupervisorPersistsInstanceID(t *testing.T) {
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {

select {
case agentIDChan <- message.InstanceUid:
default:
Expand Down Expand Up @@ -1168,7 +1165,6 @@ func TestSupervisorPersistsNewInstanceID(t *testing.T) {
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {

select {
case agentIDChan <- message.InstanceUid:
default:
Expand Down Expand Up @@ -1359,7 +1355,6 @@ func TestSupervisorStopsAgentProcessWithEmptyConfigMap(t *testing.T) {
} else {
require.ErrorContains(t, err, "No connection could be made")
}

}

type LogEntry struct {
Expand Down Expand Up @@ -1387,7 +1382,7 @@ func TestSupervisorLogging(t *testing.T) {
}
marshalledRemoteCfg, err := proto.Marshal(remoteCfgProto)
require.NoError(t, err)
require.NoError(t, os.WriteFile(remoteCfgFilePath, marshalledRemoteCfg, 0600))
require.NoError(t, os.WriteFile(remoteCfgFilePath, marshalledRemoteCfg, 0o600))

connected := atomic.Bool{}
server := newUnstartedOpAMPServer(t, defaultConnectingHandler, server.ConnectionCallbacksStruct{
Expand Down Expand Up @@ -1635,15 +1630,13 @@ func TestSupervisorOpAmpServerPort(t *testing.T) {

func findRandomPort() (int, error) {
l, err := net.Listen("tcp", "localhost:0")

if err != nil {
return 0, err
}

port := l.Addr().(*net.TCPAddr).Port

err = l.Close()

if err != nil {
return 0, err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/opampsupervisor/supervisor/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func TestValidate(t *testing.T) {
tmpDir := t.TempDir()

filePath := filepath.Join(tmpDir, "file")
require.NoError(t, os.WriteFile(filePath, []byte{}, 0600))
require.NoError(t, os.WriteFile(filePath, []byte{}, 0o600))

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/opampsupervisor/supervisor/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (p *persistentState) writeState() error {
return err
}

return os.WriteFile(p.configPath, by, 0600)
return os.WriteFile(p.configPath, by, 0o600)
}

// loadOrCreatePersistentState attempts to load the persistent state from disk. If it doesn't
Expand Down
2 changes: 1 addition & 1 deletion cmd/opampsupervisor/supervisor/persistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestCreateOrLoadPersistentState(t *testing.T) {
t.Run("loads state from file if it exists", func(t *testing.T) {
f := filepath.Join(t.TempDir(), "state.yaml")

err := os.WriteFile(f, []byte(`instance_id: "018feed6-905b-7aa6-ba37-b0eec565de03"`), 0600)
err := os.WriteFile(f, []byte(`instance_id: "018feed6-905b-7aa6-ba37-b0eec565de03"`), 0o600)
require.NoError(t, err)

state, err := loadOrCreatePersistentState(f)
Expand Down
27 changes: 9 additions & 18 deletions cmd/opampsupervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func NewSupervisor(logger *zap.Logger, cfg config.Supervisor) (*Supervisor, erro
}
s.config = cfg

if err := os.MkdirAll(s.config.Storage.Directory, 0700); err != nil {
if err := os.MkdirAll(s.config.Storage.Directory, 0o700); err != nil {
return nil, fmt.Errorf("error creating storage dir: %w", err)
}

Expand All @@ -203,7 +203,6 @@ func (s *Supervisor) Start() error {
healthCheckPort := s.config.Agent.HealthCheckPort
if healthCheckPort == 0 {
healthCheckPort, err = s.findRandomPort()

if err != nil {
return fmt.Errorf("could not find port for health check: %w", err)
}
Expand Down Expand Up @@ -285,7 +284,7 @@ func (s *Supervisor) getBootstrapInfo() (err error) {
return err
}

err = os.WriteFile(s.agentConfigFilePath(), bootstrapConfig, 0600)
err = os.WriteFile(s.agentConfigFilePath(), bootstrapConfig, 0o600)
if err != nil {
return fmt.Errorf("failed to write agent config: %w", err)
}
Expand Down Expand Up @@ -708,7 +707,7 @@ func (s *Supervisor) composeNoopPipeline() ([]byte, error) {
}

func (s *Supervisor) composeNoopConfig() ([]byte, error) {
var k = koanf.New("::")
k := koanf.New("::")

cfg, err := s.composeNoopPipeline()
if err != nil {
Expand Down Expand Up @@ -821,7 +820,7 @@ func (s *Supervisor) loadAndWriteInitialMergedConfig() error {

// write the initial merged config to disk
cfgState := s.cfgState.Load().(*configState)
if err := os.WriteFile(s.agentConfigFilePath(), []byte(cfgState.mergedConfig), 0600); err != nil {
if err := os.WriteFile(s.agentConfigFilePath(), []byte(cfgState.mergedConfig), 0o600); err != nil {
s.logger.Error("Failed to write agent config.", zap.Error(err))
}

Expand Down Expand Up @@ -861,7 +860,6 @@ func (s *Supervisor) setupOwnMetrics(_ context.Context, settings *protobufs.Tele
s.logger.Debug("Enabling own metrics pipeline in the config")

port, err := s.findRandomPort()

if err != nil {
s.logger.Error("Could not setup own metrics", zap.Error(err))
return
Expand Down Expand Up @@ -896,7 +894,7 @@ func (s *Supervisor) setupOwnMetrics(_ context.Context, settings *protobufs.Tele
// 2) the own metrics config section
// 3) the local override config that is hard-coded in the Supervisor.
func (s *Supervisor) composeMergedConfig(config *protobufs.AgentRemoteConfig) (configChanged bool, err error) {
var k = koanf.New("::")
k := koanf.New("::")

configMapIsEmpty := len(config.GetConfig().GetConfigMap()) == 0

Expand Down Expand Up @@ -924,7 +922,7 @@ func (s *Supervisor) composeMergedConfig(config *protobufs.AgentRemoteConfig) (c
if item == nil {
continue
}
var k2 = koanf.New("::")
k2 := koanf.New("::")
err = k2.Load(rawbytes.Provider(item.Body), yaml.Parser())
if err != nil {
return false, fmt.Errorf("cannot parse config named %s: %w", name, err)
Expand Down Expand Up @@ -1010,7 +1008,6 @@ func (s *Supervisor) startAgent() {
if err != nil {
s.logger.Error("Cannot start the agent", zap.Error(err))
err = s.opampClient.SetHealth(&protobufs.ComponentHealth{Healthy: false, LastError: fmt.Sprintf("Cannot start the agent: %v", err)})

if err != nil {
s.logger.Error("Failed to report OpAMP client health", zap.Error(err))
}
Expand Down Expand Up @@ -1128,7 +1125,6 @@ func (s *Supervisor) runAgentProcess() {
s.commander.Pid(), s.commander.ExitCode(),
)
err := s.opampClient.SetHealth(&protobufs.ComponentHealth{Healthy: false, LastError: errMsg})

if err != nil {
s.logger.Error("Could not report health to OpAMP server", zap.Error(err))
}
Expand Down Expand Up @@ -1173,12 +1169,11 @@ func (s *Supervisor) stopAgentApplyConfig() {
s.logger.Debug("Stopping the agent to apply new config")
cfgState := s.cfgState.Load().(*configState)
err := s.commander.Stop(context.Background())

if err != nil {
s.logger.Error("Could not stop agent process", zap.Error(err))
}

if err := os.WriteFile(s.agentConfigFilePath(), []byte(cfgState.mergedConfig), 0600); err != nil {
if err := os.WriteFile(s.agentConfigFilePath(), []byte(cfgState.mergedConfig), 0o600); err != nil {
s.logger.Error("Failed to write agent config.", zap.Error(err))
}
}
Expand Down Expand Up @@ -1210,13 +1205,11 @@ func (s *Supervisor) Shutdown() {
Healthy: false, LastError: "Supervisor is shutdown",
},
)

if err != nil {
s.logger.Error("Could not report health to OpAMP server", zap.Error(err))
}

err = s.stopOpAMPClient()

if err != nil {
s.logger.Error("Could not stop the OpAMP client", zap.Error(err))
}
Expand All @@ -1233,7 +1226,7 @@ func (s *Supervisor) saveLastReceivedConfig(config *protobufs.AgentRemoteConfig)
return err
}

return os.WriteFile(filepath.Join(s.config.Storage.Directory, lastRecvRemoteConfigFile), cfg, 0600)
return os.WriteFile(filepath.Join(s.config.Storage.Directory, lastRecvRemoteConfigFile), cfg, 0o600)
}

func (s *Supervisor) saveLastReceivedOwnTelemetrySettings(set *protobufs.TelemetryConnectionSettings, filePath string) error {
Expand All @@ -1242,7 +1235,7 @@ func (s *Supervisor) saveLastReceivedOwnTelemetrySettings(set *protobufs.Telemet
return err
}

return os.WriteFile(filepath.Join(s.config.Storage.Directory, filePath), cfg, 0600)
return os.WriteFile(filepath.Join(s.config.Storage.Directory, filePath), cfg, 0o600)
}

func (s *Supervisor) reportConfigStatus(status protobufs.RemoteConfigStatuses, errorMessage string) {
Expand Down Expand Up @@ -1392,15 +1385,13 @@ func (s *Supervisor) getSupervisorOpAMPServerPort() (int, error) {

func (s *Supervisor) findRandomPort() (int, error) {
l, err := net.Listen("tcp", "localhost:0")

if err != nil {
return 0, err
}

port := l.Addr().(*net.TCPAddr).Port

err = l.Close()

if err != nil {
return 0, err
}
Expand Down
Loading

0 comments on commit 41f299a

Please sign in to comment.