Previously, I created a “MultiLock” class for managing locks and lockgroups across a shared file system. Now I want to create a simple command-line utility that uses this functionality.
To start, we can create a simple _runone() function that leverages MultiLock, e.g.,
    def _runone(func, lockname, lockgroup, basedir, *args, **kwargs):
        '''
        run one, AND ONLY ONE, instance (respect locking)

        >>> _runone(print, 'lock', 'locks', '.', 'hello world')
        '''
        lock = MultiLock(lockname, lockgroup, basedir)
        if lock.acquire():
            try:
                func(*args, **kwargs)
            finally:
                # release even if func raises, so the lock is not left behind
                lock.release()
Any Python function (with its *args and **kwargs) will be called if (and only if) the named lock was acquired. At a minimum, this guarantees that at most one instance of the function runs under a given lock name at any given time.
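To see the "run only if the lock was acquired" behavior in isolation, here is a minimal sketch that can be run without the multilock module. FileLock is a hypothetical stand-in, not the real MultiLock: it assumes one lock file per lock name, created with an exclusive open, and it only mimics the acquire() and release() calls used above. The run_one name and its True/False return value are also assumptions made for illustration.

```python
import os
import tempfile


class FileLock:
    """Hypothetical stand-in for MultiLock: one file per basedir/lockgroup/lockname."""

    def __init__(self, lockname, lockgroup, basedir):
        self.groupdir = os.path.join(basedir, lockgroup)
        self.path = os.path.join(self.groupdir, lockname)

    def acquire(self):
        os.makedirs(self.groupdir, exist_ok=True)
        try:
            # O_CREAT | O_EXCL fails if the file already exists,
            # so only one caller can create it
            fd = os.open(self.path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        except FileExistsError:
            return False
        os.close(fd)
        return True

    def release(self):
        os.remove(self.path)


def run_one(func, lockname, lockgroup, basedir, *args, **kwargs):
    """Call func only if the lock is acquired; return True if it ran."""
    lock = FileLock(lockname, lockgroup, basedir)
    if not lock.acquire():
        return False
    try:
        func(*args, **kwargs)
    finally:
        lock.release()
    return True


calls = []
basedir = tempfile.mkdtemp()


def outer():
    calls.append('outer')
    # A second attempt while the lock is still held is skipped
    nested_ran = run_one(calls.append, 'lock', 'locks', basedir, 'nested')
    calls.append(nested_ran)


ran = run_one(outer, 'lock', 'locks', basedir)
print(ran, calls)  # True ['outer', False]
```

The nested call never runs because the lock file already exists, and the lock is free again once the outer call returns.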
To make this slightly more magical, we can wrap it in a decorator that accepts arguments,
    def runone(lockname='lock', lockgroup='.locks', basedir='.'):
        '''
        decorator with closure
        returns a function that will run one, and only one, instance per lockgroup
        '''
        def wrapper(fn):
            def new_fn(*args, **kwargs):
                return _runone(fn, lockname, lockgroup, basedir, *args, **kwargs)
            return new_fn
        return wrapper
The closure is used so that we can pass arguments to the decorator function, e.g.,
    @runone('lock', 'lockgroup', '/shared/path')
    def spam():
        # do work, only if we acquire /shared/path/lockgroup/lock
        pass
Putting this all together, we can create a command-line utility that will execute any command-line program if (and only if) it acquires a named lock in the lockgroup. With such a utility we can add concurrency and fault-tolerance to any shell script that can be executed across all nodes in a cluster. This code is also available in this GitHub repo.
    import optparse
    import subprocess
    import sys

    from multilock import MultiLock


    def runone(lockname='lock', lockgroup='.locks', basedir='.'):
        '''
        decorator with closure
        returns a function that will run one, and only one, instance per lockgroup
        '''
        def wrapper(fn):
            def new_fn(*args, **kwargs):
                return _runone(fn, lockname, lockgroup, basedir, *args, **kwargs)
            return new_fn
        return wrapper


    def _runone(func, lockname, lockgroup, basedir, *args, **kwargs):
        '''
        run one, AND ONLY ONE, instance (respect locking)

        >>> _runone(print, 'lock', 'locks', '.', 'hello world')
        '''
        lock = MultiLock(lockname, lockgroup, basedir)
        if lock.acquire():
            try:
                func(*args, **kwargs)
            finally:
                # release even if func raises, so the lock is not left behind
                lock.release()


    if __name__ == '__main__':
        p = optparse.OptionParser('usage: %prog [options] cmd [args]')
        p.add_option('--lockname', '-l', dest="lockname", default='lock',
                     help="the lock name, should be unique for this instance")
        p.add_option('--lockgroup', '-g', dest="lockgroup", default='.locks',
                     help="the lockgroup, a collection of independent locks")
        p.add_option('--basedir', '-d', dest="basedir", default='.',
                     help="the base directory where the lock files should be written")
        # parse the wait time as a number rather than passing a raw string along
        p.add_option('--wait', '-w', dest="wait", type="float", default=None,
                     help="optional, wait (up to the number of seconds specified) "
                          "for all locks in the lockgroup to complete")
        options, args = p.parse_args()

        if options.wait is not None:
            lock = MultiLock(options.lockname, options.lockgroup, options.basedir)
            lock.wait(options.wait)
            sys.exit()

        if not args:
            p.error('no command given')

        @runone(options.lockname, options.lockgroup, options.basedir)
        def _main():
            subprocess.call(args)

        _main()
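For readers who prefer argparse, the same options could be declared roughly as follows. This is a sketch of the option parsing only, not part of the original utility: build_parser is a hypothetical helper name, the help strings are paraphrased, and the example command is made up.

```python
import argparse


def build_parser():
    """Hypothetical argparse equivalent of the optparse options above."""
    p = argparse.ArgumentParser(
        description='run cmd only if the named lock is acquired')
    p.add_argument('--lockname', '-l', default='lock',
                   help='the lock name, should be unique for this instance')
    p.add_argument('--lockgroup', '-g', default='.locks',
                   help='the lockgroup, a collection of independent locks')
    p.add_argument('--basedir', '-d', default='.',
                   help='the base directory where the lock files are written')
    p.add_argument('--wait', '-w', type=float, default=None,
                   help='wait up to this many seconds for the lockgroup')
    # everything after the options is the command to run and its arguments
    p.add_argument('cmd', nargs='*', help='command to run, with its arguments')
    return p


opts = build_parser().parse_args(['-l', 'nightly', '-g', 'jobs', 'echo', 'hello'])
print(opts.lockname, opts.lockgroup, opts.basedir, opts.wait, opts.cmd)

wait_opts = build_parser().parse_args(['--wait', '30'])
print(wait_opts.wait, wait_opts.cmd)
```

If the command being run takes its own dash-prefixed options, they would need to follow a `--` separator so that the parser does not try to interpret them.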