locking and concurrency in python, part 1

I would like to use file locking for concurrency control in Python. Additionally, I would like to provide “run-once-and-only-once” functionality on a shared cluster; in other words, I have multiple batch jobs to run over a shared compute cluster and I want a simple way to provide fault tolerance for parallel jobs.

The batch jobs should leverage a locking mechanism with the following method signatures,

class Lock:

    def acquire(self):
        pass

    def release(self):
        pass

    def wait(self, timeout):
        pass

Using a shared filesystem, such as NFS, we can use file or directory locking, provided we can guarantee atomicity for the creation of the lock; that is, only one host in a cluster can acquire a named lock. There are different ways to guarantee atomicity of file operations, depending on your filesystem.

One approach is os.mkdir(), which is atomic on POSIX systems: if the directory already exists, the call raises an OSError. Alternatively, you can use the following,

>>> import os
>>> fd = os.open('foo.lock', os.O_CREAT|os.O_EXCL|os.O_RDWR)
>>> 

This is atomic on most filesystems, although O_EXCL has historically been unreliable on older NFS versions. Lastly, os.rename() is atomic on POSIX and most network file systems. In other words, if multiple hosts attempt the same os.rename operation on a shared file, only one will succeed and the others will raise an OSError.
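As a minimal sketch of the first two approaches, assuming Python 3 (where an existing path raises FileExistsError, a subclass of OSError), the helper names claim_with_mkdir and claim_with_excl below are hypothetical and not part of any library:

```python
import os


def claim_with_mkdir(path):
    """Try to claim a lock by creating a directory; True if we created it."""
    try:
        os.mkdir(path)  # fails if the directory already exists
        return True
    except FileExistsError:
        return False


def claim_with_excl(path):
    """Try to claim a lock by exclusively creating a file; True if we created it."""
    try:
        # O_CREAT|O_EXCL makes the open fail if the file already exists
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
    except FileExistsError:
        return False
    os.close(fd)
    return True
```

In both cases the first caller gets True and every later caller gets False until the directory or file is removed, which is exactly the "only one host can acquire a named lock" property we are after.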

In order to maximize fault-tolerance, we can create a lockfile containing a hostname and process-id, rename the file, and then read the renamed file back to verify that it holds our own hostname and process-id. This covers almost all network shared filesystems (which may or may not be POSIX compliant). The following Python snippet performs this multi-lock,

class MultiLock:
    def __init__(self, lockname='lock'):
        self.lockname = lockname
        self.lockfile = os.path.join(lockname, lockname + '.lock')
        self.lockedfile = os.path.join(lockname, lockname + '.locked')
        self.hostname = socket.gethostname()
        self.pid = os.getpid()
        self.fd = None

    def acquire(self):
        if not self.verify():
            logging.debug('you do not have the lock %s' %(self.lockedfile))
            try:
                logging.debug('attempt to create lock %s' %(self.lockfile))
                os.mkdir(os.path.dirname(self.lockfile))
                self.fd = os.open(self.lockfile, os.O_CREAT|os.O_EXCL|os.O_RDWR)
                # os.write() requires bytes on Python 3
                os.write(self.fd, (self.hostname + ' ' + str(self.pid)).encode())
                os.fsync(self.fd)
                os.close(self.fd)
                logging.debug('attempt multilock %s' %(self.lockedfile))
                os.rename(self.lockfile, self.lockedfile)
                return self.verify()
            except OSError:
                logging.debug('unable to multilock %s' %(self.lockfile))
        return 0

    def verify(self):
        logging.debug('test if this is your lock, %s' %(self.lockedfile))
        try:
            self.fd = os.open(self.lockedfile, os.O_RDWR)
            # os.read() returns bytes on Python 3; decode before comparing
            qhostname, qpid = os.read(self.fd, 1024).decode().strip().split()
            os.close(self.fd)
            if qhostname != self.hostname or int(qpid) != int(self.pid):
                logging.debug('%s:%s claims to have the lock' %(qhostname, qpid))
                return 0
            logging.debug('success, you have lock %s' %(self.lockedfile))
            return 1
        except:
            logging.debug('you do not have lock %s' %(self.lockedfile))
            return 0

Furthermore, I would like a “lockgroup” such that I can create several locks in a group and a wait() function that will wait for all of the locks in a group to complete. In other words, we can start multiple jobs in parallel which can be distributed across the cluster (say, one per node) and then a wait() statement will wait for all jobs to complete.
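The waiting idea can be sketched in isolation. This is a simplified illustration, not the module's wait() method: it assumes each lock lives in its own subdirectory of a group directory, so the group is "complete" once os.rmdir() on the group succeeds. The name wait_for_group is hypothetical, and unlike the class shown later it returns False on timeout instead of raising.

```python
import errno
import os
import time


def wait_for_group(group_dir, timeout=60.0, poll=0.5):
    """Block until group_dir is empty (and removed) or gone.

    Returns True when the group has completed, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            # rmdir only succeeds once no lock directories remain inside
            os.rmdir(group_dir)
            return True
        except OSError as exc:
            if exc.errno == errno.ENOENT:
                return True  # the group was already removed by someone else
            if exc.errno not in (errno.ENOTEMPTY, errno.EEXIST):
                raise  # unexpected failure, e.g. a permissions problem
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll)
```

Because os.rmdir() refuses to remove a non-empty directory, the check and the removal happen in one step, so there is no window in which a new lock can sneak in between "is it empty?" and "remove it".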

Putting this all together, we can create a Python “multilock” module with a “MultiLock” class, which is also available in this GitHub repo, as follows,

import time, socket, shutil, os, logging, errno

class MultiLockTimeoutException(Exception):
    pass

class MultiLockDeniedException(Exception):
    pass

class MultiLock:
    def __init__(self, lockname='lock', lockgroup='.locks', basepath='.', poll=0.5):
        ''' MultiLock instance

            lockname: the name of this lock, default is 'lock'
            lockgroup: the name of the lockgroup, default is '.locks'
            basepath: the directory to store the locks, default is the current directory
            poll: the polling interval in seconds used by wait(); this must be larger
                  than the max time it takes to acquire a lock
        '''
        self.lockname = lockname
        self.basepath = os.path.realpath(basepath)
        self.lockgroup = os.path.join(self.basepath, lockgroup)
        self.lockfile = os.path.join(self.lockgroup, lockname, lockname + '.lock')
        self.lockedfile = os.path.join(self.lockgroup, lockname, lockname + '.locked')
        self.hostname = socket.gethostname()
        self.pid = os.getpid()
        self.poll = float(poll)  # int() would truncate the 0.5 default to 0
        self.fd = None


    def acquire(self, maxage=None):
        if not self.verify():
            logging.debug('you do not have the lock %s' %(self.lockedfile))
            if maxage:
                self.cleanup(maxage)
            try:
                logging.debug('make sure that the lockgroup %s exists' %(self.lockgroup))
                os.makedirs(self.lockgroup)
            except OSError as exc:
                if exc.errno == errno.EEXIST:
                    pass
                else:
                    logging.error('fatal error trying to access lockgroup %s' %(self.lockgroup))
                    raise
            try:
                logging.debug('attempt to create lock %s' %(self.lockfile))
                os.mkdir(os.path.dirname(self.lockfile))
                self.fd = os.open(self.lockfile, os.O_CREAT|os.O_EXCL|os.O_RDWR)
                # os.write() requires bytes on Python 3
                os.write(self.fd, (self.hostname + ' ' + str(self.pid)).encode())
                os.fsync(self.fd)
                os.close(self.fd)
                logging.debug('attempt multilock %s' %(self.lockedfile))
                os.rename(self.lockfile, self.lockedfile)
                return self.verify()
            except OSError:
                logging.debug('unable to multilock %s' %(self.lockfile))
        return 0

   
    def release(self):
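        try:
            if self.verify():
                shutil.rmtree(os.path.dirname(self.lockedfile))
                try:
                    logging.debug('released lock %s, will try to clean up lockgroup %s' %(self.lockname, self.lockgroup))
                    os.rmdir(self.lockgroup)
                except OSError as exc:
                    # a non-empty or already-removed lockgroup is expected here
                    if exc.errno in (errno.ENOTEMPTY, errno.EEXIST, errno.ENOENT):
                        logging.debug('lockgroup %s is not empty' %(self.lockgroup))
                    else:
                        raise
        finally:
            # always clean up, but do not return from inside finally,
            # which would silently swallow the exception raised above
            result = self.cleanup()
        return result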


    def verify(self):
        logging.debug('test if this is your lock, %s' %(self.lockedfile))
        try:
            self.fd = os.open(self.lockedfile, os.O_RDWR)
            # os.read() returns bytes on Python 3; decode before comparing
            qhostname, qpid = os.read(self.fd, 1024).decode().strip().split()
            os.close(self.fd)
            if qhostname != self.hostname or int(qpid) != int(self.pid):
                logging.debug('%s:%s claims to have the lock' %(qhostname, qpid))
                return 0
            logging.debug('success, you have lock %s' %(self.lockedfile))
            return 1
        except:
            logging.debug('you do not have lock %s' %(self.lockedfile))
            return 0

   
    def cleanup(self, maxage=None):
        ''' safely cleanup any lock files or directories (artifacts from race conditions and exceptions)
        '''
        if maxage and os.path.exists(os.path.dirname(self.lockedfile)):
            try:
                tdiff = time.time() - os.stat(os.path.dirname(self.lockedfile)).st_mtime
                if tdiff >= maxage:
                    logging.debug('lock %s is older than maxage %s' %(os.path.dirname(self.lockedfile), maxage))
                    shutil.rmtree(os.path.dirname(self.lockedfile))
            except:
                pass
        if os.path.isfile(self.lockedfile):
            logging.debug('lock %s exists, checking hostname:pid' % (self.lockedfile))
            qhostname, qpid = (None, None)
            try:
                fh = open(self.lockedfile)
                qhostname, qpid = fh.read().strip().split()
                fh.close()
            except:
                pass
            if self.hostname == qhostname:
                try:
                    if int(qpid) > 0:
                        os.kill(int(qpid), 0)
                except OSError as e:
                    if e.errno != errno.EPERM:
                        logging.error('lock %s exists on this host, but pid %s is NOT running, force release' % (self.lockedfile, qpid))
                        shutil.rmtree(os.path.dirname(self.lockedfile))
                        return 1
                    else:
                        logging.debug('lock %s exists on this host but pid %s might still be running' %(self.lockedfile, qpid))
                else:
                    logging.debug('lock %s exists on this host with pid %s still running' %(self.lockedfile, qpid))
            return 0
        return 1


    def wait(self, timeout=86400):
        logging.debug('waiting for lockgroup %s to complete' %(self.lockgroup))
        timeout = int(timeout)
        start_time = time.time()
        while True:
            try:
                if (time.time() - start_time) >= timeout:
                    raise MultiLockTimeoutException("Timeout %s seconds" %(timeout))
                elif os.path.isdir(self.lockgroup):
                    time.sleep(self.poll)
                    os.rmdir(self.lockgroup)
                return 1
            except OSError as exc:
                if exc.errno == errno.ENOTEMPTY:
                    pass
                elif exc.errno == errno.ENOENT:
                    pass
                else:
                    logging.error('fatal error waiting for %s' %(self.lockgroup))
                    raise


    def __del__(self):
        self.release()

    
    def __enter__(self):
        ''' pythonic 'with' statement

            e.g.,
            >>> with MultiLock('spam') as spam:
            ...     logging.debug('we have spam')
        '''
        if self.acquire():
            return self
        raise MultiLockDeniedException(self.lockname)


    def __exit__(self, type, value, traceback):
        ''' executed after the with statement
        '''
        if self.verify():
            self.release()

We can use this class to manage locks and lockgroups across network file shares. Next, I’d like to demonstrate a simple command-line utility that uses this functionality.