For my startup project darticle, we have several "jobs" which need to be taken care of outside of standard web page serving, including e-mailing and web scraping. For scalability, these jobs are handled asynchronously and concurrently, with the goal of letting the job queue scale across multiple servers. Here I will write about darticle's solution for implementing such a job management system with CouchDB.
Workers need to grab a job document from the database, but which document(s) should they grab? A simple CouchDB map view solves this easily. In darticle's case, jobs can be scheduled for some time in the future: for example, an e-mail can be queued to be sent the following day, while a scrape can be queued to run as soon as a worker is available. The map function emits the date as part of its key so the view can be queried by date range (e.g. from 0 to "now").
In darticle's implementation, there are multiple types of work which can be performed. Workers can subscribe to look for one or several of these types of work. The type of work ("worker" field) is put in the key of the view in addition to the date so it can be filtered easily.
Below is a simplified version of darticle's _design/work/new view.
```javascript
map: function (doc) {
  if (doc.type !== 'WorkItem' || doc.claimed) {
    return;
  }

  // null dates default to 0
  var date = doc.date || 0;

  emit([ doc.worker, date ], null);
}
```
darticle's engine uses UNIX timestamps for dates; they are less readable than date strings, but more practical for indexing.
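To see what this view actually indexes, the map function can be exercised outside CouchDB. The sketch below is plain JavaScript (Node.js) under a few assumptions: emit is passed in as an argument instead of being a global, and a simple two-part comparison stands in for CouchDB's full key collation, which is enough for [string, number] keys. The names mapWorkItem, buildIndex and queryRange are hypothetical, for illustration only.

```javascript
// Sketch: emulate the _design/work/new view in memory.
// Assumption: keys are always [string, number], so compareKeys below is a
// stand-in for CouchDB's real collation rules, not a full implementation.

// Same logic as the map function above, with emit passed in explicitly.
function mapWorkItem(doc, emit) {
  if (doc.type !== 'WorkItem' || doc.claimed) {
    return;
  }
  // null dates default to 0 ("run as soon as possible")
  const date = doc.date || 0;
  emit([doc.worker, date], null);
}

// Order by work type first, then by date.
function compareKeys(a, b) {
  if (a[0] !== b[0]) {
    return a[0] < b[0] ? -1 : 1;
  }
  return a[1] - b[1];
}

// Run the map function over every document and sort the emitted rows by key,
// which is what CouchDB's view index maintains for us.
function buildIndex(docs) {
  const rows = [];
  for (const doc of docs) {
    mapWorkItem(doc, (key, value) => rows.push({ id: doc._id, key, value }));
  }
  return rows.sort((x, y) => compareKeys(x.key, y.key));
}

// Inclusive range, like startkey/endkey on a view query.
function queryRange(rows, startkey, endkey) {
  return rows.filter(
    (row) => compareKeys(row.key, startkey) >= 0 && compareKeys(row.key, endkey) <= 0
  );
}
```

With a few sample documents (an e-mail that is due, an e-mail scheduled for later, a claimed e-mail and an undated scrape), the range ["email", 0] to ["email", 1296945062] returns only the due e-mail: the claimed job never reaches the index, and the later one falls outside the range.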
For example, for a worker to look for e-mails which need to be sent, it would query the view with { "startkey": [ "email", 0 ], "endkey": [ "email", now ] }, where now is e.g. 1296945062 (Sat, 05 Feb 2011 22:31:02 UTC). The worker would then claim (as described below) as many documents as it could and send the e-mails.
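Over CouchDB's HTTP API, that query becomes a GET on the view with JSON-encoded startkey and endkey parameters. A minimal sketch, assuming a hypothetical database named "jobs" on a local CouchDB at http://localhost:5984; a worker subscribed to several work types simply issues one range query per type. include_docs is a standard view query parameter that returns each full document (including the _rev the claim step needs) alongside its row.

```javascript
// Sketch: build the view queries a worker would send for its work types.
// COUCH_URL and DB_NAME are hypothetical placeholders.
const COUCH_URL = 'http://localhost:5984';
const DB_NAME = 'jobs';

// Current time as a UNIX timestamp in seconds, matching the view's date keys.
function unixNow() {
  return Math.floor(Date.now() / 1000);
}

// One range query per subscribed work type: [type, 0] .. [type, now].
// Keys are JSON-encoded first; URLSearchParams then URL-encodes them.
function pendingWorkUrls(workTypes, now) {
  return workTypes.map((type) => {
    const params = new URLSearchParams({
      startkey: JSON.stringify([type, 0]),
      endkey: JSON.stringify([type, now]),
      include_docs: 'true', // return each document with its row
    });
    return `${COUCH_URL}/${DB_NAME}/_design/work/_view/new?${params}`;
  });
}
```

A worker handling both e-mail and scraping would call pendingWorkUrls(['email', 'scrape'], unixNow()) and fetch each URL in turn.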
CouchDB operates on documents. Unlike with Redis, we can't pull work items atomically from a set. And unlike some SQL databases, we can't take an exclusive lock on a row, delete the row, then perform the work. With CouchDB, we have to rely on its conflict handling, or we will run into concurrency issues.
To make the problem clearer: suppose two worker servers are running simultaneously and we enqueue a work item (say, to send an e-mail). Both worker servers will be informed of the new work item. If both are idle, they'll both fetch the work item and see that it hasn't been processed. Both worker servers will then send the e-mail and mark the job as completed, so the recipient gets the e-mail twice! Clearly, we need some kind of atomic "give me a new work item" operation in the database. CouchDB does not offer one directly, so we must implement it ourselves (or switch to something else).
One of the beauties of CouchDB is that concurrent changes to a document do not silently overwrite each other. Every update must include the document's current revision (_rev); if two servers pull a document, update it, then push their changes, the last push carries an outdated revision, so CouchDB rejects it with a conflict (HTTP 409). We can then implement locking as follows:
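The revision check can be modelled in a few lines. This is a simplified in-memory stand-in, not CouchDB: revisions are plain counters rather than CouchDB's "N-hash" strings, and a thrown ConflictError plays the role of the 409 response. It replays the scenario above: two workers read the same unclaimed job, both mark it claimed, and only the first write is accepted.

```javascript
// Sketch: a tiny in-memory model of CouchDB's optimistic concurrency.
class ConflictError extends Error {}

class RevStore {
  constructor() {
    this.docs = new Map();
  }

  // Return a deep copy, like fetching a fresh document over HTTP.
  get(id) {
    const doc = this.docs.get(id);
    return doc === undefined ? undefined : JSON.parse(JSON.stringify(doc));
  }

  // Accept the write only if it quotes the current revision.
  put(doc) {
    const current = this.docs.get(doc._id);
    const currentRev = current === undefined ? undefined : current._rev;
    if (doc._rev !== currentRev) {
      throw new ConflictError(`Document update conflict: ${doc._id}`);
    }
    const nextRev = (currentRev || 0) + 1;
    this.docs.set(doc._id, { ...doc, _rev: nextRev });
    return nextRev;
  }
}

// Two workers read the same job before either writes, then both try to claim.
function raceToClaim(store, id) {
  const seenByA = store.get(id);
  const seenByB = store.get(id);
  const results = [];
  for (const [name, doc] of [['A', seenByA], ['B', seenByB]]) {
    doc.claimed = true;
    try {
      store.put(doc);
      results.push(`${name}: claimed`);
    } catch (err) {
      if (!(err instanceof ConflictError)) throw err;
      results.push(`${name}: conflict`);
    }
  }
  return results;
}
```

After seeding a job with store.put({ _id: 'job1', type: 'WorkItem', worker: 'email' }), raceToClaim(store, 'job1') reports worker A as the owner and worker B as having hit a conflict.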
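1. A worker fetches a job document returned by the view.
2. If the document is already claimed (its "claimed" field is true), the worker ignores the job, as someone else has already claimed it for their own.
3. Otherwise, the worker sets "claimed" to true and pushes the document.
4. If the push fails with a conflict, another worker claimed the job first, so the worker ignores it.

This procedure ensures a document is not claimed by multiple workers. Below is a JavaScript code example of the locking procedure (using promises).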
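```javascript
claim: function (document, successCallback, failCallback /* both callbacks optional */) {
  if (document.claimed) {
    throw new Error('Work item already claimed');
  }

  // Work on a copy so the caller's object is untouched if the claim fails.
  document = util.deepClone(document);
  document.claimed = true;

  // The document still carries the _rev it was read with; if another worker
  // saved a newer revision first, CouchDB rejects this PUT with a conflict.
  return db.put(document._id, document)
    .then(function (response) {
      document._rev = response.rev;
      document._id = response.id;
      return document;
    })
    .then(successCallback, failCallback)
    .then(undefined, function () {
      // Make sure conflict errors don't throw. Note that this swallows every
      // rejection, including errors thrown by the callbacks.
    });
}
```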
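Because the final handler in claim swallows every rejection, a network failure or an exception thrown by successCallback disappears as silently as a lost race. A minimal async/await variant that ignores only conflicts might look like the following. It assumes a hypothetical db object whose put(id, doc) returns a promise for { id, rev } and rejects with an error carrying status 409 on a revision conflict; tryClaim and makeFakeDb are illustrative names, and the fake db lets the sketch run without a CouchDB server.

```javascript
// Sketch: claim a work item, treating only revision conflicts as "not mine".
// Assumed (hypothetical) client contract: db.put(id, doc) resolves to
// { id, rev } and rejects with err.status === 409 on a conflict.
async function tryClaim(db, doc) {
  if (doc.claimed) {
    return null; // someone else already owns it
  }
  // A shallow copy is enough: only the top-level "claimed" flag changes.
  const copy = { ...doc, claimed: true };
  try {
    const response = await db.put(copy._id, copy);
    return { ...copy, _id: response.id, _rev: response.rev };
  } catch (err) {
    if (err.status === 409) {
      return null; // another worker claimed it first
    }
    throw err; // anything else (e.g. network errors) still surfaces
  }
}

// Minimal fake db with a revision check, for trying the sketch locally.
function makeFakeDb() {
  const docs = new Map();
  return {
    async put(id, doc) {
      const current = docs.get(id);
      if (current && current._rev !== doc._rev) {
        const err = new Error('conflict');
        err.status = 409;
        throw err;
      }
      const rev = String((current ? Number(current._rev) : 0) + 1);
      docs.set(id, { ...doc, _rev: rev });
      return { id, rev };
    },
  };
}
```

Returning null for "someone else got it" keeps the worker loop simple: skip the job and move on to the next row from the view.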
Depending upon the implementation of the work queue, the worker can delete the job document when it is done with the job, or as soon as it claims the job. It can also mark the job as completed and leave it in the database. In darticle's case, there is a "status" field which is set to "fail" or "retry" if the job couldn't successfully be completed, and the document is deleted upon success.
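The bookkeeping after a job can be sketched as a pure function that returns the write to send back to CouchDB. It follows the convention above (delete on success, a "retry" or "fail" status otherwise); the attempts counter, the MAX_ATTEMPTS limit and the RETRY_DELAY rescheduling are hypothetical additions, not part of darticle's described design. Releasing the claim on retry is what lets the _design/work/new view emit the job again.

```javascript
// Sketch: decide what to write back once a worker has run a claimed job.
// MAX_ATTEMPTS, RETRY_DELAY and the "attempts" field are hypothetical.
const MAX_ATTEMPTS = 3;
const RETRY_DELAY = 15 * 60; // seconds, since dates are UNIX timestamps

function finishJob(doc, succeeded, now) {
  if (succeeded) {
    // CouchDB deletes need the current revision: DELETE /{db}/{docid}?rev={rev}
    return { op: 'delete', id: doc._id, rev: doc._rev };
  }
  const attempts = (doc.attempts || 0) + 1;
  if (attempts >= MAX_ATTEMPTS) {
    // Stay claimed, so the "new" view never hands this job out again.
    return { op: 'put', doc: { ...doc, attempts, status: 'fail' } };
  }
  // Release the claim and push the date forward; the map function emits the
  // job again, and it re-enters a worker's [type, 0]..[type, now] range later.
  return {
    op: 'put',
    doc: { ...doc, attempts, status: 'retry', claimed: false, date: now + RETRY_DELAY },
  };
}
```

Because the returned document keeps the _rev obtained when the job was claimed, this final write is revision-checked just like the claim itself.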
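A work queue database should not be replicated as a live database (one from which workers request work). Each replica checks revisions only against its own copy of a document, so two workers claiming the same job on different replicas both succeed, and replication can only record the conflicting revisions afterwards. I have not found a way to make replication work with the above locking model.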
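The failure mode can be illustrated with two in-memory replicas. This is again a simplified model, not CouchDB's replicator: each replica applies the same revision check as a single database, but only against its own copy, so a claim made on one replica and a claim made on the other are both accepted.

```javascript
// Sketch: why claim-by-conflict breaks across replicas (simplified model).
function makeReplica(seedDoc) {
  const docs = new Map([[seedDoc._id, { ...seedDoc }]]);
  return {
    get: (id) => ({ ...docs.get(id) }),
    // Returns false on a revision mismatch (CouchDB would answer 409).
    put(doc) {
      const current = docs.get(doc._id);
      if (current && current._rev !== doc._rev) {
        return false;
      }
      docs.set(doc._id, { ...doc, _rev: (current ? current._rev : 0) + 1 });
      return true;
    },
  };
}

// Both replicas start from the same, already-replicated revision.
const job = { _id: 'job1', _rev: 1, type: 'WorkItem', worker: 'email' };
const replica1 = makeReplica(job);
const replica2 = makeReplica(job);

// Worker A talks to replica 1, worker B to replica 2: both claims succeed.
const claimedByA = replica1.put({ ...replica1.get('job1'), claimed: true });
const claimedByB = replica2.put({ ...replica2.get('job1'), claimed: true });

// On a single replica, the same stale write is rejected as expected.
const staleOnReplica1 = replica1.put({ ...job, claimed: true });
```

Both claimedByA and claimedByB end up true, so both workers would send the e-mail; only a write against the same copy (staleOnReplica1) is caught.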
Sharding, however, should work, as each job document contains its own locking state and jobs are independent of each other. (I have not tested this, though.)
Although CouchDB does not offer locks or an atomic "dequeue" operation out of the box, they can be simulated with its revision-conflict detection to build a distributed work queue system.