Skip to content

script

Module to define a SQL script

Script

Bases: BaseModel

SQL Script class, basically a container for Statements

Source code in xerini/script.py
class Script(BaseModel):
    """SQL script: a container for the statements parsed from raw SQL text.

    Besides the alternate constructors (``from_string``, ``from_file``,
    ``from_directory``) the class offers a dictionary-like view keyed by
    affected table (``keys`` / ``__getitem__``), a table dependency graph
    (``digraph``) and writers for formatted SQL, Graphviz output and a staged
    orchestration layout.
    """

    # Unparsed SQL text; ``None`` (the default) is treated as an empty script.
    raw_code: Optional[str] = None

    @classmethod
    @validate_call
    def from_string(cls, text: str) -> Self:
        """Construct the Script object directly from a string of SQL."""
        return cls(raw_code=text)

    @classmethod
    @validate_call
    def from_file(cls, pth: Path) -> Self:
        """Construct the Script object from the SQL in a file.

        Raises:
            FileNotFoundError: if ``pth`` does not have a ``.sql`` suffix
                (compared case-insensitively).
        """
        if pth.suffix.lower() != ".sql":
            raise FileNotFoundError("I am only allowed to read *.sql files!")
        with pth.open("r", encoding="utf-8") as fin:
            text = fin.read()
        return cls(raw_code=text)

    @classmethod
    @validate_call
    def from_directory(cls, directory: Path) -> Self:
        """Construct the Script object from a directory of SQL files.

        Only regular files directly inside ``directory`` whose suffix is
        ``.sql`` (case-insensitive) are read; their contents are joined with
        newlines.

        Raises:
            NotADirectoryError: if ``directory`` is not a directory.
        """
        if not directory.is_dir():
            # Report the offending argument, not the ``dir`` builtin.
            raise NotADirectoryError(f"{directory=} is not a directory!")
        texts = []
        # ``iterdir`` yields entries in arbitrary order; sort so the resulting
        # script is the same on every run and platform.
        for pth in sorted(directory.iterdir()):
            if pth.is_file() and pth.suffix.lower() == ".sql":
                with pth.open("r", encoding="utf-8") as fin:
                    texts.append(fin.read())
        return cls(raw_code="\n".join(texts))

    @property
    def formatted_text(self) -> str:
        """The formatted text of every statement, separated by blank lines."""
        return "\n\n".join(stmt.formatted_text for stmt in self.statements)

    def keys(self) -> set[str | None]:
        """Return the affected tables of all statements that write a table.

        Statements whose write type is ``TableWriteType.NO_WRITING`` are
        ignored. These names are the valid keys for ``__getitem__``.
        """
        return {
            stmt.affected_table
            for stmt in self.statements
            if stmt.write_type != TableWriteType.NO_WRITING
        }

    def __getitem__(self, item: str) -> list[Statement]:
        """Return the writing statements whose affected table is ``item``.

        Raises:
            KeyError: if no writing statement affects ``item``.
        """
        matches = [
            stmt
            for stmt in self.statements
            if stmt.write_type != TableWriteType.NO_WRITING
            and stmt.affected_table == item
        ]
        # An empty result is exactly the case where ``item`` is not in keys().
        if not matches:
            raise KeyError(f"{item=} is not an affected table of the script!")
        return matches

    @cached_property
    def statements(self) -> list[Statement]:
        """The list of statements that make up the script (computed once)."""
        # ``raw_code`` defaults to None; there is nothing to split in that case.
        if not self.raw_code:
            return []
        return [Statement(text=sub) for sub in sqlparse.split(self.raw_code)]

    @property
    def statement_types(self) -> list[StatementType]:
        """The statement type of each statement, in script order."""
        return [_s.statement_type for _s in self.statements]

    @cached_property
    def statement_query_types(self) -> list[QueryType | None]:
        """The query type of each statement, in script order (computed once)."""
        return [_s.query_type for _s in self.statements]

    @property
    def statement_write_types(self) -> list[TableWriteType]:
        """The write type of each statement, in script order."""
        return [_s.write_type for _s in self.statements]

    @property
    def is_valid(self) -> bool:
        """Whether every statement has a truthy query type.

        An empty script is reported as valid, because ``all`` of an empty
        sequence is ``True``.
        """
        return all(self.statement_query_types)

    @validate_call
    def write(self, output: Path) -> None:
        """Write the formatted statements to ``output``, one after another.

        Raises:
            ValueError: if the script contains no statements.
        """
        if not self.statements:
            raise ValueError("Nothing to write, as the script is empty!")
        # Explicit encoding, matching how the script files are read.
        with output.open("w", encoding="utf-8") as fout:
            for stmt in self.statements:
                fout.write(stmt.formatted_text)
                fout.write("\n")

    @cached_property
    def digraph(self) -> nx.DiGraph:
        """Directed graph of the table dependency structure of the SQL code.

        For every writing statement with an affected table, an edge is added
        from each source table to the affected table; the edge carries the
        statement's formatted text as its ``tooltip`` attribute. All nodes
        are styled as blue boxes.
        """
        dsg = nx.DiGraph()
        for stmt in self.statements:
            if stmt.write_type != TableWriteType.NO_WRITING and stmt.affected_table:
                for source in stmt.source_tables:
                    dsg.add_edge(
                        source, stmt.affected_table, tooltip=stmt.formatted_text
                    )
        for _n in dsg.nodes():
            dsg.nodes[_n]["shape"] = "box"
            dsg.nodes[_n]["color"] = "blue"
        return dsg

    @validate_call
    def write_dot(self, file: Path, name: str = "dotted_script") -> Path:
        """Write the dot file for the graph representation and return its path."""
        dsg = self.digraph
        # NOTE: this replaces the graph-level attributes on the cached digraph.
        dsg.graph = {"name": name, "splines": "ortho", "rankdir": "LR"}
        _adg = nx.nx_agraph.to_agraph(dsg)
        _adg.write(file)
        return file

    @staticmethod
    @validate_call
    def write_svg(dot_file: Path) -> Path:
        """Render ``dot_file`` to SVG with the ``dot`` executable.

        Returns:
            The path ``<dot_file>.svg`` next to the input file.

        Raises:
            subprocess.CalledProcessError: if ``dot`` exits with a non-zero
                status, so a failed render is not reported as a valid path.
        """
        subprocess.run(["dot", "-Tsvg", dot_file, "-O"], check=True)
        svg_file: Path = dot_file.parent / (dot_file.name + ".svg")
        return svg_file

    def stage_decomposition(self) -> list[set[str]]:
        """Use parallel decomposition of the graph representation
        to produce a parallelized orchestration of the table builds.

        Returns:
            One set of table names per stage, restricted to tables that the
            script writes (see ``keys``). The first stage produced by
            ``parallel_decomposition`` is skipped.

        Raises:
            ValueError: if the script contains no statements.
        """
        if not self.statements:
            raise ValueError("I can't decompose an empty script!")
        # Build the key set once instead of once per table per stage.
        written_tables = self.keys()
        pcg = list(parallel_decomposition(self.digraph))
        return [
            {tbl for tbl in stg if tbl in written_tables}
            for stg in pcg[1:]
        ]

    @validate_call
    def write_orchestration(self, staged_directory: Path) -> None:
        """Write the script to disk as one sub-directory per stage.

        Each stage of ``stage_decomposition`` becomes ``stage_NN`` (1-based,
        zero padded to two digits) containing one ``<table>.sql`` file per
        table of that stage.

        Raises:
            IsADirectoryError: if ``staged_directory`` already exists as a
                directory.
            FileExistsError: if ``staged_directory`` already exists as a file.
            ValueError: if the script contains no statements.
        """
        if not staged_directory.is_absolute():
            staged_directory = staged_directory.expanduser().resolve()

        if staged_directory.exists() and staged_directory.is_dir():
            raise IsADirectoryError(
                f"{staged_directory=} is an existing directory, I should not overwrite it!"
            )
        if staged_directory.exists() and staged_directory.is_file():
            raise FileExistsError(
                f"{staged_directory=} is an existing file, I should definitely not overwrite it!"
            )
        # Decompose before touching the file system: an empty script raises
        # here, and must not leave behind an empty directory that would make
        # the next attempt fail with IsADirectoryError.
        decomposed = self.stage_decomposition()
        staged_directory.mkdir(parents=True, exist_ok=True)
        for idx, stg in enumerate(decomposed, start=1):
            stage = staged_directory / f"stage_{idx:02d}"
            stage.mkdir()
            for tbl in stg:
                pth = stage / f"{tbl}.sql"
                with pth.open("w", encoding="utf-8") as fout:
                    for stmt in self[tbl]:
                        fout.write(str(stmt))
                        fout.write("\n")

digraph cached property

Returns a directed graph with the table dependency structure of the sql code

formatted_text property

The formatted text of every statement in the script, joined with blank lines

is_valid property

Whether every statement in the script has a recognized query type

statement_query_types cached property

The statement query types

statement_types property

The statement types

statement_write_types property

The statement write types

statements cached property

The list of statements that make up the script

__getitem__(item)

Dictionary form to retrieve statements pertinent to an affected table

Source code in xerini/script.py
def __getitem__(self, item: str) -> list[Statement]:
    """Return the writing statements whose affected table is ``item``.

    Statements whose write type is ``TableWriteType.NO_WRITING`` are never
    returned.

    Raises:
        KeyError: if no writing statement affects ``item``.
    """
    # Logical ``and`` (not bitwise ``&``) so the table comparison is only
    # evaluated for statements that actually write.
    matches = [
        stmt
        for stmt in self.statements
        if stmt.write_type != TableWriteType.NO_WRITING
        and stmt.affected_table == item
    ]
    # An empty result is exactly the case where ``item`` is not a key.
    if not matches:
        raise KeyError(f"{item=} is not an affected table of the script!")
    return matches

from_directory(directory) classmethod

Constructs the Script object from a directory of sql files

Source code in xerini/script.py
@classmethod
@validate_call
def from_directory(cls, directory: Path) -> Self:
    """Construct the Script object from a directory of SQL files.

    Only regular files directly inside ``directory`` whose suffix is ``.sql``
    (case-insensitive) are read; their contents are joined with newlines.

    Raises:
        NotADirectoryError: if ``directory`` is not a directory.
    """
    if not directory.is_dir():
        # Report the offending argument, not the ``dir`` builtin.
        raise NotADirectoryError(f"{directory=} is not a directory!")
    texts = []
    # ``iterdir`` yields entries in arbitrary order; sort so the resulting
    # script is the same on every run and platform.
    for pth in sorted(directory.iterdir()):
        if pth.is_file() and pth.suffix.lower() == ".sql":
            with pth.open("r", encoding="utf-8") as fin:
                texts.append(fin.read())
    return cls(raw_code="\n".join(texts))

from_file(pth) classmethod

Constructs the Script object from the sql in a file

Source code in xerini/script.py
@classmethod
@validate_call
def from_file(cls, pth: Path) -> Self:
    """Build a Script from the SQL text stored in a single file.

    Raises:
        FileNotFoundError: if ``pth`` does not have a ``.sql`` suffix
            (compared case-insensitively).
    """
    has_sql_suffix = pth.suffix.lower() == ".sql"
    if not has_sql_suffix:
        raise FileNotFoundError("I am only allowed to read *.sql files!")
    return cls(raw_code=pth.read_text(encoding="utf-8"))

from_string(text) classmethod

Constructs the Script object directly from a string of SQL

Source code in xerini/script.py
@classmethod
@validate_call
def from_string(cls, text: str) -> Self:
    """Build a Script whose raw code is the given SQL text."""
    script = cls(raw_code=text)
    return script

keys()

The keys to the dictionary are the names of the affected table of the code

Source code in xerini/script.py
def keys(self) -> set[str | None]:
    """Collect the affected tables of all statements that write a table.

    Statements whose write type is ``TableWriteType.NO_WRITING`` contribute
    nothing to the result.
    """
    written_tables = set()
    for statement in self.statements:
        if statement.write_type != TableWriteType.NO_WRITING:
            written_tables.add(statement.affected_table)
    return written_tables

stage_decomposition()

Use parallel decomposition of the graph representation
to produce a parallelized orchestration of the table builds

Source code in xerini/script.py
def stage_decomposition(self) -> list[set[str]]:
    """Use parallel decomposition of the graph representation
    to produce a parallelized orchestration of the table builds.

    Returns:
        One set of table names per stage, restricted to tables that the
        script writes (the result of ``self.keys()``). The first stage
        produced by ``parallel_decomposition`` is skipped.

    Raises:
        ValueError: if the script contains no statements.
    """
    if not self.statements:
        raise ValueError("I can't decompose an empty script!")
    # Build the key set once instead of once per table per stage.
    written_tables = self.keys()
    pcg = list(parallel_decomposition(self.digraph))
    return [
        {tbl for tbl in stg if tbl in written_tables}
        for stg in pcg[1:]
    ]

write(output)

Formatted output to the file at the path

Source code in xerini/script.py
@validate_call
def write(self, output: Path) -> None:
    """Write the formatted statements to ``output``, one after another.

    Each statement's formatted text is followed by a newline.

    Raises:
        ValueError: if the script contains no statements.
    """
    if not self.statements:
        raise ValueError("Nothing to write, as the script is empty!")
    # Explicit encoding so the output does not depend on the platform's
    # default locale; the script files are read as UTF-8 as well.
    with output.open("w", encoding="utf-8") as fout:
        for stmt in self.statements:
            fout.write(stmt.formatted_text)
            fout.write("\n")

write_dot(file, name='dotted_script')

Writes the dotfile for the graph representation

Source code in xerini/script.py
@validate_call
def write_dot(self, file: Path, name: str = "dotted_script") -> Path:
    """Dump the table dependency graph as a dot file and return its path.

    The graph-level attributes are set to the given ``name``, orthogonal
    splines and a left-to-right rank direction before writing.
    """
    graph = self.digraph
    # NOTE: this replaces the graph-level attributes on the cached digraph.
    graph.graph = dict(name=name, splines="ortho", rankdir="LR")
    nx.nx_agraph.to_agraph(graph).write(file)
    return file

write_orchestration(staged_directory)

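Write the script to disk as one sub-directory per stage (stage_01, stage_02, …), each holding one .sql file per table built in that stage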

Source code in xerini/script.py
@validate_call
def write_orchestration(self, staged_directory: Path) -> None:
    """Write the script to disk as one sub-directory per stage.

    Each stage of ``stage_decomposition`` becomes ``stage_NN`` (1-based,
    zero padded to two digits) containing one ``<table>.sql`` file per table
    of that stage.

    Raises:
        IsADirectoryError: if ``staged_directory`` already exists as a
            directory.
        FileExistsError: if ``staged_directory`` already exists as a file.
        ValueError: if the script contains no statements.
    """
    if not staged_directory.is_absolute():
        staged_directory = staged_directory.expanduser().resolve()

    if staged_directory.exists() and staged_directory.is_dir():
        raise IsADirectoryError(
            f"{staged_directory=} is an existing directory, I should not overwrite it!"
        )
    if staged_directory.exists() and staged_directory.is_file():
        raise FileExistsError(
            f"{staged_directory=} is an existing file, I should definitely not overwrite it!"
        )
    # Decompose before touching the file system: an empty script raises here,
    # and must not leave behind an empty directory that would make the next
    # attempt fail with IsADirectoryError.
    decomposed = self.stage_decomposition()
    staged_directory.mkdir(parents=True, exist_ok=True)
    for idx, stg in enumerate(decomposed, start=1):
        stage = staged_directory / f"stage_{idx:02d}"
        stage.mkdir()
        for tbl in stg:
            pth = stage / f"{tbl}.sql"
            with pth.open("w", encoding="utf-8") as fout:
                for stmt in self[tbl]:
                    fout.write(str(stmt))
                    fout.write("\n")

write_svg(dot_file) staticmethod

Produce the SVG from the dot_file

Source code in xerini/script.py
@staticmethod
@validate_call
def write_svg(dot_file: Path) -> Path:
    """Render ``dot_file`` to SVG with the ``dot`` executable.

    Returns:
        The path ``<dot_file>.svg`` next to the input file.

    Raises:
        subprocess.CalledProcessError: if ``dot`` exits with a non-zero
            status, so a failed render is not reported as a valid path.
    """
    # ``check=True`` surfaces rendering failures instead of ignoring the
    # exit status and returning a path that may not exist.
    subprocess.run(["dot", "-Tsvg", dot_file, "-O"], check=True)
    svg_file: Path = dot_file.parent / (dot_file.name + ".svg")
    return svg_file