from datetime import datetime
import os
import uuid
from types import FunctionType
from typing import List, Optional
from sqlalchemy_utils.functions import create_database, database_exists
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
from easyfilewatcher.utils.config import DEFAULT_ENGINE
from easyfilewatcher.adapters.ORM import init_tables
from easyfilewatcher.domain.EasyFileWatcherUnit import EasyFileWatcherUnit
from easyfilewatcher.UoW import EasyFileWatcherUoW
[docs]class EasyFileWatcher:
def __init__(self, jobstore: Optional[str] = None) -> None:
"""This method initializes the EasyFileWatcher.
:param Optional[str] jobstore: url to database storing jobs; default local sqlite file
:returns: None
:rtype: None
"""
self.__int_filewatcher_sheduler(jobstore=jobstore)
self.__init_database()
self.__init_database_tables()
def __int_filewatcher_sheduler(self, jobstore: str = None) -> None:
"""This method initializes and configures the Scheduler for the EasyFileWatcher.
:returns: None
:rtype: None
"""
global filewatcher_sheduler
jobstores = {
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite' if jobstore is None else jobstore)
}
executors = {
'default': {'type': 'threadpool', 'max_workers': 20},
'processpool': ProcessPoolExecutor(max_workers=20)
}
job_defaults = {
'coalesce': False,
'max_instances': 10000
}
filewatcher_sheduler = BackgroundScheduler()
filewatcher_sheduler.configure(jobstores=jobstores,
executors=executors, job_defaults=job_defaults)
filewatcher_sheduler.start()
def __init_database(self) -> None:
"""This method initializes the database for the EasyFileWatcher if it doesn't exist yet.
:returns: None
:rtype: None
"""
try:
if not database_exists(DEFAULT_ENGINE.url):
create_database(DEFAULT_ENGINE.url)
except Exception as e:
print(e)
print("database exists")
def __init_database_tables(self) -> None:
"""This method initializes the database tables for the EasyFileWatcher if they don't exist yet.
:returns: None
:rtype: None
"""
try:
init_tables()
except Exception as e:
print(e)
print("database tables already exist")
[docs] def delete_easy_file_watcher(self, directory_watcher_id) -> bool:
"""This method deletes an EasyFileWatcher. It returns True at success and False at failure.
:returns: None
:rtype: None
"""
try:
filewatcher_sheduler.remove_job(
job_id=directory_watcher_id)
except Exception as e:
filewatcher_sheduler.print_jobs()
print(e)
try:
with EasyFileWatcherUoW() as uow:
file_watcher_units_in_db = uow.easy_file_watcher_repository.get_all_by_id(
directory_watcher_id=directory_watcher_id)
[uow.easy_file_watcher_repository.delete(
easy_file_watcher) for easy_file_watcher in file_watcher_units_in_db]
uow.commit()
except Exception as e:
filewatcher_sheduler.print_jobs()
print(e)
return False
return True
[docs] def get_directory_watcher_jobs(self) -> list:
"""This method return a list of current EasyFileWatcher Jobs.
Good way to retrieve DirectoryWatcherID if not initially provided.
:returns: list of EasyFileWatcher Jobs
:rtype: list
"""
return filewatcher_sheduler.get_jobs()
@staticmethod
def __get_all_easy_file_watcher_units(directory_path: str, directory_watcher_id: str) -> List[EasyFileWatcherUnit]:
"""This method returns all content and its metadata from a specified directory path. Temporary contents/files are disregarded.
:param str directory_path: path of directory to watch
:param str directory_watcher_id: assigned ID of watcher
:returns: List of EasyFileWatcherUnits
:rtype: List[EasyFileWatcherUnit]
"""
file_contents = []
for (dirpath, dirnames, filenames) in os.walk(directory_path):
[file_contents.append(EasyFileWatcherUnit(**{
"directory_watcher_id": directory_watcher_id,
"filepath": os.path.join(dirpath, file),
"size": os.path.getsize(os.path.join(dirpath, file)),
"last_modification": os.path.getmtime(os.path.join(dirpath, file))})) for file in filenames if not "~$" in file]
return file_contents
@staticmethod
def __get_all_current_filewatcher_units_by_directory_watcher_id(directory_watcher_id) -> List[EasyFileWatcherUnit]:
"""This method returns all content and its metadata from database associated to a certain EasyFileWatcher.
:param str directory_watcher_id: assigned ID of watcher
:returns: List of EasyFileWatcherUnits
:rtype: List[EasyFileWatcherUnit]
"""
with EasyFileWatcherUoW() as uow:
current_file_watcher_units = uow.easy_file_watcher_repository.get_all_by_id(
directory_watcher_id=directory_watcher_id)
uow.commit()
return current_file_watcher_units
[docs] def add_directory_to_watch(self, directory_path: str, callback: FunctionType,
start_date: Optional[datetime] = datetime.now(), end_date: Optional[datetime] = None,
callback_param: Optional[dict] = None, directory_watcher_id: Optional[str] = None,
event_on_deletion: Optional[bool] = True, polling_time: Optional[int] = 2) -> None:
"""This method registers the directory of interest to watch.
It further requires the function signature.
Parameters to the function can be passed as a dictionary.
Passing a directory_watcher_id is highly recommended as it can be used to delete a certain FileWatcher.
:param str directory_path: path of directory to watch
:param func callback: custom user to function to be executed at change in directory
:param Optional[dict] callback_param: parameters to be passed to callback function
:param Optional[str] directory_watcher_id: assigned ID of watcher
:param Optional[bool] event_on_deletion: shall an event be triggered at deletion of File
:param Optional[datetime] start_date: When shall File Watcher starts tracking
:param Optional[datetime] end_date: When shall File Watcher end tracking
:param Optional[int] polling_time: Interval Watcher checks on specified directory
:returns: List of EasyFileWatcherUnits
:rtype: List[EasyFileWatcherUnit]
"""
if directory_watcher_id is None:
directory_watcher_id = "directory_watcher_" + uuid.uuid4().hex
easy_file_watcher_units = EasyFileWatcher.__get_all_easy_file_watcher_units(
directory_path=directory_path, directory_watcher_id=directory_watcher_id)
with EasyFileWatcherUoW() as uow:
uow.easy_file_watcher_repository.add_all(
easy_file_watcher_units=easy_file_watcher_units)
uow.commit()
filewatcher_sheduler.add_job(EasyFileWatcher.execute_job, 'interval', [directory_watcher_id, directory_path, callback, callback_param, event_on_deletion],
seconds=polling_time, replace_existing=True, id=directory_watcher_id, start_date=start_date, end_date=end_date)
[docs] @staticmethod
def execute_job(*args):
"""This method executes the call back of the directory of interest to watch."""
file_watcher_units_in_db = EasyFileWatcher.__get_all_current_filewatcher_units_by_directory_watcher_id(
args[0])
file_watcher_units = EasyFileWatcher.__get_all_easy_file_watcher_units(
args[1], args[0])
confirmed_change = EasyFileWatcher.__detect_change(
old_file_watcher_units=file_watcher_units_in_db, new_file_watcher_units=file_watcher_units, event_on_deletion=args[4])
if confirmed_change:
with EasyFileWatcherUoW() as uow:
[uow.easy_file_watcher_repository.delete(
easy_file_watcher) for easy_file_watcher in file_watcher_units_in_db]
uow.commit()
uow.easy_file_watcher_repository.add_all(file_watcher_units)
uow.commit()
args[2](**args[3])
@staticmethod
def __detect_change(old_file_watcher_units: List[EasyFileWatcherUnit], new_file_watcher_units: List[EasyFileWatcherUnit], event_on_deletion: bool) -> bool:
"""This method detects changes in the directory of interest to watch."""
try:
if not event_on_deletion:
if old_file_watcher_units > new_file_watcher_units and sorted(new_file_watcher_units) == sorted(list(set(old_file_watcher_units).intersection(new_file_watcher_units))):
return False
if len(new_file_watcher_units) != len(old_file_watcher_units) or sorted(old_file_watcher_units) != sorted(new_file_watcher_units):
return True
return False
except Exception as e:
print(e)
[docs] def resume_file_watching(self, directory_watcher_id: str) -> bool:
"""This method resumes a FileWatcher if it was paused before.
:param str directory_watcher_id: assigned ID of watcher
:returns: success
:rtype: bool
"""
try:
filewatcher_sheduler.resume_job(job_id=directory_watcher_id)
return True
except Exception as e:
print(e)
return False
[docs] def pause_file_watching(self, directory_watcher_id: str) -> bool:
"""This method pauses a FileWatcher.
:param str directory_watcher_id: assigned ID of watcher
:returns: success
:rtype: bool
"""
try:
filewatcher_sheduler.pause_job(job_id=directory_watcher_id)
return True
except Exception as e:
print(e)
return False