Skip to content

NeaByteLab/Diques

Repository files navigation

Diques License: MIT Node.js CI

A distributed queue system and job scheduler for Node.js applications with timezone support.

📦 Installation

npm install @neabyte/diques

or with Redis integration:

npm install redis

🚀 Getting Started

📥 Import Module

// CommonJS
const Diques = require('@neabyte/diques')

// ESM (ES Modules)
import Diques from '@neabyte/diques'

// TypeScript types
import type {
  QueueJob, QueueOptions,
  ScheduledJob, ScheduledOptions
} from '@neabyte/diques'

✨ Create Instance

// Create Redis client
import { createClient } from 'redis'
const redisClient = createClient({ url: 'redis://localhost:6379' })
await redisClient.connect()

// Initialize Diques with Redis
const instance = new Diques(redisClient)

// Or use without Redis (memory mode)
const instance = new Diques()

🔄 Job Deduplication

Prevents duplicate jobs from being processed by debouncing rapid-fire requests:

// Queue with default debounce
const emailQueue = instance.queue(async (job) => {
  console.log('Processing email:', job.data.email)
}, { name: 'email-queue', debounce: 5000 })

// Individual job debouncing
await emailQueue.add({ email: '...' }, { debounce: 2000 })
await emailQueue.add({ email: '...' }, { debounce: 2000 }) // This will cancel the previous job
await emailQueue.add({ email: '...' }, { debounce: 2000 }) // Only this one will execute after 2 seconds

⚠️ Job Error Retry

Control how many times a job should retry before being discarded:

// Fail-fast queue (no retries)
const criticalQueue = instance.queue(async (job) => {
  console.log('Critical task:', job.data)
}, { name: 'critical-queue', maxFailure: 1 }) // Fail immediately

// Queue with custom retry configuration
const emailQueue = instance.queue(async (job) => {
  console.log('Processing email:', job.data.email)
}, { name: 'email-queue', maxFailure: 3 }) // Retry up to 3 times

// Individual job retry mechanism
await emailQueue.add({ type: 'critical' }, { maxFailure: 1 })  // Fail fast
await emailQueue.add({ type: 'normal' }, { maxFailure: 3 })    // Retry 3 times

Default behavior: maxFailure: 1 (fail immediately)


🎯 Job Priorities

Controls job processing order with priority-based scheduling:

// High priority jobs (lower number = higher priority)
await emailQueue.add({
  email: 'admin@example.com',
  type: 'emergency'
}, { priority: 1 }) // Processed first

// Normal priority jobs
await emailQueue.add({
  email: 'user@example.com',
  type: 'normal'
}, { priority: 5 }) // Default priority

// Low priority jobs
await emailQueue.add({
  email: 'newsletter@example.com',
  type: 'bulk'
}, { priority: 10 }) // Processed last

📡 Observables (Event Streaming)

Monitor job lifecycle events in real-time with event listeners:

// Create a queue
const emailQueue = instance.queue(async (job) => {
  console.log('Processing email:', job.data.email)
  await sendEmail(job.data.email, job.data.subject)
}, { name: 'email-queue' })

// Listen for job events
emailQueue.on('job:started', (job) => {
  console.log(`📡 Job started: ${job.data.email}`)
})
emailQueue.on('job:completed', (job) => {
  console.log(`✅ Job completed: ${job.data.email}`)
})
emailQueue.on('job:failed', (job, error) => {
  console.log(`❌ Job failed: ${job.data.email} - ${error.message}`)
})

// Add jobs and watch events
await emailQueue.add({ email: 'user@example.com', subject: 'Welcome!' })

🔧 Event Management

// Remove specific event listener
const listener = (job) => console.log('Job started:', job.id)
emailQueue.on('job:started', listener)
emailQueue.off('job:started', listener) // Remove listener

// Multiple listeners for same event
emailQueue.on('job:completed', (job) => console.log('Logger: Job done'))
emailQueue.on('job:completed', (job) => console.log('Analytics: Job done'))

💡 Job Scheduler

// Initialize instance
const instance = new Diques()

// Schedule a job to run every minute
const job = instance.schedule('* * * * *', () => {
  console.log('Running every minute!')
})

// Control the job
job.stop()   // Stop execution
job.start()  // Resume execution

⚙️ With Options

import Diques, { type ScheduledJob, type ScheduledOptions } from '@neabyte/diques'

// Initialize instance
const instance = new Diques()

// Schedule with custom options
const options: ScheduledOptions = {
  name: 'morning-greeting',
  timezone: 'Asia/Jakarta'
}

// Create scheduled job
const job: ScheduledJob = instance.schedule('0 9 * * 1-5', () => {
  console.log('Good morning! Running weekdays at 9 AM')
}, options)

// Check job status
console.log('Job active:', job.isActive())
console.log('Next run:', job.getNextRun())

⏰ Cron Syntax

Supports standard cron syntax with 5 fields:

# ┌──────────────  minute (0 - 59)
# │ ┌────────────  hour (0 - 23)
# │ │ ┌──────────  day of month (1 - 31)
# │ │ │ ┌────────  month (1 - 12)
# │ │ │ │ ┌──────  day of week (0 - 6, 0 is Sunday)
# │ │ │ │ │
# │ │ │ │ │
# * * * * *

📋 Field Values

Field Range Description
minute 0-59 Minutes after the hour
hour 0-23 Hours in 24-hour format
day of month 1-31 Day of the month
month 1-12 Month of the year
day of week 0-6 Day of the week (0 = Sunday)

🔧 Special Characters

Character Description Example Meaning
* Any value * * * * * Every minute
, List separator 1,3,5 * * * * At minutes 1, 3, and 5
- Range 0-30 * * * * Every minute from 0 to 30
/ Step values */15 * * * * Every 15 minutes

📄 License

This project is licensed under the MIT license. See the LICENSE file for more info.