Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[skip ci] live streaming VCH creation log to VCH datastore folder #6403

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/vic-machine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/vmware/vic/cmd/vic-machine/list"
"github.com/vmware/vic/cmd/vic-machine/update"
"github.com/vmware/vic/cmd/vic-machine/upgrade"
"github.com/vmware/vic/lib/install/vchlog"
viclog "github.com/vmware/vic/pkg/log"
"github.com/vmware/vic/pkg/version"
)
Expand Down Expand Up @@ -139,6 +140,12 @@ func main() {
logs = append(logs, f)
}

// create the logger for streaming VCH log messages
vchlog.Init()
logs = append(logs, vchlog.GetPipe())
go vchlog.Run()
defer vchlog.Close() // close the logger pipe when done

// Initiliaze logger with default TextFormatter
log.SetFormatter(viclog.NewTextFormatter())
// SetOutput to io.MultiWriter so that we can log to stdout and a file
Expand Down
14 changes: 13 additions & 1 deletion lib/install/management/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/vmware/govmomi/object"
"github.com/vmware/vic/lib/config"
"github.com/vmware/vic/lib/install/data"
"github.com/vmware/vic/lib/install/vchlog"
"github.com/vmware/vic/pkg/errors"
"github.com/vmware/vic/pkg/retry"
"github.com/vmware/vic/pkg/trace"
Expand All @@ -37,6 +38,7 @@ const (
uploadMaxElapsedTime = 30 * time.Minute
uploadMaxInterval = 1 * time.Minute
uploadInitialInterval = 10 * time.Second
timeFormat = "2006-01-02T15:04:05-0700"
)

func (d *Dispatcher) CreateVCH(conf *config.VirtualContainerHostConfigSpec, settings *data.InstallerData) error {
Expand All @@ -60,6 +62,16 @@ func (d *Dispatcher) CreateVCH(conf *config.VirtualContainerHostConfigSpec, sett
return errors.Errorf("Creating the appliance failed with %s. Exiting...", err)
}

// send the signal to VCH logger to indicate VCH datastore path is ready
datastoreReadySignal := vchlog.DatastoreReadySignal{
Datastore: d.session.Datastore,
LogFileName: "vic-machine-create",
Operation: trace.NewOperation(d.ctx, "vic-machine create"),
VMPathName: d.vmPathName,
Timestamp: time.Now().UTC().Format(timeFormat),
}
vchlog.Signal(datastoreReadySignal)

if err = d.uploadImages(settings.ImageFiles); err != nil {
return errors.Errorf("Uploading images failed with %s. Exiting...", err)
}
Expand Down Expand Up @@ -139,7 +151,7 @@ func (d *Dispatcher) uploadImages(files map[string]string) error {
switch err.(type) {
// if not found, do nothing
case object.DatastoreNoSuchFileError:
// otherwise force delete
// otherwise force delete
default:
log.Debugf("target delete path = %s", isoTargetPath)
err := fm.Delete(d.ctx, isoTargetPath)
Expand Down
82 changes: 82 additions & 0 deletions lib/install/vchlog/bufferedPipe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2016-2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package vchlog

import (
"bytes"
"sync"
"io"
)

// BufferedPipe struct implements a pipe readwriter with buffer
// buffer: the internal buffer to hold data
// c: the sync locker to manage concurrent reads and writes
// closed: boolean indicating if the stream is closed
type BufferedPipe struct {
buffer *bytes.Buffer
c *sync.Cond
closed bool
}

// NewBufferedPipe returns a new buffered pipe instance
// the internal buffer is initialized to default size.
// Since internal memory is used, need to make sure that buffered data is bounded.
func NewBufferedPipe() *BufferedPipe {
var m sync.Mutex
c := sync.NewCond(&m)
return &BufferedPipe{
buffer: bytes.NewBuffer(nil),
c: c,
closed: false,
}
}

// Read is blocked until a writer in the queue is done writing (until data is available)
func (bp *BufferedPipe) Read(data []byte) (n int, err error) {
bp.c.L.Lock()
defer bp.c.L.Unlock()

// pipe closed, drop all left-over data
if bp.closed {
return 0, io.EOF
}
for bp.buffer.Len() == 0 && !bp.closed {
bp.c.Wait()
}

return bp.buffer.Read(data)
}

// Write writes to the internal buffer, and signals one of the reader in queue to start reading.
func (bp *BufferedPipe) Write(data []byte) (n int, err error) {
bp.c.L.Lock()
defer bp.c.L.Unlock()
defer bp.c.Signal()

if bp.closed {
return 0, io.ErrUnexpectedEOF
}

return bp.buffer.Write(data)
}

// Close closes the pipe.
func (bp *BufferedPipe) Close() (err error) {
bp.c.L.Lock()
defer bp.c.L.Unlock()
defer bp.c.Signal()
bp.closed = true
return nil
}
230 changes: 230 additions & 0 deletions lib/install/vchlog/bufferedPipe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
// Copyright 2016-2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package vchlog

import (
. "io"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// Part 1. Test basic pipe functionality
// (Referred to: https://golang.org/src/io/pipe_test.go)

// Test a single read and write
func TestSingleReadWrite(t *testing.T) {
testByte := []byte("hello world")
toRead := make([]byte, 64)
bp := NewBufferedPipe()
writeDone := make(chan int)

// write to the pipe
go write(t, bp, testByte, len(testByte), writeDone)

// read from the pipe
read(t, bp, toRead, len(testByte), nil)
assert.Equal(t, string(testByte), string(toRead[0:len(testByte)]),
"expected %s, read %s", string(testByte), string(toRead[0:len(testByte)]))

<-writeDone
bp.Close()
}

// Test a sequence of reads and writes
func readSequence(t *testing.T, r Reader, c chan int) {
buf := make([]byte, 64)
for {
n, err := r.Read(buf)
if err == EOF {
c <- 0
break
}
assert.Nil(t, err, "read error: %v", err)
c <- n
}
}

func TestSequenceReadsWrites(t *testing.T) {
readDone := make(chan int)
bp := NewBufferedPipe()
buf := make([]byte, 64)

// fire the reader
go readSequence(t, bp, readDone)

// write to the pipe
for i := 0; i < 5; i++ {
l := 5 + i*10
toWrite := buf[0:l]
write(t, bp, toWrite, l, nil)

n := <-readDone
assert.Equal(t, l, n, "wrote %d bytes, read %d bytes", l, n)
}

bp.Close()
n := <-readDone
assert.Equal(t, 0, n, "final read should be 0, got %d", n)
}

// Part 2. Test buffered pipe functionalities

// Test write buffering
func TestWriteBuffering(t *testing.T) {
bp := NewBufferedPipe()

// write first chunk of data
firstChunk := make([]byte, 16)
write(t, bp, firstChunk, len(firstChunk), nil)

// fire the reader
readDone := make(chan int)
go func() {
buf := make([]byte, 64)
for i := 0; i < 2; i++ {
l := 8 * (i + 2)
read(t, bp, buf, l, nil)
}
readDone <- 0
}()

// sleep before the next write
time.Sleep(time.Second * 5)

// write second chunk of data
secondChunk := make([]byte, 24)
write(t, bp, secondChunk, len(secondChunk), nil)

<-readDone
bp.Close()
}

// Test concurrent writers
func writeSequence(t *testing.T, w Writer, l int, c chan int) {
for i := 0; i < l; i++ {
write(t, w, []byte(strconv.Itoa(i)), 1, nil)
}
time.Sleep(1 * time.Second)
c <- 0
}

func TestConcurrentWriter(t *testing.T) {
bp := NewBufferedPipe()
l := 10
writersNum := 5
total := writersNum * l

// fire 5 concurrent writers
chans := make([]chan int, writersNum)
for i := 0; i < writersNum; i++ {
chans[i] = make(chan int)
go writeSequence(t, bp, l, chans[i])
}

// fire one concurrent reader
finalLen := 0
readDone := make(chan int)
go func() {
for {
buf := make([]byte, 16)
n, err := bp.Read(buf)
if err == EOF {
break
}
assert.Nil(t, err, "read error: %v", err)
finalLen += n
}
readDone <- 0
}()

// wait for writers to finish
for i := 0; i < writersNum; i++ {
<-chans[i]
}
bp.Close()

// check if the output has all the bytes
<-readDone
assert.Equal(t, total, finalLen,
"%d concurrent writers wrote %d bytes total, got %d bytes", writersNum, total, finalLen)
}

// Part 3. Edge cases

// Test read on closed pipe during read
func delayClose(t *testing.T, cl Closer, c chan int) {
time.Sleep(1 * time.Millisecond)
err := cl.Close()
assert.Nil(t, err, "close error: %v", err)
c <- 0
}

func TestPipeReadClose(t *testing.T) {
bp := NewBufferedPipe()
c := make(chan int)

// delay closer
go delayClose(t, bp, c)

// read is expected to block until the pipe is closed
buf := make([]byte, 64)
n, err := bp.Read(buf)
<-c

assert.Equal(t, EOF, err, "read from closed pipe: %v want %v", err, EOF)
assert.Equal(t, 0, n, "read on closed pipe returned %d bytes", n)
}

// Test write on closed pipe during write
func TestPipeWriteClose(t *testing.T) {
bp := NewBufferedPipe()

// close pipe
bp.Close()

// write
buf := make([]byte, 64)
n, err := bp.Write(buf)

assert.Equal(t, ErrUnexpectedEOF, err, "write to closed pipe: %v want %v", err, ErrUnexpectedEOF)
assert.Equal(t, 0, n, "write on closed pipe returned %d bytes", n)
}

// Helper Functions

// write writes the data and report any error
func write(t *testing.T, w Writer, data []byte, expected int, c chan int) {
n, err := w.Write(data)
assert.Nil(t, err, "write error: %v", err)
assert.Equal(t, expected, n, "expected %d bytes, wrote %d", expected, n)

if c != nil {
c <- 0
}
}

// read reads to buffer and report any error
func read(t *testing.T, r Reader, buf []byte, expected int, c chan int) {
n, err := r.Read(buf)
assert.Nil(t, err, "read error: %v", err)
assert.Equal(t, expected, n, "expected %d bytes, read %d", expected, n)

if c != nil {
c <- 0
}
}
Loading