233 lines
8.3 KiB
Python
233 lines
8.3 KiB
Python
from urllib.parse import urlparse
|
|
|
|
import httpx
|
|
from bs4 import BeautifulSoup # type: ignore
|
|
from fastapi import APIRouter
|
|
from fastapi import Depends
|
|
from fastapi import HTTPException
|
|
from fastapi import Request
|
|
from fastapi.responses import JSONResponse
|
|
from loguru import logger
|
|
from sqlalchemy import func
|
|
from sqlalchemy import select
|
|
|
|
from app import models
|
|
from app.boxes import _get_outbox_announces_count
|
|
from app.boxes import _get_outbox_likes_count
|
|
from app.boxes import _get_outbox_replies_count
|
|
from app.boxes import get_outbox_object_by_ap_id
|
|
from app.boxes import get_outbox_object_by_slug_and_short_id
|
|
from app.boxes import is_notification_enabled
|
|
from app.database import AsyncSession
|
|
from app.database import get_db_session
|
|
from app.utils import microformats
|
|
from app.utils.facepile import Face
|
|
from app.utils.facepile import WebmentionReply
|
|
from app.utils.url import check_url
|
|
from app.utils.url import is_url_valid
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
def is_source_containing_target(source_html: str, target_url: str) -> bool:
|
|
soup = BeautifulSoup(source_html, "html5lib")
|
|
for link in soup.find_all("a"):
|
|
h = link.get("href")
|
|
if not is_url_valid(h):
|
|
continue
|
|
|
|
if h == target_url:
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
@router.post("/webmentions")
|
|
async def webmention_endpoint(
|
|
request: Request,
|
|
db_session: AsyncSession = Depends(get_db_session),
|
|
) -> JSONResponse:
|
|
form_data = await request.form()
|
|
try:
|
|
source = form_data["source"]
|
|
target = form_data["target"]
|
|
|
|
if source == target:
|
|
raise ValueError("source URL is the same as target")
|
|
|
|
check_url(source)
|
|
check_url(target)
|
|
parsed_target_url = urlparse(target)
|
|
except Exception:
|
|
logger.exception("Invalid webmention request")
|
|
raise HTTPException(status_code=400, detail="Invalid payload")
|
|
|
|
logger.info(f"Received webmention {source=} {target=}")
|
|
|
|
existing_webmention_in_db = (
|
|
await db_session.execute(
|
|
select(models.Webmention).where(
|
|
models.Webmention.source == source,
|
|
models.Webmention.target == target,
|
|
)
|
|
)
|
|
).scalar_one_or_none()
|
|
if existing_webmention_in_db:
|
|
logger.info("Found existing Webmention, will try to update or delete")
|
|
|
|
mentioned_object = await get_outbox_object_by_ap_id(db_session, target)
|
|
|
|
if not mentioned_object and parsed_target_url.path.startswith("/articles/"):
|
|
try:
|
|
_, _, short_id, slug = parsed_target_url.path.split("/")
|
|
mentioned_object = await get_outbox_object_by_slug_and_short_id(
|
|
db_session, slug, short_id
|
|
)
|
|
except Exception:
|
|
logger.exception(f"Failed to match {target}")
|
|
|
|
if not mentioned_object:
|
|
logger.info(f"Invalid target {target=}")
|
|
|
|
if existing_webmention_in_db:
|
|
logger.info("Deleting existing Webmention")
|
|
existing_webmention_in_db.is_deleted = True
|
|
await db_session.commit()
|
|
raise HTTPException(status_code=400, detail="Invalid target")
|
|
|
|
is_webmention_deleted = False
|
|
try:
|
|
data_and_html = await microformats.fetch_and_parse(source)
|
|
except microformats.URLNotFoundOrGone:
|
|
is_webmention_deleted = True
|
|
except httpx.HTTPError:
|
|
raise HTTPException(status_code=500, detail=f"Fetch to process {source}")
|
|
|
|
data, html = data_and_html
|
|
is_target_found_in_source = is_source_containing_target(html, target)
|
|
|
|
data, html = data_and_html
|
|
if is_webmention_deleted or not is_target_found_in_source:
|
|
logger.warning(f"target {target=} not found in source")
|
|
if existing_webmention_in_db:
|
|
logger.info("Deleting existing Webmention")
|
|
existing_webmention_in_db.is_deleted = True
|
|
await db_session.flush()
|
|
|
|
# Revert side effects
|
|
await _handle_webmention_side_effects(
|
|
db_session, existing_webmention_in_db, mentioned_object
|
|
)
|
|
|
|
if is_notification_enabled(models.NotificationType.DELETED_WEBMENTION):
|
|
notif = models.Notification(
|
|
notification_type=models.NotificationType.DELETED_WEBMENTION,
|
|
outbox_object_id=mentioned_object.id,
|
|
webmention_id=existing_webmention_in_db.id,
|
|
)
|
|
db_session.add(notif)
|
|
|
|
await db_session.commit()
|
|
|
|
if not is_target_found_in_source:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="target not found in source",
|
|
)
|
|
else:
|
|
return JSONResponse(content={}, status_code=200)
|
|
|
|
webmention_type = models.WebmentionType.UNKNOWN
|
|
webmention: models.Webmention
|
|
if existing_webmention_in_db:
|
|
# Undelete if needed
|
|
existing_webmention_in_db.is_deleted = False
|
|
existing_webmention_in_db.source_microformats = data
|
|
await db_session.flush()
|
|
webmention = existing_webmention_in_db
|
|
|
|
if is_notification_enabled(models.NotificationType.UPDATED_WEBMENTION):
|
|
notif = models.Notification(
|
|
notification_type=models.NotificationType.UPDATED_WEBMENTION,
|
|
outbox_object_id=mentioned_object.id,
|
|
webmention_id=existing_webmention_in_db.id,
|
|
)
|
|
db_session.add(notif)
|
|
else:
|
|
new_webmention = models.Webmention(
|
|
source=source,
|
|
target=target,
|
|
source_microformats=data,
|
|
outbox_object_id=mentioned_object.id,
|
|
webmention_type=webmention_type,
|
|
)
|
|
db_session.add(new_webmention)
|
|
await db_session.flush()
|
|
webmention = new_webmention
|
|
|
|
if is_notification_enabled(models.NotificationType.NEW_WEBMENTION):
|
|
notif = models.Notification(
|
|
notification_type=models.NotificationType.NEW_WEBMENTION,
|
|
outbox_object_id=mentioned_object.id,
|
|
webmention_id=new_webmention.id,
|
|
)
|
|
db_session.add(notif)
|
|
|
|
# Determine the webmention type
|
|
for item in data.get("items", []):
|
|
if target in item.get("properties", {}).get(
|
|
"in-reply-to", []
|
|
) and WebmentionReply.from_webmention(webmention):
|
|
webmention_type = models.WebmentionType.REPLY
|
|
break
|
|
elif target in item.get("properties", {}).get(
|
|
"like-of", []
|
|
) and Face.from_webmention(webmention):
|
|
webmention_type = models.WebmentionType.LIKE
|
|
break
|
|
elif target in item.get("properties", {}).get(
|
|
"repost-of", []
|
|
) and Face.from_webmention(webmention):
|
|
webmention_type = models.WebmentionType.REPOST
|
|
break
|
|
|
|
if webmention_type != models.WebmentionType.UNKNOWN:
|
|
webmention.webmention_type = webmention_type
|
|
await db_session.flush()
|
|
|
|
# Handle side effect
|
|
await _handle_webmention_side_effects(db_session, webmention, mentioned_object)
|
|
await db_session.commit()
|
|
|
|
return JSONResponse(content={}, status_code=200)
|
|
|
|
|
|
async def _handle_webmention_side_effects(
|
|
db_session: AsyncSession,
|
|
webmention: models.Webmention,
|
|
mentioned_object: models.OutboxObject,
|
|
) -> None:
|
|
if webmention.webmention_type == models.WebmentionType.UNKNOWN:
|
|
# TODO: recount everything
|
|
mentioned_object.webmentions_count = await db_session.scalar(
|
|
select(func.count(models.Webmention.id)).where(
|
|
models.Webmention.is_deleted.is_(False),
|
|
models.Webmention.outbox_object_id == mentioned_object.id,
|
|
models.Webmention.webmention_type == models.WebmentionType.UNKNOWN,
|
|
)
|
|
)
|
|
elif webmention.webmention_type == models.WebmentionType.LIKE:
|
|
mentioned_object.likes_count = await _get_outbox_likes_count(
|
|
db_session, mentioned_object
|
|
)
|
|
elif webmention.webmention_type == models.WebmentionType.REPOST:
|
|
mentioned_object.announces_count = await _get_outbox_announces_count(
|
|
db_session, mentioned_object
|
|
)
|
|
elif webmention.webmention_type == models.WebmentionType.REPLY:
|
|
mentioned_object.replies_count = await _get_outbox_replies_count(
|
|
db_session, mentioned_object
|
|
)
|
|
else:
|
|
raise ValueError(f"Unhandled {webmention.webmention_type} webmention")
|