Module ciocore.common¶
Functions¶
signal_handler¶
Source
def signal_handler(sig_number, stack_frame):
    """SIGINT handler: set the module-level SIGINT_EXIT flag to True.

    Args:
        sig_number: int. The received signal number (registered for
            signal.SIGINT by register_sigint_signal_handler).
        stack_frame: frame object. The interrupted stack frame (unused).
    """
    logger.debug("in signal_handler. setting common.SIGINT_EXIT to True")
    global SIGINT_EXIT
    SIGINT_EXIT = True
signal_handler(sig_number, stack_frame)
register_sigint_signal_handler¶
Source
def register_sigint_signal_handler(signal_handler=signal_handler):
    """Install the given handler for SIGINT.

    Args:
        signal_handler: callable(sig_number, stack_frame). Defaults to the
            module-level signal_handler, which sets SIGINT_EXIT to True.
    """
    logger.debug("REGISTERING SIGNAL HANDLER")
    signal.signal(signal.SIGINT, signal_handler)
register_sigint_signal_handler(signal_handler=<function signal_handler>)
on_windows¶
Source
def on_windows():
    """Return True if the current system is a Windows platform."""
    system_name = platform.system()
    return system_name == "Windows"
on_windows()
- Return True if the current system is a Windows platform
dec_timer_exit¶
Source
def dec_timer_exit(log_level=logging.INFO):
    """Decorator factory: log how long the decorated function took to run.

    Args:
        log_level: logging level (int) used when emitting the timing message.

    Returns:
        A decorator that wraps a function, times its execution, and logs
        "<name> :<seconds> seconds" at the given level on exit.
    """

    def timer_decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            name = getattr(func, "__name__", "<Unknown function>")
            started = time.time()
            outcome = func(*args, **kwargs)
            elapsed_msg = "%s :%.2f seconds" % (name, time.time() - started)
            logger.log(log_level, elapsed_msg)
            return outcome

        return wrapper

    return timer_decorator
dec_timer_exit(log_level=20)
dec_catch_exception¶
Source
def dec_catch_exception(raise_=False):
    """DECORATOR factory. Catch and log exceptions raised by the wrapped function.

    If the decorated function raises, the exception's traceback is logged.
    When raise_ is False the exception is suppressed and the wrapper returns
    None; when True the exception is re-raised after logging.

    Args:
        raise_: bool. Whether to re-raise the caught exception after logging.

    Returns:
        A decorator wrapping the target function.
    """

    def catch_decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            try:
                return func(*args, **kwds)
            # Fix: a bare `except:` also swallowed SystemExit and
            # KeyboardInterrupt; catch Exception so those still propagate.
            except Exception:
                func_name = getattr(func, "__name__", "<Unknown function>")
                stack_str = "".join(traceback.format_exception(*sys.exc_info()))
                msg = (
                    "\n#############################################\n"
                    'Failed to call "%s". Caught traceback stack:\n'
                    "%s\n"
                    "#############################################" % (func_name, stack_str)
                )
                logger.error(msg)
                if raise_:
                    raise

        return wrapper

    return catch_decorator
dec_catch_exception(raise_=False)
- DECORATOR Wraps the decorated function/method so that if the function raises an exception, the exception will be caught, its message will be printed, and optionally the function will return normally (suppressing the exception).
run¶
Source
def run(cmd):
    """Run the given shell command and wait for it to complete.

    Args:
        cmd: str. The shell command line to execute.

    Returns:
        (status, stdout, stderr): the process return code (int) and the
        captured stdout/stderr (bytes).

    NOTE: shell=True means cmd is interpreted by the shell — never pass
    untrusted input to this function.
    """
    # Lazy %-style logging: avoids the eager string concatenation, which did
    # work even when DEBUG was disabled and raised TypeError for non-str cmd.
    logger.debug("about to run command: %s", cmd)
    command = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = command.communicate()
    status = command.returncode
    return status, stdout, stderr
run(cmd)
get_md5¶
Source
def get_md5(file_path):
    """Compute the md5 hash of the file at file_path.

    Reads the file in MD5_BLOCKSIZE chunks so large files are not loaded
    into memory at once.

    Returns:
        The raw md5 digest (bytes).
    """
    digest = hashlib.md5()
    with open(file_path, "rb") as fh:
        chunk = fh.read(MD5_BLOCKSIZE)
        while chunk:
            digest.update(chunk)
            chunk = fh.read(MD5_BLOCKSIZE)
    return digest.digest()
get_md5(file_path)
get_base64_md5¶
Source
def get_base64_md5(*args, **kwargs):
    """Return the base64-encoded md5 (str) of the file at args[0].

    Returns None when args[0] is not an existing regular file.
    """
    filepath = args[0]
    if not os.path.isfile(filepath):
        return None
    digest = get_md5(*args)
    encoded = base64.b64encode(digest)
    return encoded.decode("utf-8")
get_base64_md5(*args, **kwargs)
generate_md5¶
Source
def generate_md5(
    filepath,
    poll_seconds=None,
    callback=None,
    log_level=logging.INFO,
):
    """Generate and return the md5 hash (base64 str) for the given filepath.

    Args:
        filepath: str. The file path to generate an md5 hash for.
        poll_seconds: int or None. Seconds to wait between progress log
            messages while hashing (useful for large files). None disables
            progress logging.
        callback: optional callable invoked after every hashed block with
            (filepath, file_size, bytes_processed, log_level=log_level).
        log_level: logging level used for all log messages.

    Returns:
        str. The base64-encoded md5 digest.

    Raises:
        IOError: if the first read of the file fails (logged before re-raise).
    """
    file_size = os.path.getsize(filepath)
    hash_obj = hashlib.md5()
    buffer_count = 1
    last_time = time.time()
    with open(filepath, "rb") as file_obj:
        # Only the first read is guarded: an immediate failure here usually
        # means a permissions problem, which gets a targeted log message.
        try:
            file_buffer = file_obj.read(MD5_BLOCKSIZE)
        except IOError:
            logger.log(
                log_level,
                "Cannot read file '%s'. Is it readable by the user running the" " uploader?",
                filepath,
            )
            raise
        while len(file_buffer) > 0:
            hash_obj.update(file_buffer)
            file_buffer = file_obj.read(MD5_BLOCKSIZE)
            curtime = time.time()
            # NOTE(review): buffer_count * MD5_BLOCKSIZE can exceed file_size
            # on the final block, so the reported percentage may exceed 100 —
            # confirm whether callers rely on exact values.
            bytes_processed = buffer_count * MD5_BLOCKSIZE
            percentage_processed = int((bytes_processed / float(file_size)) * 100)
            # Throttled progress logging: only log when poll_seconds have
            # elapsed since the last progress message.
            if poll_seconds and curtime - last_time >= poll_seconds:
                logger.log(
                    log_level,
                    "MD5 hashing %s%% (size %s bytes): %s ",
                    percentage_processed,
                    file_size,
                    filepath,
                )
                last_time = curtime
            # The callback fires for every block, regardless of poll_seconds.
            if callback:
                callback(filepath, file_size, bytes_processed, log_level=log_level)
            buffer_count += 1
    md5 = hash_obj.digest()
    logger.log(log_level, "MD5 hashing 100%% (size %s bytes): %s ", file_size, filepath)
    return str(base64.b64encode(md5).decode("utf-8"))
generate_md5(filepath, poll_seconds=None, callback=None, log_level=20)
-
Generate and return md5 hash (base64) for the given filepath
filepath: str. The file path to generate an md5 hash for.
base_64: bool. whether or not to return a base64 string.
poll_seconds: int, the number of seconds to wait between logging out to the console when md5 hashing (particularly a large file which may take a while) log_level: logging.level. The log level that should be used when logging messages.
callback: A callable that is called during the md5 hashing process. It's called every time a block of data has been hashed (see blocksize arg).The callable receives the following arguments:
filepath: see above file_size: the total size of the file (in bytes) bytes_processed: the amount of bytes that has currently been hashed log_level: see above
get_human_bytes¶
Source
def get_human_bytes(bytes_):
    """Convert a byte count (int) into a human-friendly size string.

    Picks the largest unit (TB, GB, MB) the value exceeds, falling back
    to KB, and formats with two decimal places, e.g. "1.50GB".
    """
    if bytes_ > BYTES_1TB:
        return "%.2fTB" % (bytes_ / float(BYTES_1TB))
    if bytes_ > BYTES_1GB:
        return "%.2fGB" % (bytes_ / float(BYTES_1GB))
    if bytes_ > BYTES_1MB:
        return "%.2fMB" % (bytes_ / float(BYTES_1MB))
    return "%.2fKB" % (bytes_ / float(BYTES_1KB))
get_human_bytes(bytes_)
- For the given bytes (integer), convert and return a "human friendly" representation of that data size.
get_progress_percentage¶
Source
def get_progress_percentage(current, total):
    """Return a string percentage, e.g. "80%", for current/total bytes.

    If either value is falsy (0 or None) the result is "0%", which also
    guards against division by zero.
    """
    if current and total:
        pct = int(current / float(total) * 100)
    else:
        pct = 0
    return "%s%%" % pct
get_progress_percentage(current, total)
- Return a string percentage, e.g. "80%" given current bytes (int) and total bytes (int)
get_human_duration¶
Source
def get_human_duration(seconds):
    """Convert the given seconds (float) into an "H:MM:SS" string."""
    delta = datetime.timedelta(seconds=round(seconds))
    return str(delta)
get_human_duration(seconds)
- convert the given seconds (float) into a human friendly unit
get_human_timestamp¶
Source
def get_human_timestamp(seconds_since_epoch):
    """Convert seconds since the epoch (float) to a local-time datetime string."""
    stamp = datetime.datetime.fromtimestamp(int(seconds_since_epoch))
    return str(stamp)
get_human_timestamp(seconds_since_epoch)
- convert the given seconds since epoch (float)
Classes¶
ExceptionAction¶
Source
def __init__(self, raise_=True, omitted_exceptions=(), disable_var=""):
    """Store configuration for the exception-action decorator.

    Args:
        raise_: bool. Whether to re-raise the exception after the action runs.
        omitted_exceptions: tuple of Exception classes to omit from taking
            action on (the exception is still raised).
        disable_var: str. Environment variable name which, if set in the
            runtime environment, disables the action from being taken.
    """
    self.omitted_exceptions = omitted_exceptions
    self.raise_ = raise_
    self.disable_var = disable_var
ExceptionAction(raise_=True, omitted_exceptions=(), disable_var='')
-
This is a base class to be used for constructing decorators that take a specific action when the decorated method/function raises an exception. For example, it can send a message or record data to a database before the exception is raised, and then (optionally) raise the exception. Optionally specify particular exceptions (classes) to be omitted from taking action on (though the exception is still raised).
disable_var: str. An environment variable name, which if found in the runtime environment will disable the action from being taken. This can be useful when a developer is actively developing and does not want the decorator to take action.
Descendants¶
- ciocore.common.ExceptionLogger
Methods¶
take_action¶
Source
def take_action(self, e): raise NotImplementedError
take_action(self, e)
-
Override this method to do something useful before raising the exception.
e.g.: print "sending error message to mom: %s" % traceback.format_exc()
ExceptionLogger¶
Source
def __init__(
    self,
    message="",
    log_traceback=True,
    log_level=logging.WARNING,
    raise_=False,
    omitted_exceptions=(),
    disable_var="",
):
    """Configure the exception-logging decorator.

    Args:
        message: str. A message prepended to the logged exception output.
        log_traceback: bool. Whether to include the formatted traceback.
        log_level: int. Level used when logging the message.
        raise_: bool. Whether to re-raise the exception after logging
            (False suppresses it).
        omitted_exceptions: tuple of Exception classes to skip logging for.
        disable_var: str. Environment variable name that disables the action.
    """
    self._message = message
    self._log_traceback = log_traceback
    self._log_level = log_level
    super(ExceptionLogger, self).__init__(
        raise_=raise_, omitted_exceptions=omitted_exceptions, disable_var=disable_var
    )
ExceptionLogger(message='', log_traceback=True, log_level=30, raise_=False, omitted_exceptions=(), disable_var='')
-
DECORATOR If the decorated function raises an exception, this decorator can log the exception and continue (suppressing the actual exception). A message may be prepended to the exception message.
example output:
>> broken_function() # Warning: ciocore.common : My prependend message Traceback (most recent call last): File "/usr/local/lschlosser/code/conductor_client/conductor/lib/common.py", line 85, in decorater_function return function(*args, **kwargs) File "<maya console>", line 4, in broken_function ZeroDivisionError: integer division or modulo by zero
message: str. The prepended message log_level: int. The log level to log the message as raise_: bool. Whether to raise (i.e. not supress) the exception after it's been logged.
Ancestors (in MRO)¶
- ciocore.common.ExceptionAction
Methods¶
take_action¶
Source
def take_action(self, error): msg = "" if self._message: msg += self._message if self._log_traceback: # check if msg is empty or not. Don't want to add a newline to empty line. if msg: msg += "\n" msg += traceback.format_exc() logger.log(self._log_level, msg)
take_action(self, error)
- Log out the message
DecRetry¶
Source
def __init__(self, retry_exceptions=Exception, skip_exceptions=(), tries=8, static_sleep=None):
    """Configure the retry decorator.

    Args:
        retry_exceptions: Exception class (or tuple of classes) to catch and
            retry on. Defaults to Exception (i.e. retry on everything).
        skip_exceptions: Exception class (or tuple) to NOT retry on; takes
            precedence over retry_exceptions.
        tries: int. Total number of attempts (not retries) before raising.
        static_sleep: seconds to sleep between attempts; None means use
            exponential backoff with jitter instead.
    """
    self.retry_exceptions = retry_exceptions
    self.skip_exceptions = skip_exceptions
    self.tries = tries
    self.static_sleep = static_sleep
DecRetry(retry_exceptions=builtins.Exception, skip_exceptions=(), tries=8, static_sleep=None)
-
Decorator that retries the decorated function using an exponential backoff sleep.
retry_exceptions: An Exception class (or a tuple of Exception classes) that this decorator will catch/retry. All other exceptions that occur will NOT be retried. By default, all exceptions are caught (due to the default argument of Exception)
skip_exceptions: An Exception class (or a tuple of Exception classes) that this decorator will NOT catch/retry. This will take precedence over the retry_exceptions.
tries: int. number of times to try (not retry) before raising static_sleep: The amount of seconds to sleep before retrying. When set to None, the sleep time will use exponential backoff. See below.
This retry function not only incorporates exponential backoff, but also "jitter". See http://www.awsarchitectureblog.com/2015/03/backoff.html. Instead of merely increasing the backoff time exponentially (deterministically), a randomness is added that will set the sleep time anywhere between 0 and the full exponential backoff time length.
Methods¶
sleep¶
Source
def sleep(self, seconds): time.sleep(seconds)
sleep(self, seconds)
: