Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
pantuza authored Dec 12, 2023
2 parents b137fd7 + b81d4ef commit f7a83e2
Show file tree
Hide file tree
Showing 27 changed files with 243 additions and 288 deletions.
25 changes: 25 additions & 0 deletions .chloggen/fix-running-as-service-detection.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: cmd/otelcorecol

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix the code detecting if the collector is running as a service on Windows.

# One or more tracking issues or pull requests related to the change
issues: [7350]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Removed the `NO_WINDOWS_SERVICE` environment variable given it is not needed anymore.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
27 changes: 27 additions & 0 deletions .chloggen/memory-limiter-update-config-validation-errors.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: processor/memory_limiter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Update config validation errors

# One or more tracking issues or pull requests related to the change
issues: [9059]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
- Fix names of the config fields that are validated in the error messages
- Move the validation from start to the initialization phrase
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
8 changes: 1 addition & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -384,11 +384,7 @@ push-tags: $(MULTIMOD)

.PHONY: check-changes
check-changes: $(MULTIMOD)
# NOTE: ! inverses the return code of multimod diff. This is
# because prepare-release expects a 0 if there are diffs and
# non-0 if there are no diffs, which is the inverse of most
# diff tools
! $(MULTIMOD) diff -p $(PREVIOUS_VERSION) -m $(MODSET)
$(MULTIMOD) diff -p $(PREVIOUS_VERSION) -m $(MODSET)

.PHONY: prepare-release
prepare-release:
Expand All @@ -413,8 +409,6 @@ else
endif
# ensure a clean branch
git diff -s --exit-code || (echo "local repository not clean"; exit 1)
# check if any modules have changed since the previous release
$(MAKE) check-changes
# update files with new version
sed -i.bak 's/$(PREVIOUS_VERSION)/$(RELEASE_CANDIDATE)/g' versions.yaml
sed -i.bak 's/$(PREVIOUS_VERSION)/$(RELEASE_CANDIDATE)/g' ./cmd/builder/internal/builder/config.go
Expand Down
1 change: 0 additions & 1 deletion Makefile.Common
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ CHLOGGEN := $(TOOLS_BIN_DIR)/chloggen
CROSSLINK := $(TOOLS_BIN_DIR)/crosslink
ENVSUBST := $(TOOLS_BIN_DIR)/envsubst
GOIMPORTS := $(TOOLS_BIN_DIR)/goimports
GOJSONSCHEMA := $(TOOLS_BIN_DIR)/gojsonschema
GOVULNCHECK := $(TOOLS_BIN_DIR)/govulncheck
LINT := $(TOOLS_BIN_DIR)/golangci-lint
IMPI := $(TOOLS_BIN_DIR)/impi
Expand Down
38 changes: 10 additions & 28 deletions cmd/builder/internal/builder/templates/main_windows.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -6,41 +6,23 @@
package main

import (
"errors"
"fmt"
"os"
"golang.org/x/sys/windows"
"golang.org/x/sys/windows/svc"
"go.opentelemetry.io/collector/otelcol"
)

func run(params otelcol.CollectorSettings) error {
if useInteractiveMode, err := checkUseInteractiveMode(); err != nil {
return err
} else if useInteractiveMode {
return runInteractive(params)
} else {
return runService(params)
}
}

func checkUseInteractiveMode() (bool, error) {
// If environment variable NO_WINDOWS_SERVICE is set with any value other
// than 0, use interactive mode instead of running as a service. This should
// be set in case running as a service is not possible or desired even
// though the current session is not detected to be interactive
if value, present := os.LookupEnv("NO_WINDOWS_SERVICE"); present && value != "0" {
return true, nil
}

isInteractiveSession, err := svc.IsAnInteractiveSession()
if err != nil {
return false, fmt.Errorf("failed to determine if we are running in an interactive session: %w", err)
}
return isInteractiveSession, nil
}

func runService(params otelcol.CollectorSettings) error {
// do not need to supply service name when startup is invoked through Service Control Manager directly
// No need to supply service name when startup is invoked through
// the Service Control Manager directly.
if err := svc.Run("", otelcol.NewSvcHandler(params)); err != nil {
if errors.Is(err, windows.ERROR_FAILED_SERVICE_CONTROLLER_CONNECT) {
// Per https://learn.microsoft.com/en-us/windows/win32/api/winsvc/nf-winsvc-startservicectrldispatchera#return-value
// this means that the process is not running as a service, so run interactively.
return runInteractive(params)
}

return fmt.Errorf("failed to start collector server: %w", err)
}

Expand Down
38 changes: 10 additions & 28 deletions cmd/otelcorecol/main_windows.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type MetricsRouter interface {

// A Logs connector acts as an exporter from a logs pipeline and a receiver
// to one or more traces, metrics, or logs pipelines.
// Logs feeds a consumer.Logs, consumer.Metrics, or consumer.Logs with data.
// Logs feeds a consumer.Traces, consumer.Metrics, or consumer.Logs with data.
//
// Examples:
// - Structured logs containing span information could be consumed and emitted as traces.
Expand Down
9 changes: 5 additions & 4 deletions docs/release.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ It is possible that a core approver isn't a contrib approver. In that case, the

1. Update Contrib to use the latest in development version of Core. Run `make update-otel` in Contrib root directory and if it results in any changes submit a draft PR to Contrib. Ensure the CI passes before proceeding. This is to ensure that the latest core does not break contrib in any way. We’ll update it once more to the final release number later.

2. Determine the version number that will be assigned to the release. During the beta phase, we increment the minor version number and set the patch number to 0. In this document, we are using `v0.85.0` as the version to be released, following `v0.84.0`.
Check if stable modules have any changes since the last release by running `make check-changes PREVIOUS_VERSION=v1.0.0-rcv0014 MODSET=stable`. If there are no changes, there is no need to release new version for stable modules.
2. Determine the version number that will be assigned to the release. Usually, we increment the minor version number and set the patch number to 0. In this document, we are using `v0.85.0` as the version to be released, following `v0.84.0`.
Check if stable modules have any changes since the last release by running `make check-changes PREVIOUS_VERSION=v1.0.0 MODSET=stable`. If there are no changes, there is no need to release new version for stable modules.
If there are changes found but .chloggen directory doesn't have any corresponding entries, add missing changelog entries. If the changes are insignificant, consider not releasing a new version for stable modules.

3. Manually run the action [Automation - Prepare Release](https://github.com/open-telemetry/opentelemetry-collector/actions/workflows/prepare-release.yml). This action will create an issue to track the progress of the release and a pull request to update the changelog and version numbers in the repo. **While this PR is open all merging in Core should be haulted**.
- When prompted, enter the version numbers determined in Step 2, but do not include a leading `v`.
Expand All @@ -49,7 +50,7 @@ It is possible that a core approver isn't a contrib approver. In that case, the
7. A new `v0.85.0` release should be automatically created on Github by now. Edit it and use the contents from the CHANGELOG.md and CHANGELOG-API.md as the release's description.

8. Update the draft PR to Contrib created in step 1 to use the newly released Core version and set it to Ready for Review.
- Run `make update-otel OTEL_VERSION=v0.85.0 OTEL_RC_VERSION=v1.0.0-rcv0014`
- Run `make update-otel OTEL_VERSION=v0.85.0 OTEL_STABLE_VERSION=v1.1.0`
- Manually update `cmd/otelcontribcol/builder-config.yaml`
- Manually update `cmd/oteltestbedcol/builder-config.yaml`
- Run `make genotelcontribcol`
Expand Down Expand Up @@ -156,10 +157,10 @@ Once a module is ready to be released under the `1.x` version scheme, file a PR
| Date | Version | Release manager |
|------------|---------|-----------------|
| 2023-12-11 | v0.91.0 | @dmitryax |
| 2024-01-08 | v0.92.0 | @codeboten |
| 2024-01-22 | v0.93.0 | @bogdandrutu |
| 2024-02-05 | v0.94.0 | @Aneurysm9 |
| 2024-02-19 | v0.95.0 | @mx-psi |
| 2024-03-04 | v0.96.0 | @jpkrohling |
| 2024-03-18 | v0.97.0 | @djaglowski |
| 2024-04-01 | v0.98.0 | @dmitryax |
2 changes: 1 addition & 1 deletion docs/troubleshooting.md
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ configuration issue. This could be due to a firewall, DNS, or proxy
issue. Note that the Collector does have
[proxy support](https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter#proxy-support).

### Startup failing in Windows Docker containers
### Startup failing in Windows Docker containers (v0.90.1 and earlier)

The process may fail to start in a Windows Docker container with the following
error: `The service process could not connect to the service controller`. In
Expand Down
20 changes: 6 additions & 14 deletions exporter/exporterhelper/internal/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe

import (
"context"
"sync/atomic"

"go.opentelemetry.io/collector/component"
)
Expand All @@ -17,25 +16,19 @@ import (
// the producer are dropped.
type boundedMemoryQueue[T any] struct {
component.StartFunc
stopped *atomic.Bool
items chan queueRequest[T]
items chan queueRequest[T]
}

// NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
func NewBoundedMemoryQueue[T any](capacity int) Queue[T] {
return &boundedMemoryQueue[T]{
items: make(chan queueRequest[T], capacity),
stopped: &atomic.Bool{},
items: make(chan queueRequest[T], capacity),
}
}

// Offer is used by the producer to submit new item to the queue.
// Offer is used by the producer to submit new item to the queue. Calling this method on a stopped queue will panic.
func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
if q.stopped.Load() {
return ErrQueueIsStopped
}

select {
case q.items <- queueRequest[T]{ctx: ctx, req: req}:
return nil
Expand All @@ -46,8 +39,8 @@ func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {

// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped.
func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T)) bool {
// The function returns true when an item is consumed or false if the queue is stopped and emptied.
func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) bool) bool {
item, ok := <-q.items
if !ok {
return false
Expand All @@ -56,9 +49,8 @@ func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T)) bo
return true
}

// Shutdown stops accepting items, and stops all consumers. It blocks until all consumers have stopped.
// Shutdown closes the queue channel to initiate draining of the queue.
func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error {
q.stopped.Store(true) // disable producer
close(q.items)
return nil
}
Expand Down
24 changes: 11 additions & 13 deletions exporter/exporterhelper/internal/bounded_memory_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ func TestBoundedQueue(t *testing.T) {

consumerState := newConsumerState(t)

consumers := NewQueueConsumers(q, 1, func(_ context.Context, item string) {
consumers := NewQueueConsumers(q, 1, func(_ context.Context, item string) bool {
consumerState.record(item)
<-waitCh
return true
})
assert.NoError(t, consumers.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -74,7 +75,6 @@ func TestBoundedQueue(t *testing.T) {
}

assert.NoError(t, consumers.Shutdown(context.Background()))
assert.ErrorIs(t, q.Offer(context.Background(), "x"), ErrQueueIsStopped)
}

// In this test we run a queue with many items and a slow consumer.
Expand All @@ -89,9 +89,10 @@ func TestShutdownWhileNotEmpty(t *testing.T) {
consumerState := newConsumerState(t)

waitChan := make(chan struct{})
consumers := NewQueueConsumers(q, 5, func(_ context.Context, item string) {
consumers := NewQueueConsumers(q, 5, func(_ context.Context, item string) bool {
<-waitChan
consumerState.record(item)
return true
})
assert.NoError(t, consumers.Start(context.Background(), componenttest.NewNopHost()))

Expand All @@ -106,17 +107,13 @@ func TestShutdownWhileNotEmpty(t *testing.T) {
assert.NoError(t, q.Offer(context.Background(), "i"))
assert.NoError(t, q.Offer(context.Background(), "j"))

// we block the workers and wait for the queue to start rejecting new items to release the lock.
// This ensures that we test that the queue has been called to shutdown while items are still in the queue.
go func() {
require.EventuallyWithT(t, func(c *assert.CollectT) {
// ensure the request is rejected due to closed queue
assert.ErrorIs(t, q.Offer(context.Background(), "x"), ErrQueueIsStopped)
}, 1*time.Second, 10*time.Millisecond)
close(waitChan)
assert.NoError(t, consumers.Shutdown(context.Background()))
}()

assert.NoError(t, consumers.Shutdown(context.Background()))
// wait a bit to ensure shutdown is called and unblock the consumers.
time.Sleep(100 * time.Millisecond)
close(waitChan)

consumerState.assertConsumed(map[string]bool{
"a": true,
Expand Down Expand Up @@ -179,8 +176,9 @@ func queueUsage(b *testing.B, capacity int, numConsumers int, numberOfItems int)
b.ReportAllocs()
for i := 0; i < b.N; i++ {
q := NewBoundedMemoryQueue[string](capacity)
consumers := NewQueueConsumers(q, numConsumers, func(context.Context, string) {
consumers := NewQueueConsumers(q, numConsumers, func(context.Context, string) bool {
time.Sleep(1 * time.Millisecond)
return true
})
require.NoError(b, consumers.Start(context.Background(), componenttest.NewNopHost()))
for j := 0; j < numberOfItems; j++ {
Expand Down Expand Up @@ -238,7 +236,7 @@ func (s *consumerState) assertConsumed(expected map[string]bool) {
func TestZeroSizeWithConsumers(t *testing.T) {
q := NewBoundedMemoryQueue[string](0)

consumers := NewQueueConsumers(q, 1, func(context.Context, string) {})
consumers := NewQueueConsumers(q, 1, func(context.Context, string) bool { return true })
assert.NoError(t, consumers.Start(context.Background(), componenttest.NewNopHost()))

assert.NoError(t, q.Offer(context.Background(), "a")) // in process
Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/internal/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ import (
type QueueConsumers[T any] struct {
queue Queue[T]
numConsumers int
consumeFunc func(context.Context, T)
consumeFunc func(context.Context, T) bool
stopWG sync.WaitGroup
}

func NewQueueConsumers[T any](q Queue[T], numConsumers int, consumeFunc func(context.Context, T)) *QueueConsumers[T] {
func NewQueueConsumers[T any](q Queue[T], numConsumers int, consumeFunc func(context.Context, T) bool) *QueueConsumers[T] {
return &QueueConsumers[T]{
queue: q,
numConsumers: numConsumers,
Expand Down
Loading

0 comments on commit f7a83e2

Please sign in to comment.