Module elpis.datasets

Expand source code
from elpis.datasets.dataset import CleaningOptions, Dataset, ProcessingBatch
from elpis.datasets.preprocessing import process_batch
from elpis.datasets.processing import prepare_dataset, create_dataset

__all__ = [
    "CleaningOptions",
    "Dataset",
    "ProcessingBatch",
    "process_batch",
    "create_dataset",
    "prepare_dataset",
]

Sub-modules

elpis.datasets.clean_text
elpis.datasets.dataset
elpis.datasets.extract_annotations
elpis.datasets.preprocessing
elpis.datasets.processing

Functions

def create_dataset(job: Job) ‑> datasets.dataset_dict.DatasetDict | datasets.dataset_dict.IterableDatasetDict
Expand source code
def create_dataset(job: Job) -> DatasetDict | IterableDatasetDict:
    if Path(job.data_args.dataset_name_or_path).is_dir():
        return create_local_dataset(job)

    return create_hf_dataset(job)
def prepare_dataset(job: Job, tokenizer: transformers.models.auto.tokenization_auto.AutoTokenizer, feature_extractor: transformers.models.auto.feature_extraction_auto.AutoFeatureExtractor, dataset: datasets.dataset_dict.DatasetDict | datasets.dataset_dict.IterableDatasetDict) ‑> datasets.dataset_dict.DatasetDict | datasets.dataset_dict.IterableDatasetDict

Runs some preprocessing over the given dataset.

Parameters

dataset: The dataset on which to apply the preprocessing processor: The processor to apply over the dataset

Expand source code
def prepare_dataset(
    job: Job,
    tokenizer: AutoTokenizer,
    feature_extractor: AutoFeatureExtractor,
    dataset: DatasetDict | IterableDatasetDict,
) -> DatasetDict | IterableDatasetDict:
    """Runs some preprocessing over the given dataset.

    Parameters:
        dataset: The dataset on which to apply the preprocessing
        processor: The processor to apply over the dataset
    """
    dataset = clean_dataset(job, dataset)
    dataset = constrain_to_max_samples(job, dataset)

    # Load the audio data and resample if necessary.
    dataset = dataset.cast_column(
        job.data_args.audio_column_name,
        Audio(sampling_rate=feature_extractor.sampling_rate),  # type: ignore
    )

    def _prepare_dataset(batch: Dict) -> Dict[str, List]:
        audio = batch[job.data_args.audio_column_name]
        inputs = feature_extractor(  # type: ignore
            audio["array"], sampling_rate=audio["sampling_rate"]
        )

        batch["input_values"] = inputs.input_values[0]
        batch["input_length"] = len(batch["input_values"])

        # encode targets
        additional_kwargs = {}
        phoneme_language = job.data_args.phoneme_language
        if phoneme_language is not None:
            additional_kwargs["phonemizer_lang"] = phoneme_language

        batch["labels"] = tokenizer(batch[job.data_args.text_column_name], **additional_kwargs).input_ids  # type: ignore
        return batch

    max_input_length = (
        job.data_args.max_duration_in_seconds * feature_extractor.sampling_rate  # type: ignore
    )
    min_input_length = (
        job.data_args.min_duration_in_seconds * feature_extractor.sampling_rate  # type: ignore
    )

    def is_audio_in_length_range(length: int):
        return length >= min_input_length and length <= max_input_length

    with job.training_args.main_process_first(desc="dataset map preprocessing"):
        worker_count = job.data_args.preprocessing_num_workers

        kwargs = {}
        if not job.data_args.stream_dataset:
            kwargs = {
                "num_proc": worker_count,
                "desc": "Dataset Preprocessing",
            }

        dataset = dataset.map(
            _prepare_dataset,
            remove_columns=next(iter(dataset.values())).column_names,
            **kwargs,
        )

        # filter data that is shorter than min_input_length
        dataset = dataset.filter(
            is_audio_in_length_range, input_columns=["input_length"], **kwargs
        )

    logger.info(f"Test encoding labels: {dataset['train'][0]['labels']}")

    return dataset
def process_batch(batch: ProcessingBatch, output_dir: pathlib.Path = PosixPath('/tmp')) ‑> Iterable[pathlib.Path]

Generates training files from the processing batch and puts them in the given directory.

Parameters

batch: The processing batch to generate files from output_dir: The directory in which to stick the files.

Returns

The paths of the generated files.

Expand source code
def process_batch(
    batch: ProcessingBatch, output_dir: Path = DEFAULT_DIR
) -> Iterable[Path]:
    """Generates training files from the processing batch and puts them in
    the given directory.

    Parameters:
        batch: The processing batch to generate files from
        output_dir: The directory in which to stick the files.

    Returns:
        The paths of the generated files.
    """
    annotations = extract_annotations(
        transcription_file=batch.transcription_file, elan_options=batch.elan_options
    )

    annotations = map(
        lambda annotation: clean_annotation(annotation, batch.cleaning_options),
        annotations,
    )

    # Generate training files from the annotations
    return chain(
        *map(
            lambda annotation: generate_training_files(
                annotation, output_dir=output_dir
            ),
            annotations,
        )
    )

Classes

class CleaningOptions (punctuation_to_remove: str = '', punctuation_to_explode: str = '', words_to_remove: List[str] = <factory>)

A class representing cleaning options for a dataset.

Expand source code
@dataclass
class CleaningOptions:
    """A class representing cleaning options for a dataset."""

    punctuation_to_remove: str = ""
    punctuation_to_explode: str = ""
    words_to_remove: List[str] = field(default_factory=list)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> CleaningOptions:
        kwargs = {field.name: data[field.name] for field in fields(CleaningOptions)}
        return cls(**kwargs)

    def to_dict(self) -> Dict[str, Any]:
        return dict(self.__dict__)

Class variables

var punctuation_to_explode : str
var punctuation_to_remove : str
var words_to_remove : List[str]

Static methods

def from_dict(data: Dict[str, Any]) ‑> CleaningOptions
Expand source code
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> CleaningOptions:
    kwargs = {field.name: data[field.name] for field in fields(CleaningOptions)}
    return cls(**kwargs)

Methods

def to_dict(self) ‑> Dict[str, Any]
Expand source code
def to_dict(self) -> Dict[str, Any]:
    return dict(self.__dict__)
class Dataset (name: str, files: List[Path], cleaning_options: CleaningOptions, elan_options: Optional[ElanOptions])

A class representing an unprocessed dataset.

Expand source code
@dataclass
class Dataset:
    """A class representing an unprocessed dataset."""

    name: str
    files: List[Path]
    cleaning_options: CleaningOptions
    elan_options: Optional[ElanOptions]

    def __post_init__(self):
        self.files = sorted(self.files)

    def is_empty(self) -> bool:
        """Returns true iff the dataset contains no files."""
        return len(self.files) == 0

    def has_elan(self) -> bool:
        """Returns true iff any of the files in the dataset is an elan file."""
        return any(map((lambda file_name: file_name.suffix == ".eaf"), self.files))

    def is_valid(self) -> bool:
        """Returns true iff this dataset is valid for processing."""
        return (
            not self.is_empty()
            and len(self.files) % 2 == 0
            and len(self.mismatched_files) == 0
            and len(self.colliding_files) == 0
        )

    @staticmethod
    def is_audio(file: Path) -> bool:
        return file.suffix == ".wav"

    @staticmethod
    def is_transcript(file: Path) -> bool:
        return file.suffix in TRANSCRIPTION_EXTENSIONS

    @staticmethod
    def corresponding_audio_name(transcript_file: Path) -> Path:
        """Gets the corresponding audio file name for a given transcript file."""
        return Path(transcript_file).parent / (transcript_file.stem + ".wav")

    @property
    def transcript_files(self) -> Iterable[Path]:
        """Returns an iterable of all transcription files within the dataset."""
        return filter(Dataset.is_transcript, self.files)

    @cached_property
    def mismatched_files(self) -> Set[Path]:
        """Returns the list of transcript files with no corresponding
        audio and vice versa.

        Corresponding in this case means that for every transcript file with
        name x.some_extension, there is a corresponding file x.wav in the dataset.

        Returns:
            A list of the mismatched file names.
        """
        grouped_by_stems = groupby(self.files, lambda path: path.stem)

        def mismatches(files: Iterable[Path]) -> list[Path]:
            files = list(files)
            has_audio = any(Dataset.is_audio(file) for file in files)
            has_transcript = any(Dataset.is_transcript(file) for file in files)
            return [] if has_transcript == has_audio else files

        groups = (mismatches(g) for _, g in grouped_by_stems)
        result = set(chain.from_iterable(groups))
        return result

    @cached_property
    def colliding_files(self) -> Set[Path]:
        """Returns the list of transcript file names that collide.

        Collide means that two transcript files would be for the same .wav
        file.

        Returns:
            A list of the colliding file names.
        """
        grouped_by_stems = groupby(self.transcript_files, lambda path: path.stem)

        def collisions(files: Iterable[Path]) -> list[Path]:
            files = list(files)
            return files if len(files) >= 2 else []

        collision_groups = (collisions(g) for _, g in grouped_by_stems)
        return set(chain.from_iterable(collision_groups))

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> Dataset:
        name = data["name"]
        files = [Path(file) for file in data["files"]]
        cleaning_options = CleaningOptions.from_dict(data["cleaning_options"])

        elan_options = None
        if "elan_options" in data:
            elan_options = ElanOptions.from_dict(data["elan_options"])

        return cls(
            name=name,
            files=files,
            cleaning_options=cleaning_options,
            elan_options=elan_options,
        )

    @property
    def valid_transcriptions(self):
        is_valid = lambda path: path not in (
            self.mismatched_files | self.colliding_files
        )
        return filter(is_valid, self.transcript_files)

    def to_batches(self) -> Iterable[ProcessingBatch]:
        """Converts a valid dataset to a list of processing jobs, matching
        transcript and audio files.
        """
        return (
            ProcessingBatch(
                transcription_file=transcription_file,
                audio_file=self.corresponding_audio_name(transcription_file),
                cleaning_options=self.cleaning_options,
                elan_options=self.elan_options,
            )
            for transcription_file in self.valid_transcriptions
        )

    def to_dict(self) -> Dict[str, Any]:
        result = {
            "name": self.name,
            "files": [file.name for file in self.files],
            "cleaning_options": self.cleaning_options.to_dict(),
        }

        if self.elan_options is not None:
            result["elan_options"] = self.elan_options.to_dict()

        return result

Class variables

var cleaning_optionsCleaningOptions
var elan_options : Optional[ElanOptions]
var files : List[pathlib.Path]
var name : str

Static methods

def corresponding_audio_name(transcript_file: Path) ‑> pathlib.Path

Gets the corresponding audio file name for a given transcript file.

Expand source code
@staticmethod
def corresponding_audio_name(transcript_file: Path) -> Path:
    """Gets the corresponding audio file name for a given transcript file."""
    return Path(transcript_file).parent / (transcript_file.stem + ".wav")
def from_dict(data: Dict[str, Any]) ‑> Dataset
Expand source code
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> Dataset:
    name = data["name"]
    files = [Path(file) for file in data["files"]]
    cleaning_options = CleaningOptions.from_dict(data["cleaning_options"])

    elan_options = None
    if "elan_options" in data:
        elan_options = ElanOptions.from_dict(data["elan_options"])

    return cls(
        name=name,
        files=files,
        cleaning_options=cleaning_options,
        elan_options=elan_options,
    )
def is_audio(file: Path) ‑> bool
Expand source code
@staticmethod
def is_audio(file: Path) -> bool:
    return file.suffix == ".wav"
def is_transcript(file: Path) ‑> bool
Expand source code
@staticmethod
def is_transcript(file: Path) -> bool:
    return file.suffix in TRANSCRIPTION_EXTENSIONS

Instance variables

var colliding_files

Returns the list of transcript file names that collide.

Collide means that two transcript files would be for the same .wav file.

Returns

A list of the colliding file names.

Expand source code
def __get__(self, instance, owner=None):
    if instance is None:
        return self
    if self.attrname is None:
        raise TypeError(
            "Cannot use cached_property instance without calling __set_name__ on it.")
    try:
        cache = instance.__dict__
    except AttributeError:  # not all objects have __dict__ (e.g. class defines slots)
        msg = (
            f"No '__dict__' attribute on {type(instance).__name__!r} "
            f"instance to cache {self.attrname!r} property."
        )
        raise TypeError(msg) from None
    val = cache.get(self.attrname, _NOT_FOUND)
    if val is _NOT_FOUND:
        with self.lock:
            # check if another thread filled cache while we awaited lock
            val = cache.get(self.attrname, _NOT_FOUND)
            if val is _NOT_FOUND:
                val = self.func(instance)
                try:
                    cache[self.attrname] = val
                except TypeError:
                    msg = (
                        f"The '__dict__' attribute on {type(instance).__name__!r} instance "
                        f"does not support item assignment for caching {self.attrname!r} property."
                    )
                    raise TypeError(msg) from None
    return val
var mismatched_files

Returns the list of transcript files with no corresponding audio and vice versa.

Corresponding in this case means that for every transcript file with name x.some_extension, there is a corresponding file x.wav in the dataset.

Returns

A list of the mismatched file names.

Expand source code
def __get__(self, instance, owner=None):
    if instance is None:
        return self
    if self.attrname is None:
        raise TypeError(
            "Cannot use cached_property instance without calling __set_name__ on it.")
    try:
        cache = instance.__dict__
    except AttributeError:  # not all objects have __dict__ (e.g. class defines slots)
        msg = (
            f"No '__dict__' attribute on {type(instance).__name__!r} "
            f"instance to cache {self.attrname!r} property."
        )
        raise TypeError(msg) from None
    val = cache.get(self.attrname, _NOT_FOUND)
    if val is _NOT_FOUND:
        with self.lock:
            # check if another thread filled cache while we awaited lock
            val = cache.get(self.attrname, _NOT_FOUND)
            if val is _NOT_FOUND:
                val = self.func(instance)
                try:
                    cache[self.attrname] = val
                except TypeError:
                    msg = (
                        f"The '__dict__' attribute on {type(instance).__name__!r} instance "
                        f"does not support item assignment for caching {self.attrname!r} property."
                    )
                    raise TypeError(msg) from None
    return val
var transcript_files : Iterable[pathlib.Path]

Returns an iterable of all transcription files within the dataset.

Expand source code
@property
def transcript_files(self) -> Iterable[Path]:
    """Returns an iterable of all transcription files within the dataset."""
    return filter(Dataset.is_transcript, self.files)
var valid_transcriptions
Expand source code
@property
def valid_transcriptions(self):
    is_valid = lambda path: path not in (
        self.mismatched_files | self.colliding_files
    )
    return filter(is_valid, self.transcript_files)

Methods

def has_elan(self) ‑> bool

Returns true iff any of the files in the dataset is an elan file.

Expand source code
def has_elan(self) -> bool:
    """Returns true iff any of the files in the dataset is an elan file."""
    return any(map((lambda file_name: file_name.suffix == ".eaf"), self.files))
def is_empty(self) ‑> bool

Returns true iff the dataset contains no files.

Expand source code
def is_empty(self) -> bool:
    """Returns true iff the dataset contains no files."""
    return len(self.files) == 0
def is_valid(self) ‑> bool

Returns true iff this dataset is valid for processing.

Expand source code
def is_valid(self) -> bool:
    """Returns true iff this dataset is valid for processing."""
    return (
        not self.is_empty()
        and len(self.files) % 2 == 0
        and len(self.mismatched_files) == 0
        and len(self.colliding_files) == 0
    )
def to_batches(self) ‑> Iterable[ProcessingBatch]

Converts a valid dataset to a list of processing jobs, matching transcript and audio files.

Expand source code
def to_batches(self) -> Iterable[ProcessingBatch]:
    """Converts a valid dataset to a list of processing jobs, matching
    transcript and audio files.
    """
    return (
        ProcessingBatch(
            transcription_file=transcription_file,
            audio_file=self.corresponding_audio_name(transcription_file),
            cleaning_options=self.cleaning_options,
            elan_options=self.elan_options,
        )
        for transcription_file in self.valid_transcriptions
    )
def to_dict(self) ‑> Dict[str, Any]
Expand source code
def to_dict(self) -> Dict[str, Any]:
    result = {
        "name": self.name,
        "files": [file.name for file in self.files],
        "cleaning_options": self.cleaning_options.to_dict(),
    }

    if self.elan_options is not None:
        result["elan_options"] = self.elan_options.to_dict()

    return result
class ProcessingBatch (audio_file: Path, transcription_file: Path, cleaning_options: CleaningOptions, elan_options: Optional[ElanOptions])

A class encapsulating the data needed for an individual processing job

Expand source code
@dataclass
class ProcessingBatch:
    """A class encapsulating the data needed for an individual processing job"""

    audio_file: Path
    transcription_file: Path
    cleaning_options: CleaningOptions
    elan_options: Optional[ElanOptions]

    def to_dict(self) -> Dict[str, Any]:
        result = {}

        result["audio_file"] = str(self.audio_file)
        result["transcription_file"] = str(self.transcription_file)
        result["cleaning_options"] = self.cleaning_options.to_dict()
        if self.elan_options is not None:
            result["elan_options"] = self.elan_options.to_dict()

        return result

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> ProcessingBatch:
        audio_file = Path(data["audio_file"])
        transcription_file = Path(data["transcription_file"])
        cleaning_options = CleaningOptions.from_dict(data["cleaning_options"])
        elan_options = ElanOptions.from_dict(data["elan_options"])
        return cls(
            audio_file=audio_file,
            transcription_file=transcription_file,
            cleaning_options=cleaning_options,
            elan_options=elan_options,
        )

Class variables

var audio_file : pathlib.Path
var cleaning_optionsCleaningOptions
var elan_options : Optional[ElanOptions]
var transcription_file : pathlib.Path

Static methods

def from_dict(data: Dict[str, Any]) ‑> ProcessingBatch
Expand source code
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> ProcessingBatch:
    audio_file = Path(data["audio_file"])
    transcription_file = Path(data["transcription_file"])
    cleaning_options = CleaningOptions.from_dict(data["cleaning_options"])
    elan_options = ElanOptions.from_dict(data["elan_options"])
    return cls(
        audio_file=audio_file,
        transcription_file=transcription_file,
        cleaning_options=cleaning_options,
        elan_options=elan_options,
    )

Methods

def to_dict(self) ‑> Dict[str, Any]
Expand source code
def to_dict(self) -> Dict[str, Any]:
    result = {}

    result["audio_file"] = str(self.audio_file)
    result["transcription_file"] = str(self.transcription_file)
    result["cleaning_options"] = self.cleaning_options.to_dict()
    if self.elan_options is not None:
        result["elan_options"] = self.elan_options.to_dict()

    return result