Module openpack_toolkit.codalab.operation_segmentation

Evaluation code for the Operation Semantic Segmentation Task

This task aims to recognize 9 operations in the manual packing activity. The macro-averaged F1-measure is used as the evaluation metric.
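
To make the metric concrete, here is a minimal sketch of the macro-averaged F1-measure computed with scikit-learn on synthetic ID sequences. It illustrates only the averaging scheme and is not the toolkit's implementation; the class IDs are placeholders.

import numpy as np
from sklearn.metrics import f1_score

t_id = np.array([100, 100, 200, 300, 300, 300])  # ground-truth operation IDs
y_id = np.array([100, 200, 200, 300, 300, 100])  # predicted operation IDs

# Macro average: compute F1 per class, then take the unweighted mean,
# so rare and frequent operations contribute equally.
score = f1_score(t_id, y_id, average="macro")
print(f"macro F1 = {score:.3f}")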

Note

OpenPack Challenge uses eval_operation_segmentation() in eval.py for evaluation.

Todo

  • Add task description.
  • Add usage (data format).
  • Add detailed description of the evaluation format.
Source code:
"""Evaluation codes for Operation Semantic Segmentation Task

This task is aimed to recognize 9 operations in the manual packing activity.
F1-measure with macro average is used as metrics.

Note:
    OpenPack Challenge uses `eval_operation_segmentation()` in `eval.py` for evaluation.

Todo:
    * Add task desciption.
    * Add usage (data format)
    * Add detail desciption of evaluation format.
"""
from .eval import eval_operation_segmentation
from .utils import (
    construct_submission_dict,
    eval_operation_segmentation_wrapper,
    make_submission_zipfile,
)

__all__ = [
    "construct_submission_dict",
    "eval_operation_segmentation",
    "eval_operation_segmentation_wrapper",
    "make_submission_zipfile",
]

Sub-modules

openpack_toolkit.codalab.operation_segmentation.eval
openpack_toolkit.codalab.operation_segmentation.utils

Todo

  • Add unit tests.
  • Refactoring is needed!

Functions

def construct_submission_dict(outputs: Dict[str, Dict[str, numpy.ndarray]], act_set: ActSet, include_ground_truth: Optional[bool] = False, cfg: Optional[omegaconf.dictconfig.DictConfig] = None) ‑> Dict

Build a dict that can be used for submission and for the eval_operation_segmentation() function.

Args

outputs : Dict[str, Dict[str, np.ndarray]]
Each key is expected to be a pair of user and session, e.g., "U0102-S0100".
act_set : ActSet
set of target activity classes.
include_ground_truth : bool, optional
If True, ground truth labels are included in the submission dict. Set to True when computing scores.
cfg : DictConfig, optional
config dict. Required when include_ground_truth=True.

Returns

Dict
submission dict
Source code:
def construct_submission_dict(
    outputs: Dict[str, Dict[str, np.ndarray]],
    act_set: ActSet,
    include_ground_truth: Optional[bool] = False,
    cfg: Optional[DictConfig] = None,
) -> Dict:
    """Make dict that can be used for submission and `eval_workprocess_segmentation()` func.
    Args:
        outputs (Dict[str, Dict[str, np.ndarray]]): key is expected to be a pair of user and
            session. e.g., "U0102-S0100".
        act_set (ActSet): -
        include_ground_truth (bool, optional): If True, ground truth labels are included
            in the submission dict. Set True when you calculate scores.
        cfg (DictConfig, optional): config dict.
    Returns:
        Dict: submission dict
    """
    submission = dict()

    keys = sorted(outputs.keys())
    for key in keys:
        d = outputs[key]
        record = dict()
        user, session = key.split("-")

        assert d["y"].ndim == 3
        assert d["unixtime"].dtype == np.int64, (
            "unixtime must be np.int64, but got {}".format(d["unixtime"].dtype)
        )

        prediction_sess = act_set.convert_index_to_id(
            np.argmax(d["y"], axis=1).ravel())
        unixtime_pred_sess, prediction_sess = resample_prediction_1Hz(
            ts_unix=d["unixtime"].copy().ravel(), arr=prediction_sess)

        if include_ground_truth:
            with open_dict(cfg):
                cfg.user = {"name": user}
                cfg.session = session

            # TODO: Move to new function ( load_ground_truth() )
            if hasattr(cfg.dataset.annotation, "spec"):
                path = Path(
                    cfg.dataset.annotation.spec.path.dir,
                    cfg.dataset.annotation.spec.path.fname
                )
            else:
                path = Path(
                    cfg.dataset.annotation.path.dir,
                    cfg.dataset.annotation.path.fname
                )
            df_label = pd.read_csv(path)

            label_format = cfg.dataset.annotation.metadata.labels.get(
                "label_format", "")
            if label_format == "soft-target":
                cols = [c for c in df_label.columns if c.startswith("ID")]
                index_to_id = {i: int(c.replace("ID", ""))
                               for i, c in enumerate(cols)}
                df_label["index"] = np.argmax(df_label[cols].values, axis=1)
                df_label["id"] = df_label["index"].apply(
                    lambda ind: index_to_id[ind])

            unixtime_gt_sess = df_label["unixtime"].values
            ground_truth_sess = df_label["id"].values

            # check timestamp
            unixtime_pred_sess, prediction_sess = crop_prediction_sequence(
                unixtime_gt_sess, unixtime_pred_sess, prediction_sess)
            np.testing.assert_array_equal(unixtime_pred_sess, unixtime_gt_sess)

            record["ground_truth"] = ground_truth_sess.copy()

        record["unixtime"] = unixtime_pred_sess.copy()
        record["prediction"] = prediction_sess.copy()
        submission[key] = record

    return submission
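
The sketch below shows the expected input format. It assumes ActSet and OPENPACK_OPERATIONS are importable from the top-level openpack_toolkit package; the array values are synthetic and only illustrate the required keys, shapes, and dtypes.

import numpy as np
from openpack_toolkit import OPENPACK_OPERATIONS, ActSet
from openpack_toolkit.codalab.operation_segmentation import construct_submission_dict

num_batches, num_classes, window = 4, 11, 30

outputs = {
    # key: "<user>-<session>"
    "U0102-S0100": {
        # predicted class probabilities, shape=(BATCH_SIZE, NUM_CLASSES, WINDOW)
        "y": np.random.rand(num_batches, num_classes, window),
        # per-frame unixtime in milliseconds; must be np.int64 (placeholder values)
        "unixtime": (np.arange(num_batches * window, dtype=np.int64)
                     .reshape(num_batches, window) * 1000),
    },
}

# Without ground truth, no config is needed.
submission = construct_submission_dict(
    outputs, ActSet(OPENPACK_OPERATIONS), include_ground_truth=False)
print(submission["U0102-S0100"]["prediction"].shape)
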
def eval_operation_segmentation(t_id: numpy.ndarray = None, y_id: numpy.ndarray = None, classes: Tuple[Tuple[int, str], ...] = None, ignore_class_id: int = None, mode: str = 'final') ‑> pandas.core.frame.DataFrame

Compute metrics (i.e., precision, recall, f1, support) for the given sequence.

Args

t_id : np.ndarray
ground-truth activity IDs aligned with unixtime, shape=(T,)
y_id : np.ndarray
predicted activity IDs aligned with unixtime, shape=(T,)
classes : Tuple
class definition; pairs of class ID and name.
ignore_class_id : int, optional
class ID that is excluded from evaluation (e.g., the Null class).
mode : str
If "final", only the average scores (macro and weighted) are computed. Otherwise, per-class scores are computed as well.

Returns

pd.DataFrame
score table with precision, recall, f1, and support.

Source code:
def eval_operation_segmentation(
    t_id: np.ndarray = None,
    y_id: np.ndarray = None,
    classes: Tuple[Tuple[int, str], ...] = None,
    ignore_class_id: int = None,
    mode: str = "final",
) -> pd.DataFrame:
    """Compute metrics (i.e., precision, recall, f1, support) for the given sequence.
    Args:
        t_id (np.ndarray): unixtime and corresponding activity ID, shape=(T,)
        y_id (np.ndarray): unixtime and predicted activity ID, shape=(T,)
        classes (Tuple): class definition. pairs of class id and name.
        mode (str): If final, only the macro score will be calculated. Otherwise,
            macro avg., weighted avg., and score for each class will be calculated.
    Returns:
        pd.DataFrame
    """
    assert t_id.ndim == 1
    assert y_id.ndim == 1
    verify_class_ids(y_id, classes)

    if ignore_class_id is not None:
        t_id, y_id = drop_ignore_class(t_id, y_id, ignore_class_id)
        classes = tuple([t for t in classes if t[0] != ignore_class_id])

    df_scores = [
        calc_avg_metrics(t_id, y_id, classes, average="macro"),
        calc_avg_metrics(t_id, y_id, classes, average="weighted"),
    ]
    if mode != "final":
        df_scores.append(
            calc_class_metrics(t_id, y_id, classes)
        )

    df_scores = pd.concat(
        df_scores,
        axis=0,
        ignore_index=True).set_index("name")
    return df_scores
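
A minimal usage sketch on synthetic ID sequences; the (ID, name) pairs below are hypothetical and not the official class set.

import numpy as np
from openpack_toolkit.codalab.operation_segmentation import eval_operation_segmentation

# Hypothetical class definition: pairs of (class ID, class name).
classes = ((100, "Picking"), (200, "Relocate Item Label"), (8100, "Null"))

t_id = np.array([100, 100, 200, 8100, 200])  # ground truth
y_id = np.array([100, 200, 200, 8100, 200])  # prediction

# Drop the Null class (ID=8100) and include per-class rows (mode=None).
df = eval_operation_segmentation(
    t_id, y_id, classes=classes, ignore_class_id=8100, mode=None)
print(df)
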
def eval_operation_segmentation_wrapper(cfg: omegaconf.dictconfig.DictConfig, outputs: Dict[str, Dict[str, numpy.ndarray]], act_set: ActSet = ActSet(classes=(Label(id=100, name='Picking', version='v3.0.0', is_ignore=False, category=None, event=None), Label(id=200, name='Relocate Item Label', version='v3.2.2', is_ignore=False, category=None, event=None), Label(id=300, name='Assemble Box', version='v3.2.2', is_ignore=False, category=None, event=None), Label(id=400, name='Insert Items', version='v3.2.2', is_ignore=False, category=None, event=None), Label(id=500, name='Close Box', version='v3.2.2', is_ignore=False, category=None, event=None), Label(id=600, name='Attach Box Label', version='v3.2.2', is_ignore=False, category=None, event=None), Label(id=700, name='Scan Label', version='v3.2.2', is_ignore=False, category=None, event=None), Label(id=800, name='Attach Shipping Label', version='v3.2.2', is_ignore=False, category=None, event=None), Label(id=900, name='Put on Back Table', version='v3.2.2', is_ignore=False, category=None, event=None), Label(id=1000, name='Fill out Order', version='v3.2.2', is_ignore=False, category=None, event=None), Label(id=8100, name='Null', version='v3.2.2', is_ignore=True, category=None, event=None))), exclude_ignore_class=True) ‑> pandas.core.frame.DataFrame

Compute evaluation metrics from model outputs (predicted probability).

Args

cfg : DictConfig
config dict.
outputs : Dict[str, Dict[str, np.ndarray]]
dict object that contains y and unixtime. y is a 3d array of prediction probabilities with shape=(BATCH_SIZE, NUM_CLASSES, WINDOW). unixtime is a 2d array of timestamps with shape=(BATCH_SIZE, WINDOW).
act_set : ActSet, optional
class definition.
exclude_ignore_class : bool
If True, ignore classes are excluded. (default: True)

Returns

pd.DataFrame
score table with one block of rows per session key plus an overall block with key == "all".

Source code:
def eval_operation_segmentation_wrapper(
    cfg: DictConfig,
    outputs: Dict[str, Dict[str, np.ndarray]],
    act_set: ActSet = ActSet(OPENPACK_OPERATIONS),
    exclude_ignore_class=True,
) -> pd.DataFrame:
    """ Compute evaluation metrics from model outputs (predicted probability).
    Args:
        cfg (DictConfig): config dict.
        outputs (Dict[str, Dict[str, np.ndarray]]): dict object that contains t_idx and y.
            t_idx is a 2d array of target class index with shape=(BATCH_SIZE, WINDOW).
            y is a 3d array of predction probabilities with shape=(BATCH_SIZE, NUM_CLASSES, WINDOW).
        act_set (ActSete, optional): class definition.
        exclude_ignore_class (bool): If true, ignore classes are excluded. (default: True)
    Returns:
        pd.DataFrame
    """
    submission = construct_submission_dict(
        outputs, act_set, include_ground_truth=True, cfg=cfg)
    classes = act_set.to_tuple()
    ignore_class_id = act_set.get_ignore_class_id()
    if isinstance(ignore_class_id, tuple):
        raise NotImplementedError()

    # Evaluate
    df_scores = []
    t_id_concat, y_id_concat = [], []
    for key, d in submission.items():
        t_id = d["ground_truth"]
        y_id = d["prediction"]

        t_id_concat.append(t_id.copy())
        y_id_concat.append(y_id.copy())

        df_tmp = eval_operation_segmentation(
            t_id,
            y_id,
            classes=classes,
            ignore_class_id=ignore_class_id,
            mode=None)
        df_tmp["key"] = key
        df_scores.append(df_tmp.reset_index(drop=False))

    # Overall Score
    df_tmp = eval_operation_segmentation(
        np.concatenate(t_id_concat, axis=0),
        np.concatenate(y_id_concat, axis=0),
        classes=classes,
        ignore_class_id=ignore_class_id,
        mode=None,
    )
    df_tmp["key"] = "all"
    df_scores.append(df_tmp.reset_index(drop=False))

    df_scores = pd.concat(df_scores, axis=0, ignore_index=True)
    return df_scores
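
The intended call pattern looks roughly like the following sketch. The config path and array values are placeholders: cfg must be a DictConfig that points at the dataset annotation files so ground truth can be loaded, and unixtime must line up with the annotation timestamps for the internal consistency check to pass.

import numpy as np
from omegaconf import OmegaConf
from openpack_toolkit.codalab.operation_segmentation import (
    eval_operation_segmentation_wrapper,
)

cfg = OmegaConf.load("configs/openpack.yaml")  # hypothetical config path

outputs = {
    "U0102-S0100": {
        "y": np.random.rand(4, 11, 30),                 # (BATCH, NUM_CLASSES, WINDOW)
        "unixtime": np.zeros((4, 30), dtype=np.int64),  # (BATCH, WINDOW), placeholder
    },
}

df_scores = eval_operation_segmentation_wrapper(cfg, outputs)
# One block of rows per session key, plus an aggregated block with key == "all";
# each block contains macro avg., weighted avg., and per-class scores.
print(df_scores[df_scores["key"] == "all"])
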
def make_submission_zipfile(submission: Dict, logdir: pathlib.Path, metadata: dict = None) ‑> None

Check dict contents and generate a zip file for CodaLab submission.

Args

submission : Dict
submission dict
logdir : Path
path to the output directory
metadata : dict
dict of additional information that is included in submission.json. We recommend including the data split name.

Returns

None (writes submission.json and submission.zip)

Source code:
def make_submission_zipfile(
        submission: Dict,
        logdir: Path,
        metadata: dict = None) -> None:
    """Check dict contents and generate zip file for codalab submission.

    Args:
        submission (Dict): submission dict
        logdir (Path): path to the output directory
        metadata (dict): dict of additional information that is included in
            ``submission.json``. We recommend including the data split name.
    Returns:
        None (writes JSON & zip files)
    """
    # Check data format and convert into pure objects
    submission_clean = dict()
    for key, d in submission.items():
        assert isinstance(d, dict)

        record = dict()
        for arr_name, arr in d.items():
            if arr_name not in ("prediction", "unixtime"):
                logger.warning(
                    f"unexpected entry[{arr_name}] is found in submission dict.")
            assert isinstance(arr, np.ndarray)
            record[arr_name] = arr.tolist()
        submission_clean[key] = record

    # Add meta data
    if metadata is not None:
        submission_clean["meta"] = metadata

    # Write JSON file
    path_json = Path(logdir, "submission.json")
    if path_json.exists():
        os.remove(path_json)
    with open(path_json, "w") as f:
        json.dump(submission_clean, f)
    logger.info(f"write submission.json to {path_json}")

    # Make zip file
    path_zip = Path(logdir, "submission.zip")
    if path_zip.exists():
        os.remove(path_zip)
    with zipfile.ZipFile(path_zip, "w") as zf:
        zf.write(path_json, arcname="./submission.json")
    logger.info(f"write submission.zip to {path_zip}")
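
A minimal sketch of producing the archive; the arrays, timestamps, and log directory below are placeholders.

from pathlib import Path

import numpy as np
from openpack_toolkit.codalab.operation_segmentation import make_submission_zipfile

submission = {
    "U0102-S0100": {
        "unixtime": np.array([1634000000000, 1634000001000], dtype=np.int64),
        "prediction": np.array([100, 200], dtype=np.int64),
    },
}

logdir = Path("./logs")  # output directory (placeholder)
logdir.mkdir(parents=True, exist_ok=True)

# Writes ./logs/submission.json and ./logs/submission.zip
make_submission_zipfile(submission, logdir, metadata={"split": "test"})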