Skip to content

Commit

Permalink
allow specifying multiple arguments with {0}, {1}, {2}
Browse files Browse the repository at this point in the history
  • Loading branch information
brentp committed Jun 29, 2016
1 parent 3d35d68 commit cd72227
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 35 deletions.
35 changes: 25 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<!--
rm -rf binaries
mkdir -p binaries/
VERSION=0.1.0
VERSION=0.2.0
for os in darwin linux windows; do
GOOS=$os GOARCH=$arch go build -o binaries/gargs_${os} main.go
done
Expand All @@ -14,23 +14,33 @@ Work In Progress:
gargs is like xargs but it addresses the following limitations in xargs:

+ it keeps the output serialized even when using multiple threads
+ easy to specify multiple arguments
+ easy to specify multiple arguments with number blocks ({0}, {1}, ...) and {} indicates the entire line.

As an example that currently works, this will keep the output in order and send 3 arguments to each process.
This will keep the output in order (via -o) and send 3 arguments to each process
by pulling in lines of 3.
It is using 4 processes to parallelize.

```
$ seq 12 -1 1 | gargs -p 4 -n 3 "sleep {}; echo {} {}"
$ seq 12 -1 1 | gargs -o -p 4 -n 3 "sleep {0}; echo {1} {2}"
11 10
8 7
5 4
2 1
```

Note that for each line, we slept 12, 9, 6, 3 seconds respectively but the output order was maintained.
Note that for each line, we slept 12, 9, 6, 3 seconds respectively but the output order was maintained. We can make
more even use of cores by not enforcing the output order (remove -o)

```
$ seq 12 -1 1 | gargs -p 4 -n 3 "sleep {0}; echo {1} {2}"
2 1
5 4
8 7
11 10
```


For now, the -n 3 is redundant with seeing the "{}"'s. In the future, it may be possible to use numbered arguments:
The -n 3 indicates that we'll use 3 lines to fill the args: the numbered place-holders {0}, {1} and {2} refer to the first, second and third of those lines.

Install
=======
Expand All @@ -50,7 +60,11 @@ chr4 22 33
That has a mixture of tabs and spaces. We can convert to chrom:start-end format with:

```
cat t.txt | gargs --sep "\s+" -p 2 "echo '{}:{}-{}'"
$ cat t.txt | gargs --sep "\s+" -p 2 "echo '{0}:{1}-{2}' full-line: \'{}\'"
chr2:22-33 full-line: 'chr2 22 33'
chr1:22-33 full-line: 'chr1 22 33'
chr3:22-33 full-line: 'chr3 22 33'
chr4:22-33 full-line: 'chr4 22 33'
```

In this case, we're using **2** processes to run this in parallel which will make more of a difference
Expand All @@ -63,7 +77,7 @@ Usage
=====

```
usage: gargs [--procs PROCS] [--nlines NLINES] [--sep SEP] [--shell SHELL] [--verbose] COMMAND
usage: gargs [--procs PROCS] [--nlines NLINES] [--sep SEP] [--shell SHELL] [--verbose] [--continueonerror] [--ordered] COMMAND
positional arguments:
command command to execute
Expand All @@ -76,12 +90,13 @@ options:
--sep SEP, -s SEP regular expression split line with to fill multiple template spots default is not to split. -s and -n are mutually exclusive.
--shell SHELL shell to use [default: bash]
--verbose, -v print commands to stderr before they are executed.
--continueonerror, -c
report errors but don't stop the entire execution (which is the default).
--ordered, -o keep output in order of input; default is to output in order of return which greatly improves parallelization.
--help, -h display this help and exit
```

TODO
====

+ --unordered flag to specify that we don't care about the output order. Will improve parallelization for some cases.
+ {0}, {1}, {2} place-holders?
+ combinations of `-n` and `--sep`.
128 changes: 103 additions & 25 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"bytes"
"fmt"
"io"
"log"
Expand All @@ -9,20 +10,31 @@ import (
"regexp"
"runtime"
"strings"
"sync"
"syscall"
"text/template"

"github.com/alexflint/go-arg"
"github.com/brentp/xopen"
)

// VERSION is the gargs release string.
const VERSION = "0.2.0"

// Args describes the command-line options. The struct tags give each option's
// short flag and help text to the go-arg parser (see arg.MustParse in main).
type Args struct {
	Procs           int    `arg:"-p,help:number of processes to use"`
	Nlines          int    `arg:"-n,help:number of lines to consume for each command. -s and -n are mutually exclusive."`
	Command         string `arg:"positional,required,help:command to execute"`
	Sep             string `arg:"-s,help:regular expression split line with to fill multiple template spots default is not to split. -s and -n are mutually exclusive."`
	Shell           string `arg:"help:shell to use"`
	Verbose         bool   `arg:"-v,help:print commands to stderr before they are executed."`
	ContinueOnError bool   `arg:"-c,help:report errors but don't stop the entire execution (which is the default)."`
	Ordered         bool   `arg:"-o,help:keep output in order of input; default is to output in order of return which greatly improves parallelization."`
}

// xargs carries the values substituted into the command template for a single
// call.
//
// Lines holds the raw input line(s) consumed for the call; the "{}"
// place-holder expands to the first of them. Xs holds the values addressed by
// the numbered place-holders {0}, {1}, ...: the fields of the line when a
// separator is in use, otherwise the gathered input lines.
type xargs struct {
	Lines, Xs []string
}

func main() {
Expand All @@ -33,6 +45,8 @@ func main() {
args.Sep = ""
args.Shell = "bash"
args.Verbose = false
args.ContinueOnError = false
args.Ordered = false
p := arg.MustParse(&args)
if args.Sep != "" && args.Nlines > 1 {
p.Fail("must specify either sep (-s) or n-lines (-n), not both")
Expand All @@ -42,7 +56,11 @@ func main() {
os.Exit(255)
}
runtime.GOMAXPROCS(args.Procs)
run(args)
if args.Ordered {
runOrdered(args)
} else {
runUnOrdered(args)
}
}

func check(e error) {
Expand All @@ -51,8 +69,8 @@ func check(e error) {
}
}

func genLines(n int, sep string) chan []interface{} {
ch := make(chan []interface{})
func genXargs(n int, sep string) chan *xargs {
ch := make(chan *xargs)
var resep *regexp.Regexp
if sep != "" {
resep = regexp.MustCompile(sep)
Expand All @@ -62,19 +80,16 @@ func genLines(n int, sep string) chan []interface{} {
rdr, err := xopen.Ropen("-")
check(err)
k := 0
lines := make([]interface{}, n)
re := regexp.MustCompile(`\r?\n`)
lines := make([]string, n)

for {
line, err := rdr.ReadString('\n')
if err == nil || (err == io.EOF && len(line) > 0) {
line = re.ReplaceAllString(line, "")
if resep != nil {
toks := resep.Split(line, -1)
itoks := make([]interface{}, len(toks))
for i, t := range toks {
itoks[i] = t
}
ch <- itoks
ch <- &xargs{Xs: toks, Lines: []string{line}}
} else {
lines[k] = line
k += 1
Expand All @@ -87,22 +102,54 @@ func genLines(n int, sep string) chan []interface{} {
}
if k == n {
k = 0
ch <- lines
lines = make([]interface{}, n)
ch <- &xargs{Lines: lines, Xs: lines}
lines = make([]string, n)
}
}
if k > 0 {
ch <- lines[:k]
ch <- &xargs{Lines: lines[:k], Xs: lines}
}
close(ch)
}()
return ch
}

func run(args Args) {
// runUnOrdered runs the command once per item produced by genXargs, using
// args.Procs concurrent workers, and writes each command's output to stdout as
// soon as that command finishes (completion order, not input order).
func runUnOrdered(args Args) {
	results := make(chan []byte)
	work := genXargs(args.Nlines, args.Sep)
	tmpl := makeCommand(args.Command)

	// worker pulls items off the shared work channel until it is closed.
	worker := func(wg *sync.WaitGroup) {
		defer wg.Done()
		for item := range work {
			// process sends one result on the channel and closes it; the
			// buffer of 1 lets it return before the value is forwarded.
			single := make(chan []byte, 1)
			process(single, tmpl, args, item)
			results <- <-single
		}
	}

	go func() {
		var wg sync.WaitGroup
		wg.Add(args.Procs)
		for i := 0; i < args.Procs; i++ {
			go worker(&wg)
		}
		// Close results only after every worker has exited so the loop below
		// terminates.
		wg.Wait()
		close(results)
	}()

	for chunk := range results {
		os.Stdout.Write(chunk)
	}
}

func runOrdered(args Args) {
ch := make(chan chan []byte, args.Procs)

chlines := genLines(args.Nlines, args.Sep)
chlines := genXargs(args.Nlines, args.Sep)
cmd := makeCommand(args.Command)

go func() {
Expand All @@ -120,19 +167,50 @@ func run(args Args) {
}

// makeCommand converts the user's command string into text/template syntax.
//
// Every "{}" becomes "{{index .Lines 0}}" (the first consumed input line) and
// every numbered place-holder "{N}" becomes "{{index .Xs N}}". All other text
// is passed through unchanged.
func makeCommand(cmd string) string {
	// Rewrite "{}" first: its replacement contains no "{digits}" sequence, so
	// the numbered pass below cannot touch it.
	v := strings.Replace(cmd, "{}", "{{index .Lines 0}}", -1)
	re := regexp.MustCompile(`\{(\d+)\}`)
	return re.ReplaceAllString(v, "{{index .Xs ${1}}}")
}

func process(ch chan []byte, cmdStr string, args Args, lines []interface{}) {
cmdStr = fmt.Sprintf(cmdStr, lines...)
func process(ch chan []byte, cmdStr string, args Args, xarg *xargs) {

tmpl, err := template.New(cmdStr).Parse(cmdStr)
check(err)

var buf bytes.Buffer
check(tmpl.Execute(&buf, xarg))

cmdStr = buf.String()

if args.Verbose {
fmt.Fprintf(os.Stderr, "command: %s\n", cmdStr)
}

cmd := exec.Command(args.Shell, "-c", cmdStr)
cmd.Stderr = os.Stderr
out, err := cmd.Output()
check(err)
if err != nil {
var argString string
if xarg.Xs != nil && len(xarg.Xs) > 0 {
argString = strings.Join(xarg.Xs, ",")
} else {
argString = strings.Join(xarg.Lines, "|")
}
log.Printf("ERROR in command: %s\twith args: %s", cmdStr, argString)
log.Println(err)
if !args.ContinueOnError {
if ex, ok := err.(*exec.ExitError); ok {
if st, ok := ex.Sys().(syscall.WaitStatus); ok {
os.Exit(st.ExitStatus())
}
}
os.Exit(1)
}
}
ch <- out
close(ch)
}

0 comments on commit cd72227

Please sign in to comment.