diff --git a/array.go b/array.go index 1ecd611..3297870 100644 --- a/array.go +++ b/array.go @@ -1098,12 +1098,8 @@ func DeleteFragmentsList(tdbCtx *Context, uri string, fragmentURIs []string) err curi := C.CString(uri) defer C.free(unsafe.Pointer(curi)) - var list []*C.char - for _, furi := range fragmentURIs { - cfuri := C.CString(furi) - defer C.free(unsafe.Pointer(cfuri)) - list = append(list, cfuri) - } + list, freeMemory := cStringArray(fragmentURIs) + defer freeMemory() ret := C.tiledb_array_delete_fragments_list(tdbCtx.tiledbContext, curi, (**C.char)(unsafe.Pointer(&list[0])), C.size_t(len(list))) if ret != C.TILEDB_OK { diff --git a/array_experimental.go b/array_experimental.go index effa71a..f9f4ef4 100644 --- a/array_experimental.go +++ b/array_experimental.go @@ -6,7 +6,11 @@ package tiledb #include */ import "C" -import "fmt" +import ( + "fmt" + "runtime" + "unsafe" +) // ConsolidationPlan is a consolidation plan for array type ConsolidationPlan struct { @@ -94,3 +98,26 @@ func (cp *ConsolidationPlan) DumpJSON() (string, error) { return json, nil } + +// ConsolidateFragments consolidates an explicit list of fragments in an array into a single fragment. +// You must first finalize all queries to the array before consolidation can +// begin (as consolidation temporarily acquires an exclusive lock on the array). +func (a *Array) ConsolidateFragments(config *Config, fragmentList []string) error { + if config == nil { + return fmt.Errorf("Config must not be nil for Consolidate") + } + + curi := C.CString(a.uri) + defer C.free(unsafe.Pointer(curi)) + + list, freeMemory := cStringArray(fragmentList) + defer freeMemory() + + ret := C.tiledb_array_consolidate_fragments(a.context.tiledbContext, curi, (**C.char)(slicePtr(list)), C.uint64_t(len(list)), config.tiledbConfig) + if ret != C.TILEDB_OK { + return fmt.Errorf("Error consolidating tiledb array fragment list: %s", a.context.LastError()) + } + + runtime.KeepAlive(config) + return nil +} diff --git a/array_experimental_test.go b/array_experimental_test.go index 3a2a131..fe52957 100644 --- a/array_experimental_test.go +++ b/array_experimental_test.go @@ -9,8 +9,8 @@ import ( "github.com/stretchr/testify/require" ) -func TestGetConsolidationPlan(t *testing.T) { - // Create an 1d array +func create1DTestArray(t *testing.T) *Array { + // Create a 1d array // Create configuration config, err := NewConfig() @@ -63,10 +63,12 @@ func TestGetConsolidationPlan(t *testing.T) { err = array.Create(arraySchema) require.NoError(t, err) - // Write to array + return array +} +func write1DTestArray(t *testing.T, array *Array, data []int32) { // Open array for writing - err = array.Open(TILEDB_WRITE) + err := array.Open(TILEDB_WRITE) require.NoError(t, err) // Create subarray @@ -76,15 +78,14 @@ func TestGetConsolidationPlan(t *testing.T) { require.NoError(t, err) // Create write query - query, err := NewQuery(context, array) + query, err := NewQuery(array.context, array) require.NoError(t, err) assert.NotNil(t, query) err = query.SetSubarray(subarray) require.NoError(t, err) // Initialize the data buffer - bufferV := []int32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} - _, err = query.SetDataBuffer("v", bufferV) + _, err = query.SetDataBuffer("v", data) require.NoError(t, err) // Submit write query @@ -99,6 +100,12 @@ func TestGetConsolidationPlan(t *testing.T) { // close array err = array.Close() require.NoError(t, err) +} + +func TestGetConsolidationPlan(t *testing.T) { + array := create1DTestArray(t) + + write1DTestArray(t, array, []int32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) checkConsolidationPlan := func(t *testing.T, cplan *ConsolidationPlan) { numNodes, err := cplan.NumNodes() @@ -113,7 +120,7 @@ func TestGetConsolidationPlan(t *testing.T) { require.NoError(t, err) // fragment uris in the plan are relative - fullPath := filepath.Join(tmpArrayPath, "__fragments", fragmentURI) + fullPath := filepath.Join(array.uri, "__fragments", fragmentURI) _, err = os.Stat(fullPath) require.NoError(t, err) } @@ -121,7 +128,7 @@ func TestGetConsolidationPlan(t *testing.T) { tdbCtx, err := NewContext(nil) require.NoError(t, err) - arr, err := NewArray(tdbCtx, tmpArrayPath) + arr, err := NewArray(tdbCtx, array.uri) require.NoError(t, err) require.NoError(t, arr.Open(TILEDB_READ)) t.Cleanup(func() { arr.Close() }) @@ -131,3 +138,62 @@ func TestGetConsolidationPlan(t *testing.T) { checkConsolidationPlan(t, cplan) } + +func TestConsolidateFragments(t *testing.T) { + // The test is skipped pending a core release for 2.25.0 that includes this fix: + // https://github.com/TileDB-Inc/TileDB/pull/5135 + t.Skip("Skipping fragment list consolidation SC-51140") + + array := create1DTestArray(t) + + numFrags := 5 + for i := 0; i < numFrags; i++ { + write1DTestArray(t, array, []int32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) + } + + fragmentInfo, err := NewFragmentInfo(array.context, array.uri) + require.NoError(t, err) + + err = fragmentInfo.Load() + require.NoError(t, err) + + fragInfoNum, err := fragmentInfo.GetFragmentNum() + require.NoError(t, err) + require.EqualValues(t, numFrags, fragInfoNum) + fragUris := make([]string, numFrags) + for i := 0; i < numFrags; i++ { + uri, err := fragmentInfo.GetFragmentURI(uint32(i)) + require.NoError(t, err) + fragUris[i] = uri + } + + // Default consolidation mode is 'fragments'. + config, err := array.context.Config() + require.NoError(t, err) + + err = array.ConsolidateFragments(config, fragUris) + require.NoError(t, err) + + // Check that the new consolidated fragment was created. + err = fragmentInfo.Load() + require.NoError(t, err) + fragInfoNum, err = fragmentInfo.GetFragmentNum() + require.NoError(t, err) + fragToVacuumNum, err := fragmentInfo.GetToVacuumNum() + require.NoError(t, err) + require.EqualValues(t, numFrags, fragToVacuumNum) + require.Equal(t, uint32(1), fragInfoNum) + + err = array.Vacuum(config) + require.NoError(t, err) + + // Check for one fragment after vacuum. + err = fragmentInfo.Load() + require.NoError(t, err) + fragInfoNum, err = fragmentInfo.GetFragmentNum() + require.NoError(t, err) + fragToVacuumNum, err = fragmentInfo.GetToVacuumNum() + require.NoError(t, err) + require.Equal(t, uint32(1), fragInfoNum) + require.Equal(t, uint32(0), fragToVacuumNum) +} diff --git a/common.go b/common.go index 8b77d71..623be33 100644 --- a/common.go +++ b/common.go @@ -1,5 +1,9 @@ package tiledb +/* +#include +*/ +import "C" import ( "reflect" "unsafe" @@ -20,3 +24,18 @@ func slicePtr[T any](slc []T) unsafe.Pointer { hdr := (*reflect.SliceHeader)(unsafe.Pointer(&slc)) return unsafe.Pointer(hdr.Data) } + +// cStringArray takes an array of Go strings and converts it to an array of CStrings. +// The function returned should be deferred by the caller to free allocated memory. +func cStringArray(stringList []string) ([]*C.char, func()) { + list := make([]*C.char, len(stringList)) + for i, str := range stringList { + list[i] = C.CString(str) + } + + return list, func() { + for _, str := range list { + C.free(unsafe.Pointer(str)) + } + } +}