
Model Deployment

Deploy and serve machine learning models with auto-scaling inference endpoints

Deploy trained ML models as production-ready inference endpoints with automatic scaling, load balancing, and monitoring.

Deployment Options

1. REST API Server

Deploy a model behind an HTTP API endpoint.

Frameworks supported:

  • TorchServe (PyTorch)
  • TensorFlow Serving
  • FastAPI + custom code
  • Flask + custom code
  • ONNX Runtime Server

2. Batch Inference

Process large datasets asynchronously.
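
As a minimal sketch of the batch pattern, the helper below splits a dataset into fixed-size chunks and runs each chunk through a prediction function. `chunked`, `run_batch_inference`, and `fake_predict` are hypothetical names used only for illustration; they are not part of a Fugoku API.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, List


def chunked(items: Iterable, size: int) -> Iterator[list]:
    """Yield consecutive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def run_batch_inference(items: Iterable,
                        predict_fn: Callable[[list], List],
                        batch_size: int = 32) -> list:
    """Run predict_fn over the dataset one chunk at a time."""
    results = []
    for chunk in chunked(items, batch_size):
        results.extend(predict_fn(chunk))
    return results


# Hypothetical stand-in for a real model call: doubles every input.
def fake_predict(batch: list) -> list:
    return [x * 2 for x in batch]


predictions = run_batch_inference(range(10), fake_predict, batch_size=4)
```

In a real job `predict_fn` would wrap the model's forward pass, and the results would typically be written to storage per chunk rather than kept in memory.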

3. Real-Time Inference

Low-latency predictions (<100ms).

4. Edge Deployment

Coming Q3 2026 - Deploy to edge locations.

Quick Start: Deploy with FastAPI

Train Model

# train.py
import torch
import torch.nn as nn
from torchvision import models, transforms
from PIL import Image

# Train model (simplified)
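# Start from ImageNet weights (pretrained=True is deprecated since torchvision 0.13)
model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)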
model.fc = nn.Linear(512, 10)  # 10 classes

# ... training code ...

# Save model
torch.save(model.state_dict(), 'model.pth')

Create Inference Server

# serve.py
from fastapi import FastAPI, File, UploadFile
from PIL import Image
import torch
from torchvision import models, transforms
import io

app = FastAPI()

# Load model
model = models.resnet18()
model.fc = torch.nn.Linear(512, 10)
# map_location lets a checkpoint saved on a GPU load on any machine
model.load_state_dict(torch.load('model.pth', map_location='cpu'))
model.eval()

# Preprocessing
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

@app.post("/predict")
async def predict(file: UploadFile = File(...)):
    # Read image and force 3-channel RGB (handles grayscale and RGBA uploads)
    image = Image.open(io.BytesIO(await file.read())).convert('RGB')
    
    # Preprocess
    input_tensor = transform(image).unsqueeze(0)
    
    # Inference
    with torch.no_grad():
        output = model(input_tensor)
        probabilities = torch.nn.functional.softmax(output[0], dim=0)
    
    # Get top prediction
    top_prob, top_class = torch.topk(probabilities, 1)
    
    return {
        "class": int(top_class),
        "confidence": float(top_prob)
    }

@app.get("/health")
def health():
    return {"status": "healthy"}

Deploy to Fugoku

# Create GPU instance for inference
fugoku create instance \
  --name inference-1 \
  --plan gpu-a100-1 \
  --image pytorch-2.0-cuda12 \
  --region lagos-1 \
  --wait

# Copy model and server code
scp model.pth serve.py ubuntu@<instance-ip>:/home/ubuntu/

# SSH in
fugoku ssh inference-1

# Install FastAPI
pip install fastapi uvicorn python-multipart

# Run server
uvicorn serve:app --host 0.0.0.0 --port 8000

Test Inference

curl -X POST http://<instance-ip>:8000/predict \
  -F "file=@cat.jpg"

# Response:
# {"class": 3, "confidence": 0.94}
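
The same request can be sent from Python. The sketch below uses only the standard library and builds the multipart body by hand; `build_multipart` and `predict` are hypothetical helper names, and the `file` field name is taken from the FastAPI handler above.

```python
import json
import os
import urllib.request
import uuid


def build_multipart(field: str, filename: str, payload: bytes,
                    content_type: str = "image/jpeg"):
    """Encode a single file as a multipart/form-data body."""
    boundary = uuid.uuid4().hex
    head = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="{field}"; '
        f'filename="{filename}"\r\n'
        f"Content-Type: {content_type}\r\n\r\n"
    ).encode("utf-8")
    tail = f"\r\n--{boundary}--\r\n".encode("utf-8")
    return head + payload + tail, f"multipart/form-data; boundary={boundary}"


def predict(url: str, image_path: str, timeout: float = 10.0) -> dict:
    """POST an image to the /predict endpoint and return the parsed JSON."""
    with open(image_path, "rb") as f:
        body, content_type = build_multipart(
            "file", os.path.basename(image_path), f.read()
        )
    request = urllib.request.Request(
        url, data=body, headers={"Content-Type": content_type}, method="POST"
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)
```

Usage would look like `predict("http://<instance-ip>:8000/predict", "cat.jpg")`, with the instance IP filled in.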

TorchServe Deployment

TorchServe is PyTorch's official model serving framework.

Package Model

# Install torch-model-archiver
pip install torch-model-archiver

# Create model archive
torch-model-archiver \
  --model-name resnet18 \
  --version 1.0 \
  --model-file model.py \
  --serialized-file model.pth \
  --handler image_classifier \
  --export-path model-store

Start TorchServe

# Install TorchServe
pip install torchserve torch-model-archiver

# Start server
torchserve --start \
  --model-store model-store \
  --models resnet18=resnet18.mar \
  --ncs

Inference API

# Predict
curl -X POST http://localhost:8080/predictions/resnet18 \
  -F "data=@cat.jpg"

# Management API
curl http://localhost:8081/models

Configuration

# config.properties
inference_address=http://0.0.0.0:8080
management_address=http://0.0.0.0:8081
number_of_netty_threads=8
job_queue_size=100
model_store=/home/ubuntu/model-store

TensorFlow Serving

Export SavedModel

# Export TensorFlow model
model.save('/tmp/my_model/1')  # Version 1

Run TF Serving

# Using Docker
docker run -p 8501:8501 \
  --mount type=bind,source=/tmp/my_model,target=/models/my_model \
  -e MODEL_NAME=my_model \
  -t tensorflow/serving

# Inference
curl -X POST http://localhost:8501/v1/models/my_model:predict \
  -H "Content-Type: application/json" \
  -d '{"instances": [[1.0, 2.0, 3.0]]}'
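
A Python equivalent of the curl call, as a sketch using only the standard library. It relies on TensorFlow Serving's row-format REST API, which takes `{"instances": [...]}` and returns `{"predictions": [...]}`; the helper names are hypothetical.

```python
import json
import urllib.request


def build_predict_request(host: str, model_name: str,
                          instances: list) -> urllib.request.Request:
    """Build the POST request for TensorFlow Serving's row-format predict API."""
    url = f"http://{host}/v1/models/{model_name}:predict"
    body = json.dumps({"instances": instances}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def predict(host: str, model_name: str, instances: list,
            timeout: float = 10.0) -> list:
    """Send the request and return the list under the 'predictions' key."""
    request = build_predict_request(host, model_name, instances)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)["predictions"]
```

With the container above running, `predict("localhost:8501", "my_model", [[1.0, 2.0, 3.0]])` would return one prediction per instance.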

Production Deployment

Load Balancing

Deploy multiple inference servers behind a load balancer:

# Create 3 inference instances
for i in {1..3}; do
  fugoku create instance \
    --name inference-$i \
    --plan gpu-a100-1 \
    --image pytorch-2.0-cuda12 \
    --region lagos-1 \
    --wait &
done
wait

# Deploy model to each instance
# (use Ansible or deployment script)

# Create load balancer
fugoku lb create \
  --name inference-lb \
  --region lagos-1 \
  --algorithm least-connections \
  --protocol http \
  --port 80 \
  --backend-port 8000

# Add backends
for i in {1..3}; do
  fugoku lb add-backend inference-lb --instance inference-$i
done

# Configure health check
fugoku lb set-health-check inference-lb \
  --path /health \
  --interval 10 \
  --timeout 5
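
To illustrate what the `least-connections` algorithm and the health check do together, here is a sketch of the general technique: route each request to the healthy backend with the fewest in-flight connections. This is not Fugoku's implementation, and the `Backend` class and `pick_backend` function are hypothetical.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Backend:
    name: str
    active_connections: int = 0
    healthy: bool = True  # would be updated by periodic /health probes


def pick_backend(backends: List[Backend]) -> Backend:
    """Return the healthy backend with the fewest active connections."""
    candidates = [b for b in backends if b.healthy]
    if not candidates:
        raise RuntimeError("no healthy backends available")
    return min(candidates, key=lambda b: b.active_connections)


backends = [
    Backend("inference-1", active_connections=4),
    Backend("inference-2", active_connections=1),
    Backend("inference-3", active_connections=0, healthy=False),
]

chosen = pick_backend(backends)   # inference-3 is skipped: it is unhealthy
chosen.active_connections += 1    # the new request is now in flight
```

Least-connections tends to suit inference traffic because request durations vary with input size, so connection counts track real load better than round-robin does.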

Auto-Scaling (Manual)

Until the managed auto-scaling service is available, monitor load and add or remove instances manually:

# Get load balancer stats
fugoku lb stats inference-lb

# If request rate high, add instance
fugoku create instance \
  --name inference-4 \
  --plan gpu-a100-1 \
  --image pytorch-2.0-cuda12 \
  --region lagos-1

# Deploy model and add to LB
fugoku lb add-backend inference-lb --instance inference-4

# If load drops, remove instance
fugoku lb remove-backend inference-lb --instance inference-4
fugoku instances delete inference-4 --confirm

Auto-scaling service coming Q3 2026.
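
Until then, the scale-up and scale-down decision can be scripted. The sketch below computes how many instances a given request rate needs. The per-instance capacity, the 70% target utilization, and the instance limits are hypothetical values to be replaced with measurements from your own workload; the function takes plain numbers as input because the output format of `fugoku lb stats` is not assumed here.

```python
import math


def desired_instances(requests_per_second: float,
                      capacity_per_instance: float,
                      target_utilization: float = 0.7,
                      min_instances: int = 1,
                      max_instances: int = 10) -> int:
    """Instances needed to serve the load at the target utilization."""
    if capacity_per_instance <= 0:
        raise ValueError("capacity_per_instance must be positive")
    if not 0 < target_utilization <= 1:
        raise ValueError("target_utilization must be in (0, 1]")
    usable_capacity = capacity_per_instance * target_utilization
    needed = math.ceil(requests_per_second / usable_capacity)
    # Clamp to the allowed range so the fleet never drops to zero or runs away
    return max(min_instances, min(max_instances, needed))


# Hypothetical numbers: 100 req/s of traffic, 50 req/s per instance
target = desired_instances(100, 50)
```

Comparing `target` with the current backend count tells a script whether to run the create and `add-backend` commands or the `remove-backend` and delete commands shown above.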

Monitoring

Application metrics:

# serve.py
from prometheus_client import Counter, Histogram, generate_latest
from fastapi import Response

request_count = Counter('inference_requests_total', 'Total requests')
inference_time = Histogram('inference_duration_seconds', 'Inference time')

@app.post("/predict")
async def predict(file: UploadFile):
    request_count.inc()
    with inference_time.time():
        # ... inference code ...
        pass

@app.get("/metrics")
def metrics():
    return Response(generate_latest(), media_type="text/plain")

Infrastructure metrics:

# GPU utilization
nvidia-smi --query-gpu=utilization.gpu --format=csv,noheader,nounits
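
To feed this value into a metrics pipeline, the command output can be parsed from Python. A sketch, assuming every output line is a plain integer percentage (one line per GPU); the function names are hypothetical.

```python
import subprocess
from typing import List

QUERY = [
    "nvidia-smi",
    "--query-gpu=utilization.gpu",
    "--format=csv,noheader,nounits",
]


def parse_gpu_utilization(output: str) -> List[int]:
    """Parse one utilization percentage per non-empty line."""
    return [int(line) for line in output.splitlines() if line.strip()]


def read_gpu_utilization() -> List[int]:
    """Run nvidia-smi and return the utilization of each GPU in percent."""
    result = subprocess.run(QUERY, capture_output=True, text=True, check=True)
    return parse_gpu_utilization(result.stdout)
```

On a GPU instance, `read_gpu_utilization()` returns one integer per GPU, which could be exported through the `/metrics` endpoint as a gauge.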

Log aggregation:

# Ship logs to centralized logging
# Example: Loki, Elasticsearch, CloudWatch

Optimization

Model Quantization

Reduce model size and increase inference speed:

PyTorch:

import torch

# Post-training quantization
model.eval()
quantized_model = torch.quantization.quantize_dynamic(
    model,
    {torch.nn.Linear},
    dtype=torch.qint8
)

# Save quantized model
torch.save(quantized_model.state_dict(), 'model_quantized.pth')

TensorFlow:

import tensorflow as tf

# Convert to TFLite with quantization
converter = tf.lite.TFLiteConverter.from_saved_model('/tmp/my_model/1')
converter.optimizations = [tf.lite.Optimize.DEFAULT]
quantized_model = converter.convert()

with open('model_quantized.tflite', 'wb') as f:
    f.write(quantized_model)
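
The size reduction comes from storing each weight as an 8-bit integer plus a shared scale factor instead of a 32-bit float. The sketch below shows a simplified symmetric per-tensor scheme in plain Python to make the arithmetic concrete. It illustrates the idea only and is not the exact scheme PyTorch or TFLite applies.

```python
from typing import List, Tuple


def quantize_int8(weights: List[float]) -> Tuple[List[int], float]:
    """Map floats to integers in [-127, 127] with one shared scale."""
    max_abs = max(abs(w) for w in weights)
    scale = max_abs / 127 if max_abs else 1.0
    quantized = [max(-127, min(127, round(w / scale))) for w in weights]
    return quantized, scale


def dequantize(quantized: List[int], scale: float) -> List[float]:
    """Recover approximate float values from the integers."""
    return [q * scale for q in quantized]


weights = [0.5, -1.27, 0.0, 1.0]
q, scale = quantize_int8(weights)
restored = dequantize(q, scale)

# Rounding error per weight is bounded by half a quantization step
max_error = max(abs(a - b) for a, b in zip(weights, restored))
```

Each weight shrinks from 4 bytes to 1 byte, at the cost of a rounding error of at most `scale / 2` per value.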

ONNX Runtime

Convert models to ONNX for cross-framework deployment:

PyTorch to ONNX:

import torch

# Export to ONNX
dummy_input = torch.randn(1, 3, 224, 224)
torch.onnx.export(
    model,
    dummy_input,
    "model.onnx",
    input_names=['input'],
    output_names=['output'],
    dynamic_axes={'input': {0: 'batch_size'}, 'output': {0: 'batch_size'}}
)

Serve with ONNX Runtime:

import onnxruntime as ort
import numpy as np
from fastapi import FastAPI

app = FastAPI()
session = ort.InferenceSession("model.onnx")

@app.post("/predict")
async def predict(data: list):
    input_data = np.array(data, dtype=np.float32)
    outputs = session.run(None, {"input": input_data})
    return {"prediction": outputs[0].tolist()}
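
The endpoint above returns the raw model output. Assuming the model is a classifier that emits logits, as the ResNet example does, a client or the handler itself can turn them into probabilities and a top-k ranking. A dependency-free sketch with hypothetical helper names:

```python
import math
from typing import List, Tuple


def softmax(logits: List[float]) -> List[float]:
    """Numerically stable softmax: subtract the max before exponentiating."""
    peak = max(logits)
    exps = [math.exp(x - peak) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def top_k(logits: List[float], k: int = 1) -> List[Tuple[int, float]]:
    """Return (class_index, probability) pairs for the k most likely classes."""
    probs = softmax(logits)
    ranked = sorted(range(len(probs)), key=lambda i: probs[i], reverse=True)
    return [(i, probs[i]) for i in ranked[:k]]


best_class, confidence = top_k([0.1, 2.0, -1.0], k=1)[0]
```

Subtracting the maximum logit avoids overflow in `math.exp` for large values without changing the result.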

TensorRT (NVIDIA GPUs)

Maximum performance on NVIDIA GPUs:

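import tensorrt as trt

# Convert ONNX to TensorRT
logger = trt.Logger(trt.Logger.WARNING)
builder = trt.Builder(logger)
# The ONNX parser requires an explicit-batch network
# (explicit batch is the default, and the flag deprecated, in TensorRT 10+)
network = builder.create_network(
    1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)
)
parser = trt.OnnxParser(network, logger)

with open('model.onnx', 'rb') as f:
    if not parser.parse(f.read()):
        for i in range(parser.num_errors):
            print(parser.get_error(i))
        raise RuntimeError('Failed to parse model.onnx')

# Build engine (TensorRT 8.4+ API; older releases used
# config.max_workspace_size and builder.build_engine)
config = builder.create_builder_config()
config.set_memory_pool_limit(trt.MemoryPoolType.WORKSPACE, 1 << 30)  # 1 GB
serialized_engine = builder.build_serialized_network(network, config)
if serialized_engine is None:
    raise RuntimeError('Engine build failed')

# Save engine
with open('model.trt', 'wb') as f:
    f.write(serialized_engine)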

Batching

Increase throughput by batching requests:

# serve.py
import asyncio
from collections import deque

batch_size = 8
batch_timeout = 0.1  # seconds to wait before flushing a partial batch
pending_requests = deque()

@app.post("/predict")
async def predict(data: dict):
    future = asyncio.get_running_loop().create_future()
    pending_requests.append((data, future))
    return await future

@app.on_event("startup")
async def start_batch_processor():
    asyncio.create_task(process_batches())

async def process_batches():
    while True:
        # Wait for a full batch, but never longer than batch_timeout
        if len(pending_requests) < batch_size:
            await asyncio.sleep(batch_timeout)
        if not pending_requests:
            continue

        # Take up to batch_size requests so partial batches are served too
        count = min(batch_size, len(pending_requests))
        batch = [pending_requests.popleft() for _ in range(count)]
        inputs = [item[0] for item in batch]
        futures = [item[1] for item in batch]

        # Batch inference (the model must accept a list of inputs)
        results = model(inputs)

        # Return results
        for future, result in zip(futures, results):
            future.set_result(result)
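
The batching idea can be tried without a web server or a GPU. The sketch below is a self-contained variant built on `asyncio.Queue`: it collects requests until the batch is full or a deadline passes, then resolves each caller's future. `MicroBatcher` and `fake_model` are hypothetical names, and the fake model simply doubles its inputs.

```python
import asyncio


class MicroBatcher:
    """Group concurrent requests into batches of at most max_batch_size."""

    def __init__(self, predict_batch, max_batch_size=8, max_wait=0.05):
        self.predict_batch = predict_batch
        self.max_batch_size = max_batch_size
        self.max_wait = max_wait  # seconds to wait for a batch to fill
        self.queue = asyncio.Queue()

    async def submit(self, item):
        """Enqueue one request and wait for its individual result."""
        future = asyncio.get_running_loop().create_future()
        await self.queue.put((item, future))
        return await future

    async def run(self):
        loop = asyncio.get_running_loop()
        while True:
            # Block until at least one request arrives
            batch = [await self.queue.get()]
            deadline = loop.time() + self.max_wait
            # Keep filling until the batch is full or the deadline passes
            while len(batch) < self.max_batch_size:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break
                try:
                    batch.append(
                        await asyncio.wait_for(self.queue.get(), remaining)
                    )
                except asyncio.TimeoutError:
                    break
            inputs = [item for item, _ in batch]
            try:
                results = self.predict_batch(inputs)
            except Exception as exc:
                # Propagate the failure to every caller in this batch
                for _, future in batch:
                    if not future.done():
                        future.set_exception(exc)
                continue
            for (_, future), result in zip(batch, results):
                if not future.done():
                    future.set_result(result)


batch_sizes = []


def fake_model(inputs):
    """Hypothetical stand-in for a batched forward pass."""
    batch_sizes.append(len(inputs))
    return [x * 2 for x in inputs]


async def demo():
    batcher = MicroBatcher(fake_model, max_batch_size=4, max_wait=0.05)
    worker = asyncio.create_task(batcher.run())
    results = await asyncio.gather(*(batcher.submit(i) for i in range(6)))
    worker.cancel()
    return results


results = asyncio.run(demo())
```

A real forward pass would block the event loop while it runs, so in production the `predict_batch` call is usually moved to a worker thread, for example with `loop.run_in_executor`.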

Deployment Patterns

A/B Testing

Deploy multiple model versions:

# Create instances for model v1 and v2
fugoku create instance --name inference-v1-1 --plan gpu-a100-1 --image pytorch-2.0-cuda12
fugoku create instance --name inference-v2-1 --plan gpu-a100-1 --image pytorch-2.0-cuda12

# Deploy models
# inference-v1-1: model_v1.pth
# inference-v2-1: model_v2.pth

# Load balancer with weighted backends
fugoku lb add-backend inference-lb --instance inference-v1-1 --weight 90
fugoku lb add-backend inference-lb --instance inference-v2-1 --weight 10
# 90% traffic to v1, 10% to v2
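
Weighted backends split traffic per request, so one user may see both versions. If the experiment needs each user to stay on one version, the assignment can be made in application code instead. The sketch below hashes a user ID into a bucket and maps buckets to variants by integer weight. It is an application-level alternative, not a description of how `--weight` works, and all names are hypothetical.

```python
import hashlib
from typing import Dict


def assign_variant(user_id: str, weights: Dict[str, int]) -> str:
    """Deterministically map a user to a variant in proportion to its weight."""
    total = sum(weights.values())
    if total <= 0:
        raise ValueError("weights must sum to a positive integer")
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % total
    cumulative = 0
    for variant, weight in weights.items():
        cumulative += weight
        if bucket < cumulative:
            return variant
    raise AssertionError("unreachable: bucket is always below total")


weights = {"v1": 90, "v2": 10}
variant = assign_variant("user-42", weights)

# Rough check of the split over many hypothetical user IDs
counts = {"v1": 0, "v2": 0}
for n in range(10_000):
    counts[assign_variant(f"user-{n}", weights)] += 1
v2_share = counts["v2"] / 10_000
```

Because the hash is deterministic, the same user always lands on the same model version, which keeps per-user metrics comparable between the two groups.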

Canary Deployment

Gradual rollout:

# Week 1: 5% traffic to new model
fugoku lb add-backend inference-lb --instance inference-v2-1 --weight 5

# Week 2: 25% if metrics good
fugoku lb update-backend inference-lb --instance inference-v2-1 --weight 25

# Week 3: 100%
fugoku lb remove-backend inference-lb --instance inference-v1-1
fugoku lb update-backend inference-lb --instance inference-v2-1 --weight 100
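
"If metrics good" can be turned into an explicit gate that is evaluated before each weight increase. A sketch, where the `Metrics` fields and all thresholds (minimum sample size, allowed error-rate increase, allowed latency ratio) are hypothetical and should be tuned per service:

```python
from dataclasses import dataclass


@dataclass
class Metrics:
    requests: int
    errors: int
    p95_latency_ms: float

    @property
    def error_rate(self) -> float:
        return self.errors / self.requests if self.requests else 0.0


def canary_is_healthy(baseline: Metrics, canary: Metrics,
                      min_requests: int = 500,
                      max_error_increase: float = 0.01,
                      max_latency_ratio: float = 1.2) -> bool:
    """Return True only if the canary has enough traffic and has not regressed."""
    if canary.requests < min_requests:
        return False  # not enough data to decide yet
    if canary.error_rate > baseline.error_rate + max_error_increase:
        return False  # error rate regressed
    return canary.p95_latency_ms <= baseline.p95_latency_ms * max_latency_ratio


baseline = Metrics(requests=10_000, errors=50, p95_latency_ms=80.0)
canary = Metrics(requests=1_000, errors=6, p95_latency_ms=85.0)
promote = canary_is_healthy(baseline, canary)
```

When the gate returns False, the safer move is to keep the current weight or remove the canary backend rather than proceed to the next step.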

Blue-Green Deployment

Zero-downtime updates:

# Deploy new version (green)
fugoku create instance --name inference-green-1 --plan gpu-a100-1

# Test green environment
curl http://<green-ip>:8000/health

# Switch traffic (via floating IP)
fugoku networking unassign-ip --ip <floating-ip> inference-blue-1
fugoku networking assign-ip --ip <floating-ip> inference-green-1

# Rollback if issues
fugoku networking assign-ip --ip <floating-ip> inference-blue-1
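
Before switching the floating IP, it helps to require several consecutive successful health checks rather than a single one. A sketch using only the standard library; it assumes the `/health` endpoint returns `{"status": "healthy"}` as in the FastAPI example, and the function names and retry settings are hypothetical.

```python
import json
import time
import urllib.request
from typing import Callable


def is_healthy(url: str, timeout: float = 2.0) -> bool:
    """Return True if the endpoint answers 200 with the expected JSON body."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            body = json.load(response)
            return response.status == 200 and body == {"status": "healthy"}
    except (OSError, ValueError):
        # Connection errors, HTTP errors, timeouts, or invalid JSON
        return False


def wait_until_healthy(check: Callable[[], bool],
                       required_successes: int = 3,
                       max_attempts: int = 30,
                       delay: float = 1.0) -> bool:
    """Poll until `check` passes required_successes times in a row."""
    streak = 0
    for _ in range(max_attempts):
        streak = streak + 1 if check() else 0
        if streak >= required_successes:
            return True
        time.sleep(delay)
    return False
```

A deployment script could call `wait_until_healthy(lambda: is_healthy("http://<green-ip>:8000/health"))` and run the IP switch only when it returns True.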

Security

API Authentication

# serve.py
from fastapi import Header, HTTPException

API_KEY = "your-secret-key"

@app.post("/predict")
async def predict(file: UploadFile, x_api_key: str = Header(None)):
    if x_api_key != API_KEY:
        raise HTTPException(status_code=401, detail="Invalid API key")
    # ... inference code ...
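
Two hardening steps are worth considering for the check above: load the key from the environment instead of hard-coding it, and compare keys in constant time so response timing does not leak how many characters matched. A sketch with hypothetical helper names and a hypothetical `INFERENCE_API_KEY` variable:

```python
import hmac
import os
from typing import Optional


def load_api_key(env_var: str = "INFERENCE_API_KEY") -> str:
    """Read the expected key from the environment; fail fast if it is missing."""
    key = os.environ.get(env_var)
    if not key:
        raise RuntimeError(f"{env_var} is not set")
    return key


def is_valid_key(provided: Optional[str], expected: str) -> bool:
    """Constant-time comparison of the provided key against the expected key."""
    if provided is None:
        return False
    return hmac.compare_digest(
        provided.encode("utf-8"), expected.encode("utf-8")
    )
```

In the handler, `if not is_valid_key(x_api_key, API_KEY)` would replace the `!=` comparison, with `API_KEY = load_api_key()` evaluated once at startup.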

Rate Limiting

from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/predict")
@limiter.limit("10/minute")
async def predict(request: Request, file: UploadFile):
    # ... inference code ...
    pass
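
To make the meaning of a limit such as "10/minute" concrete, here is a sketch of a sliding-window limiter in plain Python. It illustrates the general technique and is not how slowapi is implemented internally; the class name is hypothetical, and the injectable `clock` exists only to make the behavior easy to test.

```python
import time
from collections import defaultdict, deque
from typing import Callable


class SlidingWindowLimiter:
    """Allow at most `limit` requests per key within `window_seconds`."""

    def __init__(self, limit: int = 10, window_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.limit = limit
        self.window = window_seconds
        self.clock = clock
        self._hits = defaultdict(deque)  # key -> timestamps of accepted requests

    def allow(self, key: str) -> bool:
        now = self.clock()
        hits = self._hits[key]
        # Drop timestamps that have fallen out of the window
        while hits and now - hits[0] >= self.window:
            hits.popleft()
        if len(hits) >= self.limit:
            return False
        hits.append(now)
        return True


# Fake clock so the example does not need to wait a real minute
now = [0.0]
limiter_demo = SlidingWindowLimiter(limit=2, window_seconds=60.0,
                                    clock=lambda: now[0])
first = limiter_demo.allow("203.0.113.7")
second = limiter_demo.allow("203.0.113.7")
third = limiter_demo.allow("203.0.113.7")   # over the limit
now[0] = 61.0
after_window = limiter_demo.allow("203.0.113.7")
```

The key here plays the role of the client address returned by `get_remote_address`. State kept in process memory like this is per server, so a fleet behind a load balancer would need a shared store to enforce one global limit.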

HTTPS

# Use Caddy for automatic HTTPS
sudo apt install caddy

# Caddyfile
cat > Caddyfile << 'EOF'
api.example.com {
    reverse_proxy localhost:8000
}
EOF

sudo caddy start

Cost Optimization

Right-size GPU:

  • A100 for large models (>1B parameters)
  • T4 for smaller models (coming Q3 2026)
  • CPU for very small models (<100M parameters)
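
A rough way to apply this sizing guidance is to estimate the memory the weights alone will occupy: parameter count times bytes per parameter. The sketch below does that arithmetic. The 1.2 overhead factor for activations and runtime buffers is a hypothetical placeholder, since real overhead depends on batch size and framework.

```python
BYTES_PER_PARAM = {"fp32": 4, "fp16": 2, "int8": 1}


def weight_memory_gb(num_params: int, dtype: str = "fp16") -> float:
    """Memory needed for the weights alone, in GB (1 GB = 1e9 bytes)."""
    return num_params * BYTES_PER_PARAM[dtype] / 1e9


def fits_on_gpu(num_params: int, gpu_memory_gb: float,
                dtype: str = "fp16", overhead_factor: float = 1.2) -> bool:
    """Rough check: weights plus an assumed overhead must fit in GPU memory."""
    return weight_memory_gb(num_params, dtype) * overhead_factor <= gpu_memory_gb


one_billion_fp16 = weight_memory_gb(1_000_000_000, "fp16")   # 2.0 GB
seven_billion_fits = fits_on_gpu(7_000_000_000, gpu_memory_gb=40)
```

The same calculation shows why quantization helps with sizing: moving from fp32 to int8 cuts weight memory by a factor of four.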

Use CPU for preprocessing: Offload image resizing and tokenization to CPU instances.

Batch inference: Process multiple requests together for better GPU utilization.

Spot instances: 70% cheaper for non-critical workloads (coming Q4 2026).

Examples

Text Generation (GPT-2)

# serve.py
from transformers import GPT2LMHeadModel, GPT2Tokenizer
from fastapi import FastAPI

app = FastAPI()

model = GPT2LMHeadModel.from_pretrained('gpt2')
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
model.eval()

@app.post("/generate")
async def generate(prompt: str, max_length: int = 50):
    inputs = tokenizer(prompt, return_tensors='pt')
    outputs = model.generate(
        inputs['input_ids'],
        max_length=max_length,
        num_return_sequences=1
    )
    text = tokenizer.decode(outputs[0], skip_special_tokens=True)
    return {"generated_text": text}

Object Detection (YOLOv5)

# serve.py
import torch
from PIL import Image
from fastapi import FastAPI, File, UploadFile
import io

app = FastAPI()
model = torch.hub.load('ultralytics/yolov5', 'yolov5s')

@app.post("/detect")
async def detect(file: UploadFile):
    image = Image.open(io.BytesIO(await file.read()))
    results = model(image)
    
    detections = []
    for *box, conf, cls in results.xyxy[0]:
        detections.append({
            "class": model.names[int(cls)],
            "confidence": float(conf),
            "bbox": [float(x) for x in box]
        })
    
    return {"detections": detections}

Troubleshooting

High latency:

  • Check GPU utilization; low utilization under load usually points to a CPU-side bottleneck such as preprocessing or I/O
  • Enable batching
  • Use TensorRT or ONNX Runtime
  • Profile with py-spy or cProfile
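
Before profiling, it helps to measure latency percentiles so that tail latency is visible and not hidden behind an average. A sketch using only the standard library; `measure_latency` is a hypothetical helper, and `fn` stands for any zero-argument call such as one inference on a fixed input.

```python
import statistics
import time
from typing import Callable, Dict


def measure_latency(fn: Callable[[], object], runs: int = 100,
                    warmup: int = 5) -> Dict[str, float]:
    """Time repeated calls to fn and report latency statistics in milliseconds."""
    for _ in range(warmup):
        fn()  # warm caches and lazy initialization before timing
    samples = []
    for _ in range(runs):
        start = time.perf_counter()
        fn()
        samples.append((time.perf_counter() - start) * 1000)
    cuts = statistics.quantiles(samples, n=100)  # 99 percentile cut points
    return {
        "p50_ms": cuts[49],
        "p95_ms": cuts[94],
        "p99_ms": cuts[98],
        "mean_ms": statistics.fmean(samples),
    }


# Hypothetical workload standing in for a model call
stats = measure_latency(lambda: sum(range(1000)), runs=50, warmup=2)
```

A large gap between `p50_ms` and `p99_ms` suggests intermittent stalls, such as queueing or garbage collection, which a profiler can then help locate.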

Out of memory:

  • Reduce batch size
  • Use quantization
  • Use smaller model
  • Upgrade to larger GPU

Slow cold start:

  • Pre-warm model at startup
  • Use model caching
  • Keep instances running (don't stop/start frequently)

Support

Deployment help: support@fugoku.com

Community: Discord #ml-deployment

