Triggers#
Execute a workflow triggered by a specific type of event
Classes:
| BaseTrigger | Base class to be subclassed by any custom defined trigger. |
| DirTrigger | Directory or File based trigger which watches for events in said file/dir and performs a trigger action whenever they happen. |
| TimeTrigger | Performs a trigger action every time_gap seconds. |
- class covalent.triggers.BaseTrigger(lattice_dispatch_id=None, dispatcher_addr=None, triggers_server_addr=None)[source]#
Bases:
object
Base class to be subclassed by any custom-defined trigger. It implements the methods needed to interact with dispatches, including retrieving their statuses and redispatching them whenever the trigger fires.
- Parameters
lattice_dispatch_id (Optional[str]) – Dispatch ID of the workflow which has to be redispatched in case this trigger gets triggered
dispatcher_addr (Optional[str]) – Address of dispatcher server used to retrieve info about or redispatch any dispatches
triggers_server_addr (Optional[str]) – Address of the Triggers server (if there is any) to register this trigger to; uses the dispatcher’s address by default
- self.lattice_dispatch_id#
Dispatch ID of the workflow which has to be redispatched in case this trigger gets triggered
- self.dispatcher_addr#
Address of dispatcher server used to retrieve info about or redispatch any dispatches
- self.triggers_server_addr#
Address of the Triggers server (if there is any) to register this trigger to; uses the dispatcher’s address by default
- self.new_dispatch_ids#
List of all the newly created dispatch ids from performing redispatch
- self.observe_blocks#
Boolean to indicate whether the self.observe method is a blocking call
- self.event_loop#
Event loop to be used if directly calling dispatcher’s functions instead of the REST APIs
- self.use_internal_funcs#
Boolean indicating whether to use dispatcher’s functions directly instead of through API calls
- self.stop_flag#
Flag used to stop the trigger in a thread-safe manner when self.observe() is a blocking call (e.g. see TimeTrigger)
Methods:
| observe() | Start observing for any change which can be used to trigger this trigger. |
| register() | Register this trigger to the Triggers server and start observing. |
| stop() | Stop observing for changes. |
| to_dict() | Return a dictionary representation of this trigger which can later be used to regenerate it. |
| trigger() | Trigger this trigger and perform a redispatch of the connected dispatch id’s workflow. |
- abstract observe()[source]#
Start observing for any change which can be used to trigger this trigger. To be implemented by the subclass.
- register()[source]#
Register this trigger to the Triggers server and start observing.
- Return type
None
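The subclassing contract described above can be pictured with a small standalone sketch. It does not import Covalent: `SketchBaseTrigger` and `ThresholdTrigger` are hypothetical stand-ins, and `trigger()` here only records a made-up ID instead of contacting a dispatcher server. The only parts taken from the documentation are the abstract `observe()` method and the `lattice_dispatch_id` and `new_dispatch_ids` attributes.

```python
from abc import ABC, abstractmethod


class SketchBaseTrigger(ABC):
    """Hypothetical stand-in for the base class; not Covalent's code."""

    def __init__(self, lattice_dispatch_id=None):
        self.lattice_dispatch_id = lattice_dispatch_id
        self.new_dispatch_ids = []  # filled in by trigger()

    @abstractmethod
    def observe(self):
        """Subclasses decide what to watch and when to call trigger()."""

    def trigger(self):
        # Placeholder for a redispatch: record a made-up ID rather than
        # talking to a dispatcher server.
        count = len(self.new_dispatch_ids) + 1
        self.new_dispatch_ids.append(f"{self.lattice_dispatch_id}-redispatch-{count}")

    def to_dict(self):
        # Enough information to rebuild an equivalent object later.
        return {
            "name": type(self).__name__,
            "lattice_dispatch_id": self.lattice_dispatch_id,
        }


class ThresholdTrigger(SketchBaseTrigger):
    """Hypothetical subclass: fires once for every reading above a limit."""

    def __init__(self, readings, limit, lattice_dispatch_id=None):
        super().__init__(lattice_dispatch_id)
        self.readings = readings
        self.limit = limit

    def observe(self):
        for value in self.readings:
            if value > self.limit:
                self.trigger()


demo = ThresholdTrigger(readings=[1, 7, 3, 9], limit=5, lattice_dispatch_id="abc")
demo.observe()
```

In this sketch the base class cannot be instantiated on its own, because `observe()` is abstract. A subclass only has to supply the watching logic and call `trigger()` at the right moment.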
- class covalent.triggers.DirTrigger(dir_path, event_names, batch_size=1, lattice_dispatch_id=None, dispatcher_addr=None, triggers_server_addr=None, recursive=False)[source]#
Bases:
covalent.triggers.base.BaseTrigger
Directory or File based trigger which watches for events in said file/dir and performs a trigger action whenever they happen.
- Parameters
dir_path – Path to the file/dir which is to be observed for events
event_names – List of event names on which to perform the trigger action. Possible options can be a subset of: [“created”, “deleted”, “modified”, “moved”, “closed”].
batch_size (int) – The number of changes to wait for before performing the trigger action, default is 1.
recursive (bool) – Whether to recursively watch the directory, default is False.
- self.dir_path#
Path to the file/dir which is to be observed for events
- self.event_names#
List of event names on which to perform the trigger action. Possible options can be a subset of: [“created”, “deleted”, “modified”, “moved”, “closed”]
- self.batch_size#
The number of events to wait for before performing the trigger action, default is 1.
- self.recursive#
Whether to recursively watch the directory, default is False.
- self.n_changes#
Number of events since last trigger action. Whenever self.n_changes == self.batch_size a trigger action happens.
Methods:
| attach_methods_to_handler() | Dynamically attaches and overrides the “on_*” methods to the handler depending on which ones are requested by the user. |
| observe() | Start observing the file/dir for any possible events among the ones mentioned in self.event_names. |
| stop() | Stop observing the file or directory for changes. |
- attach_methods_to_handler()[source]#
Dynamically attaches and overrides the “on_*” methods to the handler depending on which ones are requested by the user.
- Parameters
event_names – List of event names upon which to perform a trigger action
- Return type
None
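The interaction between `event_names`, `batch_size`, and `n_changes` can be illustrated with a standalone counter. This is a sketch of the documented behaviour only, not Covalent's handler code: `BatchCounter`, `on_event`, and `fired` are hypothetical names. Resetting the counter after each trigger action is an assumption drawn from the description of `n_changes` as the number of events since the last trigger action.

```python
class BatchCounter:
    """Hypothetical model of the batching rule; not Covalent's code."""

    def __init__(self, event_names, batch_size=1):
        self.event_names = set(event_names)
        self.batch_size = batch_size
        self.n_changes = 0  # events seen since the last trigger action
        self.fired = 0      # how many trigger actions have happened

    def on_event(self, name):
        """Record one event; return True if it causes a trigger action."""
        if name not in self.event_names:
            return False  # event types that were not requested are ignored
        self.n_changes += 1
        if self.n_changes == self.batch_size:
            self.n_changes = 0  # assumed reset after firing
            self.fired += 1
            return True
        return False


counter = BatchCounter(event_names=["created", "modified"], batch_size=2)
stream = ["created", "modified", "deleted", "modified", "modified", "created"]
results = [counter.on_event(name) for name in stream]
```

With `batch_size=2`, every second matching event causes a trigger action, and the unrequested "deleted" event does not count toward the batch.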
- class covalent.triggers.TimeTrigger(time_gap, lattice_dispatch_id=None, dispatcher_addr=None, triggers_server_addr=None)[source]#
Bases:
covalent.triggers.base.BaseTrigger
Performs a trigger action every time_gap seconds.
- Parameters
time_gap (int) – Number of seconds to wait before performing a trigger action
- self.time_gap#
Number of seconds to wait before performing a trigger action
- self.stop_flag#
Thread safe flag used to check whether the stop condition has been met
Methods:
| observe() | Keep performing the trigger action every self.time_gap seconds until stop condition has been met. |
| stop() | Stop the running self.observe() method by setting the self.stop_flag flag. |
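A blocking `observe()` loop that a thread-safe flag can stop may look roughly like the following standalone sketch. It uses only the standard library. `IntervalRunner` and its `action` callback are hypothetical, and using `threading.Event` for `stop_flag` is an assumption made for illustration, not a statement about Covalent's implementation.

```python
import threading
import time


class IntervalRunner:
    """Hypothetical model of a timed trigger loop; not Covalent's code."""

    def __init__(self, time_gap, action):
        self.time_gap = time_gap
        self.action = action
        self.stop_flag = threading.Event()

    def observe(self):
        # Event.wait() returns False when the timeout elapses and True once
        # the flag is set, so the loop ends promptly after stop() is called.
        while not self.stop_flag.wait(self.time_gap):
            self.action()

    def stop(self):
        self.stop_flag.set()


calls = []
runner = IntervalRunner(time_gap=0.05, action=lambda: calls.append(time.monotonic()))

# observe() blocks, so it runs in a worker thread while the main thread
# decides when to stop it.
worker = threading.Thread(target=runner.observe)
worker.start()
time.sleep(0.3)
runner.stop()
worker.join(timeout=2)
```

Waiting on the flag rather than calling `time.sleep()` means `stop()` does not have to wait out a full `time_gap` before the loop notices it.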