Secrets encryption (#4)

Reviewed-on: #4
Co-authored-by: Ivan Golikov <root@ivnglkv.me>
Co-committed-by: Ivan Golikov <root@ivnglkv.me>
This commit is contained in:
Ivan Golikov 2025-01-03 15:06:08 +00:00 committed by root
parent 759c338657
commit 7fae1a18b6
10 changed files with 220 additions and 6 deletions

10
pssecret_server/fernet.py Normal file
View file

@ -0,0 +1,10 @@
from typing import Annotated
from cryptography.fernet import Fernet
from fastapi import Depends
from pssecret_server.settings import Settings, get_settings
def get_fernet(settings: Annotated[Settings, Depends(get_settings)]) -> Fernet:
return Fernet(settings.secrets_encryption_key)

View file

@ -1,16 +1,19 @@
from typing import Annotated
from cryptography.fernet import Fernet
from fastapi import Depends, FastAPI
from fastapi.exceptions import HTTPException
from redis.asyncio import Redis
from pssecret_server.fernet import get_fernet
from pssecret_server.models import Secret, SecretSaveResult
from pssecret_server.redis_db import get_redis
from pssecret_server.utils import save_secret
from pssecret_server.utils import decrypt_secret, encrypt_secret, save_secret
app = FastAPI()
RedisDep = Annotated[Redis, Depends(get_redis)]
FernetDep = Annotated[Fernet, Depends(get_fernet)]
@app.post(
@ -23,7 +26,10 @@ RedisDep = Annotated[Redis, Depends(get_redis)]
),
response_model=SecretSaveResult,
)
async def set_secret(data: Secret, redis: RedisDep) -> dict[str, str]:
async def set_secret(
data: Secret, redis: RedisDep, fernet: FernetDep
) -> dict[str, str]:
data = encrypt_secret(data, fernet)
return {
"key": await save_secret(data, redis),
}
@ -40,12 +46,14 @@ async def set_secret(data: Secret, redis: RedisDep) -> dict[str, str]:
response_model=Secret,
responses={404: {"description": "The item was not found"}},
)
async def get_secret(secret_key: str, redis: RedisDep) -> dict[str, bytes]:
async def get_secret(
secret_key: str, redis: RedisDep, fernet: FernetDep
) -> dict[str, bytes]:
data: bytes | None = await redis.getdel(secret_key)
if data is None:
raise HTTPException(404)
return {
"data": data,
"data": decrypt_secret(data, fernet),
}

View file

@ -1,9 +1,12 @@
from pydantic import RedisDsn
from pydantic_settings import BaseSettings
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env")
redis_url: RedisDsn = RedisDsn("redis://localhost")
secrets_encryption_key: bytes
def get_settings() -> Settings:

View file

@ -1,10 +1,20 @@
from uuid import uuid4
from cryptography.fernet import Fernet
from redis.asyncio import Redis
from pssecret_server.models import Secret
def encrypt_secret(data: Secret, fernet: Fernet) -> Secret:
encrypted = fernet.encrypt(data.data.encode()).decode()
return Secret(data=encrypted)
def decrypt_secret(secret: bytes, fernet: Fernet) -> bytes:
return fernet.decrypt(secret)
async def get_new_key(redis: Redis) -> str:
"""Returns free Redis key"""
while True: