Module openpack_toolkit.codalab.operation_segmentation.utils

Todo

  • Make Unit-Test.
  • Refactoring is needed!
Expand source code
"""
Todo:
    - Make Unit-Test.
    - Refactoring is needed!
"""
import json
import os
import zipfile
from logging import getLogger
from pathlib import Path
from typing import Dict, Optional, Tuple

import numpy as np
import pandas as pd
from omegaconf import DictConfig, open_dict

from ...activity import ActSet
from ...configs.datasets.annotations import OPENPACK_OPERATIONS
from .eval import eval_operation_segmentation

logger = getLogger(__name__)

# -----------------------------------------------------------------------------


def resample_prediction_1Hz(
    ts_unix: np.ndarray = None,
    arr: np.ndarray = None,
) -> Tuple[np.ndarray, np.ndarray]:
    """Change the sampling rate into 1 Hz.

    Args:
        ts_unix (np.ndarray): 1d array of unixtimestamp. Defaults to None.
        arr (np.ndarray): 1d array of class IDs. Defaults to None.

    Returns:
        Tuple[np.ndarray, np.ndarray]: arrays of unixtime and resampled class ids.
    """
    assert arr.ndim == 1
    assert ts_unix.ndim == 1
    assert len(arr) == len(
        ts_unix), f"ts_unix={ts_unix.shape}, arr={arr.shape}"

    tmp = ts_unix - (ts_unix % 1000)
    ts_unix_1hz = np.append(tmp, tmp[-1] + 1000)  # FIXME: write in one line

    delta = (ts_unix_1hz[1:] - ts_unix_1hz[:-1])
    assert delta.min() >= 0, (
        "values in array are expected to be monotonically increasing, "
        f"but the minium step is {delta.min()}."
    )

    arr_out, ts_unix_out = [], []
    cur_time = ts_unix_1hz[0]
    for r in range(len(ts_unix_1hz)):
        if cur_time != ts_unix_1hz[r]:
            arr_out.append(arr[r - 1])
            ts_unix_out.append(cur_time)
            cur_time = ts_unix_1hz[r]

    return (
        np.array(ts_unix_out),
        np.array(arr_out),
    )


def ffill_missing_elements(
    unixtime_gt: np.ndarray,
    unixtime_pred: np.ndarray,
    prediction: np.ndarray,
):
    missing_timestep = sorted(
        list(set(unixtime_gt.tolist()) - set(unixtime_pred.tolist())))
    if len(missing_timestep) == 0:
        return unixtime_pred, prediction
    logger.warning(
        f"{len(missing_timestep)} elements are missing from prediction.: {missing_timestep}")

    for ts in missing_timestep:
        ind = np.where(unixtime_gt == ts)[0][0]

        val = prediction[ind - 1] if ind > 0 else -1
        unixtime_pred = np.insert(unixtime_pred, ind, ts)
        prediction = np.insert(prediction, ind, val)

        logger.warning(
            f"fill missing element at ts={ts} (ind={ind}) with {val}.")

        # DEBUG: Remove before merge
        delta = set(unixtime_gt[:ind]) - set(unixtime_pred[:ind])
        assert len(delta) == 0, delta

    assert len(unixtime_pred) == len(unixtime_gt), (
        f"unixtime_pred={unixtime_pred.shape}, unixtime_gt={unixtime_gt.shape}"
    )
    assert len(prediction) == len(unixtime_gt), (
        f"prediction={prediction.shape}, unixtime_gt={unixtime_gt.shape}"
    )
    return unixtime_pred, prediction


def crop_prediction_sequence(
    unixtime_gt: np.ndarray,
    unixtime_pred: np.ndarray,
    prediction: np.ndarray,
):
    """Crop prediction array to have the same timestamp as reference data.
    Args:
        unixtime_gt (np.ndarray): unixtimes of the ground truth sequence
        unixtime_pred (np.ndarray): unixtimes of the prediction sequence
        prediction (np.ndarray): -
    """
    ts_head, ts_tail = unixtime_gt[0], unixtime_gt[-1]
    logger.debug(f"ground truth: ts_head={ts_head}, ts_tail={ts_tail}")
    logger.debug(
        f"prediction  : ts_head={int(unixtime_pred[0])}, ts_tail={int(unixtime_pred[-1])}")

    ind = np.where((unixtime_pred >= ts_head) & (unixtime_pred <= ts_tail))[0]
    logger.debug(
        f"ind={ind.shape}, unixtime_pred={unixtime_pred.shape}, prediction={prediction.shape}")

    unixtime_pred = unixtime_pred[ind]
    prediction = prediction[ind]

    unixtime_pred, prediction = ffill_missing_elements(
        unixtime_gt,
        unixtime_pred,
        prediction,
    )

    return unixtime_pred, prediction


def construct_submission_dict(
    outputs: Dict[str, Dict[str, np.ndarray]],
    act_set: ActSet,
    include_ground_truth: Optional[bool] = False,
    cfg: Optional[DictConfig] = None,
) -> Dict:
    """Make dict that can be used for submission and `eval_workprocess_segmentation()` func.
    Args:
        outputs (Dict[str, Dict[str, np.ndarray]]): key is expected to be a pair of user and
            session. e.g., "U0102-S0100".
        act_set (ActSet): -
        include_ground_truth (bool, optional): If True, ground truth labels are included
            in the submission dict. Set True when you calculate scores.
        cfg (DictConfig, optional): config dict.
    Returns:
        Dict: submission dict
    """
    submission = dict()

    keys = sorted(outputs.keys())
    for key in keys:
        d = outputs[key]
        record = dict()
        user, session = key.split("-")

        assert d["y"].ndim == 3
        assert d["unixtime"].dtype == np.int64, (
            "unixtime must be np.int64, but got {}".format(d["unixtime"].dtype)
        )

        prediction_sess = act_set.convert_index_to_id(
            np.argmax(d["y"], axis=1).ravel())
        unixtime_pred_sess, prediction_sess = resample_prediction_1Hz(
            ts_unix=d["unixtime"].copy().ravel(), arr=prediction_sess)

        if include_ground_truth:
            with open_dict(cfg):
                cfg.user = {"name": user}
                cfg.session = session

            # TODO: Move to new function ( load_ground_truth() )
            if hasattr(cfg.dataset.annotation, "spec"):
                path = Path(
                    cfg.dataset.annotation.spec.path.dir,
                    cfg.dataset.annotation.spec.path.fname
                )
            else:
                path = Path(
                    cfg.dataset.annotation.path.dir,
                    cfg.dataset.annotation.path.fname
                )
            df_label = pd.read_csv(path)

            label_format = cfg.dataset.annotation.metadata.labels.get(
                "label_format", "")
            if label_format == "soft-target":
                cols = [c for c in df_label.columns if c.startswith("ID")]
                index_to_id = {i: int(c.replace("ID", ""))
                               for i, c in enumerate(cols)}
                df_label["index"] = np.argmax(df_label[cols].values, axis=1)
                df_label["id"] = df_label["index"].apply(
                    lambda ind: index_to_id[ind])

            unixtime_gt_sess = df_label["unixtime"].values
            ground_truth_sess = df_label["id"].values

            # check timestamp
            unixtime_pred_sess, prediction_sess = crop_prediction_sequence(
                unixtime_gt_sess, unixtime_pred_sess, prediction_sess)
            np.testing.assert_array_equal(unixtime_pred_sess, unixtime_gt_sess)

            record["ground_truth"] = ground_truth_sess.copy()

        record["unixtime"] = unixtime_pred_sess.copy()
        record["prediction"] = prediction_sess.copy()
        submission[key] = record

    return submission


def make_submission_zipfile(
        submission: Dict,
        logdir: Path,
        metadata: dict = None) -> None:
    """Check dict contents and generate zip file for codalab submission.

    Args:
        submission (Dict): submission dict
        logdir (Path): path to the output directory
        metadata (dict): dict of additional information that is included in
            ``submission.json``. We recommend to include a data split name.
    Returns:
        None (make JSON & zip files)
    """
    # Check data format and convert into pure objects
    submission_clean = dict()
    for key, d in submission.items():
        assert isinstance(d, dict)

        record = dict()
        for arr_name, arr in d.items():
            if arr_name not in ("prediction", "unixtime"):
                logger.warning(
                    f"unexpected entry[{arr_name}] is found in submission dict.")
            assert isinstance(arr, np.ndarray)
            record[arr_name] = arr.tolist()
        submission_clean[key] = record

    # Add meta data
    if metadata is not None:
        submission_clean["meta"] = metadata

    # Write JSON file
    path_json = Path(logdir, "submission.json")
    if path_json.exists():
        os.remove(path_json)
    with open(path_json, "w") as f:
        json.dump(submission_clean, f)
    logger.info(f"write submission.json to {path_json}")

    # Make zip file
    path_zip = Path(logdir, "submission.zip")
    if path_zip.exists():
        os.remove(path_zip)
    with zipfile.ZipFile(path_zip, "w") as zf:
        zf.write(path_json, arcname="./submission.json")
    logger.info(f"write submission.zip to {path_zip}")


# -----------------------------------------------------------------------------

def eval_operation_segmentation_wrapper(
    cfg: DictConfig,
    outputs: Dict[str, Dict[str, np.ndarray]],
    act_set: ActSet = ActSet(OPENPACK_OPERATIONS),
    exclude_ignore_class=True,
) -> pd.DataFrame:
    """ Compute evaluation metrics from model outputs (predicted probability).
    Args:
        cfg (DictConfig): config dict.
        outputs (Dict[str, Dict[str, np.ndarray]]): dict object that contains t_idx and y.
            t_idx is a 2d array of target class index with shape=(BATCH_SIZE, WINDOW).
            y is a 3d array of predction probabilities with shape=(BATCH_SIZE, NUM_CLASSES, WINDOW).
        act_set (ActSete, optional): class definition.
        exclude_ignore_class (bool): If true, ignore classes are excluded. (default: True)
    Returns:
        pd.DataFrame
    """
    submission = construct_submission_dict(
        outputs, act_set, include_ground_truth=True, cfg=cfg)
    classes = act_set.to_tuple()
    ignore_class_id = act_set.get_ignore_class_id()
    if isinstance(ignore_class_id, tuple):
        raise NotImplementedError()

    # Evaluate
    df_scores = []
    t_id_concat, y_id_concat = [], []
    for key, d in submission.items():
        t_id = d["ground_truth"]
        y_id = d["prediction"]

        t_id_concat.append(t_id.copy())
        y_id_concat.append(y_id.copy())

        df_tmp = eval_operation_segmentation(
            t_id,
            y_id,
            classes=classes,
            ignore_class_id=ignore_class_id,
            mode=None)
        df_tmp["key"] = key
        df_scores.append(df_tmp.reset_index(drop=False))

    # Overall Score
    df_tmp = eval_operation_segmentation(
        np.concatenate(t_id_concat, axis=0),
        np.concatenate(y_id_concat, axis=0),
        classes=classes,
        ignore_class_id=ignore_class_id,
        mode=None,
    )
    df_tmp["key"] = "all"
    df_scores.append(df_tmp.reset_index(drop=False))

    df_scores = pd.concat(df_scores, axis=0, ignore_index=True)
    return df_scores

Functions

def construct_submission_dict(outputs: Dict[str, Dict[str, numpy.ndarray]], act_set: ActSet, include_ground_truth: Optional[bool] = False, cfg: Optional[omegaconf.dictconfig.DictConfig] = None) ‑> Dict

Make dict that can be used for submission and eval_workprocess_segmentation() func.

Args

outputs : Dict[str, Dict[str, np.ndarray]]
key is expected to be a pair of user and session. e.g., "U0102-S0100".
act_set : ActSet
-
include_ground_truth : bool, optional
If True, ground truth labels are included in the submission dict. Set True when you calculate scores.
cfg : DictConfig, optional
config dict.

Returns

Dict
submission dict
Expand source code
def construct_submission_dict(
    outputs: Dict[str, Dict[str, np.ndarray]],
    act_set: ActSet,
    include_ground_truth: Optional[bool] = False,
    cfg: Optional[DictConfig] = None,
) -> Dict:
    """Make dict that can be used for submission and `eval_workprocess_segmentation()` func.
    Args:
        outputs (Dict[str, Dict[str, np.ndarray]]): key is expected to be a pair of user and
            session. e.g., "U0102-S0100".
        act_set (ActSet): -
        include_ground_truth (bool, optional): If True, ground truth labels are included
            in the submission dict. Set True when you calculate scores.
        cfg (DictConfig, optional): config dict.
    Returns:
        Dict: submission dict
    """
    submission = dict()

    keys = sorted(outputs.keys())
    for key in keys:
        d = outputs[key]
        record = dict()
        user, session = key.split("-")

        assert d["y"].ndim == 3
        assert d["unixtime"].dtype == np.int64, (
            "unixtime must be np.int64, but got {}".format(d["unixtime"].dtype)
        )

        prediction_sess = act_set.convert_index_to_id(
            np.argmax(d["y"], axis=1).ravel())
        unixtime_pred_sess, prediction_sess = resample_prediction_1Hz(
            ts_unix=d["unixtime"].copy().ravel(), arr=prediction_sess)

        if include_ground_truth:
            with open_dict(cfg):
                cfg.user = {"name": user}
                cfg.session = session

            # TODO: Move to new function ( load_ground_truth() )
            if hasattr(cfg.dataset.annotation, "spec"):
                path = Path(
                    cfg.dataset.annotation.spec.path.dir,
                    cfg.dataset.annotation.spec.path.fname
                )
            else:
                path = Path(
                    cfg.dataset.annotation.path.dir,
                    cfg.dataset.annotation.path.fname
                )
            df_label = pd.read_csv(path)

            label_format = cfg.dataset.annotation.metadata.labels.get(
                "label_format", "")
            if label_format == "soft-target":
                cols = [c for c in df_label.columns if c.startswith("ID")]
                index_to_id = {i: int(c.replace("ID", ""))
                               for i, c in enumerate(cols)}
                df_label["index"] = np.argmax(df_label[cols].values, axis=1)
                df_label["id"] = df_label["index"].apply(
                    lambda ind: index_to_id[ind])

            unixtime_gt_sess = df_label["unixtime"].values
            ground_truth_sess = df_label["id"].values

            # check timestamp
            unixtime_pred_sess, prediction_sess = crop_prediction_sequence(
                unixtime_gt_sess, unixtime_pred_sess, prediction_sess)
            np.testing.assert_array_equal(unixtime_pred_sess, unixtime_gt_sess)

            record["ground_truth"] = ground_truth_sess.copy()

        record["unixtime"] = unixtime_pred_sess.copy()
        record["prediction"] = prediction_sess.copy()
        submission[key] = record

    return submission
def crop_prediction_sequence(unixtime_gt: numpy.ndarray, unixtime_pred: numpy.ndarray, prediction: numpy.ndarray)

Crop prediction array to have the same timestamp as reference data.

Args

unixtime_gt : np.ndarray
unixtimes of the ground truth sequence
unixtime_pred : np.ndarray
unixtimes of the prediction sequence
prediction : np.ndarray
-
Expand source code
def crop_prediction_sequence(
    unixtime_gt: np.ndarray,
    unixtime_pred: np.ndarray,
    prediction: np.ndarray,
):
    """Crop prediction array to have the same timestamp as reference data.
    Args:
        unixtime_gt (np.ndarray): unixtimes of the ground truth sequence
        unixtime_pred (np.ndarray): unixtimes of the prediction sequence
        prediction (np.ndarray): -
    """
    ts_head, ts_tail = unixtime_gt[0], unixtime_gt[-1]
    logger.debug(f"ground truth: ts_head={ts_head}, ts_tail={ts_tail}")
    logger.debug(
        f"prediction  : ts_head={int(unixtime_pred[0])}, ts_tail={int(unixtime_pred[-1])}")

    ind = np.where((unixtime_pred >= ts_head) & (unixtime_pred <= ts_tail))[0]
    logger.debug(
        f"ind={ind.shape}, unixtime_pred={unixtime_pred.shape}, prediction={prediction.shape}")

    unixtime_pred = unixtime_pred[ind]
    prediction = prediction[ind]

    unixtime_pred, prediction = ffill_missing_elements(
        unixtime_gt,
        unixtime_pred,
        prediction,
    )

    return unixtime_pred, prediction
def eval_operation_segmentation_wrapper(cfg: omegaconf.dictconfig.DictConfig, outputs: Dict[str, Dict[str, numpy.ndarray]], act_set: ActSet = ActSet(classes=(Label(id=100, name='Picking', version='v3.0.0', is_ignore=False, category=None, event=None), Label(id=200, name='Relocate Item Label', version='v3.2.2', is_ignore=False, category=None, event=None), Label(id=300, name='Assemble Box', version='v3.2.2', is_ignore=False, category=None, event=None), Label(id=400, name='Insert Items', version='v3.2.2', is_ignore=False, category=None, event=None), Label(id=500, name='Close Box', version='v3.2.2', is_ignore=False, category=None, event=None), Label(id=600, name='Attach Box Label', version='v3.2.2', is_ignore=False, category=None, event=None), Label(id=700, name='Scan Label', version='v3.2.2', is_ignore=False, category=None, event=None), Label(id=800, name='Attach Shipping Label', version='v3.2.2', is_ignore=False, category=None, event=None), Label(id=900, name='Put on Back Table', version='v3.2.2', is_ignore=False, category=None, event=None), Label(id=1000, name='Fill out Order', version='v3.2.2', is_ignore=False, category=None, event=None), Label(id=8100, name='Null', version='v3.2.2', is_ignore=True, category=None, event=None))), exclude_ignore_class=True) ‑> pandas.core.frame.DataFrame

Compute evaluation metrics from model outputs (predicted probability).

Args

cfg : DictConfig
config dict.
outputs : Dict[str, Dict[str, np.ndarray]]
dict object that contains t_idx and y. t_idx is a 2d array of target class index with shape=(BATCH_SIZE, WINDOW). y is a 3d array of predction probabilities with shape=(BATCH_SIZE, NUM_CLASSES, WINDOW).
act_set : ActSete, optional
class definition.
exclude_ignore_class : bool
If true, ignore classes are excluded. (default: True)

Returns

pd.DataFrame

Expand source code
def eval_operation_segmentation_wrapper(
    cfg: DictConfig,
    outputs: Dict[str, Dict[str, np.ndarray]],
    act_set: ActSet = ActSet(OPENPACK_OPERATIONS),
    exclude_ignore_class=True,
) -> pd.DataFrame:
    """ Compute evaluation metrics from model outputs (predicted probability).
    Args:
        cfg (DictConfig): config dict.
        outputs (Dict[str, Dict[str, np.ndarray]]): dict object that contains t_idx and y.
            t_idx is a 2d array of target class index with shape=(BATCH_SIZE, WINDOW).
            y is a 3d array of predction probabilities with shape=(BATCH_SIZE, NUM_CLASSES, WINDOW).
        act_set (ActSete, optional): class definition.
        exclude_ignore_class (bool): If true, ignore classes are excluded. (default: True)
    Returns:
        pd.DataFrame
    """
    submission = construct_submission_dict(
        outputs, act_set, include_ground_truth=True, cfg=cfg)
    classes = act_set.to_tuple()
    ignore_class_id = act_set.get_ignore_class_id()
    if isinstance(ignore_class_id, tuple):
        raise NotImplementedError()

    # Evaluate
    df_scores = []
    t_id_concat, y_id_concat = [], []
    for key, d in submission.items():
        t_id = d["ground_truth"]
        y_id = d["prediction"]

        t_id_concat.append(t_id.copy())
        y_id_concat.append(y_id.copy())

        df_tmp = eval_operation_segmentation(
            t_id,
            y_id,
            classes=classes,
            ignore_class_id=ignore_class_id,
            mode=None)
        df_tmp["key"] = key
        df_scores.append(df_tmp.reset_index(drop=False))

    # Overall Score
    df_tmp = eval_operation_segmentation(
        np.concatenate(t_id_concat, axis=0),
        np.concatenate(y_id_concat, axis=0),
        classes=classes,
        ignore_class_id=ignore_class_id,
        mode=None,
    )
    df_tmp["key"] = "all"
    df_scores.append(df_tmp.reset_index(drop=False))

    df_scores = pd.concat(df_scores, axis=0, ignore_index=True)
    return df_scores
def ffill_missing_elements(unixtime_gt: numpy.ndarray, unixtime_pred: numpy.ndarray, prediction: numpy.ndarray)
Expand source code
def ffill_missing_elements(
    unixtime_gt: np.ndarray,
    unixtime_pred: np.ndarray,
    prediction: np.ndarray,
):
    missing_timestep = sorted(
        list(set(unixtime_gt.tolist()) - set(unixtime_pred.tolist())))
    if len(missing_timestep) == 0:
        return unixtime_pred, prediction
    logger.warning(
        f"{len(missing_timestep)} elements are missing from prediction.: {missing_timestep}")

    for ts in missing_timestep:
        ind = np.where(unixtime_gt == ts)[0][0]

        val = prediction[ind - 1] if ind > 0 else -1
        unixtime_pred = np.insert(unixtime_pred, ind, ts)
        prediction = np.insert(prediction, ind, val)

        logger.warning(
            f"fill missing element at ts={ts} (ind={ind}) with {val}.")

        # DEBUG: Remove before merge
        delta = set(unixtime_gt[:ind]) - set(unixtime_pred[:ind])
        assert len(delta) == 0, delta

    assert len(unixtime_pred) == len(unixtime_gt), (
        f"unixtime_pred={unixtime_pred.shape}, unixtime_gt={unixtime_gt.shape}"
    )
    assert len(prediction) == len(unixtime_gt), (
        f"prediction={prediction.shape}, unixtime_gt={unixtime_gt.shape}"
    )
    return unixtime_pred, prediction
def make_submission_zipfile(submission: Dict, logdir: pathlib.Path, metadata: dict = None) ‑> None

Check dict contents and generate zip file for codalab submission.

Args

submission : Dict
submission dict
logdir : Path
path to the output directory
metadata : dict
dict of additional information that is included in submission.json. We recommend to include a data split name.

Returns

None (make JSON & zip files)

Expand source code
def make_submission_zipfile(
        submission: Dict,
        logdir: Path,
        metadata: dict = None) -> None:
    """Check dict contents and generate zip file for codalab submission.

    Args:
        submission (Dict): submission dict
        logdir (Path): path to the output directory
        metadata (dict): dict of additional information that is included in
            ``submission.json``. We recommend to include a data split name.
    Returns:
        None (make JSON & zip files)
    """
    # Check data format and convert into pure objects
    submission_clean = dict()
    for key, d in submission.items():
        assert isinstance(d, dict)

        record = dict()
        for arr_name, arr in d.items():
            if arr_name not in ("prediction", "unixtime"):
                logger.warning(
                    f"unexpected entry[{arr_name}] is found in submission dict.")
            assert isinstance(arr, np.ndarray)
            record[arr_name] = arr.tolist()
        submission_clean[key] = record

    # Add meta data
    if metadata is not None:
        submission_clean["meta"] = metadata

    # Write JSON file
    path_json = Path(logdir, "submission.json")
    if path_json.exists():
        os.remove(path_json)
    with open(path_json, "w") as f:
        json.dump(submission_clean, f)
    logger.info(f"write submission.json to {path_json}")

    # Make zip file
    path_zip = Path(logdir, "submission.zip")
    if path_zip.exists():
        os.remove(path_zip)
    with zipfile.ZipFile(path_zip, "w") as zf:
        zf.write(path_json, arcname="./submission.json")
    logger.info(f"write submission.zip to {path_zip}")
def resample_prediction_1Hz(ts_unix: numpy.ndarray = None, arr: numpy.ndarray = None) ‑> Tuple[numpy.ndarray, numpy.ndarray]

Change the sampling rate into 1 Hz.

Args

ts_unix : np.ndarray
1d array of unixtimestamp. Defaults to None.
arr : np.ndarray
1d array of class IDs. Defaults to None.

Returns

Tuple[np.ndarray, np.ndarray]
arrays of unixtime and resampled class ids.
Expand source code
def resample_prediction_1Hz(
    ts_unix: np.ndarray = None,
    arr: np.ndarray = None,
) -> Tuple[np.ndarray, np.ndarray]:
    """Change the sampling rate into 1 Hz.

    Args:
        ts_unix (np.ndarray): 1d array of unixtimestamp. Defaults to None.
        arr (np.ndarray): 1d array of class IDs. Defaults to None.

    Returns:
        Tuple[np.ndarray, np.ndarray]: arrays of unixtime and resampled class ids.
    """
    assert arr.ndim == 1
    assert ts_unix.ndim == 1
    assert len(arr) == len(
        ts_unix), f"ts_unix={ts_unix.shape}, arr={arr.shape}"

    tmp = ts_unix - (ts_unix % 1000)
    ts_unix_1hz = np.append(tmp, tmp[-1] + 1000)  # FIXME: write in one line

    delta = (ts_unix_1hz[1:] - ts_unix_1hz[:-1])
    assert delta.min() >= 0, (
        "values in array are expected to be monotonically increasing, "
        f"but the minium step is {delta.min()}."
    )

    arr_out, ts_unix_out = [], []
    cur_time = ts_unix_1hz[0]
    for r in range(len(ts_unix_1hz)):
        if cur_time != ts_unix_1hz[r]:
            arr_out.append(arr[r - 1])
            ts_unix_out.append(cur_time)
            cur_time = ts_unix_1hz[r]

    return (
        np.array(ts_unix_out),
        np.array(arr_out),
    )