Skip to content

Commit

Permalink
Merge branch 'main' into blp-ring
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias authored Apr 19, 2024
2 parents 36edfb3 + 1ea4ee2 commit b39192b
Show file tree
Hide file tree
Showing 5 changed files with 1,038 additions and 10 deletions.
273 changes: 271 additions & 2 deletions exporters/otlp/otlplog/otlploghttp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,26 @@
package otlploghttp // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"

import (
"bytes"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strconv"
"sync"
"time"

"google.golang.org/protobuf/proto"

"go.opentelemetry.io/otel"
collogpb "go.opentelemetry.io/proto/slim/otlp/collector/logs/v1"
logpb "go.opentelemetry.io/proto/slim/otlp/logs/v1"

"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/retry"
)

type client struct {
Expand All @@ -26,6 +43,258 @@ func newNoopClient() *client {

// newHTTPClient creates a new HTTP log client.
func newHTTPClient(cfg config) (*client, error) {
// TODO: implement.
return &client{}, nil
hc := &http.Client{
Transport: ourTransport,
Timeout: cfg.timeout.Value,
}

if cfg.tlsCfg.Value != nil || cfg.proxy.Value != nil {
clonedTransport := ourTransport.Clone()
hc.Transport = clonedTransport

if cfg.tlsCfg.Value != nil {
clonedTransport.TLSClientConfig = cfg.tlsCfg.Value
}
if cfg.proxy.Value != nil {
clonedTransport.Proxy = cfg.proxy.Value
}
}

u := &url.URL{
Scheme: "https",
Host: cfg.endpoint.Value,
Path: cfg.path.Value,
}
if cfg.insecure.Value {
u.Scheme = "http"
}
// Body is set when this is cloned during upload.
req, err := http.NewRequest(http.MethodPost, u.String(), http.NoBody)
if err != nil {
return nil, err
}

userAgent := "OTel Go OTLP over HTTP/protobuf logs exporter/" + Version()
req.Header.Set("User-Agent", userAgent)

if n := len(cfg.headers.Value); n > 0 {
for k, v := range cfg.headers.Value {
req.Header.Set(k, v)
}
}
req.Header.Set("Content-Type", "application/x-protobuf")

c := &httpClient{
compression: cfg.compression.Value,
req: req,
requestFunc: cfg.retryCfg.Value.RequestFunc(evaluate),
client: hc,
}
return &client{uploadLogs: c.uploadLogs}, nil
}

type httpClient struct {
// req is cloned for every upload the client makes.
req *http.Request
compression Compression
requestFunc retry.RequestFunc
client *http.Client
}

// Keep it in sync with golang's DefaultTransport from net/http! We
// have our own copy to avoid handling a situation where the
// DefaultTransport is overwritten with some different implementation
// of http.RoundTripper or it's modified by another package.
var ourTransport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}

func (c *httpClient) uploadLogs(ctx context.Context, data []*logpb.ResourceLogs) error {
// The Exporter synchronizes access to client methods. This is not called
// after the Exporter is shutdown. Only thing to do here is send data.

pbRequest := &collogpb.ExportLogsServiceRequest{ResourceLogs: data}
body, err := proto.Marshal(pbRequest)
if err != nil {
return err
}
request, err := c.newRequest(ctx, body)
if err != nil {
return err
}

return c.requestFunc(ctx, func(iCtx context.Context) error {
select {
case <-iCtx.Done():
return iCtx.Err()
default:
}

request.reset(iCtx)
resp, err := c.client.Do(request.Request)
var urlErr *url.Error
if errors.As(err, &urlErr) && urlErr.Temporary() {
return newResponseError(http.Header{})
}
if err != nil {
return err
}

var rErr error
switch sc := resp.StatusCode; {
case sc >= 200 && sc <= 299:
// Success, do not retry.

// Read the partial success message, if any.
var respData bytes.Buffer
if _, err := io.Copy(&respData, resp.Body); err != nil {
return err
}
if respData.Len() == 0 {
return nil
}

if resp.Header.Get("Content-Type") == "application/x-protobuf" {
var respProto collogpb.ExportLogsServiceResponse
if err := proto.Unmarshal(respData.Bytes(), &respProto); err != nil {
return err
}

if respProto.PartialSuccess != nil {
msg := respProto.PartialSuccess.GetErrorMessage()
n := respProto.PartialSuccess.GetRejectedLogRecords()
if n != 0 || msg != "" {
err := fmt.Errorf("OTLP partial success: %s (%d log records rejected)", msg, n)
otel.Handle(err)
}
}
}
return nil
case sc == http.StatusTooManyRequests,
sc == http.StatusBadGateway,
sc == http.StatusServiceUnavailable,
sc == http.StatusGatewayTimeout:
// Retry-able failure.
rErr = newResponseError(resp.Header)

// Going to retry, drain the body to reuse the connection.
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
_ = resp.Body.Close()
return err
}
default:
rErr = fmt.Errorf("failed to send logs to %s: %s", request.URL, resp.Status)
}

if err := resp.Body.Close(); err != nil {
return err
}
return rErr
})
}

var gzPool = sync.Pool{
New: func() interface{} {
w := gzip.NewWriter(io.Discard)
return w
},
}

func (c *httpClient) newRequest(ctx context.Context, body []byte) (request, error) {
r := c.req.Clone(ctx)
req := request{Request: r}

switch c.compression {
case NoCompression:
r.ContentLength = (int64)(len(body))
req.bodyReader = bodyReader(body)
case GzipCompression:
// Ensure the content length is not used.
r.ContentLength = -1
r.Header.Set("Content-Encoding", "gzip")

gz := gzPool.Get().(*gzip.Writer)
defer gzPool.Put(gz)

var b bytes.Buffer
gz.Reset(&b)

if _, err := gz.Write(body); err != nil {
return req, err
}
// Close needs to be called to ensure body is fully written.
if err := gz.Close(); err != nil {
return req, err
}

req.bodyReader = bodyReader(b.Bytes())
}

return req, nil
}

// bodyReader returns a closure returning a new reader for buf.
func bodyReader(buf []byte) func() io.ReadCloser {
return func() io.ReadCloser {
return io.NopCloser(bytes.NewReader(buf))
}
}

// request wraps an http.Request with a resettable body reader.
type request struct {
*http.Request

// bodyReader allows the same body to be used for multiple requests.
bodyReader func() io.ReadCloser
}

// reset reinitializes the request Body and uses ctx for the request.
func (r *request) reset(ctx context.Context) {
r.Body = r.bodyReader()
r.Request = r.WithContext(ctx)
}

// retryableError represents a request failure that can be retried.
type retryableError struct {
throttle int64
}

// newResponseError returns a retryableError and will extract any explicit
// throttle delay contained in headers.
func newResponseError(header http.Header) error {
var rErr retryableError
if v := header.Get("Retry-After"); v != "" {
if t, err := strconv.ParseInt(v, 10, 64); err == nil {
rErr.throttle = t
}
}
return rErr
}

func (e retryableError) Error() string {
return "retry-able request failure"
}

// evaluate returns if err is retry-able. If it is and it includes an explicit
// throttling delay, that delay is also returned.
func evaluate(err error) (bool, time.Duration) {
if err == nil {
return false, 0
}

rErr, ok := err.(retryableError)
if !ok {
return false, 0
}

return true, time.Duration(rErr.throttle)
}
Loading

0 comments on commit b39192b

Please sign in to comment.