Skip to main content

Worker (Oban) và Queue

landing_page_backend chạy job nền bằng Oban (DB-backed), kết hợp RabbitMQ cho job cross-service và Kafka cho event stream. Module worker Oban nằm trong lib/workers/.

Oban

Cấu hình

  • Khai báo queue + plugin ở config/*.exs (:landing_page, Oban).
  • Migration của Oban đã có sẵn trong priv/repo/migrations/.
  • DB-backed nên job vẫn còn sau khi restart node.

Ví dụ cấu hình:

config :landing_page, Oban,
repo: LandingPage.Repo,
queues: [
default: 20,
email: 10,
indexing: 10,
analytics: 30,
google: 5,
domain: 5,
transactions: 10,
susa: 5,
partner_service: 5
],
plugins: [
Oban.Plugins.Pruner,
{Oban.Plugins.Cron, crontab: [...]}
]

Mẫu worker

defmodule LandingPage.Workers.EmailWorker do
use Oban.Worker, queue: :email, max_attempts: 5

@impl true
def perform(%Oban.Job{args: %{"to" => to, "template" => template} = args}) do
LandingPage.Email.send_template(to, template, args)
end
end
  • Job dài: tinh chỉnh max_attempts + backoff.
  • Job idempotent: thêm unique: [period: 60] để tránh trùng lặp.
  • Enqueue: Oban.insert(EmailWorker.new(%{...})).

Danh mục worker

TệpMục đích
analytics_worker.exGộp event analytics, ghi vào QuestDB / Postgres.
botcake_worker.exCầu nối tới Botcake (chatbot).
domain_worker.exVerify domain TXT + SSL.
draft_form_worker.exXử lý lead dạng draft.
email_worker.exGửi email transactional.
form_data_worker.exĐẩy lead sang CRM / Sheet / webhook.
google_worker.exJob Google Ads / Sheet / Drive.
indexing_worker.exCập nhật index Elasticsearch.
main_worker.exFallback tổng quát.
partner_service_worker.exĐồng bộ partner service.
susa_worker.exĐồng bộ Susa.
task_pool_worker.exTask tuần tự tổng quát.
transactions_worker.exĐối soát giao dịch.

Khi tạo worker mới, bổ sung vào bảng trên.

Cron qua Oban

{Oban.Plugins.Cron, crontab: [
{"*/15 * * * *", LandingPage.Workers.AnalyticsWorker, args: %{type: :aggregate}},
{"0 * * * *", LandingPage.Workers.DomainWorker, args: %{action: :recheck}},
]}

Quantum (LandingPage.Scheduler)

  • Một vài cron dùng Quantum (lib/landing_page/scheduler.ex, schedules/) cho legacy hoặc workflow có nhánh.
  • Quy tắc chọn:
    • Cron đơn giản, idempotent → Oban Cron plugin.
    • Workflow phân nhánh → Quantum với module riêng.

Abstraction queue (lib/queue/)

Lớp wrap cho Rabbit / Oban / Kafka. Controller không nên gọi AMQP.Basic.publish trực tiếp — dùng LandingPage.Queue.publish/3.

RabbitMQ

  • Consumer ở lib/rabbit/ (use GenRMQ.Consumer).
  • Topology khai báo trong LandingPage.Rabbit.Topology.
  • Consumer mới phải đăng ký trong Rabbit.Supervisor.

Kafka

  • Producer + consumer trong lib/event_streaming/.
  • Đặt tên consumer group: webcake.<service>.<topic>.
  • Helper producer: EventStreaming.publish/3.

Outbox

  • lib/outbox/: ghi dòng outbox trong cùng transaction; dispatcher nền sẽ đẩy ra queue / webhook.
  • Dùng cho các luồng cần at-least-once (lead → CRM, transaction → bank).

Theo dõi

  • Oban Web UI (nếu cài): xem queue, retry, kill.
  • Oban.check_queue(:default) cho trạng thái runtime.
  • Oban.retry_job(job_id) / Oban.cancel_job(job_id).

Trong IEx:

import Ecto.Query

from(j in Oban.Job, group_by: j.queue, select: {j.queue, count(j.id)})
|> LandingPage.Repo.all()

Best practice

  • Worker phải idempotent — chạy hai lần không tạo dữ liệu trùng.
  • Truyền payload tối thiểu (%{form_id: id}) và load lại state trong perform/1.
  • Catch lỗi, log, sau đó re-raise để Sentry có stacktrace.
  • Queue luôn pending thường do concurrency: 0 — kiểm tra config khi cảnh báo nổi lên.

Xem thêm