Commit

Configure Csv delimiter
alallema committed Mar 28, 2023
1 parent dac3428 commit f7cb103
Showing 5 changed files with 560 additions and 65 deletions.
8 changes: 4 additions & 4 deletions index.go
@@ -28,14 +28,14 @@ type IndexInterface interface {

AddDocuments(documentsPtr interface{}, primaryKey ...string) (resp *TaskInfo, err error)
AddDocumentsInBatches(documentsPtr interface{}, batchSize int, primaryKey ...string) (resp []TaskInfo, err error)
-	AddDocumentsCsv(documents []byte, primaryKey ...string) (resp *TaskInfo, err error)
-	AddDocumentsCsvInBatches(documents []byte, batchSize int, primaryKey ...string) (resp []TaskInfo, err error)
+	AddDocumentsCsv(documents []byte, options *CsvDocumentsQuery) (resp *TaskInfo, err error)
+	AddDocumentsCsvInBatches(documents []byte, batchSize int, options *CsvDocumentsQuery) (resp []TaskInfo, err error)
AddDocumentsNdjson(documents []byte, primaryKey ...string) (resp *TaskInfo, err error)
AddDocumentsNdjsonInBatches(documents []byte, batchSize int, primaryKey ...string) (resp []TaskInfo, err error)
UpdateDocuments(documentsPtr interface{}, primaryKey ...string) (resp *TaskInfo, err error)
UpdateDocumentsInBatches(documentsPtr interface{}, batchSize int, primaryKey ...string) (resp []TaskInfo, err error)
-	UpdateDocumentsCsv(documents []byte, primaryKey ...string) (resp *TaskInfo, err error)
-	UpdateDocumentsCsvInBatches(documents []byte, batchsize int, primaryKey ...string) (resp []TaskInfo, err error)
+	UpdateDocumentsCsv(documents []byte, options *CsvDocumentsQuery) (resp *TaskInfo, err error)
+	UpdateDocumentsCsvInBatches(documents []byte, batchsize int, options *CsvDocumentsQuery) (resp []TaskInfo, err error)
UpdateDocumentsNdjson(documents []byte, primaryKey ...string) (resp *TaskInfo, err error)
UpdateDocumentsNdjsonInBatches(documents []byte, batchsize int, primaryKey ...string) (resp []TaskInfo, err error)
GetDocument(uid string, request *DocumentQuery, documentPtr interface{}) error
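For context, here is a minimal usage sketch of the new CSV signatures. CsvDocumentsQuery itself is declared in one of the changed files not shown here, so its PrimaryKey and CsvDelimiter fields are assumptions based on the commit message:

package main

import (
	"fmt"
	"log"

	"github.com/meilisearch/meilisearch-go"
)

func main() {
	client := meilisearch.NewClient(meilisearch.ClientConfig{Host: "http://localhost:7700"})
	index := client.Index("movies") // hypothetical index UID

	// The delimiter now travels in the options struct instead of a
	// variadic primaryKey string.
	docs := []byte("id;title\n1;Carol\n2;Wonder Woman\n")

	task, err := index.AddDocumentsCsv(docs, &meilisearch.CsvDocumentsQuery{
		PrimaryKey:   "id", // assumed field name
		CsvDelimiter: ";",  // assumed field name
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(task.TaskUID)
}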
107 changes: 72 additions & 35 deletions index_documents.go
@@ -4,16 +4,45 @@ import (
"bufio"
"bytes"
"encoding/csv"
"encoding/json"
"fmt"
"io"
"math"
"net/http"
"net/url"
"reflect"
"strconv"
"strings"
)

-func sendCsvRecords(documentsCsvFunc func(recs []byte, pk ...string) (resp *TaskInfo, err error), records [][]string, primaryKey ...string) (*TaskInfo, error) {
+func transformStringVariadicToMap(primaryKey ...string) (options map[string]string) {
+	if primaryKey != nil {
+		return map[string]string{
+			"primaryKey": primaryKey[0],
+		}
+	}
+	return nil
+}
+
+func transformCsvDocumentsQueryToMap(options *CsvDocumentsQuery) map[string]string {
+	var optionsMap map[string]string
+	op, err := json.Marshal(options)
+	if err == nil {
+		json.Unmarshal(op, &optionsMap)
+		return optionsMap
+	}
+	return nil
+}
+
+func generateQueryForOptions(options map[string]string) (urlQuery string) {
+	q := url.Values{}
+	for key, val := range options {
+		q.Add(key, val)
+	}
+	return q.Encode()
+}
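Taken together, these helpers turn either calling convention into a map[string]string and then into an encoded query string. A self-contained sketch of that flow, with the CsvDocumentsQuery shape and its JSON tags assumed since the type is declared outside this file:

package main

import (
	"encoding/json"
	"fmt"
	"net/url"
)

// Assumed shape; omitempty keeps unset options out of the resulting map.
type CsvDocumentsQuery struct {
	PrimaryKey   string `json:"primaryKey,omitempty"`
	CsvDelimiter string `json:"csvDelimiter,omitempty"`
}

func main() {
	opts := &CsvDocumentsQuery{PrimaryKey: "id", CsvDelimiter: ";"}

	// Round-trip through JSON, as transformCsvDocumentsQueryToMap does.
	var optionsMap map[string]string
	b, _ := json.Marshal(opts)
	_ = json.Unmarshal(b, &optionsMap)

	// Encode the map, as generateQueryForOptions does.
	q := url.Values{}
	for key, val := range optionsMap {
		q.Add(key, val)
	}
	fmt.Println(q.Encode()) // csvDelimiter=%3B&primaryKey=id (Encode sorts keys)
}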

+func sendCsvRecords(documentsCsvFunc func(recs []byte, op *CsvDocumentsQuery) (resp *TaskInfo, err error), records [][]string, options *CsvDocumentsQuery) (*TaskInfo, error) {
b := new(bytes.Buffer)
w := csv.NewWriter(b)
w.UseCRLF = true
@@ -23,14 +52,14 @@ func sendCsvRecords(documentsCsvFunc func(recs []byte, pk ...string) (resp *Task
return nil, fmt.Errorf("could not write CSV records: %w", err)
}

-	resp, err := documentsCsvFunc(b.Bytes(), primaryKey...)
+	resp, err := documentsCsvFunc(b.Bytes(), options)
if err != nil {
return nil, err
}
return resp, nil
}

-func (i Index) saveDocumentsFromReaderInBatches(documents io.Reader, batchSize int, documentsCsvFunc func(recs []byte, pk ...string) (resp *TaskInfo, err error), primaryKey ...string) (resp []TaskInfo, err error) {
+func (i Index) saveDocumentsFromReaderInBatches(documents io.Reader, batchSize int, documentsCsvFunc func(recs []byte, op *CsvDocumentsQuery) (resp *TaskInfo, err error), options *CsvDocumentsQuery) (resp []TaskInfo, err error) {
// Because of the possibility of multiline fields it's not safe to split
// into batches by lines, we'll have to parse the file and reassemble it
// into smaller parts. RFC 4180 compliant input with a header row is
@@ -71,7 +100,7 @@ func (i Index) saveDocumentsFromReaderInBatches(documents io.Reader, batchSize i

// After reaching batchSize (not counting the header record) assemble a CSV file and send records
if len(records) == batchSize+1 {
-			resp, err := sendCsvRecords(documentsCsvFunc, records, primaryKey...)
+			resp, err := sendCsvRecords(documentsCsvFunc, records, options)
if err != nil {
return nil, err
}
@@ -82,7 +111,7 @@ func (i Index) saveDocumentsFromReaderInBatches(documents io.Reader, batchSize i

// Send remaining records as the last batch if there is any
if len(records) > 0 {
-		resp, err := sendCsvRecords(documentsCsvFunc, records, primaryKey...)
+		resp, err := sendCsvRecords(documentsCsvFunc, records, options)
if err != nil {
return nil, err
}
@@ -174,14 +203,18 @@ func (i Index) GetDocuments(request *DocumentsQuery, resp *DocumentsResult) erro
return nil
}

-func (i Index) addDocuments(documentsPtr interface{}, contentType string, primaryKey ...string) (resp *TaskInfo, err error) {
+func (i *Index) addDocuments(documentsPtr interface{}, contentType string, options map[string]string) (resp *TaskInfo, err error) {
resp = &TaskInfo{}
endpoint := ""
-	if primaryKey == nil {
+	if options == nil {
endpoint = "/indexes/" + i.UID + "/documents"
} else {
-		i.PrimaryKey = primaryKey[0] //nolint:golint,staticcheck
-		endpoint = "/indexes/" + i.UID + "/documents?primaryKey=" + primaryKey[0]
+		for key, val := range options {
+			if key == "primaryKey" {
+				i.PrimaryKey = val
+			}
+		}
+		endpoint = "/indexes/" + i.UID + "/documents?" + generateQueryForOptions(options)
}
req := internalRequest{
endpoint: endpoint,
@@ -199,40 +232,40 @@ }
}
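Note the receiver change from Index to *Index: when the options carry a primaryKey, the method records it on the index struct, and the pointer receiver makes that assignment visible to the caller. A reduced model of that side effect, using a stand-in type rather than the library's own:

package main

import "fmt"

// Stand-in for the library's Index type, for illustration only.
type Index struct {
	UID        string
	PrimaryKey string
}

// Mirrors the loop in addDocuments: copy options["primaryKey"] onto the index.
func (i *Index) applyOptions(options map[string]string) {
	for key, val := range options {
		if key == "primaryKey" {
			i.PrimaryKey = val
		}
	}
}

func main() {
	idx := &Index{UID: "movies"}
	idx.applyOptions(map[string]string{"primaryKey": "id", "csvDelimiter": ";"})
	fmt.Println(idx.PrimaryKey) // prints "id"; the pointer receiver makes the write stick
}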

func (i Index) AddDocuments(documentsPtr interface{}, primaryKey ...string) (resp *TaskInfo, err error) {
-	return i.addDocuments(documentsPtr, contentTypeJSON, primaryKey...)
+	return i.addDocuments(documentsPtr, contentTypeJSON, transformStringVariadicToMap(primaryKey...))
}

func (i Index) AddDocumentsInBatches(documentsPtr interface{}, batchSize int, primaryKey ...string) (resp []TaskInfo, err error) {
return i.saveDocumentsInBatches(documentsPtr, batchSize, i.AddDocuments, primaryKey...)
}

-func (i Index) AddDocumentsCsv(documents []byte, primaryKey ...string) (resp *TaskInfo, err error) {
+func (i Index) AddDocumentsCsv(documents []byte, options *CsvDocumentsQuery) (resp *TaskInfo, err error) {
// []byte avoids JSON conversion in Client.sendRequest()
-	return i.addDocuments(documents, contentTypeCSV, primaryKey...)
+	return i.addDocuments(documents, contentTypeCSV, transformCsvDocumentsQueryToMap(options))
}

-func (i Index) AddDocumentsCsvFromReader(documents io.Reader, primaryKey ...string) (resp *TaskInfo, err error) {
+func (i Index) AddDocumentsCsvFromReader(documents io.Reader, options *CsvDocumentsQuery) (resp *TaskInfo, err error) {
// Using io.Reader would avoid JSON conversion in Client.sendRequest(), but
// read content to memory anyway because of problems with streamed bodies
data, err := io.ReadAll(documents)
if err != nil {
return nil, fmt.Errorf("could not read documents: %w", err)
}
-	return i.addDocuments(data, contentTypeCSV, primaryKey...)
+	return i.addDocuments(data, contentTypeCSV, transformCsvDocumentsQueryToMap(options))
}

-func (i Index) AddDocumentsCsvInBatches(documents []byte, batchSize int, primaryKey ...string) (resp []TaskInfo, err error) {
+func (i Index) AddDocumentsCsvInBatches(documents []byte, batchSize int, options *CsvDocumentsQuery) (resp []TaskInfo, err error) {
// Reuse io.Reader implementation
-	return i.AddDocumentsCsvFromReaderInBatches(bytes.NewReader(documents), batchSize, primaryKey...)
+	return i.AddDocumentsCsvFromReaderInBatches(bytes.NewReader(documents), batchSize, options)
}

-func (i Index) AddDocumentsCsvFromReaderInBatches(documents io.Reader, batchSize int, primaryKey ...string) (resp []TaskInfo, err error) {
-	return i.saveDocumentsFromReaderInBatches(documents, batchSize, i.AddDocumentsCsv, primaryKey...)
+func (i Index) AddDocumentsCsvFromReaderInBatches(documents io.Reader, batchSize int, options *CsvDocumentsQuery) (resp []TaskInfo, err error) {
+	return i.saveDocumentsFromReaderInBatches(documents, batchSize, i.AddDocumentsCsv, options)
}
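A hedged usage sketch of the reader-based batching entry point. The file name and batch size are arbitrary; saveDocumentsFromReaderInBatches re-attaches the header row to every batch, so each request is a complete CSV document:

package main

import (
	"log"
	"os"

	"github.com/meilisearch/meilisearch-go"
)

func main() {
	client := meilisearch.NewClient(meilisearch.ClientConfig{Host: "http://localhost:7700"})
	index := client.Index("movies") // hypothetical index UID

	f, err := os.Open("movies.csv") // hypothetical input file
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// One task per batch of 1000 records, the header row not counted.
	tasks, err := index.AddDocumentsCsvFromReaderInBatches(f, 1000, &meilisearch.CsvDocumentsQuery{
		CsvDelimiter: ",", // assumed field name
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("queued %d indexing tasks", len(tasks))
}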

func (i Index) AddDocumentsNdjson(documents []byte, primaryKey ...string) (resp *TaskInfo, err error) {
// []byte avoids JSON conversion in Client.sendRequest()
-	return i.addDocuments([]byte(documents), contentTypeNDJSON, primaryKey...)
+	return i.addDocuments([]byte(documents), contentTypeNDJSON, transformStringVariadicToMap(primaryKey...))
}

func (i Index) AddDocumentsNdjsonFromReader(documents io.Reader, primaryKey ...string) (resp *TaskInfo, err error) {
@@ -242,7 +275,7 @@ func (i Index) AddDocumentsNdjsonFromReader(documents io.Reader, primaryKey ...s
if err != nil {
return nil, fmt.Errorf("could not read documents: %w", err)
}
-	return i.addDocuments(data, contentTypeNDJSON, primaryKey...)
+	return i.addDocuments(data, contentTypeNDJSON, transformStringVariadicToMap(primaryKey...))
}

func (i Index) AddDocumentsNdjsonInBatches(documents []byte, batchSize int, primaryKey ...string) (resp []TaskInfo, err error) {
@@ -318,14 +351,18 @@ func (i Index) AddDocumentsNdjsonFromReaderInBatches(documents io.Reader, batchS
return responses, nil
}

-func (i Index) updateDocuments(documentsPtr interface{}, contentType string, primaryKey ...string) (resp *TaskInfo, err error) {
+func (i *Index) updateDocuments(documentsPtr interface{}, contentType string, options map[string]string) (resp *TaskInfo, err error) {
resp = &TaskInfo{}
endpoint := ""
-	if primaryKey == nil {
+	if options == nil {
endpoint = "/indexes/" + i.UID + "/documents"
} else {
-		i.PrimaryKey = primaryKey[0] //nolint:golint,staticcheck
-		endpoint = "/indexes/" + i.UID + "/documents?primaryKey=" + primaryKey[0]
+		for key, val := range options {
+			if key == "primaryKey" {
+				i.PrimaryKey = val
+			}
+		}
+		endpoint = "/indexes/" + i.UID + "/documents?" + generateQueryForOptions(options)
}
req := internalRequest{
endpoint: endpoint,
@@ -343,38 +380,38 @@ func (i Index) updateDocuments(documentsPtr interface{}, contentType string, pri
}

func (i Index) UpdateDocuments(documentsPtr interface{}, primaryKey ...string) (resp *TaskInfo, err error) {
-	return i.updateDocuments(documentsPtr, contentTypeJSON, primaryKey...)
+	return i.updateDocuments(documentsPtr, contentTypeJSON, transformStringVariadicToMap(primaryKey...))
}

func (i Index) UpdateDocumentsInBatches(documentsPtr interface{}, batchSize int, primaryKey ...string) (resp []TaskInfo, err error) {
return i.saveDocumentsInBatches(documentsPtr, batchSize, i.UpdateDocuments, primaryKey...)
}

-func (i Index) UpdateDocumentsCsv(documents []byte, primaryKey ...string) (resp *TaskInfo, err error) {
-	return i.updateDocuments(documents, contentTypeCSV, primaryKey...)
+func (i Index) UpdateDocumentsCsv(documents []byte, options *CsvDocumentsQuery) (resp *TaskInfo, err error) {
+	return i.updateDocuments(documents, contentTypeCSV, transformCsvDocumentsQueryToMap(options))
}

-func (i Index) UpdateDocumentsCsvFromReader(documents io.Reader, primaryKey ...string) (resp *TaskInfo, err error) {
+func (i Index) UpdateDocumentsCsvFromReader(documents io.Reader, options *CsvDocumentsQuery) (resp *TaskInfo, err error) {
// Using io.Reader would avoid JSON conversion in Client.sendRequest(), but
// read content to memory anyway because of problems with streamed bodies
data, err := io.ReadAll(documents)
if err != nil {
return nil, fmt.Errorf("could not read documents: %w", err)
}
-	return i.updateDocuments(data, contentTypeCSV, primaryKey...)
+	return i.updateDocuments(data, contentTypeCSV, transformCsvDocumentsQueryToMap(options))
}

-func (i Index) UpdateDocumentsCsvInBatches(documents []byte, batchSize int, primaryKey ...string) (resp []TaskInfo, err error) {
+func (i Index) UpdateDocumentsCsvInBatches(documents []byte, batchSize int, options *CsvDocumentsQuery) (resp []TaskInfo, err error) {
// Reuse io.Reader implementation
-	return i.UpdateDocumentsCsvFromReaderInBatches(bytes.NewReader(documents), batchSize, primaryKey...)
+	return i.UpdateDocumentsCsvFromReaderInBatches(bytes.NewReader(documents), batchSize, options)
}

-func (i Index) UpdateDocumentsCsvFromReaderInBatches(documents io.Reader, batchSize int, primaryKey ...string) (resp []TaskInfo, err error) {
-	return i.saveDocumentsFromReaderInBatches(documents, batchSize, i.UpdateDocumentsCsv, primaryKey...)
+func (i Index) UpdateDocumentsCsvFromReaderInBatches(documents io.Reader, batchSize int, options *CsvDocumentsQuery) (resp []TaskInfo, err error) {
+	return i.saveDocumentsFromReaderInBatches(documents, batchSize, i.UpdateDocumentsCsv, options)
}

func (i Index) UpdateDocumentsNdjson(documents []byte, primaryKey ...string) (resp *TaskInfo, err error) {
-	return i.updateDocuments(documents, contentTypeNDJSON, primaryKey...)
+	return i.updateDocuments(documents, contentTypeNDJSON, transformStringVariadicToMap(primaryKey...))
}

func (i Index) UpdateDocumentsNdjsonFromReader(documents io.Reader, primaryKey ...string) (resp *TaskInfo, err error) {
@@ -384,7 +421,7 @@ func (i Index) UpdateDocumentsNdjsonFromReader(documents io.Reader, primaryKey .
if err != nil {
return nil, fmt.Errorf("could not read documents: %w", err)
}
-	return i.updateDocuments(data, contentTypeNDJSON, primaryKey...)
+	return i.updateDocuments(data, contentTypeNDJSON, transformStringVariadicToMap(primaryKey...))
}

func (i Index) UpdateDocumentsNdjsonInBatches(documents []byte, batchsize int, primaryKey ...string) (resp []TaskInfo, err error) {