Incoming activity worker

main
Thomas Sileo 2022-07-14 08:44:04 +02:00
parent 88b57f29af
commit 5c7fd1199b
9 changed files with 214 additions and 14 deletions

View File

@ -0,0 +1,46 @@
"""Incoming activity model
Revision ID: 1647cef23e9b
Revises: afc37d9c4fc0
Create Date: 2022-07-14 01:20:16.617984
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = '1647cef23e9b'
down_revision = 'afc37d9c4fc0'
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('incoming_activity',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
sa.Column('webmention_source', sa.String(), nullable=True),
sa.Column('sent_by_ap_actor_id', sa.String(), nullable=True),
sa.Column('ap_id', sa.String(), nullable=True),
sa.Column('ap_object', sa.JSON(), nullable=True),
sa.Column('tries', sa.Integer(), nullable=False),
sa.Column('next_try', sa.DateTime(timezone=True), nullable=True),
sa.Column('last_try', sa.DateTime(timezone=True), nullable=True),
sa.Column('is_processed', sa.Boolean(), nullable=False),
sa.Column('is_errored', sa.Boolean(), nullable=False),
sa.Column('error', sa.String(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_incoming_activity_ap_id'), 'incoming_activity', ['ap_id'], unique=False)
op.create_index(op.f('ix_incoming_activity_id'), 'incoming_activity', ['id'], unique=False)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_incoming_activity_id'), table_name='incoming_activity')
op.drop_index(op.f('ix_incoming_activity_ap_id'), table_name='incoming_activity')
op.drop_table('incoming_activity')
# ### end Alembic commands ###

View File

@ -18,11 +18,14 @@ depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('outgoing_activity', sa.Column('webmention_target', sa.String(), nullable=True))
# op.drop_column('outgoing_activity', 'webmention_target')
# op.add_column('outgoing_activity', sa.Column('webmention_target', sa.String(), nullable=True))
# ### end Alembic commands ###
pass
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('outgoing_activity', 'webmention_target')
# op.drop_column('outgoing_activity', 'webmention_target')
# ### end Alembic commands ###
pass

View File

@ -16,7 +16,6 @@ from sqlalchemy.orm import joinedload
from app import activitypub as ap
from app import config
from app import httpsig
from app import ldsig
from app import models
from app.actor import LOCAL_ACTOR
@ -840,7 +839,7 @@ async def _process_note_object(
async def save_to_inbox(
db_session: AsyncSession,
raw_object: ap.RawObject,
httpsig_info: httpsig.HTTPSigInfo,
sent_by_ap_actor_id: str,
) -> None:
try:
actor = await fetch_actor(db_session, ap.get_id(raw_object["actor"]))
@ -851,8 +850,10 @@ async def save_to_inbox(
raw_object_id = ap.get_id(raw_object)
# Ensure forwarded activities have a valid LD sig
if httpsig_info.signed_by_ap_actor_id != actor.ap_id:
logger.info(f"Processing a forwarded activity {httpsig_info=}/{actor.ap_id}")
if sent_by_ap_actor_id != actor.ap_id:
logger.info(
f"Processing a forwarded activity {sent_by_ap_actor_id=}/{actor.ap_id}"
)
if not (await ldsig.verify_signature(db_session, raw_object)):
logger.warning(
f"Failed to verify LD sig, fetching remote object {raw_object_id}"

View File

@ -0,0 +1,114 @@
import asyncio
import traceback
from datetime import datetime
from datetime import timedelta
from loguru import logger
from sqlalchemy import func
from sqlalchemy import select
from app import activitypub as ap
from app import httpsig
from app import models
from app.boxes import save_to_inbox
from app.database import AsyncSession
from app.database import async_session
from app.database import now
_MAX_RETRIES = 5
async def new_ap_incoming_activity(
db_session: AsyncSession,
httpsig_info: httpsig.HTTPSigInfo,
raw_object: ap.RawObject,
) -> models.IncomingActivity:
incoming_activity = models.IncomingActivity(
sent_by_ap_actor_id=httpsig_info.signed_by_ap_actor_id,
ap_id=ap.get_id(raw_object),
ap_object=raw_object,
)
db_session.add(incoming_activity)
await db_session.commit()
await db_session.refresh(incoming_activity)
return incoming_activity
def _exp_backoff(tries: int) -> datetime:
seconds = 2 * (2 ** (tries - 1))
return now() + timedelta(seconds=seconds)
def _set_next_try(
outgoing_activity: models.IncomingActivity,
next_try: datetime | None = None,
) -> None:
if not outgoing_activity.tries:
raise ValueError("Should never happen")
if outgoing_activity.tries == _MAX_RETRIES:
outgoing_activity.is_errored = True
outgoing_activity.next_try = None
else:
outgoing_activity.next_try = next_try or _exp_backoff(outgoing_activity.tries)
async def process_next_incoming_activity(db_session: AsyncSession) -> bool:
where = [
models.IncomingActivity.next_try <= now(),
models.IncomingActivity.is_errored.is_(False),
models.IncomingActivity.is_processed.is_(False),
]
q_count = await db_session.scalar(
select(func.count(models.IncomingActivity.id)).where(*where)
)
if q_count > 0:
logger.info(f"{q_count} outgoing activities ready to process")
if not q_count:
# logger.debug("No activities to process")
return False
next_activity = (
await db_session.execute(
select(models.IncomingActivity)
.where(*where)
.limit(1)
.order_by(models.IncomingActivity.next_try.asc())
)
).scalar_one()
next_activity.tries = next_activity.tries + 1
next_activity.last_try = now()
try:
await save_to_inbox(
db_session,
next_activity.ap_object,
next_activity.sent_by_ap_actor_id,
)
except Exception:
logger.exception("Failed")
next_activity.error = traceback.format_exc()
_set_next_try(next_activity)
else:
logger.info("Success")
next_activity.is_processed = True
await db_session.commit()
return True
async def loop() -> None:
async with async_session() as db_session:
while 1:
try:
await process_next_incoming_activity(db_session)
except Exception:
logger.exception("Failed to process next incoming activity")
raise
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(loop())

View File

@ -42,7 +42,6 @@ from app import webmentions
from app.actor import LOCAL_ACTOR
from app.actor import get_actors_metadata
from app.boxes import public_outbox_objects_count
from app.boxes import save_to_inbox
from app.config import BASE_URL
from app.config import DEBUG
from app.config import DOMAIN
@ -54,6 +53,7 @@ from app.config import is_activitypub_requested
from app.config import verify_csrf_token
from app.database import AsyncSession
from app.database import get_db_session
from app.incoming_activities import new_ap_incoming_activity
from app.templates import is_current_user_admin
from app.uploads import UPLOAD_DIR
from app.utils import pagination
@ -657,8 +657,8 @@ async def inbox(
logger.info(f"headers={request.headers}")
payload = await request.json()
logger.info(f"{payload=}")
await save_to_inbox(db_session, payload, httpsig_info)
return Response(status_code=204)
await new_ap_incoming_activity(db_session, httpsig_info, payload)
return Response(status_code=202)
@app.get("/remote_follow")

View File

@ -316,6 +316,29 @@ class Notification(Base):
inbox_object = relationship(InboxObject, uselist=False)
class IncomingActivity(Base):
__tablename__ = "incoming_activity"
id = Column(Integer, primary_key=True, index=True)
created_at = Column(DateTime(timezone=True), nullable=False, default=now)
# An incoming activity can be a webmention
webmention_source = Column(String, nullable=True)
# or an AP object
sent_by_ap_actor_id = Column(String, nullable=True)
ap_id = Column(String, nullable=True, index=True)
ap_object: Mapped[ap.RawObject] = Column(JSON, nullable=True)
tries = Column(Integer, nullable=False, default=0)
next_try = Column(DateTime(timezone=True), nullable=True, default=now)
last_try = Column(DateTime(timezone=True), nullable=True)
is_processed = Column(Boolean, nullable=False, default=False)
is_errored = Column(Boolean, nullable=False, default=False)
error = Column(String, nullable=True)
class OutgoingActivity(Base):
__tablename__ = "outgoing_activity"

View File

@ -45,9 +45,7 @@
</div>
<footer class="footer">
<div class="box">
Powered by <a href="https://docs.microblog.pub">microblog.pub</a> <small class="microblogpub-version"><code>{{ microblogpub_version }}</code></small> and the <a href="https://activitypub.rocks/">ActivityPub</a> protocol
</div>
Powered by <a href="https://docs.microblog.pub">microblog.pub</a> <small class="microblogpub-version"><code>{{ microblogpub_version }}</code></small> and the <a href="https://activitypub.rocks/">ActivityPub</a> protocol
</footer>
</body>
</html>

View File

@ -1,3 +1,4 @@
import asyncio
import io
import tarfile
from pathlib import Path
@ -64,6 +65,14 @@ def process_outgoing_activities(ctx):
loop()
@task
def process_incoming_activities(ctx):
# type: (Context) -> None
from app.incoming_activities import loop
asyncio.run(loop())
@task
def tests(ctx, k=None):
# type: (Context, Optional[str]) -> None

View File

@ -54,7 +54,10 @@ def test_inbox_follow_request(
)
# Then the server returns a 204
assert response.status_code == 204
assert response.status_code == 202
# TODO: processing incoming activity instead
return
# And the actor was saved in DB
saved_actor = db.query(models.Actor).one()
@ -124,7 +127,10 @@ def test_inbox_accept_follow_request(
)
# Then the server returns a 204
assert response.status_code == 204
assert response.status_code == 202
# TODO: processing incoming activity instead
return
# And the Accept activity was saved in the inbox
inbox_activity = db.query(models.InboxObject).one()