diff --git a/go.mod b/go.mod index 7905f8cfdec..c3ed4118747 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/golang/protobuf v1.5.3 github.com/gorilla/websocket v1.5.0 github.com/grafana/xk6-browser v0.10.0 - github.com/grafana/xk6-grpc v0.1.2 + github.com/grafana/xk6-grpc v0.1.3-0.20230717090346-fb49221e0ce1 github.com/grafana/xk6-output-prometheus-remote v0.2.2-0.20230719110733-69f101ee8ade github.com/grafana/xk6-redis v0.1.1 github.com/grafana/xk6-timers v0.1.2 diff --git a/go.sum b/go.sum index 94e7b218d88..ae1dfcb2b7d 100644 --- a/go.sum +++ b/go.sum @@ -181,8 +181,8 @@ github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWm github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grafana/xk6-browser v0.10.0 h1:Mnx0Ho+mlyFGlV7zW7zXkN0njRglh9JflLV+OzXSaRk= github.com/grafana/xk6-browser v0.10.0/go.mod h1:ax6OHARpNEu9hSGYOAI4grAwiRapsNPi9TBQxDYurKw= -github.com/grafana/xk6-grpc v0.1.2 h1:gNN3PYV2dIPoq1zTVz8YOxrWhl1D15jhRR0EA9ZYhBw= -github.com/grafana/xk6-grpc v0.1.2/go.mod h1:iq6qHN64XgAEmDHKf0OXZ4mvoqF4Udr22fiCIXNpXA0= +github.com/grafana/xk6-grpc v0.1.3-0.20230717090346-fb49221e0ce1 h1:SdMihJN+fkH6cO/1NeAnVxSVOnJ3ZkZ1v7FJnrcqhog= +github.com/grafana/xk6-grpc v0.1.3-0.20230717090346-fb49221e0ce1/go.mod h1:iq6qHN64XgAEmDHKf0OXZ4mvoqF4Udr22fiCIXNpXA0= github.com/grafana/xk6-output-prometheus-remote v0.2.2-0.20230719110733-69f101ee8ade h1:DY8H7hMVBpP4yuKlNnPBykL+WJ4hBobwT0BG8nDcFLw= github.com/grafana/xk6-output-prometheus-remote v0.2.2-0.20230719110733-69f101ee8ade/go.mod h1:rI7naMHdQ+vLVoonZ6LYkrXz8ZiWIptTstVbV2CSahY= github.com/grafana/xk6-redis v0.1.1 h1:rvWnLanRB2qzDwuY6NMBe6PXei3wJ3kjYvfCwRJ+q+8= diff --git a/vendor/github.com/grafana/xk6-grpc/grpc/client.go b/vendor/github.com/grafana/xk6-grpc/grpc/client.go index 06d4e614bcf..7607d9cef82 100644 --- a/vendor/github.com/grafana/xk6-grpc/grpc/client.go +++ b/vendor/github.com/grafana/xk6-grpc/grpc/client.go @@ -2,6 +2,9 @@ package grpc import ( "context" + "crypto/tls" + "crypto/x509" + "encoding/pem" "errors" "fmt" "io" @@ -106,6 +109,101 @@ func (c *Client) LoadProtoset(protosetPath string) ([]MethodInfo, error) { return c.convertToMethodInfo(fdset) } +// Note: this function was lifted from `lib/options.go` +func decryptPrivateKey(key, password []byte) ([]byte, error) { + block, _ := pem.Decode(key) + if block == nil { + return nil, errors.New("failed to decode PEM key") + } + + blockType := block.Type + if blockType == "ENCRYPTED PRIVATE KEY" { + return nil, errors.New("encrypted pkcs8 formatted key is not supported") + } + /* + Even though `DecryptPEMBlock` has been deprecated since 1.16.x it is still + being used here because it is deprecated due to it not supporting *good* cryptography + ultimately though we want to support something so we will be using it for now. + */ + decryptedKey, err := x509.DecryptPEMBlock(block, password) //nolint:staticcheck + if err != nil { + return nil, err + } + key = pem.EncodeToMemory(&pem.Block{ + Type: blockType, + Bytes: decryptedKey, + }) + return key, nil +} + +func buildTLSConfig(parentConfig *tls.Config, certificate, key []byte, caCertificates [][]byte) (*tls.Config, error) { + var cp *x509.CertPool + if len(caCertificates) > 0 { + cp, _ = x509.SystemCertPool() + for i, caCert := range caCertificates { + if ok := cp.AppendCertsFromPEM(caCert); !ok { + return nil, fmt.Errorf("failed to append ca certificate [%d] from PEM", i) + } + } + } + + // Ignoring 'TLS MinVersion is too low' because this tls.Config will inherit MinValue and MaxValue + // from the vu state tls.Config + + //nolint:golint,gosec + tlsCfg := &tls.Config{ + CipherSuites: parentConfig.CipherSuites, + InsecureSkipVerify: parentConfig.InsecureSkipVerify, + MinVersion: parentConfig.MinVersion, + MaxVersion: parentConfig.MaxVersion, + Renegotiation: parentConfig.Renegotiation, + RootCAs: cp, + } + if len(certificate) > 0 && len(key) > 0 { + cert, err := tls.X509KeyPair(certificate, key) + if err != nil { + return nil, fmt.Errorf("failed to append certificate from PEM: %w", err) + } + tlsCfg.Certificates = []tls.Certificate{cert} + } + return tlsCfg, nil +} + +func buildTLSConfigFromMap(parentConfig *tls.Config, tlsConfigMap map[string]interface{}) (*tls.Config, error) { + var cert, key, pass []byte + var ca [][]byte + var err error + if certstr, ok := tlsConfigMap["cert"].(string); ok { + cert = []byte(certstr) + } + if keystr, ok := tlsConfigMap["key"].(string); ok { + key = []byte(keystr) + } + if passwordStr, ok := tlsConfigMap["password"].(string); ok { + pass = []byte(passwordStr) + if len(pass) > 0 { + if key, err = decryptPrivateKey(key, pass); err != nil { + return nil, err + } + } + } + if cas, ok := tlsConfigMap["cacerts"]; ok { + var caCertsArray []interface{} + if caCertsArray, ok = cas.([]interface{}); ok { + ca = make([][]byte, len(caCertsArray)) + for i, entry := range caCertsArray { + var entryStr string + if entryStr, ok = entry.(string); ok { + ca[i] = []byte(entryStr) + } + } + } else if caCertStr, caCertStrOk := cas.(string); caCertStrOk { + ca = [][]byte{[]byte(caCertStr)} + } + } + return buildTLSConfig(parentConfig, cert, key, ca) +} + // Connect is a block dial to the gRPC server at the given address (host:port) func (c *Client) Connect(addr string, params map[string]interface{}) (bool, error) { state := c.vu.State() @@ -123,9 +221,13 @@ func (c *Client) Connect(addr string, params map[string]interface{}) (bool, erro var tcred credentials.TransportCredentials if !p.IsPlaintext { tlsCfg := state.TLSConfig.Clone() + if len(p.TLS) > 0 { + if tlsCfg, err = buildTLSConfigFromMap(tlsCfg, p.TLS); err != nil { + return false, err + } + } tlsCfg.NextProtos = []string{"h2"} - // TODO(rogchap): Would be good to add support for custom RootCAs (self signed) tcred = credentials.NewTLS(tlsCfg) } else { tcred = insecure.NewCredentials() @@ -282,9 +384,18 @@ func (c *Client) convertToMethodInfo(fdset *descriptorpb.FileDescriptorSet) ([]M appendMethodInfo(fd, sd, md) } } + messages := fd.Messages() + + stack := make([]protoreflect.MessageDescriptor, 0, messages.Len()) for i := 0; i < messages.Len(); i++ { - message := messages.Get(i) + stack = append(stack, messages.Get(i)) + } + + for len(stack) > 0 { + message := stack[len(stack)-1] + stack = stack[:len(stack)-1] + _, errFind := protoregistry.GlobalTypes.FindMessageByName(message.FullName()) if errors.Is(errFind, protoregistry.NotFound) { err = protoregistry.GlobalTypes.RegisterMessage(dynamicpb.NewMessageType(message)) @@ -292,7 +403,13 @@ func (c *Client) convertToMethodInfo(fdset *descriptorpb.FileDescriptorSet) ([]M return false } } + + nested := message.Messages() + for i := 0; i < nested.Len(); i++ { + stack = append(stack, nested.Get(i)) + } } + return true }) if err != nil { @@ -307,6 +424,7 @@ type connectParams struct { Timeout time.Duration MaxReceiveSize int64 MaxSendSize int64 + TLS map[string]interface{} } func (c *Client) parseConnectParams(raw map[string]interface{}) (connectParams, error) { @@ -355,7 +473,10 @@ func (c *Client) parseConnectParams(raw map[string]interface{}) (connectParams, if params.MaxSendSize < 0 { return params, fmt.Errorf("invalid maxSendSize value: '%#v, it needs to be a positive integer", v) } - + case "tls": + if err := parseConnectTLSParam(¶ms, v); err != nil { + return params, err + } default: return params, fmt.Errorf("unknown connect param: %q", k) } @@ -363,6 +484,46 @@ func (c *Client) parseConnectParams(raw map[string]interface{}) (connectParams, return params, nil } +func parseConnectTLSParam(params *connectParams, v interface{}) error { + var ok bool + params.TLS, ok = v.(map[string]interface{}) + + if !ok { + return fmt.Errorf("invalid tls value: '%#v', expected (optional) keys: cert, key, password, and cacerts", v) + } + // optional map keys below + if cert, certok := params.TLS["cert"]; certok { + if _, ok = cert.(string); !ok { + return fmt.Errorf("invalid tls cert value: '%#v', it needs to be a PEM formatted string", v) + } + } + if key, keyok := params.TLS["key"]; keyok { + if _, ok = key.(string); !ok { + return fmt.Errorf("invalid tls key value: '%#v', it needs to be a PEM formatted string", v) + } + } + if pass, passok := params.TLS["password"]; passok { + if _, ok = pass.(string); !ok { + return fmt.Errorf("invalid tls password value: '%#v', it needs to be a string", v) + } + } + if cacerts, cacertsok := params.TLS["cacerts"]; cacertsok { + var cacertsArray []interface{} + if cacertsArray, ok = cacerts.([]interface{}); ok { + for _, cacertsArrayEntry := range cacertsArray { + if _, ok = cacertsArrayEntry.(string); !ok { + return fmt.Errorf("invalid tls cacerts value: '%#v',"+ + " it needs to be a string or an array of PEM formatted strings", v) + } + } + } else if _, ok = cacerts.(string); !ok { + return fmt.Errorf("invalid tls cacerts value: '%#v',"+ + " it needs to be a string or an array of PEM formatted strings", v) + } + } + return nil +} + func walkFileDescriptors(seen map[string]struct{}, fd *desc.FileDescriptor) []*descriptorpb.FileDescriptorProto { fds := []*descriptorpb.FileDescriptorProto{} diff --git a/vendor/github.com/grafana/xk6-grpc/grpc/grpc.go b/vendor/github.com/grafana/xk6-grpc/grpc/grpc.go index 496536ed76f..b83a485552c 100644 --- a/vendor/github.com/grafana/xk6-grpc/grpc/grpc.go +++ b/vendor/github.com/grafana/xk6-grpc/grpc/grpc.go @@ -117,12 +117,14 @@ func (mi *ModuleInstance) stream(c goja.ConstructorCall) *goja.Object { p.SetSystemTags(mi.vu.State(), client.addr, methodName) + logger := mi.vu.State().Logger.WithField("streamMethod", methodName) + s := &stream{ vu: mi.vu, client: client, methodDescriptor: methodDescriptor, method: methodName, - logger: mi.vu.State().Logger, + logger: logger, tq: taskqueue.New(mi.vu.RegisterCallback), diff --git a/vendor/github.com/grafana/xk6-grpc/grpc/stream.go b/vendor/github.com/grafana/xk6-grpc/grpc/stream.go index 54b6034cb03..e89eaf4de7b 100644 --- a/vendor/github.com/grafana/xk6-grpc/grpc/stream.go +++ b/vendor/github.com/grafana/xk6-grpc/grpc/stream.go @@ -234,12 +234,7 @@ func (s *stream) writeData(wg *sync.WaitGroup) { err := s.stream.Send(msg.msg) if err != nil { - s.logger.WithError(err).Error("failed to send data to the stream") - - s.tq.Queue(func() error { - return s.closeWithError(err) - }) - + s.processSendError(err) return } @@ -284,6 +279,17 @@ func (s *stream) writeData(wg *sync.WaitGroup) { } } +func (s *stream) processSendError(err error) { + if errors.Is(err, io.EOF) { + s.logger.WithError(err).Debug("skip sending a message stream is cancelled/finished") + err = nil + } + + s.tq.Queue(func() error { + return s.closeWithError(err) + }) +} + // on registers a listener for a certain event type func (s *stream) on(event string, listener func(goja.Value) (goja.Value, error)) { if err := s.eventListeners.add(event, listener); err != nil { @@ -323,10 +329,19 @@ func (s *stream) end() { } func (s *stream) closeWithError(err error) error { + s.close(err) + + return s.callErrorListeners(err) +} + +// close changes the stream state to closed and triggers the end event listeners +func (s *stream) close(err error) { if s.state == closed { - return nil + return } + s.logger.WithError(err).Debug("stream is closing") + s.state = closed close(s.done) s.tq.Queue(func() error { @@ -336,24 +351,24 @@ func (s *stream) closeWithError(err error) error { if s.timeoutCancel != nil { s.timeoutCancel() } - - s.logger.WithError(err).Debug("connection closed") - - if err != nil { - if errList := s.callErrorListeners(err); errList != nil { - return errList - } - } - - return nil } func (s *stream) callErrorListeners(e error) error { + if e == nil { + return nil + } + rt := s.vu.Runtime() obj := extractError(e) - for _, errorListener := range s.eventListeners.all(eventError) { + list := s.eventListeners.all(eventError) + + if len(list) == 0 { + s.logger.Warnf("no handlers for error registered, but an error happened: %s", e) + } + + for _, errorListener := range list { if _, err := errorListener(rt.ToValue(obj)); err != nil { return err } diff --git a/vendor/modules.txt b/vendor/modules.txt index c02aa9629fa..2f0a99baddc 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -156,7 +156,7 @@ github.com/grafana/xk6-browser/k6ext github.com/grafana/xk6-browser/keyboardlayout github.com/grafana/xk6-browser/log github.com/grafana/xk6-browser/storage -# github.com/grafana/xk6-grpc v0.1.2 +# github.com/grafana/xk6-grpc v0.1.3-0.20230717090346-fb49221e0ce1 ## explicit; go 1.19 github.com/grafana/xk6-grpc/grpc github.com/grafana/xk6-grpc/lib/netext/grpcext