microblog/app/ap_object.py

339 lines
9.8 KiB
Python
Raw Normal View History

2022-06-22 18:11:22 +00:00
import hashlib
import mimetypes
2022-06-22 18:11:22 +00:00
from datetime import datetime
2022-07-15 18:01:55 +00:00
from functools import cached_property
2022-06-22 18:11:22 +00:00
from typing import Any
import pydantic
2022-08-03 17:54:11 +00:00
from bs4 import BeautifulSoup # type: ignore
2022-10-19 18:46:01 +00:00
from mistletoe import markdown # type: ignore
2022-06-22 18:11:22 +00:00
from app import activitypub as ap
from app.actor import LOCAL_ACTOR
from app.actor import Actor
from app.actor import RemoteActor
2022-06-25 06:23:28 +00:00
from app.media import proxied_media_url
2022-07-23 21:06:30 +00:00
from app.utils.datetime import now
2022-07-11 10:48:38 +00:00
from app.utils.datetime import parse_isoformat
2022-06-22 18:11:22 +00:00
class Object:
    """Read-only accessors over a raw ActivityPub object.

    Subclasses must provide `ap_object` (the raw JSON-like dict) and `actor`;
    every other property here is derived from those two.
    """

    @property
    def is_from_db(self) -> bool:
        """Base default is False (presumably overridden by DB-backed subclasses)."""
        return False

    @property
    def is_from_outbox(self) -> bool:
        """Base default is False (presumably overridden by outbox objects)."""
        return False

    @property
    def is_from_inbox(self) -> bool:
        """Base default is False (presumably overridden by inbox objects)."""
        return False

    @cached_property
    def ap_type(self) -> str:
        """First entry of the object's `type` (which may be a list)."""
        return ap.as_list(self.ap_object["type"])[0]

    @property
    def ap_object(self) -> ap.RawObject:
        """Raw ActivityPub payload; must be implemented by subclasses."""
        raise NotImplementedError

    @property
    def ap_id(self) -> str:
        """ID of the object, resolved through `ap.get_id`."""
        return ap.get_id(self.ap_object["id"])

    @property
    def ap_actor_id(self) -> str:
        """ID of the actor the object is attributed to (via `ap.get_actor_id`)."""
        return ap.get_actor_id(self.ap_object)

    @cached_property
    def ap_published_at(self) -> datetime | None:
        """Parsed `published` date, falling back to `created`, else None."""
        # TODO: default to None? or now()?
        if "published" in self.ap_object:
            return parse_isoformat(self.ap_object["published"])
        elif "created" in self.ap_object:
            return parse_isoformat(self.ap_object["created"])
        return None

    @property
    def actor(self) -> Actor:
        """Actor owning the object; must be implemented by subclasses."""
        raise NotImplementedError()

    @cached_property
    def visibility(self) -> ap.VisibilityEnum:
        """Visibility computed by `ap.object_visibility` for this object/actor."""
        return ap.object_visibility(self.ap_object, self.actor)

    @property
    def ap_context(self) -> str | None:
        """`context` of the object, falling back to `conversation`."""
        return self.ap_object.get("context") or self.ap_object.get("conversation")

    @property
    def sensitive(self) -> bool:
        """Value of the `sensitive` flag (False when absent)."""
        return self.ap_object.get("sensitive", False)

    @property
    def tags(self) -> list[ap.RawObject]:
        """The `tag` entries, always as a list."""
        return ap.as_list(self.ap_object.get("tag", []))

    @cached_property
    def inlined_images(self) -> set[str]:
        """`src` of every `<img>` found in the HTML content."""
        image_urls: set[str] = set()
        if not self.content:
            return image_urls

        soup = BeautifulSoup(self.content, "html5lib")
        for img in soup.find_all("img"):
            src = img.attrs.get("src")
            if not src:
                continue
            image_urls.add(src)

        return image_urls

    @cached_property
    def attachments(self) -> list["Attachment"]:
        """Build `Attachment` models from `attachment` (and video links).

        `PropertyValue` entries are skipped, `Link` entries are kept without
        any proxied URL, and every other entry gets a proxied URL (plus a
        resized URL when its media type starts with "image"). For `Video`
        objects, the first video link found in `url` is appended as well.
        """
        attachments = []
        for obj in ap.as_list(self.ap_object.get("attachment", [])):
            # Remote data: a bare (non-object) entry carries nothing to render
            if not isinstance(obj, dict):
                continue

            if obj.get("type") == "PropertyValue":
                continue

            if obj.get("type") == "Link":
                attachments.append(
                    Attachment.parse_obj(
                        {
                            "proxiedUrl": None,
                            "resizedUrl": None,
                            "mediaType": None,
                            "type": "Link",
                            "url": obj["href"],
                        }
                    )
                )
                continue

            proxied_url = proxied_media_url(obj["url"])
            # `mediaType` may be present but null, `or ""` covers both cases
            media_type = obj.get("mediaType") or ""
            attachments.append(
                Attachment.parse_obj(
                    {
                        "proxiedUrl": proxied_url,
                        "resizedUrl": proxied_url + "/740"
                        if media_type.startswith("image")
                        else None,
                        **obj,
                    }
                )
            )

        # Also add any video Link (for PeerTube compat)
        if self.ap_type == "Video":
            for link in ap.as_list(self.ap_object.get("url", [])):
                if (isinstance(link, dict)) and link.get("type") == "Link":
                    link_media_type = link.get("mediaType") or ""
                    if link_media_type.startswith("video"):
                        proxied_url = proxied_media_url(link["href"])
                        attachments.append(
                            Attachment(
                                type="Video",
                                mediaType=link["mediaType"],
                                url=link["href"],
                                proxiedUrl=proxied_url,
                            )
                        )
                        break
                    elif link_media_type == "application/x-mpegURL":
                        # The playlist link nests the actual video links as tags
                        for tag in ap.as_list(link.get("tag", [])):
                            if (tag.get("mediaType") or "").startswith("video"):
                                proxied_url = proxied_media_url(tag["href"])
                                attachments.append(
                                    Attachment(
                                        type="Video",
                                        mediaType=tag["mediaType"],
                                        url=tag["href"],
                                        proxiedUrl=proxied_url,
                                    )
                                )
                                break
        return attachments

    @cached_property
    def url(self) -> str | None:
        """Best URL for the object, falling back to its AP ID.

        A non-empty string `url` is returned as-is. Otherwise the entries are
        scanned in order and the first usable one wins: a non-empty string, a
        `Link`, or an entry whose media type is `text/html`.
        """
        obj_url = self.ap_object.get("url")
        if isinstance(obj_url, str) and obj_url:
            return obj_url
        elif obj_url:
            for u in ap.as_list(obj_url):
                # Entries may be plain URL strings mixed with Link objects
                if isinstance(u, str):
                    if u:
                        return u
                    continue
                if not isinstance(u, dict):
                    continue

                href = u.get("href")
                if not href:
                    continue
                # `.get` since non-Link entries may not carry a `mediaType`
                if u.get("type") == "Link" or u.get("mediaType") == "text/html":
                    return href

        return self.ap_id

    @cached_property
    def content(self) -> str | None:
        """HTML content of the object, or None when there is none."""
        content = self.ap_object.get("content")
        if not content:
            return None

        # PeerTube returns the content as markdown
        if self.ap_object.get("mediaType") == "text/markdown":
            content = markdown(content)

        return content

    @property
    def summary(self) -> str | None:
        """Raw `summary` field, if any."""
        return self.ap_object.get("summary")

    @property
    def name(self) -> str | None:
        """Raw `name` field, if any."""
        return self.ap_object.get("name")

    @cached_property
    def permalink_id(self) -> str:
        """`permalink-` followed by the MD5 hex digest of the AP ID."""
        return (
            "permalink-"
            + hashlib.md5(
                self.ap_id.encode(),
                usedforsecurity=False,
            ).hexdigest()
        )

    @property
    def activity_object_ap_id(self) -> str | None:
        """ID of the wrapped `object`, or None when there is no `object`."""
        if "object" in self.ap_object:
            return ap.get_id(self.ap_object["object"])

        return None

    @property
    def in_reply_to(self) -> str | None:
        """Raw `inReplyTo` field, if any."""
        return self.ap_object.get("inReplyTo")

    @property
    def is_in_reply_to_from_inbox(self) -> bool | None:
        """Whether the replied-to ID is outside the local actor's ID prefix.

        Returns None when the object is not a reply.
        """
        if not self.in_reply_to:
            return None

        return not self.in_reply_to.startswith(LOCAL_ACTOR.ap_id)

    @property
    def has_ld_signature(self) -> bool:
        """True when the object carries a non-empty `signature` field."""
        return bool(self.ap_object.get("signature"))

    @property
    def is_poll_ended(self) -> bool:
        """True once `now()` is past the end time; False without an end time."""
        if self.poll_end_time:
            return now() > self.poll_end_time

        return False

    @cached_property
    def poll_items(self) -> list[ap.RawObject] | None:
        """Poll choices, read from `oneOf` and falling back to `anyOf`."""
        return self.ap_object.get("oneOf") or self.ap_object.get("anyOf")

    @cached_property
    def poll_end_time(self) -> datetime | None:
        """Parsed `endTime`, or None when the poll has none."""
        # Some polls may not have an end time
        if self.ap_object.get("endTime"):
            return parse_isoformat(self.ap_object["endTime"])

        return None

    @cached_property
    def poll_voters_count(self) -> int | None:
        """Number of voters, or None when the object has no poll items.

        Uses `votersCount` when set, else sums `replies.totalItems` over the
        poll items.
        """
        if not self.poll_items:
            return None

        # Only Mastodon set this attribute
        if self.ap_object.get("votersCount"):
            return self.ap_object["votersCount"]
        else:
            voters_count = 0
            for item in self.poll_items:
                voters_count += item.get("replies", {}).get("totalItems", 0)

            return voters_count

    @cached_property
    def is_one_of_poll(self) -> bool:
        """True when the poll choices come from a non-empty `oneOf`."""
        return bool(self.ap_object.get("oneOf"))
2022-06-22 18:11:22 +00:00
def _to_camel(string: str) -> str:
cased = "".join(word.capitalize() for word in string.split("_"))
return cased[0:1].lower() + cased[1:]
class BaseModel(pydantic.BaseModel):
    """Project-wide pydantic base whose field aliases are camelCase."""

    class Config:
        # Each snake_case field name gets its alias from `_to_camel`
        alias_generator = _to_camel
class Attachment(BaseModel):
    """A media or link attachment of an object, as consumed by the templates.

    Fields are populated through their camelCase aliases (e.g. `mediaType`,
    `proxiedUrl`).
    """

    type: str
    media_type: str | None
    name: str | None
    url: str

    # Dimensions are optional: Link and Video attachments are built without
    # them, so requiring them would make those fail validation.
    width: int | None = None
    height: int | None = None

    # Extra fields for the templates (and only for media)
    proxied_url: str | None = None
    resized_url: str | None = None

    @property
    def mimetype(self) -> str:
        """Subtype part of the media type (e.g. `png`), or `"unknown"`.

        When no media type was provided, it is guessed from the URL.
        """
        mimetype = self.media_type
        if not mimetype:
            mimetype, _ = mimetypes.guess_type(self.url)

        if not mimetype:
            return "unknown"

        return mimetype.split("/")[-1]
2022-06-22 18:11:22 +00:00
class RemoteObject(Object):
    """An `Object` backed by a raw payload and an already-resolved actor."""

    def __init__(self, raw_object: ap.RawObject, actor: Actor):
        """Store the payload and its actor.

        Raises:
            ValueError: if `actor` is not the actor the payload refers to.
        """
        self._raw_object = raw_object
        self._actor = actor

        expected_actor_id = ap.get_actor_id(self._raw_object)
        if expected_actor_id != self._actor.ap_id:
            raise ValueError(f"Invalid actor {self._actor.ap_id}")

    @classmethod
    async def from_raw_object(
        cls,
        raw_object: ap.RawObject,
        actor: Actor | None = None,
    ):
        """Build an instance, resolving the actor when needed.

        The local actor is used when the payload belongs to it; otherwise the
        given `actor` is used (after checking its ID), and when none is given
        the actor is fetched.

        Raises:
            ValueError: if `actor` is given but does not match the payload.
        """
        # Pre-fetch the actor
        actor_id = ap.get_actor_id(raw_object)

        if actor_id == LOCAL_ACTOR.ap_id:
            resolved_actor = LOCAL_ACTOR
        elif not actor:
            fetched = await ap.fetch(ap.get_actor_id(raw_object))
            resolved_actor = RemoteActor(ap_actor=fetched)
        elif actor.ap_id == actor_id:
            resolved_actor = actor  # type: ignore
        else:
            raise ValueError(
                f"Invalid actor, got {actor.ap_id}, expected {actor_id}"
            )

        return cls(raw_object, resolved_actor)

    @property
    def og_meta(self) -> list[dict[str, Any]] | None:
        """No OpenGraph metadata at this level; always None."""
        return None

    @property
    def ap_object(self) -> ap.RawObject:
        """The raw payload given at construction."""
        return self._raw_object

    @property
    def actor(self) -> Actor:
        """The actor given at construction."""
        return self._actor