Support for sending actor updates

main
Thomas Sileo 2022-07-06 21:13:55 +02:00
parent 786072ec8a
commit a0471cbedc
2 changed files with 117 additions and 0 deletions

View File

@ -1,3 +1,4 @@
import hashlib
import typing import typing
from dataclasses import dataclass from dataclasses import dataclass
from typing import Union from typing import Union
@ -226,3 +227,29 @@ async def get_actors_metadata(
inbox_follow_ap_id=followers.get(actor.ap_id), inbox_follow_ap_id=followers.get(actor.ap_id),
) )
return idx return idx
def _actor_hash(actor: Actor) -> bytes:
"""Used to detect when an actor is updated"""
h = hashlib.blake2b(digest_size=32)
h.update(actor.ap_id.encode())
h.update(actor.handle.encode())
if actor.name:
h.update(actor.name.encode())
if actor.summary:
h.update(actor.summary.encode())
if actor.url:
h.update(actor.url.encode())
h.update(actor.display_name.encode())
if actor.icon_url:
h.update(actor.icon_url.encode())
h.update(actor.public_key_id.encode())
h.update(actor.public_key_as_pem.encode())
return h.digest()

View File

@ -15,6 +15,8 @@ from app import activitypub as ap
from app import config from app import config
from app import ldsig from app import ldsig
from app import models from app import models
from app.actor import LOCAL_ACTOR
from app.actor import _actor_hash
from app.config import KEY_PATH from app.config import KEY_PATH
from app.database import AsyncSession from app.database import AsyncSession
from app.database import SessionLocal from app.database import SessionLocal
@ -27,6 +29,92 @@ k = Key(config.ID, f"{config.ID}#main-key")
k.load(KEY_PATH.read_text()) k.load(KEY_PATH.read_text())
def _is_local_actor_updated() -> bool:
"""Returns True if the local actor was updated, i.e. updated via the config file"""
actor_hash = _actor_hash(LOCAL_ACTOR)
actor_hash_cache = config.ROOT_DIR / "data" / "local_actor_hash.dat"
if not actor_hash_cache.exists():
logger.info("Initializing local actor hash cache")
actor_hash_cache.write_bytes(actor_hash)
return False
previous_actor_hash = actor_hash_cache.read_bytes()
if previous_actor_hash == actor_hash:
logger.info("Local actor hasn't been updated")
return False
actor_hash_cache.write_bytes(actor_hash)
logger.info("Local actor has been updated")
return True
def _send_actor_update_if_needed(db_session: Session) -> None:
"""The process for sending an update for the local actor is done here as
in production, we may have multiple uvicorn worker and this worker will
always run in a single process."""
if not _is_local_actor_updated():
return
logger.info("Will send an Update for the local actor")
from app.boxes import RemoteObject
from app.boxes import allocate_outbox_id
from app.boxes import outbox_object_id
update_activity_id = allocate_outbox_id()
update_activity = {
"@context": ap.AS_EXTENDED_CTX,
"id": outbox_object_id(update_activity_id),
"type": "Update",
"to": [ap.AS_PUBLIC],
"actor": config.ID,
"object": ap.remove_context(LOCAL_ACTOR.ap_actor),
}
ro = RemoteObject(update_activity, actor=LOCAL_ACTOR)
outbox_object = models.OutboxObject(
public_id=update_activity_id,
ap_type=ro.ap_type,
ap_id=ro.ap_id,
ap_context=ro.ap_context,
ap_object=ro.ap_object,
visibility=ro.visibility,
og_meta=None,
relates_to_inbox_object_id=None,
relates_to_outbox_object_id=None,
relates_to_actor_id=None,
activity_object_ap_id=LOCAL_ACTOR.ap_id,
is_hidden_from_homepage=True,
source=None,
)
db_session.add(outbox_object)
db_session.flush()
# TODO(ts): also send to every actor we contact (distinct on recipient)
followers = (
(
db_session.scalars(
select(models.Follower).options(joinedload(models.Follower.actor))
)
)
.unique()
.all()
)
for rcp in {
follower.actor.shared_inbox_url or follower.actor.inbox_url
for follower in followers
}:
outgoing_activity = models.OutgoingActivity(
recipient=rcp,
outbox_object_id=outbox_object.id,
inbox_object_id=None,
)
db_session.add(outgoing_activity)
db_session.commit()
async def new_outgoing_activity( async def new_outgoing_activity(
db_session: AsyncSession, db_session: AsyncSession,
recipient: str, recipient: str,
@ -116,6 +204,7 @@ def process_next_outgoing_activity(db: Session) -> bool:
# Use LD sig if the activity may need to be forwarded by recipients # Use LD sig if the activity may need to be forwarded by recipients
if next_activity.anybox_object.is_from_outbox and payload["type"] in [ if next_activity.anybox_object.is_from_outbox and payload["type"] in [
"Create", "Create",
"Update",
"Delete", "Delete",
]: ]:
# But only if the object is public (to help with deniability/privacy) # But only if the object is public (to help with deniability/privacy)
@ -160,6 +249,7 @@ def process_next_outgoing_activity(db: Session) -> bool:
def loop() -> None: def loop() -> None:
db = SessionLocal() db = SessionLocal()
_send_actor_update_if_needed(db)
while 1: while 1:
try: try:
process_next_outgoing_activity(db) process_next_outgoing_activity(db)