import hashlib
import hmac
import os
import secrets
from pathlib import Path
import bcrypt
import itsdangerous
import pydantic
import tomli
from fastapi import Form
from fastapi import HTTPException
from fastapi import Request
from itsdangerous import URLSafeTimedSerializer
from loguru import logger
from mistletoe import markdown # type: ignore
from app.customization import _CUSTOM_ROUTES
from app.customization import _StreamVisibilityCallback
from app.customization import default_stream_visibility_callback
from app.utils.emoji import _load_emojis
from app.utils.version import get_version_commit
ROOT_DIR = Path().parent.resolve()
_CONFIG_FILE = os.getenv("MICROBLOGPUB_CONFIG_FILE", "profile.toml")
VERSION_COMMIT = "dev"
try:
from app._version import VERSION_COMMIT # type: ignore
except ImportError:
VERSION_COMMIT = get_version_commit()
# Force reloading cache when the CSS is updated
CSS_HASH = "none"
try:
css_data = (ROOT_DIR / "app" / "static" / "css" / "main.css").read_bytes()
CSS_HASH = hashlib.md5(css_data, usedforsecurity=False).hexdigest()
except FileNotFoundError:
pass
# Force reloading cache when the JS is changed
JS_HASH = "none"
try:
# To keep things simple, we keep a single hash for the 2 files
dat = b""
for j in [
ROOT_DIR / "app" / "static" / "common.js",
ROOT_DIR / "app" / "static" / "common-admin.js",
ROOT_DIR / "app" / "static" / "new.js",
]:
dat += j.read_bytes()
JS_HASH = hashlib.md5(dat, usedforsecurity=False).hexdigest()
except FileNotFoundError:
pass
MOVED_TO_FILE = ROOT_DIR / "data" / "moved_to.dat"
def _get_moved_to() -> str | None:
if not MOVED_TO_FILE.exists():
return None
return MOVED_TO_FILE.read_text()
def set_moved_to(moved_to: str) -> None:
MOVED_TO_FILE.write_text(moved_to)
VERSION = f"2.0.0+{VERSION_COMMIT}"
USER_AGENT = f"microblogpub/{VERSION}"
AP_CONTENT_TYPE = "application/activity+json"
class _PrivacyReplace(pydantic.BaseModel):
domain: str
replace_by: str
class _ProfileMetadata(pydantic.BaseModel):
key: str
value: str
class _BlockedServer(pydantic.BaseModel):
hostname: str
reason: str | None = None
class Config(pydantic.BaseModel):
domain: str
username: str
admin_password: bytes
name: str
summary: str
https: bool
icon_url: str | None = None
image_url: str | None = None
secret: str
debug: bool = False
trusted_hosts: list[str] = ["127.0.0.1"]
tags: list[str] | None = None
manually_approves_followers: bool = False
privacy_replace: list[_PrivacyReplace] | None = None
metadata: list[_ProfileMetadata] | None = None
code_highlighting_theme = "friendly_grayscale"
blocked_servers: list[_BlockedServer] = []
custom_footer: str | None = None
emoji: str | None = None
also_known_as: str | None = None
hides_followers: bool = False
hides_following: bool = False
inbox_retention_days: int = 15
custom_content_security_policy: str | None = None
webfinger_domain: str | None = None
# Config items to make tests easier
sqlalchemy_database: str | None = None
key_path: str | None = None
session_timeout: int = 3600 * 24 * 3 # in seconds, 3 days by default
disabled_notifications: list[str] = []
# Only set when the app is served on a non-root path
id: str | None = None
def load_config() -> Config:
try:
return Config.parse_obj(
tomli.loads((ROOT_DIR / "data" / _CONFIG_FILE).read_text())
)
except FileNotFoundError:
raise ValueError(
f"Please run the configuration wizard, {_CONFIG_FILE} is missing"
)
def is_activitypub_requested(req: Request) -> bool:
accept_value = req.headers.get("accept")
if not accept_value:
return False
for val in {
"application/ld+json",
"application/activity+json",
}:
if accept_value.startswith(val):
return True
return False
def verify_password(pwd: str) -> bool:
return bcrypt.checkpw(pwd.encode(), CONFIG.admin_password)
CONFIG = load_config()
DOMAIN = CONFIG.domain
_SCHEME = "https" if CONFIG.https else "http"
ID = f"{_SCHEME}://{DOMAIN}"
# When running the app on a path, the ID maybe set by the config, but in this
# case, a valid webfinger must be served on the root domain
if CONFIG.id:
ID = CONFIG.id
USERNAME = CONFIG.username
# Allow to use @handle@webfinger-domain.tld while hosting the server at domain.tld
WEBFINGER_DOMAIN = CONFIG.webfinger_domain or DOMAIN
MANUALLY_APPROVES_FOLLOWERS = CONFIG.manually_approves_followers
HIDES_FOLLOWERS = CONFIG.hides_followers
HIDES_FOLLOWING = CONFIG.hides_following
PRIVACY_REPLACE = None
if CONFIG.privacy_replace:
PRIVACY_REPLACE = {pr.domain: pr.replace_by for pr in CONFIG.privacy_replace}
BLOCKED_SERVERS = {blocked_server.hostname for blocked_server in CONFIG.blocked_servers}
ALSO_KNOWN_AS = CONFIG.also_known_as
CUSTOM_CONTENT_SECURITY_POLICY = CONFIG.custom_content_security_policy
INBOX_RETENTION_DAYS = CONFIG.inbox_retention_days
SESSION_TIMEOUT = CONFIG.session_timeout
CUSTOM_FOOTER = (
markdown(CONFIG.custom_footer.replace("{version}", VERSION))
if CONFIG.custom_footer
else None
)
BASE_URL = ID
DEBUG = CONFIG.debug
DB_PATH = CONFIG.sqlalchemy_database or ROOT_DIR / "data" / "microblogpub.db"
SQLALCHEMY_DATABASE_URL = f"sqlite:///{DB_PATH}"
KEY_PATH = (
(ROOT_DIR / CONFIG.key_path) if CONFIG.key_path else ROOT_DIR / "data" / "key.pem"
)
EMOJIS = "😺 😸 😹 😻 😼 😽 🙀 😿 😾"
if CONFIG.emoji:
EMOJIS = CONFIG.emoji
# Emoji template for the FE
EMOJI_TPL = (
''
)
_load_emojis(ROOT_DIR, BASE_URL)
CODE_HIGHLIGHTING_THEME = CONFIG.code_highlighting_theme
MOVED_TO = _get_moved_to()
_NavBarItem = tuple[str, str]
class NavBarItems:
EXTRA_NAVBAR_ITEMS: list[_NavBarItem] = []
INDEX_NAVBAR_ITEM: _NavBarItem | None = None
NOTES_PATH = "/"
def load_custom_routes() -> None:
try:
from data import custom_routes # type: ignore # noqa: F401
except ImportError:
pass
for path, custom_handler in _CUSTOM_ROUTES.items():
# If a handler wants to replace the root, move the index to /notes
if path == "/":
NavBarItems.NOTES_PATH = "/notes"
NavBarItems.INDEX_NAVBAR_ITEM = (path, custom_handler.title)
else:
if custom_handler.show_in_navbar:
NavBarItems.EXTRA_NAVBAR_ITEMS.append((path, custom_handler.title))
session_serializer = URLSafeTimedSerializer(
CONFIG.secret,
salt=f"{ID}.session",
)
csrf_serializer = URLSafeTimedSerializer(
CONFIG.secret,
salt=f"{ID}.csrf",
)
def generate_csrf_token() -> str:
return csrf_serializer.dumps(secrets.token_hex(16)) # type: ignore
def verify_csrf_token(
csrf_token: str = Form(),
redirect_url: str | None = Form(None),
) -> None:
please_try_again = "please try again"
if redirect_url:
please_try_again = f'please try again'
try:
csrf_serializer.loads(csrf_token, max_age=1800)
except (itsdangerous.BadData, itsdangerous.SignatureExpired):
logger.exception("Failed to verify CSRF token")
raise HTTPException(
status_code=403,
detail=f"The security token has expired, {please_try_again}",
)
return None
def hmac_sha256() -> hmac.HMAC:
return hmac.new(CONFIG.secret.encode(), digestmod=hashlib.sha256)
stream_visibility_callback: _StreamVisibilityCallback
try:
from data.stream import ( # type: ignore # noqa: F401, E501
custom_stream_visibility_callback,
)
stream_visibility_callback = custom_stream_visibility_callback
except ImportError:
stream_visibility_callback = default_stream_visibility_callback