From 8fbb48f6714afc6edca37973d63d3401da498948 Mon Sep 17 00:00:00 2001 From: Thomas Sileo Date: Wed, 6 Jul 2022 19:04:38 +0200 Subject: [PATCH] Add support for forwarding activities --- .../93e36ff5c691_allow_activity_forwarding.py | 65 +++++++++++++++++++ app/ap_object.py | 4 ++ app/boxes.py | 55 +++++++++++++++- app/httpsig.py | 10 ++- app/ldsig.py | 4 +- app/models.py | 16 ++++- app/outgoing_activities.py | 25 +++++-- 7 files changed, 171 insertions(+), 8 deletions(-) create mode 100644 alembic/versions/93e36ff5c691_allow_activity_forwarding.py diff --git a/alembic/versions/93e36ff5c691_allow_activity_forwarding.py b/alembic/versions/93e36ff5c691_allow_activity_forwarding.py new file mode 100644 index 0000000..46b6b81 --- /dev/null +++ b/alembic/versions/93e36ff5c691_allow_activity_forwarding.py @@ -0,0 +1,65 @@ +"""Allow activity forwarding + +Revision ID: 93e36ff5c691 +Revises: ba131b14c3a1 +Create Date: 2022-07-06 09:03:57.656539 + +""" +import sqlalchemy as sa +from sqlalchemy.schema import CreateTable + +from alembic import op +from app.database import engine +from app.models import OutgoingActivity + +# revision identifiers, used by Alembic. +revision = '93e36ff5c691' +down_revision = 'ba131b14c3a1' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.create_index(op.f('ix_inbox_activity_object_ap_id'), 'inbox', ['activity_object_ap_id'], unique=False) + op.create_index(op.f('ix_inbox_ap_type'), 'inbox', ['ap_type'], unique=False) + op.create_index(op.f('ix_outbox_activity_object_ap_id'), 'outbox', ['activity_object_ap_id'], unique=False) + op.create_index(op.f('ix_outbox_ap_type'), 'outbox', ['ap_type'], unique=False) + # ### end Alembic commands ### + # XXX: cannot remove alter to make a column nullable, we have to drop/recreate it + create_statement = CreateTable(OutgoingActivity.__table__).compile(engine) + op.execute("DROP TABLE IF EXISTS outgoing_activity;") + op.execute(f"{create_statement};") + # Instead of this: + # op.add_column('outgoing_activity', sa.Column('inbox_object_id', sa.Integer(), nullable=True)) + # op.alter_column('outgoing_activity', 'outbox_object_id', + # existing_type=sa.INTEGER(), + # nullable=True) + # op.create_foreign_key(None, 'outgoing_activity', 'inbox', ['inbox_object_id'], ['id']) + # op.create_foreign_key(None, 'outgoing_activity', 'outbox', ['outbox_object_id'], ['id']) + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_table("outgoing_activity") + op.create_table('outgoing_activity', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('created_at', sa.DateTime(timezone=True), nullable=False), + sa.Column('recipient', sa.String(), nullable=False), + sa.Column('outbox_object_id', sa.Integer(), nullable=False), + sa.Column('tries', sa.Integer(), nullable=False), + sa.Column('next_try', sa.DateTime(timezone=True), nullable=True), + sa.Column('last_try', sa.DateTime(timezone=True), nullable=True), + sa.Column('last_status_code', sa.Integer(), nullable=True), + sa.Column('last_response', sa.String(), nullable=True), + sa.Column('is_sent', sa.Boolean(), nullable=False), + sa.Column('is_errored', sa.Boolean(), nullable=False), + sa.Column('error', sa.String(), nullable=True), + sa.ForeignKeyConstraint(['outbox_object_id'], ['outbox.id'], ), + sa.PrimaryKeyConstraint('id') + ) + op.drop_index(op.f('ix_outbox_ap_type'), table_name='outbox') + op.drop_index(op.f('ix_outbox_activity_object_ap_id'), table_name='outbox') + op.drop_index(op.f('ix_inbox_ap_type'), table_name='inbox') + op.drop_index(op.f('ix_inbox_activity_object_ap_id'), table_name='inbox') + # ### end Alembic commands ### diff --git a/app/ap_object.py b/app/ap_object.py index db79274..f715544 100644 --- a/app/ap_object.py +++ b/app/ap_object.py @@ -155,6 +155,10 @@ class Object: def in_reply_to(self) -> str | None: return self.ap_object.get("inReplyTo") + @property + def has_ld_signature(self) -> bool: + return bool(self.ap_object.get("signature")) + def _to_camel(string: str) -> str: cased = "".join(word.capitalize() for word in string.split("_")) diff --git a/app/boxes.py b/app/boxes.py index daad780..7d399d0 100644 --- a/app/boxes.py +++ b/app/boxes.py @@ -417,6 +417,23 @@ async def _compute_recipients( return recipients +async def _get_followers_recipients(db_session: AsyncSession) -> set[str]: + """Returns all the recipients from the local follower collection.""" + followers = ( + ( + await db_session.scalars( + select(models.Follower).options(joinedload(models.Follower.actor)) + ) + ) + .unique() + .all() + ) + return { + follower.actor.shared_inbox_url or follower.actor.inbox_url + for follower in followers + } + + async def get_inbox_object_by_ap_id( db_session: AsyncSession, ap_id: str ) -> models.InboxObject | None: @@ -455,6 +472,7 @@ async def get_anybox_object_by_ap_id( async def _handle_delete_activity( db_session: AsyncSession, from_actor: models.Actor, + delete_activity: models.InboxObject, ap_object_to_delete: models.InboxObject, ) -> None: if from_actor.ap_id != ap_object_to_delete.actor.ap_id: @@ -464,6 +482,23 @@ async def _handle_delete_activity( ) return + # If it's a local replies, it was forwarded, so we also need to forward + # the Delete activity if possible + if ( + delete_activity.has_ld_signature + and ap_object_to_delete.in_reply_to + and ap_object_to_delete.in_reply_to.startswith(BASE_URL) + ): + logger.info("Forwarding Delete activity as it's a local reply") + recipients = await _get_followers_recipients(db_session) + for rcp in recipients: + await new_outgoing_activity( + db_session, + rcp, + outbox_object_id=None, + inbox_object_id=delete_activity.id, + ) + # TODO(ts): do we need to delete related activities? should we keep # bookmarked objects with a deleted flag? logger.info(f"Deleting {ap_object_to_delete.ap_type}/{ap_object_to_delete.ap_id}") @@ -680,6 +715,19 @@ async def _handle_create_activity( .values(replies_count=models.OutboxObject.replies_count + 1) ) + # This object is a reply of a local object, we may need to forward it + # to our followers (we can only forward JSON-LD signed activities) + if create_activity.has_ld_signature: + logger.info("Forwarding Create activity as it's a local reply") + recipients = await _get_followers_recipients(db_session) + for rcp in recipients: + await new_outgoing_activity( + db_session, + rcp, + outbox_object_id=None, + inbox_object_id=create_activity.id, + ) + for tag in tags: if tag.get("name") == LOCAL_ACTOR.handle or tag.get("href") == LOCAL_ACTOR.url: notif = models.Notification( @@ -773,7 +821,12 @@ async def save_to_inbox( await _handle_update_activity(db_session, actor, inbox_object) elif activity_ro.ap_type == "Delete": if relates_to_inbox_object: - await _handle_delete_activity(db_session, actor, relates_to_inbox_object) + await _handle_delete_activity( + db_session, + actor, + inbox_object, + relates_to_inbox_object, + ) else: # TODO(ts): handle delete actor logger.info( diff --git a/app/httpsig.py b/app/httpsig.py index f236656..6488f95 100644 --- a/app/httpsig.py +++ b/app/httpsig.py @@ -113,6 +113,7 @@ async def _get_public_key(db_session: AsyncSession, key_id: str) -> Key: class HTTPSigInfo: has_valid_signature: bool signed_by_ap_actor_id: str | None = None + is_ap_actor_gone: bool = False async def httpsig_checker( @@ -139,7 +140,7 @@ async def httpsig_checker( k = await _get_public_key(db_session, hsig["keyId"]) except ap.ObjectIsGoneError: logger.info("Actor is gone") - return HTTPSigInfo(has_valid_signature=False) + return HTTPSigInfo(has_valid_signature=False, is_ap_actor_gone=True) except Exception: logger.exception(f'Failed to fetch HTTP sig key {hsig["keyId"]}') return HTTPSigInfo(has_valid_signature=False) @@ -162,6 +163,13 @@ async def enforce_httpsig( logger.warning(f"Invalid HTTP sig {httpsig_info=}") body = await request.body() logger.info(f"{body=}") + + # Special case for Mastoodon instance that keep resending Delete + # activities for actor we don't know about if we raise a 401 + if httpsig_info.is_ap_actor_gone: + logger.info("Let's make Mastodon happy, returning a 204") + raise fastapi.HTTPException(status_code=204) + raise fastapi.HTTPException(status_code=401, detail="Invalid HTTP sig") return httpsig_info diff --git a/app/ldsig.py b/app/ldsig.py index 45caa24..3668516 100644 --- a/app/ldsig.py +++ b/app/ldsig.py @@ -6,6 +6,7 @@ from datetime import datetime import pyld # type: ignore from Crypto.Hash import SHA256 from Crypto.Signature import PKCS1_v1_5 +from loguru import logger from pyld import jsonld # type: ignore from app import activitypub as ap @@ -59,7 +60,8 @@ async def verify_signature( doc: ap.RawObject, ) -> bool: if "signature" not in doc: - raise ValueError("No embedded signature") + logger.warning("The object does contain a signature") + return False key_id = doc["signature"]["creator"] key = await _get_public_key(db_session, key_id) diff --git a/app/models.py b/app/models.py index f5f67e5..3954cee 100644 --- a/app/models.py +++ b/app/models.py @@ -321,9 +321,14 @@ class OutgoingActivity(Base): created_at = Column(DateTime(timezone=True), nullable=False, default=now) recipient = Column(String, nullable=False) - outbox_object_id = Column(Integer, ForeignKey("outbox.id"), nullable=False) + + outbox_object_id = Column(Integer, ForeignKey("outbox.id"), nullable=True) outbox_object = relationship(OutboxObject, uselist=False) + # Can also reference an inbox object if it needds to be forwarded + inbox_object_id = Column(Integer, ForeignKey("inbox.id"), nullable=True) + inbox_object = relationship(InboxObject, uselist=False) + tries = Column(Integer, nullable=False, default=0) next_try = Column(DateTime(timezone=True), nullable=True, default=now) @@ -335,6 +340,15 @@ class OutgoingActivity(Base): is_errored = Column(Boolean, nullable=False, default=False) error = Column(String, nullable=True) + @property + def anybox_object(self) -> OutboxObject | InboxObject: + if self.outbox_object_id: + return self.outbox_object # type: ignore + elif self.inbox_object_id: + return self.inbox_object # type: ignore + else: + raise ValueError("Should never happen") + class TaggedOutboxObject(Base): __tablename__ = "tagged_outbox_object" diff --git a/app/outgoing_activities.py b/app/outgoing_activities.py index e83e85e..04a9201 100644 --- a/app/outgoing_activities.py +++ b/app/outgoing_activities.py @@ -9,6 +9,7 @@ from loguru import logger from sqlalchemy import func from sqlalchemy import select from sqlalchemy.orm import Session +from sqlalchemy.orm import joinedload from app import activitypub as ap from app import config @@ -29,11 +30,18 @@ k.load(KEY_PATH.read_text()) async def new_outgoing_activity( db_session: AsyncSession, recipient: str, - outbox_object_id: int, + outbox_object_id: int | None, + inbox_object_id: int | None = None, ) -> models.OutgoingActivity: + if outbox_object_id is None and inbox_object_id is None: + raise ValueError("Must reference at least one inbox/outbox activity") + elif outbox_object_id and inbox_object_id: + raise ValueError("Cannot reference both inbox/outbox activities") + outgoing_activity = models.OutgoingActivity( recipient=recipient, outbox_object_id=outbox_object_id, + inbox_object_id=inbox_object_id, ) db_session.add(outgoing_activity) @@ -93,17 +101,26 @@ def process_next_outgoing_activity(db: Session) -> bool: select(models.OutgoingActivity) .where(*where) .limit(1) + .options( + joinedload(models.OutgoingActivity.inbox_object), + joinedload(models.OutgoingActivity.outbox_object), + ) .order_by(models.OutgoingActivity.next_try) ).scalar_one() next_activity.tries = next_activity.tries + 1 next_activity.last_try = now() - payload = ap.wrap_object_if_needed(next_activity.outbox_object.ap_object) + payload = ap.wrap_object_if_needed(next_activity.anybox_object.ap_object) # Use LD sig if the activity may need to be forwarded by recipients - if payload["type"] in ["Create", "Delete"]: - ldsig.generate_signature(payload, k) + if next_activity.anybox_object.is_from_outbox and payload["type"] in [ + "Create", + "Delete", + ]: + # But only if the object is public (to help with deniability/privacy) + if next_activity.outbox_object.visibility == ap.VisibilityEnum.PUBLIC: + ldsig.generate_signature(payload, k) logger.info(f"{payload=}") try: