Погода в городе (FastAPI, celery, rabbimq)Погода в городе (FastAPI, celery, rabbimq)

1. Установка необходимых библиотек и сервисов

Для проекта потребуется установить FastAPI, Celery, клиент для RabbitMQ, а также сам RabbitMQ.

  1. Установка библиотек:
   pip install fastapi uvicorn celery requests aiohttp
  1. Установка и настройка RabbitMQ:
  • Скачайте и установите RabbitMQ с официального сайта.
  • Запустите RabbitMQ после установки. Откройте консоль RabbitMQ Management (обычно доступно по адресу http://localhost:15672).

2. Настройка проекта FastAPI

Создаем основную структуру проекта:

weather_notifications/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── celery_worker.py
│   ├── models.py
│   ├── database.py
│   ├── tasks.py
│   ├── crud.py
│   └── schemas.py
└── requirements.txt

3. Настройка базы данных и моделей

Мы будем хранить пользователей, их подписки на города и последний зарегистрированный статус погоды. Используем SQLAlchemy.

# app/database.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

DATABASE_URL = "sqlite:///./test.db"  # для тестирования

engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()
# app/models.py
from sqlalchemy import Column, Integer, String, Float, ForeignKey
from sqlalchemy.orm import relationship
from .database import Base

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    city = Column(String, index=True)
    last_temperature = Column(Float)
    email = Column(String, unique=True, index=True)

4. Создание API-эндпоинтов

Эндпоинты для регистрации пользователей и подписок на города.

# app/main.py
from fastapi import FastAPI, Depends
from sqlalchemy.orm import Session
from . import models, crud, schemas
from .database import engine, SessionLocal

models.Base.metadata.create_all(bind=engine)

app = FastAPI()

@app.post("/users/")
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    return crud.create_user(db=db, user=user)

@app.get("/weather_update/{city}")
def get_weather_update(city: str, db: Session = Depends(get_db)):
    return crud.get_weather_update_for_city(db=db, city=city)

5. Подключение Celery для асинхронных задач

Настроим Celery для взаимодействия с RabbitMQ.

# app/celery_worker.py
from celery import Celery
import requests

# Создаем приложение Celery
celery_app = Celery(
    "tasks",
    broker="pyamqp://guest@localhost//",  # подключение к локальному RabbitMQ
    backend="rpc://"
)

@celery_app.task
def check_weather(user_id, city):
    response = requests.get(f"http://api.weatherapi.com/v1/current.json?key=YOUR_API_KEY&q={city}")
    data = response.json()

    # Логика проверки погоды: если изменения существенны, отправляем уведомление
    temperature = data['current']['temp_c']
    # Здесь может быть код для проверки изменений температуры и уведомления пользователя

6. Планирование задач Celery с периодичностью

Для периодической проверки погоды можно использовать Celery Beat. В данном случае, раз Celery Beat не требует Docker, его можно запустить как обычное приложение Celery.

# app/tasks.py
from .celery_worker import check_weather
from .database import SessionLocal
from .models import User

def schedule_weather_updates():
    db = SessionLocal()
    users = db.query(User).all()
    for user in users:
        check_weather.delay(user.id, user.city)
    db.close()

7. Запуск Celery Worker и Celery Beat

  1. Запускаем Celery Worker:
   celery -A app.celery_worker worker --loglevel=info
  1. Для автоматического запуска задач можно запустить Celery Beat (при необходимости):
   celery -A app.celery_worker beat --loglevel=info

8. Реализация уведомлений

Уведомления можно отправлять через email или другой способ. Ниже пример функции для отправки email.

# app/notifications.py
import smtplib

def send_email(to_email, subject, body):
    with smtplib.SMTP("smtp.mailtrap.io", 2525) as server:
        server.login("USERNAME", "PASSWORD")
        message = f"Subject: {subject}\n\n{body}"
        server.sendmail("from@example.com", to_email, message)

9. Тестирование

Проверьте работу API, выполнение задач в Celery, и убедитесь, что данные о погоде обновляются и отправляются пользователям корректно.

Итоги и дальнейшие шаги

Проект с использованием FastAPI, Celery и RabbitMQ без Docker теперь настроен для уведомлений о погоде.

Поделится