Improve Create activity handling

main
Thomas Sileo 2022-07-05 20:47:00 +02:00
parent 09a7287877
commit 7624342ed7
4 changed files with 116 additions and 51 deletions

View File

@ -193,7 +193,9 @@ async def admin_inbox(
filter_by: str | None = None,
cursor: str | None = None,
) -> templates.TemplateResponse:
where = [models.InboxObject.ap_type.not_in(["Accept", "Delete"])]
where = [
models.InboxObject.ap_type.not_in(["Accept", "Delete", "Create", "Update"])
]
if filter_by:
where.append(models.InboxObject.ap_type == filter_by)
if cursor:

View File

@ -4,6 +4,7 @@ from collections import defaultdict
from dataclasses import dataclass
from urllib.parse import urlparse
import fastapi
import httpx
from dateutil.parser import isoparse
from loguru import logger
@ -16,6 +17,8 @@ from sqlalchemy.orm import joinedload
from app import activitypub as ap
from app import config
from app import httpsig
from app import ldsig
from app import models
from app.actor import LOCAL_ACTOR
from app.actor import Actor
@ -590,10 +593,47 @@ async def _handle_undo_activity(
async def _handle_create_activity(
db_session: AsyncSession,
from_actor: models.Actor,
created_object: models.InboxObject,
create_activity: models.InboxObject,
) -> None:
logger.info("Processing Create activity")
tags = created_object.ap_object.get("tag")
wrapped_object = ap.unwrap_activity(create_activity.ap_object)
if create_activity.actor.ap_id != ap.get_actor_id(wrapped_object):
raise ValueError("Object actor does not match activity")
ro = RemoteObject(wrapped_object, actor=from_actor)
ap_published_at = now()
if "published" in ro.ap_object:
ap_published_at = isoparse(ro.ap_object["published"])
inbox_object = models.InboxObject(
server=urlparse(ro.ap_id).netloc,
actor_id=from_actor.id,
ap_actor_id=from_actor.ap_id,
ap_type=ro.ap_type,
ap_id=ro.ap_id,
ap_context=ro.ap_context,
ap_published_at=ap_published_at,
ap_object=ro.ap_object,
visibility=ro.visibility,
relates_to_inbox_object_id=create_activity.id,
relates_to_outbox_object_id=None,
activity_object_ap_id=ro.activity_object_ap_id,
# Hide replies from the stream
is_hidden_from_stream=(
True
if (ro.in_reply_to and not ro.in_reply_to.startswith(BASE_URL))
else False
), # TODO: handle mentions
)
db_session.add(inbox_object)
await db_session.flush()
await db_session.refresh(inbox_object)
create_activity.relates_to_inbox_object_id = inbox_object.id
tags = inbox_object.ap_object.get("tag")
if not tags:
logger.info("No tags to process")
@ -603,11 +643,11 @@ async def _handle_create_activity(
logger.info(f"Invalid tags: {tags}")
return None
if created_object.in_reply_to and created_object.in_reply_to.startswith(BASE_URL):
if inbox_object.in_reply_to and inbox_object.in_reply_to.startswith(BASE_URL):
await db_session.execute(
update(models.OutboxObject)
.where(
models.OutboxObject.ap_id == created_object.in_reply_to,
models.OutboxObject.ap_id == inbox_object.in_reply_to,
)
.values(replies_count=models.OutboxObject.replies_count + 1)
)
@ -617,104 +657,115 @@ async def _handle_create_activity(
notif = models.Notification(
notification_type=models.NotificationType.MENTION,
actor_id=from_actor.id,
inbox_object_id=created_object.id,
inbox_object_id=inbox_object.id,
)
db_session.add(notif)
async def save_to_inbox(db_session: AsyncSession, raw_object: ap.RawObject) -> None:
async def save_to_inbox(
db_session: AsyncSession,
raw_object: ap.RawObject,
httpsig_info: httpsig.HTTPSigInfo,
) -> None:
try:
actor = await fetch_actor(db_session, ap.get_id(raw_object["actor"]))
except httpx.HTTPStatusError:
logger.exception("Failed to fetch actor")
return
ap_published_at = now()
if "published" in raw_object:
ap_published_at = isoparse(raw_object["published"])
raw_object_id = ap.get_id(raw_object)
ra = RemoteObject(ap.unwrap_activity(raw_object), actor=actor)
# Ensure forwarded activities have a valid LD sig
if httpsig_info.signed_by_ap_actor_id != actor.ap_id:
logger.info(f"Processing a forwarded activity {httpsig_info=}/{actor.ap_id}")
if not (await ldsig.verify_signature(db_session, raw_object)):
raise fastapi.HTTPException(status_code=401, detail="Invalid LD sig")
if (
await db_session.scalar(
select(func.count(models.InboxObject.id)).where(
models.InboxObject.ap_id == ra.ap_id
models.InboxObject.ap_id == raw_object_id
)
)
> 0
):
logger.info(f"Received duplicate {ra.ap_type} activity: {ra.ap_id}")
logger.info(
f'Received duplicate {raw_object["type"]} activity: {raw_object_id}'
)
return
ap_published_at = now()
if "published" in raw_object:
ap_published_at = isoparse(raw_object["published"])
activity_ro = RemoteObject(raw_object, actor=actor)
relates_to_inbox_object: models.InboxObject | None = None
relates_to_outbox_object: models.OutboxObject | None = None
if ra.activity_object_ap_id:
if ra.activity_object_ap_id.startswith(BASE_URL):
if activity_ro.activity_object_ap_id:
if activity_ro.activity_object_ap_id.startswith(BASE_URL):
relates_to_outbox_object = await get_outbox_object_by_ap_id(
db_session,
ra.activity_object_ap_id,
activity_ro.activity_object_ap_id,
)
else:
relates_to_inbox_object = await get_inbox_object_by_ap_id(
db_session,
ra.activity_object_ap_id,
activity_ro.activity_object_ap_id,
)
inbox_object = models.InboxObject(
server=urlparse(ra.ap_id).netloc,
server=urlparse(activity_ro.ap_id).netloc,
actor_id=actor.id,
ap_actor_id=actor.ap_id,
ap_type=ra.ap_type,
ap_id=ra.ap_id,
ap_context=ra.ap_context,
ap_type=activity_ro.ap_type,
ap_id=activity_ro.ap_id,
ap_context=activity_ro.ap_context,
ap_published_at=ap_published_at,
ap_object=ra.ap_object,
visibility=ra.visibility,
ap_object=activity_ro.ap_object,
visibility=activity_ro.visibility,
relates_to_inbox_object_id=relates_to_inbox_object.id
if relates_to_inbox_object
else None,
relates_to_outbox_object_id=relates_to_outbox_object.id
if relates_to_outbox_object
else None,
activity_object_ap_id=ra.activity_object_ap_id,
activity_object_ap_id=activity_ro.activity_object_ap_id,
# Hide replies from the stream
is_hidden_from_stream=(
True
if (ra.in_reply_to and not ra.in_reply_to.startswith(BASE_URL))
else False
), # TODO: handle mentions
is_hidden_from_stream=True,
)
db_session.add(inbox_object)
await db_session.flush()
await db_session.refresh(inbox_object)
if ra.ap_type == "Note": # TODO: handle create better
if activity_ro.ap_type == "Create":
await _handle_create_activity(db_session, actor, inbox_object)
elif ra.ap_type == "Update":
elif activity_ro.ap_type == "Update":
pass
elif ra.ap_type == "Delete":
elif activity_ro.ap_type == "Delete":
if relates_to_inbox_object:
await _handle_delete_activity(db_session, actor, relates_to_inbox_object)
else:
# TODO(ts): handle delete actor
logger.info(
f"Received a Delete for an unknown object: {ra.activity_object_ap_id}"
"Received a Delete for an unknown object: "
f"{activity_ro.activity_object_ap_id}"
)
elif ra.ap_type == "Follow":
elif activity_ro.ap_type == "Follow":
await _handle_follow_follow_activity(db_session, actor, inbox_object)
elif ra.ap_type == "Undo":
elif activity_ro.ap_type == "Undo":
if relates_to_inbox_object:
await _handle_undo_activity(
db_session, actor, inbox_object, relates_to_inbox_object
)
else:
logger.info("Received Undo for an unknown activity")
elif ra.ap_type in ["Accept", "Reject"]:
elif activity_ro.ap_type in ["Accept", "Reject"]:
if not relates_to_outbox_object:
logger.info(
f"Received {raw_object['type']} for an unknown activity: "
f"{ra.activity_object_ap_id}"
f"{activity_ro.activity_object_ap_id}"
)
else:
if relates_to_outbox_object.ap_type == "Follow":
@ -729,18 +780,20 @@ async def save_to_inbox(db_session: AsyncSession, raw_object: ap.RawObject) -> N
"Received an Accept for an unsupported activity: "
f"{relates_to_outbox_object.ap_type}"
)
elif ra.ap_type == "EmojiReact":
elif activity_ro.ap_type == "EmojiReact":
if not relates_to_outbox_object:
logger.info(
f"Received a like for an unknown activity: {ra.activity_object_ap_id}"
"Received a reaction for an unknown activity: "
f"{activity_ro.activity_object_ap_id}"
)
else:
# TODO(ts): support reactions
pass
elif ra.ap_type == "Like":
elif activity_ro.ap_type == "Like":
if not relates_to_outbox_object:
logger.info(
f"Received a like for an unknown activity: {ra.activity_object_ap_id}"
"Received a like for an unknown activity: "
f"{activity_ro.activity_object_ap_id}"
)
else:
relates_to_outbox_object.likes_count = models.OutboxObject.likes_count + 1
@ -752,7 +805,7 @@ async def save_to_inbox(db_session: AsyncSession, raw_object: ap.RawObject) -> N
inbox_object_id=inbox_object.id,
)
db_session.add(notif)
elif raw_object["type"] == "Announce":
elif activity_ro.ap_type == "Announce":
if relates_to_outbox_object:
# This is an announce for a local object
relates_to_outbox_object.announces_count = (
@ -772,9 +825,9 @@ async def save_to_inbox(db_session: AsyncSession, raw_object: ap.RawObject) -> N
logger.info("Nothing to do, we already know about this object")
else:
# Save it as an inbox object
if not ra.activity_object_ap_id:
if not activity_ro.activity_object_ap_id:
raise ValueError("Should never happen")
announced_raw_object = await ap.fetch(ra.activity_object_ap_id)
announced_raw_object = await ap.fetch(activity_ro.activity_object_ap_id)
announced_actor = await fetch_actor(
db_session, ap.get_actor_id(announced_raw_object)
)
@ -794,14 +847,14 @@ async def save_to_inbox(db_session: AsyncSession, raw_object: ap.RawObject) -> N
db_session.add(announced_inbox_object)
await db_session.flush()
inbox_object.relates_to_inbox_object_id = announced_inbox_object.id
elif ra.ap_type in ["Like", "Announce"]:
elif activity_ro.ap_type in ["Like", "Announce"]:
if not relates_to_outbox_object:
logger.info(
f"Received {ra.ap_type} for an unknown activity: "
f"{ra.activity_object_ap_id}"
f"Received {activity_ro.ap_type} for an unknown activity: "
f"{activity_ro.activity_object_ap_id}"
)
else:
if ra.ap_type == "Like":
if activity_ro.ap_type == "Like":
# TODO(ts): notification
relates_to_outbox_object.likes_count = (
models.OutboxObject.likes_count + 1
@ -814,7 +867,7 @@ async def save_to_inbox(db_session: AsyncSession, raw_object: ap.RawObject) -> N
inbox_object_id=inbox_object.id,
)
db_session.add(notif)
elif raw_object["type"] == "Announce":
elif activity_ro.ap_type == "Announce":
# TODO(ts): notification
relates_to_outbox_object.announces_count = (
models.OutboxObject.announces_count + 1

View File

@ -9,6 +9,8 @@ from Crypto.Signature import PKCS1_v1_5
from pyld import jsonld # type: ignore
from app import activitypub as ap
from app.database import AsyncSession
from app.httpsig import _get_public_key
if typing.TYPE_CHECKING:
from app.key import Key
@ -52,7 +54,15 @@ def _doc_hash(doc: ap.RawObject) -> str:
return h.hexdigest()
def verify_signature(doc: ap.RawObject, key: "Key") -> bool:
async def verify_signature(
db_session: AsyncSession,
doc: ap.RawObject,
) -> bool:
if "signature" not in doc:
raise ValueError("No embedded signature")
key_id = doc["signature"]["creator"]
key = await _get_public_key(db_session, key_id)
to_be_signed = _options_hash(doc) + _doc_hash(doc)
signature = doc["signature"]["signatureValue"]
signer = PKCS1_v1_5.new(key.pubkey or key.privkey) # type: ignore

View File

@ -633,7 +633,7 @@ async def inbox(
logger.info(f"headers={request.headers}")
payload = await request.json()
logger.info(f"{payload=}")
await save_to_inbox(db_session, payload)
await save_to_inbox(db_session, payload, httpsig_info)
return Response(status_code=204)