diff --git a/etebase_fastapi/routers/collection.py b/etebase_fastapi/routers/collection.py index df25541..167179d 100644 --- a/etebase_fastapi/routers/collection.py +++ b/etebase_fastapi/routers/collection.py @@ -1,11 +1,11 @@ import typing as t -from asgiref.sync import sync_to_async, async_to_sync +from asgiref.sync import sync_to_async from django.core import exceptions as django_exceptions from django.core.files.base import ContentFile from django.db import transaction, IntegrityError from django.db.models import Q, QuerySet -from fastapi import APIRouter, Depends, status, Request +from fastapi import APIRouter, Depends, status, Request, BackgroundTasks from django_etebase import models from myauth.models import UserType @@ -191,14 +191,13 @@ class ItemBatchIn(BaseModel): ) -# FIXME: make it a background task -def report_items_changed(col_uid: str, stoken: str, items: t.List[CollectionItemIn]): +async def report_items_changed(col_uid: str, stoken: str, items: t.List[CollectionItemIn]): if not redisw.is_active: return redis = redisw.redis content = msgpack_encode(CollectionItemListResponse(data=items, stoken=stoken, done=True).dict()) - async_to_sync(redis.publish)(f"col.{col_uid}", content) + await redis.publish(f"col.{col_uid}", content) def collection_list_common( @@ -462,7 +461,14 @@ async def item_list_subscription_ticket( return await get_ticket(TicketRequest(collection=collection.uid), user) -def item_bulk_common(data: ItemBatchIn, user: UserType, stoken: t.Optional[str], uid: str, validate_etag: bool): +def item_bulk_common( + data: ItemBatchIn, + user: UserType, + stoken: t.Optional[str], + uid: str, + validate_etag: bool, + background_tasks: BackgroundTasks, +): queryset = get_collection_queryset(user) with transaction.atomic(): # We need this for locking the collection object collection_object = queryset.select_for_update().get(uid=uid) @@ -487,7 +493,7 @@ def item_bulk_common(data: ItemBatchIn, user: UserType, stoken: t.Optional[str], status_code=status.HTTP_409_CONFLICT, ) - report_items_changed(collection_object.uid, collection_object.stoken, data.items) + background_tasks.add_task(report_items_changed, collection_object.uid, collection_object.stoken, data.items) @item_router.get( @@ -564,20 +570,22 @@ def fetch_updates( def item_transaction( collection_uid: str, data: ItemBatchIn, + background_tasks: BackgroundTasks, stoken: t.Optional[str] = None, user: UserType = Depends(get_authenticated_user), ): - return item_bulk_common(data, user, stoken, collection_uid, validate_etag=True) + return item_bulk_common(data, user, stoken, collection_uid, validate_etag=True, background_tasks=background_tasks) @item_router.post("/item/batch/", dependencies=[Depends(has_write_access), *PERMISSIONS_READWRITE]) def item_batch( collection_uid: str, data: ItemBatchIn, + background_tasks: BackgroundTasks, stoken: t.Optional[str] = None, user: UserType = Depends(get_authenticated_user), ): - return item_bulk_common(data, user, stoken, collection_uid, validate_etag=False) + return item_bulk_common(data, user, stoken, collection_uid, validate_etag=False, background_tasks=background_tasks) # Chunks