{"id":666,"date":"2012-11-05T22:06:13","date_gmt":"2012-11-05T22:06:13","guid":{"rendered":"http:\/\/tech.avant.net\/q\/?p=666"},"modified":"2019-04-30T16:09:47","modified_gmt":"2019-04-30T16:09:47","slug":"locking-and-concurrency-in-python","status":"publish","type":"post","link":"https:\/\/tech.avant.net\/q\/locking-and-concurrency-in-python\/","title":{"rendered":"locking and concurrency in python, part 1"},"content":{"rendered":"<p>I would like to do file-locking concurrency control in python. Additionally, I would like to provide a &#8220;run-once-and-only-once&#8221; functionality on a shared cluster; in other words, I have multiple batch jobs to run over a shared compute cluster and I want a simple way to provide fault tolerance for parallel jobs.<\/p>\n<p>The batch jobs should leverage a locking mechanism with the following method signatures,<\/p>\n<pre class=\"sh_python\">class Lock:\n\n    def acquire(self)\n        pass\n\n    def release(self)\n        pass\n\n    def wait(self, timeout)\n        pass\n<\/pre>\n<p>Using a shared filesystem, such as NFS, we can use file or directory locking, provided we can guarantee atomicity for the creation of the lock. I.e., only one host in a cluster can acquire a named lock. There are different ways to guarantee atomicity on file operation, depending on your filesystem.<\/p>\n<p>One approach is <strong>os.makedir<\/strong>(), which is atomic on POSIX systems. Alternatively, you can use the following,<\/p>\n<pre class=\"sh_python\">&gt;&gt;&gt;\n&gt;&gt;&gt; fd = os.open('foo.lock', os.O_CREAT|os.O_EXCL|os.O_RDWR)\n&gt;&gt;&gt; \n<\/pre>\n<p>This is atomic on most filesystems. Lastly, <strong>os.rename<\/strong>() is atomic on POSIX and most network file systems. 
In other words, if multiple hosts attempt the same os.rename operation on a shared file, only one will succeed and the others will raise an OSError.<\/p>\n<p>In order to maximize fault-tolerance, we can create a lockfile with a hostname and process-id, rename the file, and then read the renamed file to verify the correct hostname and process-id. This will cover nearly all shared network filesystems (whether or not they are POSIX compliant). The following python snippet will perform this multi-lock,<\/p>\n<pre class=\"sh_python\">class MultiLock:\n    def __init__(self, lockname='lock'):\n        self.lockname = lockname\n        self.lockfile = os.path.join(lockname, lockname + '.lock')\n        self.lockedfile = os.path.join(lockname, lockname + '.locked')\n        self.hostname = socket.gethostname()\n        self.pid = os.getpid()\n        self.fd = None\n\n    def acquire(self):\n        if not self.verify():\n            logging.debug('you do not have the lock %s' %(self.lockedfile))\n            try:\n                logging.debug('attempt to create lock %s' %(self.lockfile))\n                os.mkdir(os.path.dirname(self.lockfile))\n                self.fd = os.open(self.lockfile, os.O_CREAT|os.O_EXCL|os.O_RDWR)\n                os.write(self.fd, self.hostname+' '+str(self.pid))\n                os.fsync(self.fd)\n                os.close(self.fd)\n                logging.debug('attempt multilock %s' %(self.lockedfile))\n                os.rename(self.lockfile, self.lockedfile)\n                return self.verify()\n            except OSError:\n                logging.debug('unable to multilock %s' %(self.lockfile))\n        return 0\n\n    def verify(self):\n        logging.debug('test if this is your lock, %s' %(self.lockedfile))\n        try:\n            self.fd = os.open(self.lockedfile, os.O_RDWR)\n            qhostname, qpid = os.read(self.fd, 1024).strip().split()\n            os.close(self.fd)\n            if qhostname != self.hostname or int(qpid) != 
int(self.pid):\n                logging.debug('%s:%s claims to have the lock' %(qhostname, qpid))\n                return 0\n            logging.debug('success, you have lock %s' %(self.lockedfile))\n            return 1\n        except:\n            logging.debug('you do not have lock %s' %(self.lockedfile))\n            return 0\n<\/pre>\n<p>Furthermore, I would like a &#8220;lockgroup&#8221; such that I can create several locks in a group, along with a wait() function that will wait for all of the locks in the group to complete. In other words, we can start multiple jobs in parallel, distributed across the cluster (say, one per node), and then a wait() statement will block until all jobs have completed.<\/p>\n<p>Putting this all together, we can create a python &#8220;multilock&#8221; module with a &#8220;MultiLock&#8221; class, which is also available in this <a href=\"https:\/\/github.com\/timwarnock\/runone.py\">github repo<\/a>, as follows,<\/p>\n<pre class=\"sh_python\">import time, socket, shutil, os, logging, errno\n\nclass MultiLockTimeoutException(Exception):\n    pass\n\nclass MultiLockDeniedException(Exception):\n    pass\n\nclass MultiLock:\n    def __init__(self, lockname='lock', lockgroup='.locks', basepath='.', poll=0.5):\n        ''' MultiLock instance\n\n            lockname: the name of this lock, default is 'lock'\n            lockgroup: the name of the lockgroup, default is '.locks'\n            basepath: the directory to store the locks, default is the current directory\n            poll: the polling interval in seconds used by wait(); this must be longer\n                  than the max time it takes to acquire a lock\n        '''\n        self.lockname = lockname\n        self.basepath = os.path.realpath(basepath)\n        self.lockgroup = os.path.join(self.basepath, lockgroup)\n        self.lockfile = os.path.join(self.lockgroup, lockname, lockname + '.lock')\n        self.lockedfile = os.path.join(self.lockgroup, lockname, lockname + 
'.locked')\n        self.hostname = socket.gethostname()\n        self.pid = os.getpid()\n        self.poll = float(poll)\n        self.fd = None\n\n\n    def acquire(self, maxage=None):\n        if not self.verify():\n            logging.debug('you do not have the lock %s' %(self.lockedfile))\n            if maxage:\n                self.cleanup(maxage)\n            try:\n                logging.debug('make sure that the lockgroup %s exists' %(self.lockgroup))\n                os.makedirs(self.lockgroup)\n            except OSError as exc:\n                if exc.errno == errno.EEXIST:\n                    pass\n                else:\n                    logging.error('fatal error trying to access lockgroup %s' %(self.lockgroup))\n                    raise\n            try:\n                logging.debug('attempt to create lock %s' %(self.lockfile))\n                os.mkdir(os.path.dirname(self.lockfile))\n                self.fd = os.open(self.lockfile, os.O_CREAT|os.O_EXCL|os.O_RDWR)\n                os.write(self.fd, self.hostname+' '+str(self.pid))\n                os.fsync(self.fd)\n                os.close(self.fd)\n                logging.debug('attempt multilock %s' %(self.lockedfile))\n                os.rename(self.lockfile, self.lockedfile)\n                return self.verify()\n            except OSError:\n                logging.debug('unable to multilock %s' %(self.lockfile))\n        return 0\n\n   \n    def release(self):\n        try:\n            if self.verify():\n                shutil.rmtree(os.path.dirname(self.lockedfile))\n                try:\n                    logging.debug('released lock %s, will try to clean up lockgroup %s' %(self.lockname, self.lockgroup))\n                    os.rmdir(self.lockgroup)\n                except OSError as exc:\n                    if exc.errno == errno.ENOTEMPTY:\n                        logging.debug('lockgroup %s is not empty' %(self.lockgroup))\n                        pass\n                    
else:\n                        raise\n        finally:\n            return self.cleanup()\n\n\n    def verify(self):\n        logging.debug('test if this is your lock, %s' %(self.lockedfile))\n        try:\n            self.fd = os.open(self.lockedfile, os.O_RDWR)\n            qhostname, qpid = os.read(self.fd, 1024).strip().split()\n            os.close(self.fd)\n            if qhostname != self.hostname or int(qpid) != int(self.pid):\n                logging.debug('%s:%s claims to have the lock' %(qhostname, qpid))\n                return 0\n            logging.debug('success, you have lock %s' %(self.lockedfile))\n            return 1\n        except:\n            logging.debug('you do not have lock %s' %(self.lockedfile))\n            return 0\n\n   \n    def cleanup(self, maxage=None):\n        ''' safely cleanup any lock files or directories (artifacts from race conditions and exceptions)\n        '''\n        if maxage and os.path.exists(os.path.dirname(self.lockedfile)):\n            try:\n                tdiff = time.time() - os.stat(os.path.dirname(self.lockedfile)).st_mtime\n                if tdiff &gt;= maxage:\n                    logging.debug('lock %s is older than maxage %s' %(os.path.dirname(self.lockedfile), maxage))\n                    shutil.rmtree(os.path.dirname(self.lockedfile))\n            except:\n                pass\n        if os.path.isfile(self.lockedfile):\n            logging.debug('lock %s exists, checking hostname:pid' % (self.lockedfile))\n            qhostname, qpid = (None, None)\n            try:\n                fh = open(self.lockedfile)\n                qhostname, qpid = fh.read().strip().split()\n                fh.close()\n            except:\n                pass\n            if self.hostname == qhostname:\n                try:\n                    if int(qpid) &gt; 0:\n                        os.kill(int(qpid), 0)\n                except OSError as e:\n                    if e.errno != errno.EPERM:\n                        
logging.error('lock %s exists on this host, but pid %s is NOT running, force release' % (self.lockedfile, qpid))\n                        shutil.rmtree(os.path.dirname(self.lockedfile))\n                        return 1\n                    else:\n                        logging.debug('lock %s exists on this host but pid %s might still be running' %(self.lockedfile, qpid))\n                else:\n                    logging.debug('lock %s exists on this host with pid %s still running' %(self.lockedfile, qpid))\n            return 0\n        return 1\n\n\n    def wait(self, timeout=86400):\n        logging.debug('waiting for lockgroup %s to complete' %(self.lockgroup))\n        timeout = int(timeout)\n        start_time = time.time()\n        while True:\n            try:\n                if (time.time() - start_time) &gt;= timeout:\n                    raise MultiLockTimeoutException(\"Timeout %s seconds\" %(timeout))\n                elif os.path.isdir(self.lockgroup):\n                    time.sleep(self.poll)\n                    os.rmdir(self.lockgroup)\n                return 1\n            except OSError as exc:\n                if exc.errno == errno.ENOTEMPTY:\n                    pass\n                elif exc.errno == errno.ENOENT:\n                    pass\n                else:\n                    logging.error('fatal error waiting for %s' %(self.lockgroup))\n                    raise\n\n\n    def __del__(self):\n        self.release()\n\n    \n    def __enter__(self):\n        ''' pythonic 'with' statement\n\n            e.g.,\n            &gt;&gt;&gt; with MultiLock('spam') as spam:\n            ...     
logging.debug('we have spam')\n        '''\n        if self.acquire():\n            return self\n        raise MultiLockDeniedException(self.lockname)\n\n\n    def __exit__(self, type, value, traceback):\n        ''' executed after the with statement\n        '''\n        if self.verify():\n            self.release()\n<\/pre>\n<p>We can use this class to manage locks and lockgroups across network file shares. Next, I&#8217;d like to demonstrate a simple <a href=\"\/q\/locking-and-concurrency-in-python-part-2\/\">command-line utility<\/a> that uses this functionality.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>I would like to do file-locking concurrency control in python. Additionally, I would like to provide &#8220;run-once-and-only-once&#8221; functionality on a shared cluster; in other words, I have multiple batch jobs to run over a shared compute cluster and I want a simple way to provide fault tolerance for parallel jobs. The batch jobs should [&hellip;]<\/p>\n","protected":false},"author":2,"featured_media":0,"comment_status":"closed","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":[],"categories":[6],"tags":[],"_links":{"self":[{"href":"https:\/\/tech.avant.net\/q\/wp-json\/wp\/v2\/posts\/666"}],"collection":[{"href":"https:\/\/tech.avant.net\/q\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/tech.avant.net\/q\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/tech.avant.net\/q\/wp-json\/wp\/v2\/users\/2"}],"replies":[{"embeddable":true,"href":"https:\/\/tech.avant.net\/q\/wp-json\/wp\/v2\/comments?post=666"}],"version-history":[{"count":10,"href":"https:\/\/tech.avant.net\/q\/wp-json\/wp\/v2\/posts\/666\/revisions"}],"predecessor-version":[{"id":953,"href":"https:\/\/tech.avant.net\/q\/wp-json\/wp\/v2\/posts\/666\/revisions\/953"}],"wp:attachment":[{"href":"https:\/\/tech.avant.net\/q\/wp-json\/wp\/v2\/media?parent=666"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"
https:\/\/tech.avant.net\/q\/wp-json\/wp\/v2\/categories?post=666"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/tech.avant.net\/q\/wp-json\/wp\/v2\/tags?post=666"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}
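As a standalone illustration of the mkdir/create/rename/verify sequence described in the post, here is a hedged Python 3 sketch. The `multilock` helper name and directory layout are mine for illustration; the real implementation is the `MultiLock` class in the github repo above.

```python
import os
import socket
import tempfile

def multilock(basedir, name='lock'):
    """Sketch of the multi-lock sequence: mkdir, exclusive-create,
    write 'hostname pid', atomic rename, then read back to verify.
    Returns True only if this host/process owns the lock."""
    lockdir = os.path.join(basedir, name)
    lockfile = os.path.join(lockdir, name + '.lock')
    lockedfile = os.path.join(lockdir, name + '.locked')
    me = '%s %s' % (socket.gethostname(), os.getpid())
    try:
        os.mkdir(lockdir)  # atomic on POSIX: only one contender succeeds
        fd = os.open(lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
        os.write(fd, me.encode())
        os.fsync(fd)
        os.close(fd)
        os.rename(lockfile, lockedfile)  # atomic rename into place
    except OSError:
        return False
    # verify ownership: read the renamed file and compare hostname:pid
    with open(lockedfile) as fh:
        return fh.read() == me

d = tempfile.mkdtemp()
first = multilock(d)   # acquires the lock
second = multilock(d)  # os.mkdir fails with EEXIST: lock already held
```

The read-back verification is what makes this robust on shared filesystems that do not honor O_EXCL: even if two hosts both believe they created the lock, only the contents of the renamed file decide the owner.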