Module openpack_torch.data
Expand source code
from . import datasets
from .datamodule import OpenPackBaseDataModule

__all__ = [
    "datasets",
    "OpenPackBaseDataModule",
]
Sub-modules
openpack_torch.data.datamodule
-
Utilities for PyTorch Lightning DataModule …
openpack_torch.data.datasets
-
Dataset Class for OpenPack dataset.
openpack_torch.data.preprocessing
openpack_torch.data.utils
Classes
class OpenPackBaseDataModule (cfg: omegaconf.dictconfig.DictConfig)
-
Base class of PyTorch Lightning DataModule. A datamodule is a shareable, reusable class that encapsulates all the steps needed to process data.
Attributes
dataset_class
:torch.utils.data.Dataset
- dataset class. This variable is called to create dataset instances.
cfg
:DictConfig
- config object. All parameters used to initialize the dataset class should be included in this object.
batch_size
:int
- batch size.
debug
:bool
- If True, enable debug mode.
Attributes
prepare_data_per_node
- If True, each LOCAL_RANK=0 will call prepare_data(). Otherwise only NODE_RANK=0, LOCAL_RANK=0 will prepare data.
allow_zero_length_dataloader_with_multiple_devices
- If True, a dataloader with zero length within local rank is allowed. Default value is False.
Expand source code
class OpenPackBaseDataModule(pl.LightningDataModule):
    """Base class of PyTorch Lightning DataModule. A datamodule is a shareable,
    reusable class that encapsulates all the steps needed to process data.

    Attributes:
        dataset_class (torch.utils.data.Dataset): dataset class. This variable is
            called to create dataset instances.
        cfg (DictConfig): config object. All parameters used to initialize the
            dataset class should be included in this object.
        batch_size (int): batch size.
        debug (bool): If True, enable debug mode.
    """
    dataset_class: torch.utils.data.Dataset

    def __init__(self, cfg: DictConfig):
        super().__init__()
        self.cfg = cfg
        self.debug = cfg.debug

        if cfg.debug:
            self.batch_size = cfg.train.debug.batch_size
        else:
            self.batch_size = cfg.train.batch_size

    def get_kwargs_for_datasets(self, stage: Optional[str] = None) -> Dict:
        """Build a kwargs dict to initialize the dataset class. This method is
        called in ``setup()``.

        Args:
            stage (str, optional): dataset type. {train, validate, test, submission}.

        Example::

            def get_kwargs_for_datasets(self) -> Dict:
                kwargs = {
                    "window": self.cfg.train.window,
                    "debug": self.cfg.debug,
                }
                return kwargs

        Returns:
            Dict: keyword arguments passed to the dataset constructor.
        """
        kwargs = {
            "window": self.cfg.train.window,
            "debug": self.cfg.debug,
        }
        return kwargs

    def _init_datasets(
        self,
        user_session: Tuple[int, int],
        kwargs: Dict,
    ) -> Dict[str, torch.utils.data.Dataset]:
        """Returns a dict of initialized dataset objects, keyed by "{user}-{session}".

        Args:
            user_session (Tuple[int, int]): (user, session) pairs.
            kwargs (Dict): keyword arguments passed to the dataset constructor.

        Returns:
            Dict[str, torch.utils.data.Dataset]: dataset objects
        """
        datasets = dict()
        for user, session in user_session:
            key = f"{user}-{session}"
            datasets[key] = self.dataset_class(
                copy.deepcopy(self.cfg), [(user, session)], **kwargs)
        return datasets

    def setup(self, stage: Optional[str] = None) -> None:
        if hasattr(self.cfg.dataset.split, "spec"):
            split = self.cfg.dataset.split.spec
        else:
            split = self.cfg.dataset.split

        if stage in (None, "fit"):
            kwargs = self.get_kwargs_for_datasets(stage="train")
            self.op_train = self.dataset_class(self.cfg, split.train, **kwargs)
            if self.cfg.train.random_crop:
                self.op_train.random_crop = True
                log.debug(f"enable random_crop in training dataset: {self.op_train}")
        else:
            self.op_train = None

        if stage in (None, "fit", "validate"):
            kwargs = self.get_kwargs_for_datasets(stage="validate")
            self.op_val = self._init_datasets(split.val, kwargs)
        else:
            self.op_val = None

        if stage in (None, "test"):
            kwargs = self.get_kwargs_for_datasets(stage="test")
            self.op_test = self._init_datasets(split.test, kwargs)
        else:
            self.op_test = None

        if stage in (None, "submission"):
            kwargs = self.get_kwargs_for_datasets(stage="submission")
            kwargs.update({"submission": True})
            self.op_submission = self._init_datasets(split.submission, kwargs)
        elif stage == "test-on-submission":
            kwargs = self.get_kwargs_for_datasets(stage="submission")
            self.op_submission = self._init_datasets(split.submission, kwargs)
        else:
            self.op_submission = None

        log.info(f"dataset[train]: {self.op_train}")
        log.info(f"dataset[val]: {self.op_val}")
        log.info(f"dataset[test]: {self.op_test}")
        log.info(f"dataset[submission]: {self.op_submission}")

    def train_dataloader(self) -> DataLoader:
        return DataLoader(
            self.op_train,
            batch_size=self.batch_size,
            shuffle=True,
            num_workers=self.cfg.train.num_workers,
        )

    def val_dataloader(self) -> List[DataLoader]:
        dataloaders = []
        for key, dataset in self.op_val.items():
            dataloaders.append(
                DataLoader(
                    dataset,
                    batch_size=self.batch_size,
                    shuffle=False,
                    num_workers=self.cfg.train.num_workers,
                )
            )
        return dataloaders

    def test_dataloader(self) -> List[DataLoader]:
        dataloaders = []
        for key, dataset in self.op_test.items():
            dataloaders.append(
                DataLoader(
                    dataset,
                    batch_size=self.batch_size,
                    shuffle=False,
                    num_workers=self.cfg.train.num_workers,
                )
            )
        return dataloaders

    def submission_dataloader(self) -> List[DataLoader]:
        dataloaders = []
        for key, dataset in self.op_submission.items():
            dataloaders.append(
                DataLoader(
                    dataset,
                    batch_size=self.batch_size,
                    shuffle=False,
                    num_workers=self.cfg.train.num_workers,
                )
            )
        return dataloaders
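For orientation, below is a minimal sketch of how this base class is meant to be used: a subclass assigns dataset_class and is constructed with an OmegaConf config that provides the keys read in __init__ (cfg.debug, cfg.train.batch_size, cfg.train.debug.batch_size). The dataset class MyOpenPackDataset and the config values are illustrative assumptions, not part of openpack_torch; the real dataset classes live in openpack_torch.data.datasets (see the Sub-modules list above).

# Hypothetical subclass; names and config values are illustrative only.
from omegaconf import OmegaConf
from torch.utils.data import Dataset

from openpack_torch.data import OpenPackBaseDataModule


class MyOpenPackDataset(Dataset):
    """Placeholder standing in for a real class from openpack_torch.data.datasets."""

    def __init__(self, cfg, user_session, window=None, debug=False, submission=False):
        self.samples = list(range(8))  # dummy samples

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        return self.samples[idx]


class MyDataModule(OpenPackBaseDataModule):
    dataset_class = MyOpenPackDataset  # setup() uses this to build dataset instances


cfg = OmegaConf.create({
    "debug": False,
    "train": {"batch_size": 32, "debug": {"batch_size": 4}, "window": 900,
              "num_workers": 0, "random_crop": False},
    "dataset": {"split": {"train": [[0, 100]], "val": [[0, 101]],
                          "test": [[0, 102]], "submission": [[0, 103]]}},
})

dm = MyDataModule(cfg)
print(dm.batch_size)  # -> 32 (cfg.train.batch_size, because cfg.debug is False)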
Ancestors
- pytorch_lightning.core.datamodule.LightningDataModule
- pytorch_lightning.core.hooks.DataHooks
- pytorch_lightning.core.mixins.hparams_mixin.HyperparametersMixin
Subclasses
Class variables
var dataset_class : torch.utils.data.dataset.Dataset
Methods
def get_kwargs_for_datasets(self, stage: Optional[str] = None) ‑> Dict
-
Build a kwargs dict to initialize the dataset class. This method is called in setup().
Args
stage
:str, optional
- Dataset type. One of {train, validate, test, submission}.
Example
::
def get_kwargs_for_datasets(self) -> Dict:
    kwargs = {
        "window": self.cfg.train.window,
        "debug": self.cfg.debug,
    }
    return kwargs
Returns
Dict
- Keyword arguments passed to the dataset constructor.
Expand source code
def get_kwargs_for_datasets(self, stage: Optional[str] = None) -> Dict:
    """Build a kwargs dict to initialize the dataset class. This method is
    called in ``setup()``.

    Args:
        stage (str, optional): dataset type. {train, validate, test, submission}.

    Example::

        def get_kwargs_for_datasets(self) -> Dict:
            kwargs = {
                "window": self.cfg.train.window,
                "debug": self.cfg.debug,
            }
            return kwargs

    Returns:
        Dict: keyword arguments passed to the dataset constructor.
    """
    kwargs = {
        "window": self.cfg.train.window,
        "debug": self.cfg.debug,
    }
    return kwargs
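If a subclass needs to forward extra arguments to its dataset class, one option is to override this method and branch on stage. The sketch below is hedged: "imu_nodes" and cfg.dataset.stream.devices are hypothetical names used only to illustrate a stage-dependent kwarg.

# Sketch: stage-dependent kwargs; "imu_nodes" / "cfg.dataset.stream.devices"
# are hypothetical and not part of openpack_torch.
from typing import Dict, Optional

from openpack_torch.data import OpenPackBaseDataModule


class MyDataModule(OpenPackBaseDataModule):
    # dataset_class would be assigned here, as in the earlier sketch.

    def get_kwargs_for_datasets(self, stage: Optional[str] = None) -> Dict:
        kwargs = {
            "window": self.cfg.train.window,
            "debug": self.cfg.debug,
        }
        if stage == "train":
            # e.g. pass which IMU devices to load, only for the training split
            kwargs["imu_nodes"] = self.cfg.dataset.stream.devices
        return kwargs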
def setup(self, stage: Optional[str] = None) ‑> None
-
Called at the beginning of fit (train + validate), validate, test, or predict. This is a good hook when you need to build models dynamically or adjust something about them. This hook is called on every process when using DDP.
Args
stage
- either 'fit', 'validate', 'test', or 'predict'
Example::
class LitModel(...):
    def __init__(self):
        self.l1 = None

    def prepare_data(self):
        download_data()
        tokenize()

        # don't do this
        self.something = else

    def setup(self, stage):
        data = load_data(...)
        self.l1 = nn.Linear(28, data.num_classes)
Expand source code
def setup(self, stage: Optional[str] = None) -> None:
    if hasattr(self.cfg.dataset.split, "spec"):
        split = self.cfg.dataset.split.spec
    else:
        split = self.cfg.dataset.split

    if stage in (None, "fit"):
        kwargs = self.get_kwargs_for_datasets(stage="train")
        self.op_train = self.dataset_class(self.cfg, split.train, **kwargs)
        if self.cfg.train.random_crop:
            self.op_train.random_crop = True
            log.debug(f"enable random_crop in training dataset: {self.op_train}")
    else:
        self.op_train = None

    if stage in (None, "fit", "validate"):
        kwargs = self.get_kwargs_for_datasets(stage="validate")
        self.op_val = self._init_datasets(split.val, kwargs)
    else:
        self.op_val = None

    if stage in (None, "test"):
        kwargs = self.get_kwargs_for_datasets(stage="test")
        self.op_test = self._init_datasets(split.test, kwargs)
    else:
        self.op_test = None

    if stage in (None, "submission"):
        kwargs = self.get_kwargs_for_datasets(stage="submission")
        kwargs.update({"submission": True})
        self.op_submission = self._init_datasets(split.submission, kwargs)
    elif stage == "test-on-submission":
        kwargs = self.get_kwargs_for_datasets(stage="submission")
        self.op_submission = self._init_datasets(split.submission, kwargs)
    else:
        self.op_submission = None

    log.info(f"dataset[train]: {self.op_train}")
    log.info(f"dataset[val]: {self.op_val}")
    log.info(f"dataset[test]: {self.op_test}")
    log.info(f"dataset[submission]: {self.op_submission}")
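Continuing the first MyDataModule/cfg sketch above (hypothetical names), the stage string decides which of op_train, op_val, op_test, and op_submission get populated. Note that "submission" and "test-on-submission" are OpenPack-specific stage strings, not standard Lightning stages, so they are typically passed to setup() manually rather than by the Trainer.

dm = MyDataModule(cfg)  # hypothetical subclass and config from the earlier sketch

dm.setup("fit")         # builds dm.op_train (one dataset) and dm.op_val (dict)
dm.setup("test")        # builds dm.op_test: {"<user>-<session>": Dataset, ...}
dm.setup("submission")  # builds dm.op_submission, with submission=True in kwargs

print(list(dm.op_submission.keys()))  # one key per (user, session) pair in split.submission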
def submission_dataloader(self) ‑> List[torch.utils.data.dataloader.DataLoader]
-
Expand source code
def submission_dataloader(self) -> List[DataLoader]:
    dataloaders = []
    for key, dataset in self.op_submission.items():
        dataloaders.append(
            DataLoader(
                dataset,
                batch_size=self.batch_size,
                shuffle=False,
                num_workers=self.cfg.train.num_workers,
            )
        )
    return dataloaders
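Because submission_dataloader() is not a standard Lightning hook, it is usually called explicitly. A minimal, hypothetical inference loop, reusing the dm object from the sketches above, might look like the following; the model call is a placeholder.

import torch

dm.setup("submission")
loaders = dm.submission_dataloader()  # one DataLoader per (user, session) pair

for key, loader in zip(dm.op_submission.keys(), loaders):
    outputs = []
    with torch.no_grad():
        for batch in loader:
            outputs.append(batch)  # replace with model(batch) for real predictions
    print(key, len(outputs))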
def test_dataloader(self) ‑> List[torch.utils.data.dataloader.DataLoader]
-
An iterable or collection of iterables specifying test samples.
For more information about multiple dataloaders, see the "Multiple Dataloaders" section of the PyTorch Lightning documentation.
For data processing use the following pattern:
- download in prepare_data()
- process and split in setup()
However, the above are only necessary for distributed processing.
Warning: do not assign state in prepare_data.
See also:
- Trainer.test()
- prepare_data()
- setup()
Note
Lightning tries to add the correct sampler for distributed and arbitrary hardware. There is no need to set it yourself.
Note
If you don't need a test dataset and a test_step(), you don't need to implement this method.
Expand source code
def test_dataloader(self) -> List[DataLoader]:
    dataloaders = []
    for key, dataset in self.op_test.items():
        dataloaders.append(
            DataLoader(
                dataset,
                batch_size=self.batch_size,
                shuffle=False,
                num_workers=self.cfg.train.num_workers,
            )
        )
    return dataloaders
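Because a list of dataloaders is returned, Lightning feeds each (user, session) loader to the model's test_step with its own dataloader_idx. A minimal, hypothetical Trainer call, where model is assumed to be a LightningModule defined elsewhere and dm is the datamodule from the sketches above:

import pytorch_lightning as pl

trainer = pl.Trainer(accelerator="cpu", devices=1, logger=False)
trainer.test(model, datamodule=dm)  # the Trainer calls dm.setup("test") itself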
def train_dataloader(self) ‑> torch.utils.data.dataloader.DataLoader
-
An iterable or collection of iterables specifying training samples.
For more information about multiple dataloaders, see the "Multiple Dataloaders" section of the PyTorch Lightning documentation.
The dataloader you return will not be reloaded unless you set Trainer.reload_dataloaders_every_n_epochs to a positive integer.
For data processing use the following pattern:
- download in prepare_data()
- process and split in setup()
However, the above are only necessary for distributed processing.
Warning: do not assign state in prepare_data.
See also:
- Trainer.fit()
- prepare_data()
- setup()
Note
Lightning tries to add the correct sampler for distributed and arbitrary hardware. There is no need to set it yourself.
Expand source code
def train_dataloader(self) -> DataLoader:
    return DataLoader(
        self.op_train,
        batch_size=self.batch_size,
        shuffle=True,
        num_workers=self.cfg.train.num_workers,
    )
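For a quick sanity check outside the Trainer (again using the hypothetical dm from the earlier sketches), the training loader can be built and inspected directly:

dm.setup("fit")
train_loader = dm.train_dataloader()  # single DataLoader over op_train, shuffle=True
batch = next(iter(train_loader))      # first (shuffled) mini-batch
print(type(batch), dm.batch_size)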
def val_dataloader(self) ‑> List[torch.utils.data.dataloader.DataLoader]
-
An iterable or collection of iterables specifying validation samples.
For more information about multiple dataloaders, see the "Multiple Dataloaders" section of the PyTorch Lightning documentation.
The dataloader you return will not be reloaded unless you set Trainer.reload_dataloaders_every_n_epochs to a positive integer.
It's recommended that all data downloads and preparation happen in prepare_data().
See also:
- Trainer.fit()
- Trainer.validate()
- prepare_data()
- setup()
Note
Lightning tries to add the correct sampler for distributed and arbitrary hardware. There is no need to set it yourself.
Note
If you don't need a validation dataset and a validation_step(), you don't need to implement this method.
Expand source code
def val_dataloader(self) -> List[DataLoader]:
    dataloaders = []
    for key, dataset in self.op_val.items():
        dataloaders.append(
            DataLoader(
                dataset,
                batch_size=self.batch_size,
                shuffle=False,
                num_workers=self.cfg.train.num_workers,
            )
        )
    return dataloaders