Improve pruning process
parent
6f9d9d7d9d
commit
b2c161466f
|
@ -0,0 +1,33 @@
|
||||||
|
"""Add InboxObject.has_local_mention
|
||||||
|
|
||||||
|
Revision ID: 604d125ea2fb
|
||||||
|
Revises: 5d3e3f2b9b4e
|
||||||
|
Create Date: 2022-08-19 12:46:22.239989+00:00
|
||||||
|
|
||||||
|
"""
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision = '604d125ea2fb'
|
||||||
|
down_revision = '5d3e3f2b9b4e'
|
||||||
|
branch_labels = None
|
||||||
|
depends_on = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
|
with op.batch_alter_table('inbox', schema=None) as batch_op:
|
||||||
|
batch_op.add_column(sa.Column('has_local_mention', sa.Boolean(), server_default='0', nullable=False))
|
||||||
|
|
||||||
|
# ### end Alembic commands ###
|
||||||
|
op.execute("UPDATE inbox SET has_local_mention = 1 WHERE id IN (select inbox_object_id from notifications where notification_type = 'MENTION')")
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
|
with op.batch_alter_table('inbox', schema=None) as batch_op:
|
||||||
|
batch_op.drop_column('has_local_mention')
|
||||||
|
|
||||||
|
# ### end Alembic commands ###
|
|
@ -86,6 +86,10 @@ class InboxObject(Base, BaseObject):
|
||||||
visibility = Column(Enum(ap.VisibilityEnum), nullable=False)
|
visibility = Column(Enum(ap.VisibilityEnum), nullable=False)
|
||||||
conversation = Column(String, nullable=True)
|
conversation = Column(String, nullable=True)
|
||||||
|
|
||||||
|
has_local_mention = Column(
|
||||||
|
Boolean, nullable=False, default=False, server_default="0"
|
||||||
|
)
|
||||||
|
|
||||||
# Used for Like, Announce and Undo activities
|
# Used for Like, Announce and Undo activities
|
||||||
relates_to_inbox_object_id = Column(
|
relates_to_inbox_object_id = Column(
|
||||||
Integer,
|
Integer,
|
||||||
|
|
26
app/prune.py
26
app/prune.py
|
@ -3,8 +3,10 @@ from datetime import timedelta
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
from sqlalchemy import and_
|
from sqlalchemy import and_
|
||||||
from sqlalchemy import delete
|
from sqlalchemy import delete
|
||||||
|
from sqlalchemy import func
|
||||||
from sqlalchemy import not_
|
from sqlalchemy import not_
|
||||||
from sqlalchemy import or_
|
from sqlalchemy import or_
|
||||||
|
from sqlalchemy import select
|
||||||
|
|
||||||
from app import activitypub as ap
|
from app import activitypub as ap
|
||||||
from app import models
|
from app import models
|
||||||
|
@ -20,6 +22,7 @@ async def prune_old_data(
|
||||||
) -> None:
|
) -> None:
|
||||||
logger.info(f"Pruning old data with {INBOX_RETENTION_DAYS=}")
|
logger.info(f"Pruning old data with {INBOX_RETENTION_DAYS=}")
|
||||||
await _prune_old_incoming_activities(db_session)
|
await _prune_old_incoming_activities(db_session)
|
||||||
|
await _prune_old_outgoing_activities(db_session)
|
||||||
await _prune_old_inbox_objects(db_session)
|
await _prune_old_inbox_objects(db_session)
|
||||||
|
|
||||||
await db_session.commit()
|
await db_session.commit()
|
||||||
|
@ -43,9 +46,29 @@ async def _prune_old_incoming_activities(
|
||||||
logger.info(f"Deleted {result.rowcount} old incoming activities") # type: ignore
|
logger.info(f"Deleted {result.rowcount} old incoming activities") # type: ignore
|
||||||
|
|
||||||
|
|
||||||
|
async def _prune_old_outgoing_activities(
|
||||||
|
db_session: AsyncSession,
|
||||||
|
) -> None:
|
||||||
|
result = await db_session.execute(
|
||||||
|
delete(models.OutgoingActivity)
|
||||||
|
.where(
|
||||||
|
models.OutgoingActivity.created_at
|
||||||
|
< now() - timedelta(days=INBOX_RETENTION_DAYS),
|
||||||
|
# Keep failed activity for debug
|
||||||
|
models.OutgoingActivity.is_errored.is_(False),
|
||||||
|
)
|
||||||
|
.execution_options(synchronize_session=False)
|
||||||
|
)
|
||||||
|
logger.info(f"Deleted {result.rowcount} old outgoing activities") # type: ignore
|
||||||
|
|
||||||
|
|
||||||
async def _prune_old_inbox_objects(
|
async def _prune_old_inbox_objects(
|
||||||
db_session: AsyncSession,
|
db_session: AsyncSession,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
outbox_conversation = select(func.distinct(models.OutboxObject.conversation)).where(
|
||||||
|
models.OutboxObject.conversation.is_not(None),
|
||||||
|
models.OutboxObject.conversation.not_like(f"{BASE_URL}%"),
|
||||||
|
)
|
||||||
result = await db_session.execute(
|
result = await db_session.execute(
|
||||||
delete(models.InboxObject)
|
delete(models.InboxObject)
|
||||||
.where(
|
.where(
|
||||||
|
@ -55,11 +78,14 @@ async def _prune_old_inbox_objects(
|
||||||
models.InboxObject.liked_via_outbox_object_ap_id.is_(None),
|
models.InboxObject.liked_via_outbox_object_ap_id.is_(None),
|
||||||
# Keep announced objects
|
# Keep announced objects
|
||||||
models.InboxObject.announced_via_outbox_object_ap_id.is_(None),
|
models.InboxObject.announced_via_outbox_object_ap_id.is_(None),
|
||||||
|
# Keep objects mentioning the local actor
|
||||||
|
models.InboxObject.has_local_mention.is_(False),
|
||||||
# Keep objects related to local conversations (i.e. don't break the
|
# Keep objects related to local conversations (i.e. don't break the
|
||||||
# public website)
|
# public website)
|
||||||
or_(
|
or_(
|
||||||
models.InboxObject.conversation.not_like(f"{BASE_URL}%"),
|
models.InboxObject.conversation.not_like(f"{BASE_URL}%"),
|
||||||
models.InboxObject.conversation.is_(None),
|
models.InboxObject.conversation.is_(None),
|
||||||
|
models.InboxObject.conversation.not_in(outbox_conversation),
|
||||||
),
|
),
|
||||||
# Keep activities related to the outbox (like Like/Announce/Follow...)
|
# Keep activities related to the outbox (like Like/Announce/Follow...)
|
||||||
or_(
|
or_(
|
||||||
|
|
Loading…
Reference in New Issue