// Go sample sources for bulk-importing data into VikingDB, exported as plain
// strings:
//   WRITER_GO - package "writer": buffers rows per collection field and persists
//               them in chunks as Parquet or JSON files, then uploads them to TOS.
//   GO_MOD    - the go.mod pinning the sample's dependencies.
//   MAIN_GO   - end-to-end demo: write rows -> Flush -> UploadTos -> create a
//               Data_Import task.
// NOTE: the Go code lives inside JS template literals, so it must not contain
// backticks, "${" sequences, or backslash escapes.
export const WRITER_GO = `package writer

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"os"
	"path"
	"path/filepath"
	"runtime"
	"sync"
	"time"

	"github.com/google/uuid"
	"github.com/volcengine/ve-tos-golang-sdk/v2/tos"
	"github.com/volcengine/volc-sdk-golang/service/vikingdb"

	// parquet
	"github.com/apache/arrow/go/v10/arrow"
	"github.com/apache/arrow/go/v10/arrow/array"
	"github.com/apache/arrow/go/v10/arrow/memory"
	"github.com/apache/arrow/go/v10/parquet"
	"github.com/apache/arrow/go/v10/parquet/compress"
	"github.com/apache/arrow/go/v10/parquet/pqarrow"
	// npy
	// "github.com/kshedden/gonpy"
)

// Output formats accepted by InitWriter, and byte-size helpers.
const (
	NUMPY   = "npy"
	PARQUET = "parquet"
	JSON    = "json"

	MB          = 1048576
	GB          = 1073741824
	MAXFileSize = 5 * GB
)

// Buffer accumulates rows column by column until they are persisted as one file.
type Buffer struct {
	Datas          map[string][]interface{} // field name -> one value per buffered row
	DataCount      int64                    // number of buffered rows
	DataStaticSize int64                    // estimated bytes per row, computed by Init
	FileID         int32                    // sequence number used as the output file name
}

// Init prepares an empty column for every field and estimates the size of one
// row: 4 bytes per vector dimension and a flat 16 bytes for any other field.
func (b *Buffer) Init(fields map[string]vikingdb.Field) {
	// 默认除去vector的每个字段占16字节,
	b.DataStaticSize = 0
	b.Datas = make(map[string][]interface{}, len(fields))
	for name, field := range fields {
		b.Datas[name] = make([]interface{}, 0)
		if field.FieldType == vikingdb.Vector {
			b.DataStaticSize += field.Dim * 4
		} else {
			b.DataStaticSize += 16
		}
	}
}

// write appends one row. FormatWriter.Write has already checked that data has
// exactly the buffer's fields.
func (b *Buffer) write(data map[string]interface{}) {
	for field, value := range data {
		b.Datas[field] = append(b.Datas[field], value)
	}
	b.DataCount++
}

// bufferSize returns the estimated in-memory size of the buffered rows in bytes.
func (b *Buffer) bufferSize() int64 {
	return b.DataStaticSize * b.DataCount
}

// FormatWriter buffers rows for one collection and persists them in chunks,
// one file per chunk, under LocalPath.
type FormatWriter struct {
	Fields         map[string]vikingdb.Field // collection fields keyed by name
	Buffer         *Buffer                   // chunk currently being filled
	TotalDataCount int64                     // rows accepted by Write so far
	FileType       string                    // JSON or PARQUET
	LocalPath      string                    // BasePath/<uuid>; chunk files are written here
	BasePath       string                    // root directory given to InitWriter
	ChunkSize      int64                     // estimated bytes per chunk before it is dumped
	FileCount      int32                     // chunks handed off so far; also the last FileID
	wg             sync.WaitGroup            // counts in-flight persist goroutines
	semaphore      chan struct{}             // one slot per concurrently running persist
}

// InitWriter creates a FormatWriter for collection that writes fileType files
// into a fresh uuid-named sub-directory of localPath. A chunk is handed off to
// a background persist once its estimated size exceeds chunkSize bytes, and at
// most taskNum persists run at the same time. It panics on an unsupported
// fileType or when the output directory cannot be created.
func InitWriter(collection vikingdb.Collection, fileType string, chunkSize int64, localPath string, taskNum int) *FormatWriter {
	// Reject bad formats up front instead of panicking later inside a
	// background persist goroutine, after rows have already been buffered.
	switch fileType {
	case JSON, PARQUET:
	case NUMPY:
		panic(fmt.Errorf("not supported npy"))
	default:
		panic(fmt.Errorf("invalid file type %s", fileType))
	}

	// NOTE: this changes GOMAXPROCS for the whole process, not just this writer.
	runtime.GOMAXPROCS(taskNum)
	if taskNum < 1 {
		taskNum = 1
	}

	fw := &FormatWriter{
		Fields:    make(map[string]vikingdb.Field, len(collection.Fields)),
		FileType:  fileType,
		BasePath:  localPath,
		ChunkSize: chunkSize,
		semaphore: make(chan struct{}, taskNum),
	}
	for _, field := range collection.Fields {
		fw.Fields[field.FieldName] = field
	}
	fw.Buffer = new(Buffer)
	fw.Buffer.Init(fw.Fields)

	// Every writer gets its own uuid sub-directory; MkdirAll also creates localPath.
	fw.LocalPath = filepath.Join(localPath, uuid.New().String())
	if err := os.MkdirAll(fw.LocalPath, 0755); err != nil {
		panic(err)
	}
	return fw
}

// persist writes buffer to disk in the configured format. It runs in the
// goroutine started by dump and, when it returns, gives back the semaphore
// slot dump acquired and marks itself done on the wait group.
func (fw *FormatWriter) persist(buffer *Buffer) {
	defer fw.wg.Done() // 确保在函数结束时调用 Done
	// 完成后释放位置: deferred last so it runs before Done, meaning every slot
	// is free again once Flush's Wait returns.
	defer func() { <-fw.semaphore }()

	start := time.Now()
	switch fw.FileType {
	case JSON:
		fw.persistJson(buffer)
	case NUMPY:
		fw.persistNpy(buffer)
	case PARQUET:
		fw.persistParquet(buffer)
	default:
		panic(fmt.Errorf("invalid file type %s", fw.FileType))
	}
	fmt.Println("persist cost", time.Since(start).Milliseconds())
	runtime.GC()
}

// persistJson writes buffer as a single JSON array of row objects to
// LocalPath/<FileID>.json. It panics on any create, encode or write error.
func (fw *FormatWriter) persistJson(buffer *Buffer) {
	filePath := filepath.Join(fw.LocalPath, fmt.Sprintf("%d.json", buffer.FileID))
	file, err := os.Create(filePath)
	if err != nil {
		panic(fmt.Errorf("failed to create file: %v", err))
	}
	defer file.Close()

	// Buffer the many small writes; bufio errors are sticky and surface at Flush.
	w := bufio.NewWriter(file)
	encoder := json.NewEncoder(w)
	w.WriteString("[")
	for i := 0; i < int(buffer.DataCount); i++ {
		row := make(map[string]interface{}, len(fw.Fields))
		for field := range fw.Fields {
			row[field] = buffer.Datas[field][i]
		}
		if i > 0 {
			w.WriteString(",")
		}
		if err := encoder.Encode(row); err != nil {
			panic(fmt.Errorf("failed to encode data: %v", err))
		}
	}
	// 结束 JSON 数组
	w.WriteString("]")
	if err := w.Flush(); err != nil {
		panic(fmt.Errorf("failed to write file: %v", err))
	}
	if err := file.Close(); err != nil {
		panic(fmt.Errorf("failed to close file: %v", err))
	}
	fmt.Println("successfully save file", filePath, "data count", buffer.DataCount)
}

// buildColumn converts one buffered column into an Arrow array and returns it
// together with its Arrow type. The caller owns the returned array and must
// Release it. Each value must have the Go type matching the field type, for
// example []float32 for a vector, or the type assertion panics. Sparse vectors
// are stored as their JSON encoding in a string column.
func buildColumn(pool memory.Allocator, field vikingdb.Field, values []interface{}) (arrow.DataType, arrow.Array) {
	switch field.FieldType {
	case vikingdb.Vector:
		b := array.NewListBuilder(pool, arrow.PrimitiveTypes.Float32)
		defer b.Release()
		vb := b.ValueBuilder().(*array.Float32Builder)
		for _, v := range values {
			b.Append(true)
			vb.AppendValues(v.([]float32), nil)
		}
		return arrow.ListOf(arrow.PrimitiveTypes.Float32), b.NewArray()
	case vikingdb.ListInt64:
		b := array.NewListBuilder(pool, arrow.PrimitiveTypes.Int64)
		defer b.Release()
		vb := b.ValueBuilder().(*array.Int64Builder)
		for _, v := range values {
			b.Append(true)
			vb.AppendValues(v.([]int64), nil)
		}
		return arrow.ListOf(arrow.PrimitiveTypes.Int64), b.NewArray()
	case vikingdb.ListString:
		b := array.NewListBuilder(pool, arrow.BinaryTypes.String)
		defer b.Release()
		vb := b.ValueBuilder().(*array.StringBuilder)
		for _, v := range values {
			b.Append(true)
			vb.AppendValues(v.([]string), nil)
		}
		return arrow.ListOf(arrow.BinaryTypes.String), b.NewArray()
	case vikingdb.Text, vikingdb.String:
		b := array.NewStringBuilder(pool)
		defer b.Release()
		for _, v := range values {
			b.Append(v.(string))
		}
		return arrow.BinaryTypes.String, b.NewArray()
	case vikingdb.Int64:
		b := array.NewInt64Builder(pool)
		defer b.Release()
		for _, v := range values {
			b.Append(v.(int64))
		}
		return arrow.PrimitiveTypes.Int64, b.NewArray()
	case vikingdb.Bool:
		b := array.NewBooleanBuilder(pool)
		defer b.Release()
		for _, v := range values {
			b.Append(v.(bool))
		}
		return arrow.FixedWidthTypes.Boolean, b.NewArray()
	case vikingdb.Float32:
		b := array.NewFloat32Builder(pool)
		defer b.Release()
		for _, v := range values {
			b.Append(v.(float32))
		}
		return arrow.PrimitiveTypes.Float32, b.NewArray()
	case vikingdb.Sparse_Vector:
		b := array.NewStringBuilder(pool)
		defer b.Release()
		for _, v := range values {
			s, err := json.Marshal(v)
			if err != nil {
				panic(fmt.Errorf("failed to encode sparse vector: %v", err))
			}
			b.Append(string(s))
		}
		return arrow.BinaryTypes.String, b.NewArray()
	default:
		// Previously such a field got a schema entry but no column, which made
		// NewRecord fail with an unrelated-looking error.
		panic(fmt.Errorf("unsupported field type %v", field.FieldType))
	}
}

// persistParquet writes buffer as one Snappy-compressed Parquet file at
// LocalPath/<FileID>.parquet. It panics on any build, create or write error.
func (fw *FormatWriter) persistParquet(buffer *Buffer) {
	filePath := filepath.Join(fw.LocalPath, fmt.Sprintf("%d.parquet", buffer.FileID))
	pool := memory.NewGoAllocator()

	fields := make([]arrow.Field, 0, len(fw.Fields))
	columns := make([]arrow.Array, 0, len(fw.Fields))
	for name, field := range fw.Fields {
		dataType, column := buildColumn(pool, field, buffer.Datas[name])
		fields = append(fields, arrow.Field{Name: name, Type: dataType, Nullable: true})
		columns = append(columns, column)
	}

	start := time.Now()
	schema := arrow.NewSchema(fields, nil)
	record := array.NewRecord(schema, columns, buffer.DataCount)
	defer record.Release()
	// NewRecord retains every column, so drop the references held here.
	for _, column := range columns {
		column.Release()
	}

	f, err := os.Create(filePath)
	if err != nil {
		panic(fmt.Errorf("failed to create file: %v", err))
	}
	defer f.Close()
	pw, err := pqarrow.NewFileWriter(schema, f, parquet.NewWriterProperties(parquet.WithCompression(compress.Codecs.Snappy)), pqarrow.DefaultWriterProps())
	if err != nil {
		panic(fmt.Errorf("failed to create Parquet writer: %v", err))
	}
	if err := pw.Write(record); err != nil {
		panic(fmt.Errorf("failed to write record: %v", err))
	}
	// Close writes the Parquet footer; if it fails the file is unreadable.
	if err := pw.Close(); err != nil {
		panic(fmt.Errorf("failed to close Parquet writer: %v", err))
	}
	fmt.Println("successfully save file", filePath, "data count", buffer.DataCount, "cost ", time.Since(start).Milliseconds())
}

// persistNpy would save every field to its own .npy file such as ID.npy.
// It is not implemented and always panics.
func (fw *FormatWriter) persistNpy(buffer *Buffer) {
	// 保存格式,每个field单独保存到一个npy文件,例如 ID.npy
	// go 中npy支持不好,暂不支持
	panic(fmt.Errorf("not supported npy"))
}

// Write appends one row. data must have exactly one entry per collection
// field, otherwise Write panics. Once the buffered chunk's estimated size
// exceeds ChunkSize it is handed to a background persist via dump. Write
// mutates the buffer without locking, so call it from one goroutine only.
func (fw *FormatWriter) Write(data map[string]interface{}) {
	if len(data) != len(fw.Fields) {
		panic(fmt.Errorf("data %v fields num != collection fields num", data))
	}
	for field := range data {
		if _, ok := fw.Fields[field]; !ok {
			panic(fmt.Errorf("data %v field %s not in collection", data, field))
		}
	}
	fw.Buffer.write(data)
	fw.TotalDataCount++
	if fw.Buffer.bufferSize() > fw.ChunkSize {
		fw.dump()
	}
}

// dump hands the current buffer to a new persist goroutine and starts an
// empty buffer. It blocks while taskNum persists are still running, which
// bounds how many full chunks are held in memory at once.
func (fw *FormatWriter) dump() {
	start := time.Now()
	// 等待上一个任务完成: take a slot; persist gives it back when its file is written.
	fw.semaphore <- struct{}{}

	old := fw.Buffer
	fw.Buffer = new(Buffer)
	fw.Buffer.Init(fw.Fields)
	fw.FileCount++
	old.FileID = fw.FileCount

	fw.wg.Add(1)
	go fw.persist(old)
	fmt.Println("dump cost ", time.Since(start).Milliseconds())
}

// Flush persists any rows still buffered and waits until every chunk has been
// written. An empty buffer is skipped so no empty trailing file is produced.
func (fw *FormatWriter) Flush() {
	if fw.Buffer.DataCount > 0 {
		fw.dump()
	}
	fw.wg.Wait()
}

// UploadTos uploads every file under LocalPath to bucketName using
// checkpointed multipart uploads. Each object key is prefix joined with the
// file's path relative to BasePath, so it includes the writer's uuid
// directory, and it always uses forward slashes. Call it after Flush. It
// panics on the first error.
func (fw *FormatWriter) UploadTos(client *tos.ClientV2, bucketName string, prefix string) {
	var files []string
	err := filepath.WalkDir(fw.LocalPath, func(p string, d os.DirEntry, err error) error {
		if err != nil {
			return err // 如果有错误,返回错误,终止遍历
		}
		if !d.IsDir() {
			files = append(files, p)
		}
		return nil
	})
	if err != nil {
		panic(err)
	}

	ctx := context.Background()
	for _, file := range files {
		relativePath, err := filepath.Rel(fw.BasePath, file)
		if err != nil {
			panic(err)
		}
		// Object keys always use "/", whatever the local OS separator is.
		objectKey := path.Join(prefix, filepath.ToSlash(relativePath))
		_, err = client.UploadFile(ctx, &tos.UploadFileInput{
			CreateMultipartUploadV2Input: tos.CreateMultipartUploadV2Input{
				Bucket: bucketName,
				Key:    objectKey,
			},
			FilePath:         file,
			PartSize:         tos.DefaultPartSize,
			TaskNum:          5,
			EnableCheckpoint: true,
		})
		if err != nil {
			panic(err)
		}
	}
}`
export const GO_MOD = `module Test

go 1.18

require github.com/volcengine/volc-sdk-golang v1.0.178

require (
	github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
	github.com/andybalholm/brotli v1.0.4 // indirect
	github.com/apache/thrift v0.16.0 // indirect
	github.com/goccy/go-json v0.9.11 // indirect
	github.com/golang/protobuf v1.5.2 // indirect
	github.com/golang/snappy v0.0.4 // indirect
	github.com/google/flatbuffers v2.0.8+incompatible // indirect
	github.com/klauspost/asmfmt v1.3.2 // indirect
	github.com/klauspost/compress v1.15.9 // indirect
	github.com/klauspost/cpuid/v2 v2.0.9 // indirect
	github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
	github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
	github.com/pierrec/lz4/v4 v4.1.15 // indirect
	github.com/zeebo/xxh3 v1.0.2 // indirect
	golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 // indirect
	golang.org/x/mod v0.8.0 // indirect
	golang.org/x/sync v0.1.0 // indirect
	golang.org/x/sys v0.10.0 // indirect
	golang.org/x/tools v0.6.0 // indirect
	golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
	google.golang.org/genproto v0.0.0-20210917145530-b395a37504d4 // indirect
	google.golang.org/grpc v1.49.0 // indirect
	google.golang.org/protobuf v1.28.1 // indirect
)

require (
	github.com/apache/arrow/go/v10 v10.0.1
	github.com/cenkalti/backoff/v4 v4.1.2 // indirect
	github.com/google/uuid v1.3.0
	github.com/volcengine/ve-tos-golang-sdk/v2 v2.7.7
	golang.org/x/net v0.12.0 // indirect
	golang.org/x/text v0.11.0 // indirect
)
`
export const MAIN_GO = `package main

import (
	"Test/writer"
	"fmt"
	"math/rand"
	"time"

	"github.com/volcengine/ve-tos-golang-sdk/v2/tos"
	"github.com/volcengine/volc-sdk-golang/service/vikingdb"
)

// rng is seeded once; reseeding on every call can repeat the same sequence
// when calls land within the clock's resolution.
var rng = rand.New(rand.NewSource(time.Now().UnixNano()))

// genRandomVector returns dim random components between -0.5 and 0.5.
func genRandomVector(dim int) []float32 {
	res := make([]float32, dim)
	for i := range res {
		res[i] = rng.Float32() - 0.5
	}
	return res
}

func main() {
	AK := ""
	SK := ""
	VIKINGDB_HOST := "api-vikingdb.volces.com"
	TOS_HOST := "tos-cn-beijing.volces.com"
	region := "cn-beijing"
	collectionName := "benchmark"

	service := vikingdb.NewVikingDBService(VIKINGDB_HOST, region, AK, SK, "http")
	collection, err := service.GetCollection(collectionName)
	if err != nil {
		panic(err)
	}

	// ./test 是本地文件保存目录
	formatWriter := writer.InitWriter(*collection, writer.PARQUET, 5*writer.GB, "./test", 5)
	for i := 0; i < 5000; i++ {
		data := map[string]interface{}{
			"ID":           int64(i),
			"Int64":        int64(i),
			"Int64_test":   int64(i),
			"bool":         false,
			"string":       "inoianfg",
			"text":         "测试",
			"int64_list":   []int64{1, 2, 3},
			"vector_field": genRandomVector(1024),
			"string_list":  []string{"测试", "成功"},
		}
		formatWriter.Write(data)
	}
	// 强制将数据保存到文件
	formatWriter.Flush()

	client, err := tos.NewClientV2(TOS_HOST, tos.WithRegion(region), tos.WithCredentials(tos.NewStaticCredentials(AK, SK)))
	if err != nil {
		panic(err)
	}
	// bucketName 是tos桶名,dirName 是tos文件夹名
	bucketName := "data-import"
	dirName := "test"
	formatWriter.UploadTos(client, bucketName, dirName)

	taskParam := map[string]interface{}{
		"collection_name": collectionName,
		"file_type":       writer.PARQUET,
		"tos_path":        bucketName + "/" + dirName,
	}
	taskID, err := service.CreateTask(vikingdb.Data_Import, taskParam)
	if err != nil {
		panic(err)
	}
	fmt.Println(taskID)

	task, err := service.GetTask(taskID)
	if err != nil {
		panic(err)
	}
	fmt.Println(task.TaskType, task.TaskStatus)
}`