Deploy em Produção - Colocando IA no Ar

📚 Aula 13 de 15
⏱️ 35 min
🔢 Módulo 5
📅 2025-08-26
Progresso do Curso 85.7% completo

🚀 Deploy em Produção: Do Código ao Mundo Real

🌍

Hora de colocar sua IA no ar!

Fazer deploy de modelos de IA em produção envolve muito mais que só código - é sobre criar sistemas robustos, escaláveis e confiáveis.

Nesta aula, você vai aprender a transformar seus projetos locais em serviços profissionais rodando em servidores reais, acessíveis para milhões de usuários!

🎯 Estratégias de Deploy para IA

🐳 Docker + API REST

Para: Máximo controle e flexibilidade

Containerize seu modelo e exponha via API REST profissional.

☁️ Cloud Platforms

Para: Deploy rápido e escalável

Use AWS, Google Cloud, Azure para deploy automatizado.

🔗 Serverless

Para: Custo-benefício otimizado

Deploy usando AWS Lambda, Vercel, ou similar.

🖥️ Edge Computing

Para: Baixa latência

Deploy próximo aos usuários finais.

🛠️ Sistema Completo de Deploy

# requirements.txt
fastapi==0.104.1
uvicorn==0.24.0
pydantic==2.5.0
transformers==4.35.0
torch==2.1.0
numpy==1.24.3
pandas==2.0.3
python-multipart==0.0.6
aiofiles==23.2.1
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
scikit-learn==1.3.0
pillow==10.1.0
redis==5.0.1
prometheus-client==0.19.0
psutil==5.9.6
from fastapi import FastAPI, HTTPException, Depends, UploadFile, File, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
import uvicorn
import torch
import numpy as np
import pandas as pd
from transformers import pipeline
import json
import logging
import time
import asyncio
from datetime import datetime
import os
import redis
from prometheus_client import Counter, Histogram, generate_latest
import psutil
from typing import Optional, List, Dict, Any
import pickle
import joblib
from PIL import Image
import io

# Configuração de logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Métricas Prometheus
REQUEST_COUNT = Counter('ai_requests_total', 'Total AI requests', ['model', 'status'])
REQUEST_DURATION = Histogram('ai_request_duration_seconds', 'Request duration')
MODEL_LOAD_TIME = Histogram('model_load_duration_seconds', 'Model loading time')

class AIModelService:
    """Serviço principal para gerenciar modelos de IA"""
    
    def __init__(self):
        self.models = {}
        self.redis_client = None
        self.setup_redis()
        self.load_models()
    
    def setup_redis(self):
        """Configura Redis para cache"""
        try:
            self.redis_client = redis.Redis(
                host=os.getenv('REDIS_HOST', 'localhost'),
                port=int(os.getenv('REDIS_PORT', 6379)),
                db=0,
                decode_responses=True
            )
            self.redis_client.ping()
            logger.info("Redis conectado com sucesso")
        except Exception as e:
            logger.warning(f"Redis não disponível: {e}")
            self.redis_client = None
    
    @MODEL_LOAD_TIME.time()
    def load_models(self):
        """Carrega todos os modelos necessários"""
        logger.info("Carregando modelos de IA...")
        
        try:
            # Modelo de sentiment analysis
            self.models['sentiment'] = pipeline(
                "sentiment-analysis",
                model="cardiffnlp/twitter-roberta-base-sentiment-latest",
                device=0 if torch.cuda.is_available() else -1
            )
            
            # Modelo de classificação de texto
            self.models['text_classifier'] = pipeline(
                "zero-shot-classification",
                device=0 if torch.cuda.is_available() else -1
            )
            
            # Modelo personalizado (exemplo)
            if os.path.exists('custom_model.pkl'):
                with open('custom_model.pkl', 'rb') as f:
                    self.models['custom'] = pickle.load(f)
            
            logger.info(f"Modelos carregados: {list(self.models.keys())}")
            
        except Exception as e:
            logger.error(f"Erro ao carregar modelos: {e}")
            raise
    
    def get_cached_prediction(self, cache_key: str) -> Optional[Dict]:
        """Busca predição no cache"""
        if not self.redis_client:
            return None
        
        try:
            cached = self.redis_client.get(cache_key)
            if cached:
                return json.loads(cached)
        except Exception as e:
            logger.warning(f"Erro ao buscar cache: {e}")
        
        return None
    
    def cache_prediction(self, cache_key: str, prediction: Dict, ttl: int = 3600):
        """Armazena predição no cache"""
        if not self.redis_client:
            return
        
        try:
            self.redis_client.setex(
                cache_key, 
                ttl, 
                json.dumps(prediction)
            )
        except Exception as e:
            logger.warning(f"Erro ao salvar cache: {e}")
    
    async def predict_sentiment(self, text: str) -> Dict:
        """Análise de sentimento com cache"""
        cache_key = f"sentiment:{hash(text)}"
        
        # Verificar cache
        cached = self.get_cached_prediction(cache_key)
        if cached:
            cached['from_cache'] = True
            return cached
        
        # Fazer predição
        start_time = time.time()
        try:
            result = self.models['sentiment'](text)
            prediction = {
                'text': text[:100] + '...' if len(text) > 100 else text,
                'sentiment': result[0]['label'],
                'confidence': round(result[0]['score'], 4),
                'processing_time': round(time.time() - start_time, 4),
                'from_cache': False,
                'timestamp': datetime.now().isoformat()
            }
            
            # Salvar no cache
            self.cache_prediction(cache_key, prediction)
            
            REQUEST_COUNT.labels(model='sentiment', status='success').inc()
            return prediction
            
        except Exception as e:
            REQUEST_COUNT.labels(model='sentiment', status='error').inc()
            raise HTTPException(status_code=500, detail=f"Erro na predição: {str(e)}")
    
    async def classify_text(self, text: str, categories: List[str]) -> Dict:
        """Classificação de texto zero-shot"""
        cache_key = f"classify:{hash(text + str(categories))}"
        
        cached = self.get_cached_prediction(cache_key)
        if cached:
            cached['from_cache'] = True
            return cached
        
        start_time = time.time()
        try:
            result = self.models['text_classifier'](text, categories)
            
            prediction = {
                'text': text[:100] + '...' if len(text) > 100 else text,
                'categories': categories,
                'predictions': [
                    {
                        'label': label,
                        'confidence': round(score, 4)
                    }
                    for label, score in zip(result['labels'], result['scores'])
                ],
                'top_category': result['labels'][0],
                'top_confidence': round(result['scores'][0], 4),
                'processing_time': round(time.time() - start_time, 4),
                'from_cache': False,
                'timestamp': datetime.now().isoformat()
            }
            
            self.cache_prediction(cache_key, prediction)
            REQUEST_COUNT.labels(model='classifier', status='success').inc()
            return prediction
            
        except Exception as e:
            REQUEST_COUNT.labels(model='classifier', status='error').inc()
            raise HTTPException(status_code=500, detail=f"Erro na classificação: {str(e)}")

# Inicializar serviço
ai_service = AIModelService()

# FastAPI App
app = FastAPI(
    title="AI Production API",
    description="API profissional para modelos de IA em produção",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc"
)

# CORS Middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Em produção, especifique domínios
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Security
security = HTTPBearer()

def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Verificação básica de token"""
    # Em produção, implemente verificação JWT real
    if credentials.credentials != os.getenv('API_TOKEN', 'seu_token_secreto'):
        raise HTTPException(
            status_code=401,
            detail="Token inválido"
        )
    return credentials.credentials

# Modelos Pydantic
class SentimentRequest(BaseModel):
    text: str
    
    class Config:
        schema_extra = {
            "example": {
                "text": "Estou muito feliz com os resultados!"
            }
        }

class ClassificationRequest(BaseModel):
    text: str
    categories: List[str]
    
    class Config:
        schema_extra = {
            "example": {
                "text": "Este é um produto incrível que recomendo",
                "categories": ["positivo", "neutro", "negativo"]
            }
        }

class BatchRequest(BaseModel):
    texts: List[str]
    max_concurrent: Optional[int] = 5

class HealthResponse(BaseModel):
    status: str
    timestamp: str
    models_loaded: List[str]
    system_info: Dict[str, Any]

# Rotas de Health Check
@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Verificação de saúde do serviço"""
    
    system_info = {
        "cpu_usage": psutil.cpu_percent(),
        "memory_usage": psutil.virtual_memory().percent,
        "disk_usage": psutil.disk_usage('/').percent,
        "python_version": os.sys.version,
        "torch_cuda_available": torch.cuda.is_available(),
        "redis_connected": ai_service.redis_client is not None
    }
    
    return HealthResponse(
        status="healthy",
        timestamp=datetime.now().isoformat(),
        models_loaded=list(ai_service.models.keys()),
        system_info=system_info
    )

@app.get("/metrics")
async def get_metrics():
    """Métricas Prometheus"""
    return generate_latest()

# Rotas da API
@app.post("/predict/sentiment")
@REQUEST_DURATION.time()
async def predict_sentiment(
    request: SentimentRequest,
    token: str = Depends(verify_token)
):
    """Análise de sentimento"""
    if not request.text.strip():
        raise HTTPException(status_code=400, detail="Texto não pode estar vazio")
    
    return await ai_service.predict_sentiment(request.text)

@app.post("/predict/classify")
@REQUEST_DURATION.time()
async def classify_text(
    request: ClassificationRequest,
    token: str = Depends(verify_token)
):
    """Classificação de texto"""
    if not request.text.strip():
        raise HTTPException(status_code=400, detail="Texto não pode estar vazio")
    
    if len(request.categories) < 2:
        raise HTTPException(status_code=400, detail="Forneça pelo menos 2 categorias")
    
    return await ai_service.classify_text(request.text, request.categories)

@app.post("/predict/batch/sentiment")
async def batch_sentiment(
    request: BatchRequest,
    background_tasks: BackgroundTasks,
    token: str = Depends(verify_token)
):
    """Análise de sentimento em lote"""
    if len(request.texts) > 100:
        raise HTTPException(status_code=400, detail="Máximo 100 textos por lote")
    
    # Processar em paralelo com limite
    semaphore = asyncio.Semaphore(request.max_concurrent)
    
    async def process_single(text: str) -> Dict:
        async with semaphore:
            return await ai_service.predict_sentiment(text)
    
    tasks = [process_single(text) for text in request.texts]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # Processar resultados
    successful = []
    errors = []
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            errors.append({
                'index': i,
                'text': request.texts[i][:50] + '...',
                'error': str(result)
            })
        else:
            successful.append({
                'index': i,
                'result': result
            })
    
    return {
        'total_processed': len(request.texts),
        'successful': len(successful),
        'errors': len(errors),
        'results': successful,
        'error_details': errors
    }

@app.post("/predict/image")
async def analyze_image(
    file: UploadFile = File(...),
    token: str = Depends(verify_token)
):
    """Análise de imagem (placeholder)"""
    
    if not file.content_type.startswith('image/'):
        raise HTTPException(status_code=400, detail="Apenas imagens são aceitas")
    
    # Ler imagem
    contents = await file.read()
    image = Image.open(io.BytesIO(contents))
    
    # Placeholder para análise real
    # Em produção, use modelos como CLIP, ResNet, etc.
    
    return {
        'filename': file.filename,
        'size': image.size,
        'format': image.format,
        'mode': image.mode,
        'analysis': 'Análise de imagem não implementada nesta versão',
        'timestamp': datetime.now().isoformat()
    }

# Rota de monitoramento avançado
@app.get("/admin/stats")
async def get_stats(token: str = Depends(verify_token)):
    """Estatísticas avançadas do sistema"""
    
    stats = {
        'uptime': time.time() - startup_time,
        'requests_processed': sum([
            REQUEST_COUNT.labels(model=m, status=s)._value._value 
            for m in ['sentiment', 'classifier'] 
            for s in ['success', 'error']
        ]),
        'cache_stats': {},
        'model_info': {},
        'system_resources': {
            'cpu_percent': psutil.cpu_percent(interval=1),
            'memory_percent': psutil.virtual_memory().percent,
            'disk_percent': psutil.disk_usage('/').percent,
            'load_average': os.getloadavg() if hasattr(os, 'getloadavg') else [0, 0, 0]
        }
    }
    
    # Cache stats
    if ai_service.redis_client:
        try:
            cache_info = ai_service.redis_client.info()
            stats['cache_stats'] = {
                'total_keys': cache_info.get('db0', {}).get('keys', 0),
                'memory_usage': cache_info.get('used_memory_human', '0B'),
                'hits': cache_info.get('keyspace_hits', 0),
                'misses': cache_info.get('keyspace_misses', 0)
            }
        except Exception as e:
            stats['cache_stats'] = {'error': str(e)}
    
    return stats

# Startup event
startup_time = time.time()

@app.on_event("startup")
async def startup_event():
    """Inicialização da aplicação"""
    logger.info("🚀 AI Production API iniciando...")
    logger.info(f"Modelos carregados: {list(ai_service.models.keys())}")
    logger.info("✅ API pronta para receber requisições")

@app.on_event("shutdown") 
async def shutdown_event():
    """Limpeza no shutdown"""
    logger.info("🛑 Desligando AI Production API...")
    if ai_service.redis_client:
        ai_service.redis_client.close()
    logger.info("✅ Shutdown completo")

# Rota principal
@app.get("/")
async def root():
    return {
        "message": "AI Production API",
        "version": "1.0.0",
        "status": "operational",
        "docs": "/docs",
        "health": "/health",
        "metrics": "/metrics"
    }

if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=int(os.getenv('PORT', 8000)),
        workers=int(os.getenv('WORKERS', 1)),
        reload=False,  # False em produção
        access_log=True
    )

🎯 API de Produção Completa:

  • ✅ FastAPI profissional
  • ✅ Sistema de cache com Redis
  • ✅ Métricas Prometheus
  • ✅ Health checks automáticos
  • ✅ Processamento em lote
  • ✅ Autenticação via token
  • ✅ Monitoramento de recursos
  • ✅ Tratamento de erros robusto

🐳 Containerização com Docker

Dockerfile

# Dockerfile
FROM python:3.10-slim

# Instalar dependências do sistema
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Definir diretório de trabalho
WORKDIR /app

# Copiar requirements
COPY requirements.txt .

# Instalar dependências Python
RUN pip install --no-cache-dir -r requirements.txt

# Copiar código da aplicação
COPY . .

# Criar usuário não-root
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# Expor porta
EXPOSE 8000

# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Comando para executar
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Docker Compose para Desenvolvimento

# docker-compose.yml
version: '3.8'

services:
  ai-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - REDIS_HOST=redis
      - API_TOKEN=seu_token_desenvolvimento
    depends_on:
      - redis
    volumes:
      - ./models:/app/models
    restart: unless-stopped
    
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    restart: unless-stopped
    
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    restart: unless-stopped
    
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana_data:/var/lib/grafana
    restart: unless-stopped

volumes:
  redis_data:
  grafana_data:

☁️ Deploy na Cloud

🅰️ AWS (Amazon Web Services)

# Deploy usando AWS ECS + Fargate

# 1. Criar repositório ECR
aws ecr create-repository --repository-name ai-production-api

# 2. Build e push da imagem
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin YOUR_ACCOUNT.dkr.ecr.us-east-1.amazonaws.com

docker build -t ai-production-api .
docker tag ai-production-api:latest YOUR_ACCOUNT.dkr.ecr.us-east-1.amazonaws.com/ai-production-api:latest
docker push YOUR_ACCOUNT.dkr.ecr.us-east-1.amazonaws.com/ai-production-api:latest

# 3. Usar CloudFormation ou Terraform para infraestrutura

🌐 Google Cloud Platform

# Deploy usando Cloud Run

# 1. Configurar projeto
gcloud config set project SEU_PROJETO_ID

# 2. Build e deploy
gcloud run deploy ai-production-api \
  --source . \
  --platform managed \
  --region us-central1 \
  --allow-unauthenticated \
  --memory 2Gi \
  --cpu 2 \
  --max-instances 100

🔵 Microsoft Azure

# Deploy usando Azure Container Instances

# 1. Criar grupo de recursos
az group create --name ai-production-rg --location eastus

# 2. Criar container registry
az acr create --resource-group ai-production-rg --name aiproductionacr --sku Basic

# 3. Deploy da aplicação
az container create \
  --resource-group ai-production-rg \
  --name ai-production-api \
  --image aiproductionacr.azurecr.io/ai-api:latest \
  --cpu 2 --memory 4 \
  --ports 8000

📊 Monitoramento em Produção

# prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'ai-api'
    static_configs:
      - targets: ['ai-api:8000']
    scrape_interval: 5s
    metrics_path: /metrics
    
rule_files:
  - "alert_rules.yml"

alerting:
  alertmanagers:
    - static_configs:
        - targets:
          - alertmanager:9093
# alert_rules.yml
groups:
  - name: ai-api-alerts
    rules:
      - alert: HighErrorRate
        expr: rate(ai_requests_total{status="error"}[5m]) > 0.1
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "Alta taxa de erro na API de IA"
          description: "Taxa de erro acima de 10% nos últimos 5 minutos"
          
      - alert: HighResponseTime
        expr: histogram_quantile(0.95, rate(ai_request_duration_seconds_bucket[5m])) > 2
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Tempo de resposta alto"
          description: "95% das requisições levam mais de 2 segundos"
          
      - alert: APIDown
        expr: up{job="ai-api"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "API de IA fora do ar"
          description: "A API não está respondendo aos health checks"

🎯 Parabéns! Você Dominou Deploy de IA!

Você acabou de aprender como transformar projetos locais em serviços profissionais de IA rodando em produção!

O que você conquistou hoje:

  • ✅ API REST profissional com FastAPI
  • ✅ Containerização com Docker
  • ✅ Deploy em múltiplas plataformas cloud
  • ✅ Sistema de cache para performance
  • ✅ Monitoramento com Prometheus
  • ✅ Health checks automáticos
  • ✅ Tratamento robusto de erros
  • ✅ Sistema de alertas inteligentes

Na próxima aula, vamos abordar os aspectos éticos e de segurança essenciais para sistemas de IA em produção!

"Deploy bem feito é a ponte entre ideias e impacto real"

- Isaque Victor

🎯 Teste Seus Conhecimentos