microblog/app/utils/opengraph.py

99 lines
2.6 KiB
Python
Raw Normal View History

2022-06-22 18:11:22 +00:00
import mimetypes
import re
from typing import Any
2022-06-22 18:11:22 +00:00
from urllib.parse import urlparse
import httpx
from bs4 import BeautifulSoup # type: ignore
from pydantic import BaseModel
from app import activitypub as ap
from app import config
2022-06-22 19:15:07 +00:00
from app.utils.url import is_url_valid
2022-06-22 18:11:22 +00:00
class OpenGraphMeta(BaseModel):
    """OpenGraph metadata describing a single linked page.

    Field names mirror the OpenGraph property names without their ``og:``
    prefix (e.g. ``title`` holds the value of ``og:title``).
    """

    # Required properties.
    url: str
    title: str
    image: str
    description: str

    # Optional property; stays None when no value is supplied.
    site_name: str | None = None
2022-06-22 18:11:22 +00:00
def _scrap_og_meta(html: str) -> OpenGraphMeta | None:
    """Extract OpenGraph metadata from an HTML document.

    Looks in the document ``<head>`` for elements whose ``property``
    attribute starts with ``og`` and maps each property name to its
    ``content`` attribute.

    Args:
        html: The raw HTML of the page.

    Returns:
        An ``OpenGraphMeta`` built from the ``og:<field>`` values, or ``None``
        when any field other than ``site_name`` is missing or empty.
    """
    soup = BeautifulSoup(html, "html5lib")
    # `find_all` is the supported spelling (and the one used elsewhere in this
    # module); `findAll` is only kept by Beautiful Soup as a legacy alias.
    ogs = {
        og.attrs["property"]: og.attrs.get("content")
        for og in soup.html.head.find_all(property=re.compile(r"^og"))
    }

    raw = {}
    for field in OpenGraphMeta.__fields__:
        value = ogs.get(f"og:{field}")
        # Every field except `site_name` is mandatory: give up as soon as one
        # of them is absent or has an empty value.
        if not value and field != "site_name":
            return None
        raw[field] = value

    return OpenGraphMeta.parse_obj(raw)
def _urls_from_note(note: ap.RawObject) -> set[str]:
    """Collect the links in a note that are worth probing for OpenGraph data.

    A link found in the note's HTML ``content`` is kept when it:

    * uses the ``http`` or ``https`` scheme,
    * points to a different host than the note itself,
    * passes ``is_url_valid``,
    * does not look like a media file (image/video/audio) judging by its
      extension, and
    * is not already referenced by the ``href`` of one of the note's tags.

    Args:
        note: The raw note object; ``id`` is required, ``tag`` and ``content``
            are optional.

    Returns:
        The set of candidate URLs (possibly empty).
    """
    note_host = urlparse(ap.get_id(note["id"]) or "").netloc

    # `tag` may be absent, None, or a single object instead of a list.
    tags = note.get("tag") or []
    if isinstance(tags, dict):
        tags = [tags]
    tags_hrefs = set()
    for tag in tags:
        if isinstance(tag, dict) and (tag_href := tag.get("href")):
            tags_hrefs.add(tag_href)

    urls = set()
    content = note.get("content")
    if content:
        soup = BeautifulSoup(content, "html5lib")
        for link in soup.find_all("a"):
            h = link.get("href")
            if not h:
                # Anchors without an href (e.g. named anchors) would make
                # `mimetypes.guess_type` raise a TypeError.
                continue
            ph = urlparse(h)
            mimetype, _ = mimetypes.guess_type(h)
            if (
                ph.scheme in {"http", "https"}
                and ph.netloc != note_host
                and is_url_valid(h)
                and (
                    # Media files cannot carry OpenGraph tags, so only keep
                    # links whose guessed type is unknown or non-media.
                    not mimetype
                    or mimetype.split("/")[0] not in {"image", "video", "audio"}
                )
            ):
                urls.add(h)

    return urls - tags_hrefs
2022-06-22 18:11:22 +00:00
async def _og_meta_from_url(url: str) -> OpenGraphMeta | None:
    """Download `url` and scrape OpenGraph metadata from the response.

    The request follows redirects and sends the configured User-Agent.
    An error status is surfaced through ``raise_for_status()``.

    Returns:
        The result of ``_scrap_og_meta`` on the response body, or ``None``
        when the response has no ``content-type`` header or the header does
        not start with ``text/html``.
    """
    async with httpx.AsyncClient() as http_client:
        response = await http_client.get(
            url,
            headers={"User-Agent": config.USER_AGENT},
            follow_redirects=True,
        )

    response.raise_for_status()

    # Only HTML documents are parsed for metadata.
    content_type = response.headers.get("content-type")
    if not content_type or not content_type.startswith("text/html"):
        return None

    return _scrap_og_meta(response.text)
async def og_meta_from_note(note: ap.RawObject) -> list[dict[str, Any]]:
    """Gather OpenGraph metadata for the links found in a note.

    Each URL returned by ``_urls_from_note`` is fetched in turn. URLs whose
    fetch raises ``httpx.HTTPError`` are skipped, as are URLs that yield no
    metadata.

    Returns:
        One dict (from ``OpenGraphMeta.dict()``) per URL that produced
        metadata.
    """
    collected = []
    for candidate_url in _urls_from_note(note):
        try:
            meta = await _og_meta_from_url(candidate_url)
        except httpx.HTTPError:
            # Best effort: a failing link must not prevent processing the
            # remaining ones.
            continue
        if meta:
            collected.append(meta.dict())
    return collected