
Source Code for Module web2py.gluon.scheduler

#### WORK IN PROGRESS... NOT SUPPOSED TO WORK YET

USAGE = """
## Example

For any existing app

Create File: app/models/scheduler.py ======
from gluon.scheduler import Scheduler

def demo1(*args,**vars):
    print 'you passed args=%s and vars=%s' % (args, vars)
    return 'done!'

def demo2():
    1/0

scheduler = Scheduler(db,dict(demo1=demo1,demo2=demo2))

## run worker nodes with:

   cd web2py
   python gluon/scheduler.py -u sqlite://storage.sqlite \
                             -f applications/myapp/databases/ \
                             -t mytasks.py

## get help with:

   python gluon/scheduler.py -h

## schedule jobs using
http://127.0.0.1:8000/scheduler/appadmin/insert/db/scheduler_task

## monitor scheduled jobs
http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_task.id>0

## view completed jobs
http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_run.id>0

## view workers
http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_worker.id>0

## Comments
"""

import os
import time
import multiprocessing
import sys
import cStringIO
import threading
import traceback
import signal
import socket
import datetime
import logging
import optparse

try:
    from gluon.contrib.simplejson import loads, dumps
except ImportError:
    from simplejson import loads, dumps

if 'WEB2PY_PATH' in os.environ:
    sys.path.append(os.environ['WEB2PY_PATH'])
else:
    os.environ['WEB2PY_PATH'] = os.getcwd()

from gluon import DAL, Field, IS_NOT_EMPTY, IS_IN_SET
from gluon.utils import web2py_uuid

QUEUED = 'QUEUED'
ASSIGNED = 'ASSIGNED'
RUNNING = 'RUNNING'
COMPLETED = 'COMPLETED'
FAILED = 'FAILED'
TIMEOUT = 'TIMEOUT'
STOPPED = 'STOPPED'
ACTIVE = 'ACTIVE'
INACTIVE = 'INACTIVE'
DISABLED = 'DISABLED'
SECONDS = 1
HEARTBEAT = 3*SECONDS

class Task(object):
    def __init__(self,app,function,timeout,args='[]',vars='{}',**kwargs):
        logging.debug(' new task allocated: %s.%s' % (app,function))
        self.app = app
        self.function = function
        self.timeout = timeout
        self.args = args # json
        self.vars = vars # json
        self.__dict__.update(kwargs)
    def __str__(self):
        return '<Task: %s>' % self.function

class TaskReport(object):
    def __init__(self,status,result=None,output=None,tb=None):
        logging.debug(' new task report: %s' % status)
        if tb:
            logging.debug(' traceback: %s' % tb)
        else:
            logging.debug(' result: %s' % result)
        self.status = status
        self.result = result
        self.output = output
        self.tb = tb
    def __str__(self):
        return '<TaskReport: %s>' % self.status

def demo_function(*argv,**kwargs):
    """ test function """
    for i in range(argv[0]):
        print 'click',i
        time.sleep(1)
    return 'done'

# The two functions below deal with simplejson decoding strings as unicode:
# when the decoded dict is later expanded as function keyword arguments,
# unicode variable names won't work, so keys and values are re-encoded to str.
# Borrowed from http://stackoverflow.com/questions/956867/how-to-get-string-objects-instead-unicode-ones-from-json-in-python
def _decode_list(lst):
    newlist = []
    for i in lst:
        if isinstance(i, unicode):
            i = i.encode('utf-8')
        elif isinstance(i, list):
            i = _decode_list(i)
        newlist.append(i)
    return newlist

def _decode_dict(dct):
    newdict = {}
    for k, v in dct.iteritems():
        if isinstance(k, unicode):
            k = k.encode('utf-8')
        if isinstance(v, unicode):
            v = v.encode('utf-8')
        elif isinstance(v, list):
            v = _decode_list(v)
        newdict[k] = v
    return newdict

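
# A small sketch of how the two hooks above are meant to be used: loads() with
# object_hook=_decode_dict yields str keys/values instead of unicode, so the
# decoded dict can be expanded as **keyword arguments (demo1 as in USAGE above):
#
#     vars = loads('{"msg": "hello", "n": 3}', object_hook=_decode_dict)
#     # vars == {'msg': 'hello', 'n': 3}   (str, not unicode)
#     demo1(**vars)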
def executor(queue,task):
    """ the background process """
    logging.debug(' task started')
    stdout, sys.stdout = sys.stdout, cStringIO.StringIO()
    try:
        if task.app:
            os.chdir(os.environ['WEB2PY_PATH'])
            from gluon.shell import env
            from gluon.dal import BaseAdapter
            from gluon import current
            level = logging.getLogger().getEffectiveLevel()
            logging.getLogger().setLevel(logging.WARN)
            _env = env(task.app,import_models=True)
            logging.getLogger().setLevel(level)
            scheduler = current._scheduler
            scheduler_tasks = current._scheduler.tasks
            _function = scheduler_tasks[task.function]
            globals().update(_env)
            args = loads(task.args)
            vars = loads(task.vars, object_hook=_decode_dict)
            result = dumps(_function(*args,**vars))
        else:
            ### for testing purpose only
            result = eval(task.function)(
                *loads(task.args, object_hook=_decode_dict),
                **loads(task.vars, object_hook=_decode_dict))
        stdout, sys.stdout = sys.stdout, stdout
        queue.put(TaskReport(COMPLETED, result, stdout.getvalue()))
    except BaseException,e:
        sys.stdout = stdout
        tb = traceback.format_exc()
        queue.put(TaskReport(FAILED,tb=tb))

class MetaScheduler(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.process = None         # the background process
        self.have_heartbeat = True  # set to False to kill
    def async(self,task):
        """
        runs the task in a background process and returns a TaskReport
        whose status is one of:
            COMPLETED  (with result and captured output)
            FAILED     (with traceback)
            TIMEOUT    (process killed after task.timeout seconds)
            STOPPED    (interrupted, or no report was produced)
        """
        queue = multiprocessing.Queue(maxsize=1)
        p = multiprocessing.Process(target=executor,args=(queue,task))
        self.process = p
        logging.debug(' task starting')
        p.start()
        try:
            p.join(task.timeout)
        except:
            p.terminate()
            p.join()
            self.have_heartbeat = False
            logging.debug(' task stopped')
            return TaskReport(STOPPED)
        if p.is_alive():
            p.terminate()
            p.join()
            logging.debug(' task timeout')
            return TaskReport(TIMEOUT)
        elif queue.empty():
            self.have_heartbeat = False
            logging.debug(' task stopped')
            return TaskReport(STOPPED)
        else:
            logging.debug(' task completed or failed')
            return queue.get()

    def die(self):
        logging.info('die!')
        self.have_heartbeat = False
        self.terminate_process()

    def terminate_process(self):
        try:
            self.process.terminate()
        except:
            pass # no process to terminate

    def run(self):
        """ the thread that sends heartbeats """
        counter = 0
        while self.have_heartbeat:
            self.send_heartbeat(counter)
            counter += 1

    def start_heartbeats(self):
        self.start()

    def send_heartbeat(self,counter):
        # placeholder heartbeat; Scheduler.send_heartbeat overrides this
        print 'thum'
        time.sleep(1)

    def pop_task(self):
        # default demo task; Scheduler.pop_task overrides this
        return Task(
            app = None,
            function = 'demo_function',
            timeout = 7,
            args = '[2]',
            vars = '{}')

    def report_task(self,task,task_report):
        print 'reporting task'

    def sleep(self):
        pass

    def loop(self):
        try:
            self.start_heartbeats()
            while self.have_heartbeat:
                logging.debug('looping...')
                task = self.pop_task()
                if task:
                    self.report_task(task,self.async(task))
                else:
                    logging.debug('sleeping...')
                    self.sleep()
        except KeyboardInterrupt:
            self.die()


TASK_STATUS = (QUEUED, RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED)
RUN_STATUS = (RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED)
WORKER_STATUS = (ACTIVE, INACTIVE, DISABLED)

class TYPE(object):
    """
    validator that checks whether a field value is valid JSON
    and whether the decoded object is of the expected type
    """

    def __init__(self,myclass=list,parse=False):
        self.myclass = myclass
        self.parse = parse

    def __call__(self,value):
        from gluon import current
        try:
            obj = loads(value)
        except:
            return (value,current.T('invalid json'))
        else:
            if isinstance(obj,self.myclass):
                if self.parse:
                    return (obj,None)
                else:
                    return (value,None)
            else:
                return (value,current.T('Not of type: %s') % self.myclass)

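
# A quick sketch of the TYPE validator in use; like other web2py validators it
# returns a (value, error) tuple, where error is None on success (the error
# messages go through current.T, so a web2py environment is assumed):
#
#     TYPE(list)('[1, 2, 3]')           # -> ('[1, 2, 3]', None)
#     TYPE(list, parse=True)('[1, 2]')  # -> ([1, 2], None), the decoded object
#     TYPE(dict)('[1, 2, 3]')           # -> roughly ('[1, 2, 3]', "Not of type: <type 'dict'>")
#     TYPE(dict)('not json')            # -> ('not json', 'invalid json')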
class Scheduler(MetaScheduler):
    def __init__(self,db,tasks={},migrate=True,
                 worker_name=None,group_names=None,heartbeat=HEARTBEAT):

        MetaScheduler.__init__(self)

        self.db = db
        self.db_thread = None
        self.tasks = tasks
        self.group_names = group_names or ['main']
        self.heartbeat = heartbeat
        self.worker_name = worker_name or socket.gethostname()+'#'+str(web2py_uuid())

        from gluon import current
        current._scheduler = self

        self.define_tables(db,migrate=migrate)

    def define_tables(self,db,migrate):
        from gluon import current
        logging.debug('defining tables (migrate=%s)' % migrate)
        now = datetime.datetime.now()
        db.define_table(
            'scheduler_task',
            Field('application_name',requires=IS_NOT_EMPTY(),
                  default=None,writable=False),
            Field('task_name',requires=IS_NOT_EMPTY()),
            Field('group_name',default='main',writable=False),
            Field('status',requires=IS_IN_SET(TASK_STATUS),
                  default=QUEUED,writable=False),
            Field('function_name',
                  requires=IS_IN_SET(sorted(self.tasks.keys()))),
            Field('args','text',default='[]',requires=TYPE(list)),
            Field('vars','text',default='{}',requires=TYPE(dict)),
            Field('enabled','boolean',default=True),
            Field('start_time','datetime',default=now),
            Field('next_run_time','datetime',default=now),
            Field('stop_time','datetime',default=now+datetime.timedelta(days=1)),
            Field('repeats','integer',default=1,comment="0=unlimited"),
            Field('period','integer',default=60,comment='seconds'),
            Field('timeout','integer',default=60,comment='seconds'),
            Field('times_run','integer',default=0,writable=False),
            Field('last_run_time','datetime',writable=False,readable=False),
            Field('assigned_worker_name',default='',writable=False),
            migrate=migrate,format='%(task_name)s')
        if hasattr(current,'request'):
            db.scheduler_task.application_name.default=current.request.application

        db.define_table(
            'scheduler_run',
            Field('scheduler_task','reference scheduler_task'),
            Field('status',requires=IS_IN_SET(RUN_STATUS)),
            Field('start_time','datetime'),
            Field('stop_time','datetime'),
            Field('output','text'),
            Field('result','text'),
            Field('traceback','text'),
            Field('worker_name',default=self.worker_name),
            migrate=migrate)

        db.define_table(
            'scheduler_worker',
            Field('worker_name'),
            Field('first_heartbeat','datetime'),
            Field('last_heartbeat','datetime'),
            Field('status',requires=IS_IN_SET(WORKER_STATUS)),
            migrate=migrate)
        db.commit()

    def loop(self,worker_name=None):
        MetaScheduler.loop(self)

    def pop_task(self):
        now = datetime.datetime.now()
        db, ts = self.db, self.db.scheduler_task
        try:
            logging.debug(' grabbing all queued tasks')
            all_available = db(ts.status.belongs((QUEUED,RUNNING)))\
                ((ts.times_run<ts.repeats)|(ts.repeats==0))\
                (ts.start_time<=now)\
                (ts.stop_time>now)\
                (ts.next_run_time<=now)\
                (ts.enabled==True)\
                (ts.group_name.belongs(self.group_names))\
                (ts.assigned_worker_name.belongs((None,'',self.worker_name))) #None?
            number_grabbed = all_available.update(
                assigned_worker_name=self.worker_name,status=ASSIGNED)
            db.commit()
        except:
            number_grabbed = None
            db.rollback()
        if number_grabbed:
            logging.debug(' grabbed %s tasks' % number_grabbed)
            grabbed = db(ts.assigned_worker_name==self.worker_name)\
                (ts.status==ASSIGNED)
            task = grabbed.select(limitby=(0,1), orderby=ts.next_run_time).first()

            logging.debug(' releasing all but one (running)')
            if task:
                task.update_record(status=RUNNING,last_run_time=now)
                grabbed.update(assigned_worker_name='',status=QUEUED)
            db.commit()
        else:
            return None
        next_run_time = task.last_run_time + datetime.timedelta(seconds=task.period)
        times_run = task.times_run + 1
        if times_run < task.repeats or task.repeats==0:
            run_again = True
        else:
            run_again = False
        logging.debug(' new scheduler_run record')
        while True:
            try:
                run_id = db.scheduler_run.insert(
                    scheduler_task = task.id,
                    status=RUNNING,
                    start_time=now,
                    worker_name=self.worker_name)
                db.commit()
                break
            except:
                db.rollback()  # was "db.rollback" (missing call parentheses)
        logging.info('new task %(id)s "%(task_name)s" %(application_name)s.%(function_name)s' % task)
        return Task(
            app = task.application_name,
            function = task.function_name,
            timeout = task.timeout,
            args = task.args, #in json
            vars = task.vars, #in json
            task_id = task.id,
            run_id = run_id,
            run_again = run_again,
            next_run_time=next_run_time,
            times_run = times_run)

    def report_task(self,task,task_report):
        logging.debug(' recording task report in db (%s)' % task_report.status)
        db = self.db
        db(db.scheduler_run.id==task.run_id).update(
            status = task_report.status,
            stop_time = datetime.datetime.now(),
            result = task_report.result,
            output = task_report.output,
            traceback = task_report.tb)
        if task_report.status == COMPLETED:
            d = dict(status = task.run_again and QUEUED or COMPLETED,
                     next_run_time = task.next_run_time,
                     times_run = task.times_run,
                     assigned_worker_name = '')
        else:
            d = dict(
                assigned_worker_name = '',
                status = {'FAILED':'FAILED',
                          'TIMEOUT':'TIMEOUT',
                          'STOPPED':'QUEUED'}[task_report.status])
        db(db.scheduler_task.id==task.task_id)\
            (db.scheduler_task.status==RUNNING).update(**d)
        db.commit()
        logging.info('task completed (%s)' % task_report.status)

    def send_heartbeat(self,counter):
        if not self.db_thread:
            logging.debug('thread building own DAL object')
            self.db_thread = DAL(self.db._uri,folder = self.db._adapter.folder)
            self.define_tables(self.db_thread,migrate=False)
        try:
            db = self.db_thread
            sw, st = db.scheduler_worker, db.scheduler_task
            now = datetime.datetime.now()
            expiration = now-datetime.timedelta(seconds=self.heartbeat*3)
            # record heartbeat
            logging.debug('........recording heartbeat')
            if not db(sw.worker_name==self.worker_name)\
                    .update(last_heartbeat = now, status = ACTIVE):
                sw.insert(status = ACTIVE,worker_name = self.worker_name,
                          first_heartbeat = now,last_heartbeat = now)
            if counter % 10 == 0:
                # deallocate jobs assigned to inactive workers and requeue them
                logging.debug(' freeing workers that have not sent heartbeat')
                inactive_workers = db(sw.last_heartbeat<expiration)
                db(st.assigned_worker_name.belongs(
                    inactive_workers._select(sw.worker_name)))\
                    (st.status.belongs((RUNNING,ASSIGNED,QUEUED)))\
                    .update(assigned_worker_name='',status=QUEUED)
                inactive_workers.delete()
            db.commit()
        except:
            db.rollback()
        time.sleep(self.heartbeat)

    def sleep(self):
        time.sleep(self.heartbeat) # should only sleep until next available task

def main():
    """
    allows running a worker node directly with `python gluon/scheduler.py ...`
    instead of going through web2py.py
    """
    parser = optparse.OptionParser()
    parser.add_option(
        "-w", "--worker_name", dest="worker_name", default=None,
        help="start a worker with name")
    parser.add_option(
        "-b", "--heartbeat", dest="heartbeat", type="int", default=10,
        help="heartbeat time in seconds (default 10)")
    parser.add_option(
        "-L", "--logger_level", dest="logger_level",
        default='INFO',
        help="level of logging (DEBUG, INFO, WARNING, ERROR)")
    parser.add_option(
        "-g", "--group_names", dest="group_names",
        default='main',
        help="comma separated list of groups to be picked by the worker")
    parser.add_option(
        "-f", "--db_folder", dest="db_folder",
        default='/Users/mdipierro/web2py/applications/scheduler/databases',
        help="location of the dal database folder")
    parser.add_option(
        "-u", "--db_uri", dest="db_uri",
        default='sqlite://storage.sqlite',
        help="database URI string (web2py DAL syntax)")
    parser.add_option(
        "-t", "--tasks", dest="tasks", default=None,
        help="file containing the tasks; must define " + \
             "tasks = {'task_name':(lambda: 'output')} or similar set of tasks")
    (options, args) = parser.parse_args()
    if not options.tasks or not options.db_uri:
        print USAGE
    if options.tasks:
        path,filename = os.path.split(options.tasks)
        if filename.endswith('.py'):
            filename = filename[:-3]
        sys.path.append(path)
        print 'importing tasks...'
        tasks = __import__(filename, globals(), locals(), [], -1).tasks
        print 'tasks found: '+', '.join(tasks.keys())
    else:
        tasks = {}
    group_names = [x.strip() for x in options.group_names.split(',')]

    logging.getLogger().setLevel(options.logger_level)

    print 'groups for this worker: '+', '.join(group_names)
    print 'connecting to database in folder: ' + (options.db_folder or './')
    print 'using URI: '+options.db_uri
    db = DAL(options.db_uri,folder=options.db_folder)
    print 'instantiating scheduler...'
    scheduler = Scheduler(db = db,
                          worker_name = options.worker_name,
                          tasks = tasks,
                          migrate = True,
                          group_names = group_names,
                          heartbeat = options.heartbeat)
    print 'starting main worker loop...'
    scheduler.loop()

if __name__=='__main__':
    main()
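
# A minimal sketch of inspecting task outcomes from application code, assuming
# task_id holds the id of an existing scheduler_task row; each run is recorded
# in scheduler_run by report_task above, with result stored as a JSON string:
#
#     from gluon.contrib.simplejson import loads
#     last_run = db(db.scheduler_run.scheduler_task == task_id)\
#                  .select(orderby=~db.scheduler_run.id).first()
#     if last_run and last_run.status == 'COMPLETED':
#         value = loads(last_run.result)   # whatever the task function returned
#     elif last_run and last_run.status == 'FAILED':
#         print last_run.traceback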