Improve conversation/threads handling
parent
47b898bb13
commit
7d9ced7740
|
@ -0,0 +1,40 @@
|
||||||
|
"""New conversation field
|
||||||
|
|
||||||
|
Revision ID: 9bc69ed947e2
|
||||||
|
Revises: 1702e88016db
|
||||||
|
Create Date: 2022-08-14 16:38:37.688377+00:00
|
||||||
|
|
||||||
|
"""
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision = '9bc69ed947e2'
|
||||||
|
down_revision = '1702e88016db'
|
||||||
|
branch_labels = None
|
||||||
|
depends_on = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
|
with op.batch_alter_table('inbox', schema=None) as batch_op:
|
||||||
|
batch_op.add_column(sa.Column('conversation', sa.String(), nullable=True))
|
||||||
|
|
||||||
|
with op.batch_alter_table('outbox', schema=None) as batch_op:
|
||||||
|
batch_op.add_column(sa.Column('conversation', sa.String(), nullable=True))
|
||||||
|
|
||||||
|
op.execute("UPDATE inbox SET conversation = ap_context")
|
||||||
|
op.execute("UPDATE outbox SET conversation = ap_context")
|
||||||
|
# ### end Alembic commands ###
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
|
with op.batch_alter_table('outbox', schema=None) as batch_op:
|
||||||
|
batch_op.drop_column('conversation')
|
||||||
|
|
||||||
|
with op.batch_alter_table('inbox', schema=None) as batch_op:
|
||||||
|
batch_op.drop_column('conversation')
|
||||||
|
|
||||||
|
# ### end Alembic commands ###
|
60
app/boxes.py
60
app/boxes.py
|
@ -58,6 +58,7 @@ async def save_outbox_object(
|
||||||
relates_to_actor_id: int | None = None,
|
relates_to_actor_id: int | None = None,
|
||||||
source: str | None = None,
|
source: str | None = None,
|
||||||
is_transient: bool = False,
|
is_transient: bool = False,
|
||||||
|
conversation: str | None = None,
|
||||||
) -> models.OutboxObject:
|
) -> models.OutboxObject:
|
||||||
ro = await RemoteObject.from_raw_object(raw_object)
|
ro = await RemoteObject.from_raw_object(raw_object)
|
||||||
|
|
||||||
|
@ -76,6 +77,7 @@ async def save_outbox_object(
|
||||||
is_hidden_from_homepage=True if ro.in_reply_to else False,
|
is_hidden_from_homepage=True if ro.in_reply_to else False,
|
||||||
source=source,
|
source=source,
|
||||||
is_transient=is_transient,
|
is_transient=is_transient,
|
||||||
|
conversation=conversation,
|
||||||
)
|
)
|
||||||
db_session.add(outbox_object)
|
db_session.add(outbox_object)
|
||||||
await db_session.flush()
|
await db_session.flush()
|
||||||
|
@ -292,6 +294,38 @@ async def send_undo(db_session: AsyncSession, ap_object_id: str) -> None:
|
||||||
await db_session.commit()
|
await db_session.commit()
|
||||||
|
|
||||||
|
|
||||||
|
async def fetch_conversation_root(
|
||||||
|
db_session: AsyncSession,
|
||||||
|
obj: AnyboxObject | RemoteObject,
|
||||||
|
is_root: bool = False,
|
||||||
|
) -> str:
|
||||||
|
if not obj.in_reply_to or is_root:
|
||||||
|
if obj.ap_context:
|
||||||
|
return obj.ap_context
|
||||||
|
else:
|
||||||
|
# Use the root AP ID if there'no context
|
||||||
|
return f"microblogpub:root:{obj.ap_id}"
|
||||||
|
else:
|
||||||
|
in_reply_to_object: AnyboxObject | RemoteObject | None = (
|
||||||
|
await get_anybox_object_by_ap_id(db_session, obj.in_reply_to)
|
||||||
|
)
|
||||||
|
if not in_reply_to_object:
|
||||||
|
try:
|
||||||
|
raw_reply = await ap.fetch(ap.get_id(obj.in_reply_to))
|
||||||
|
raw_reply_actor = await fetch_actor(
|
||||||
|
db_session, ap.get_actor_id(raw_reply)
|
||||||
|
)
|
||||||
|
in_reply_to_object = RemoteObject(raw_reply, actor=raw_reply_actor)
|
||||||
|
except httpx.HTTPStatusError as http_status_error:
|
||||||
|
if 400 <= http_status_error.response.status_code < 500:
|
||||||
|
# We may not have access, in this case consider if root
|
||||||
|
return await fetch_conversation_root(db_session, obj, is_root=True)
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
|
return await fetch_conversation_root(db_session, in_reply_to_object)
|
||||||
|
|
||||||
|
|
||||||
async def send_create(
|
async def send_create(
|
||||||
db_session: AsyncSession,
|
db_session: AsyncSession,
|
||||||
ap_type: str,
|
ap_type: str,
|
||||||
|
@ -309,6 +343,7 @@ async def send_create(
|
||||||
note_id = allocate_outbox_id()
|
note_id = allocate_outbox_id()
|
||||||
published = now().replace(microsecond=0).isoformat().replace("+00:00", "Z")
|
published = now().replace(microsecond=0).isoformat().replace("+00:00", "Z")
|
||||||
context = f"{ID}/contexts/" + uuid.uuid4().hex
|
context = f"{ID}/contexts/" + uuid.uuid4().hex
|
||||||
|
conversation = context
|
||||||
content, tags, mentioned_actors = await markdownify(db_session, source)
|
content, tags, mentioned_actors = await markdownify(db_session, source)
|
||||||
attachments = []
|
attachments = []
|
||||||
|
|
||||||
|
@ -318,8 +353,16 @@ async def send_create(
|
||||||
raise ValueError(f"Invalid in reply to {in_reply_to=}")
|
raise ValueError(f"Invalid in reply to {in_reply_to=}")
|
||||||
if not in_reply_to_object.ap_context:
|
if not in_reply_to_object.ap_context:
|
||||||
logger.warning(f"Replied object {in_reply_to} has no context")
|
logger.warning(f"Replied object {in_reply_to} has no context")
|
||||||
|
try:
|
||||||
|
conversation = await fetch_conversation_root(
|
||||||
|
db_session,
|
||||||
|
in_reply_to_object,
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
logger.exception(f"Failed to fetch convo root {in_reply_to}")
|
||||||
else:
|
else:
|
||||||
context = in_reply_to_object.ap_context
|
context = in_reply_to_object.ap_context
|
||||||
|
conversation = in_reply_to_object.ap_context
|
||||||
|
|
||||||
if in_reply_to_object.is_from_outbox:
|
if in_reply_to_object.is_from_outbox:
|
||||||
await db_session.execute(
|
await db_session.execute(
|
||||||
|
@ -409,7 +452,13 @@ async def send_create(
|
||||||
"attachment": attachments,
|
"attachment": attachments,
|
||||||
**extra_obj_attrs, # type: ignore
|
**extra_obj_attrs, # type: ignore
|
||||||
}
|
}
|
||||||
outbox_object = await save_outbox_object(db_session, note_id, obj, source=source)
|
outbox_object = await save_outbox_object(
|
||||||
|
db_session,
|
||||||
|
note_id,
|
||||||
|
obj,
|
||||||
|
source=source,
|
||||||
|
conversation=conversation,
|
||||||
|
)
|
||||||
if not outbox_object.id:
|
if not outbox_object.id:
|
||||||
raise ValueError("Should never happen")
|
raise ValueError("Should never happen")
|
||||||
|
|
||||||
|
@ -1211,6 +1260,7 @@ async def _process_note_object(
|
||||||
ap_type=ro.ap_type,
|
ap_type=ro.ap_type,
|
||||||
ap_id=ro.ap_id,
|
ap_id=ro.ap_id,
|
||||||
ap_context=ro.ap_context,
|
ap_context=ro.ap_context,
|
||||||
|
conversation=await fetch_conversation_root(db_session, ro),
|
||||||
ap_published_at=ap_published_at,
|
ap_published_at=ap_published_at,
|
||||||
ap_object=ro.ap_object,
|
ap_object=ro.ap_object,
|
||||||
visibility=ro.visibility,
|
visibility=ro.visibility,
|
||||||
|
@ -1716,7 +1766,7 @@ async def get_replies_tree(
|
||||||
) -> ReplyTreeNode:
|
) -> ReplyTreeNode:
|
||||||
# XXX: PeerTube video don't use context
|
# XXX: PeerTube video don't use context
|
||||||
tree_nodes: list[AnyboxObject] = []
|
tree_nodes: list[AnyboxObject] = []
|
||||||
if requested_object.ap_context is None:
|
if requested_object.conversation is None:
|
||||||
tree_nodes = [requested_object]
|
tree_nodes = [requested_object]
|
||||||
else:
|
else:
|
||||||
# TODO: handle visibility
|
# TODO: handle visibility
|
||||||
|
@ -1725,7 +1775,8 @@ async def get_replies_tree(
|
||||||
await db_session.scalars(
|
await db_session.scalars(
|
||||||
select(models.InboxObject)
|
select(models.InboxObject)
|
||||||
.where(
|
.where(
|
||||||
models.InboxObject.ap_context == requested_object.ap_context,
|
models.InboxObject.conversation
|
||||||
|
== requested_object.conversation,
|
||||||
models.InboxObject.ap_type.in_(["Note", "Page", "Article"]),
|
models.InboxObject.ap_type.in_(["Note", "Page", "Article"]),
|
||||||
models.InboxObject.is_deleted.is_(False),
|
models.InboxObject.is_deleted.is_(False),
|
||||||
)
|
)
|
||||||
|
@ -1740,7 +1791,8 @@ async def get_replies_tree(
|
||||||
await db_session.scalars(
|
await db_session.scalars(
|
||||||
select(models.OutboxObject)
|
select(models.OutboxObject)
|
||||||
.where(
|
.where(
|
||||||
models.OutboxObject.ap_context == requested_object.ap_context,
|
models.OutboxObject.conversation
|
||||||
|
== requested_object.conversation,
|
||||||
models.OutboxObject.is_deleted.is_(False),
|
models.OutboxObject.is_deleted.is_(False),
|
||||||
models.OutboxObject.ap_type.in_(["Note", "Page", "Article"]),
|
models.OutboxObject.ap_type.in_(["Note", "Page", "Article"]),
|
||||||
)
|
)
|
||||||
|
|
|
@ -83,6 +83,7 @@ class InboxObject(Base, BaseObject):
|
||||||
activity_object_ap_id = Column(String, nullable=True, index=True)
|
activity_object_ap_id = Column(String, nullable=True, index=True)
|
||||||
|
|
||||||
visibility = Column(Enum(ap.VisibilityEnum), nullable=False)
|
visibility = Column(Enum(ap.VisibilityEnum), nullable=False)
|
||||||
|
conversation = Column(String, nullable=True)
|
||||||
|
|
||||||
# Used for Like, Announce and Undo activities
|
# Used for Like, Announce and Undo activities
|
||||||
relates_to_inbox_object_id = Column(
|
relates_to_inbox_object_id = Column(
|
||||||
|
@ -166,6 +167,7 @@ class OutboxObject(Base, BaseObject):
|
||||||
|
|
||||||
ap_published_at = Column(DateTime(timezone=True), nullable=False, default=now)
|
ap_published_at = Column(DateTime(timezone=True), nullable=False, default=now)
|
||||||
visibility = Column(Enum(ap.VisibilityEnum), nullable=False)
|
visibility = Column(Enum(ap.VisibilityEnum), nullable=False)
|
||||||
|
conversation = Column(String, nullable=True)
|
||||||
|
|
||||||
likes_count = Column(Integer, nullable=False, default=0)
|
likes_count = Column(Integer, nullable=False, default=0)
|
||||||
announces_count = Column(Integer, nullable=False, default=0)
|
announces_count = Column(Integer, nullable=False, default=0)
|
||||||
|
|
Loading…
Reference in New Issue