diff --git a/connection.go b/connection.go index dd24d92..6caac50 100644 --- a/connection.go +++ b/connection.go @@ -2,7 +2,6 @@ package connection import ( "bufio" - "bytes" "crypto/tls" "errors" "fmt" @@ -40,18 +39,30 @@ const ( StatusUnknown Status = "" ) -// ErrUnpack returns error with possibility to access RawMessage when +// UnpackError returns error with possibility to access RawMessage when // connection failed to unpack message -type ErrUnpack struct { +type UnpackError struct { Err error RawMessage []byte } -func (e *ErrUnpack) Error() string { +func (e *UnpackError) Error() string { return e.Err.Error() } -func (e *ErrUnpack) Unwrap() error { +func (e *UnpackError) Unwrap() error { + return e.Err +} + +type PackError struct { + Err error +} + +func (e *PackError) Error() string { + return e.Err.Error() +} + +func (e *PackError) Unwrap() error { return e.Err } @@ -62,7 +73,7 @@ type Connection struct { Opts Options conn io.ReadWriteCloser requestsCh chan request - readResponseCh chan []byte + readResponseCh chan *iso8583.Message done chan struct{} // spec that will be used to unpack received messages @@ -105,7 +116,7 @@ func New(addr string, spec *iso8583.MessageSpec, mlReader MessageLengthReader, m addr: addr, Opts: opts, requestsCh: make(chan request), - readResponseCh: make(chan []byte), + readResponseCh: make(chan *iso8583.Message), done: make(chan struct{}), respMap: make(map[string]response), spec: spec, @@ -296,8 +307,8 @@ func (c *Connection) Done() <-chan struct{} { // request represents request to the ISO 8583 server type request struct { - // includes length header and message itself - rawMessage []byte + // message to send + message *iso8583.Message // ID of the request (based on STAN, RRN, etc.) requestID string @@ -330,23 +341,6 @@ func (c *Connection) Send(message *iso8583.Message) (*iso8583.Message, error) { c.mutex.Unlock() defer c.wg.Done() - var buf bytes.Buffer - packed, err := message.Pack() - if err != nil { - return nil, fmt.Errorf("packing message: %w", err) - } - - // create header - _, err = c.writeMessageLength(&buf, len(packed)) - if err != nil { - return nil, fmt.Errorf("writing message header to buffer: %w", err) - } - - _, err = buf.Write(packed) - if err != nil { - return nil, fmt.Errorf("writing packed message to buffer: %w", err) - } - // prepare request reqID, err := c.Opts.RequestIDGenerator.GenerateRequestID(message) if err != nil { @@ -354,10 +348,10 @@ func (c *Connection) Send(message *iso8583.Message) (*iso8583.Message, error) { } req := request{ - rawMessage: buf.Bytes(), - requestID: reqID, - replyCh: make(chan *iso8583.Message), - errCh: make(chan error), + message: message, + requestID: reqID, + replyCh: make(chan *iso8583.Message), + errCh: make(chan error), } var resp *iso8583.Message @@ -399,42 +393,49 @@ func (c *Connection) Send(message *iso8583.Message) (*iso8583.Message, error) { return resp, err } -// Reply sends the message and does not wait for a reply to be received. -// Any reply received for message send using Reply will be handled with -// unmatchedMessageHandler -func (c *Connection) Reply(message *iso8583.Message) error { - c.mutex.Lock() - if c.closing { - c.mutex.Unlock() - return ErrConnectionClosed +func (c *Connection) writeMessage(w io.Writer, message *iso8583.Message) error { + if c.Opts.MessageWriter != nil { + return c.Opts.MessageWriter.WriteMessage(w, message) } - // calling wg.Add(1) within mutex guarantees that it does not pass the wg.Wait() call in the Close method - // otherwise we will have data race issue - c.wg.Add(1) - c.mutex.Unlock() - defer c.wg.Done() - // prepare message for sending - var buf bytes.Buffer + // default message writer packed, err := message.Pack() if err != nil { - return fmt.Errorf("packing message: %w", err) + return utils.NewSafeError(&PackError{err}, "failed to pack message") } // create header - _, err = c.writeMessageLength(&buf, len(packed)) + _, err = c.writeMessageLength(w, len(packed)) if err != nil { return fmt.Errorf("writing message header to buffer: %w", err) } - _, err = buf.Write(packed) + _, err = w.Write(packed) if err != nil { return fmt.Errorf("writing packed message to buffer: %w", err) } + return nil +} + +// Reply sends the message and does not wait for a reply to be received. +// Any reply received for message send using Reply will be handled with +// unmatchedMessageHandler +func (c *Connection) Reply(message *iso8583.Message) error { + c.mutex.Lock() + if c.closing { + c.mutex.Unlock() + return ErrConnectionClosed + } + // calling wg.Add(1) within mutex guarantees that it does not pass the wg.Wait() call in the Close method + // otherwise we will have data race issue + c.wg.Add(1) + c.mutex.Unlock() + defer c.wg.Done() + req := request{ - rawMessage: buf.Bytes(), - errCh: make(chan error), + message: message, + errCh: make(chan error), } c.requestsCh <- req @@ -442,6 +443,8 @@ func (c *Connection) Reply(message *iso8583.Message) error { sendTimeoutTimer := time.NewTimer(c.Opts.SendTimeout) defer sendTimeoutTimer.Stop() + var err error + select { case err = <-req.errCh: case <-sendTimeoutTimer.C: @@ -507,9 +510,25 @@ func (c *Connection) writeLoop() { c.pendingRequestsMu.Unlock() } - _, err = c.conn.Write([]byte(req.rawMessage)) + err = c.writeMessage(c.conn, req.message) if err != nil { - c.handleError(utils.NewSafeError(err, "failed to write message into connection")) + c.handleError(fmt.Errorf("writing message: %w", err)) + + var packErr *PackError + if errors.As(err, &packErr) { + // let caller know that his message was not not sent + // because of pack error. We don't set all type of errors to errCh + // as this case is handled by handleConnectionError(err) + // which sends the same error to all pending requests, including + // this one + req.errCh <- err + + err = nil + + // we can continue to write other messages + continue + } + break } @@ -539,29 +558,70 @@ func (c *Connection) writeLoop() { // readLoop reads data from the socket (message length header and raw message) // and runs a goroutine to handle the message func (c *Connection) readLoop() { - var err error - var messageLength int + var outErr error r := bufio.NewReader(c.conn) for { - messageLength, err = c.readMessageLength(r) + message, err := c.readMessage(r) if err != nil { - c.handleError(utils.NewSafeError(err, "failed to read message length")) + c.handleError(utils.NewSafeError(err, "failed to read message from connection")) + + // if err is ErrUnpack, we can still continue reading + // from the connection + var unpackErr *UnpackError + if errors.As(err, &unpackErr) { + continue + } + + outErr = err break } - // read the packed message - rawMessage := make([]byte, messageLength) - _, err = io.ReadFull(r, rawMessage) - if err != nil { - c.handleError(utils.NewSafeError(err, "failed to read message from connection")) - break + // if readMessage returns nil message, it means that + // it was a ping message or something else, not a regular + // iso8583 message and we can continue reading + if message == nil { + continue } - c.readResponseCh <- rawMessage + c.readResponseCh <- message } - c.handleConnectionError(err) + c.handleConnectionError(outErr) +} + +// readMessage reads message length header and raw message from the connection +// and returns iso8583.Message and error if any +func (c *Connection) readMessage(r io.Reader) (*iso8583.Message, error) { + if c.Opts.MessageReader != nil { + return c.Opts.MessageReader.ReadMessage(r) + } + + // default message reader + messageLength, err := c.readMessageLength(r) + if err != nil { + return nil, fmt.Errorf("failed to read message length: %w", err) + } + + // read the packed message + rawMessage := make([]byte, messageLength) + _, err = io.ReadFull(r, rawMessage) + if err != nil { + return nil, fmt.Errorf("failed to read message from connection: %w", err) + } + + // unpack the message + message := iso8583.NewMessage(c.spec) + err = message.Unpack(rawMessage) + if err != nil { + unpackErr := &UnpackError{ + Err: err, + RawMessage: rawMessage, + } + return nil, fmt.Errorf("unpacking message: %w", unpackErr) + } + + return message, nil } func (c *Connection) readResponseLoop() { @@ -584,21 +644,9 @@ func (c *Connection) readResponseLoop() { } } -// handleResponse unpacks the message and then sends it to the reply channel -// that corresponds to the message ID (request ID) -func (c *Connection) handleResponse(rawMessage []byte) { - // create message - message := iso8583.NewMessage(c.spec) - err := message.Unpack(rawMessage) - if err != nil { - unpackErr := &ErrUnpack{ - Err: err, - RawMessage: rawMessage, - } - c.handleError(utils.NewSafeError(unpackErr, "failed to unpack message")) - return - } - +// handleResponse sends message to the reply channel that corresponds to the +// message ID (request ID) +func (c *Connection) handleResponse(message *iso8583.Message) { if isResponse(message) { reqID, err := c.Opts.RequestIDGenerator.GenerateRequestID(message) if err != nil { diff --git a/connection_test.go b/connection_test.go index f821c80..b0b65ea 100644 --- a/connection_test.go +++ b/connection_test.go @@ -1,6 +1,7 @@ package connection_test import ( + "encoding/binary" "errors" "fmt" "io" @@ -13,7 +14,6 @@ import ( "github.com/moov-io/iso8583" connection "github.com/moov-io/iso8583-connection" - "github.com/moov-io/iso8583-connection/server" "github.com/moov-io/iso8583/encoding" "github.com/moov-io/iso8583/field" "github.com/moov-io/iso8583/prefix" @@ -205,6 +205,34 @@ func TestClient_Send(t *testing.T) { require.NoError(t, c.Close()) }) + t.Run("returns PackError when it fails to pack message", func(t *testing.T) { + c, err := connection.New(server.Addr, testSpec, readMessageLength, writeMessageLength) + require.NoError(t, err) + defer c.Close() + + err = c.Connect() + require.NoError(t, err) + + message := iso8583.NewMessage(testSpec) + + // setting MTI to 1 digit will cause PackError as it should be + // 4 digits + message.MTI("1") + + // we should set STAN as we check it before we pack the message + err = message.Field(11, "123456") + require.NoError(t, err) + + // when we send the message + _, err = c.Send(message) + + // then Send should return PackError + require.Error(t, err) + + var packError *connection.PackError + require.ErrorAs(t, err, &packError) + }) + t.Run("returns UnpackError with RawMessage when it fails to unpack message", func(t *testing.T) { // Given // connection with specification different from server @@ -279,9 +307,9 @@ func TestClient_Send(t *testing.T) { mu.Lock() defer mu.Unlock() - var unpackErr *connection.ErrUnpack + var unpackErr *connection.UnpackError if errors.As(handledError, &unpackErr) { - require.EqualError(t, handledError, "failed to unpack message") + require.EqualError(t, handledError, "failed to read message from connection") require.EqualError(t, unpackErr, "failed to unpack field 63: no specification found") require.NotEmpty(t, unpackErr.RawMessage) return true @@ -915,6 +943,194 @@ func TestClient_Options(t *testing.T) { }) } +func TestClientWithMessageReaderAndWriter(t *testing.T) { + server, err := NewTestServer() + require.NoError(t, err) + defer server.Close() + + // create client with custom message reader and writer + msgIO := &messageIO{Spec: testSpec} + + t.Run("send and receive iso 8583 message", func(t *testing.T) { + c, err := connection.New(server.Addr, nil, nil, nil, + connection.SetMessageReader(msgIO), + connection.SetMessageWriter(msgIO), + connection.ErrorHandler(func(err error) { + require.NoError(t, err) + }), + ) + require.NoError(t, err) + + err = c.Connect() + require.NoError(t, err) + + // network management message + message := iso8583.NewMessage(testSpec) + err = message.Marshal(baseFields{ + MTI: field.NewStringValue("0800"), + STAN: field.NewStringValue(getSTAN()), + }) + require.NoError(t, err) + + // we can send iso message to the server + response, err := c.Send(message) + require.NoError(t, err) + time.Sleep(1 * time.Second) + + mti, err := response.GetMTI() + require.NoError(t, err) + require.Equal(t, "0810", mti) + + require.NoError(t, c.Close()) + }) + + t.Run("continue to read messages if message reader returns nil message and nil error", func(t *testing.T) { + // skipMessage is used to skip first message + skipMessage := &atomic.Bool{} + messagesReceived := &atomic.Int32{} + + // create connection with custom message reader so we can + // control what message is returned. msgReader will return nil + // message and nil error for the first message and return real + // message for the second message + msgReader := &messageReader{ + MessageReader: func(r io.Reader) (*iso8583.Message, error) { + msg, err := msgIO.ReadMessage(r) + if err != nil { + return nil, err + } + + messagesReceived.Add(1) + + if skipMessage.Load() { + return nil, nil + } + + return msg, nil + }, + } + + c, err := connection.New(server.Addr, nil, nil, nil, + connection.SetMessageReader(connection.MessageReader(msgReader)), + connection.SetMessageWriter(msgIO), + connection.ErrorHandler(func(err error) { + require.NoError(t, err) + }), + connection.SendTimeout(500*time.Millisecond), + ) + require.NoError(t, err) + + err = c.Connect() + require.NoError(t, err) + defer c.Close() + + // set skipMessage to true so readMessage will return nil message + skipMessage.Store(true) + + // send first message + message := iso8583.NewMessage(testSpec) + err = message.Marshal(baseFields{ + MTI: field.NewStringValue("0800"), + STAN: field.NewStringValue(getSTAN()), + }) + require.NoError(t, err) + + // this message will timeout because we are skipping its reply + _, err = c.Send(message) + require.ErrorIs(t, err, connection.ErrSendTimeout) + + // set skipMessage to false to read message as usual + skipMessage.Store(false) + + message = iso8583.NewMessage(testSpec) + err = message.Marshal(baseFields{ + MTI: field.NewStringValue("0800"), + STAN: field.NewStringValue(getSTAN()), + }) + require.NoError(t, err) + + _, err = c.Send(message) + require.NoError(t, err) + + // we should receive two replies even if first message was + // skipped + require.Equal(t, int32(2), messagesReceived.Load()) + }) +} + +// messageIO is a helper struct to read/write iso8583 messages from/to +// io.Reader/io.Writer +type messageIO struct { + Spec *iso8583.MessageSpec +} + +func (m *messageIO) ReadMessage(r io.Reader) (*iso8583.Message, error) { + // read 2 bytes header + h := &header{} + err := binary.Read(r, binary.BigEndian, h) + if err != nil { + return nil, fmt.Errorf("failed to read message length: %w", err) + } + + // read message + rawMessage := make([]byte, h.Length) + _, err = io.ReadFull(r, rawMessage) + if err != nil { + return nil, fmt.Errorf("failed to read message: %w", err) + } + + // unpack message + message := iso8583.NewMessage(m.Spec) + err = message.Unpack(rawMessage) + if err != nil { + return nil, fmt.Errorf("failed to unpack message: %w", err) + } + + return message, nil +} + +func (m *messageIO) WriteMessage(w io.Writer, message *iso8583.Message) error { + // pack message + rawMessage, err := message.Pack() + if err != nil { + return fmt.Errorf("failed to pack message: %w", err) + } + + // create header with message length + h := header{ + Length: uint16(len(rawMessage)), + } + + // write header + err = binary.Write(w, binary.BigEndian, h) + if err != nil { + return fmt.Errorf("failed to write message length: %w", err) + } + + // write message + _, err = w.Write(rawMessage) + if err != nil { + return fmt.Errorf("failed to write message: %w", err) + } + + return nil +} + +// messageReader is a helper struct to read iso8583 messages from +// io.Reader with custom message reader that test can control +type messageReader struct { + MessageReader func(r io.Reader) (*iso8583.Message, error) +} + +func (r *messageReader) ReadMessage(reader io.Reader) (*iso8583.Message, error) { + return r.MessageReader(reader) +} + +// header is 2 bytes length of the message +type header struct { + Length uint16 +} + func TestConnection(t *testing.T) { t.Run("Status", func(t *testing.T) { c, err := connection.New("1.1.1.1", nil, nil, nil) @@ -962,23 +1178,23 @@ func TestClient_SetOptions(t *testing.T) { require.NotNil(t, c.Opts.TLSConfig) } -func BenchmarkSend100(b *testing.B) { benchmarkSend(100, b) } +func BenchmarkProcess100Messages(b *testing.B) { benchmarkSend(100, b) } -func BenchmarkSend1000(b *testing.B) { benchmarkSend(1000, b) } +func BenchmarkProcess1000Messages(b *testing.B) { benchmarkSend(1000, b) } -func BenchmarkSend10000(b *testing.B) { benchmarkSend(10000, b) } +func BenchmarkProcess10000Messages(b *testing.B) { benchmarkSend(10000, b) } -func BenchmarkSend100000(b *testing.B) { benchmarkSend(100000, b) } +func BenchmarkProcess100000Messages(b *testing.B) { benchmarkSend(100000, b) } func benchmarkSend(m int, b *testing.B) { - server := server.New(testSpec, readMessageLength, writeMessageLength) - // start on random port - err := server.Start("127.0.0.1:") + server, err := NewTestServer() if err != nil { - b.Fatal("starting server: ", err) + b.Fatal("starting test server: ", err) } - c, err := connection.New(server.Addr, testSpec, readMessageLength, writeMessageLength) + c, err := connection.New(server.Addr, testSpec, readMessageLength, writeMessageLength, + connection.SendTimeout(500*time.Millisecond), + ) if err != nil { b.Fatal("creating client: ", err) } @@ -1015,6 +1231,7 @@ func processMessages(b *testing.B, m int, c *connection.Connection) { message := iso8583.NewMessage(testSpec) message.MTI("0800") + message.Field(11, getSTAN()) _, err := c.Send(message) if err != nil { diff --git a/go.mod b/go.mod index bbdbfa1..f975a65 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,6 @@ require ( github.com/kr/text v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/yerden/go-util v1.1.4 // indirect - golang.org/x/text v0.8.0 // indirect + golang.org/x/text v0.9.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 33351da..23eacb0 100644 --- a/go.sum +++ b/go.sum @@ -6,12 +6,6 @@ github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mediocregopher/radix.v2 v0.0.0-20181115013041-b67df6e626f9/go.mod h1:fLRUbhbSd5Px2yKUaGYYPltlyxi1guJz1vCmo1RQL50= -github.com/moov-io/iso8583 v0.14.1 h1:kebG3hiWW41UvDMnNDbmgtDxjLqLOSxWCIITDTT3QMk= -github.com/moov-io/iso8583 v0.14.1/go.mod h1:RXWrTyAXP1diIY0J6sF0Jf7YRd9Y6fMV3+UFgKUfK/I= -github.com/moov-io/iso8583 v0.15.1 h1:mg2+B7rX7ESS6xoZip4g5eTBFb7AgK+H5H/HToUKFPw= -github.com/moov-io/iso8583 v0.15.1/go.mod h1:RXWrTyAXP1diIY0J6sF0Jf7YRd9Y6fMV3+UFgKUfK/I= -github.com/moov-io/iso8583 v0.15.2 h1:2CdYXA1I+hY7AaE2Ey0eTMmb3YuwL63uVsHDWNBfJFE= -github.com/moov-io/iso8583 v0.15.2/go.mod h1:P/u3VHcgXKB88jFNMoBL4clWWkHNqWDe8C8DoyKwxJk= github.com/moov-io/iso8583 v0.15.3 h1:4N36RMCxplHmF8mQPFych2RCaGgqKvbMyNp1v5Jrk4A= github.com/moov-io/iso8583 v0.15.3/go.mod h1:P/u3VHcgXKB88jFNMoBL4clWWkHNqWDe8C8DoyKwxJk= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -21,17 +15,13 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/yerden/go-util v1.1.4 h1:jd8JyjLHzpEs1ZZQzDkfRgosDtXp/BtIAV1kpNjVTtw= github.com/yerden/go-util v1.1.4/go.mod h1:3HeLrvtkEeAv67ARostM9Yn0DcAVqgJ3uAiCuywEEXk= golang.org/x/sys v0.0.0-20190913121621-c3b328c6e5a7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k= -golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= -golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/options.go b/options.go index d0ee7d5..32512c9 100644 --- a/options.go +++ b/options.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "crypto/x509" "fmt" + "io" "os" "time" @@ -64,6 +65,22 @@ type Options struct { // RequestIDGenerator is used to generate a unique identifier for a request // so that responses from the server can be matched to the original request. RequestIDGenerator RequestIDGenerator + + // MessageReader is used to read a message from the connection + // if set, connection's MessageLengthReader will be ignored + MessageReader MessageReader + + // MessageWriter is used to write a message to the connection + // if set, connection's MessageLengthWriter will be ignored + MessageWriter MessageWriter +} + +type MessageReader interface { + ReadMessage(r io.Reader) (*iso8583.Message, error) +} + +type MessageWriter interface { + WriteMessage(w io.Writer, message *iso8583.Message) error } type Option func(*Options) error @@ -244,3 +261,19 @@ func SetRequestIDGenerator(g RequestIDGenerator) Option { return nil } } + +// SetMessageReader sets a MessageReader option +func SetMessageReader(r MessageReader) Option { + return func(o *Options) error { + o.MessageReader = r + return nil + } +} + +// SetMessageWriter sets a MessageWriter option +func SetMessageWriter(w MessageWriter) Option { + return func(o *Options) error { + o.MessageWriter = w + return nil + } +}