From 7624342ed74891b577aa44fd697fdcd9b1261a14 Mon Sep 17 00:00:00 2001 From: Thomas Sileo Date: Tue, 5 Jul 2022 20:47:00 +0200 Subject: [PATCH] Improve Create activity handling --- app/admin.py | 4 +- app/boxes.py | 149 ++++++++++++++++++++++++++++++++++----------------- app/ldsig.py | 12 ++++- app/main.py | 2 +- 4 files changed, 116 insertions(+), 51 deletions(-) diff --git a/app/admin.py b/app/admin.py index c9f4c05..4fc357e 100644 --- a/app/admin.py +++ b/app/admin.py @@ -193,7 +193,9 @@ async def admin_inbox( filter_by: str | None = None, cursor: str | None = None, ) -> templates.TemplateResponse: - where = [models.InboxObject.ap_type.not_in(["Accept", "Delete"])] + where = [ + models.InboxObject.ap_type.not_in(["Accept", "Delete", "Create", "Update"]) + ] if filter_by: where.append(models.InboxObject.ap_type == filter_by) if cursor: diff --git a/app/boxes.py b/app/boxes.py index 8e6235e..a0b0aca 100644 --- a/app/boxes.py +++ b/app/boxes.py @@ -4,6 +4,7 @@ from collections import defaultdict from dataclasses import dataclass from urllib.parse import urlparse +import fastapi import httpx from dateutil.parser import isoparse from loguru import logger @@ -16,6 +17,8 @@ from sqlalchemy.orm import joinedload from app import activitypub as ap from app import config +from app import httpsig +from app import ldsig from app import models from app.actor import LOCAL_ACTOR from app.actor import Actor @@ -590,10 +593,47 @@ async def _handle_undo_activity( async def _handle_create_activity( db_session: AsyncSession, from_actor: models.Actor, - created_object: models.InboxObject, + create_activity: models.InboxObject, ) -> None: logger.info("Processing Create activity") - tags = created_object.ap_object.get("tag") + wrapped_object = ap.unwrap_activity(create_activity.ap_object) + if create_activity.actor.ap_id != ap.get_actor_id(wrapped_object): + raise ValueError("Object actor does not match activity") + + ro = RemoteObject(wrapped_object, actor=from_actor) + + ap_published_at = now() + if "published" in ro.ap_object: + ap_published_at = isoparse(ro.ap_object["published"]) + + inbox_object = models.InboxObject( + server=urlparse(ro.ap_id).netloc, + actor_id=from_actor.id, + ap_actor_id=from_actor.ap_id, + ap_type=ro.ap_type, + ap_id=ro.ap_id, + ap_context=ro.ap_context, + ap_published_at=ap_published_at, + ap_object=ro.ap_object, + visibility=ro.visibility, + relates_to_inbox_object_id=create_activity.id, + relates_to_outbox_object_id=None, + activity_object_ap_id=ro.activity_object_ap_id, + # Hide replies from the stream + is_hidden_from_stream=( + True + if (ro.in_reply_to and not ro.in_reply_to.startswith(BASE_URL)) + else False + ), # TODO: handle mentions + ) + + db_session.add(inbox_object) + await db_session.flush() + await db_session.refresh(inbox_object) + + create_activity.relates_to_inbox_object_id = inbox_object.id + + tags = inbox_object.ap_object.get("tag") if not tags: logger.info("No tags to process") @@ -603,11 +643,11 @@ async def _handle_create_activity( logger.info(f"Invalid tags: {tags}") return None - if created_object.in_reply_to and created_object.in_reply_to.startswith(BASE_URL): + if inbox_object.in_reply_to and inbox_object.in_reply_to.startswith(BASE_URL): await db_session.execute( update(models.OutboxObject) .where( - models.OutboxObject.ap_id == created_object.in_reply_to, + models.OutboxObject.ap_id == inbox_object.in_reply_to, ) .values(replies_count=models.OutboxObject.replies_count + 1) ) @@ -617,104 +657,115 @@ async def _handle_create_activity( notif = models.Notification( notification_type=models.NotificationType.MENTION, actor_id=from_actor.id, - inbox_object_id=created_object.id, + inbox_object_id=inbox_object.id, ) db_session.add(notif) -async def save_to_inbox(db_session: AsyncSession, raw_object: ap.RawObject) -> None: +async def save_to_inbox( + db_session: AsyncSession, + raw_object: ap.RawObject, + httpsig_info: httpsig.HTTPSigInfo, +) -> None: try: actor = await fetch_actor(db_session, ap.get_id(raw_object["actor"])) except httpx.HTTPStatusError: logger.exception("Failed to fetch actor") return - ap_published_at = now() - if "published" in raw_object: - ap_published_at = isoparse(raw_object["published"]) + raw_object_id = ap.get_id(raw_object) - ra = RemoteObject(ap.unwrap_activity(raw_object), actor=actor) + # Ensure forwarded activities have a valid LD sig + if httpsig_info.signed_by_ap_actor_id != actor.ap_id: + logger.info(f"Processing a forwarded activity {httpsig_info=}/{actor.ap_id}") + if not (await ldsig.verify_signature(db_session, raw_object)): + raise fastapi.HTTPException(status_code=401, detail="Invalid LD sig") if ( await db_session.scalar( select(func.count(models.InboxObject.id)).where( - models.InboxObject.ap_id == ra.ap_id + models.InboxObject.ap_id == raw_object_id ) ) > 0 ): - logger.info(f"Received duplicate {ra.ap_type} activity: {ra.ap_id}") + logger.info( + f'Received duplicate {raw_object["type"]} activity: {raw_object_id}' + ) return + ap_published_at = now() + if "published" in raw_object: + ap_published_at = isoparse(raw_object["published"]) + + activity_ro = RemoteObject(raw_object, actor=actor) + relates_to_inbox_object: models.InboxObject | None = None relates_to_outbox_object: models.OutboxObject | None = None - if ra.activity_object_ap_id: - if ra.activity_object_ap_id.startswith(BASE_URL): + if activity_ro.activity_object_ap_id: + if activity_ro.activity_object_ap_id.startswith(BASE_URL): relates_to_outbox_object = await get_outbox_object_by_ap_id( db_session, - ra.activity_object_ap_id, + activity_ro.activity_object_ap_id, ) else: relates_to_inbox_object = await get_inbox_object_by_ap_id( db_session, - ra.activity_object_ap_id, + activity_ro.activity_object_ap_id, ) inbox_object = models.InboxObject( - server=urlparse(ra.ap_id).netloc, + server=urlparse(activity_ro.ap_id).netloc, actor_id=actor.id, ap_actor_id=actor.ap_id, - ap_type=ra.ap_type, - ap_id=ra.ap_id, - ap_context=ra.ap_context, + ap_type=activity_ro.ap_type, + ap_id=activity_ro.ap_id, + ap_context=activity_ro.ap_context, ap_published_at=ap_published_at, - ap_object=ra.ap_object, - visibility=ra.visibility, + ap_object=activity_ro.ap_object, + visibility=activity_ro.visibility, relates_to_inbox_object_id=relates_to_inbox_object.id if relates_to_inbox_object else None, relates_to_outbox_object_id=relates_to_outbox_object.id if relates_to_outbox_object else None, - activity_object_ap_id=ra.activity_object_ap_id, + activity_object_ap_id=activity_ro.activity_object_ap_id, # Hide replies from the stream - is_hidden_from_stream=( - True - if (ra.in_reply_to and not ra.in_reply_to.startswith(BASE_URL)) - else False - ), # TODO: handle mentions + is_hidden_from_stream=True, ) db_session.add(inbox_object) await db_session.flush() await db_session.refresh(inbox_object) - if ra.ap_type == "Note": # TODO: handle create better + if activity_ro.ap_type == "Create": await _handle_create_activity(db_session, actor, inbox_object) - elif ra.ap_type == "Update": + elif activity_ro.ap_type == "Update": pass - elif ra.ap_type == "Delete": + elif activity_ro.ap_type == "Delete": if relates_to_inbox_object: await _handle_delete_activity(db_session, actor, relates_to_inbox_object) else: # TODO(ts): handle delete actor logger.info( - f"Received a Delete for an unknown object: {ra.activity_object_ap_id}" + "Received a Delete for an unknown object: " + f"{activity_ro.activity_object_ap_id}" ) - elif ra.ap_type == "Follow": + elif activity_ro.ap_type == "Follow": await _handle_follow_follow_activity(db_session, actor, inbox_object) - elif ra.ap_type == "Undo": + elif activity_ro.ap_type == "Undo": if relates_to_inbox_object: await _handle_undo_activity( db_session, actor, inbox_object, relates_to_inbox_object ) else: logger.info("Received Undo for an unknown activity") - elif ra.ap_type in ["Accept", "Reject"]: + elif activity_ro.ap_type in ["Accept", "Reject"]: if not relates_to_outbox_object: logger.info( f"Received {raw_object['type']} for an unknown activity: " - f"{ra.activity_object_ap_id}" + f"{activity_ro.activity_object_ap_id}" ) else: if relates_to_outbox_object.ap_type == "Follow": @@ -729,18 +780,20 @@ async def save_to_inbox(db_session: AsyncSession, raw_object: ap.RawObject) -> N "Received an Accept for an unsupported activity: " f"{relates_to_outbox_object.ap_type}" ) - elif ra.ap_type == "EmojiReact": + elif activity_ro.ap_type == "EmojiReact": if not relates_to_outbox_object: logger.info( - f"Received a like for an unknown activity: {ra.activity_object_ap_id}" + "Received a reaction for an unknown activity: " + f"{activity_ro.activity_object_ap_id}" ) else: # TODO(ts): support reactions pass - elif ra.ap_type == "Like": + elif activity_ro.ap_type == "Like": if not relates_to_outbox_object: logger.info( - f"Received a like for an unknown activity: {ra.activity_object_ap_id}" + "Received a like for an unknown activity: " + f"{activity_ro.activity_object_ap_id}" ) else: relates_to_outbox_object.likes_count = models.OutboxObject.likes_count + 1 @@ -752,7 +805,7 @@ async def save_to_inbox(db_session: AsyncSession, raw_object: ap.RawObject) -> N inbox_object_id=inbox_object.id, ) db_session.add(notif) - elif raw_object["type"] == "Announce": + elif activity_ro.ap_type == "Announce": if relates_to_outbox_object: # This is an announce for a local object relates_to_outbox_object.announces_count = ( @@ -772,9 +825,9 @@ async def save_to_inbox(db_session: AsyncSession, raw_object: ap.RawObject) -> N logger.info("Nothing to do, we already know about this object") else: # Save it as an inbox object - if not ra.activity_object_ap_id: + if not activity_ro.activity_object_ap_id: raise ValueError("Should never happen") - announced_raw_object = await ap.fetch(ra.activity_object_ap_id) + announced_raw_object = await ap.fetch(activity_ro.activity_object_ap_id) announced_actor = await fetch_actor( db_session, ap.get_actor_id(announced_raw_object) ) @@ -794,14 +847,14 @@ async def save_to_inbox(db_session: AsyncSession, raw_object: ap.RawObject) -> N db_session.add(announced_inbox_object) await db_session.flush() inbox_object.relates_to_inbox_object_id = announced_inbox_object.id - elif ra.ap_type in ["Like", "Announce"]: + elif activity_ro.ap_type in ["Like", "Announce"]: if not relates_to_outbox_object: logger.info( - f"Received {ra.ap_type} for an unknown activity: " - f"{ra.activity_object_ap_id}" + f"Received {activity_ro.ap_type} for an unknown activity: " + f"{activity_ro.activity_object_ap_id}" ) else: - if ra.ap_type == "Like": + if activity_ro.ap_type == "Like": # TODO(ts): notification relates_to_outbox_object.likes_count = ( models.OutboxObject.likes_count + 1 @@ -814,7 +867,7 @@ async def save_to_inbox(db_session: AsyncSession, raw_object: ap.RawObject) -> N inbox_object_id=inbox_object.id, ) db_session.add(notif) - elif raw_object["type"] == "Announce": + elif activity_ro.ap_type == "Announce": # TODO(ts): notification relates_to_outbox_object.announces_count = ( models.OutboxObject.announces_count + 1 diff --git a/app/ldsig.py b/app/ldsig.py index f339299..45caa24 100644 --- a/app/ldsig.py +++ b/app/ldsig.py @@ -9,6 +9,8 @@ from Crypto.Signature import PKCS1_v1_5 from pyld import jsonld # type: ignore from app import activitypub as ap +from app.database import AsyncSession +from app.httpsig import _get_public_key if typing.TYPE_CHECKING: from app.key import Key @@ -52,7 +54,15 @@ def _doc_hash(doc: ap.RawObject) -> str: return h.hexdigest() -def verify_signature(doc: ap.RawObject, key: "Key") -> bool: +async def verify_signature( + db_session: AsyncSession, + doc: ap.RawObject, +) -> bool: + if "signature" not in doc: + raise ValueError("No embedded signature") + + key_id = doc["signature"]["creator"] + key = await _get_public_key(db_session, key_id) to_be_signed = _options_hash(doc) + _doc_hash(doc) signature = doc["signature"]["signatureValue"] signer = PKCS1_v1_5.new(key.pubkey or key.privkey) # type: ignore diff --git a/app/main.py b/app/main.py index d413345..5ed7f9c 100644 --- a/app/main.py +++ b/app/main.py @@ -633,7 +633,7 @@ async def inbox( logger.info(f"headers={request.headers}") payload = await request.json() logger.info(f"{payload=}") - await save_to_inbox(db_session, payload) + await save_to_inbox(db_session, payload, httpsig_info) return Response(status_code=204)