-
Notifications
You must be signed in to change notification settings - Fork 248
/
tracer_provider.rb
175 lines (160 loc) · 7.65 KB
/
tracer_provider.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# frozen_string_literal: true
# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0
module OpenTelemetry
module SDK
module Trace
# {TracerProvider} is the SDK implementation of {OpenTelemetry::Trace::TracerProvider}.
class TracerProvider < OpenTelemetry::Trace::TracerProvider # rubocop:disable Metrics/ClassLength
# Value object used as the hash key of the tracer registry: one entry per
# (instrumentation name, version) pair. Kept private to this class.
Key = Struct.new(:name, :version)
private_constant(:Key)
# Span limits, ID generator and sampler can be read and reassigned by callers.
attr_accessor :span_limits, :id_generator, :sampler
# The resource is exposed read-only.
attr_reader :resource
# Returns a new {TracerProvider} instance.
#
# @param [optional Sampler] sampler The sampling policy for new spans
# @param [optional Resource] resource The resource to associate with spans
# created by Tracers created by this TracerProvider
# @param [optional IDGenerator] id_generator The trace and span ID generation
# policy
# @param [optional SpanLimits] span_limits The limits to apply to attribute,
# event and link counts for Spans created by Tracers created by this
# TracerProvider
#
# @return [TracerProvider]
def initialize(sampler: sampler_from_environment(Samplers.parent_based(root: Samplers::ALWAYS_ON)),
               resource: OpenTelemetry::SDK::Resources::Resource.create,
               id_generator: OpenTelemetry::Trace,
               span_limits: SpanLimits::DEFAULT)
  # Tracer cache and the lock dedicated to it (used by #tracer).
  @registry = {}
  @registry_mutex = Mutex.new

  # Lifecycle state. Within this class both values are only reassigned
  # while holding @mutex.
  @span_processors = []
  @stopped = false
  @mutex = Mutex.new

  # Collaborators supplied by the caller, or the keyword defaults above.
  @sampler, @id_generator, @span_limits = sampler, id_generator, span_limits
  @resource = resource
end
# Returns a {Tracer} instance.
#
# @param [optional String] name Instrumentation package name
# @param [optional String] version Instrumentation package version
#
# @return [Tracer]
def tracer(name = nil, version = nil)
  # Missing values collapse to empty strings so they can form a registry key.
  tracer_name = name || ''
  tracer_version = version || ''
  if tracer_name.empty?
    OpenTelemetry.logger.warn 'calling TracerProvider#tracer without providing a tracer name.'
  end

  registry_key = Key.new(tracer_name, tracer_version)
  # Build the tracer for a given (name, version) at most once; later calls
  # with the same pair get the cached instance back.
  @registry_mutex.synchronize do
    @registry[registry_key] ||= Tracer.new(tracer_name, tracer_version, self)
  end
end
# Attempts to stop all the activity for this {TracerProvider}. Calls
# SpanProcessor#shutdown for all registered SpanProcessors.
#
# This operation may block until all the Spans are processed. Must be
# called before turning off the main application to ensure all data are
# processed and exported.
#
# After this is called all the newly created {Span}s will be no-op.
#
# @param [optional Numeric] timeout An optional timeout in seconds.
# @return [Integer] Export::SUCCESS if no error occurred, Export::FAILURE if
# a non-specific failure occurred or shutdown had already been called,
# Export::TIMEOUT if a timeout occurred.
def shutdown(timeout: nil)
  # Runs under @mutex, the same lock force_flush and add_span_processor take.
  @mutex.synchronize do
    if @stopped
      OpenTelemetry.logger.warn('calling Tracer#shutdown multiple times.')
      return Export::FAILURE
    end

    started_at = OpenTelemetry::Common::Utilities.timeout_timestamp
    outcomes = []
    @span_processors.each do |span_processor|
      time_left = OpenTelemetry::Common::Utilities.maybe_timeout(timeout, started_at)
      if time_left&.zero?
        # Budget exhausted: skip the remaining processors and report only
        # the timeout, discarding results gathered so far.
        outcomes = [Export::TIMEOUT]
        break
      end
      outcomes << span_processor.shutdown(timeout: time_left)
    end

    @stopped = true
    # The largest result code wins; with no processors the call succeeds.
    outcomes.max || Export::SUCCESS
  end
end
# Immediately export all spans that have not yet been exported for all the
# registered SpanProcessors.
#
# This method should only be called in cases where it is absolutely
# necessary, such as when using some FaaS providers that may suspend
# the process after an invocation, but before the `Processor` exports
# the completed spans.
#
# @param [optional Numeric] timeout An optional timeout in seconds.
# @return [Integer] Export::SUCCESS if no error occurred, Export::FAILURE if
# a non-specific failure occurred, Export::TIMEOUT if a timeout occurred.
def force_flush(timeout: nil)
  @mutex.synchronize do
    # After shutdown there is nothing to flush; report success right away.
    next Export::SUCCESS if @stopped

    started_at = OpenTelemetry::Common::Utilities.timeout_timestamp
    outcomes = []
    @span_processors.each do |span_processor|
      time_left = OpenTelemetry::Common::Utilities.maybe_timeout(timeout, started_at)
      # Out of time: leave the whole method with TIMEOUT, ignoring any
      # results collected from earlier processors.
      return Export::TIMEOUT if time_left&.zero?

      outcomes << span_processor.force_flush(timeout: time_left)
    end

    # The largest result code wins; with no processors the call succeeds.
    outcomes.max || Export::SUCCESS
  end
end
# Adds a new SpanProcessor to this {TracerProvider}.
#
# @param span_processor the new SpanProcessor to be added.
def add_span_processor(span_processor)
  @mutex.synchronize do
    if @stopped
      # Registration after shutdown is ignored apart from this warning.
      OpenTelemetry.logger.warn('calling Tracer#add_span_processor after shutdown.')
      nil
    else
      # Install a fresh array instead of appending in place, so anything
      # still holding the previous list keeps seeing its old contents.
      @span_processors = [*@span_processors, span_processor]
    end
  end
end
# @api private
def internal_create_span(name, kind, trace_id, parent_span_id, attributes, links, start_timestamp, parent_context, instrumentation_library) # rubocop:disable Metrics/MethodLength
  # Generate a trace ID only when the caller did not supply one.
  trace_id ||= @id_generator.generate_trace_id
  decision = @sampler.should_sample?(
    trace_id: trace_id,
    parent_context: parent_context,
    links: links,
    name: name,
    kind: kind,
    attributes: attributes
  )
  span_id = @id_generator.generate_span_id

  # If the sampler declines to record, or the provider has been stopped,
  # hand back a non-recording span carrying only the IDs and tracestate.
  if !decision.recording? || @stopped
    bare_context = OpenTelemetry::Trace::SpanContext.new(trace_id: trace_id, span_id: span_id, tracestate: decision.tracestate)
    return OpenTelemetry::Trace.non_recording_span(bare_context)
  end

  trace_flags = if decision.sampled?
                  OpenTelemetry::Trace::TraceFlags::SAMPLED
                else
                  OpenTelemetry::Trace::TraceFlags::DEFAULT
                end
  span_context = OpenTelemetry::Trace::SpanContext.new(
    trace_id: trace_id,
    span_id: span_id,
    trace_flags: trace_flags,
    tracestate: decision.tracestate
  )
  # Sampler-supplied attributes are merged over the caller's; when the
  # caller passed none, the sampler's attributes are used as-is.
  span_attributes = attributes&.merge(decision.attributes) || decision.attributes

  Span.new(
    span_context,
    parent_context,
    name,
    kind,
    parent_span_id,
    @span_limits,
    @span_processors,
    span_attributes,
    links,
    start_timestamp,
    @resource,
    instrumentation_library
  )
end
private
def sampler_from_environment(default_sampler)
  # Each supported OTEL_TRACES_SAMPLER value maps to a lazy builder, so only
  # the selected sampler is constructed and OTEL_TRACES_SAMPLER_ARG is parsed
  # only for the ratio-based choices.
  ratio_based = -> { Samplers.trace_id_ratio_based(Float(ENV.fetch('OTEL_TRACES_SAMPLER_ARG', 1.0))) }
  builders = {
    'always_on' => -> { Samplers::ALWAYS_ON },
    'always_off' => -> { Samplers::ALWAYS_OFF },
    'traceidratio' => ratio_based,
    'parentbased_always_on' => -> { Samplers.parent_based(root: Samplers::ALWAYS_ON) },
    'parentbased_always_off' => -> { Samplers.parent_based(root: Samplers::ALWAYS_OFF) },
    'parentbased_traceidratio' => -> { Samplers.parent_based(root: ratio_based.call) }
  }

  # An unset or unrecognised value falls through to the supplied default.
  builder = builders[ENV['OTEL_TRACES_SAMPLER']]
  builder ? builder.call : default_sampler
rescue StandardError => e
  # Any failure while building (e.g. a non-numeric ratio) is reported and
  # the default sampler is used instead.
  OpenTelemetry.handle_error(exception: e, message: "installing default sampler #{default_sampler.description}")
  default_sampler
end
end
end
end
end