diff --git a/ci/test/test_functional.py b/ci/test/test_functional.py index 98fd200e6..b01baae21 100644 --- a/ci/test/test_functional.py +++ b/ci/test/test_functional.py @@ -32,19 +32,25 @@ def test_rms(): """ returns the rms value (sqrt of the squares average) """ - #assert fandango.functional.rms + data = 1,2,3,4 + assert fandango.functional.rms(data) == \ + fandango.functional.math.sqrt(sum(a**2 for a in data)) def test_randomize(): """ returns a randomized version of the list """ - #assert fandango.functional.randomize + data = 1,2,3,4 + rdata = fandango.functional.randomize(data) + assert all(a in rdata for a in data) and len(data) == len(rdata) def test_randpop(): """ removes and returns a random item from the sequence """ - #assert fandango.functional.randpop + data = 1,2,3,4 + rdata = [fandango.functional.randpop(data) for i in range(4)] + assert all(a in rdata for a in data) and len(data) == len(rdata) def test_floor(): """ diff --git a/fandango/CHANGELOG b/fandango/CHANGELOG index 7f2a7ac00..c4db4b4ae 100644 --- a/fandango/CHANGELOG +++ b/fandango/CHANGELOG @@ -8,6 +8,15 @@ IMPORTANT: MASTER branch provides python 2.6 compatible releases DEVELOP branch will provide the current Py2.7/3.3 code + +15.0.1 Develop Branch + + like 14.8.1, but only compatible with PyTango2.7 (using future) + +################################################################ + +14.8.2 + solve bugs on tango_monitor and ThreadDict.stop() 14.8.1 Apply bugfixes/pull requests from github diff --git a/fandango/VERSION b/fandango/VERSION index 6ebe0c0b0..0a73f9bb0 100644 --- a/fandango/VERSION +++ b/fandango/VERSION @@ -1 +1 @@ -14.8.1 +14.9.2 diff --git a/fandango/__init__.py b/fandango/__init__.py index 829a214d4..005bdab33 100644 --- a/fandango/__init__.py +++ b/fandango/__init__.py @@ -32,6 +32,13 @@ ## along with this program; if not, see . ####################################################################@######## """ +from __future__ import print_function +from __future__ import absolute_import +try: + from builtins import map +except: + pass + __doc__ = """ @package fandango @@ -56,7 +63,7 @@ try: os.remove(p) print('%s removed ...'%p) - except Exception,e: + except Exception as e: print(e) print('fandango, CLEAN OLD FILES ERROR!:') print('An old file still exists at:\n\t%s'%p) @@ -66,21 +73,22 @@ try: # LOAD VERSION NUMBER - import objects,imp - from objects import ReleaseNumber + from . import objects + import imp + from .objects import ReleaseNumber PATH = os.path.dirname(objects.__file__) vf = open(PATH+'/VERSION') - RELEASE = ReleaseNumber(map(int,vf.read().strip().split('.'))) + RELEASE = ReleaseNumber(list(map(int,vf.read().strip().split('.')))) vf.close() -except Exception,e: +except Exception as e: print(traceback.format_exc()) print('Unable to load RELEASE number: %s'%e) try: import pkg_resources __version__ = pkg_resources.get_distribution(__name__).version -except Exception, e: +except Exception as e: #print ('Unable to get distribution version number, fandango has ' # 'probably not been installed as a package') __version__ = RELEASE @@ -88,48 +96,48 @@ __test__ = ['tango'] try: - from functional import * -except Exception,e: + from .functional import * +except Exception as e: print('Unable to import functional module: %s'%e) try: - from log import printf,Logger,LogFilter,shortstr,\ + from .log import printf,Logger,LogFilter,shortstr,\ except2str,FakeLogger,pprint -except Exception,e: +except Exception as e: print('Unable to import log module: %s'%e) try: - from excepts import trial,getLastException,getPreviousExceptions, \ + from .excepts import trial,getLastException,getPreviousExceptions, \ ExceptionWrapper,Catched,CatchedArgs except: print('Unable to import excepts module') try: - from objects import Object,Singleton,SingletonMap,Struct,NamedProperty - from objects import dirModule,loadModule,dirClasses,obj2dict,copy - from objects import Decorator,ClassDecorator,Decorated,BoundDecorator - from objects import Cached, Variable -except Exception,e: + from .objects import Object,Singleton,SingletonMap,Struct,NamedProperty + from .objects import dirModule,loadModule,dirClasses,obj2dict,copy + from .objects import Decorator,ClassDecorator,Decorated,BoundDecorator + from .objects import Cached, Variable +except Exception as e: print('Unable to import objects module: %s'%traceback.format_exc()) try: - from linos import shell_command,ping,sysargs_to_dict,listdir, \ + from .linos import shell_command,ping,sysargs_to_dict,listdir, \ sendmail,MyMachine,get_fqdn except: print('Unable to import linos module: %s\n'%traceback.format_exc()) try: - from arrays import CSVArray, TimedQueue + from .arrays import CSVArray, TimedQueue except: print('Unable to import arrays module') try: - from doc import * + from .doc import * except: print('Unable to import doc module') try: - from dicts import ThreadDict,CaselessDict,ReversibleDict, \ + from .dicts import ThreadDict,CaselessDict,ReversibleDict, \ CaselessDefaultDict,DefaultThreadDict, \ Enumeration,SortedDict,CaselessList, \ defaultdict,defaultdict_fromkey, \ @@ -139,19 +147,19 @@ traceback.print_exc() try: - from threads import WorkerProcess,WorkerThread,SingletonWorker,\ + from .threads import WorkerProcess,WorkerThread,SingletonWorker,\ wait,timed_range except: print('Unable to import threads module') try: - from debug import Timed, timeit -except Exception,e: + from .debug import Timed, timeit +except Exception as e: print('Unable to import debug module') #TANGO related modules try: - from tango import finder,get_device,get_database,get_database_device, \ + from .tango import finder,get_device,get_database,get_database_device, \ get_all_devices,get_device_info,get_alias_for_device, \ get_device_for_alias,get_tango_host, \ find_devices,find_attributes, find_properties,\ @@ -164,12 +172,12 @@ fakeEvent,fakeEventType, get_attribute_events, check_attribute_events try: - from device import Dev4Tango,DevChild,TangoCommand - except Exception,e: raise Exception('fandango.device: %s'%e) + from .device import Dev4Tango,DevChild,TangoCommand + except Exception as e: raise Exception('fandango.device: %s'%e) try: - from servers import ServersDict,Astor,ProxiesDict,ComposersDict - except Exception,e: raise Exception('fandango.servers: %s'%e) + from .servers import ServersDict,Astor,ProxiesDict,ComposersDict + except Exception as e: raise Exception('fandango.servers: %s'%e) try: path = imp.find_module('fandango')[1] @@ -180,33 +188,33 @@ %','.join(deprecated)) try: [os.remove(f) for f in deprecated] except: print('... and should be removed manually!') - from interface import FullTangoInheritance,NewTypeInheritance - except Exception,e: raise Exception('fandango.interface: %s'%e) + from .interface import FullTangoInheritance,NewTypeInheritance + except Exception as e: raise Exception('fandango.interface: %s'%e) try: - from dynamic import DynamicDS,DynamicDSClass,DynamicAttribute, \ + from .dynamic import DynamicDS,DynamicDSClass,DynamicAttribute, \ DynamicDSTypes,CreateDynamicCommands,DynamicServer - except Exception,e: raise Exception('fandango.dynamic: %s'%e) + except Exception as e: raise Exception('fandango.dynamic: %s'%e) try: - from callbacks import EventSource,EventThread,EventListener, \ + from .callbacks import EventSource,EventThread,EventListener, \ CachedAttributeProxy,TangoListener,TangoAttribute - except Exception,e: raise Exception('fandango.callbacks: %s'%e) + except Exception as e: raise Exception('fandango.callbacks: %s'%e) -except Exception,e: +except Exception as e: print('Unable to import fandango.*tango modules: %s'%e) - print traceback.format_exc() + print(traceback.format_exc()) #OTHER fancy modules if False: #Disabled to avoid extra dependencies!! - try: import web - except: print 'Unable to import fandango.web module' - try: import qt - except: print 'Unable to import fandango.qt module' - try: from db import FriendlyDB - except: print 'Unable to import db module' + try: from . import web + except: print('Unable to import fandango.web module') + try: from . import qt + except: print('Unable to import fandango.qt module') + try: from .db import FriendlyDB + except: print('Unable to import db module') __all__ = ['dicts','excepts','log','objects','db','device','web','threads', 'dynamic','callbacks','arrays','servers','linos','functional', diff --git a/fandango/arrays.py b/fandango/arrays.py index ae2322a02..e5db04d1e 100644 --- a/fandango/arrays.py +++ b/fandango/arrays.py @@ -35,7 +35,7 @@ ## along with this program; if not, see . ########################################################################### -import csv,sys,re,operator,traceback +import csv,sys,re,operator,traceback,math import functional as fun from fandango.log import printf from fandango.dicts import SortedDict @@ -215,50 +215,73 @@ def decimate_array(data,fixed_size=0,keep_nones=True,fixed_inc=0, F_ZERO = 4 #fill with zeroes F_NEXT = 5 #fill with next value -# For all this methos arguments may be just values sequence or +# For all this methods arguments may be just values sequence or # currentsequence / previousvalue +# THIS METHODS ARE AGGREGATORS! Not Filters! + def average(*args): + """ aggregator""" return fun.avg(args[0]) def rms_value(*args): + """ aggregator""" return fun.rms(args[0]) def pickfirst(*args): - """ Just pick first valid value """ - for v in args[0]: - if v is not None: - return v + """ aggregator, Just pick first valid value """ + #for v in args[0]: + #if v is not None: + #return v + return fun.first(args[0]) def maxdiff(*args): - """ Filter that maximizes changes (therefore, noise) """ + """ aggregator that maximizes changes (therefore, noise) """ seq,ref = args - if None in seq: return None - return sorted((fun.absdiff(s,ref,0),s) for s in seq)[-1][-1] - + r,d0 = ref,0 + for s in seq: #AVOID GENERATION EXHAUSTION + if s is None: + return s + d = fun.absdiff(s,ref,0) + if d > d0: + r,d0 = s,d + return r + def mindiff(*args): - """ Filter that maximizes changes (therefore, noise) """ + """ aggregator that maximizes changes (therefore, noise) """ seq,ref = args - return sorted((fun.absdiff(s,ref,0),s) for s in seq - if s is not None)[0][-1] + r,d0 = None,None + for s in seq: + if s == ref: + return s + elif None not in (s,ref): + d = fun.absdiff(s,ref,0) + if None in (r,d0) or d < d0: + r,d0 = s,d + return r def notnone(*args): - """ This method returns an averaging method applied to all none values - in a sequence """ + """ + notnone(sequence, [ref value, method]) + + aggregator, It gets a None/NaN value from sequence. + + If ref and an averaging method is provided, + it is applied to all filtered values (using ref as previous last value). + """ seq,ref = args[0],fun.first(args[1:] or [0]) - method = fun.first(args[2:] or [average]) + method = fun.first(args[2:] or [pickfirst]) try: - if np: - return method(*((v for v in seq if v is not None - and not np.isnan(v)),ref)) - else: - return method(*((v for v in seq if v is not None),ref)) + seq = (v for v in seq if v is not None and not math.isnan(v)) + return method(seq,ref) except: traceback.print_exc() return ref def maxmin(*args): """ + aggregator + Returns timed ((t,max),(t,min)) values from a (t,v) dataset When used to filter an array the winndow will have to be doubled to allocate both values (or just keep the one with max absdiff from previous). @@ -268,10 +291,11 @@ def maxmin(*args): mn,mx = (t[0][1],t[0][0]),(t[-1][1],t[-1][0]) return sorted((mx,mn)) -##METHODS OBTAINED FROM PyTangoArchiving READER +## aggregators METHODS OBTAINED FROM PyTangoArchiving READER def choose_first_value(v,w,t=0,tmin=-300): """ + aggregator Args are v,w for values and t for point to calcullate; tmin is the min epoch to be considered valid """ @@ -285,6 +309,7 @@ def choose_first_value(v,w,t=0,tmin=-300): def choose_last_value(v,w,t=0,tmin=-300): """ + aggregator Args are v,w for values and t for point to calcullate; tmin is the min epoch to be considered valid """ @@ -298,6 +323,7 @@ def choose_last_value(v,w,t=0,tmin=-300): def choose_max_value(v,w,t=0,tmin=-300): """ + aggregator Args are v,w for values and t for point to calcullate; tmin is the min epoch to be considered valid """ @@ -313,6 +339,7 @@ def choose_max_value(v,w,t=0,tmin=-300): def choose_last_max_value(v,w,t=0,tmin=-300): """ + aggregator This method returns max value for epochs out of interval For epochs in interval, it returns latest Args are v,w for values and t for point to calcullate; @@ -395,12 +422,12 @@ def filter_array(data,window=300,method=average,begin=0,end=0,filling=F_LAST, (crosschecked with 1e6 samples against the PyTangoArchiving.utils.decimate_array method using numpy) """ - if 1: #trace: - print('filter_array([%d],w=%f' % (len(data),window)) + print('filter_array([%d],w=%f' % (len(data),window)) data = sorted(data) #DATA MUST BE ALWAYS SORTED begin,end,window = map(float,((begin,end,window))) try: - assert window<1. + if window>=1.: + raise Exception('numpy not needed') import numpy ranger = numpy.arange tfloor = lambda x: float(fun.floor(x,window)) diff --git a/fandango/callbacks.py b/fandango/callbacks.py index 8afcfc3c4..fb0d14b70 100644 --- a/fandango/callbacks.py +++ b/fandango/callbacks.py @@ -1533,7 +1533,7 @@ def set(self, event): ############################################################################### # OLD API, DEPRECATED -class EventCallback(): +class EventHook(): """ It provides persistent storage. lock.acquire and lock.release should be used to prevent threading problems!, @@ -1546,25 +1546,29 @@ def __init__(self): def push_event(self,event): self.lock.acquire() + try: #Pruning tango:$TANGO_HOST and other tags attr_name = '/'.join(event.attr_name.split('/')[-4:]) dev_name = hasattr(event.device,'name') and event.device.name() or event.device - print("in EventCallback.push_event(",dev_name,": ",attr_name,")") + print("in EventHook.push_event(",dev_name,": ",attr_name,")") + if not event.err and event.attr_value is not None: - print("in EventCallback.push_event(...): ",attr_name,"=", + print("in EventHook.push_event(...): ",attr_name,"=", event.attr_value.value) self.TimeOutErrors=0 _EventsList[attr_name.lower()].set(event) if attr_name.lower().endswith('/state'): _StatesList[dev_name.lower()]=event.attr_value.value _AttributesList[event.attr_name.lower()]=event.attr_value + else: - print 'in EventCallback.push_event(...): Received an Error Event!: ',event.errors + print 'in EventHook.push_event(...): Received an Error Event!: ',event.errors _EventsList[attr_name.lower()].set(event) #if 'OutOfSync' in event.errors[0]['reason']: or 'API_EventTimeout' in event.errors[0]['reason']: #if [e for e in event.errors if hasattr(e,'keys') and 'reason' in e.keys() and any(re.search(exp,e['reason']) for exp in self.NotifdExceptions)]: reasons = [getattr(e,'reason',e.get('reason',str(e)) if hasattr(e,'get') else str(e)) for e in event.errors] #Needed to catch both PyTango3 and PyTango7 exceptions + if any(n in r for n in self.NotifdExceptions for r in reasons): print 'callbacks=> DISCARDED EVENT FOR NOTIFD REASONS!!! %s(%s)' \ %(dev_name,reasons) @@ -1576,6 +1580,7 @@ def push_event(self,event): if attr_name.lower().endswith('/state'): _StatesList[dev_name.lower()]=None #An unreaded State cannot be UNKNOWN, it must be None to notify that an exception occurred! _AttributesList[attr_name.lower()]=None + #Launching Device.push_event() for rec in _EventsList[attr_name].receivers: if rec in _EventReceivers.keys(): _EventReceivers[rec].push_event(event) @@ -1583,15 +1588,17 @@ def push_event(self,event): elif isinstance(rec,threading.Event): rec.set() elif callable(rec): rec() else: raise 'UnknownEventReceiverType' + except Exception,e: - print 'exception in EventCallback.push_event(): ',e, ";", getLastException() + print 'exception in EventHook.push_event(): ',e, ";", getLastException() + self.lock.release() ############################################################################### # OLD API, DEPRECATED #THIS IS THE EVENTS CALLBACK SINGLETONE: -GlobalCallback = EventCallback() +GlobalCallback = EventHook() ## @TODO ... implemented in addTAttr and addReceiver ... missing a dp attribute to finish the work #def subscribeToAttribute(subscriber,att_name): diff --git a/fandango/db.py b/fandango/db.py index 49e2a31e0..a99b2c992 100755 --- a/fandango/db.py +++ b/fandango/db.py @@ -43,6 +43,7 @@ import os,time,datetime,log,traceback,sys from .objects import Struct from . import functional as fn +from .dicts import defaultdict """ MySQL API's are loaded at import time, but can be modified afterwards. @@ -67,23 +68,31 @@ """ -try: - # This import will fail in Debian as mysqlclient is loaded as MySQLdb - # in other OS, mysqlclient should be used - import mysqlclient - import mysqlclient as mysql_api - mysql = Struct() - mysql.connector = MySQLdb = None -except: +# mysql.connector is not the default, but to use prepared cursors +# it can be imported before fandango and take precedence + +if 'mysql.connector' in sys.modules: + import mysql.connector + import mysql.connector as mysql_api + mysqlclient = MySQLdb = None +else: try: - import MySQLdb - import MySQLdb as mysql_api + # This import will fail in Debian as mysqlclient is loaded as MySQLdb + # in other OS, mysqlclient should be used + import mysqlclient + import mysqlclient as mysql_api mysql = Struct() - mysqlclient = mysql.connector = None + mysql.connector = MySQLdb = None except: - import mysql.connector - import mysql.connector as mysql_api - mysqlclient = MySQLdb = None + try: + import MySQLdb + import MySQLdb as mysql_api + mysql = Struct() + mysqlclient = mysql.connector = None + except: + import mysql.connector + import mysql.connector as mysql_api + mysqlclient = MySQLdb = None class FriendlyDB(log.Logger): """ @@ -97,7 +106,7 @@ def __init__(self,db_name,host='',user='',passwd='',autocommit=True, self.__class__.__name__+'(%s@%s)' % (db_name, host), format='%(levelname)-8s %(asctime)s %(name)s: %(message)s') self.setLogLevel(loglevel or 'WARNING') - self.debug('Using %s as MySQL python API' % mysql_api) + self.info('Using %s as MySQL python API' % mysql_api) #def __init__(self,api,db_name,user='',passwd='', host=''): #if not api or not database: #self.error('ArchivingAPI and database are required arguments for ArchivingDB initialization!') @@ -143,7 +152,6 @@ def setAutocommit(self,autocommit = None): % autocommit) #raise Exception,e - def renewMySQLconnection(self): try: if hasattr(self,'db') and self.db: @@ -172,7 +180,10 @@ def getCursor(self,renew=True,klass=None): ''' try: if klass in ({},dict): - klass = mysql_api.cursors.DictCursor + try: + klass = mysql_api.cursors.DictCursor + except: + klass = mysql_api.cursor.MySQLCursorDict if (renew or klass) and self._cursor: if not self._recursion: self._cursor.close() @@ -322,7 +333,7 @@ def getTableRows(self, table, partition = None): % (table, self.db_name)) if partition: q += " and partition_name like '%s'" % partition - return (fn.toList(self.Query(q)) or [0])[0] + return sum((t or [0])[0] for t in (fn.toList(self.Query(q)))) def check(self, method = None, tables = None, verbose = False): """ @@ -356,12 +367,12 @@ def checkTable(self, table, partition = None): q += " limit 1" method,args = self.Query,[q] else: - method,args = self.getTableCreator, [] + method,args = self.getTableCreator, [table] try: method(*args) return True except: - tracebac.print_exc() + traceback.print_exc() return False def getTableLength(self,table=''): @@ -382,6 +393,13 @@ def getTableSize(self,table=''): " table_schema = '%s' and table_name like '%s';" % (self.db_name,table)) return 0 if not res else (int(res[0][1]) if len(res)==1 else dict(res)) + + def getTableIndex(self,table): + q = self.Query('show indexes from '+table,asDict=True) + r = defaultdict(dict) + for l in q: + r[l['Key_name']][l['Column_name']] = l + return r def getPartitionSize(self,table='',partition=''): """ diff --git a/fandango/dicts.py b/fandango/dicts.py index a0f00f0b8..ddbd4ca32 100644 --- a/fandango/dicts.py +++ b/fandango/dicts.py @@ -140,12 +140,15 @@ def json2dict(jstr,encoding=ENC): return d class ThreadDict(dict): - ''' Thread safe dictionary with redefinable read/write methods and a backgroud thread for hardware update. + ''' + Thread safe dictionary with redefinable read/write methods and a background thread for hardware update. All methods are thread-safe using @self_lock decorator. NOTE: any method decorated in this way CANNOT call other decorated methods! All values of the dictionary will be automatically updated in a separate Thread using read_method provided. Any value overwritten in the dict should launch the write_method. + delay argument will pause the thread for a time after start() is called + Briefing: a[2] equals to a[2]=read_method(2) a[2]=1 equals to a[2]=write_method(2,1) @@ -169,11 +172,13 @@ class ThreadDict(dict): @deprecated now in tau.core.utils.containers ''' - def __init__(self,other=None,read_method=None,write_method=None,timewait=0.1,threaded=True,trace=False): + def __init__(self,other=None,read_method=None,write_method=None, + timewait=0.1,threaded=True,trace=False,delay=0.): self.read_method = read_method self.write_method = write_method self.timewait = timewait self.threaded = threaded + self.delay = delay self._threadkeys = [] self._periods = {} self._updates = {} @@ -199,7 +204,9 @@ def start(self): if hasattr(self,'_Thread') and self._Thread and self._Thread.isAlive(): print 'ThreadDict.start(): ThreadDict.stop() must be executed first!' return - print 'In ThreadDict.start(), keys are: %s' % self.threadkeys() + if self.delay: + self.event.wait(self.delay) + print 'In ThreadDict.start(), keys are: %s' % self.threadkeys() self.event.clear() self._Thread = threading.Thread(target=self.run) self._Thread.setDaemon(True) @@ -220,9 +227,9 @@ def stop(self): def alive(self): if not hasattr(self,'_Thread') or not self._Thread: - return False + return None #Thread never started else: - return self._Thread.isAlive() + return self._Thread.isAlive() #True or False def __del__(self): self.stop() diff --git a/fandango/dynamic.py b/fandango/dynamic.py index fcecaae34..9384fab15 100644 --- a/fandango/dynamic.py +++ b/fandango/dynamic.py @@ -423,7 +423,7 @@ def get_DynDS_properties(self,db=None): self.Lambdas = dict(l.split(':',1) for l in value if ':' in l) try: - self.Lambdas = dict((k,eval(v)) for k,v in self.Lambdas) + self.Lambdas = dict((k,eval(v)) for k,v in self.Lambdas.items()) except: traceback.print_exc() diff --git a/fandango/functional.py b/fandango/functional.py index 6cc335b54..480f72ace 100644 --- a/fandango/functional.py +++ b/fandango/functional.py @@ -151,24 +151,29 @@ def reldiff(x,y,floor=None): floor would be a decimal value, e.g. 0.05 """ d = x-y - if not d: return 0 + if not d: + return 0 ref = x or y d = float(d)/ref return d if not floor else (0,d)[abs(d)>=floor] #return 0 if x*(1-r)10: + MEMORY_VALUES.pop(0) + return MEMORY_VALUES[-1] except: print traceback.format_exc() return 0 +get_memory = get_process_memory + def get_cpu(pid): """ Uses ps to get the CPU usage of a process by PID ; it will trigger exception of PID doesn't exist """ return float(linos.shell_command('ps h -p %d -o pcpu'%pid)) diff --git a/fandango/objects.py b/fandango/objects.py index 0b4263d7f..9db3e5199 100644 --- a/fandango/objects.py +++ b/fandango/objects.py @@ -47,8 +47,18 @@ Enum classes are borrowed from taurus.core.utils (by Tiago Coutinho) """ -import __builtin__ -from __builtin__ import object +from __future__ import print_function +try: + from future import standard_library + standard_library.install_aliases() + from builtins import map + from builtins import str + from past.builtins import basestring + from builtins import object + import builtins + from builtins import object +except: + pass import traceback from fandango.functional import * @@ -60,11 +70,11 @@ #Python 2-3 conundrum try: - import queue - import queue as Queue -except: import Queue import Queue as queue +except: + import queue + import queue as Queue try: from collections import namedtuple #Only available since python 2.6 @@ -74,7 +84,7 @@ ## Inspection methods def dirModule(module): - return [a for a,v in module.__dict__.items() + return [a for a,v in list(module.__dict__.items()) if getattr(v,'__module__','') == module.__name__] def findModule(module): @@ -109,7 +119,7 @@ def loadModule(module,modulename=None): return mchild def dirClasses(module,owned=False): - v = [a for a,v in module.__dict__.items() if isinstance(v,type)] + v = [a for a,v in list(module.__dict__.items()) if isinstance(v,type)] if owned: return [a for a in dirModule(module) if a in v] else: return v @@ -154,13 +164,13 @@ def obj2dict(obj,type_check=True,class_check=False,fltr=None): try: if type(attr).__name__ not in dir(__builtin__): if isinstance(attr,dict): - attr = dict((k,v) for k,v in attr.items()) + attr = dict((k,v) for k,v in list(attr.items())) else: attr = str(attr) except: continue dct[name] = attr - except Exception,e: + except Exception as e: print(e) if class_check: @@ -172,7 +182,7 @@ def obj2dict(obj,type_check=True,class_check=False,fltr=None): if '__base__' not in dct: dct['__base__'] = klass.__base__.__name__ - except Exception,e: + except Exception as e: print(e) return(dct) @@ -229,14 +239,14 @@ def load(self,*args,**kwargs): dct = args[0] if len(args)==1 else (args or kwargs) if isSequence(dct) and not isDictionary(dct): dct = dict.fromkeys(dct) #isDictionary also matches items lists - [setattr(self,k,v) for k,v in (dct.items() + [setattr(self,k,v) for k,v in (list(dct.items()) if hasattr(dct,'items') else dct)] #Overriding dictionary methods def update(self,*args,**kwargs): return self.load(*args,**kwargs) - def keys(self): return self.__dict__.keys() - def values(self): return self.__dict__.values() - def items(self): return self.__dict__.items() + def keys(self): return list(self.__dict__.keys()) + def values(self): return list(self.__dict__.values()) + def items(self): return list(self.__dict__.items()) def dict(self): return self.__dict__ def get(self,k,default=None): @@ -247,7 +257,7 @@ def get(self,k,default=None): def get_key(self,value): """ Reverse lookup """ - for k,v in self.items(): + for k,v in list(self.items()): if v == value: return k raise Exception('%s_NotFound!'%value) @@ -255,7 +265,7 @@ def get_key(self,value): def set(self,k,v): return setattr(self,k,v) def setdefault(self,v): self.dict().setdefault(v) def pop(self,k): return self.__dict__.pop(k) - def has_key(self,k): return self.__dict__.has_key(k) + def has_key(self,k): return k in self.__dict__ def __getitem__(self,k): return getattr(self,k) def __setitem__(self,k,v): return setattr(self,k,v) def __contains__(self,k): return hasattr(self,k) @@ -268,7 +278,7 @@ def __call__(self,*args,**kwargs): else: self.load(*args,**kwargs) def __repr__(self): return 'fandango.Struct({\n'+'\n'.join("\t'%s': %s,"%(k,v) - for k,v in self.__dict__.items())+'\n\t})' + for k,v in list(self.__dict__.items()))+'\n\t})' def __str__(self): return self.__repr__().replace('\n','').replace('\t','') def to_str(self,order=None,sep=','): @@ -282,7 +292,7 @@ def default_cast(self,key=None,value=None): If it is, it will return value as an evaluable string. If it is not, then it will do same action on the passed value. """ - if key not in self.keys() and not value: + if key not in list(self.keys()) and not value: key,value = None,key #defaults to single argument mode value = notNone(value,key and self.get(key)) if not isString(value): @@ -302,8 +312,8 @@ def cast_items(self,items=[],update=True): """ The cast() method is used to convert an struct to a pickable/json obj """ - items = items or self.items() - items = [(k,self.cast(value=v)) for k,v in self.items()] + items = items or list(self.items()) + items = [(k,self.cast(value=v)) for k,v in list(self.items())] if update: [self.set(k,v) for k,v in items] return items @@ -327,7 +337,7 @@ class Variable(object): e.g. fandango.DEFAULT_TIME_FORMAT <=> functional.DEFAULT_TIME_FORMAT """ def __new__(cls, value): - print(cls,value) + print((cls,value)) __instance = object.__new__(cls, value) cls.__init__(__instance, value) return __instance.value @@ -386,8 +396,8 @@ def locked(f,*args,**kwargs): try: _lock.acquire() return f(*args,**kwargs) - except Exception,e: - print 'Exception in%s(*%s,**%s): %s' % (f.__name__,args,kwargs,e) + except Exception as e: + print('Exception in%s(*%s,**%s): %s' % (f.__name__,args,kwargs,e)) finally: _lock.release() @@ -587,7 +597,7 @@ def __init__(self,d=1,*args,**kwargs): nkw[arg], i = _args[i], i+1 self.call_all__init__(base,*_args,**_kw) self.call__init__(base,**nkw) - except Exception,e: + except Exception as e: print('Unable to execute %s.__init__!: %s' % (base.__name__,str(e))) return @@ -871,7 +881,7 @@ def getCachedObject(obj,methods=[],depth=10.,expire=3.,catched=False): """ klass = obj if isinstance(obj,type) else type(obj) if not methods: - methods = [k for k,f in klass.__dict__.items() if isCallable(f)] + methods = [k for k,f in list(klass.__dict__.items()) if isCallable(f)] for k in methods: try: m = Cached(getattr(klass,k),depth,expire,catched=catched) @@ -893,7 +903,7 @@ def prune(self,expire=None,depth=None): self.lock.acquire() depth = notNone(depth,self.depth) expire = time.time()-notNone(expire,self.expire) - cache = sorted(k for k in self.cache.keys() if k[0]>expire) + cache = sorted(k for k in list(self.cache.keys()) if k[0]>expire) if (len(cache)!=len(self.cache) or len(cache)>self.depth): #self._log('pruning: %s => %s'%(len(self.cache),len(cache))) pass @@ -933,7 +943,7 @@ def execute(self,*args,**kwargs): else: try: v = self.func(*args,**kwargs) - except Exception,e: + except Exception as e: v = e #self._log('%s(%s,%s) = %s'%(self.func,args,kwargs,v)) try: @@ -954,6 +964,26 @@ def execute(self,*args,**kwargs): raise v else: return v + +def Queued(my_queue): + """ + This decorator will put the result of the function + into a Queue-like object (using put() method) + + Intended to be used in processes and threads + + If use_id is True, then str(fun)+str(args) will be used + as ID for the result. + """ + def QueuedDecorator(method, use_id=False): + def QueuedFunction(*args,**kwargs): + r = method(*args,**kwargs) + if use_id: + r = (str(method)+str(args)+str(sorted(kwargs.items())), + r) + my_queue.put(r) + return QueuedFunction + return QueuedDecorator ########################################################################### @@ -1008,7 +1038,7 @@ def __init__(self): self._trace = False def __get__(self,obj,type=None):return self def __set__(self,obj,value):self._trace = value - def __nonzero__(self): return self._trace + def __bool__(self): return self._trace def __call__(self,msg): if self: print(msg) diff --git a/fandango/tango/defaults.py b/fandango/tango/defaults.py index 91abaaf11..ef46ed809 100644 --- a/fandango/tango/defaults.py +++ b/fandango/tango/defaults.py @@ -463,20 +463,52 @@ def __init__(self,device,attr_name,attr_value,err,errors): self.err=err self.errors=errors -class EventCallback(object): - def __init__(self,device,hook=None): +class TangoEventCallback(object): + """ + Template class for Tango event callbacks + """ + def __init__(self,device,attribute='',hook=None,trace=False): self.proxy = get_device(device) self.eid = None - self.hook = None + self.attribute = attribute + self.hook = hook + self.trace = trace + + def __del__(self): + self._trace('%s(%s,%s).__del__()'%(type(self),self.device,self.attribute)) + try: + self.unsubscribe() + except: + pass + + def _trace(self, msg): + msg = fn.time2str()+' : '+msg + if self.trace: + if fn.isCallable(self.trace): + self.trace(msg) + else: + print(msg) + def subscribe(self,attribute='State', event_type=PyTango.EventType.CHANGE_EVENT, filters=[],stateless=False): + if self.eid: + raise Exception('AlreadySubscribed!') + self.attribute = attribute + self._trace('%s(%s,%s).subscribe()'%(type(self),self.device,self.attribute)) self.eid = self.proxy.subscribe_event(attribute, event_type,self,filters,stateless) return self + + def unsubscribe(self): + self._trace('%s(%s,%s).unsubscribe()'%(type(self),self.device,self.attribute)) + self.proxy.unsubscribe(self.eid) + def push_event(self,*args,**kwargs): # Reimplement this method in subclasses try: + self._trace('%s(%s,%s).push_event(%s)' % ( + type(self),self.device,self.attribute,str(args[0]))) if self.hook is not None: return self.hook(self,*args,**kwargs) except: diff --git a/fandango/tango/methods.py b/fandango/tango/methods.py index 2378b3cf8..238f155c8 100644 --- a/fandango/tango/methods.py +++ b/fandango/tango/methods.py @@ -691,7 +691,7 @@ def hook(self,*args,**kwargs): if self.eid is not None: self.proxy.unsubscribe_event(eid) - cb = EventCallback(dp,hook).subscribe(attr,ev_type) + cb = TangoEventCallback(dp,hook).subscribe(attr,ev_type) period = dp.get_attribute_poll_period(attr) result[ev_type] = period or True except: diff --git a/fandango/threads.py b/fandango/threads.py index 3ca7ce6af..3cfb01985 100644 --- a/fandango/threads.py +++ b/fandango/threads.py @@ -56,7 +56,10 @@ def wait(seconds,event=True,hook=None): """ :param seconds: seconds to wait for - :param event: if True (default) it uses a dummy Event, if False it uses time.sleep, if Event is passed then it calls event.wait(seconds) + :param event: if True (default) it uses a dummy Event, + if False it uses time.sleep, + if Event is passed then it calls event.wait(seconds) + :param hook: a callable to be executed before the wait """ r = 0 try: @@ -814,17 +817,20 @@ def add(self,key,target=None,args=None,period=0,expire=0,callback=None): # Adds a command to be periodically executed data = self.data[key] = ProcessedData(key,target=target,args=args,period=period,expire=expire,callback=callback) self.send(self.__ADDKEY,target=data.get_args()) + def get(self,key,default=__NULL,_raise=False): # Returns a key value (or default if defined) if key not in self.data and default!=self.__NULL: return default result = self.data[key].get() if _raise and isinstance(result,Exception): raise result return result + def pop(self,key): # Returns a key value and removes from dictionary d = self.data.pop(key) self.send(self.__REMOVEKEY,key) return d + def pause(self,timeout): # Stops for a while the execution of scheduled keys self.paused = time.time()+timeout @@ -845,6 +851,7 @@ def send(self,key,target,args=None,callback=None): #self.trace('send(%s,%s,%s,%s) => %s'%(key,target,args,callback,self.callbacks[key])) self.callbacks[key].append(callback) return + def command(self,command,args=None): """ This method performs a synchronous command (no callback, no persistence), it doesn't return until it is resolved """ @@ -983,10 +990,12 @@ def _run_process(self,pipe,event,executor=None): #print traceback.format_exc() #print e pipe.send((key,getPickable(e))) + except Exception,e: self.trace('.Process:\tUnknown Error in process!\n%s'%traceback.format_exc()) key = None event.wait(self.timewait) + print '!'*80 self.trace('.Process: exit_process: event=%s, thread not alive for %d s' % (event.is_set(),time.time()-last_alive)) @@ -1148,54 +1157,90 @@ def _remove_task(self,item=None): ############################################################################### -def SubprocessMethod(obj,method,*args,**kwargs): +""" +SubprocessMethod and AsynchronousFunction provide an API to execute tasks +in background processes +""" + +def SubprocessMethod(obj,*args,**kwargs): """ - Method for executing reader.get_attribute_values in background with a timeout (30 s by default) - In fact, it allows to call any object method passed by name; or just pass a callable as object. - This method could be embedded in a thread with very high timeout to trigger a callback when data is received. - This advanced behavior is not implemented yet. + arguments (this will be extracted from kwargs): + object : object to extract method or callable + method : string or callable to get from object + timeout : seconds + callback : optional method to be called + + Method for executing reader.get_attribute_values in background + with a timeout (30 s by default) + + In fact, it allows to call any object method passed by name; + or just pass a callable as object. + + This method could be embedded in a thread with very high timeout + to trigger a callback when data is received. + + This advanced behavior can be implemented using AsynchronousFunction example: reader,att = PyTangoArchiving.Reader(),'just/some/nice/attribute' dates = '2014-06-23 00:00','2014-06-30 00:00' - values = fandango.threads.SubprocessMethod(reader,'get_attribute_values',att,*dates,timeout=10.) + values = fandango.threads.SubprocessMethod(reader,'get_attribute_values', + att,*dates,timeout=10.) or - def callback(v): print('>> received %d values'%len(v)) - fandango.threads.SubprocessMethod(reader,'get_attribute_values',att,*dates,timeout=10.,callback=callback) + def callback(v): + print('>> received %d values'%len(v)) + + fandango.threads.SubprocessMethod(reader, + 'get_attribute_values',att,*dates, + timeout=10.,callback=callback) >> received 414131 values """ + method = kwargs.pop('method',None) timeout = kwargs.pop('timeout',30.) callback = kwargs.pop('callback',None) - print args - print kwargs + + #Using pipe because it's faster than queue and more efficient local,remote = multiprocessing.Pipe(False) - def do_query(o,m,conn,*a,**k): - if None in (o,m): m = o or m - else: m = getattr(o,m) - print m,a,k - conn.send(m(*a,**k)) - conn.close() + + def do_it(o,m,conn,*a,**k): + try: + if None in (o,m): + m = (o or m) + elif isString(m): + m = getattr(o,m) + #print m,a,k + conn.send(m(*a,**k)) + #conn.close() + except Exception as e: + conn.send(e) + args = (obj,method,remote)+args - subproc = multiprocessing.Process(target=do_query,args=args,kwargs=kwargs) - subproc.start() + proc = multiprocessing.Process(target=do_it,args=args,kwargs=kwargs) + proc.daemon = True + proc.start() t0 = time.time() result = None + while time.time()t0+timeout: - raise Exception('TimeOut(%s)!'%str(obj)) - elif callback: + result = Exception('TimeOut(%s,%s)!'%(str(obj),timeout)) + if callback: callback(result) - else: - return result + elif isinstance(result,Exception): + raise result + + return result class AsynchronousFunction(threading.Thread): '''This class executes a given function in a separate thread diff --git a/setup.py b/setup.py index 5c4828c64..226dd8a11 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,5 @@ #!/usr/bin/env python + # Always prefer setuptools over distutils import os, imp, sys from setuptools import setup, find_packages @@ -23,9 +24,11 @@ ------------------------------------------------------------------------------- """ - -if 'help' in str(sys.argv): - print(__doc__) +try: + #python3 + from builtins import str +except: + pass release = open('fandango/VERSION').read() @@ -85,9 +88,10 @@ 'Topic :: Software Development :: Libraries', ], platforms=[ "Linux,Windows XP/Vista/7/8" ], - install_requires=[], + install_requires=['python-future'], scripts=scripts, entry_points=entry_points, include_package_data=True, - zip_safe=False + zip_safe=False, + python_requires='==2.7', )