diff --git a/arguments.go b/arguments.go index 621b51f..a01c0be 100644 --- a/arguments.go +++ b/arguments.go @@ -269,10 +269,19 @@ func (p *argParser) Tasks() (tasks []StreamTask) { } else if len(tasks) == 0 { tasks = append(tasks, ReadFromTask(p.stdin, YAML)) } - tasks = append(tasks, StreamWriteTo(p.stdout, p.output)) + tasks = append(tasks, p.outputTask()) return tasks } +func (p *argParser) outputTask() (s StreamTask) { + switch p.output { + case OutputJSON: + return StreamWriteJSON(p.stdout) + default: + return StreamWriteYAML(p.stdout) + } +} + func (p *argParser) inputTask() (s StreamTask) { if p.input == nil { return nil diff --git a/codec.go b/codec.go index 45e12b4..9621909 100644 --- a/codec.go +++ b/codec.go @@ -3,6 +3,7 @@ package ycat import ( "encoding/json" "io" + "os" "strings" yaml "gopkg.in/yaml.v2" @@ -58,32 +59,93 @@ func OutputFromString(s string) Output { } type Decoder interface { - Decode(x interface{}) error + Decode(interface{}) error } func NewDecoder(r io.Reader, format Format) Decoder { switch format { case JSON: return json.NewDecoder(r) - case YAML: - return yaml.NewDecoder(r) default: - panic("Invalid format") + return yaml.NewDecoder(r) } } -type Encoder interface { - Encode(x interface{}) error +func ReadFromFile(path string, format Format) ProducerFunc { + if format == Auto { + format = DetectFormat(path) + } + return func(s WriteStream) error { + f, err := os.Open(path) + if err != nil { + return err + } + defer f.Close() + r := ReadFromTask(f, format) + return r(s) + } } -func NewEncoder(w io.Writer, format Output) Encoder { - switch format { - case OutputJSON: - return json.NewEncoder(w) - case OutputYAML: - fallthrough - default: - return yaml.NewEncoder(w) +func ReadFromTask(r io.Reader, format Format) ProducerFunc { + return func(s WriteStream) error { + dec := NewDecoder(r, format) + for { + v := new(Value) + if err := dec.Decode(v); err != nil { + if err == io.EOF { + return nil + } + return err + } + + if !s.Push(v) { + return nil + } + } + } +} + +func StreamWriteJSON(w io.WriteCloser) ConsumerFunc { + enc := json.NewEncoder(w) + return func(s ReadStream) error { + defer w.Close() + for { + v, ok := s.Next() + if !ok { + return nil + } + if err := enc.Encode(v); err != nil { + return err + } + } } +} +func StreamWriteYAML(w io.WriteCloser) ConsumerFunc { + n := int64(0) + return func(s ReadStream) error { + defer w.Close() + for { + v, ok := s.Next() + if !ok { + return nil + } + if n > 0 { + nn, err := w.Write([]byte("---\n")) + if err != nil { + return err + } + n += int64(nn) + } + data, err := yaml.Marshal(v) + if err != nil { + return err + } + nn, err := w.Write(data) + if err != nil { + return err + } + n += int64(nn) + } + } } diff --git a/stream.go b/stream.go index f4f044b..6717509 100644 --- a/stream.go +++ b/stream.go @@ -2,8 +2,6 @@ package ycat import ( "context" - "io" - "os" ) type WriteStream interface { @@ -83,56 +81,6 @@ func (s *stream) Push(v *Value) bool { } } -func ReadFromFile(path string, format Format) ProducerFunc { - if format == Auto { - format = DetectFormat(path) - } - return func(s WriteStream) error { - f, err := os.Open(path) - if err != nil { - return err - } - defer f.Close() - r := ReadFromTask(f, format) - return r(s) - } -} - -func ReadFromTask(r io.Reader, format Format) ProducerFunc { - return func(s WriteStream) error { - dec := NewDecoder(r, format) - for { - v := new(Value) - if err := dec.Decode(v); err != nil { - if err == io.EOF { - return nil - } - return err - } - - if !s.Push(v) { - return nil - } - } - } -} - -func StreamWriteTo(w io.WriteCloser, format Output) ConsumerFunc { - return func(s ReadStream) error { - enc := NewEncoder(w, format) - defer w.Close() - for { - v, ok := s.Next() - if !ok { - return nil - } - if err := enc.Encode(v); err != nil { - return err - } - } - } -} - type NullStream struct{} func (NullStream) Produce(s WriteStream) error {