Module openpack_torch.utils.test_helper

from logging import getLogger
from pathlib import Path

import numpy as np
import pandas as pd
import pytorch_lightning as pl
from omegaconf import DictConfig
from openpack_toolkit import ActSet
from openpack_toolkit.codalab.operation_segmentation import (
    eval_operation_segmentation_wrapper,
)
from openpack_toolkit.configs.datasets.annotations import (
    OPENPACK_ACTIONS,
    OPENPACK_OPERATIONS,
)

from openpack_torch.data.utils import assemble_sequence_list_from_cfg

logger = getLogger(__name__)

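# Mapping from "{user}-{session}" sequence keys to packaging-scenario IDs
# (S1-S4). compute_score_for_each_scenario() uses this table to group
# sequences when reporting per-scenario scores.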
SCENARIO_DICT = {
    "U0101-S0100": "S1",
    "U0101-S0200": "S1",
    "U0101-S0300": "S1",
    "U0101-S0400": "S1",
    "U0101-S0500": "S1",
    "U0102-S0100": "S1",
    "U0102-S0200": "S1",
    "U0102-S0300": "S1",
    "U0102-S0400": "S1",
    "U0102-S0500": "S1",
    "U0103-S0100": "S1",
    "U0103-S0200": "S1",
    "U0103-S0300": "S1",
    "U0103-S0400": "S1",
    "U0103-S0500": "S1",
    "U0104-S0100": "S1",
    "U0104-S0200": "S1",
    "U0104-S0300": "S1",
    "U0104-S0400": "S1",
    "U0105-S0100": "S1",
    "U0105-S0200": "S1",
    "U0105-S0300": "S1",
    "U0105-S0400": "S1",
    "U0105-S0500": "S1",
    "U0106-S0100": "S1",
    "U0106-S0200": "S1",
    "U0106-S0300": "S1",
    "U0106-S0400": "S1",
    "U0106-S0500": "S1",
    "U0107-S0100": "S1",
    "U0107-S0200": "S1",
    "U0107-S0300": "S1",
    "U0107-S0400": "S1",
    "U0107-S0500": "S1",
    "U0108-S0100": "S1",
    "U0108-S0200": "S1",
    "U0108-S0300": "S1",
    "U0108-S0400": "S1",
    "U0108-S0500": "S1",
    "U0109-S0100": "S1",
    "U0109-S0200": "S1",
    "U0109-S0300": "S1",
    "U0109-S0400": "S1",
    "U0109-S0500": "S1",
    "U0110-S0100": "S1",
    "U0110-S0200": "S1",
    "U0110-S0300": "S1",
    "U0110-S0400": "S1",
    "U0110-S0500": "S1",
    "U0111-S0100": "S1",
    "U0111-S0200": "S1",
    "U0111-S0300": "S1",
    "U0111-S0400": "S1",
    "U0111-S0500": "S1",
    "U0201-S0100": "S2",
    "U0201-S0200": "S2",
    "U0201-S0300": "S3",
    "U0201-S0400": "S3",
    "U0201-S0500": "S4",
    "U0202-S0100": "S2",
    "U0202-S0200": "S2",
    "U0202-S0300": "S3",
    "U0202-S0400": "S3",
    "U0202-S0500": "S4",
    "U0203-S0100": "S2",
    "U0203-S0200": "S2",
    "U0203-S0300": "S3",
    "U0203-S0400": "S3",
    "U0203-S0500": "S4",
    "U0204-S0100": "S2",
    "U0204-S0200": "S2",
    "U0204-S0300": "S3",
    "U0204-S0400": "S3",
    "U0204-S0500": "S4",
    "U0205-S0100": "S2",
    "U0205-S0200": "S2",
    "U0205-S0300": "S3",
    "U0205-S0400": "S3",
    "U0205-S0500": "S4",
    "U0206-S0100": "S2",
    "U0206-S0200": "S2",
    "U0206-S0300": "S3",
    "U0206-S0400": "S3",
    "U0206-S0500": "S4",
    "U0207-S0100": "S2",
    "U0207-S0200": "S2",
    "U0207-S0300": "S3",
    "U0207-S0400": "S3",
    "U0207-S0500": "S4",
    "U0208-S0100": "S2",
    "U0208-S0200": "S2",
    "U0208-S0300": "S3",
    "U0208-S0400": "S3",
    "U0208-S0500": "S4",
    "U0209-S0100": "S2",
    "U0209-S0200": "S2",
    "U0209-S0300": "S3",
    "U0209-S0400": "S3",
    "U0209-S0500": "S4",
    "U0210-S0100": "S2",
    "U0210-S0200": "S2",
    "U0210-S0300": "S3",
    "U0210-S0400": "S3",
    "U0210-S0500": "S4",
}


def test_helper(
    cfg: DictConfig,
    mode: str,
    datamodule: pl.LightningDataModule,
    plmodel: pl.LightningModule,
    trainer: pl.Trainer,
):
    if cfg.dataset.annotation.name == "openpack-actions-1hz-annotation":
        classes = ActSet(OPENPACK_ACTIONS)
    elif cfg.dataset.annotation.name == "openpack-operations-1hz-annotation":
        classes = ActSet(OPENPACK_OPERATIONS)
    else:
        raise ValueError(f"{cfg.dataset.annotation.name} is not supported.")

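    # Select the dataloaders and the matching list of (user, session) pairs
    # for the requested benchmark type and mode.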
    dataloaders = None
    if cfg.metadata.labels.benchmarkType == "benchmark1":
        if mode == "test":
            dataloaders = datamodule.test_dataloader()
            if hasattr(cfg.dataset.split, "spec"):
                split = cfg.dataset.split.spec.test
            else:
                split = cfg.dataset.split.test
        elif mode in ("submission", "test-on-submission"):
            dataloaders = datamodule.submission_dataloader()
            if hasattr(cfg.dataset.split, "spec"):
                split = cfg.dataset.split.spec.submission
            else:
                split = cfg.dataset.split.submission

    elif cfg.metadata.labels.benchmarkType in (
        "benchmark2",
        "benchmark3",
        "benchmark5",
    ):
        if mode == "test":
            dataloaders = datamodule.test_dataloader()
            split = [s.split("-") for s in datamodule.dataset_test.keys()]
    if dataloaders is None:
        raise NotImplementedError(
            f"mode={mode} on benchmark={cfg.metadata.labels.benchmarkType} is not supported."
        )

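    # Evaluate each sequence separately: run trainer.test(), dump the raw model
    # outputs as .npy files, and collect predictions keyed by "{user}-{session}".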
    outputs = dict()
    for i, dataloader in enumerate(dataloaders):
        user, session = split[i]
        logger.info(f"test on {user}-{session}")

        trainer.test(plmodel, dataloader)

        # save model outputs
        pred_dir = Path(cfg.path.logdir.predict.format(user=user, session=session))
        pred_dir.mkdir(parents=True, exist_ok=True)

        for key, arr in plmodel.test_results.items():
            fname = key.replace("/", "-")
            path = Path(pred_dir, f"{fname}.npy")
            np.save(path, arr)
            logger.info(f"save {key}[shape={arr.shape}] to {path}")

        key = f"{user}-{session}"
        outputs[key] = {
            "y": plmodel.test_results.get("y"),
            "unixtime": plmodel.test_results.get("unixtime"),
        }
        if mode in ("test", "test-on-submission"):
            outputs[key].update(
                {
                    "t_idx": plmodel.test_results.get("t"),
                }
            )

    df_summary = None
    if cfg.metadata.labels.benchmarkType == "benchmark1":
        if mode in ("test", "test-on-submission"):
            split = [k.split("-") for k in outputs.keys()]
            df_summary = compute_score_for_each_scenario(cfg, classes, split, outputs)

            if mode == "test":
                path = Path(cfg.path.logdir.summary.test)
            elif mode == "test-on-submission":
                path = Path(cfg.path.logdir.summary.submission)
            path.parent.mkdir(parents=True, exist_ok=True)
            df_summary.to_csv(path, index=False)
            logger.info(f"write df_summary[shape={df_summary.shape}] to {path}")

            # NOTE: change pandas options to show all rows/cols.
            pd.set_option("display.max_rows", None)
            pd.set_option("display.max_columns", None)
            pd.set_option("display.width", 200)
            logger.info(f"df_summary:\n{df_summary}")
        elif mode == "submission":
            raise NotImplementedError()
    elif cfg.metadata.labels.benchmarkType == "benchmark2":
        assert mode == "test"

        for stage in ["train", "test-b2"]:
            logger.info(f"test on stage={stage}")

            split = assemble_sequence_list_from_cfg(cfg, stage)
            df_summary = compute_score_for_each_scenario(cfg, classes, split, outputs)
            path = Path(cfg.path.logdir.summary[stage])
            path.parent.mkdir(parents=True, exist_ok=True)
            df_summary.to_csv(path, index=False)
            logger.info(f"write df_summary[shape={df_summary.shape}] to {path}")

            # NOTE: change pandas options to show all rows/cols.
            # pd.set_option('display.max_rows', None)
            pd.set_option("display.max_columns", None)
            pd.set_option("display.width", 200)
            logger.info(f"df_summary:\n{df_summary.tail(15)}")

    elif cfg.metadata.labels.benchmarkType == "benchmark3":
        # FIXME: per scenario score computation
        assert mode == "test"

        # TODO: Prepare stage=train
        for stage in ["test"]:
            logger.info(f"test on stage={stage}")

            split = assemble_sequence_list_from_cfg(cfg, stage)
            df_summary = compute_score_for_each_scenario(cfg, classes, split, outputs)

            path = Path(cfg.path.logdir.summary[stage])
            path.parent.mkdir(parents=True, exist_ok=True)
            df_summary.to_csv(path, index=False)
            logger.info(f"write df_summary[shape={df_summary.shape}] to {path}")

            # NOTE: change pandas options to show all rows/cols.
            # pd.set_option('display.max_rows', None)
            pd.set_option("display.max_columns", None)
            pd.set_option("display.width", 200)
            logger.info(f"df_summary:\n{df_summary.tail(15)}")

    elif cfg.metadata.labels.benchmarkType == "benchmark5":
        assert mode == "test"

        for stage in ["test"]:
            logger.info(f"test on stage={stage}")

            split = assemble_sequence_list_from_cfg(cfg, stage)
            df_summary = compute_score_for_each_scenario(cfg, classes, split, outputs)
            path = Path(cfg.path.logdir.summary[stage])
            path.parent.mkdir(parents=True, exist_ok=True)
            df_summary.to_csv(path, index=False)
            logger.info(f"write df_summary[shape={df_summary.shape}] to {path}")

            # NOTE: change pandas options to show all rows/cols.
            # pd.set_option('display.max_rows', None)
            pd.set_option("display.max_columns", None)
            pd.set_option("display.width", 200)
            logger.info(f"df_summary:\n{df_summary.tail(15)}")

    return outputs, df_summary


def compute_score_for_each_scenario(cfg, classes, split, outputs):
    df_summary = []

    for target_scenario in ["S1", "S2", "S3", "S4", "all"]:
        _outputs = dict()
        for _user, _sess in split:
            key = f"{_user}-{_sess}"
            scenario = SCENARIO_DICT[key]
            if (scenario == target_scenario) or (target_scenario == "all"):
                _outputs[key] = outputs[key]

        if len(_outputs) > 0:
            df_tmp = eval_operation_segmentation_wrapper(cfg, _outputs, classes)
            df_tmp["scenario"] = target_scenario
            df_summary.append(df_tmp)

    df_summary = pd.concat(df_summary, axis=0)
    return df_summary

Functions

def compute_score_for_each_scenario(cfg, classes, split, outputs)
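Compute evaluation scores separately for each packaging scenario. For each scenario ID in ["S1", "S2", "S3", "S4", "all"], the sequences in split whose "{user}-{session}" key maps to that scenario in SCENARIO_DICT are selected from outputs, scored with openpack_toolkit's eval_operation_segmentation_wrapper(), and the results are concatenated into a single DataFrame with an added "scenario" column.

A minimal usage sketch, assuming cfg, classes, split, and outputs were produced as inside test_helper():

    df_summary = compute_score_for_each_scenario(cfg, classes, split, outputs)
    # Rows with scenario == "all" aggregate every selected sequence.
    print(df_summary[df_summary["scenario"] == "all"])
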
def test_helper(cfg: omegaconf.dictconfig.DictConfig, mode: str, datamodule: pytorch_lightning.core.datamodule.LightningDataModule, plmodel: pytorch_lightning.core.module.LightningModule, trainer: pytorch_lightning.trainer.trainer.Trainer)
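Run evaluation over the test (or submission) sequences specified by the config, write per-sequence predictions, and compute a per-scenario score summary. Concretely, the helper (1) selects the dataloaders and the matching (user, session) split for cfg.metadata.labels.benchmarkType and mode, (2) calls trainer.test() once per sequence and saves every array in plmodel.test_results to cfg.path.logdir.predict as .npy files, and (3) for supported benchmark/mode combinations, scores the collected outputs with compute_score_for_each_scenario() and writes the summary CSV under cfg.path.logdir.summary. Returns (outputs, df_summary); df_summary is None when no summary was computed.

A minimal sketch of how this helper might be wired into an evaluation script. run_evaluation, and the datamodule and model passed to it, are placeholders you supply: the datamodule is expected to return one dataloader per sequence from test_dataloader() (and submission_dataloader() for the benchmark1 submission modes), and the LightningModule is expected to fill self.test_results with numpy arrays such as "y", "t", and "unixtime" during testing.

    import pytorch_lightning as pl
    from omegaconf import DictConfig

    from openpack_torch.utils.test_helper import test_helper


    def run_evaluation(
        cfg: DictConfig,
        datamodule: pl.LightningDataModule,  # your OpenPack datamodule
        plmodel: pl.LightningModule,         # your model; must expose .test_results
    ) -> None:
        trainer = pl.Trainer(logger=False, enable_checkpointing=False)

        outputs, df_summary = test_helper(
            cfg,
            mode="test",  # "submission" / "test-on-submission" are benchmark1-only
            datamodule=datamodule,
            plmodel=plmodel,
            trainer=trainer,
        )
        if df_summary is not None:
            print(df_summary.head())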