"""Table base classes for defning new tables"""
import inspect
from abc import ABCMeta, abstractmethod
import hashlib
import dill as pickle
import pandas as pd
def post_process(table, post_processors):
    """Run *table* through every callable in *post_processors*, in order.

    Args:
        table: the table object to transform.
        post_processors: iterable of callables, each taking and
            returning a table.

    Returns:
        The table after all processors have been applied (the input
        unchanged when the iterable is empty).
    """
    result = table
    for apply_step in post_processors:
        result = apply_step(result)
    return result
def describe(cls, full=False):
    """Print a human-readable description of a table class.

    Uses the class docstring, and — when ``full`` is truthy and the
    class declares post processors — each processor's name and
    docstring as well.

    Args:
        cls: the table class to describe.
        full (bool): also list the post processors. Default: False.
    """
    heavy_rule = "=" * 80
    light_rule = "-" * 80
    lines = [heavy_rule, cls.__name__ + ':', cls.__doc__]
    if full and cls.post_processors(cls):
        lines += [light_rule, "Post processors:", light_rule]
        for fn in cls.post_processors(cls):
            lines += [">" + " " * 3 + fn.__name__ + ':',
                      " " * 4 + fn.__doc__,
                      '']
    lines += [heavy_rule, '']
    # Print line-by-line (not join) so a None docstring renders as "None"
    # exactly like print() would, instead of raising.
    for line in lines:
        print(line)
class BaseTableABC(metaclass=ABCMeta):
    """Abstract Base class for minimum table import"""

    def __init__(self, *args, **kwargs):
        # Stored verbatim; both feed into the cache hash (see get_hash).
        self.args = args
        self.kwargs = kwargs

    @abstractmethod
    def source(self):
        """Path to the original raw data"""
        pass
    # Default: a concrete source has no dependent tables unless it
    # declares its own ``dependencies`` attribute.
    source.dependencies = list()

    @abstractmethod
    def output(self):
        """Path to the processed table (output path)"""
        pass

    @abstractmethod
    def fetch(self, rebuild=False, cache=True):
        """Method for fetching data"""
        pass

    @abstractmethod
    def post_processors(self):
        """A list of functions to be applied for post processing"""
        return list()

    @classmethod
    def describe_processors(cls):
        """Yield a dict per post processor with its name, docstring and
        the callable itself."""
        # TODO: Add dependencies to this dictionary
        for processor in cls.post_processors(cls):
            yield {'name': processor.__name__,
                   'description': processor.__doc__,
                   'processor': processor}

    @classmethod
    def describe(cls, full=False):
        """Prints a description of the table based on the provided
        documentation and post processors.

        Args:
            full (bool): Include post processors in the printed description.
        """
        return describe(cls, full)

    @classmethod
    def dependencies(cls):
        """Returns a list of all dependent tables,
        in the order they are defined.

        Add new dependencies for source and every post processor like this::

            source.dependencies = [PersonalData]
            some_post_processor.dependencies = [SomeOtherTable, AnotherTable]

        `some_post_processor.dependencies` needs to be placed after
        `some_post_processor` is defined.

        Raises:
            AssertionError: if a processor's ``dependencies`` attribute
                exists but is not a list.
        """
        dependencies = []
        try:
            dependencies += cls.source.dependencies
        except AttributeError:
            pass
        for processor in cls.post_processors(cls):
            try:
                processor_deps = processor.dependencies
            except AttributeError:
                # Processor declares no dependencies — skip it.
                continue
            # Explicit raise instead of `assert` so the check survives
            # running under `python -O` (same exception type as before).
            if not isinstance(processor_deps, list):
                raise AssertionError(
                    "{}.dependencies must be a list".format(processor.__name__))
            dependencies += processor_deps
        return dependencies
    dep = dependencies
    """dep is an alias of dependencies"""

    # TODO: Make dependent columns method for use in tests.
    def get_settings_list(self):
        """The settings list used for building the cache id."""
        return [
            self.source,
            self.output,
            self.kwargs,
            self.post_processors,
        ]

    def get_hash(self):
        """Returns a hash based on the current table code, args and kwargs.
        Also changes based on dependent tables."""
        dependency_hashes = [dep.get_hash() for dep in self.dep()]
        sl = inspect.getsourcelines
        hash_sources = [sl(self.__class__), self.args,
                        self.kwargs, *dependency_hashes]
        hash_input = pickle.dumps(hash_sources)
        # md5 is fine here: the hash is a cache key, not security-sensitive.
        return hashlib.md5(hash_input).hexdigest()

    def get_cached_filename(self, filename, extention, settings_list=None):
        """Creates a filename with md5 cache string based on settings list

        Args:
            filename (str): the filename without extension
            extention (str): the file extension without dot. (i.e. 'pkl')
            settings_list (dict|list): the settings list as list (optional)

        NB! The dictionaries have to be sorted or hash id will change
        arbitrarily.
        """
        # NOTE(review): `settings_list` is accepted but never used — the
        # cache id comes entirely from get_hash(). Confirm before removing.
        cached_name = "_".join([filename, self.get_hash()])
        return ".".join([cached_name, extention])
class Table(BaseTableABC, metaclass=ABCMeta):
    """Base class for defining tables.

    Attention! The following methods are required when defining a
    class that inherits from Table

    Methods:
        source(self): Should return the table.
            For example pd.read_csv() **(required, method)**
        output(self): Should return the output path for
            where the finished table should be stored.
            For example a cache directory. **(required, method)**
        post_processors(self): a list of post processor
            functions or methods. **(required, method)**

    Example:
        Defining a table::

            class UserDataTable(Table):
                def source(self):
                    return pd.read_csv('/path/to/file')
                def output(self):
                    return "/path/to/output"
                def post_processors(self):
                    # List the callables themselves; post_process
                    # calls each one with the table.
                    return [
                        my_custom_function,
                        my_second_custom_function,
                    ]
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    @abstractmethod
    def source(self):
        """Path to the original raw data"""
        pass

    @abstractmethod
    def output(self):
        """Path to the processed table (output path)"""
        pass

    @abstractmethod
    def post_processors(self):
        """A list of functions to be applied for post processing"""
        return list()

    def to_cache(self, table):
        """Defines the default cache method. Can be overwritten if needed"""
        table.to_pickle(self.output())

    def read_cache(self):
        """Defines how to read table from cache.
        Should be overwritten if to_cache is overwritten"""
        return pd.read_pickle(self.output())

    def _process_table(self, cache=True):
        """Builds the table from source, applies the post processors and
        optionally writes the result to cache.

        Raises:
            AssertionError: if source() returns None.
        """
        table = self.source()
        # Explicit check instead of `assert` so it survives `python -O`
        # (same exception type and message as before).
        if table is None:
            raise AssertionError(
                "{}.source needs to return something, not None".format(
                    self.__class__.__name__))
        table = post_process(table, self.post_processors())
        if cache:
            self.to_cache(table)
        return table

    # TODO: Check upstream if a table needs to be rerun (will be fixed based on hash included in settings for dependent variables)
    def fetch(self, rebuild=False, cache=True):
        """Fetches the table and applies all post processors.

        Args:
            rebuild (bool): Rebuild the table and ignore cache. Default: False
            cache (bool): Cache the finished table for faster future loading.
                Default: True
        """
        if rebuild:
            return self._process_table(cache)
        try:
            return self.read_cache()
        except FileNotFoundError:
            # Cache miss — build from source (EAFP).
            return self._process_table(cache)