Previously, I created a “MultiLock” class for managing locks and lockgroups across a shared file system. Now I want to create a simple command-line utility that uses this functionality.
To start, we can create a simple runone() function that leverages MutliLock, e.g.,
def _runone(func, lockname, lockgroup, basedir, *args, **kwargs):
''' run one, AND ONLY ONE, instance (respect locking)
>>>
>>> _runone(print, 'lock', 'locks', '.', 'hello world')
>>>
'''
lock = MultiLock(lockname, lockgroup, basedir)
if lock.acquire():
func(*args, **kwargs)
lock.release()
Any python function (with its *args and **kwargs) will be called if (and-only-if) the named lock was acquired. At a minimum, this guarantees that one (and only one) instance of the function can be called at a given time.
To make this slightly more magic, we can wrap this as a decorator function — a decorator that accepts arguments,
def runone(lockname='lock', lockgroup='.locks', basedir='.'):
''' decorator with closure
returns a function that will run one, and only one, instance per lockgroup
'''
def wrapper(fn):
def new_fn(*args, **kwargs):
return _runone(fn, lockname, lockgroup, basedir, *args, **kwargs)
return new_fn
return wrapper
The closure is used so that we can pass arguments to the decorator function, e.g.,
@runone('lock', 'lockgroup', '/shared/path')
def spam():
#do work, only if we acquire /shared/path/lockgroup/lock
Putting this all together, we can create a command-line utility that will execute any command-line program if (and only if) it acquires a named lock in the lockgroup. With such a utility we can add concurrency and fault-tolerance to any shell script that can be executed across all nodes in a cluster. This code is also available in this github repo.
import time, sys, subprocess, optparse, logging
from multilock import MultiLock
def runone(lockname='lock', lockgroup='.locks', basedir='.'):
''' decorator with closure
returns a function that will run one, and only one, instance per lockgroup
'''
def wrapper(fn):
def new_fn(*args, **kwargs):
return _runone(fn, lockname, lockgroup, basedir, *args, **kwargs)
return new_fn
return wrapper
def _runone(func, lockname, lockgroup, basedir, *args, **kwargs):
''' run one, AND ONLY ONE, instance (respect locking)
>>>
>>> _runone(print, 'lock', 'locks', '.', 'hello world')
>>>
'''
lock = MultiLock(lockname, lockgroup, basedir)
if lock.acquire():
func(*args, **kwargs)
lock.release()
if __name__ == '__main__':
p = optparse.OptionParser('usage: %prog [options] cmd [args]')
p.add_option('--lockname', '-l', dest="lockname", default='lock', help="the lock name, should be unique for this instance")
p.add_option('--lockgroup', '-g', dest="lockgroup", default='.locks', help="the lockgroup, a collection of locks independent locks")
p.add_option('--basedir', '-d', dest="basedir", default='.', help="the base directory where the lock files should be written")
p.add_option('--wait', '-w', dest="wait", default=None, help="optional, wait (up till the number of seconds specified) for all locks to complete in the lockgroup")
options, args = p.parse_args()
if options.wait:
lock = MultiLock(options.lockname, options.lockgroup, options.basedir)
lock.wait(options.wait)
sys.exit()
@runone(options.lockname, options.lockgroup, options.basedir)
def _main():
subprocess.call(args)
_main()