Module elpis.datasets.preprocessing

Source code
import json
import shutil
from copy import copy
from itertools import chain
from pathlib import Path
from typing import Iterable, List, Tuple

import elpis.utils.audio as audio
from elpis.datasets.clean_text import clean_text
from elpis.datasets.dataset import CleaningOptions, ProcessingBatch
from elpis.datasets.extract_annotations import extract_annotations
from elpis.models.annotation import Annotation

DEFAULT_DIR = Path("/tmp")
TARGET_SAMPLE_RATE = 16_000


def process_batch(
    batch: ProcessingBatch, output_dir: Path = DEFAULT_DIR
) -> Iterable[Path]:
    """Generates training files from the processing batch and puts them in
    the given directory.

    Parameters:
        batch: The processing batch to generate files from.
        output_dir: The directory in which to place the files.

    Returns:
        The paths of the generated files.
    """
    annotations = extract_annotations(
        transcription_file=batch.transcription_file, elan_options=batch.elan_options
    )

    annotations = map(
        lambda annotation: clean_annotation(annotation, batch.cleaning_options),
        annotations,
    )

    # Generate training files from the annotations
    return chain(
        *map(
            lambda annotation: generate_training_files(
                annotation, output_dir=output_dir
            ),
            annotations,
        )
    )


def clean_annotation(
    annotation: Annotation, cleaning_options: CleaningOptions
) -> Annotation:
    """Cleans the text within an annotation.

    Parameters:
        annotation: The annotation to clean.
        cleaning_options: The cleaning options for the dataset.

    Returns:
        A new annotation whose transcript has been cleaned.
    """
    transcript = clean_text(
        text=annotation.transcript,
        words_to_remove=cleaning_options.words_to_remove,
        characters_to_explode=cleaning_options.punctuation_to_explode,
        characters_to_remove=cleaning_options.punctuation_to_remove,
    )
    result = copy(annotation)
    result.transcript = transcript
    return result


def generate_training_files(
    annotation: Annotation, output_dir: Path = DEFAULT_DIR
) -> Tuple[Path, Path]:
    """Generates a transcript and audio file pairing for this annotation.

    If the annotation is timed (has a start and stop time), we return the
    path of a new audio file cut to those times. Otherwise the annotation
    spans the whole recording, so we copy the original audio file into the
    output directory (if it isn't already there) and return that path. In
    both cases the audio is resampled to TARGET_SAMPLE_RATE.

    Parameters:
        annotation: The annotation for a given section of audio within the
            supplied audio_file.
        output_dir: The directory in which to store the generated files.

    Returns:
        A tuple containing a transcription and audio file path for the given
            annotation.
    """
    # Get a unique name prefix based on annotation start time
    audio_file = annotation.audio_file
    name = audio_file.stem
    if annotation.start_ms is not None:
        name = f"{name}_{annotation.start_ms}"

    # Save audio file.
    if annotation.is_timed():
        cut_audio_file = output_dir / f"{name}.wav"
        audio.cut(
            audio_path=audio_file,
            destination=cut_audio_file,
            start_ms=annotation.start_ms,  # type: ignore
            stop_ms=annotation.stop_ms,  # type: ignore
        )
        audio_file = cut_audio_file
    else:
        # Make sure we're putting the audio file in the output dir
        if audio_file.parent != output_dir:
            shutil.copy(str(audio_file), str(output_dir / audio_file.name))
            audio_file = output_dir / audio_file.name

    # Resample audio to standardise for training
    audio.resample(
        audio_path=audio_file,
        destination=audio_file,
        sample_rate=TARGET_SAMPLE_RATE,
    )

    # Save the transcript (with timing info stripped) as a JSON transcription file
    next_annotation = Annotation(
        audio_file=audio_file, transcript=annotation.transcript
    )
    transcription_file = output_dir / f"{name}.json"
    with open(transcription_file, "w") as f:
        json.dump(next_annotation.to_dict(), f)

    return transcription_file, audio_file


def has_finished_processing(
    dataset_files: List[str], processed_files: List[str]
) -> bool:
    """Checks whether the dataset has finished processing.

    Parameters:
        dataset_files: A list of names of the files in the dataset.
        processed_files: A list of names of files uploaded to cloud storage for
            the corresponding dataset.

    Returns:
        True iff the supplied list of processed files would be a valid
        processed dataset for the initial files.
    """

    required_stems = {Path(name).stem for name in dataset_files}
    uploaded_stems = {Path(name).stem for name in processed_files}

    def is_processed(required_stem: str) -> bool:
        starts_with_required_stem = lambda stem: stem.startswith(required_stem)
        return any(map(starts_with_required_stem, uploaded_stems))

    return all(map(is_processed, required_stems))

Functions

def clean_annotation(annotation: Annotation, cleaning_options: CleaningOptions) -> Annotation

Cleans the text within an annotation.

Parameters

annotation: The annotation to clean.
cleaning_options: The cleaning options for the dataset.

Returns

A new annotation whose transcript has been cleaned.
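
Example

A minimal usage sketch. The CleaningOptions constructor arguments shown here are assumptions, based on the fields that clean_annotation reads from it:

from pathlib import Path

from elpis.datasets.dataset import CleaningOptions
from elpis.models.annotation import Annotation

annotation = Annotation(audio_file=Path("recording.wav"), transcript="um, hello there")
options = CleaningOptions(
    words_to_remove=["um"],
    punctuation_to_explode="",
    punctuation_to_remove=",",
)

cleaned = clean_annotation(annotation, options)
# cleaned.transcript has "um" and the comma removed; the original
# annotation is left untouched, since the function copies before mutating.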

def generate_training_files(annotation: Annotation, output_dir: pathlib.Path = PosixPath('/tmp')) -> Tuple[pathlib.Path, pathlib.Path]

Generates a transcript and audio file pairing for this annotation.

If the annotation is timed (has a start and stop time), we return the path of a new audio file cut to those times. Otherwise the annotation spans the whole recording, so we copy the original audio file into the output directory (if it isn't already there) and return that path. In both cases the audio is resampled to 16 kHz.

Parameters

annotation: The annotation for a given section of audio within the supplied audio_file.
output_dir: The directory in which to store the generated files.

Returns

A tuple containing a transcription and audio file path for the given annotation.
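
Example

A minimal sketch for a timed annotation. Passing start_ms and stop_ms to the Annotation constructor is an assumption, based on the fields the function reads:

from pathlib import Path

from elpis.models.annotation import Annotation

annotation = Annotation(
    audio_file=Path("session.wav"),
    transcript="hello world",
    start_ms=1_000,
    stop_ms=2_500,
)

transcription_file, audio_file = generate_training_files(
    annotation, output_dir=Path("/tmp/out")
)
# transcription_file -> /tmp/out/session_1000.json
# audio_file         -> /tmp/out/session_1000.wav (cut, resampled to 16 kHz)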

def has_finished_processing(dataset_files: List[str], processed_files: List[str]) -> bool

Checks whether the dataset has finished processing.

Parameters

dataset_files: A list of names of the files in the dataset.
processed_files: A list of names of files uploaded to cloud storage for the corresponding dataset.

Returns

True iff the supplied list of processed files would be a valid processed dataset for the initial files.
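
Example

Completion is judged by file name stems: every dataset file's stem must prefix the stem of at least one processed file.

dataset_files = ["session.eaf", "session.wav"]
processed_files = ["session_1000.json", "session_1000.wav"]

has_finished_processing(dataset_files, processed_files)  # True
has_finished_processing(["other.eaf"], processed_files)  # False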

def process_batch(batch: ProcessingBatch, output_dir: pathlib.Path = PosixPath('/tmp')) -> Iterable[pathlib.Path]

Generates training files from the processing batch and puts them in the given directory.

Parameters

batch: The processing batch to generate files from.
output_dir: The directory in which to place the files.

Returns

The paths of the generated files.
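
Example

A minimal sketch; the ProcessingBatch constructor arguments are assumptions inferred from the attributes the function reads (transcription_file, cleaning_options and elan_options), and the CleaningOptions arguments follow the fields used by clean_annotation:

from pathlib import Path

from elpis.datasets.dataset import CleaningOptions, ProcessingBatch

batch = ProcessingBatch(
    transcription_file=Path("session.eaf"),
    cleaning_options=CleaningOptions(
        words_to_remove=[], punctuation_to_explode="", punctuation_to_remove=""
    ),
    elan_options=None,
)

for path in process_batch(batch, output_dir=Path("/tmp/out")):
    print(path)  # transcription (.json) and audio (.wav) paths, pairwise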
