/**
 * Sample Java / Maven sources exported as template-literal strings.
 *
 * Everything between the backticks is runtime text, not code that this module
 * executes. Editing a single character inside a literal changes what the
 * consumers of these constants receive.
 *
 * FORMAT_WRITER_DEMO
 *   Java source text for package com.example.writer. It declares:
 *   - Buffer: keeps one value list per field name, a row counter (dataCount)
 *     and a per-row size estimate (4 bytes per dimension for Vector fields,
 *     16 bytes for every other field).
 *   - FormatWriter: indexes the collection's fields by name, creates a
 *     UUID-named directory under the given local path, buffers rows passed to
 *     Write(), hands the buffer to a new thread that writes it to a
 *     ".parquet" file named after the buffer's fileID once bufferSize()
 *     exceeds the chunk size, and uploads every regular file under that
 *     directory through UploadTos().
 *
 * APP_DEMO
 *   Java source text for package com.example. It declares a FileType constants
 *   class and an App class that fills a field map per row, calls
 *   writer.Write(), writer.flush() and writer.UploadTos(), then creates a
 *   DataImport task and prints the task id and the task.
 *
 * POM_DEMO
 *   Text of a Maven project descriptor for com.example / myproject
 *   (1.0-SNAPSHOT) naming junit 3.8.1, fastjson 1.2.83, volc-sdk-java 1.0.191
 *   and ve-tos-java-sdk 2.6.6.
 *
 * NOTE(review): the text as seen here looks like it lost every "<...>" run:
 *   generic type arguments are missing ("Map> datas", "HashMap fields"), the
 *   row loop in APP_DEMO is cut short ("for(int i=0;i fields = ..."), and
 *   POM_DEMO carries element values without any XML tags. Confirm against the
 *   intended snippets before relying on these strings.
 * NOTE(review): inside FORMAT_WRITER_DEMO, persistParquet compares the String
 *   returned by getFieldType() with "==", while parseCollectionSchema uses
 *   equals(); "==" compares references in Java, so the branches may not match.
 * NOTE(review): createDirIfNotExist passes "{}" to System.out.printf; that is
 *   not a printf conversion, so the path argument is never printed.
 * NOTE(review): persistParquet divides bufferSize() by dataCount, and flush()
 *   always calls dump(); flushing an empty buffer looks like it would raise
 *   ArithmeticException. TODO confirm.
 * NOTE(review): persistParquet catches IOException, prints the stack trace and
 *   still prints the "Successfully persist file" message afterwards.
 * NOTE(review): UploadTos never closes the stream returned by Files.walk().
 */
export const FORMAT_WRITER_DEMO = `package com.example.writer; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.util.*; import com.google.common.collect.Lists; import com.google.gson.Gson; import com.volcengine.service.vikingDB.Collection; import com.volcengine.service.vikingDB.common.*; import org.apache.parquet.example.data.Group; import org.apache.parquet.example.data.simple.SimpleGroupFactory; import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.example.GroupWriteSupport; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Types; import org.apache.parquet.schema.PrimitiveType; import org.apache.hadoop.conf.Configuration; import com.volcengine.tos.TOSV2; import com.volcengine.tos.TOSV2ClientBuilder; import com.volcengine.tos.TosClientException; import com.volcengine.tos.TosServerException; import com.volcengine.tos.model.object.*; class Buffer { public Map> datas; public long dataCount = 0; private int dataStaticSize = 0; public int fileID = 0; public Buffer(HashMap fields, int id) { datas = new HashMap<>(); fileID = id; for (String field_name : fields.keySet()) { datas.put(field_name, Lists.newArrayList()); if (fields.get(field_name).getFieldType().equals(FieldType.Vector)) { dataStaticSize += fields.get(field_name).getDim() * 4; } else { dataStaticSize += 16; } } } public void write(Map data) { for (String key : data.keySet()) { datas.get(key).add(data.get(key)); } dataCount += 1; } public long 
bufferSize() { return dataStaticSize * dataCount; } } public class FormatWriter { private HashMap fields=null; private long total_data_count = 0; private String file_type = ""; private String local_path = ""; private String base_path = ""; private long chunk_size = 0; private int file_count = 0; private Buffer buffer; private Map workingThread; public FormatWriter(Collection collection, String fileType, long chunkSize, String localPath) throws IOException { List collection_fields = collection.getFields(); fields = new HashMap<>(); for (int i = 0; i < collection_fields.size(); i++) { fields.put(collection_fields.get(i).getFieldName(), collection_fields.get(i)); } newBuffer(); file_type = fileType; chunk_size = chunkSize; base_path = localPath; String uuid = UUID.randomUUID().toString(); this.workingThread = new HashMap<>(); makeDir(uuid); } private Buffer newBuffer() { file_count += 1; Buffer oldBuffer = buffer; buffer = new Buffer(fields, file_count); return oldBuffer; } private void createDirIfNotExist(java.nio.file.Path path) throws IOException { try { Files.createDirectories(path); System.out.printf("Data path created: {}", path); } catch (IOException e) { System.out.printf("Data Path create failed: {}", path); throw e; } } private void makeDir(String uuid) throws IOException { java.nio.file.Path path = Paths.get(base_path); createDirIfNotExist(path); java.nio.file.Path fullPath = path.resolve(uuid); createDirIfNotExist(fullPath); this.local_path = fullPath.toString(); } private MessageType parseCollectionSchema() { Types.MessageTypeBuilder messageTypeBuilder = Types.buildMessage(); for (String fieldName : fields.keySet()) { String fieldType = fields.get(fieldName).getFieldType(); if (fieldType.equals(FieldType.Vector)) { messageTypeBuilder.requiredList() .requiredElement(PrimitiveType.PrimitiveTypeName.FLOAT) .named(fieldName); } else if (fieldType.equals(FieldType.List_Int64)) { messageTypeBuilder.requiredList() 
.requiredElement(PrimitiveType.PrimitiveTypeName.INT64) .named(fieldName); } else if (fieldType.equals(FieldType.List_String)) { messageTypeBuilder.requiredList() .requiredElement(PrimitiveType.PrimitiveTypeName.BINARY) .as(LogicalTypeAnnotation.stringType()) .named(fieldName); } else if (fieldType.equals(FieldType.Text) || fieldType.equals(FieldType.String)) { messageTypeBuilder.required(PrimitiveType.PrimitiveTypeName.BINARY) .as(LogicalTypeAnnotation.stringType()) .named(fieldName); } else if (fieldType.equals(FieldType.Sparse_Vector)) { messageTypeBuilder.required(PrimitiveType.PrimitiveTypeName.BINARY) .as(LogicalTypeAnnotation.stringType()) .named(fieldName); } else if (fieldType.equals(FieldType.Int64)) { messageTypeBuilder.required(PrimitiveType.PrimitiveTypeName.INT64) .named(fieldName); } else if (fieldType.equals(FieldType.Bool)) { messageTypeBuilder.required(PrimitiveType.PrimitiveTypeName.BOOLEAN) .named(fieldName); } else if (fieldType.equals(FieldType.Float32)) { messageTypeBuilder.required(PrimitiveType.PrimitiveTypeName.FLOAT) .named(fieldName); } else { System.out.printf("not found type %s \\n", fieldType); } } return messageTypeBuilder.named("schema"); } private static void addFloatArray(Group group, String fieldName, List values) { Group arrayGroup = group.addGroup(fieldName); for (float value : values) { Group addGroup = arrayGroup.addGroup(0); addGroup.add(0, value); } } private static void addStringArray(Group group, String fieldName, List values) { Group arrayGroup = group.addGroup(fieldName); for (String value : values) { Group addGroup = arrayGroup.addGroup(0); addGroup.add(0, value); } } private static void addLongArray(Group group, String fieldName, List values) { Group arrayGroup = group.addGroup(fieldName); for (long value : values) { Group addGroup = arrayGroup.addGroup(0); addGroup.add(0, value); } } private void persist(Buffer buffer) { persistParquet(buffer); workingThread.remove(Thread.currentThread().getName()); } 
@SuppressWarnings("unchecked") private void persistParquet(Buffer buffer) { Path path = Paths.get(local_path).resolve(String.valueOf(buffer.fileID)); Path filePath = Paths.get(path.toString() + ".parquet"); MessageType messageType = parseCollectionSchema(); int rowGroupSizeMin = 1000; int rowGroupSizeMax = 1000000; int rowGroupSize = 10000; // 32MB is an experience value that avoid high memory usage of parquet reader on server-side int rowGroupBytes = 32 * 1024 * 1024; int sizePerRow = (int) (((buffer.bufferSize() / buffer.dataCount)) + 1); rowGroupSize = rowGroupBytes / sizePerRow; rowGroupSize = Math.max(rowGroupSizeMin, Math.min(rowGroupSizeMax, rowGroupSize)); Configuration configuration = new Configuration(); GroupWriteSupport.setSchema(messageType, configuration); GroupWriteSupport writeSupport = new GroupWriteSupport(); System.out.println(messageType); org.apache.hadoop.fs.Path hadoop_path = new org.apache.hadoop.fs.Path(filePath.toString()); System.out.println(filePath.toString()); try (@SuppressWarnings("deprecation") ParquetWriter parquetwriter = new ParquetWriter<>(hadoop_path, ParquetFileWriter.Mode.CREATE, writeSupport, CompressionCodecName.UNCOMPRESSED, rowGroupBytes, 5 * 1024 * 1024, 5 * 1024 * 1024, ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED, ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED, ParquetWriter.DEFAULT_WRITER_VERSION, configuration)) { List fieldNameList = Lists.newArrayList(buffer.datas.keySet()); for (int i = 0; i < buffer.dataCount; ++i) { Group group = new SimpleGroupFactory(messageType).newGroup(); for (String fieldName : fieldNameList) { String field_type = fields.get(fieldName).getFieldType(); Object value = buffer.datas.get(fieldName).get(i); if (field_type == FieldType.Text || field_type == FieldType.String) { group.append(fieldName, (String) value ); } else if (field_type == FieldType.Vector) { addFloatArray(group, fieldName, (List) value); } else if (field_type == FieldType.List_Int64) { addLongArray(group, fieldName, (List) 
value); } else if (field_type == FieldType.List_String) { addStringArray(group, fieldName, (List) value); } else if (field_type == FieldType.Bool) { group.append(fieldName, (Boolean) value ); } else if (field_type == FieldType.Int64) { group.append(fieldName, (Long) value ); } else if (field_type == FieldType.Float32) { group.append(fieldName, (Float) value ); } else if (field_type == FieldType.Sparse_Vector) { Gson gson = new Gson(); String j_value = gson.toJson((HashMap) value); group.append(fieldName, j_value); } } parquetwriter.write(group); } } catch (IOException e) { e.printStackTrace(); } String msg = String.format("Successfully persist file %s, total size: %s, row count: %s, row group size: %s", filePath, buffer.bufferSize(), buffer.dataCount, rowGroupSize); System.out.println(msg); } private void dump() throws InterruptedException { while (workingThread.size() > 0) { String msg = String.format("Previous flush action is not finished, %s is waiting...", Thread.currentThread().getName()); System.out.println(msg); TimeUnit.SECONDS.sleep(5); } Buffer oldBuffer = newBuffer(); Runnable runnable = () -> persist(oldBuffer); Thread thread = new Thread(runnable); workingThread.put(thread.getName(), thread); thread.start(); } public void Write(HashMap data) throws Exception { if (data.size() != fields.size()) { System.out.println(data.size()); System.out.println(fields.size()); throw new Exception("field size not match"); } for (String fieldName : data.keySet()) { if (!fields.containsKey(fieldName)) { throw new Exception("field not found "+fieldName); } } buffer.write(data); if (buffer.bufferSize() > chunk_size) { dump(); } } public void flush() throws InterruptedException { dump(); while (workingThread.size() > 0) { String msg = String.format("Previous flush action is not finished, %s is waiting...", Thread.currentThread().getName()); System.out.println(msg); TimeUnit.SECONDS.sleep(5); } } public void UploadTos(TOSV2 tos, String bucketName, String DirName) throws 
IOException { Path localPath = Paths.get(local_path); Path basePath = Paths.get(base_path); List fileList = new ArrayList<>(); // 使用 Files.walk() 遍历目录 Files.walk(localPath) .filter(Files::isRegularFile) // 确保它是一个文件 .forEach(path -> { fileList.add(path); }); for (int i = 0; i < fileList.size(); i++) { Path relativePath = basePath.relativize(fileList.get(i)); String objectKey = DirName + "/" + relativePath.toString(); PutObjectFromFileInput putObjectInput = new PutObjectFromFileInput() .setBucket(bucketName).setKey(objectKey).setFilePath(fileList.get(i).toString()); PutObjectFromFileOutput output = tos.putObjectFromFile(putObjectInput); } } } ` export const APP_DEMO = `package com.example; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Random; import java.util.Map; import com.example.writer.FormatWriter; import com.volcengine.service.vikingDB.Collection; import com.volcengine.service.vikingDB.VikingDBService; import com.volcengine.service.vikingDB.Index; import com.volcengine.service.vikingDB.common.*; import com.volcengine.tos.TOSV2; import com.volcengine.tos.TOSV2ClientBuilder; class FileType { public static final String PARQUET = "parquet"; public static final String JSON = "json"; public static final String NPY = "npy"; } public class App { public static List genRandomVector(int dim){ List res = new ArrayList<>(); for(int i=0;i fields = new HashMap(); fields.put("ID", i); fields.put("vector_field", genRandomVector(1024)); fields.put("text", "测试"); fields.put("Int64", i); fields.put("Int64_test", i); fields.put("bool", false); List int64_list = new ArrayList<>(); fields.put("int64_list", int64_list); List string_list = new ArrayList<>(); fields.put("string_list", string_list); fields.put("string", "test"); writer.Write(fields); } writer.flush(); // tos 的桶名和路径 String bucketName = "data-import"; String dirName = "dir"; writer.UploadTos(tos, bucketName, dirName); HashMap taskParams = new HashMap<>(); 
taskParams.put("tos_path", bucketName+"/"+dirName); taskParams.put("ignore_error", false); taskParams.put("collection_name", collectionName); taskParams.put("file_type", FileType.PARQUET); CreateTaskParam taskParam = new CreateTaskParam() .setTaskType(TaskType.DataImport) .setTaskParams(taskParams) .build(); String task_id = vikingDBService.createTask(taskParam); System.out.println(task_id); Task task = vikingDBService.getTask(task_id); System.out.println(task); } } ` export const POM_DEMO = ` 4.0.0 com.example myproject jar 1.0-SNAPSHOT myproject http://maven.apache.org junit junit 3.8.1 test com.alibaba fastjson 1.2.83 com.volcengine volc-sdk-java 1.0.191 com.volcengine ve-tos-java-sdk 2.6.6 `