Skip to content

Commit

Permalink
[skip ci] live streaming VCH creation log to VCH datastore folder (vm…
Browse files Browse the repository at this point in the history
…ware#6403)

* clean up after rebase

* integration test added to 6-04

* integration test md file modified

* integration test md file change
  • Loading branch information
AngieCris authored and zjs committed Oct 9, 2017
1 parent 0ea7ad4 commit 1e8c11f
Show file tree
Hide file tree
Showing 7 changed files with 435 additions and 2 deletions.
7 changes: 7 additions & 0 deletions cmd/vic-machine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/vmware/vic/cmd/vic-machine/list"
"github.com/vmware/vic/cmd/vic-machine/update"
"github.com/vmware/vic/cmd/vic-machine/upgrade"
"github.com/vmware/vic/lib/install/vchlog"
viclog "github.com/vmware/vic/pkg/log"
"github.com/vmware/vic/pkg/version"
)
Expand Down Expand Up @@ -139,6 +140,12 @@ func main() {
logs = append(logs, f)
}

// create the logger for streaming VCH log messages
vchlog.Init()
logs = append(logs, vchlog.GetPipe())
go vchlog.Run()
defer vchlog.Close() // close the logger pipe when done

// Initiliaze logger with default TextFormatter
log.SetFormatter(viclog.NewTextFormatter())
// SetOutput to io.MultiWriter so that we can log to stdout and a file
Expand Down
14 changes: 13 additions & 1 deletion lib/install/management/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/vmware/govmomi/object"
"github.com/vmware/vic/lib/config"
"github.com/vmware/vic/lib/install/data"
"github.com/vmware/vic/lib/install/vchlog"
"github.com/vmware/vic/pkg/errors"
"github.com/vmware/vic/pkg/retry"
"github.com/vmware/vic/pkg/trace"
Expand All @@ -37,6 +38,7 @@ const (
uploadMaxElapsedTime = 30 * time.Minute
uploadMaxInterval = 1 * time.Minute
uploadInitialInterval = 10 * time.Second
timeFormat = "2006-01-02T15:04:05-0700"
)

func (d *Dispatcher) CreateVCH(conf *config.VirtualContainerHostConfigSpec, settings *data.InstallerData) error {
Expand All @@ -60,6 +62,16 @@ func (d *Dispatcher) CreateVCH(conf *config.VirtualContainerHostConfigSpec, sett
return errors.Errorf("Creating the appliance failed with %s. Exiting...", err)
}

// send the signal to VCH logger to indicate VCH datastore path is ready
datastoreReadySignal := vchlog.DatastoreReadySignal{
Datastore: d.session.Datastore,
LogFileName: "vic-machine-create",
Operation: trace.NewOperation(d.ctx, "vic-machine create"),
VMPathName: d.vmPathName,
Timestamp: time.Now().UTC().Format(timeFormat),
}
vchlog.Signal(datastoreReadySignal)

if err = d.uploadImages(settings.ImageFiles); err != nil {
return errors.Errorf("Uploading images failed with %s. Exiting...", err)
}
Expand Down Expand Up @@ -139,7 +151,7 @@ func (d *Dispatcher) uploadImages(files map[string]string) error {
switch err.(type) {
// if not found, do nothing
case object.DatastoreNoSuchFileError:
// otherwise force delete
// otherwise force delete
default:
log.Debugf("target delete path = %s", isoTargetPath)
err := fm.Delete(d.ctx, isoTargetPath)
Expand Down
82 changes: 82 additions & 0 deletions lib/install/vchlog/bufferedPipe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2016-2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package vchlog

import (
"bytes"
"sync"
"io"
)

// BufferedPipe struct implements a pipe readwriter with buffer
// buffer: the internal buffer to hold data
// c: the sync locker to manage concurrent reads and writes
// closed: boolean indicating if the stream is closed
type BufferedPipe struct {
buffer *bytes.Buffer
c *sync.Cond
closed bool
}

// NewBufferedPipe returns a new buffered pipe instance
// the internal buffer is initialized to default size.
// Since internal memory is used, need to make sure that buffered data is bounded.
func NewBufferedPipe() *BufferedPipe {
var m sync.Mutex
c := sync.NewCond(&m)
return &BufferedPipe{
buffer: bytes.NewBuffer(nil),
c: c,
closed: false,
}
}

// Read is blocked until a writer in the queue is done writing (until data is available)
func (bp *BufferedPipe) Read(data []byte) (n int, err error) {
bp.c.L.Lock()
defer bp.c.L.Unlock()

// pipe closed, drop all left-over data
if bp.closed {
return 0, io.EOF
}
for bp.buffer.Len() == 0 && !bp.closed {
bp.c.Wait()
}

return bp.buffer.Read(data)
}

// Write writes to the internal buffer, and signals one of the reader in queue to start reading.
func (bp *BufferedPipe) Write(data []byte) (n int, err error) {
bp.c.L.Lock()
defer bp.c.L.Unlock()
defer bp.c.Signal()

if bp.closed {
return 0, io.ErrUnexpectedEOF
}

return bp.buffer.Write(data)
}

// Close closes the pipe.
func (bp *BufferedPipe) Close() (err error) {
bp.c.L.Lock()
defer bp.c.L.Unlock()
defer bp.c.Signal()
bp.closed = true
return nil
}
230 changes: 230 additions & 0 deletions lib/install/vchlog/bufferedPipe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
// Copyright 2016-2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package vchlog

import (
. "io"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// Part 1. Test basic pipe functionality
// (Referred to: https://golang.org/src/io/pipe_test.go)

// Test a single read and write
func TestSingleReadWrite(t *testing.T) {
testByte := []byte("hello world")
toRead := make([]byte, 64)
bp := NewBufferedPipe()
writeDone := make(chan int)

// write to the pipe
go write(t, bp, testByte, len(testByte), writeDone)

// read from the pipe
read(t, bp, toRead, len(testByte), nil)
assert.Equal(t, string(testByte), string(toRead[0:len(testByte)]),
"expected %s, read %s", string(testByte), string(toRead[0:len(testByte)]))

<-writeDone
bp.Close()
}

// Test a sequence of reads and writes
func readSequence(t *testing.T, r Reader, c chan int) {
buf := make([]byte, 64)
for {
n, err := r.Read(buf)
if err == EOF {
c <- 0
break
}
assert.Nil(t, err, "read error: %v", err)
c <- n
}
}

func TestSequenceReadsWrites(t *testing.T) {
readDone := make(chan int)
bp := NewBufferedPipe()
buf := make([]byte, 64)

// fire the reader
go readSequence(t, bp, readDone)

// write to the pipe
for i := 0; i < 5; i++ {
l := 5 + i*10
toWrite := buf[0:l]
write(t, bp, toWrite, l, nil)

n := <-readDone
assert.Equal(t, l, n, "wrote %d bytes, read %d bytes", l, n)
}

bp.Close()
n := <-readDone
assert.Equal(t, 0, n, "final read should be 0, got %d", n)
}

// Part 2. Test buffered pipe functionalities

// Test write buffering
func TestWriteBuffering(t *testing.T) {
bp := NewBufferedPipe()

// write first chunk of data
firstChunk := make([]byte, 16)
write(t, bp, firstChunk, len(firstChunk), nil)

// fire the reader
readDone := make(chan int)
go func() {
buf := make([]byte, 64)
for i := 0; i < 2; i++ {
l := 8 * (i + 2)
read(t, bp, buf, l, nil)
}
readDone <- 0
}()

// sleep before the next write
time.Sleep(time.Second * 5)

// write second chunk of data
secondChunk := make([]byte, 24)
write(t, bp, secondChunk, len(secondChunk), nil)

<-readDone
bp.Close()
}

// Test concurrent writers
func writeSequence(t *testing.T, w Writer, l int, c chan int) {
for i := 0; i < l; i++ {
write(t, w, []byte(strconv.Itoa(i)), 1, nil)
}
time.Sleep(1 * time.Second)
c <- 0
}

func TestConcurrentWriter(t *testing.T) {
bp := NewBufferedPipe()
l := 10
writersNum := 5
total := writersNum * l

// fire 5 concurrent writers
chans := make([]chan int, writersNum)
for i := 0; i < writersNum; i++ {
chans[i] = make(chan int)
go writeSequence(t, bp, l, chans[i])
}

// fire one concurrent reader
finalLen := 0
readDone := make(chan int)
go func() {
for {
buf := make([]byte, 16)
n, err := bp.Read(buf)
if err == EOF {
break
}
assert.Nil(t, err, "read error: %v", err)
finalLen += n
}
readDone <- 0
}()

// wait for writers to finish
for i := 0; i < writersNum; i++ {
<-chans[i]
}
bp.Close()

// check if the output has all the bytes
<-readDone
assert.Equal(t, total, finalLen,
"%d concurrent writers wrote %d bytes total, got %d bytes", writersNum, total, finalLen)
}

// Part 3. Edge cases

// Test read on closed pipe during read
func delayClose(t *testing.T, cl Closer, c chan int) {
time.Sleep(1 * time.Millisecond)
err := cl.Close()
assert.Nil(t, err, "close error: %v", err)
c <- 0
}

func TestPipeReadClose(t *testing.T) {
bp := NewBufferedPipe()
c := make(chan int)

// delay closer
go delayClose(t, bp, c)

// read is expected to block until the pipe is closed
buf := make([]byte, 64)
n, err := bp.Read(buf)
<-c

assert.Equal(t, EOF, err, "read from closed pipe: %v want %v", err, EOF)
assert.Equal(t, 0, n, "read on closed pipe returned %d bytes", n)
}

// Test write on closed pipe during write
func TestPipeWriteClose(t *testing.T) {
bp := NewBufferedPipe()

// close pipe
bp.Close()

// write
buf := make([]byte, 64)
n, err := bp.Write(buf)

assert.Equal(t, ErrUnexpectedEOF, err, "write to closed pipe: %v want %v", err, ErrUnexpectedEOF)
assert.Equal(t, 0, n, "write on closed pipe returned %d bytes", n)
}

// Helper Functions

// write writes the data and report any error
func write(t *testing.T, w Writer, data []byte, expected int, c chan int) {
n, err := w.Write(data)
assert.Nil(t, err, "write error: %v", err)
assert.Equal(t, expected, n, "expected %d bytes, wrote %d", expected, n)

if c != nil {
c <- 0
}
}

// read reads to buffer and report any error
func read(t *testing.T, r Reader, buf []byte, expected int, c chan int) {
n, err := r.Read(buf)
assert.Nil(t, err, "read error: %v", err)
assert.Equal(t, expected, n, "expected %d bytes, read %d", expected, n)

if c != nil {
c <- 0
}
}
Loading

0 comments on commit 1e8c11f

Please sign in to comment.