Source code for debusine.db.models.scopes

# Copyright © The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""Data models for db scopes."""

import enum
import re
from typing import Any, Self, TYPE_CHECKING, TypeAlias, cast

import pgtrigger
from django.core.exceptions import ImproperlyConfigured, ValidationError
from django.db import models
from django.db.models import (
    CheckConstraint,
    Exists,
    F,
    OuterRef,
    Q,
    QuerySet,
    UniqueConstraint,
)

from debusine.db.models import permissions
from debusine.db.models.files import File, FileStore
from debusine.db.models.permissions import (
    Allow,
    Role,
    permission_check,
    permission_filter,
)
from debusine.db.permissioncontext import PermissionContext

if TYPE_CHECKING:
    from django.http import HttpRequest
    from django_stubs_ext.db.models import TypedModelMeta

    from debusine.server.file_backend.interface import FileBackendInterface
    from debusine.web.views.ui.scopes import ScopeUI
else:
    TypedModelMeta = object

#: Names that may not be given to a scope, because each of them is already
#: used as a toplevel URL path component
RESERVED_SCOPE_NAMES = frozenset(
    {
        "accounts",
        "admin",
        "api",
        "api-auth",
        "artifact",
        "task-status",
        "user",
        "workers",
        "work-request",
        "workspace",
    }
)

#: Compiled pattern for the structure of a scope name: one ASCII letter,
#: followed by any number of ASCII letters, digits, "+", ".", "_" or "-".
#: Note that "$" also matches just before a trailing newline; use
#: ``fullmatch()`` when the whole string has to conform.
scope_name_regex = re.compile(r"^[A-Za-z][A-Za-z0-9+._-]*$")


def is_valid_scope_name(value: str) -> bool:
    """
    Check if value is a valid scope name.

    A name is valid when it is not listed in ``RESERVED_SCOPE_NAMES`` and
    the whole string conforms to ``scope_name_regex``.

    :param value: candidate scope name.
    :return: True if the name can be used for a scope, False otherwise.
    """
    if value in RESERVED_SCOPE_NAMES:
        return False
    # Use fullmatch() rather than match(): the pattern ends in "$", which
    # also matches just before a trailing newline, so match() would accept
    # names such as "debian\n".
    return scope_name_regex.fullmatch(value) is not None


def validate_scope_name(value: str) -> None:
    """
    Ensure that value is acceptable as a scope name.

    :param value: candidate scope name.
    :raises ValidationError: if ``is_valid_scope_name`` rejects the value.
    """
    if is_valid_scope_name(value):
        return
    message = "%(value)r is not a valid scope name"
    raise ValidationError(message, params={"value": value})


class ScopeRoleBase(permissions.RoleBase):
    """Behaviour shared by the members of the scope roles enumeration."""

    #: This role, together with every scope role that implies it
    implied_by_scope_roles: frozenset["ScopeRoles"]

    def _setup(self) -> None:
        """
        Compute the set of scope roles that grant this role.

        The result is stored in ``implied_by_scope_roles`` and always
        contains the role itself.

        :raises ImproperlyConfigured: if an entry of ``implied_by`` is not
          a :py:class:`Role`.
        """
        granting: set[ScopeRoles] = {cast(ScopeRoles, self)}
        for entry in self.implied_by:
            if not isinstance(entry, Role):
                raise ImproperlyConfigured(
                    f"Scope roles do not support implications by {type(entry)}"
                )
            # The Role given at class definition time is looked up by value
            # to obtain the corresponding enum member
            member = self.__class__(entry.value)
            granting.update(member.implied_by_scope_roles)
        self.implied_by_scope_roles = frozenset(granting)

    def q(self, pc: PermissionContext) -> Q:
        """Build a Q expression selecting the scopes that carry this role."""
        groups = pc.groups()
        accepted_roles = self.implied_by_scope_roles
        return Q(roles__group__in=groups, roles__role__in=accepted_roles)

    def implies(self, role: "ScopeRoles") -> bool:
        """Tell whether holding this role also grants the given role."""
        granting = role.implied_by_scope_roles
        return self in granting


class ScopeRoles(permissions.Roles, ScopeRoleBase, enum.ReprEnum):
    """
    Available roles for a Scope.

    Each member is declared with a :py:class:`Role` carrying its value, a
    human-readable description and, optionally, the roles that imply it.
    """

    # Declared without implied_by: no other scope role implies it.
    OWNER = Role(
        "owner",
        description="Full administrative access",
    )

    # Read-only access to the whole scope.
    # Implied by OWNER, which therefore has to be declared first.
    OBSERVER = Role(
        "observer",
        implied_by=[OWNER],
        description="Read-only access to all workspaces",
    )

    # "viewer" is reserved for possibly adding private scopes in the future,
    # in which case we may need a distinction between "can view the scope
    # itself" and "can view all workspaces in the scope".


# Finish initialising the enumeration now that the class exists.
# NOTE(review): setup() is inherited and not shown in this file; presumably
# it calls _setup() on each member to resolve implications -- confirm.
ScopeRoles.setup()


class ScopeQuerySet[A](QuerySet["Scope", A]):
    """Custom QuerySet for Scope."""

    def with_role(self, pc: PermissionContext, role: ScopeRoles) -> Self:
        """Keep only resources where the user has the given role."""
        if not pc.user.is_authenticated:
            return self.none()
        if pc.user.is_system:
            return self
        return self.filter(
            Exists(self.model.objects.filter(role.q(pc), pk=OuterRef("pk")))
        )

    @permission_filter(work_request=Allow.PASS, anonymous=Allow.PASS)
    def can_display(self, pc: PermissionContext) -> Self:  # noqa: ARG002, U100
        """Keep only Scopes that can be displayed."""
        return self

    @permission_filter()
    def can_create_workspace(self, pc: PermissionContext) -> Self:
        """Keep only Scopes where the user can create workspaces."""
        return self.with_role(pc, Scope.Roles.OWNER)


class ScopeManager(models.Manager["Scope"]):
    """Manager for the Scope model, built on top of ScopeQuerySet."""

    def get_roles_model(self) -> type["ScopeRole"]:
        """Return the model class that stores role assignments for scopes."""
        return ScopeRole

    def get_queryset(self) -> ScopeQuerySet[Any]:
        """Build the base queryset as a ScopeQuerySet for this manager."""
        queryset = ScopeQuerySet(self.model, using=self._db)
        return queryset


class Scope(models.Model):
    """
    Scope model.

    A scope is used to create distinct sets of groups and workspaces.
    """

    Roles: TypeAlias = ScopeRoles

    name = models.CharField(
        max_length=255,
        unique=True,
        validators=[validate_scope_name],
        help_text="internal name for the scope",
    )
    label = models.CharField(
        max_length=255,
        unique=True,
        help_text="User-visible name for the scope",
    )
    icon = models.CharField(
        max_length=255,
        default="",
        blank=True,
        help_text=(
            "Optional user-visible icon,"
            " resolved via ``{% static %}`` in templates"
        ),
    )
    file_stores = models.ManyToManyField(
        FileStore, related_name="scopes", through="db.FileStoreInScope"
    )

    objects = ScopeManager.from_queryset(ScopeQuerySet)()

    class Meta(TypedModelMeta):
        base_manager_name = "objects"

    def __str__(self) -> str:
        """Return the name of the scope."""
        return self.name

    def ui(self, request: "HttpRequest") -> "ScopeUI":
        """Return the UI helper for this scope in the given request."""
        # Imported at call time: at module level ScopeUI is only imported
        # under TYPE_CHECKING
        from debusine.web.views.ui.scopes import ScopeUI

        helper = ScopeUI.get(request, self)
        return helper

    @permission_check(
        "{user} cannot display scope {resource}",
        work_request=Allow.PASS,
        anonymous=Allow.PASS,
    )
    def can_display(self, pc: PermissionContext) -> bool:  # noqa: ARG002, U100
        """Check if the scope can be displayed: this is always the case."""
        return True

    @permission_check(
        "{user} cannot create workspaces in {resource}",
    )
    def can_create_workspace(self, pc: PermissionContext) -> bool:
        """Check if the user may create workspaces: requires OWNER."""
        required = Scope.Roles.OWNER
        return self.has_role(pc, required)

    def has_role(self, pc: PermissionContext, role: ScopeRoles) -> bool:
        """
        Check if the user has the given role on this Scope.

        Unauthenticated users never have a role, while the system user has
        all of them.
        """
        from debusine.db.context import context

        user = pc.user
        if not user.is_authenticated:
            return False
        if user.is_system:
            return True

        if context.pc_is(pc) and context.scope == self:
            # The application context is set for this permission context
            # and this scope: answer from its roles without a query
            for held in context.scope_roles:
                if held.implies(role):
                    return True
            return False

        matching = Scope.objects.with_role(pc, role).filter(pk=self.pk)
        return matching.exists()

    # See https://github.com/typeddjango/django-stubs/issues/1047 for the typing
    def get_group_roles(
        self, pc: PermissionContext
    ) -> QuerySet["ScopeRole", str]:
        """
        Get the roles of the user on this scope.

        The result is a flat queryset of distinct role values, which is
        empty for unauthenticated users.
        """
        if pc.user.is_authenticated:
            assigned = ScopeRole.objects.filter(
                resource=self, group__in=pc.groups()
            )
            return assigned.values_list("role", flat=True).distinct()
        return ScopeRole.objects.none().values_list("role", flat=True)

    def upload_file_stores(
        self,
        fileobj: File,
        *,
        enforce_soft_limits: bool = False,
        include_write_only: bool = False,
    ) -> QuerySet[FileStore]:
        """
        Find the file stores in this scope where `fileobj` can be uploaded.

        The returned query set is in descending order of upload priority,
        breaking ties by ascending file store IDs.

        :param enforce_soft_limits: Enforce `soft_max_size` policies as well
          as `max_size`.
        :param include_write_only: Uploading a file to a write-only store
          will mean that debusine won't download it from there again, so
          that only makes sense in specialized situations such as
          populating a backup store.  Set this to True to include
          write-only stores in the returned query set.
        """
        candidates = self.file_stores.annotate_current_total_size()
        # Each condition is applied in its own exclude() call
        candidates = candidates.exclude(filestoreinscope__read_only=True)
        candidates = candidates.exclude(
            max_size__lt=F("current_total_size") + fileobj.size
        )
        candidates = candidates.order_by(
            F("filestoreinscope__upload_priority").desc(nulls_last=True),
            "id",
        )

        if enforce_soft_limits:
            # TODO: We should also enforce FileStoreInScope.soft_max_size.
            candidates = candidates.exclude(
                soft_max_size__lt=F("current_total_size") + fileobj.size
            )

        if not include_write_only:
            candidates = candidates.exclude(
                filestoreinscope__write_only=True
            )

        return candidates

    def upload_file_backend(self, fileobj: File) -> "FileBackendInterface[Any]":
        """
        Find the best file backend for uploading `fileobj`.

        :raises IndexError: if there is no such file backend.
        """
        best_store = self.upload_file_stores(fileobj)[0]
        return best_store.get_backend_object()

    def download_file_stores(self, fileobj: File) -> QuerySet[FileStore]:
        """
        Find the file stores in this scope that have `fileobj`, if any.

        The returned query set is in descending order of download priority,
        breaking ties in descending order of upload priority and then by
        ascending file store IDs.
        """
        readable = self.file_stores.exclude(filestoreinscope__write_only=True)
        holding = readable.filter(files=fileobj)
        return holding.order_by(
            F("filestoreinscope__download_priority").desc(nulls_last=True),
            F("filestoreinscope__upload_priority").desc(nulls_last=True),
            "id",
        )

    def download_file_backend(
        self, fileobj: File
    ) -> "FileBackendInterface[Any]":
        """
        Find the best file backend for downloading `fileobj`.

        :raises IndexError: if there is no such file backend.
        """
        best_store = self.download_file_stores(fileobj)[0]
        return best_store.get_backend_object()
class ScopeRole(models.Model):
    """Assignment of a scope role to a group, for a given scope."""

    Roles: TypeAlias = ScopeRoles

    #: Scope that the role is granted on
    resource = models.ForeignKey(
        Scope,
        on_delete=models.CASCADE,
        related_name="roles",
    )
    #: Group that the role is granted to
    group = models.ForeignKey(
        "Group",
        on_delete=models.CASCADE,
        related_name="scope_roles",
    )
    #: Granted role, as one of the ScopeRoles choices
    role = models.CharField(choices=Roles.choices)

    class Meta(TypedModelMeta):
        constraints = [
            # The same role can be assigned only once to a group on a scope
            UniqueConstraint(
                fields=["resource", "group", "role"],
                name="%(app_label)s_%(class)s_unique_resource_group_role",
            ),
        ]

    def __str__(self) -> str:
        """Return a description of the role assignment."""
        # NOTE(review): group, role and resource are joined without any
        # separator -- confirm that this is the intended rendering.
        return f"{self.group}{self.role}{self.resource}"
class FileStoreInScope(models.Model):
    """Database model used for extra data on Scope/FileStore relations."""

    scope = models.ForeignKey(Scope, on_delete=models.PROTECT)
    file_store = models.ForeignKey(FileStore, on_delete=models.PROTECT)

    # Kept up to date by the trigger declared in Meta, and used in the
    # condition of one of the unique constraints below
    _file_store_instance_wide = models.BooleanField(
        default=True,
        editable=False,
        db_column="file_store_instance_wide",
        help_text="Synced from FileStore.instance_wide; do not update directly",
    )

    #: The priority of this store for the purpose of storing new files.
    #: When adding a new file, debusine tries stores whose policies allow
    #: adding new files in descending order of upload priority, counting
    #: null as the lowest.
    upload_priority = models.IntegerField(blank=True, null=True)

    #: The priority of this store for the purpose of serving files to
    #: clients.  When downloading a file, debusine tries stores in
    #: descending order of download priority, counting null as the lowest;
    #: it breaks ties in descending order of upload priority, again counting
    #: null as the lowest.  If there is still a tie, it picks one of the
    #: possibilities arbitrarily.
    download_priority = models.IntegerField(blank=True, null=True)

    #: If True, the storage maintenance job ensures that this store has a
    #: copy of all files in the scope.
    populate = models.BooleanField(default=False)

    #: If True, the storage maintenance job moves all files in this scope to
    #: some other store in the same scope, following the same rules for
    #: finding a target store as for uploads of new files.  It does not move
    #: into a store if that would take its total size over `soft_max_size`
    #: (either for the scope or the file store), and it logs an error if it
    #: cannot find any eligible target store.
    drain = models.BooleanField(default=False)

    #: If this field is set, then constrain `drain` to use the store with
    #: the given name in this scope.
    #
    # TODO: ruff is correct that we shouldn't use null=True on a TextField,
    # but fixing that retroactively is non-trivial.
    drain_to = models.TextField(blank=True, null=True)  # noqa: DJ001

    #: If True, debusine will not add new files to this store.  Use this in
    #: combination with `drain` to prepare for removing the file store.
    read_only = models.BooleanField(default=False)

    #: If True, debusine will not read files from this store.  This is
    #: suitable for provider storage classes that are designed for long-term
    #: archival rather than routine retrieval, such as S3 Glacier Deep
    #: Archive.
    write_only = models.BooleanField(default=False)

    #: An integer specifying the number of bytes that the file store can
    #: hold for this scope (accounting files that are in multiple scopes to
    #: all of the scopes in question).  This limit may be exceeded
    #: temporarily during uploads; the storage maintenance job will move the
    #: least-recently-used files to another file store to get back below the
    #: limit.
    soft_max_size = models.IntegerField(blank=True, null=True)

    class Meta(TypedModelMeta):
        triggers = [
            # Copy FileStore.instance_wide into this row before every
            # insert and update
            pgtrigger.Trigger(
                name="db_filestoreinscope_sync_instance_wide",
                operation=pgtrigger.Insert | pgtrigger.Update,
                when=pgtrigger.Before,
                # The SQL is laid out over several lines for readability,
                # then collapsed so words are separated by single spaces
                func=" ".join(
                    """
                    NEW.file_store_instance_wide =
                        (SELECT db_filestore.instance_wide
                         FROM db_filestore
                         WHERE db_filestore.id = NEW.file_store_id);
                    RETURN NEW;
                    """.split()
                ),
            )
        ]
        constraints = [
            # A file store can be added to a given scope only once
            UniqueConstraint(
                fields=["scope", "file_store"],
                name="%(app_label)s_%(class)s_unique_scope_file_store",
            ),
            # A file store that is not instance-wide can be in at most one
            # scope
            UniqueConstraint(
                fields=["file_store"],
                name=(
                    "%(app_label)s_%(class)s_"
                    "unique_file_store_not_instance_wide"
                ),
                condition=Q(_file_store_instance_wide=False),
            ),
            # It does not make sense to request a store to be populated
            # while also requesting it to be either drained or read-only.
            CheckConstraint(
                name="%(app_label)s_%(class)s_consistent_populate",
                check=Q(populate=False) | Q(drain=False, read_only=False),
            ),
        ]

    def __str__(self) -> str:
        """Return the scope and the file store name, separated by a slash."""
        return f"{self.scope}/{self.file_store.name}"