/**
 * Python sample program for bulk-importing data into a VikingDB collection,
 * stored as a string.
 *
 * The script buffers rows client-side (FormatWriter / WriteBuffer), persists
 * them as Parquet, NumPy or JSON files under a local directory, uploads those
 * files to a TOS bucket, and then creates a Data_Import task for the
 * collection.
 *
 * NOTE: the program lives in a template literal. It must not contain a
 * backtick, a backslash escape or a `${` sequence, or the embedded Python
 * text would be altered.
 */
export const PYTHON_DEMO = `
# coding:utf-8
import json
from enum import Enum, IntEnum
import logging, threading
import random
import os
from pathlib import Path
import time
import uuid

import pandas as pd
import numpy as np
import tos
from volcengine.viking_db import VikingDBService, Collection, common
from volcengine.viking_db import *

MB = 1024 * 1024
GB = 1024 * MB


class FileType(Enum):
    NUMPY = "npy"
    PARQUET = "parquet"
    JSON = "json"


NUMPY_TYPE_CREATOR = {
    common.FieldType.Vector: np.dtype("float32"),
    common.FieldType.Int64: np.dtype("int64"),
    common.FieldType.String: None,
    common.FieldType.List_String: None,
    common.FieldType.List_Int64: None,
    common.FieldType.Float32: np.dtype("float32"),
    common.FieldType.Bool: np.dtype("bool"),
    common.FieldType.Text: None,
    common.FieldType.Sparse_Vector: None,
}


def gen_random_vector(dim):
    res = [0, ] * dim
    for i in range(dim):
        res[i] = random.random() - 0.5
    return res


logger = logging.getLogger("batch_writer")
logger.setLevel(logging.DEBUG)


class WriteBuffer(object):
    """Column-oriented in-memory buffer of rows, keyed by field name."""

    def __init__(self, fields):
        self._fields = {}
        self._buffer = {}
        for f in fields:
            self._fields[f.field_name] = f
            self._buffer[f.field_name] = []

    @staticmethod
    def _field_size(f, f_data):
        """Estimate the serialized size in bytes of one field value."""
        field_type = common.FieldType(f.field_type)
        if field_type == common.FieldType.Vector:
            return f.dim * 4
        if field_type == common.FieldType.Text or field_type == common.FieldType.String:
            return len(f_data)
        if field_type == common.FieldType.List_String:
            return sum(len(v) for v in f_data)
        if field_type == common.FieldType.List_Int64:
            return len(f_data) * 8
        if field_type == common.FieldType.Int64:
            return 8
        if field_type == common.FieldType.Float32:
            return 4
        if field_type == common.FieldType.Bool:
            return 1
        if field_type == common.FieldType.Sparse_Vector:
            return len(json.dumps(f_data))
        return 0

    def write(self, data):
        """Append one row and return its estimated size in bytes."""
        for f_name in data.fields:
            if f_name not in self._fields:
                raise Exception("unknown field {}".format(f_name))
        # Resolve every value before appending anything, so that a rejected
        # row never leaves the columns with different lengths.
        row = {}
        row_size = 0
        for n, f in self._fields.items():
            f_data = data.fields[n] if n in data.fields else f.default_val
            if f_data is None:
                raise Exception("missing field {} and default val is none".format(n))
            row_size += self._field_size(f, f_data)
            row[n] = f_data
        for n, f_data in row.items():
            self._buffer[n].append(f_data)
        return row_size

    @property
    def buffer_data_num(self):
        # All columns have the same length; an empty schema holds no rows.
        for column in self._buffer.values():
            return len(column)
        return 0

    def dump(self, path, check_num, file_type, buffer_size):
        if check_num != self.buffer_data_num:
            raise Exception("buffer num != write num")
        if file_type == FileType.JSON:
            return self._persist_json_rows(path)
        elif file_type == FileType.PARQUET:
            return self._persist_parquet(path, buffer_size, check_num)
        elif file_type == FileType.NUMPY:
            return self._persist_npy(path)
        raise Exception("unknown file_type {}".format(file_type))

    def _persist_npy(self, local_path):
        """Write one .npy file per field into the directory local_path."""
        file_list = []
        Path(local_path).mkdir(exist_ok=True)
        try:
            for k, values in self._buffer.items():
                full_file_name = Path(local_path).joinpath(k + ".npy")
                file_list.append(str(full_file_name))
                field_type = common.FieldType(self._fields[k].field_type)
                if field_type == common.FieldType.Sparse_Vector:
                    # sparse vectors are stored as an array of JSON strings
                    arr = np.array([json.dumps(val) for val in values])
                else:
                    arr = np.array(values, dtype=NUMPY_TYPE_CREATOR[field_type])
                np.save(str(full_file_name), arr)
        except Exception as e:
            logger.error("Some of fields were not persisted successfully, abort the files")
            for f in file_list:
                if Path(f).exists():
                    Path(f).unlink()
            Path(local_path).rmdir()
            raise Exception("Some of fields were not persisted successfully, abort the files") from e
        return file_list

    def _persist_json_rows(self, local_path):
        """Write all buffered rows as a JSON array of objects."""
        names = list(self._buffer)
        rows = [dict(zip(names, values)) for values in zip(*self._buffer.values())]
        file_path = Path(str(local_path) + ".json")
        try:
            with file_path.open("w", encoding="utf-8") as json_file:
                json.dump(rows, json_file, indent=2)
        except Exception as e:
            raise Exception("Failed to persist file {}, error: {}".format(file_path, e)) from e
        print("Successfully persist file {}, row count: {}".format(file_path, len(rows)))
        return [str(file_path)]

    def _persist_parquet(self, local_path, buffer_size, data_num):
        """Write all buffered rows into a single uncompressed parquet file."""
        file_path = Path(str(local_path) + ".parquet")
        ts0 = time.time() * 1000
        data = {}
        for k, values in self._buffer.items():
            field_type = common.FieldType(self._fields[k].field_type)
            dt = NUMPY_TYPE_CREATOR.get(field_type)
            if field_type == common.FieldType.Sparse_Vector:
                data[k] = pd.Series([json.dumps(val) for val in values], dtype=None)
            elif field_type in (common.FieldType.Vector,
                                common.FieldType.List_Int64,
                                common.FieldType.List_String):
                data[k] = pd.Series([np.array(val, dtype=dt) for val in values])
            else:
                # dt is None for field types without a fixed numpy dtype
                data[k] = pd.Series(values, dtype=dt)
        # aim for ~32MB row groups, clamped to [1000, 1000000] rows
        row_group_bytes = 32 * MB
        size_per_row = int(buffer_size / max(data_num, 1)) + 1
        row_group_size = max(1000, min(1000000, int(row_group_bytes / size_per_row)))
        ts1 = time.time() * 1000
        data_frame = pd.DataFrame(data=data)
        ts2 = time.time() * 1000
        data_frame.to_parquet(
            file_path,
            row_group_size=row_group_size,
            engine="pyarrow",
            compression=None,
        )
        ts3 = time.time() * 1000
        print("prepare {} dataframe cost {}, to file {}".format(ts1-ts0, ts2-ts1, ts3-ts2))
        print("Successfully persist file {}, total size: {}, row count: {}, row group size: {}".format(
            file_path, buffer_size, data_num, row_group_size))
        return [str(file_path)]


class FormatWriter(object):
    """Buffers rows and persists them as local files, flushing in chunks.

    Once the buffered size exceeds chunk_size, the buffer is handed to a
    background thread to be persisted; flush() persists the remainder and
    waits for all background flushes before returning.
    """

    def __init__(self, collection, local_path="./", chunk_size=128 * MB, file_type=FileType.PARQUET):
        self._collection = collection
        self._local_path = local_path
        self._base_path = local_path
        self._local_files = []
        self._chunk_size = chunk_size
        self._file_type = file_type
        self._uuid = str(uuid.uuid4())
        self._make_dir()
        self._working_thread = {}
        self._working_thread_lock = threading.Lock()
        self._buffer_lock = threading.Lock()
        self._buffer = WriteBuffer(collection.fields)
        self._buffer_size = 0
        self._buffer_data_num = 0
        self._total_data_num = 0
        self._flush_count = 0
        self._flush_errors = []

    @property
    def total_data_num(self):
        return self._total_data_num

    @property
    def local_files(self):
        return self._local_files

    def _make_dir(self):
        Path(self._local_path).mkdir(exist_ok=True)
        uidir = Path(self._local_path).joinpath(self._uuid)
        self._local_path = uidir
        Path(uidir).mkdir(exist_ok=True)
        logger.info("Create path for writer data {}".format(self._local_path))

    def write(self, data):
        with self._buffer_lock:
            row_size = self._buffer.write(data)
            self._buffer_size += row_size
            self._buffer_data_num += 1
            self._total_data_num += 1
            if self._buffer_size <= self._chunk_size:
                return
            print("Prepare to flush buffer, row_count: {}, size: {}".format(
                self._buffer_data_num, self._buffer_size))
            # Detach the full buffer while still holding the lock, so exactly
            # one flush is scheduled per chunk instead of one per later write.
            detached = self._detach_buffer_locked()
        with self._working_thread_lock:
            x = threading.Thread(target=self._dump_in_background, args=detached)
            print("Flush thread begin, name: {}".format(x.name))
            self._working_thread[x.name] = x
            x.start()

    def _detach_buffer_locked(self):
        """Swap in an empty buffer; return (buffer, size, row_count). Caller holds _buffer_lock."""
        detached = (self._buffer, self._buffer_size, self._buffer_data_num)
        self._buffer = WriteBuffer(self._collection.fields)
        self._buffer_size = 0
        self._buffer_data_num = 0
        return detached

    def _dump_buffer(self, buffer, buffer_size, buffer_data_num):
        """Persist a detached buffer to the next numbered path and record its files."""
        if buffer is None or buffer.buffer_data_num == 0:
            return
        with self._working_thread_lock:
            self._flush_count = self._flush_count + 1
            target_path = self._local_path.joinpath(str(self._flush_count))
        file_list = buffer.dump(target_path, buffer_data_num, self._file_type, buffer_size)
        with self._working_thread_lock:
            self._local_files.extend(file_list)

    def _dump_in_background(self, buffer, buffer_size, buffer_data_num):
        """Thread target: persist a buffer and record any failure for flush()."""
        name = threading.current_thread().name
        try:
            self._dump_buffer(buffer, buffer_size, buffer_data_num)
        except Exception as e:
            logger.error("Flush thread {} failed: {}".format(name, e))
            with self._working_thread_lock:
                self._flush_errors.append(e)
        finally:
            with self._working_thread_lock:
                self._working_thread.pop(name, None)

    def _dump(self):
        logger.info("Prepare to flush buffer")
        with self._buffer_lock:
            detached = self._detach_buffer_locked()
        self._dump_buffer(*detached)

    def flush(self):
        """Persist buffered rows and wait for all background flushes to finish."""
        self._dump()
        # local_files is only complete once every background flush has finished
        with self._working_thread_lock:
            threads = list(self._working_thread.values())
        for t in threads:
            t.join()
        with self._working_thread_lock:
            errors = self._flush_errors
            self._flush_errors = []
        if errors:
            raise Exception("{} background flush(es) failed, first error: {}".format(len(errors), errors[0]))

    def upload_tos(self, tos_client, bucket_name, path, thread_num=3):
        for file_path in self._local_files:
            # object keys always use "/" regardless of the local OS separator
            relative_path = Path(os.path.relpath(file_path, self._base_path)).as_posix()
            object_key = "{}/{}".format(path, relative_path)
            tos_client.upload_file(bucket_name, object_key, file_path, task_num=thread_num, part_size=20 * MB)


def main():
    AK = ""
    SK = ""
    region = "cn-beijing"
    VIKINGDB_HOST = "api-vikingdb.volces.com"
    TOS_HOST = "tos-cn-beijing.volces.com"
    collection_name = "benchmark"  # collection 名称
    file_type = FileType.PARQUET  # 保存文件的格式,parquet
    vikingdb_service = VikingDBService(host=VIKINGDB_HOST, region=region, ak=AK, sk=SK)
    tos_client = tos.TosClientV2(AK, SK, TOS_HOST, region)
    collection = vikingdb_service.get_collection(collection_name)
    # 本地临时生成文件的目录
    writer = FormatWriter(collection, local_path="./data_import", chunk_size=1024 * 1024 * 1024, file_type=file_type)
    for i in range(100):
        field = {
            "ID":i,
            "Int64": i,
            "Int64_test": 0,
            "bool": i%2 == 0,
            "string": str(i),
            "text": "测试",
            "vector_field":[0.1]*1024,
            "int64_list":[13131,2534546,343434],
            "string_list":["ab", "dcd"]
        }
        data = common.Data(fields=field)
        writer.write(data=data)
    writer.flush()
    # tos 桶名
    bucket_name = "data-import"
    # tos 路径
    dir_name = "test"
    writer.upload_tos(tos_client, bucket_name, dir_name)
    task_param = {
        "collection_name": collection.collection_name,
        "file_type": file_type.value,
        "tos_path": bucket_name+"/"+dir_name
    }
    task_id = vikingdb_service.create_task(TaskType.Data_Import, task_param)
    print(task_id)
    task = vikingdb_service.get_task(task_id)
    print(task.task_status, task.process_info)


if __name__ == '__main__':
    main()
`