Add support for OAuth 2.0 dynamic client registration

main
Thomas Sileo 2022-12-16 19:23:22 +01:00
parent db6016394b
commit 5cf54c2782
6 changed files with 138 additions and 5 deletions

View File

@ -0,0 +1,48 @@
"""Add OAuth client
Revision ID: 4ab54becec04
Revises: 9b404c47970a
Create Date: 2022-12-16 17:30:54.520477+00:00
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = '4ab54becec04'
down_revision = '9b404c47970a'
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('oauth_client',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
sa.Column('client_name', sa.String(), nullable=False),
sa.Column('redirect_uris', sa.JSON(), nullable=True),
sa.Column('client_uri', sa.String(), nullable=True),
sa.Column('logo_uri', sa.String(), nullable=True),
sa.Column('scope', sa.String(), nullable=True),
sa.Column('client_id', sa.String(), nullable=False),
sa.Column('client_secret', sa.String(), nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('client_secret')
)
with op.batch_alter_table('oauth_client', schema=None) as batch_op:
batch_op.create_index(batch_op.f('ix_oauth_client_client_id'), ['client_id'], unique=True)
batch_op.create_index(batch_op.f('ix_oauth_client_id'), ['id'], unique=False)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('oauth_client', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_oauth_client_id'))
batch_op.drop_index(batch_op.f('ix_oauth_client_client_id'))
op.drop_table('oauth_client')
# ### end Alembic commands ###

View File

@ -11,6 +11,7 @@ from fastapi import HTTPException
from fastapi import Request
from fastapi.responses import JSONResponse
from loguru import logger
from pydantic import BaseModel
from sqlalchemy import select
from app import config
@ -38,9 +39,52 @@ async def well_known_authorization_server(
"code_challenge_methods_supported": ["S256"],
"revocation_endpoint": request.url_for("indieauth_revocation_endpoint"),
"revocation_endpoint_auth_methods_supported": ["none"],
"registration_endpoint": request.url_for("oauth_registration_endpoint"),
}
class OAuthRegisterClientRequest(BaseModel):
client_name: str
redirect_uris: list[str]
client_uri: str | None = None
logo_uri: str | None = None
scope: str | None = None
@router.post("/oauth/register")
async def oauth_registration_endpoint(
register_client_request: OAuthRegisterClientRequest,
db_session: AsyncSession = Depends(get_db_session),
) -> JSONResponse:
"""Implements OAuth 2.0 Dynamic Registration."""
client = models.OAuthClient(
client_name=register_client_request.client_name,
redirect_uris=register_client_request.redirect_uris,
client_uri=register_client_request.client_uri,
logo_uri=register_client_request.logo_uri,
scope=register_client_request.scope,
client_id=secrets.token_hex(16),
client_secret=secrets.token_hex(32),
)
db_session.add(client)
await db_session.commit()
return JSONResponse(
content={
**register_client_request.dict(),
"client_id_issued_at": int(client.created_at.timestamp()), # type: ignore
"grant_types": ["authorization_code", "refresh_token"],
"client_secret_expires_at": 0,
"client_id": client.client_id,
"client_secret": client.client_secret,
},
status_code=201,
)
@router.get("/auth")
async def indieauth_authorization_endpoint(
request: Request,
@ -56,12 +100,29 @@ async def indieauth_authorization_endpoint(
code_challenge = request.query_params.get("code_challenge", "")
code_challenge_method = request.query_params.get("code_challenge_method", "")
# Check if the authorization request is coming from an OAuth client
registered_client = (
await db_session.scalars(
select(models.OAuthClient).where(
models.OAuthClient.client_id == client_id,
)
)
).one_or_none()
if registered_client:
client = {
"name": registered_client.client_name,
"logo": registered_client.logo_uri,
"url": registered_client.client_uri,
}
else:
client = await indieauth.get_client_id_data(client_id)
return await templates.render_template(
db_session,
request,
"indieauth_flow.html",
dict(
client=await indieauth.get_client_id_data(client_id),
client=client,
scopes=scope,
redirect_uri=redirect_uri,
state=state,

View File

@ -472,6 +472,26 @@ class IndieAuthAccessToken(Base):
is_revoked = Column(Boolean, nullable=False, default=False)
class OAuthClient(Base):
__tablename__ = "oauth_client"
id = Column(Integer, primary_key=True, index=True)
created_at = Column(DateTime(timezone=True), nullable=False, default=now)
# Request
client_name = Column(String, nullable=False)
redirect_uris: Mapped[list[str]] = Column(JSON, nullable=True)
# Optional from request
client_uri = Column(String, nullable=True)
logo_uri = Column(String, nullable=True)
scope = Column(String, nullable=True)
# Response
client_id = Column(String, nullable=False, unique=True, index=True)
client_secret = Column(String, nullable=False, unique=True)
@enum.unique
class WebmentionType(str, enum.Enum):
UNKNOWN = "unknown"

View File

@ -459,7 +459,7 @@ a.label-btn {
border: 2px dashed $secondary-color;
}
.error-box {
.error-box, .scolor {
color: $secondary-color;
}

View File

@ -10,8 +10,12 @@
{% endif %}
<div class="indieauth-details">
<div>
<a class="lcolor" href="{{ client.url }}">{{ client.name }}</a>
<p>wants you to login as <strong class="lcolor">{{ me }}</strong> with the following redirect URI: <code>{{ redirect_uri }}</code>.</p>
{% if client.url %}
<a class="scolor" href="{{ client.url }}">{{ client.name }}</a>
{% else %}
<span class="scolor">{{ client.name }}</span>
{% endif %}
<p>wants you to login{% if me %} as <strong class="lcolor">{{ me }}</strong>{% endif %} with the following redirect URI: <code>{{ redirect_uri }}</code>.</p>
<form method="POST" action="{{ url_for('indieauth_flow') }}" class="form">

View File

@ -10,7 +10,7 @@ from app.utils.url import make_abs
class IndieAuthClient:
logo: str | None
name: str
url: str
url: str | None
def _get_prop(props: dict[str, Any], name: str, default=None) -> Any: