import os
from pathlib import Path

import pandas as pd

from trackio import deploy, utils
from trackio.sqlite_storage import SQLiteStorage


def import_csv(
    csv_path: str | Path,
    project: str,
    name: str | None = None,
    space_id: str | None = None,
    dataset_id: str | None = None,
) -> None:
    """
    Imports a CSV file into a Trackio project. The CSV file must contain a `"step"`
    column, may optionally contain a `"timestamp"` column, and any other columns will be
    treated as metrics. It should also include a header row with the column names.
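
    Example (a minimal sketch; the file name, project, and run name below are
    hypothetical, assuming a CSV shaped like the snippet in the comment):

    ```python
    # metrics.csv contents:
    #   step,loss,accuracy
    #   0,2.31,0.12
    #   1,1.87,0.34
    import_csv("metrics.csv", project="csv-import-demo", name="baseline")
    ```
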
    TODO: call init() and return a Run object so that the user can continue to log metrics to it.

    Args:
        csv_path (`str` or `Path`):
            The str or Path to the CSV file to import.
        project (`str`):
            The name of the project to import the CSV file into. Must not be an existing
            project.
        name (`str` or `None`, *optional*, defaults to `None`):
            The name of the Run to import the CSV file into. If not provided, a default
            name will be generated.
        space_id (`str` or `None`, *optional*, defaults to `None`):
            If provided, the project will be logged to a Hugging Face Space instead of a
            local directory. Should be a complete Space name like `"username/reponame"`
            or `"orgname/reponame"`, or just `"reponame"` in which case the Space will
            be created in the currently-logged-in Hugging Face user's namespace. If the
            Space does not exist, it will be created. If the Space already exists, the
            project will be logged to it.
        dataset_id (`str` or `None`, *optional*, defaults to `None`):
            If provided, a persistent Hugging Face Dataset will be created and the
            metrics will be synced to it every 5 minutes. Should be a complete Dataset
            name like `"username/datasetname"` or `"orgname/datasetname"`, or just
            `"datasetname"` in which case the Dataset will be created in the
            currently-logged-in Hugging Face user's namespace. If the Dataset does not
            exist, it will be created. If the Dataset already exists, the project will
            be appended to it. If not provided, the metrics will be logged to a local
            SQLite database, unless a `space_id` is provided, in which case a Dataset
            will be automatically created with the same name as the Space but with the
            `"_dataset"` suffix.
    """
    if SQLiteStorage.get_runs(project):
        raise ValueError(
            f"Project '{project}' already exists. Cannot import CSV into existing project."
        )

    csv_path = Path(csv_path)
    if not csv_path.exists():
        raise FileNotFoundError(f"CSV file not found: {csv_path}")

    df = pd.read_csv(csv_path)
    if df.empty:
        raise ValueError("CSV file is empty")

    column_mapping = utils.simplify_column_names(df.columns.tolist())
    df = df.rename(columns=column_mapping)

    # Find the step column, matching case-insensitively (e.g. "step" or "Step").
    step_column = None
    for col in df.columns:
        if col.lower() == "step":
            step_column = col
            break

    if step_column is None:
        raise ValueError("CSV file must contain a 'step' or 'Step' column")

    if name is None:
        name = csv_path.stem

    metrics_list = []
    steps = []
    timestamps = []

    # Every column other than the step and timestamp columns is treated as a
    # metric, provided all of its values can be parsed as numbers.
    numeric_columns = []
    for column in df.columns:
        if column == step_column:
            continue
        if column == "timestamp":
            continue

        try:
            pd.to_numeric(df[column], errors="raise")
            numeric_columns.append(column)
        except (ValueError, TypeError):
            continue

    for _, row in df.iterrows():
        metrics = {}
        for column in numeric_columns:
            value = row[column]
            if bool(pd.notna(value)):
                metrics[column] = float(value)

        if metrics:
            metrics_list.append(metrics)
            steps.append(int(row[step_column]))

            if "timestamp" in df.columns and bool(pd.notna(row["timestamp"])):
                timestamps.append(str(row["timestamp"]))
            else:
                timestamps.append("")

    if metrics_list:
        SQLiteStorage.bulk_log(
            project=project,
            run=name,
            metrics_list=metrics_list,
            steps=steps,
            timestamps=timestamps,
        )

        print(
            f"* Imported {len(metrics_list)} rows from {csv_path} into project '{project}' as run '{name}'"
        )
        print(f"* Metrics found: {', '.join(metrics_list[0].keys())}")

    space_id, dataset_id = utils.preprocess_space_and_dataset_ids(space_id, dataset_id)
    if dataset_id is not None:
        os.environ["TRACKIO_DATASET_ID"] = dataset_id
        print(f"* Trackio metrics will be synced to Hugging Face Dataset: {dataset_id}")

    if space_id is None:
        utils.print_dashboard_instructions(project)
    else:
        deploy.create_space_if_not_exists(space_id, dataset_id)
        deploy.wait_until_space_exists(space_id)
        deploy.upload_db_to_space(project, space_id)
        print(
            f"* View dashboard by going to: {deploy.SPACE_URL.format(space_id=space_id)}"
        )


def import_tf_events(
    log_dir: str | Path,
    project: str,
    name: str | None = None,
    space_id: str | None = None,
    dataset_id: str | None = None,
) -> None:
    """
    Imports TensorFlow Events files from a directory into a Trackio project. Each
    subdirectory in the log directory will be imported as a separate run.
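
    Example (a minimal sketch; the log directory, project, and name prefix below
    are hypothetical, assuming one subdirectory of event files per run, e.g.
    `./tb_logs/train` and `./tb_logs/eval`):

    ```python
    import_tf_events("./tb_logs", project="tf-import-demo", name="experiment")
    ```
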
    Args:
        log_dir (`str` or `Path`):
            The str or Path to the directory containing TensorFlow Events files.
        project (`str`):
            The name of the project to import the TensorFlow Events files into. Must not
            be an existing project.
        name (`str` or `None`, *optional*, defaults to `None`):
            The name prefix for runs (if not provided, will use directory names). Each
            subdirectory will create a separate run.
        space_id (`str` or `None`, *optional*, defaults to `None`):
            If provided, the project will be logged to a Hugging Face Space instead of a
            local directory. Should be a complete Space name like `"username/reponame"`
            or `"orgname/reponame"`, or just `"reponame"` in which case the Space will
            be created in the currently-logged-in Hugging Face user's namespace. If the
            Space does not exist, it will be created. If the Space already exists, the
            project will be logged to it.
        dataset_id (`str` or `None`, *optional*, defaults to `None`):
            If provided, a persistent Hugging Face Dataset will be created and the
            metrics will be synced to it every 5 minutes. Should be a complete Dataset
            name like `"username/datasetname"` or `"orgname/datasetname"`, or just
            `"datasetname"` in which case the Dataset will be created in the
            currently-logged-in Hugging Face user's namespace. If the Dataset does not
            exist, it will be created. If the Dataset already exists, the project will
            be appended to it. If not provided, the metrics will be logged to a local
            SQLite database, unless a `space_id` is provided, in which case a Dataset
            will be automatically created with the same name as the Space but with the
            `"_dataset"` suffix.
    """
    try:
        from tbparse import SummaryReader
    except ImportError:
        raise ImportError(
            "The `tbparse` package is not installed but is required for `import_tf_events`. Please install trackio with the `tensorboard` extra: `pip install trackio[tensorboard]`."
        )

    if SQLiteStorage.get_runs(project):
        raise ValueError(
            f"Project '{project}' already exists. Cannot import TF events into existing project."
        )

    path = Path(log_dir)
    if not path.exists():
        raise FileNotFoundError(f"TF events directory not found: {path}")

    # tbparse flattens every event file under `path` into a single DataFrame;
    # the "dir_name" column records which subdirectory each scalar came from.
    reader = SummaryReader(str(path), extra_columns={"dir_name"})
    df = reader.scalars

    if df.empty:
        raise ValueError(f"No TensorFlow events data found in {path}")

    total_imported = 0
    imported_runs = []

    # Each subdirectory becomes its own run; event files at the top level of the
    # log directory are grouped under a run named "main".
    for dir_name, group_df in df.groupby("dir_name"):
        try:
            if dir_name == "":
                run_name = "main"
            else:
                run_name = dir_name

            if name:
                run_name = f"{name}_{run_name}"

            if group_df.empty:
                print(f"* Skipping directory {dir_name}: no scalar data found")
                continue

            metrics_list = []
            steps = []
            timestamps = []

            for _, row in group_df.iterrows():
                tag = str(row["tag"])
                value = float(row["value"])
                step = int(row["step"])

                metrics = {tag: value}
                metrics_list.append(metrics)
                steps.append(step)

                if "wall_time" in group_df.columns and not bool(
                    pd.isna(row["wall_time"])
                ):
                    timestamps.append(str(row["wall_time"]))
                else:
                    timestamps.append("")

            if metrics_list:
                SQLiteStorage.bulk_log(
                    project=project,
                    run=str(run_name),
                    metrics_list=metrics_list,
                    steps=steps,
                    timestamps=timestamps,
                )

                total_imported += len(metrics_list)
                imported_runs.append(run_name)

                print(
                    f"* Imported {len(metrics_list)} scalar events from directory '{dir_name}' as run '{run_name}'"
                )
                print(f"* Metrics in this run: {', '.join(set(group_df['tag']))}")

        except Exception as e:
            print(f"* Error processing directory {dir_name}: {e}")
            continue

    if not imported_runs:
        raise ValueError("No valid TensorFlow events data could be imported")

    print(f"* Total imported events: {total_imported}")
    print(f"* Created runs: {', '.join(imported_runs)}")

    space_id, dataset_id = utils.preprocess_space_and_dataset_ids(space_id, dataset_id)
    if dataset_id is not None:
        os.environ["TRACKIO_DATASET_ID"] = dataset_id
        print(f"* Trackio metrics will be synced to Hugging Face Dataset: {dataset_id}")

    if space_id is None:
        utils.print_dashboard_instructions(project)
    else:
        deploy.create_space_if_not_exists(space_id, dataset_id)
        deploy.wait_until_space_exists(space_id)
        deploy.upload_db_to_space(project, space_id)
        print(
            f"* View dashboard by going to: {deploy.SPACE_URL.format(space_id=space_id)}"
        )