
Safely Processing a Queue of Jobs

What is a Queue

In software, a queue is a container in which items are added at one end and removed from the other, so the first item added is the first one removed. Both operations are very efficient.

Sorted Queues

When used by a journal, a queue often supports some kind of sorted order. Some jobs are more important than others and should be run sooner. In other words, we give those jobs a higher priority and we sort the queue by priority first, then by the time at which each job was inserted.

Note that the C++ std::queue and std::deque containers do not offer any way to support a priority. Instead you have to use std::priority_queue.
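As a rough sketch of the ordering described above, here is one way to get "priority first, then insertion order" out of std::priority_queue. The Job, JobOrder and PriorityJobQueue names are made up for this example, and a simple counter stands in for the insertion time (two jobs can be added within the same clock tick, a counter never ties).

```cpp
#include <cstdint>
#include <queue>
#include <string>
#include <vector>

// Hypothetical job record, for illustration only.
struct Job
{
    int           priority;   // larger value = more important
    std::uint64_t sequence;   // insertion order, assigned by the queue
    std::string   name;
};

// std::priority_queue keeps the "largest" element on top, so this
// functor returns true when `a` must come out AFTER `b`.
struct JobOrder
{
    bool operator () (Job const & a, Job const & b) const
    {
        if(a.priority != b.priority)
        {
            return a.priority < b.priority;   // higher priority first
        }
        return a.sequence > b.sequence;       // then oldest first
    }
};

class PriorityJobQueue
{
public:
    void add_job(int priority, std::string const & name)
    {
        queue_.push(Job{priority, next_sequence_++, name});
    }

    bool empty() const
    {
        return queue_.empty();
    }

    // Remove and return the next job; the queue must not be empty.
    Job next_job()
    {
        Job job(queue_.top());
        queue_.pop();
        return job;
    }

private:
    std::priority_queue<Job, std::vector<Job>, JobOrder> queue_;
    std::uint64_t                                        next_sequence_ = 0;
};
```

With this sketch, adding a job of priority 1 followed by two jobs of priority 5 returns the two priority 5 jobs first, in the order they were added, and the priority 1 job last.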

Permanent Queues

Whenever it is important to execute a job even if you quit your software, you need to make your journal permanent. In most cases, you use a database system to save these jobs. This could be Redis, MySQL, PostgreSQL or your own implementation. The main point is that whenever you add a new job, it gets written to a file, so if you reboot, the job information is still available and the job can still be executed when you restart your application.

I won't go into detail on how you make a queue permanent. However, to secure your jobs, you want to make sure to write them to disk as soon as they are added to the queue. Caching the jobs in memory is not enough if your software crashes. In some cases, using a data store such as Redis is a good middle-ground solution. Redis takes care of saving the data to file as required, and Redis shouldn't crash when your application crashes (although Redis could itself be affected by a bug which makes it crash, and your data may get lost that way).
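To give an idea of what "write them to disk as soon as they are added" can look like, here is a minimal sketch which appends each job to a text file before add_job() returns. The JobLog class and its one-line-per-job format are assumptions made for this example (it assumes a job description never includes a newline). Note that flush() only hands the data to the operating system; surviving a power loss also requires a platform specific call such as POSIX fsync(), which is not shown here.

```cpp
#include <fstream>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical journal file: one job description per line.
class JobLog
{
public:
    explicit JobLog(std::string const & filename)
        : filename_(filename)
    {
    }

    // Append the job to the file and flush before returning, so the
    // job is not sitting only in this process' memory.
    void add_job(std::string const & description)
    {
        std::ofstream out(filename_, std::ios::app);
        out << description << '\n';
        out.flush();
        if(!out)
        {
            throw std::runtime_error("could not save job to " + filename_);
        }
    }

    // Read back the jobs saved so far, oldest first.
    std::vector<std::string> load_jobs() const
    {
        std::vector<std::string> jobs;
        std::ifstream in(filename_);
        for(std::string line; std::getline(in, line); )
        {
            jobs.push_back(line);
        }
        return jobs;
    }

private:
    std::string filename_;
};
```

On restart, the application would call load_jobs() and push each entry back in its in-memory queue. A real journal also needs a way to mark a job as done, which this sketch leaves out.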

Queue for a Journal

When using a queue to handle jobs, it is very important to handle the queue in a manner that prevents the journal from running the same job forever.

Say you have a few classes as follows:

#include <memory>
#include <queue>

class JobRunner
{
public:
    typedef std::shared_ptr<JobRunner>   pointer_t;
    typedef std::queue<pointer_t>        queue_t;

    virtual ~JobRunner() {}
    virtual bool  run() = 0;
};

class JobQueue
{
public:
    void add_job(JobRunner::pointer_t job)
    {
        // std::queue offers push() and pop(), not push_back()/pop_front()
        queue_.push(job);
    }

    void run_job()
    {
        if(!queue_.empty())
        {
            // if run() throws, the pop() below is never reached
            queue_.front()->run();
            queue_.pop();
        }
    }

private:
    JobRunner::queue_t     queue_;
};

The problem with the run_job() function is that it does not pop the job it just ran if the job's run() function throws an exception. Assuming you catch the exception at a higher level, the next call to run_job() runs the same job again, and it keeps doing so until that job stops throwing.

To fix the problem, you must pop the job first (keeping a copy of its pointer), then call the run() function. Another possibility is to have a counter in the JobRunner. Each time you execute the job, you increase that counter. Once the counter reaches a set maximum, you pop the job from the queue and that's it (maybe calling a "job failed" function so the event can be logged, or some other function which can potentially fix the problem).
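Both fixes can be sketched as follows. This is only one possible way to write them: the attempts_ counter, the failed() hook, the MAX_ATTEMPTS value and the FailingJob class are all names I made up for the example. Exceptions are still left to the caller, as assumed above.

```cpp
#include <memory>
#include <queue>
#include <stdexcept>

int const MAX_ATTEMPTS = 3;    // arbitrary limit chosen for this sketch

class JobRunner
{
public:
    typedef std::shared_ptr<JobRunner>   pointer_t;
    typedef std::queue<pointer_t>        queue_t;

    virtual ~JobRunner() {}
    virtual bool  run() = 0;
    virtual void  failed() {}  // hypothetical "job failed" hook

    int           attempts_ = 0;  // number of times run() was started
};

class JobQueue
{
public:
    void add_job(JobRunner::pointer_t job)
    {
        queue_.push(job);
    }

    bool empty() const
    {
        return queue_.empty();
    }

    // Fix 1: pop first, so a job which throws can never run twice.
    void run_job()
    {
        if(!queue_.empty())
        {
            JobRunner::pointer_t job(queue_.front());  // keep the job alive
            queue_.pop();
            job->run();   // if this throws, the job is already out
        }
    }

    // Fix 2: keep a failing job in the queue, but only for a while.
    void run_job_with_retries()
    {
        if(queue_.empty())
        {
            return;
        }
        JobRunner::pointer_t job(queue_.front());
        if(job->attempts_ >= MAX_ATTEMPTS)
        {
            queue_.pop();      // give up on this job
            job->failed();
            return;
        }
        ++job->attempts_;
        job->run();            // if this throws, the job stays in the queue
        queue_.pop();          // reached only when run() returned
    }

private:
    JobRunner::queue_t     queue_;
};

// Demonstration job which always throws.
class FailingJob : public JobRunner
{
public:
    virtual bool run() { throw std::runtime_error("job failed"); }
    virtual void failed() { gave_up_ = true; }

    bool gave_up_ = false;
};
```

With a FailingJob in the queue, run_job() throws once and leaves the queue empty. run_job_with_retries() throws MAX_ATTEMPTS times, then the next call removes the job and calls failed() instead of running it again.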