今日目标
o 理解RESTful API设计原则
o 掌握FastAPI框架的使用
o 学会API文档自动生成
o 了解API认证和授权
o 掌握API测试和部署
RESTful API设计
1. REST原则
REST(Representational State Transfer)是一种软件架构风格:
o 无状态:每个请求包含所有必要信息
o 统一接口:使用标准HTTP方法
o 资源导向:以资源为中心设计
o 可缓存:支持缓存机制
o 分层系统:支持代理、网关等
2. HTTP方法映射
# RESTful API的HTTP方法映射
HTTP_METHODS = {
'GET': '获取资源',
'POST': '创建资源',
'PUT': '更新资源(完整更新)',
'PATCH': '更新资源(部分更新)',
'DELETE': '删除资源'
}
# 资源URL设计示例
API_ENDPOINTS = {
'users': '/api/v1/users',
'user_detail': '/api/v1/users/{user_id}',
'user_posts': '/api/v1/users/{user_id}/posts',
'posts': '/api/v1/posts',
'post_detail': '/api/v1/posts/{post_id}',
'post_comments': '/api/v1/posts/{post_id}/comments'
}
FastAPI框架介绍
FastAPI是一个现代、快速的Web框架,具有以下特点:
o 高性能:基于Starlette和Pydantic
o 自动文档:自动生成OpenAPI文档
o 类型提示:完整的Python类型支持
o 异步支持:原生异步/等待支持
o 易于学习:简洁的API设计
安装FastAPI
pip install fastapi uvicorn
FastAPI基础
1. 第一个FastAPI应用
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
# 创建FastAPI应用
app = FastAPI(
title="博客API",
description="一个使用FastAPI构建的博客API",
version="1.0.0"
)
# 配置CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
async def root():
"""根路径"""
return {"message": "欢迎使用博客API"}
@app.get("/health")
async def health_check():
"""健康检查"""
return {"status": "healthy", "timestamp": "2024-01-01T00:00:00Z"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
2. 数据模型定义
from pydantic import BaseModel, Field, EmailStr
from typing import Optional, List
from datetime import datetime
from enum import Enum
class UserStatus(str, Enum):
ACTIVE = "active"
INACTIVE = "inactive"
BANNED = "banned"
class PostStatus(str, Enum):
DRAFT = "draft"
PUBLISHED = "published"
ARCHIVED = "archived"
class UserBase(BaseModel):
"""用户基础模型"""
username: str = Field(..., min_length=3, max_length=50, description="用户名")
email: EmailStr = Field(..., description="邮箱地址")
full_name: Optional[str] = Field(None, max_length=100, description="全名")
class UserCreate(UserBase):
"""创建用户模型"""
password: str = Field(..., min_length=6, description="密码")
class UserUpdate(BaseModel):
"""更新用户模型"""
username: Optional[str] = Field(None, min_length=3, max_length=50)
email: Optional[EmailStr] = None
full_name: Optional[str] = Field(None, max_length=100)
status: Optional[UserStatus] = None
class User(UserBase):
"""用户响应模型"""
id: int
status: UserStatus
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
class PostBase(BaseModel):
"""文章基础模型"""
title: str = Field(..., min_length=1, max_length=200, description="文章标题")
content: str = Field(..., description="文章内容")
excerpt: Optional[str] = Field(None, max_length=500, description="文章摘要")
class PostCreate(PostBase):
"""创建文章模型"""
category_id: Optional[int] = Field(None, description="分类ID")
tags: List[str] = Field(default=[], description="标签列表")
class PostUpdate(BaseModel):
"""更新文章模型"""
title: Optional[str] = Field(None, min_length=1, max_length=200)
content: Optional[str] = None
excerpt: Optional[str] = Field(None, max_length=500)
category_id: Optional[int] = None
status: Optional[PostStatus] = None
tags: Optional[List[str]] = None
class Post(PostBase):
"""文章响应模型"""
id: int
slug: str
author_id: int
category_id: Optional[int]
status: PostStatus
view_count: int
created_at: datetime
updated_at: datetime
published_at: Optional[datetime]
author: User
tags: List[str] = []
class Config:
from_attributes = True
class CommentBase(BaseModel):
"""评论基础模型"""
content: str = Field(..., min_length=1, description="评论内容")
class CommentCreate(CommentBase):
"""创建评论模型"""
post_id: int = Field(..., description="文章ID")
class CommentUpdate(BaseModel):
"""更新评论模型"""
content: str = Field(..., min_length=1)
class Comment(CommentBase):
"""评论响应模型"""
id: int
post_id: int
author_id: int
parent_id: Optional[int]
is_approved: bool
created_at: datetime
author: User
class Config:
from_attributes = True
class PaginationParams(BaseModel):
"""分页参数"""
page: int = Field(1, ge=1, description="页码")
size: int = Field(10, ge=1, le=100, description="每页数量")
class PaginatedResponse(BaseModel):
"""分页响应"""
items: List[dict]
total: int
page: int
size: int
pages: int
API路由设计
1. 用户API
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List
import crud, models, schemas
from database import get_db
router = APIRouter(prefix="/api/v1/users", tags=["users"])
@router.get("/", response_model=List[User])
async def get_users(
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db)
):
"""获取用户列表"""
users = crud.user.get_multi(db, skip=skip, limit=limit)
return users
@router.post("/", response_model=User, status_code=status.HTTP_201_CREATED)
async def create_user(
user: UserCreate,
db: Session = Depends(get_db)
):
"""创建新用户"""
db_user = crud.user.get_by_email(db, email=user.email)
if db_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="邮箱已被注册"
)
db_user = crud.user.get_by_username(db, username=user.username)
if db_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="用户名已存在"
)
return crud.user.create(db, obj_in=user)
@router.get("/{user_id}", response_model=User)
async def get_user(
user_id: int,
db: Session = Depends(get_db)
):
"""获取特定用户"""
user = crud.user.get(db, id=user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="用户不存在"
)
return user
@router.put("/{user_id}", response_model=User)
async def update_user(
user_id: int,
user_update: UserUpdate,
db: Session = Depends(get_db)
):
"""更新用户信息"""
user = crud.user.get(db, id=user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="用户不存在"
)
return crud.user.update(db, db_obj=user, obj_in=user_update)
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
user_id: int,
db: Session = Depends(get_db)
):
"""删除用户"""
user = crud.user.get(db, id=user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="用户不存在"
)
crud.user.remove(db, id=user_id)
return None
@router.get("/{user_id}/posts", response_model=List[Post])
async def get_user_posts(
user_id: int,
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db)
):
"""获取用户的文章列表"""
user = crud.user.get(db, id=user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="用户不存在"
)
posts = crud.post.get_by_author(db, author_id=user_id, skip=skip, limit=limit)
return posts
2. 文章API
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from typing import List, Optional
import crud, models, schemas
from database import get_db
router = APIRouter(prefix="/api/v1/posts", tags=["posts"])
@router.get("/", response_model=PaginatedResponse)
async def get_posts(
pagination: PaginationParams = Depends(),
category_id: Optional[int] = Query(None, description="分类ID"),
tag: Optional[str] = Query(None, description="标签"),
search: Optional[str] = Query(None, description="搜索关键词"),
db: Session = Depends(get_db)
):
"""获取文章列表(支持分页、筛选、搜索)"""
posts, total = crud.post.get_multi_paginated(
db,
skip=(pagination.page - 1) * pagination.size,
limit=pagination.size,
category_id=category_id,
tag=tag,
search=search
)
pages = (total + pagination.size - 1) // pagination.size
return PaginatedResponse(
items=[post.dict() for post in posts],
total=total,
page=pagination.page,
size=pagination.size,
pages=pages
)
@router.post("/", response_model=Post, status_code=status.HTTP_201_CREATED)
async def create_post(
post: PostCreate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""创建新文章"""
return crud.post.create_with_author(
db, obj_in=post, author_id=current_user.id
)
@router.get("/{post_id}", response_model=Post)
async def get_post(
post_id: int,
db: Session = Depends(get_db)
):
"""获取特定文章"""
post = crud.post.get(db, id=post_id)
if not post:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="文章不存在"
)
# 增加浏览量
crud.post.increment_view_count(db, post_id=post_id)
return post
@router.put("/{post_id}", response_model=Post)
async def update_post(
post_id: int,
post_update: PostUpdate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""更新文章"""
post = crud.post.get(db, id=post_id)
if not post:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="文章不存在"
)
if post.author_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="没有权限修改此文章"
)
return crud.post.update(db, db_obj=post, obj_in=post_update)
@router.delete("/{post_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(
post_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""删除文章"""
post = crud.post.get(db, id=post_id)
if not post:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="文章不存在"
)
if post.author_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="没有权限删除此文章"
)
crud.post.remove(db, id=post_id)
return None
@router.get("/{post_id}/comments", response_model=List[Comment])
async def get_post_comments(
post_id: int,
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db)
):
"""获取文章的评论列表"""
post = crud.post.get(db, id=post_id)
if not post:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="文章不存在"
)
comments = crud.comment.get_by_post(
db, post_id=post_id, skip=skip, limit=limit
)
return comments
3. 评论API
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List
import crud, models, schemas
from database import get_db
router = APIRouter(prefix="/api/v1/comments", tags=["comments"])
@router.post("/", response_model=Comment, status_code=status.HTTP_201_CREATED)
async def create_comment(
comment: CommentCreate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""创建新评论"""
# 检查文章是否存在
post = crud.post.get(db, id=comment.post_id)
if not post:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="文章不存在"
)
return crud.comment.create_with_author(
db, obj_in=comment, author_id=current_user.id
)
@router.put("/{comment_id}", response_model=Comment)
async def update_comment(
comment_id: int,
comment_update: CommentUpdate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""更新评论"""
comment = crud.comment.get(db, id=comment_id)
if not comment:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="评论不存在"
)
if comment.author_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="没有权限修改此评论"
)
return crud.comment.update(db, db_obj=comment, obj_in=comment_update)
@router.delete("/{comment_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_comment(
comment_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""删除评论"""
comment = crud.comment.get(db, id=comment_id)
if not comment:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="评论不存在"
)
if comment.author_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="没有权限删除此评论"
)
crud.comment.remove(db, id=comment_id)
return None
认证和授权
1. JWT认证
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
from typing import Optional
# 配置
SECRET_KEY = "your-secret-key-here"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
security = HTTPBearer()
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""验证密码"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""生成密码哈希"""
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
"""创建访问令牌"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_token(token: str) -> Optional[dict]:
"""验证令牌"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except JWTError:
return None
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
) -> User:
"""获取当前用户"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无法验证凭据",
headers={"WWW-Authenticate": "Bearer"},
)
token = credentials.credentials
payload = verify_token(token)
if payload is None:
raise credentials_exception
username: str = payload.get("sub")
if username is None:
raise credentials_exception
user = crud.user.get_by_username(db, username=username)
if user is None:
raise credentials_exception
return user
# 认证路由
@router.post("/auth/login")
async def login(
username: str = Form(...),
password: str = Form(...),
db: Session = Depends(get_db)
):
"""用户登录"""
user = crud.user.authenticate(db, username=username, password=password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户名或密码错误",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {
"access_token": access_token,
"token_type": "bearer",
"user": user
}
@router.post("/auth/register", response_model=User)
async def register(
user: UserCreate,
db: Session = Depends(get_db)
):
"""用户注册"""
db_user = crud.user.get_by_email(db, email=user.email)
if db_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="邮箱已被注册"
)
return crud.user.create(db, obj_in=user)
2. 权限控制
from functools import wraps
from fastapi import HTTPException, status
def require_permissions(permissions: List[str]):
"""权限装饰器"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
current_user = kwargs.get('current_user')
if not current_user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="需要登录"
)
# 检查用户权限
user_permissions = get_user_permissions(current_user)
if not all(perm in user_permissions for perm in permissions):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="权限不足"
)
return await func(*args, **kwargs)
return wrapper
return decorator
# 管理员权限
@router.get("/admin/users", response_model=List[User])
@require_permissions(["admin"])
async def admin_get_users(
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""管理员获取所有用户"""
return crud.user.get_multi(db)
# 文章审核权限
@router.put("/admin/posts/{post_id}/approve")
@require_permissions(["moderator"])
async def approve_post(
post_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""审核文章"""
post = crud.post.get(db, id=post_id)
if not post:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="文章不存在"
)
return crud.post.update_status(db, post_id=post_id, status="published")
中间件和异常处理
1. 自定义中间件
from fastapi import Request
from fastapi.responses import JSONResponse
import time
import logging
# 日志配置
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
"""添加处理时间头部"""
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
return response
@app.middleware("http")
async def log_requests(request: Request, call_next):
"""记录请求日志"""
logger.info(f"请求: {request.method} {request.url}")
response = await call_next(request)
logger.info(f"响应: {response.status_code}")
return response
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
"""速率限制中间件"""
client_ip = request.client.host
# 简单的速率限制实现
if is_rate_limited(client_ip):
return JSONResponse(
status_code=429,
content={"detail": "请求过于频繁,请稍后再试"}
)
response = await call_next(request)
return response
2. 异常处理
from fastapi import HTTPException, Request
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from starlette.exceptions import HTTPException as StarletteHTTPException
@app.exception_handler(StarletteHTTPException)
async def http_exception_handler(request: Request, exc: StarletteHTTPException):
"""HTTP异常处理"""
return JSONResponse(
status_code=exc.status_code,
content={
"detail": exc.detail,
"status_code": exc.status_code,
"path": request.url.path
}
)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
"""请求验证异常处理"""
return JSONResponse(
status_code=422,
content={
"detail": "请求数据验证失败",
"errors": exc.errors(),
"path": request.url.path
}
)
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
"""通用异常处理"""
logger.error(f"未处理的异常: {exc}", exc_info=True)
return JSONResponse(
status_code=500,
content={
"detail": "服务器内部错误",
"path": request.url.path
}
)
API测试
1. 单元测试
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from database import get_db
from main import app
# 测试数据库
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def override_get_db():
"""覆盖数据库依赖"""
try:
db = TestingSessionLocal()
yield db
finally:
db.close()
app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)
class TestUserAPI:
"""用户API测试"""
def test_create_user(self):
"""测试创建用户"""
response = client.post(
"/api/v1/users/",
json={
"username": "testuser",
"email": "test@example.com",
"password": "testpass123",
"full_name": "Test User"
}
)
assert response.status_code == 201
data = response.json()
assert data["username"] == "testuser"
assert data["email"] == "test@example.com"
def test_get_user(self):
"""测试获取用户"""
# 先创建用户
user_data = {
"username": "testuser2",
"email": "test2@example.com",
"password": "testpass123"
}
create_response = client.post("/api/v1/users/", json=user_data)
user_id = create_response.json()["id"]
# 获取用户
response = client.get(f"/api/v1/users/{user_id}")
assert response.status_code == 200
data = response.json()
assert data["username"] == "testuser2"
def test_get_nonexistent_user(self):
"""测试获取不存在的用户"""
response = client.get("/api/v1/users/999")
assert response.status_code == 404
class TestPostAPI:
"""文章API测试"""
def test_create_post(self):
"""测试创建文章"""
# 先创建用户
user_data = {
"username": "postuser",
"email": "post@example.com",
"password": "testpass123"
}
user_response = client.post("/api/v1/users/", json=user_data)
user_id = user_response.json()["id"]
# 登录获取token
login_response = client.post("/auth/login", data={
"username": "postuser",
"password": "testpass123"
})
token = login_response.json()["access_token"]
# 创建文章
post_data = {
"title": "Test Post",
"content": "This is a test post",
"excerpt": "Test excerpt"
}
response = client.post(
"/api/v1/posts/",
json=post_data,
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 201
data = response.json()
assert data["title"] == "Test Post"
assert data["author_id"] == user_id
if __name__ == "__main__":
pytest.main([__file__])
2. 集成测试
import asyncio
from httpx import AsyncClient
import pytest
@pytest.mark.asyncio
async def test_api_integration():
"""API集成测试"""
async with AsyncClient(app=app, base_url="http://test") as ac:
# 测试健康检查
response = await ac.get("/health")
assert response.status_code == 200
assert response.json()["status"] == "healthy"
# 测试用户注册和登录流程
user_data = {
"username": "integration_user",
"email": "integration@example.com",
"password": "testpass123"
}
# 注册
register_response = await ac.post("/api/v1/users/", json=user_data)
assert register_response.status_code == 201
# 登录
login_response = await ac.post("/auth/login", data={
"username": "integration_user",
"password": "testpass123"
})
assert login_response.status_code == 200
token = login_response.json()["access_token"]
# 创建文章
post_data = {
"title": "Integration Test Post",
"content": "This is an integration test post"
}
post_response = await ac.post(
"/api/v1/posts/",
json=post_data,
headers={"Authorization": f"Bearer {token}"}
)
assert post_response.status_code == 201
# 获取文章列表
posts_response = await ac.get("/api/v1/posts/")
assert posts_response.status_code == 200
posts_data = posts_response.json()
assert len(posts_data["items"]) > 0
API文档
1. 自动文档生成
from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi
def custom_openapi():
"""自定义OpenAPI文档"""
if app.openapi_schema:
return app.openapi_schema
openapi_schema = get_openapi(
title="博客API",
version="1.0.0",
description="一个功能完整的博客API系统",
routes=app.routes,
)
# 添加安全配置
openapi_schema["components"]["securitySchemes"] = {
"BearerAuth": {
"type": "http",
"scheme": "bearer",
"bearerFormat": "JWT",
}
}
app.openapi_schema = openapi_schema
return app.openapi_schema
app.openapi = custom_openapi
# 添加标签描述
tags_metadata = [
{
"name": "users",
"description": "用户管理操作,包括注册、登录、用户信息管理等。",
},
{
"name": "posts",
"description": "文章管理操作,包括创建、编辑、删除文章等。",
},
{
"name": "comments",
"description": "评论管理操作,包括发表评论、回复评论等。",
},
{
"name": "auth",
"description": "认证相关操作,包括登录、注册、令牌管理等。",
},
]
app = FastAPI(
title="博客API",
description="一个使用FastAPI构建的完整博客API系统",
version="1.0.0",
openapi_tags=tags_metadata,
)
部署和监控
1. 生产部署
# main.py
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
# 生产环境配置
if __name__ == "__main__":
uvicorn.run(
"main:app",
host="0.0.0.0",
port=8000,
reload=False, # 生产环境关闭热重载
workers=4, # 多进程
log_level="info"
)
2. Docker部署
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'
services:
api:
build: .
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql://user:password@db:5432/blogdb
depends_on:
- db
db:
image: postgres:13
environment:
- POSTGRES_DB=blogdb
- POSTGRES_USER=user
- POSTGRES_PASSWORD=password
volumes:
- postgres_data:/var/lib/postgresql/data
volumes:
postgres_data:
今日总结
今天我们学习了FastAPI和RESTful API开发:
1. RESTful设计原则:资源导向、统一接口、无状态
2. FastAPI框架:高性能、自动文档、类型提示
3. API路由设计:用户、文章、评论API
4. 认证授权:JWT认证、权限控制
5. 中间件和异常处理:自定义中间件、异常处理
6. API测试:单元测试、集成测试
7. 文档和部署:自动文档生成、Docker部署
FastAPI是一个现代化的API开发框架,特别适合构建高性能的Web API。
练习建议
1. 创建一个完整的博客API系统
2. 实现用户认证和权限控制
3. 添加API文档和测试
4. 部署到云服务器