Consume with Node.js
This guide provides practical examples for consuming messages from Hooque queues using Node.js.
Polling (REST API)
The Pull model is ideal for workers that process messages at their own pace.
With manual acknowledgement, you acknowledge (Ack) a message only after it has been processed successfully. This is the most reliable way to consume messages.
const axios = require('axios');
// Consumer endpoint. Obtain this from your consumer configuration in the Hooque dashboard.
const QUEUE_URL = 'https://app.hooque.io/queues/cons_xxxxxxxx';
// Consumer token; sent as a Bearer token in the Authorization header of each request below.
const TOKEN = 'your_consumer_token';
/**
 * Worker loop with manual acknowledgement.
 *
 * Repeatedly pulls one message from `${QUEUE_URL}/next`, processes it, then
 * acknowledges it (Ack) on success or negatively acknowledges it (Nack) when
 * processing throws. The loop has no exit condition.
 *
 * @returns {Promise<void>}
 */
async function poll() {
  const authHeaders = { Authorization: `Bearer ${TOKEN}` };
  const nextUrl = `${QUEUE_URL}/next`;
  const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  console.log(`Starting worker for ${QUEUE_URL}...`);

  for (;;) {
    try {
      // Step 1: pull the next message. Any status below 300 is accepted
      // so that a 204 is returned as a response instead of being thrown.
      const response = await axios.get(nextUrl, {
        headers: authHeaders,
        validateStatus: (status) => status < 300,
      });

      if (response.status !== 200) {
        // 204 No Content: the queue is empty, so wait a bit before asking again.
        await pause(1000);
        continue;
      }

      const payload = response.data;
      // Delivery metadata (ack/nack URLs) arrives JSON-encoded in a response header.
      const meta = JSON.parse(response.headers['x-hooque-meta']);

      try {
        // Step 2: run your business logic here.
        console.log('Processing:', payload);

        // Step 3: success, so acknowledge (Ack) the message.
        await axios.post(meta.ackUrl, {}, { headers: authHeaders });
      } catch (processingError) {
        console.error('Processing error:', processingError.message);

        // Step 4: failure, so negatively acknowledge (Nack) the message.
        // The message will then be retried according to your queue policy.
        // Note: use meta.rejectUrl instead if the failure is permanent
        // and you don't want a retry.
        await axios.post(
          meta.nackUrl,
          { reason: processingError.message },
          { headers: authHeaders },
        );
      }
    } catch (pollingError) {
      // Covers a failed pull, an unreadable meta header and a failed Nack.
      console.error('Polling error:', pollingError.message);
      await pause(5000);
    }
  }
}
// Start the worker loop.
poll();
With auto-acknowledgement (`autoAck=true`), Hooque deletes the message immediately upon delivery. Use this mode for non-critical tasks where "at-least-once" delivery isn't required.
const axios = require('axios');
// Consumer endpoint; replace with the URL of your own consumer.
const QUEUE_URL = 'https://app.hooque.io/queues/cons_xxxxxxxx';
// Consumer token; sent as a Bearer token in the Authorization header of each request below.
const TOKEN = 'your_consumer_token';
/**
 * Worker loop in auto-ack mode.
 *
 * Repeatedly requests `${QUEUE_URL}/next` with `autoAck=true` and processes
 * whatever comes back. No Ack/Nack call is made from here. The loop has no
 * exit condition.
 *
 * @returns {Promise<void>}
 */
async function poll() {
  const authHeaders = { Authorization: `Bearer ${TOKEN}` };
  const query = { autoAck: 'true' };
  const nextUrl = `${QUEUE_URL}/next`;
  const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  for (;;) {
    try {
      const response = await axios.get(nextUrl, { headers: authHeaders, params: query });

      if (response.status !== 200) {
        // Nothing was delivered: wait a second before polling again.
        await pause(1000);
        continue;
      }

      // Run your business logic here.
      // The message has already been deleted from the queue.
      console.log('Processing (Auto Ack):', response.data);
    } catch (requestError) {
      console.error('Error:', requestError.message);
      // Back off for longer after a failure.
      await pause(5000);
    }
  }
}
// Start the worker loop.
poll();
Streaming (SSE)
Streaming delivers messages in real time over a persistent Server-Sent Events (SSE) connection.
const EventSource = require('eventsource');
const axios = require('axios');
// Consumer endpoint and token; replace with the values of your own consumer.
const QUEUE_URL = 'https://app.hooque.io/queues/cons_xxxxxxxx';
const TOKEN = 'your_consumer_token';
// The same auth headers are used for the stream and for the Ack/Nack calls.
const headers = { 'Authorization': `Bearer ${TOKEN}` };

const es = new EventSource(`${QUEUE_URL}/stream`, { headers });

// Each event carries a JSON document with the message `payload` and its
// delivery `meta` (ack/nack URLs).
//
// No error may escape this async listener: a rejection of the promise it
// returns would go unhandled, and Node.js (v15+) terminates the process on
// unhandled rejections by default. Parsing and the Nack call are therefore
// both guarded.
es.addEventListener('message', async (event) => {
  let meta;
  try {
    const message = JSON.parse(event.data);
    meta = message.meta;

    // 1. Process your business logic here
    console.log('Received message:', message.payload);

    // 2. Controlled Ack
    await axios.post(meta.ackUrl, {}, { headers });
  } catch (err) {
    console.error('Error:', err.message);

    // Malformed event (unparseable data or no meta): there is no URL to Nack.
    if (!meta) {
      return;
    }

    // 3. Controlled Nack for retry
    try {
      await axios.post(meta.nackUrl, { reason: err.message }, { headers });
    } catch (nackErr) {
      console.error('Nack failed:', nackErr.message);
    }
  }
});

// Connection-level failures are only logged here.
es.addEventListener('error', (err) => {
  console.error('SSE Connection Error:', err);
});
const EventSource = require('eventsource');
// Consumer endpoint and token; replace with the values of your own consumer.
const QUEUE_URL = 'https://app.hooque.io/queues/cons_xxxxxxxx';
const TOKEN = 'your_consumer_token';
const headers = { Authorization: `Bearer ${TOKEN}` };

// `autoAck=true` on the stream URL selects auto-ack mode, so this script
// makes no Ack/Nack calls.
const es = new EventSource(`${QUEUE_URL}/stream?autoAck=true`, { headers });

es.addEventListener('message', ({ data }) => {
  // Each event's data is a JSON document; only its payload is needed here.
  const message = JSON.parse(data);

  // Run your business logic here.
  console.log('Received message (Auto Ack):', message.payload);
});