From 67db9a2cc878392609cbee1c04410c45cbd9775e Mon Sep 17 00:00:00 2001 From: Pavel Gabriel Date: Mon, 15 May 2023 20:29:15 +0200 Subject: [PATCH 01/10] add method to read message --- connection.go | 78 ++++++++++++++++++++++++++++++---------------- connection_test.go | 2 +- 2 files changed, 53 insertions(+), 27 deletions(-) diff --git a/connection.go b/connection.go index dd24d92..56a0456 100644 --- a/connection.go +++ b/connection.go @@ -62,7 +62,7 @@ type Connection struct { Opts Options conn io.ReadWriteCloser requestsCh chan request - readResponseCh chan []byte + readResponseCh chan *iso8583.Message done chan struct{} // spec that will be used to unpack received messages @@ -105,7 +105,7 @@ func New(addr string, spec *iso8583.MessageSpec, mlReader MessageLengthReader, m addr: addr, Opts: opts, requestsCh: make(chan request), - readResponseCh: make(chan []byte), + readResponseCh: make(chan *iso8583.Message), done: make(chan struct{}), respMap: make(map[string]response), spec: spec, @@ -539,29 +539,55 @@ func (c *Connection) writeLoop() { // readLoop reads data from the socket (message length header and raw message) // and runs a goroutine to handle the message func (c *Connection) readLoop() { - var err error - var messageLength int + var outErr error r := bufio.NewReader(c.conn) for { - messageLength, err = c.readMessageLength(r) - if err != nil { - c.handleError(utils.NewSafeError(err, "failed to read message length")) - break - } - - // read the packed message - rawMessage := make([]byte, messageLength) - _, err = io.ReadFull(r, rawMessage) + message, err := c.readMessage(r) if err != nil { c.handleError(utils.NewSafeError(err, "failed to read message from connection")) + + // if err is ErrUnpack, we can still continue reading + // from the connection + var unpackErr *ErrUnpack + if errors.As(err, &unpackErr) { + continue + } + + outErr = err break } - c.readResponseCh <- rawMessage + c.readResponseCh <- message } - c.handleConnectionError(err) + c.handleConnectionError(outErr) +} + +func (c *Connection) readMessage(r io.Reader) (*iso8583.Message, error) { + messageLength, err := c.readMessageLength(r) + if err != nil { + return nil, fmt.Errorf("failed to read message length: %w", err) + } + + // read the packed message + rawMessage := make([]byte, messageLength) + _, err = io.ReadFull(r, rawMessage) + if err != nil { + return nil, fmt.Errorf("failed to read message from connection: %w", err) + } + + message := iso8583.NewMessage(c.spec) + err = message.Unpack(rawMessage) + if err != nil { + unpackErr := &ErrUnpack{ + Err: err, + RawMessage: rawMessage, + } + return nil, fmt.Errorf("unpacking message: %w", unpackErr) + } + + return message, nil } func (c *Connection) readResponseLoop() { @@ -586,18 +612,18 @@ func (c *Connection) readResponseLoop() { // handleResponse unpacks the message and then sends it to the reply channel // that corresponds to the message ID (request ID) -func (c *Connection) handleResponse(rawMessage []byte) { +func (c *Connection) handleResponse(message *iso8583.Message) { // create message - message := iso8583.NewMessage(c.spec) - err := message.Unpack(rawMessage) - if err != nil { - unpackErr := &ErrUnpack{ - Err: err, - RawMessage: rawMessage, - } - c.handleError(utils.NewSafeError(unpackErr, "failed to unpack message")) - return - } + // message := iso8583.NewMessage(c.spec) + // err := message.Unpack(rawMessage) + // if err != nil { + // unpackErr := &ErrUnpack{ + // Err: err, + // RawMessage: rawMessage, + // } + // c.handleError(utils.NewSafeError(unpackErr, "failed to unpack message")) + // return + // } if isResponse(message) { reqID, err := c.Opts.RequestIDGenerator.GenerateRequestID(message) diff --git a/connection_test.go b/connection_test.go index f821c80..8ed2f5d 100644 --- a/connection_test.go +++ b/connection_test.go @@ -281,7 +281,7 @@ func TestClient_Send(t *testing.T) { var unpackErr *connection.ErrUnpack if errors.As(handledError, &unpackErr) { - require.EqualError(t, handledError, "failed to unpack message") + require.EqualError(t, handledError, "failed to read message from connection") require.EqualError(t, unpackErr, "failed to unpack field 63: no specification found") require.NotEmpty(t, unpackErr.RawMessage) return true From c85ba43daf8e342d4fd1ad9afaf8245f162e3134 Mon Sep 17 00:00:00 2001 From: Pavel Gabriel Date: Mon, 15 May 2023 20:38:50 +0200 Subject: [PATCH 02/10] add writeMessage method --- connection.go | 49 ++++++++++++++++++++++++------------------------- 1 file changed, 24 insertions(+), 25 deletions(-) diff --git a/connection.go b/connection.go index 56a0456..1fc5a20 100644 --- a/connection.go +++ b/connection.go @@ -331,20 +331,10 @@ func (c *Connection) Send(message *iso8583.Message) (*iso8583.Message, error) { defer c.wg.Done() var buf bytes.Buffer - packed, err := message.Pack() - if err != nil { - return nil, fmt.Errorf("packing message: %w", err) - } - // create header - _, err = c.writeMessageLength(&buf, len(packed)) + err := c.writeMessage(&buf, message) if err != nil { - return nil, fmt.Errorf("writing message header to buffer: %w", err) - } - - _, err = buf.Write(packed) - if err != nil { - return nil, fmt.Errorf("writing packed message to buffer: %w", err) + return nil, fmt.Errorf("writing message: %w", err) } // prepare request @@ -399,6 +389,26 @@ func (c *Connection) Send(message *iso8583.Message) (*iso8583.Message, error) { return resp, err } +func (c *Connection) writeMessage(w io.Writer, message *iso8583.Message) error { + packed, err := message.Pack() + if err != nil { + return utils.NewSafeError(err, "packing message") + } + + // create header + _, err = c.writeMessageLength(w, len(packed)) + if err != nil { + return fmt.Errorf("writing message header to buffer: %w", err) + } + + _, err = w.Write(packed) + if err != nil { + return fmt.Errorf("writing packed message to buffer: %w", err) + } + + return nil +} + // Reply sends the message and does not wait for a reply to be received. // Any reply received for message send using Reply will be handled with // unmatchedMessageHandler @@ -416,20 +426,9 @@ func (c *Connection) Reply(message *iso8583.Message) error { // prepare message for sending var buf bytes.Buffer - packed, err := message.Pack() - if err != nil { - return fmt.Errorf("packing message: %w", err) - } - - // create header - _, err = c.writeMessageLength(&buf, len(packed)) - if err != nil { - return fmt.Errorf("writing message header to buffer: %w", err) - } - - _, err = buf.Write(packed) + err := c.writeMessage(&buf, message) if err != nil { - return fmt.Errorf("writing packed message to buffer: %w", err) + return fmt.Errorf("writing message: %w", err) } req := request{ From a1cb3c2d77dcfe66b81375664c157fe6da4fc09f Mon Sep 17 00:00:00 2001 From: Pavel Gabriel Date: Mon, 15 May 2023 20:55:59 +0200 Subject: [PATCH 03/10] add message reader and writer as options --- connection.go | 12 ++++++++++++ options.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/connection.go b/connection.go index 1fc5a20..d81a84c 100644 --- a/connection.go +++ b/connection.go @@ -390,6 +390,11 @@ func (c *Connection) Send(message *iso8583.Message) (*iso8583.Message, error) { } func (c *Connection) writeMessage(w io.Writer, message *iso8583.Message) error { + if c.Opts.MessageWriter != nil { + return c.Opts.MessageWriter.WriteMessage(w, message) + } + + // default message writer packed, err := message.Pack() if err != nil { return utils.NewSafeError(err, "packing message") @@ -563,7 +568,14 @@ func (c *Connection) readLoop() { c.handleConnectionError(outErr) } +// readMessage reads message length header and raw message from the connection +// and returns iso8583.Message and error if any func (c *Connection) readMessage(r io.Reader) (*iso8583.Message, error) { + if c.Opts.MessageReader != nil { + return c.Opts.MessageReader.ReadMessage(r) + } + + // default message reader messageLength, err := c.readMessageLength(r) if err != nil { return nil, fmt.Errorf("failed to read message length: %w", err) diff --git a/options.go b/options.go index d0ee7d5..32512c9 100644 --- a/options.go +++ b/options.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "crypto/x509" "fmt" + "io" "os" "time" @@ -64,6 +65,22 @@ type Options struct { // RequestIDGenerator is used to generate a unique identifier for a request // so that responses from the server can be matched to the original request. RequestIDGenerator RequestIDGenerator + + // MessageReader is used to read a message from the connection + // if set, connection's MessageLengthReader will be ignored + MessageReader MessageReader + + // MessageWriter is used to write a message to the connection + // if set, connection's MessageLengthWriter will be ignored + MessageWriter MessageWriter +} + +type MessageReader interface { + ReadMessage(r io.Reader) (*iso8583.Message, error) +} + +type MessageWriter interface { + WriteMessage(w io.Writer, message *iso8583.Message) error } type Option func(*Options) error @@ -244,3 +261,19 @@ func SetRequestIDGenerator(g RequestIDGenerator) Option { return nil } } + +// SetMessageReader sets a MessageReader option +func SetMessageReader(r MessageReader) Option { + return func(o *Options) error { + o.MessageReader = r + return nil + } +} + +// SetMessageWriter sets a MessageWriter option +func SetMessageWriter(w MessageWriter) Option { + return func(o *Options) error { + o.MessageWriter = w + return nil + } +} From 7de41879d6cf35f789951fcfa17918a4bb0968ab Mon Sep 17 00:00:00 2001 From: Pavel Gabriel Date: Tue, 16 May 2023 16:53:49 +0200 Subject: [PATCH 04/10] write test to use message reader/writer --- connection.go | 17 +---- connection_test.go | 155 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 158 insertions(+), 14 deletions(-) diff --git a/connection.go b/connection.go index d81a84c..32321a7 100644 --- a/connection.go +++ b/connection.go @@ -588,6 +588,7 @@ func (c *Connection) readMessage(r io.Reader) (*iso8583.Message, error) { return nil, fmt.Errorf("failed to read message from connection: %w", err) } + // unpack the message message := iso8583.NewMessage(c.spec) err = message.Unpack(rawMessage) if err != nil { @@ -621,21 +622,9 @@ func (c *Connection) readResponseLoop() { } } -// handleResponse unpacks the message and then sends it to the reply channel -// that corresponds to the message ID (request ID) +// handleResponse sends message to the reply channel that corresponds to the +// message ID (request ID) func (c *Connection) handleResponse(message *iso8583.Message) { - // create message - // message := iso8583.NewMessage(c.spec) - // err := message.Unpack(rawMessage) - // if err != nil { - // unpackErr := &ErrUnpack{ - // Err: err, - // RawMessage: rawMessage, - // } - // c.handleError(utils.NewSafeError(unpackErr, "failed to unpack message")) - // return - // } - if isResponse(message) { reqID, err := c.Opts.RequestIDGenerator.GenerateRequestID(message) if err != nil { diff --git a/connection_test.go b/connection_test.go index 8ed2f5d..86e90c9 100644 --- a/connection_test.go +++ b/connection_test.go @@ -1,6 +1,7 @@ package connection_test import ( + "encoding/binary" "errors" "fmt" "io" @@ -915,6 +916,160 @@ func TestClient_Options(t *testing.T) { }) } +// messageIO is a helper struct to read/write iso8583 messages from/to +// io.Reader/io.Writer +type messageIO struct { + Spec *iso8583.MessageSpec + requestHeaders map[string]*header + mu sync.Mutex +} + +func (m *messageIO) ReadMessage(r io.Reader) (*iso8583.Message, error) { + // read 2 bytes header + h := &header{} + err := binary.Read(r, binary.BigEndian, h) + if err != nil { + return nil, fmt.Errorf("failed to read message length: %w", err) + } + + // read message + rawMessage := make([]byte, h.Length) + _, err = io.ReadFull(r, rawMessage) + if err != nil { + return nil, fmt.Errorf("failed to read message: %w", err) + } + + // unpack message + message := iso8583.NewMessage(m.Spec) + err = message.Unpack(rawMessage) + if err != nil { + return nil, fmt.Errorf("failed to unpack message: %w", err) + } + + mti, err := message.GetMTI() + if err != nil { + return nil, fmt.Errorf("failed to get mti: %w", err) + } + + // if message is request then save header + if mti[2] == '0' || mti[2] == '2' { + // save message header to be able to write it back + // when writing message + m.mu.Lock() + defer m.mu.Unlock() + + stan, err := message.GetString(11) + if err != nil { + return nil, fmt.Errorf("failed to get stan: %w", err) + } + + // use stan as a key to save header. it can be stan + rrn + m.requestHeaders[stan] = h + } + + return message, nil +} + +func (m *messageIO) WriteMessage(w io.Writer, message *iso8583.Message) error { + // pack message + rawMessage, err := message.Pack() + if err != nil { + return fmt.Errorf("failed to pack message: %w", err) + } + + // create header with message length + h := header{ + Length: uint16(len(rawMessage)), + } + + mti, err := message.GetMTI() + if err != nil { + return fmt.Errorf("failed to get mti: %w", err) + } + + // if message is response then work with saved headers + // simple check for response mti, use more complex check + if mti[2] == '1' || mti[2] == '3' { + // get stan from message + stan, err := message.GetString(11) + if err != nil { + return fmt.Errorf("failed to get stan: %w", err) + } + + // get header from saved headers + m.mu.Lock() + requestHeader, ok := m.requestHeaders[stan] + m.mu.Unlock() + if !ok { + return fmt.Errorf("failed to get header for stan %s", stan) + } + + h.SourceID = requestHeader.DestID + h.DestID = requestHeader.SourceID + } + + // write header + err = binary.Write(w, binary.BigEndian, h) + if err != nil { + return fmt.Errorf("failed to write message length: %w", err) + } + + // write message + _, err = w.Write(rawMessage) + if err != nil { + return fmt.Errorf("failed to write message: %w", err) + } + + return nil +} + +// header is 2 bytes length of the message +type header struct { + Length uint16 + SourceID [4]byte + DestID [4]byte +} + +func TestClientOptionsWithMessageReaderAndWriter(t *testing.T) { + server, err := NewTestServer() + require.NoError(t, err) + defer server.Close() + + // create client with custom message reader and writer + msgIO := &messageIO{Spec: testSpec} + + c, err := connection.New(server.Addr, nil, nil, nil, + connection.SetMessageReader(msgIO), + connection.SetMessageWriter(msgIO), + connection.ErrorHandler(func(err error) { + require.NoError(t, err) + }), + ) + require.NoError(t, err) + + err = c.Connect() + require.NoError(t, err) + + // network management message + message := iso8583.NewMessage(testSpec) + err = message.Marshal(baseFields{ + MTI: field.NewStringValue("0800"), + STAN: field.NewStringValue(getSTAN()), + }) + require.NoError(t, err) + + // we can send iso message to the server + response, err := c.Send(message) + require.NoError(t, err) + time.Sleep(1 * time.Second) + + mti, err := response.GetMTI() + require.NoError(t, err) + require.Equal(t, "0810", mti) + + require.NoError(t, c.Close()) +} + func TestConnection(t *testing.T) { t.Run("Status", func(t *testing.T) { c, err := connection.New("1.1.1.1", nil, nil, nil) From b0558e8205915cc6c5dafc69c9894117826dd530 Mon Sep 17 00:00:00 2001 From: Pavel Gabriel Date: Tue, 16 May 2023 18:12:54 +0200 Subject: [PATCH 05/10] remove code that was just to show the concept --- connection_test.go | 55 ++-------------------------------------------- 1 file changed, 2 insertions(+), 53 deletions(-) diff --git a/connection_test.go b/connection_test.go index 86e90c9..0549248 100644 --- a/connection_test.go +++ b/connection_test.go @@ -919,9 +919,7 @@ func TestClient_Options(t *testing.T) { // messageIO is a helper struct to read/write iso8583 messages from/to // io.Reader/io.Writer type messageIO struct { - Spec *iso8583.MessageSpec - requestHeaders map[string]*header - mu sync.Mutex + Spec *iso8583.MessageSpec } func (m *messageIO) ReadMessage(r io.Reader) (*iso8583.Message, error) { @@ -946,27 +944,6 @@ func (m *messageIO) ReadMessage(r io.Reader) (*iso8583.Message, error) { return nil, fmt.Errorf("failed to unpack message: %w", err) } - mti, err := message.GetMTI() - if err != nil { - return nil, fmt.Errorf("failed to get mti: %w", err) - } - - // if message is request then save header - if mti[2] == '0' || mti[2] == '2' { - // save message header to be able to write it back - // when writing message - m.mu.Lock() - defer m.mu.Unlock() - - stan, err := message.GetString(11) - if err != nil { - return nil, fmt.Errorf("failed to get stan: %w", err) - } - - // use stan as a key to save header. it can be stan + rrn - m.requestHeaders[stan] = h - } - return message, nil } @@ -982,32 +959,6 @@ func (m *messageIO) WriteMessage(w io.Writer, message *iso8583.Message) error { Length: uint16(len(rawMessage)), } - mti, err := message.GetMTI() - if err != nil { - return fmt.Errorf("failed to get mti: %w", err) - } - - // if message is response then work with saved headers - // simple check for response mti, use more complex check - if mti[2] == '1' || mti[2] == '3' { - // get stan from message - stan, err := message.GetString(11) - if err != nil { - return fmt.Errorf("failed to get stan: %w", err) - } - - // get header from saved headers - m.mu.Lock() - requestHeader, ok := m.requestHeaders[stan] - m.mu.Unlock() - if !ok { - return fmt.Errorf("failed to get header for stan %s", stan) - } - - h.SourceID = requestHeader.DestID - h.DestID = requestHeader.SourceID - } - // write header err = binary.Write(w, binary.BigEndian, h) if err != nil { @@ -1025,9 +976,7 @@ func (m *messageIO) WriteMessage(w io.Writer, message *iso8583.Message) error { // header is 2 bytes length of the message type header struct { - Length uint16 - SourceID [4]byte - DestID [4]byte + Length uint16 } func TestClientOptionsWithMessageReaderAndWriter(t *testing.T) { From 128c4290ccbe9e8241c3c5eee5adb97c2a384d60 Mon Sep 17 00:00:00 2001 From: Pavel Gabriel Date: Tue, 16 May 2023 19:27:03 +0200 Subject: [PATCH 06/10] rename error type --- connection.go | 12 ++++++------ connection_test.go | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/connection.go b/connection.go index 32321a7..28f190c 100644 --- a/connection.go +++ b/connection.go @@ -40,18 +40,18 @@ const ( StatusUnknown Status = "" ) -// ErrUnpack returns error with possibility to access RawMessage when +// UnpackError returns error with possibility to access RawMessage when // connection failed to unpack message -type ErrUnpack struct { +type UnpackError struct { Err error RawMessage []byte } -func (e *ErrUnpack) Error() string { +func (e *UnpackError) Error() string { return e.Err.Error() } -func (e *ErrUnpack) Unwrap() error { +func (e *UnpackError) Unwrap() error { return e.Err } @@ -553,7 +553,7 @@ func (c *Connection) readLoop() { // if err is ErrUnpack, we can still continue reading // from the connection - var unpackErr *ErrUnpack + var unpackErr *UnpackError if errors.As(err, &unpackErr) { continue } @@ -592,7 +592,7 @@ func (c *Connection) readMessage(r io.Reader) (*iso8583.Message, error) { message := iso8583.NewMessage(c.spec) err = message.Unpack(rawMessage) if err != nil { - unpackErr := &ErrUnpack{ + unpackErr := &UnpackError{ Err: err, RawMessage: rawMessage, } diff --git a/connection_test.go b/connection_test.go index 0549248..5eb626d 100644 --- a/connection_test.go +++ b/connection_test.go @@ -280,7 +280,7 @@ func TestClient_Send(t *testing.T) { mu.Lock() defer mu.Unlock() - var unpackErr *connection.ErrUnpack + var unpackErr *connection.UnpackError if errors.As(handledError, &unpackErr) { require.EqualError(t, handledError, "failed to read message from connection") require.EqualError(t, unpackErr, "failed to unpack field 63: no specification found") From 97469727723ec322e33e957eb442a53af139d559 Mon Sep 17 00:00:00 2001 From: Pavel Gabriel Date: Tue, 16 May 2023 19:43:26 +0200 Subject: [PATCH 07/10] move writeMessage into writeLoop --- connection.go | 59 ++++++++++++++++++++++++++++----------------------- 1 file changed, 33 insertions(+), 26 deletions(-) diff --git a/connection.go b/connection.go index 28f190c..fc4211e 100644 --- a/connection.go +++ b/connection.go @@ -2,7 +2,6 @@ package connection import ( "bufio" - "bytes" "crypto/tls" "errors" "fmt" @@ -55,6 +54,18 @@ func (e *UnpackError) Unwrap() error { return e.Err } +type PackError struct { + Err error +} + +func (e *PackError) Error() string { + return e.Err.Error() +} + +func (e *PackError) Unwrap() error { + return e.Err +} + // Connection represents an ISO 8583 Connection. Connection may be used // by multiple goroutines simultaneously. type Connection struct { @@ -296,8 +307,8 @@ func (c *Connection) Done() <-chan struct{} { // request represents request to the ISO 8583 server type request struct { - // includes length header and message itself - rawMessage []byte + // message to send + message *iso8583.Message // ID of the request (based on STAN, RRN, etc.) requestID string @@ -330,13 +341,6 @@ func (c *Connection) Send(message *iso8583.Message) (*iso8583.Message, error) { c.mutex.Unlock() defer c.wg.Done() - var buf bytes.Buffer - - err := c.writeMessage(&buf, message) - if err != nil { - return nil, fmt.Errorf("writing message: %w", err) - } - // prepare request reqID, err := c.Opts.RequestIDGenerator.GenerateRequestID(message) if err != nil { @@ -344,10 +348,10 @@ func (c *Connection) Send(message *iso8583.Message) (*iso8583.Message, error) { } req := request{ - rawMessage: buf.Bytes(), - requestID: reqID, - replyCh: make(chan *iso8583.Message), - errCh: make(chan error), + message: message, + requestID: reqID, + replyCh: make(chan *iso8583.Message), + errCh: make(chan error), } var resp *iso8583.Message @@ -397,7 +401,7 @@ func (c *Connection) writeMessage(w io.Writer, message *iso8583.Message) error { // default message writer packed, err := message.Pack() if err != nil { - return utils.NewSafeError(err, "packing message") + return utils.NewSafeError(&PackError{err}, "packing message") } // create header @@ -429,16 +433,9 @@ func (c *Connection) Reply(message *iso8583.Message) error { c.mutex.Unlock() defer c.wg.Done() - // prepare message for sending - var buf bytes.Buffer - err := c.writeMessage(&buf, message) - if err != nil { - return fmt.Errorf("writing message: %w", err) - } - req := request{ - rawMessage: buf.Bytes(), - errCh: make(chan error), + message: message, + errCh: make(chan error), } c.requestsCh <- req @@ -446,6 +443,8 @@ func (c *Connection) Reply(message *iso8583.Message) error { sendTimeoutTimer := time.NewTimer(c.Opts.SendTimeout) defer sendTimeoutTimer.Stop() + var err error + select { case err = <-req.errCh: case <-sendTimeoutTimer.C: @@ -511,9 +510,17 @@ func (c *Connection) writeLoop() { c.pendingRequestsMu.Unlock() } - _, err = c.conn.Write([]byte(req.rawMessage)) + err = c.writeMessage(c.conn, req.message) if err != nil { - c.handleError(utils.NewSafeError(err, "failed to write message into connection")) + c.handleError(fmt.Errorf("writing message: %w", err)) + + // if it's a pack error, we can continue to write messages + var packErr *PackError + if errors.As(err, &packErr) { + err = nil + continue + } + break } From db3a3031df22347b6bbc075dcd4be26621538397 Mon Sep 17 00:00:00 2001 From: Pavel Gabriel Date: Wed, 17 May 2023 17:03:36 +0200 Subject: [PATCH 08/10] skip message if readMessage returns nil message and no error --- connection.go | 7 ++ connection_test.go | 165 ++++++++++++++++++++++++++++++++++----------- 2 files changed, 132 insertions(+), 40 deletions(-) diff --git a/connection.go b/connection.go index fc4211e..6e4a4dd 100644 --- a/connection.go +++ b/connection.go @@ -569,6 +569,13 @@ func (c *Connection) readLoop() { break } + // if readMessage returns nil message, it means that + // it was a ping message or something else, not a regular + // iso8583 message and we can continue reading + if message == nil { + continue + } + c.readResponseCh <- message } diff --git a/connection_test.go b/connection_test.go index 5eb626d..a8d9601 100644 --- a/connection_test.go +++ b/connection_test.go @@ -916,6 +916,121 @@ func TestClient_Options(t *testing.T) { }) } +func TestClientWithMessageReaderAndWriter(t *testing.T) { + server, err := NewTestServer() + require.NoError(t, err) + defer server.Close() + + // create client with custom message reader and writer + msgIO := &messageIO{Spec: testSpec} + + t.Run("send and receive iso 8583 message", func(t *testing.T) { + c, err := connection.New(server.Addr, nil, nil, nil, + connection.SetMessageReader(msgIO), + connection.SetMessageWriter(msgIO), + connection.ErrorHandler(func(err error) { + require.NoError(t, err) + }), + ) + require.NoError(t, err) + + err = c.Connect() + require.NoError(t, err) + + // network management message + message := iso8583.NewMessage(testSpec) + err = message.Marshal(baseFields{ + MTI: field.NewStringValue("0800"), + STAN: field.NewStringValue(getSTAN()), + }) + require.NoError(t, err) + + // we can send iso message to the server + response, err := c.Send(message) + require.NoError(t, err) + time.Sleep(1 * time.Second) + + mti, err := response.GetMTI() + require.NoError(t, err) + require.Equal(t, "0810", mti) + + require.NoError(t, c.Close()) + }) + + t.Run("continue to read messages if message reader returns nil message and nil error", func(t *testing.T) { + // skipMessage is used to skip first message + skipMessage := &atomic.Bool{} + messagesReceived := &atomic.Int32{} + + // create connection with custom message reader so we can + // control what message is returned. msgReader will return nil + // message and nil error for the first message and return real + // message for the second message + msgReader := &messageReader{ + MessageReader: func(r io.Reader) (*iso8583.Message, error) { + msg, err := msgIO.ReadMessage(r) + if err != nil { + return nil, err + } + + messagesReceived.Add(1) + + if skipMessage.Load() { + return nil, nil + } + + return msg, nil + }, + } + + c, err := connection.New(server.Addr, nil, nil, nil, + connection.SetMessageReader(connection.MessageReader(msgReader)), + connection.SetMessageWriter(msgIO), + connection.ErrorHandler(func(err error) { + require.NoError(t, err) + }), + connection.SendTimeout(500*time.Millisecond), + ) + require.NoError(t, err) + + err = c.Connect() + require.NoError(t, err) + defer c.Close() + + // set skipMessage to true so readMessage will return nil message + skipMessage.Store(true) + + // send first message + message := iso8583.NewMessage(testSpec) + err = message.Marshal(baseFields{ + MTI: field.NewStringValue("0800"), + STAN: field.NewStringValue(getSTAN()), + }) + require.NoError(t, err) + + // this message will timeout because we are skipping its reply + _, err = c.Send(message) + require.ErrorIs(t, err, connection.ErrSendTimeout) + + // set skipMessage to false to read message as usual + skipMessage.Store(false) + + message = iso8583.NewMessage(testSpec) + err = message.Marshal(baseFields{ + MTI: field.NewStringValue("0800"), + STAN: field.NewStringValue(getSTAN()), + }) + require.NoError(t, err) + + _, err = c.Send(message) + require.NoError(t, err) + + // we should receive two replies even if first message was + // skipped + require.Equal(t, int32(2), messagesReceived.Load()) + }) +} + // messageIO is a helper struct to read/write iso8583 messages from/to // io.Reader/io.Writer type messageIO struct { @@ -974,49 +1089,19 @@ func (m *messageIO) WriteMessage(w io.Writer, message *iso8583.Message) error { return nil } -// header is 2 bytes length of the message -type header struct { - Length uint16 +// messageReader is a helper struct to read iso8583 messages from +// io.Reader with custom message reader that test can control +type messageReader struct { + MessageReader func(r io.Reader) (*iso8583.Message, error) } -func TestClientOptionsWithMessageReaderAndWriter(t *testing.T) { - server, err := NewTestServer() - require.NoError(t, err) - defer server.Close() - - // create client with custom message reader and writer - msgIO := &messageIO{Spec: testSpec} - - c, err := connection.New(server.Addr, nil, nil, nil, - connection.SetMessageReader(msgIO), - connection.SetMessageWriter(msgIO), - connection.ErrorHandler(func(err error) { - require.NoError(t, err) - }), - ) - require.NoError(t, err) - - err = c.Connect() - require.NoError(t, err) - - // network management message - message := iso8583.NewMessage(testSpec) - err = message.Marshal(baseFields{ - MTI: field.NewStringValue("0800"), - STAN: field.NewStringValue(getSTAN()), - }) - require.NoError(t, err) - - // we can send iso message to the server - response, err := c.Send(message) - require.NoError(t, err) - time.Sleep(1 * time.Second) - - mti, err := response.GetMTI() - require.NoError(t, err) - require.Equal(t, "0810", mti) +func (r *messageReader) ReadMessage(reader io.Reader) (*iso8583.Message, error) { + return r.MessageReader(reader) +} - require.NoError(t, c.Close()) +// header is 2 bytes length of the message +type header struct { + Length uint16 } func TestConnection(t *testing.T) { From 5fa7e3c8f84f53777ee48667a89446d21010a015 Mon Sep 17 00:00:00 2001 From: Pavel Gabriel Date: Wed, 17 May 2023 17:41:16 +0200 Subject: [PATCH 09/10] fix benchmarks --- connection_test.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/connection_test.go b/connection_test.go index a8d9601..454be08 100644 --- a/connection_test.go +++ b/connection_test.go @@ -14,7 +14,6 @@ import ( "github.com/moov-io/iso8583" connection "github.com/moov-io/iso8583-connection" - "github.com/moov-io/iso8583-connection/server" "github.com/moov-io/iso8583/encoding" "github.com/moov-io/iso8583/field" "github.com/moov-io/iso8583/prefix" @@ -1151,23 +1150,23 @@ func TestClient_SetOptions(t *testing.T) { require.NotNil(t, c.Opts.TLSConfig) } -func BenchmarkSend100(b *testing.B) { benchmarkSend(100, b) } +func BenchmarkProcess100Messages(b *testing.B) { benchmarkSend(100, b) } -func BenchmarkSend1000(b *testing.B) { benchmarkSend(1000, b) } +func BenchmarkProcess1000Messages(b *testing.B) { benchmarkSend(1000, b) } -func BenchmarkSend10000(b *testing.B) { benchmarkSend(10000, b) } +func BenchmarkProcess10000Messages(b *testing.B) { benchmarkSend(10000, b) } -func BenchmarkSend100000(b *testing.B) { benchmarkSend(100000, b) } +func BenchmarkProcess100000Messages(b *testing.B) { benchmarkSend(100000, b) } func benchmarkSend(m int, b *testing.B) { - server := server.New(testSpec, readMessageLength, writeMessageLength) - // start on random port - err := server.Start("127.0.0.1:") + server, err := NewTestServer() if err != nil { - b.Fatal("starting server: ", err) + b.Fatal("starting test server: ", err) } - c, err := connection.New(server.Addr, testSpec, readMessageLength, writeMessageLength) + c, err := connection.New(server.Addr, testSpec, readMessageLength, writeMessageLength, + connection.SendTimeout(500*time.Millisecond), + ) if err != nil { b.Fatal("creating client: ", err) } @@ -1204,6 +1203,7 @@ func processMessages(b *testing.B, m int, c *connection.Connection) { message := iso8583.NewMessage(testSpec) message.MTI("0800") + message.Field(11, getSTAN()) _, err := c.Send(message) if err != nil { From dff4373aed9892ded3891d7e68f128a9ceb85257 Mon Sep 17 00:00:00 2001 From: Pavel Gabriel Date: Thu, 18 May 2023 11:06:25 +0200 Subject: [PATCH 10/10] return PackError from the Send when message packing failed --- connection.go | 12 ++++++++++-- connection_test.go | 28 ++++++++++++++++++++++++++++ go.mod | 2 +- go.sum | 14 ++------------ 4 files changed, 41 insertions(+), 15 deletions(-) diff --git a/connection.go b/connection.go index 6e4a4dd..6caac50 100644 --- a/connection.go +++ b/connection.go @@ -401,7 +401,7 @@ func (c *Connection) writeMessage(w io.Writer, message *iso8583.Message) error { // default message writer packed, err := message.Pack() if err != nil { - return utils.NewSafeError(&PackError{err}, "packing message") + return utils.NewSafeError(&PackError{err}, "failed to pack message") } // create header @@ -514,10 +514,18 @@ func (c *Connection) writeLoop() { if err != nil { c.handleError(fmt.Errorf("writing message: %w", err)) - // if it's a pack error, we can continue to write messages var packErr *PackError if errors.As(err, &packErr) { + // let caller know that his message was not not sent + // because of pack error. We don't set all type of errors to errCh + // as this case is handled by handleConnectionError(err) + // which sends the same error to all pending requests, including + // this one + req.errCh <- err + err = nil + + // we can continue to write other messages continue } diff --git a/connection_test.go b/connection_test.go index 454be08..b0b65ea 100644 --- a/connection_test.go +++ b/connection_test.go @@ -205,6 +205,34 @@ func TestClient_Send(t *testing.T) { require.NoError(t, c.Close()) }) + t.Run("returns PackError when it fails to pack message", func(t *testing.T) { + c, err := connection.New(server.Addr, testSpec, readMessageLength, writeMessageLength) + require.NoError(t, err) + defer c.Close() + + err = c.Connect() + require.NoError(t, err) + + message := iso8583.NewMessage(testSpec) + + // setting MTI to 1 digit will cause PackError as it should be + // 4 digits + message.MTI("1") + + // we should set STAN as we check it before we pack the message + err = message.Field(11, "123456") + require.NoError(t, err) + + // when we send the message + _, err = c.Send(message) + + // then Send should return PackError + require.Error(t, err) + + var packError *connection.PackError + require.ErrorAs(t, err, &packError) + }) + t.Run("returns UnpackError with RawMessage when it fails to unpack message", func(t *testing.T) { // Given // connection with specification different from server diff --git a/go.mod b/go.mod index bbdbfa1..f975a65 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,6 @@ require ( github.com/kr/text v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/yerden/go-util v1.1.4 // indirect - golang.org/x/text v0.8.0 // indirect + golang.org/x/text v0.9.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 33351da..23eacb0 100644 --- a/go.sum +++ b/go.sum @@ -6,12 +6,6 @@ github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mediocregopher/radix.v2 v0.0.0-20181115013041-b67df6e626f9/go.mod h1:fLRUbhbSd5Px2yKUaGYYPltlyxi1guJz1vCmo1RQL50= -github.com/moov-io/iso8583 v0.14.1 h1:kebG3hiWW41UvDMnNDbmgtDxjLqLOSxWCIITDTT3QMk= -github.com/moov-io/iso8583 v0.14.1/go.mod h1:RXWrTyAXP1diIY0J6sF0Jf7YRd9Y6fMV3+UFgKUfK/I= -github.com/moov-io/iso8583 v0.15.1 h1:mg2+B7rX7ESS6xoZip4g5eTBFb7AgK+H5H/HToUKFPw= -github.com/moov-io/iso8583 v0.15.1/go.mod h1:RXWrTyAXP1diIY0J6sF0Jf7YRd9Y6fMV3+UFgKUfK/I= -github.com/moov-io/iso8583 v0.15.2 h1:2CdYXA1I+hY7AaE2Ey0eTMmb3YuwL63uVsHDWNBfJFE= -github.com/moov-io/iso8583 v0.15.2/go.mod h1:P/u3VHcgXKB88jFNMoBL4clWWkHNqWDe8C8DoyKwxJk= github.com/moov-io/iso8583 v0.15.3 h1:4N36RMCxplHmF8mQPFych2RCaGgqKvbMyNp1v5Jrk4A= github.com/moov-io/iso8583 v0.15.3/go.mod h1:P/u3VHcgXKB88jFNMoBL4clWWkHNqWDe8C8DoyKwxJk= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -21,17 +15,13 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/yerden/go-util v1.1.4 h1:jd8JyjLHzpEs1ZZQzDkfRgosDtXp/BtIAV1kpNjVTtw= github.com/yerden/go-util v1.1.4/go.mod h1:3HeLrvtkEeAv67ARostM9Yn0DcAVqgJ3uAiCuywEEXk= golang.org/x/sys v0.0.0-20190913121621-c3b328c6e5a7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k= -golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= -golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=