// indexer.js

import { Bucket } from "./s3.js";
import { Walker } from "./walker.js";
import lodashPkg from "lodash";
const { orderBy, uniqBy, random } = lodashPkg;
import hasha from "hasha";

/**
 * Class representing an S3 Indexer.
 *
 * Index files are stored at `<prefix>/indices/<type>/<c>.json`, where `<c>` is the
 * lower-cased first character of the item id. Each index file holds an array of
 * `{ prefix, type, id, splay }` entries.
 */
export class Indexer {
    /**
     * Handle content indices in an S3 bucket
     * @constructor
     * @param {Object} params
     * @param {Credentials} params.credentials - the AWS credentials to use for the connection;
     *  must define `bucket`, `accessKeyId`, `secretAccessKey` and `region`
     * @throws {Error} if `credentials` or any of its required properties is missing
     */
    constructor({ credentials } = {}) {
        if (!credentials) throw new Error(`Missing required property: 'credentials'`);
        const requiredProperties = ["bucket", "accessKeyId", "secretAccessKey", "region"];
        for (const property of requiredProperties) {
            if (!credentials[property]) {
                throw new Error(`Missing required property: '${property}'`);
            }
        }

        this.roCrateFile = "ro-crate-metadata.json";
        this.inventoryFile = "nocfl.inventory.json";
        this.identifierFile = "nocfl.identifier.json";

        this.credentials = credentials;
        this.bucket = new Bucket(credentials);
    }

    /**
     * Create index files by walking the bucket and grouping every reported object
     * by prefix, type and first character of its id.
     * @param {Object} [params]
     * @param {string} [params.prefix] - Create indices for this prefix only
     * @param {string} [params.domain] - alias used when `prefix` is not provided
     * @returns {Promise<string[]>} the paths of the index files that were written
     */
    async createIndices({ domain = null, prefix = null } = {}) {
        const walkPrefix = prefix ?? domain;
        const walker = new Walker({ credentials: this.credentials, prefix: walkPrefix });

        // Prototype-less dictionaries: the keys come from bucket content, so names such as
        // `__proto__` or `constructor` must not resolve to inherited properties.
        const indices = Object.create(null);

        walker.on("object", ({ prefix: objectPrefix, type, id, splay }) => {
            const idPrefix = id.slice(0, 1).toLowerCase();
            if (!indices[objectPrefix]) indices[objectPrefix] = Object.create(null);
            const byType = indices[objectPrefix];
            if (!byType[type]) byType[type] = Object.create(null);
            const byIdPrefix = byType[type];
            if (!byIdPrefix[idPrefix]) byIdPrefix[idPrefix] = [];
            byIdPrefix[idPrefix].push({ prefix: objectPrefix, type, id, splay });
        });
        await walker.walk({ domain });

        const indexFiles = [];
        for (const objectPrefix of Object.keys(indices)) {
            for (const type of Object.keys(indices[objectPrefix])) {
                for (const idPrefix of Object.keys(indices[objectPrefix][type])) {
                    const indexFile = `${objectPrefix}/indices/${type}/${idPrefix}.json`;
                    indexFiles.push(indexFile);
                    await this.bucket.put({
                        target: indexFile,
                        json: orderBy(indices[objectPrefix][type][idPrefix], "id"),
                    });
                }
            }
        }
        return indexFiles;
    }

    /**
     * Patch an index file - add new item to it or remove an existing item. This method works by uploading a patch file
     *  and then running a process to patch all the relevant index files. If it detects a lockfile (a patching process is running)
     *  it will sleep for a short time and then try again. Given this, you probably don't want to 'await'
     *  this in your code as it could take a while to run; depending on how many parallel patch operations are run.
     *
     * The arguments are validated before anything is uploaded: a patch without an id can never be
     *  applied and would otherwise stay in the bucket and break every later patch run.
     *
     * @param {Object} params
     * @param {'PUT'|'DELETE'} params.action - the action to perform
     * @param {string} params.prefix - the prefix (domain) the index lives under
     * @param {string} [params.domain] - alias used when `prefix` is not provided
     * @param {string} params.type - the type (class name) of the item being operated on
     * @param {string} [params.className] - alias used when `type` is not provided
     * @param {string} params.id - the id of the item being operated on
     * @param {number} params.splay=1 - the number of characters (from the start of the identifier) when converting the id to a path
     * @throws {Error} if `action` is invalid or `prefix`, `type` or `id` is missing
     */
    async patchIndex({ action, domain = null, prefix = null, type, className, id, splay = 1 }) {
        if (!["PUT", "DELETE"].includes(action)) {
            throw new Error(`'action' must be one of 'PUT' or 'DELETE'`);
        }

        const indexPrefix = prefix ?? domain;
        const indexType = type ?? className;
        if (!indexPrefix) throw new Error(`You must provide 'prefix'`);
        if (!indexType) throw new Error(`You must provide 'type'`);
        if (typeof id !== "string" || !id) throw new Error(`You must provide 'id'`);

        const indexBase = `${indexPrefix}/indices/${indexType}`;
        const patch = {
            action,
            data: { prefix: indexPrefix, type: indexType, id, splay },
        };
        // The file name is derived from the patch content, so identical patches share one file.
        const patchFile = `patch-${hasha(JSON.stringify(patch), { algorithm: "sha256" })}`;

        await this.bucket.put({ target: `${indexBase}/${patchFile}`, json: patch });
        // Random short pause (0-300ms) before starting the patch run.
        await new Promise((resolve) => setTimeout(resolve, random(0, 1, true) * 300));
        await this.__patchIndices({ prefix: indexPrefix, type: indexType });
    }

    /**
     * List indices in a given domain
     * @param {Object} [params]
     * @param {string} params.prefix - provide the domain of the index file
     * @param {string} [params.domain] - alias used when `prefix` is not provided
     * @param {string} [params.type] - restrict the listing to this type (class name)
     * @param {string} [params.className] - alias used when `type` is not provided
     * @returns {Promise<string[]>} the keys found; empty when there are none
     * @throws {Error} if neither `prefix` nor `domain` is provided
     */
    async listIndices({ prefix = null, domain = null, type = null, className = null } = {}) {
        // Resolve the alias first so that passing only `domain` is accepted.
        const root = prefix ?? domain;
        if (!root) throw new Error(`You must provide 'prefix'`);

        const indexType = type ?? className;
        const listPrefix = indexType ? `${root}/indices/${indexType}` : `${root}/indices`;

        const listing = await this.bucket.listObjects({ prefix: listPrefix });
        return listing?.Contents?.map((f) => f.Key) ?? [];
    }

    /**
     * Get an index file
     * @since 1.17.0
     * @param {Object} params
     * @param {string} params.prefix - provide the domain of the index file
     * @param {string} params.type - the class name of the item being operated on
     * @param {string} params.file - the index file name
     * @returns {Promise<*>} the parsed index file, or the string `Index file does not exist`
     *  when the bucket reports that the key is missing
     * @throws {Error} if a parameter is missing or the read fails for any other reason
     */
    async getIndex({ prefix, type, file }) {
        if (!prefix) throw new Error(`You must provide 'prefix'`);
        if (!type) throw new Error(`You must provide 'type'`);
        if (!file) throw new Error(`You must provide 'file'`);

        const indexFile = `${prefix}/indices/${type}/${file}`;
        try {
            return await this.bucket.readJSON({ target: indexFile });
        } catch (error) {
            if (error.message === "The specified key does not exist.") {
                return `Index file does not exist`;
            }
            // Keep the original error reachable for debugging.
            throw new Error(error.message, { cause: error });
        }
    }

    /**
     * Apply the index files patches to the relevant index files
     *
     * If a lock file is present the call waits 1-2 seconds and retries, up to two retries;
     * after that it proceeds regardless of the lock. The lock file is always removed when the
     * run ends, including when it fails. Patch files are removed only after the patched index
     * files have been uploaded.
     * @private
     * @param {Object} params
     * @param {string} params.prefix - provide the domain of the index file
     * @param {string} params.type - the class name of the item being operated on
     * @param {number} [params.count=1] - the attempt number, used internally for retries
     */
    async __patchIndices({ prefix, type, count = 1 }) {
        const base = `${prefix}/indices/${type}`;
        const lockFile = `${base}/.update`;

        const locked = await this.bucket.pathExists({ path: lockFile });
        if (locked && count < 3) {
            // wait a little then try this again; the retry does all the work, so return
            // instead of falling through and patching a second time
            await new Promise((resolve) => setTimeout(resolve, random(1, 2, true) * 1000));
            return this.__patchIndices({ prefix, type, count: count + 1 });
        }

        // upload lock file
        await this.bucket.put({ target: lockFile, json: { date: new Date() } });

        // patch files that may be deleted along with the lock; stays empty on failure
        let appliedPatchFiles = [];
        try {
            // get list of patch files to be applied
            const listing = await this.bucket.listObjects({ prefix: `${base}/patch` });
            const patchFiles = listing?.Contents?.map((f) => f.Key) ?? [];

            if (patchFiles.length) {
                // walk the patchFiles, download the relevant index and patch it
                const indexFiles = {};
                for (const file of patchFiles) {
                    let patch;
                    try {
                        patch = await this.bucket.readJSON({ target: file });
                    } catch (error) {
                        // best effort: an unreadable patch file is skipped
                        continue;
                    }
                    // A patch without a usable id cannot be mapped to an index file; skip it
                    // so that one malformed patch does not abort the whole run.
                    const patchId = patch?.data?.id;
                    if (typeof patchId !== "string" || !patchId) continue;

                    const indexFileName = `${base}/${patchId.slice(0, 1).toLowerCase()}.json`;
                    if (!indexFiles[indexFileName]) {
                        try {
                            indexFiles[indexFileName] = await this.bucket.readJSON({
                                target: indexFileName,
                            });
                        } catch (error) {
                            // NOTE(review): any read error starts a fresh index here, not
                            // only a missing key - confirm this is intended.
                            indexFiles[indexFileName] = [];
                        }
                    }

                    if (patch.action === "PUT") {
                        indexFiles[indexFileName].push(patch.data);
                    } else if (patch.action === "DELETE") {
                        indexFiles[indexFileName] = indexFiles[indexFileName].filter(
                            (i) => i.id !== patchId
                        );
                    }
                    indexFiles[indexFileName] = uniqBy(indexFiles[indexFileName], "id");
                }

                // upload the index files
                for (const file of Object.keys(indexFiles)) {
                    await this.bucket.put({ target: file, json: indexFiles[file] });
                }
                appliedPatchFiles = patchFiles;
            }
        } finally {
            // remove the lockfile and the patchFiles that we just processed
            await this.bucket.delete({ keys: [lockFile, ...appliedPatchFiles] });
        }
    }
}