mirror of
https://github.com/etesync/server
synced 2025-01-21 05:50:55 +00:00
a54afd5210
No need to account for revisions that are not current when calculating stokens because those, by definition, are not the latest ones, and therefore won't have the most recent stokens. This becomes a problem when collections have many associated revisions.
632 lines
22 KiB
Python
632 lines
22 KiB
Python
import typing as t
|
|
|
|
from asgiref.sync import sync_to_async
|
|
from django.core import exceptions as django_exceptions
|
|
from django.core.files.base import ContentFile
|
|
from django.db import transaction, IntegrityError
|
|
from django.db.models import Q, QuerySet
|
|
from fastapi import APIRouter, Depends, status, Request, BackgroundTasks
|
|
|
|
from etebase_server.django import models
|
|
from etebase_server.myauth.models import UserType
|
|
from .authentication import get_authenticated_user
|
|
from .websocket import get_ticket, TicketRequest, TicketOut
|
|
from ..exceptions import HttpError, transform_validation_error, PermissionDenied, ValidationError
|
|
from ..msgpack import MsgpackRoute
|
|
from ..stoken_handler import filter_by_stoken_and_limit, filter_by_stoken, get_stoken_obj, get_queryset_stoken
|
|
from ..utils import (
|
|
get_object_or_404,
|
|
Context,
|
|
Prefetch,
|
|
PrefetchQuery,
|
|
is_collection_admin,
|
|
msgpack_encode,
|
|
BaseModel,
|
|
permission_responses,
|
|
PERMISSIONS_READ,
|
|
PERMISSIONS_READWRITE,
|
|
)
|
|
from ..dependencies import get_collection_queryset, get_item_queryset, get_collection
|
|
from ..sendfile import sendfile
|
|
from ..redis import redisw
|
|
from ..db_hack import django_db_cleanup_decorator
|
|
|
|
collection_router = APIRouter(route_class=MsgpackRoute, responses=permission_responses)
|
|
item_router = APIRouter(route_class=MsgpackRoute, responses=permission_responses)
|
|
CollectionQuerySet = QuerySet[models.Collection]
|
|
CollectionItemQuerySet = QuerySet[models.CollectionItem]
|
|
|
|
|
|
class ListMulti(BaseModel):
|
|
collectionTypes: t.List[bytes]
|
|
|
|
|
|
ChunkType = t.Tuple[str, t.Optional[bytes]]
|
|
|
|
|
|
class CollectionItemRevisionInOut(BaseModel):
|
|
uid: str
|
|
meta: bytes
|
|
deleted: bool
|
|
chunks: t.List[ChunkType]
|
|
|
|
class Config:
|
|
orm_mode = True
|
|
|
|
@classmethod
|
|
def from_orm_context(
|
|
cls: t.Type["CollectionItemRevisionInOut"], obj: models.CollectionItemRevision, context: Context
|
|
) -> "CollectionItemRevisionInOut":
|
|
chunks: t.List[ChunkType] = []
|
|
for chunk_relation in obj.chunks_relation.all():
|
|
chunk_obj = chunk_relation.chunk
|
|
if context.prefetch == "auto":
|
|
with open(chunk_obj.chunkFile.path, "rb") as f:
|
|
chunks.append((chunk_obj.uid, f.read()))
|
|
else:
|
|
chunks.append((chunk_obj.uid, None))
|
|
return cls(uid=obj.uid, meta=bytes(obj.meta), deleted=obj.deleted, chunks=chunks)
|
|
|
|
|
|
class CollectionItemCommon(BaseModel):
|
|
uid: str
|
|
version: int
|
|
encryptionKey: t.Optional[bytes]
|
|
content: CollectionItemRevisionInOut
|
|
|
|
|
|
class CollectionItemOut(CollectionItemCommon):
|
|
class Config:
|
|
orm_mode = True
|
|
|
|
@classmethod
|
|
def from_orm_context(
|
|
cls: t.Type["CollectionItemOut"], obj: models.CollectionItem, context: Context
|
|
) -> "CollectionItemOut":
|
|
return cls(
|
|
uid=obj.uid,
|
|
version=obj.version,
|
|
encryptionKey=obj.encryptionKey,
|
|
content=CollectionItemRevisionInOut.from_orm_context(obj.content, context),
|
|
)
|
|
|
|
|
|
class CollectionItemIn(CollectionItemCommon):
|
|
etag: t.Optional[str]
|
|
|
|
|
|
class CollectionCommon(BaseModel):
|
|
# FIXME: remove optional once we finish collection-type-migration
|
|
collectionType: t.Optional[bytes]
|
|
collectionKey: bytes
|
|
|
|
|
|
class CollectionOut(CollectionCommon):
|
|
accessLevel: models.AccessLevels
|
|
stoken: str
|
|
item: CollectionItemOut
|
|
|
|
@classmethod
|
|
def from_orm_context(cls: t.Type["CollectionOut"], obj: models.Collection, context: Context) -> "CollectionOut":
|
|
member: models.CollectionMember = obj.members.get(user=context.user)
|
|
collection_type = member.collectionType
|
|
assert obj.main_item is not None
|
|
ret = cls(
|
|
collectionType=collection_type and bytes(collection_type.uid),
|
|
collectionKey=bytes(member.encryptionKey),
|
|
accessLevel=member.accessLevel,
|
|
stoken=obj.stoken,
|
|
item=CollectionItemOut.from_orm_context(obj.main_item, context),
|
|
)
|
|
return ret
|
|
|
|
|
|
class CollectionIn(CollectionCommon):
|
|
item: CollectionItemIn
|
|
|
|
|
|
class RemovedMembershipOut(BaseModel):
|
|
uid: str
|
|
|
|
|
|
class CollectionListResponse(BaseModel):
|
|
data: t.List[CollectionOut]
|
|
stoken: t.Optional[str]
|
|
done: bool
|
|
|
|
removedMemberships: t.Optional[t.List[RemovedMembershipOut]]
|
|
|
|
|
|
class CollectionItemListResponse(BaseModel):
|
|
data: t.List[CollectionItemOut]
|
|
stoken: t.Optional[str]
|
|
done: bool
|
|
|
|
|
|
class CollectionItemRevisionListResponse(BaseModel):
|
|
data: t.List[CollectionItemRevisionInOut]
|
|
iterator: t.Optional[str]
|
|
done: bool
|
|
|
|
|
|
class CollectionItemBulkGetIn(BaseModel):
|
|
uid: str
|
|
etag: t.Optional[str]
|
|
|
|
|
|
class ItemDepIn(BaseModel):
|
|
uid: str
|
|
etag: str
|
|
|
|
def validate_db(self):
|
|
item = models.CollectionItem.objects.get(uid=self.uid)
|
|
etag = self.etag
|
|
if item.etag != etag:
|
|
raise ValidationError(
|
|
"wrong_etag",
|
|
"Wrong etag. Expected {} got {}".format(item.etag, etag),
|
|
status_code=status.HTTP_409_CONFLICT,
|
|
field=self.uid,
|
|
)
|
|
|
|
|
|
class ItemBatchIn(BaseModel):
|
|
items: t.List[CollectionItemIn]
|
|
deps: t.Optional[t.List[ItemDepIn]]
|
|
|
|
def validate_db(self):
|
|
if self.deps is not None:
|
|
errors: t.List[HttpError] = []
|
|
for dep in self.deps:
|
|
try:
|
|
dep.validate_db()
|
|
except ValidationError as e:
|
|
errors.append(e)
|
|
if len(errors) > 0:
|
|
raise ValidationError(
|
|
code="dep_failed",
|
|
detail="Dependencies failed to validate",
|
|
errors=errors,
|
|
status_code=status.HTTP_409_CONFLICT,
|
|
)
|
|
|
|
|
|
async def report_items_changed(col_uid: str, stoken: str, items: t.List[CollectionItemIn]):
|
|
if not redisw.is_active:
|
|
return
|
|
|
|
redis = redisw.redis
|
|
content = msgpack_encode(CollectionItemListResponse(data=items, stoken=stoken, done=True).dict())
|
|
await redis.publish(f"col.{col_uid}", content)
|
|
|
|
|
|
def collection_list_common(
|
|
queryset: CollectionQuerySet,
|
|
user: UserType,
|
|
stoken: t.Optional[str],
|
|
limit: int,
|
|
prefetch: Prefetch,
|
|
) -> CollectionListResponse:
|
|
result, new_stoken_obj, done = filter_by_stoken_and_limit(
|
|
stoken, limit, queryset.filter(items__revisions__current=True), models.Collection.stoken_annotation
|
|
)
|
|
new_stoken = new_stoken_obj and new_stoken_obj.uid
|
|
context = Context(user, prefetch)
|
|
data: t.List[CollectionOut] = [CollectionOut.from_orm_context(item, context) for item in result]
|
|
|
|
ret = CollectionListResponse(data=data, stoken=new_stoken, done=done)
|
|
|
|
stoken_obj = get_stoken_obj(stoken)
|
|
if stoken_obj is not None:
|
|
# FIXME: honour limit? (the limit should be combined for data and this because of stoken)
|
|
remed_qs = models.CollectionMemberRemoved.objects.filter(user=user, stoken__id__gt=stoken_obj.id)
|
|
if not done and new_stoken_obj is not None:
|
|
# We only filter by the new_stoken if we are not done. This is because if we are done, the new stoken
|
|
# can point to the most recent collection change rather than most recent removed membership.
|
|
remed_qs = remed_qs.filter(stoken__id__lte=new_stoken_obj.id)
|
|
|
|
remed = remed_qs.values_list("collection__uid", flat=True)
|
|
if len(remed) > 0:
|
|
ret.removedMemberships = [RemovedMembershipOut(uid=x) for x in remed]
|
|
|
|
return ret
|
|
|
|
|
|
# permissions
|
|
|
|
|
|
@django_db_cleanup_decorator
|
|
def verify_collection_admin(
|
|
collection: models.Collection = Depends(get_collection), user: UserType = Depends(get_authenticated_user)
|
|
):
|
|
if not is_collection_admin(collection, user):
|
|
raise PermissionDenied("admin_access_required", "Only collection admins can perform this operation.")
|
|
|
|
|
|
@django_db_cleanup_decorator
|
|
def has_write_access(
|
|
collection: models.Collection = Depends(get_collection), user: UserType = Depends(get_authenticated_user)
|
|
):
|
|
member = collection.members.get(user=user)
|
|
if member.accessLevel == models.AccessLevels.READ_ONLY:
|
|
raise PermissionDenied("no_write_access", "You need write access to write to this collection")
|
|
|
|
|
|
# paths
|
|
|
|
|
|
@collection_router.post(
|
|
"/list_multi/",
|
|
response_model=CollectionListResponse,
|
|
response_model_exclude_unset=True,
|
|
dependencies=PERMISSIONS_READ,
|
|
)
|
|
def list_multi(
|
|
data: ListMulti,
|
|
stoken: t.Optional[str] = None,
|
|
limit: int = 50,
|
|
queryset: CollectionQuerySet = Depends(get_collection_queryset),
|
|
user: UserType = Depends(get_authenticated_user),
|
|
prefetch: Prefetch = PrefetchQuery,
|
|
):
|
|
# FIXME: Remove the isnull part once we attach collection types to all objects ("collection-type-migration")
|
|
queryset = queryset.filter(
|
|
Q(members__collectionType__uid__in=data.collectionTypes) | Q(members__collectionType__isnull=True)
|
|
)
|
|
|
|
return collection_list_common(queryset, user, stoken, limit, prefetch)
|
|
|
|
|
|
@collection_router.get("/", response_model=CollectionListResponse, dependencies=PERMISSIONS_READ)
|
|
def collection_list(
|
|
stoken: t.Optional[str] = None,
|
|
limit: int = 50,
|
|
prefetch: Prefetch = PrefetchQuery,
|
|
user: UserType = Depends(get_authenticated_user),
|
|
queryset: CollectionQuerySet = Depends(get_collection_queryset),
|
|
):
|
|
return collection_list_common(queryset, user, stoken, limit, prefetch)
|
|
|
|
|
|
def process_revisions_for_item(item: models.CollectionItem, revision_data: CollectionItemRevisionInOut):
|
|
chunks_objs = []
|
|
|
|
revision = models.CollectionItemRevision(**revision_data.dict(exclude={"chunks"}), item=item)
|
|
revision.validate_unique() # Verify there aren't any validation issues
|
|
|
|
for chunk in revision_data.chunks:
|
|
uid = chunk[0]
|
|
chunk_obj = models.CollectionItemChunk.objects.filter(uid=uid).first()
|
|
content = chunk[1] if len(chunk) > 1 else None
|
|
# If the chunk already exists we assume it's fine. Otherwise, we upload it.
|
|
if chunk_obj is None:
|
|
if content is not None:
|
|
chunk_obj = models.CollectionItemChunk(uid=uid, collection=item.collection)
|
|
chunk_obj.chunkFile.save("IGNORED", ContentFile(content))
|
|
chunk_obj.save()
|
|
else:
|
|
raise ValidationError("chunk_no_content", "Tried to create a new chunk without content")
|
|
|
|
chunks_objs.append(chunk_obj)
|
|
|
|
stoken = models.Stoken.objects.create()
|
|
revision.stoken = stoken
|
|
revision.save()
|
|
|
|
for chunk2 in chunks_objs:
|
|
models.RevisionChunkRelation.objects.create(chunk=chunk2, revision=revision)
|
|
return revision
|
|
|
|
|
|
def _create(data: CollectionIn, user: UserType):
|
|
with transaction.atomic():
|
|
if data.item.etag is not None:
|
|
raise ValidationError("bad_etag", "etag is not null")
|
|
instance = models.Collection(uid=data.item.uid, owner=user)
|
|
try:
|
|
instance.validate_unique()
|
|
except django_exceptions.ValidationError:
|
|
raise ValidationError(
|
|
"unique_uid", "Collection with this uid already exists", status_code=status.HTTP_409_CONFLICT
|
|
)
|
|
instance.save()
|
|
|
|
main_item = models.CollectionItem.objects.create(
|
|
uid=data.item.uid, version=data.item.version, collection=instance
|
|
)
|
|
|
|
instance.main_item = main_item
|
|
instance.save()
|
|
|
|
# TODO
|
|
process_revisions_for_item(main_item, data.item.content)
|
|
|
|
collection_type_obj, _ = models.CollectionType.objects.get_or_create(uid=data.collectionType, owner=user)
|
|
|
|
models.CollectionMember(
|
|
collection=instance,
|
|
stoken=models.Stoken.objects.create(),
|
|
user=user,
|
|
accessLevel=models.AccessLevels.ADMIN,
|
|
encryptionKey=data.collectionKey,
|
|
collectionType=collection_type_obj,
|
|
).save()
|
|
|
|
|
|
@collection_router.post("/", status_code=status.HTTP_201_CREATED, dependencies=PERMISSIONS_READWRITE)
|
|
def create(data: CollectionIn, user: UserType = Depends(get_authenticated_user)):
|
|
_create(data, user)
|
|
|
|
|
|
@collection_router.get("/{collection_uid}/", response_model=CollectionOut, dependencies=PERMISSIONS_READ)
|
|
def collection_get(
|
|
obj: models.Collection = Depends(get_collection),
|
|
user: UserType = Depends(get_authenticated_user),
|
|
prefetch: Prefetch = PrefetchQuery,
|
|
):
|
|
return CollectionOut.from_orm_context(obj, Context(user, prefetch))
|
|
|
|
|
|
def item_create(item_model: CollectionItemIn, collection: models.Collection, validate_etag: bool):
|
|
"""Function that's called when this serializer creates an item"""
|
|
etag = item_model.etag
|
|
revision_data = item_model.content
|
|
uid = item_model.uid
|
|
|
|
Model = models.CollectionItem
|
|
|
|
with transaction.atomic():
|
|
instance, created = Model.objects.get_or_create(
|
|
uid=uid, collection=collection, defaults=item_model.dict(exclude={"uid", "etag", "content"})
|
|
)
|
|
cur_etag = instance.etag if not created else None
|
|
|
|
# If we are trying to update an up to date item, abort early and consider it a success
|
|
if cur_etag == revision_data.uid:
|
|
return instance
|
|
|
|
if validate_etag and cur_etag != etag:
|
|
raise ValidationError(
|
|
"wrong_etag",
|
|
"Wrong etag. Expected {} got {}".format(cur_etag, etag),
|
|
status_code=status.HTTP_409_CONFLICT,
|
|
field=uid,
|
|
)
|
|
|
|
if not created:
|
|
# We don't have to use select_for_update here because the unique constraint on current guards against
|
|
# the race condition. But it's a good idea because it'll lock and wait rather than fail.
|
|
current_revision = instance.revisions.filter(current=True).select_for_update()[0]
|
|
assert current_revision is not None
|
|
current_revision.current = None
|
|
current_revision.save()
|
|
|
|
try:
|
|
process_revisions_for_item(instance, revision_data)
|
|
except django_exceptions.ValidationError as e:
|
|
transform_validation_error("content", e)
|
|
|
|
return instance
|
|
|
|
|
|
@item_router.get("/item/{item_uid}/", response_model=CollectionItemOut, dependencies=PERMISSIONS_READ)
|
|
def item_get(
|
|
item_uid: str,
|
|
queryset: CollectionItemQuerySet = Depends(get_item_queryset),
|
|
user: UserType = Depends(get_authenticated_user),
|
|
prefetch: Prefetch = PrefetchQuery,
|
|
):
|
|
obj = queryset.get(uid=item_uid)
|
|
return CollectionItemOut.from_orm_context(obj, Context(user, prefetch))
|
|
|
|
|
|
def item_list_common(
|
|
queryset: CollectionItemQuerySet,
|
|
user: UserType,
|
|
stoken: t.Optional[str],
|
|
limit: int,
|
|
prefetch: Prefetch,
|
|
) -> CollectionItemListResponse:
|
|
result, new_stoken_obj, done = filter_by_stoken_and_limit(
|
|
stoken, limit, queryset.filter(revisions__current=True), models.CollectionItem.stoken_annotation
|
|
)
|
|
new_stoken = new_stoken_obj and new_stoken_obj.uid
|
|
context = Context(user, prefetch)
|
|
data: t.List[CollectionItemOut] = [CollectionItemOut.from_orm_context(item, context) for item in result]
|
|
return CollectionItemListResponse(data=data, stoken=new_stoken, done=done)
|
|
|
|
|
|
@item_router.get("/item/", response_model=CollectionItemListResponse, dependencies=PERMISSIONS_READ)
|
|
def item_list(
|
|
queryset: CollectionItemQuerySet = Depends(get_item_queryset),
|
|
stoken: t.Optional[str] = None,
|
|
limit: int = 50,
|
|
prefetch: Prefetch = PrefetchQuery,
|
|
withCollection: bool = False,
|
|
user: UserType = Depends(get_authenticated_user),
|
|
):
|
|
if not withCollection:
|
|
queryset = queryset.filter(parent__isnull=True)
|
|
|
|
response = item_list_common(queryset, user, stoken, limit, prefetch)
|
|
return response
|
|
|
|
|
|
@item_router.post("/item/subscription-ticket/", response_model=TicketOut, dependencies=PERMISSIONS_READ)
|
|
async def item_list_subscription_ticket(
|
|
collection: models.Collection = Depends(get_collection),
|
|
user: UserType = Depends(get_authenticated_user),
|
|
):
|
|
"""Get an authentication ticket that can be used with the websocket endpoint"""
|
|
return await get_ticket(TicketRequest(collection=collection.uid), user)
|
|
|
|
|
|
def item_bulk_common(
|
|
data: ItemBatchIn,
|
|
user: UserType,
|
|
stoken: t.Optional[str],
|
|
uid: str,
|
|
validate_etag: bool,
|
|
background_tasks: BackgroundTasks,
|
|
):
|
|
queryset = get_collection_queryset(user)
|
|
with transaction.atomic(): # We need this for locking the collection object
|
|
collection_object = queryset.select_for_update().get(uid=uid)
|
|
|
|
if stoken and stoken != collection_object.stoken:
|
|
raise HttpError("stale_stoken", "Stoken is too old", status_code=status.HTTP_409_CONFLICT)
|
|
|
|
data.validate_db()
|
|
|
|
errors: t.List[HttpError] = []
|
|
for item in data.items:
|
|
try:
|
|
item_create(item, collection_object, validate_etag)
|
|
except ValidationError as e:
|
|
errors.append(e)
|
|
|
|
if len(errors) > 0:
|
|
raise ValidationError(
|
|
code="item_failed",
|
|
detail="Items failed to validate",
|
|
errors=errors,
|
|
status_code=status.HTTP_409_CONFLICT,
|
|
)
|
|
|
|
background_tasks.add_task(report_items_changed, collection_object.uid, collection_object.stoken, data.items)
|
|
|
|
|
|
@item_router.get(
|
|
"/item/{item_uid}/revision/", response_model=CollectionItemRevisionListResponse, dependencies=PERMISSIONS_READ
|
|
)
|
|
def item_revisions(
|
|
item_uid: str,
|
|
limit: int = 50,
|
|
iterator: t.Optional[str] = None,
|
|
prefetch: Prefetch = PrefetchQuery,
|
|
user: UserType = Depends(get_authenticated_user),
|
|
items: CollectionItemQuerySet = Depends(get_item_queryset),
|
|
):
|
|
item = get_object_or_404(items, uid=item_uid)
|
|
|
|
queryset = item.revisions.order_by("-id")
|
|
|
|
if iterator is not None:
|
|
iterator_obj = get_object_or_404(queryset, uid=iterator)
|
|
queryset = queryset.filter(id__lt=iterator_obj.id)
|
|
|
|
result = list(queryset[: limit + 1])
|
|
if len(result) < limit + 1:
|
|
done = True
|
|
else:
|
|
done = False
|
|
result = result[:-1]
|
|
|
|
context = Context(user, prefetch)
|
|
ret_data = [CollectionItemRevisionInOut.from_orm_context(revision, context) for revision in result]
|
|
iterator = ret_data[-1].uid if len(result) > 0 else None
|
|
|
|
return CollectionItemRevisionListResponse(
|
|
data=ret_data,
|
|
iterator=iterator,
|
|
done=done,
|
|
)
|
|
|
|
|
|
@item_router.post("/item/fetch_updates/", response_model=CollectionItemListResponse, dependencies=PERMISSIONS_READ)
|
|
def fetch_updates(
|
|
data: t.List[CollectionItemBulkGetIn],
|
|
stoken: t.Optional[str] = None,
|
|
prefetch: Prefetch = PrefetchQuery,
|
|
user: UserType = Depends(get_authenticated_user),
|
|
queryset: CollectionItemQuerySet = Depends(get_item_queryset),
|
|
):
|
|
# FIXME: make configurable?
|
|
item_limit = 200
|
|
|
|
if len(data) > item_limit:
|
|
raise HttpError("too_many_items", "Request has too many items.", status_code=status.HTTP_400_BAD_REQUEST)
|
|
|
|
queryset, stoken_rev = filter_by_stoken(stoken, queryset, models.CollectionItem.stoken_annotation)
|
|
|
|
uids, etags = zip(*[(item.uid, item.etag) for item in data])
|
|
revs = models.CollectionItemRevision.objects.filter(uid__in=etags, current=True)
|
|
queryset = queryset.filter(uid__in=uids).exclude(revisions__in=revs)
|
|
|
|
new_stoken_obj = get_queryset_stoken(queryset)
|
|
new_stoken = new_stoken_obj and new_stoken_obj.uid
|
|
stoken_rev_uid = stoken_rev and getattr(stoken_rev, "uid", None)
|
|
new_stoken = new_stoken or stoken_rev_uid
|
|
|
|
context = Context(user, prefetch)
|
|
return CollectionItemListResponse(
|
|
data=[CollectionItemOut.from_orm_context(item, context) for item in queryset],
|
|
stoken=new_stoken,
|
|
done=True, # we always return all the items, so it's always done
|
|
)
|
|
|
|
|
|
@item_router.post("/item/transaction/", dependencies=[Depends(has_write_access), *PERMISSIONS_READWRITE])
|
|
def item_transaction(
|
|
collection_uid: str,
|
|
data: ItemBatchIn,
|
|
background_tasks: BackgroundTasks,
|
|
stoken: t.Optional[str] = None,
|
|
user: UserType = Depends(get_authenticated_user),
|
|
):
|
|
return item_bulk_common(data, user, stoken, collection_uid, validate_etag=True, background_tasks=background_tasks)
|
|
|
|
|
|
@item_router.post("/item/batch/", dependencies=[Depends(has_write_access), *PERMISSIONS_READWRITE])
|
|
def item_batch(
|
|
collection_uid: str,
|
|
data: ItemBatchIn,
|
|
background_tasks: BackgroundTasks,
|
|
stoken: t.Optional[str] = None,
|
|
user: UserType = Depends(get_authenticated_user),
|
|
):
|
|
return item_bulk_common(data, user, stoken, collection_uid, validate_etag=False, background_tasks=background_tasks)
|
|
|
|
|
|
# Chunks
|
|
|
|
|
|
@sync_to_async
|
|
def chunk_save(chunk_uid: str, collection: models.Collection, content_file: ContentFile):
|
|
chunk_obj = models.CollectionItemChunk(uid=chunk_uid, collection=collection)
|
|
chunk_obj.chunkFile.save("IGNORED", content_file)
|
|
chunk_obj.save()
|
|
return chunk_obj
|
|
|
|
|
|
@item_router.put(
|
|
"/item/{item_uid}/chunk/{chunk_uid}/",
|
|
dependencies=[Depends(has_write_access), *PERMISSIONS_READWRITE],
|
|
status_code=status.HTTP_201_CREATED,
|
|
)
|
|
async def chunk_update(
|
|
request: Request,
|
|
chunk_uid: str,
|
|
collection: models.Collection = Depends(get_collection),
|
|
):
|
|
# IGNORED FOR NOW: col_it = get_object_or_404(col.items, uid=collection_item_uid)
|
|
content_file = ContentFile(await request.body())
|
|
try:
|
|
await chunk_save(chunk_uid, collection, content_file)
|
|
except IntegrityError:
|
|
raise HttpError("chunk_exists", "Chunk already exists.", status_code=status.HTTP_409_CONFLICT)
|
|
|
|
|
|
@item_router.get(
|
|
"/item/{item_uid}/chunk/{chunk_uid}/download/",
|
|
dependencies=PERMISSIONS_READ,
|
|
)
|
|
def chunk_download(
|
|
chunk_uid: str,
|
|
collection: models.Collection = Depends(get_collection),
|
|
):
|
|
chunk = get_object_or_404(collection.chunks, uid=chunk_uid)
|
|
|
|
filename = chunk.chunkFile.path
|
|
return sendfile(filename)
|