Module ciocore.downloader¶
Command Line Process to run downloads.
Functions¶
random_exeption¶
Source
def random_exeption(percentage_chance):
    """Raise a generic Exception with probability *percentage_chance* (0.0-1.0).

    Used to simulate failures when testing downloader retry/recovery paths.
    """
    roll = random.random()
    if roll >= percentage_chance:
        logger.debug("Skipped random exception (%s chance)", percentage_chance)
        return
    raise Exception("Random exception raised (%s percent chance)" % percentage_chance)
random_exeption(percentage_chance)
dec_random_exception¶
Source
def dec_random_exception(percentage_chance):
    """DECORATOR factory: wrap a function so each call first has a
    *percentage_chance* probability of raising. Used for simulating errors to
    test downloader recovery behavior/robustness.
    """
    def _decorate(func):
        @functools.wraps(func)
        def _wrapped(*args, **kwargs):
            # Possibly blow up before delegating to the real function.
            random_exeption(percentage_chance)
            return func(*args, **kwargs)
        return _wrapped
    return _decorate
dec_random_exception(percentage_chance)
- DECORATOR for creating random exceptions for the wrapped function. This is used for simulating errors to test downloader recovery behavior/robustness
delete_file¶
Source
@common.DecRetry(tries=3, static_sleep=1)
def delete_file(filepath):
    """Remove *filepath* from disk, retrying up to 3 times (1s between tries)."""
    # os.unlink is the POSIX-named alias of os.remove — identical behavior.
    os.unlink(filepath)
delete_file(filepath)
chmod¶
Source
# Retried to tolerate transient filesystem errors (e.g. network-mount hiccups).
@common.DecRetry(tries=3, static_sleep=1)
def chmod(filepath, mode):
    """Change the permission bits of *filepath* to *mode* (e.g. 0o777)."""
    os.chmod(filepath, mode)
chmod(filepath, mode)
download_file¶
Source
@common.DecRetry(retry_exceptions=CONNECTION_EXCEPTIONS)
def download_file(download_url, filepath, poll_rate=2, state=None):
    """Stream download_url to filepath, returning the base64-encoded md5 of the
    downloaded bytes.

    Args:
        download_url: URL to fetch (streamed in CHUNK_SIZE chunks).
        filepath: local destination path (opened "wb"; overwritten).
        poll_rate: minimum seconds between progress log lines.
        state: optional mutable object with a .bytes_downloaded attribute that is
            incremented as chunks arrive, so other threads can report progress.

    Raises:
        requests.HTTPError: if the server responds with an error status.
    """
    logger.info("Downloading: %s", filepath)
    response = requests.get(download_url, stream=True)
    logger.debug("download_url: %s", download_url)
    # FIX: check the HTTP status *before* writing anything. The original called
    # raise_for_status() after the loop, so an error body (e.g. an HTML 404
    # page) was first written over the destination file.
    response.raise_for_status()
    hash_obj = hashlib.md5()
    progress_bytes = 0
    # FIX: content-length is constant for the response; parse it once instead
    # of once per chunk as the original did.
    total_bytes = float(response.headers.get("content-length", 0))
    last_poll = time.time()
    with open(filepath, "wb") as file_pointer:
        for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
            # iter_content can yield empty keep-alive chunks; skip them.
            if not chunk:
                continue
            progress_bytes += len(chunk)
            file_pointer.write(chunk)
            if state is not None:  # FIX: identity comparison, not != None
                state.bytes_downloaded += len(chunk)
            now = time.time()
            if now >= last_poll + poll_rate:
                progress_percentage = common.get_progress_percentage(
                    progress_bytes, total_bytes
                )
                logger.debug("Downloading %s %s", progress_percentage, filepath)
                last_poll = now
            hash_obj.update(chunk)
    logger.debug("Downloading 100%% %s", filepath)
    new_md5 = hash_obj.digest()
    logger.debug("Md5 Checking: %s", filepath)
    return base64.b64encode(new_md5).decode("utf-8")
download_file(download_url, filepath, poll_rate=2, state=None)
- state: class with .bytes_downloaded property. Reflects the amount of bytes that have currently been downloaded. This can be used by other threads to report "progress". Note that this must be a mutable object (hence a class), so that this function, as well as other threads will read/write to the same object.
safe_mkdirs¶
Source
def safe_mkdirs(dirpath):
    """Create the given directory (and parents), suppressing the error if it
    already exists.

    Useful when handling concurrency issues where it's not possible to reliably
    check whether a directory exists before creating it.

    Raises:
        OSError: if the path exists but is not a directory, or creation fails
            for any reason other than the directory already existing.
    """
    # exist_ok=True (Python 3.3+) replaces the old try/except-OSError idiom and
    # is race-free under concurrent creation. A pre-existing non-directory
    # still raises FileExistsError (an OSError subclass), as before.
    os.makedirs(dirpath, exist_ok=True)
safe_mkdirs(dirpath)
- Create the given directory. If it already exists, suppress the exception. This function is useful when handling concurrency issues where it's not possible to reliably check whether a directory exists before creating it.
run_downloader¶
Source
def run_downloader(args):
    """Start the downloader from parsed CLI args.

    If job id(s) were given, download those jobs and exit upon completion.
    Otherwise run indefinitely (daemon mode), polling the Conductor cloud app
    for files that need to be downloaded.
    """
    # Configure logging first so everything that follows is captured.
    log_level = loggeria.LEVEL_MAP.get(args.get("log_level"))
    set_logging(log_level, args.get("log_dir"))
    api_client.ApiClient.register_client(
        client_name=Downloader.CLIENT_NAME,
        client_version=ciocore.__version__,
    )
    logger.debug("Downloader args: %s", args)
    job_ids = args.get("job_id")
    thread_count = args.get("thread_count")
    if not job_ids:
        # Daemon mode: poll forever.
        Downloader.start_daemon(
            thread_count=thread_count,
            location=args.get("location"),
            output_dir=args.get("output"),
        )
        return
    # Explicit job mode: download the requested jobs and terminate.
    Downloader.download_jobs(
        job_ids,
        task_id=args.get("task_id"),
        thread_count=thread_count,
        output_dir=args.get("output"),
    )
run_downloader(args)
- Start the downloader. If a job id(s) were given, exit the downloader upon completion. Otherwise, run the downloader indefinitely (daemon mode), polling the Conductor cloud app for files that need to be downloaded.
set_logging¶
Source
def set_logging(level=None, log_dirpath=None):
    """Configure conductor logging at *level*; if *log_dirpath* is given, also
    log to a 'conductor_dl_log' file in that directory."""
    log_filepath = (
        os.path.join(log_dirpath, "conductor_dl_log") if log_dirpath else None
    )
    loggeria.setup_conductor_logging(
        logger_level=level,
        console_formatter=LOG_FORMATTER,
        file_formatter=LOG_FORMATTER,
        log_filepath=log_filepath,
    )
set_logging(level=None, log_dirpath=None)
report_error¶
Source
def report_error(self, download_id, error_message):
    """Report a failed download to the app (best-effort).

    POSTs *error_message* to /downloads/<download_id>/fail. Any failure while
    reporting is logged and swallowed so the caller's own error handling is
    never interrupted.

    Returns:
        bool: always True.
    """
    try:
        logger.error("failing upload due to: \n%s" % error_message)
        # report error_message to the app
        self.api_helper.make_request(
            "/downloads/%s/fail" % download_id, data=error_message, verb="POST", use_api_key=True
        )
    # BUG FIX: the original read `except e:` which raises NameError (`e` is
    # undefined) instead of suppressing the failure. Also log instead of a
    # silent `pass` so reporting failures are visible.
    except Exception:
        logger.exception("Failed to report error for download %s", download_id)
    return True
report_error(self, download_id, error_message)
Classes¶
ThreadState¶
ThreadState()
-
Use a class as mutable datatype so that it can be used across threads
Descendants¶
- ciocore.downloader.FileDownloadState
- ciocore.downloader.TaskDownloadState
FileDownloadState¶
FileDownloadState()
-
Use a class as mutable datatype so that it can be used across threads
Ancestors (in MRO)¶
- ciocore.downloader.ThreadState
Class variables¶
STATE_QUEUED¶
STATE_QUEUED
:STATE_PREPARING_DIRECTORY¶
STATE_PREPARING_DIRECTORY
:STATE_HASHING_EXISTING_FILE¶
STATE_HASHING_EXISTING_FILE
:STATE_DOWNLOADING¶
STATE_DOWNLOADING
:STATE_COMPLETE¶
STATE_COMPLETE
:bytes_downloaded¶
bytes_downloaded
:hash_progress¶
hash_progress
:status¶
status
:use_existing¶
use_existing
:filepath¶
filepath
:time_started¶
time_started
:time_completed¶
time_completed
:file_info¶
file_info
:thread_name¶
thread_name
:download_id¶
download_id
:Methods¶
get_duration¶
Source
def get_duration(self): if self.time_started: if self.time_completed: return self.time_completed - self.time_started else: return time.time() - self.time_started return 0
get_duration(self)
- Return the amount of time in seconds
HistoryTableStr¶
Source
def __init__(self, data, column_names, title="", footer="", upper_headers=True):
    """Table formatter for download-history rows.

    args:
        data: list of dicts; each dict is a row keyed by column name.
        column_names: list of str; columns to show, in display order.
        title/footer: optional str printed above/below the table.
        upper_headers: bool; uppercase the column header names.
    """
    # Per-column formatting callables applied to each cell before display.
    self.cell_modifiers = {
        "Completed at": functools.partial(self.human_time),
        "Size": functools.partial(self.human_bytes, rjust=8),
        "Duration": functools.partial(common.get_human_duration),
    }
    super(HistoryTableStr, self).__init__(
        data=data,
        column_names=column_names,
        title=title,
        footer=footer,
        upper_headers=upper_headers,
    )
HistoryTableStr(data, column_names, title='', footer='', upper_headers=True)
-
A class to help log/print tables of data
######## DOWNLOAD HISTORY¶
COMPLETED AT DOWNLOAD ID JOB TASK SIZE ACTION DURATION THREAD FILEPATH 2016-01-16 01:12:46 5228833175240704 00208 010 137.51MB DL 0:00:57 Thread-12 /tmp/conductor_daemon_dl/04/cental/cental.010.exr 2016-01-16 01:12:42 6032237141164032 00208 004 145.48MB DL 0:02:24 Thread-2 /tmp/conductor_daemon_dl/04/cental/cental.004.exr 2016-01-16 01:12:40 5273802288136192 00208 012 140.86MB DL 0:02:02 Thread-16 /tmp/conductor_daemon_dl/04/cental/cental.012.exr
args: data: list of dicts. Each dict represents a row, where the key is the column name, and the value is the...value
column_names: list of str. The columns of data to show (and the order in which they are shown)
title: str. if provided, will be printed above the table
footer: str. if provided, will be printed below the table
upper_headers: bool. If True, will automatically uppercase the column header names
Ancestors (in MRO)¶
- ciocore.loggeria.TableStr
Methods¶
human_time¶
Source
def human_time(self, timestamp, ljust=0, rjust=0): return common.get_human_timestamp(timestamp).ljust(ljust).rjust(rjust)
human_time(self, timestamp, ljust=0, rjust=0)
:human_bytes¶
Source
def human_bytes(self, bytes_, ljust=0, rjust=0): return common.get_human_bytes(bytes_).ljust(ljust).rjust(rjust)
human_bytes(self, bytes_, ljust=0, rjust=0)
:
TaskDownloadState¶
Source
def __init__(self):
    # All state lives in reset() so the object can be wiped and reused
    # across tasks by the owning download thread.
    self.reset()
TaskDownloadState()
-
Use a class as mutable datatype so that it can be used across threads
Ancestors (in MRO)¶
- ciocore.downloader.ThreadState
Class variables¶
STATE_QUEUED¶
STATE_QUEUED
:STATE_PREPARING_DIRECTORY¶
STATE_PREPARING_DIRECTORY
:STATE_HASHING_EXISTING_FILE¶
STATE_HASHING_EXISTING_FILE
:STATE_DOWNLOADING¶
STATE_DOWNLOADING
:STATE_COMPLETE¶
STATE_COMPLETE
:STATE_ERROR¶
STATE_ERROR
:ENTITY_STATES¶
ENTITY_STATES
:Methods¶
get_bytes_downloaded¶
Source
def get_bytes_downloaded(self): bytes_list = [file_state.bytes_downloaded for file_state in self.file_download_states] return sum(bytes_list or [0])
get_bytes_downloaded(self)
:reset_bytes_downloaded¶
Source
def reset_bytes_downloaded(self): for file_state in self.file_download_states: file_state.bytes_downloaded = 0
reset_bytes_downloaded(self)
:get_duration¶
Source
def get_duration(self): if self.time_started: if self.time_completed: return self.time_completed - self.time_started else: return time.time() - self.time_started return 0
get_duration(self)
- Return the amount of time in seconds
get_entity_status¶
Source
def get_entity_status(self): return self.ENTITY_STATES[self.status]
get_entity_status(self)
- Return corresponding entity status for the TaskDownloadState's current status
initialize¶
Source
def initialize(self, task_download): self.reset() self.task_download = task_download
initialize(self, task_download)
- Reset/initialize the properties
reset¶
Source
def reset(self): self.status = self.STATE_QUEUED self.task_download = {} self.file_download_states = {} self.time_started = None self.time_completed = None
reset(self)
:
Downloader¶
Source
def __init__(self, thread_count=None, location=None, output_dir=None):
    """Create a downloader instance.

    thread_count: number of download threads (falls back to config value).
    location: optional location tag used when polling for Downloads.
    output_dir: optional override for every Download's output directory.
    """
    cfg = config.config().config
    # Turn on the SIGINT handler. This will catch
    common.register_sigint_signal_handler()
    self.api_client = api_client.ApiClient()
    self.thread_count = int(thread_count or cfg["thread_count"])
    self.location = location
    self.output_dir = output_dir
    logger.debug("thread_count: %s", self.thread_count)
    logger.debug("location: %s", self.location)
Downloader(thread_count=None, location=None, output_dir=None)
-
A Downloader daemon which downloads completed frames from finished tasks.
Each task has an associated Download entity that represents all images/files that were produced from that task. A task may have more than file to download.
-
Query the app for the "next" download to download. The app will return the Download that it deems most appropriate (probably of the lowest jobid). Note the app will automatically change the status of that Download entity to "downloading". This is probably a terrible model (but it's what I'm inheriting). Note that there is cron job that resets Download entities that are in state of Downloading but have not "dialed home" after x amount of time.
-
Place this Download in the queue to be downloaded.
-
Spawn threads to actively take "work" (Downloads) from the queue.
-
Each thread is responsible for downloading the Download that it took from the queue.
-
Each thread is responsible for updating the app of it's downloading status: "downloading", etc
-
Each thread is responsible for catching their own exceptions and placing the Download back into the queue
-
Because the downloader is threaded, it makes communication/tracking of data more difficult. In order to facilitate this, ThreadState objects are shared across threads to read/write data to. This allows download progress to be communicated from multiple threads into a single object, which can then be read from a single process and "summarized".
A TaskDownloadState .files= [FileDownloadState, FileDownloadState]
To complicate matters further, if the downloader is killed (via keyboard, etc), then it should clean up after itself. So we need to catch the SIGINT EXIT signal at any point in the code and handle it gracefully.
Class variables¶
CLIENT_NAME¶
CLIENT_NAME
:naptime¶
naptime
:endpoint_downloads_next¶
endpoint_downloads_next
:endpoint_downloads_job¶
endpoint_downloads_job
:endpoint_downloads_status¶
endpoint_downloads_status
:download_progess_polling¶
download_progess_polling
:md5_progess_polling¶
md5_progess_polling
:history_queue_max¶
history_queue_max
:start_time¶
start_time
:Static methods¶
start_daemon¶
Source
@classmethod def start_daemon(cls, thread_count=None, location=None, output_dir=None, summary_interval=10): downloader = cls(thread_count=thread_count, location=location, output_dir=output_dir) thread_states = downloader.start(summary_interval=summary_interval) while not common.SIGINT_EXIT: # Give the other threads a chance to do their work time.sleep(1) downloader._print_download_history() downloader.print_uptime()
start_daemon(thread_count=None, location=None, output_dir=None, summary_interval=10)
- Run the downloader as a daemon
download_jobs¶
Source
@classmethod def download_jobs(cls, job_ids, task_id=None, thread_count=None, output_dir=None): downloader = cls(thread_count=thread_count, output_dir=output_dir) thread_states = downloader.start(job_ids, task_id=task_id) while not common.SIGINT_EXIT and ( not downloader.pending_queue.empty() or not downloader.downloading_queue.empty() ): sleep_time = 2 time.sleep(sleep_time) downloader._print_download_history() downloader.print_uptime()
download_jobs(job_ids, task_id=None, thread_count=None, output_dir=None)
- Run the downloader for explicit jobs, and terminate afterwards.
Methods¶
start¶
Source
def start(self, job_ids=None, task_id=None, summary_interval=10): # Create new queues self.start_time = time.time() self.pending_queue = queue.Queue() self.downloading_queue = queue.Queue() self.history_queue = queue.Queue() # If a job id has been specified then only load the queue up with that work if job_ids: self.history_queue_max = None self.get_jobs_downloads(job_ids, task_id) # otherwise create a queue thread the polls the app for wor else: self.start_queue_thread() task_download_states = self.start_download_threads( self.downloading_queue, self.pending_queue ) thread_states = {"task_downloads": task_download_states} self.start_summary_thread(thread_states, interval=summary_interval) # Record all of the original threads immediately so that we can monitor their state change self._original_threads = threading.enumerate() return thread_states
start(self, job_ids=None, task_id=None, summary_interval=10)
:print_uptime¶
Source
def print_uptime(self): seconds = time.time() - self.start_time human_duration = common.get_human_duration(seconds) logger.info("Uptime: %s", human_duration)
print_uptime(self)
- Log the amount of time that the downloader has been running, e.g "0:01:28"
start_queue_thread¶
Source
def start_queue_thread(self): thread = threading.Thread( name="QueueThread", target=self.queue_target, args=(self.pending_queue, self.downloading_queue), ) thread.setDaemon(True) thread.start() return thread
start_queue_thread(self)
- Start and return a thread that is responsible for pinging the app for Downloads to download (and populating the queue)
start_summary_thread¶
Source
def start_summary_thread(self, thread_states, interval): # logger.debug("thread_states: %s", thread_states) thread = threading.Thread( name="SummaryThread", target=self.print_summary, args=(thread_states, interval) ) thread.setDaemon(True) thread.start() return thread
start_summary_thread(self, thread_states, interval)
- Start and return a thread that is responsible for periodically logging a summary of download activity
start_download_threads¶
Source
def start_download_threads(self, downloading_queue, pending_queue): threads = [] task_download_states = [] for thread_number in range(self.thread_count): # create a task download state. This object is persistant and resused over and over # again for each Task that a thread downloads It's important that the state information # is wiped clean (reset) every time a new task begins. task_download_state = TaskDownloadState() task_download_states.append(task_download_state) thread = threading.Thread( target=self.download_target, args=(pending_queue, downloading_queue, task_download_state), ) thread.setDaemon(True) thread.start() threads.append(thread) return task_download_states
start_download_threads(self, downloading_queue, pending_queue)
:start_reporter_thread¶
Source
def start_reporter_thread(self, download_data): reporter_thread_name = "ReporterThread" current_thread_name = threading.current_thread().name thread_number_match = re.match(r"Thread-(\d+)", current_thread_name) if thread_number_match: reporter_thread_name += "-%s" % thread_number_match.groups()[0] thread = threading.Thread( name=reporter_thread_name, target=self.reporter_target, args=(download_data, threading.current_thread()), ) thread.setDaemon(True) thread.start() return thread
start_reporter_thread(self, download_data)
:queue_target¶
Source
def queue_target(self, pending_queue, downloading_queue): while True: try: empty_queue_slots = (self.thread_count * 2) - pending_queue.qsize() # If the queue is full, then sleep if empty_queue_slots <= 0: self.nap() continue logger.debug("empty_queue_slots: %s", empty_queue_slots) # Get the the next download. Cap the request to 20 Downloads at a time (this keeps # the request from not taking a super long time). This is an arbitrary number and # can be adjusted as needed max_count = 20 downloads = self.get_next_downloads(count=min([empty_queue_slots, max_count])) # If there is not a download, then sleep if not downloads: logger.debug("No more downloads to queue. sleeping %s seconds...", self.naptime) self.nap() continue # Ensure that each Download is not already in the pending/downloading queues for download in downloads: if _in_queue(pending_queue, download, "download_id") or _in_queue( downloading_queue, download, "download_id" ): logger.warning("Omitting Redundant Download: %s", download) continue # Add the download to the pending queue logger.debug("adding to pending queue: %s", download) self.pending_queue.put(download) except: logger.exception("Exception occurred in QueueThead:\n") # Wait for the queue to be consumed before querying for more self.nap()
queue_target(self, pending_queue, downloading_queue)
-
Fill the download queue by querying the app for the "next" Download. Only fill the queue to have as many items as there are threads.
Perpetually run this function until the daemon has been terminated
Note: The size of the Queue can vary quite a lot. It's filled with Download objects - not file paths. A Download object represents all the files from a given Task. If a Task generates a single file, than a Queue of 20 will only contain 20 file paths. However, if a Task generates 100 files, than a Queue of 20 Downloads will actually contain 2000 paths to download.
nap¶
Source
def nap(self): while not common.SIGINT_EXIT: time.sleep(self.naptime) return
nap(self)
:get_next_downloads¶
Source
@common.dec_timer_exit(log_level=logging.DEBUG) def get_next_downloads(self, count): try: downloads = _get_next_downloads( self.location, self.endpoint_downloads_next, self.api_client, count=count ) return downloads except Exception as e: logger.exception("Could not get next download")
get_next_downloads(self, count)
:get_jobs_downloads¶
Source
def get_jobs_downloads(self, job_ids, task_id): task_ids = [t for t in task_id.split(",") if t] if task_id else [None] for job_id in job_ids: endpoint = self.endpoint_downloads_job % job_id for tid in task_ids: downloads = _get_job_download(endpoint, self.api_client, job_id, tid) if downloads: for task_download in downloads.get("downloads", []): print("putting in queue: %s" % task_download) self.pending_queue.put(task_download, block=True)
get_jobs_downloads(self, job_ids, task_id)
-
Get each job and optional comma-separated task list.
There will only be a task list if there is just one job, due to earlier arg validation.
If there is no task list, _get_job_download is called with tid=None (i.e. the whole job)
download_target¶
Source
@common.dec_catch_exception(raise_=True)
def download_target(self, pending_queue, downloading_queue, task_download_state):
    """Worker-thread loop: pull one Download from the pending queue and
    download all of its files, reporting state via task_download_state.

    On failure the Download is placed back on the pending queue. On SIGINT the
    active Download's state is reset and reported before the thread exits.
    """
    task_download_state.thread_name = threading.currentThread().name
    # Start the reporter sub thread (feeding it empty data). Setup the reporter child thread
    # to update the apps' Download entity
    self.start_reporter_thread(task_download_state)
    while not common.SIGINT_EXIT:
        task_download = None
        try:
            # Get the next task download from the pending queue.
            task_download = pending_queue.get(block=True)
            # Tell the pending queue that the task has been completed (to remove the task
            # from the queue)
            pending_queue.task_done()
            # Transfer the task download into the downloading queue
            downloading_queue.put(task_download)
            logger.debug("download: %s", task_download)
            # The task_download_state variable is constructed as a mutable object (a class)
            # so that it can be passed into multiple functions/threads and have them
            # read/write to it like a global variable. Initialize/reset the
            # task_download_state
            task_download_state.initialize(task_download)
            task_download_state.status = TaskDownloadState.STATE_DOWNLOADING
            # Explicitly report the Download status (to indicate that it's now "downloading"
            self.report_download_status(task_download_state)
            output_dir = task_download["output_dir"]
            logger.info("output directory: %s", output_dir)
            if self.output_dir:
                output_dir = self.output_dir
                logger.info("Overriding default output directory: %s", output_dir)
            # If the output folder cannot be created, create one in the current directory.
            try:
                safe_mkdirs(output_dir)
            except OSError:
                output_dir = os.path.join(os.getcwd(), DOWNLOADS_FOLDER)
                logger.warning("Can't create desired outout directory! \nUsing '{}' instead.".format(output_dir))
            for file_info in task_download["files"]:
                # Create file state object
                file_download_state = FileDownloadState()
                file_download_state.file_info = file_info
                file_download_state.thread_name = threading.currentThread().name
                file_download_state.download_id = task_download["download_id"]
                # attach the file state object to the task state object
                task_download_state.file_download_states[file_download_state] = file_info
                # Define the local filepath to download to
                local_filepath = os.path.join(output_dir, file_info["relative_path"])
                # Record the filepath to the state object
                file_download_state.filepath = local_filepath
                logger.debug("Handling task file: %s", local_filepath)
                # Get the immediate parent directory for the file to be downloaded. Note that
                # this is not necessarily the same as the output_dir. The output_dir is simply
                # the top level directory to transfer the files to. It does not necessarily
                # account for a render's generated subdirectories
                dirpath = os.path.dirname(local_filepath)
                # Ensure that the destination directory exists and set open permissions
                logger.debug("Creating destination directory if necessary: %s", dirpath)
                file_download_state.status = FileDownloadState.STATE_PREPARING_DIRECTORY
                safe_mkdirs(dirpath)
                logger.debug("chmodding directory: %s", dirpath)
                try:
                    chmod(dirpath, 0o777)
                except:
                    logger.warning("Failed to chmod filepath %s", dirpath)
                file_download_state.time_started = time.time()
                self.download_file(
                    file_info["url"], local_filepath, file_info["md5"], file_download_state
                )
                file_download_state.status = FileDownloadState.STATE_COMPLETE
                file_download_state.time_completed = time.time()
                self.add_to_history(file_download_state)
            # Update the status to downloaded
            task_download_state.status = TaskDownloadState.STATE_COMPLETE
            # Explicitly report_download_status to app. We can't always rely on the reporter
            # thread because it only pings the app every x seconds.
            self.report_download_status(task_download_state)
            # Remove the Download from the downloading queue.
            downloading_queue.get(block=False)
            downloading_queue.task_done()
        # If there is an error when handling the task_download, put it back into the pending
        # queue, and report to the app that it's pending again.
        except:
            if task_download:
                task_download_state.reset_bytes_downloaded()
                task_download_state.status = TaskDownloadState.STATE_ERROR
                logger.exception(
                    "Failed to download Download %s", task_download.get("download_id")
                )
                logger.debug("Putting Download back on queue: %s", task_download)
                pending_queue.put(task_download)
                # Remove the Download from the downloading queue.
                downloading_queue.get(block=False)
                downloading_queue.task_done()
                # Explicitly report_download_status to app. We can't always rely on the
                # reporter thread because it only pings the app every x seconds. Make sure
                # this doesn't throw an exception. Don't want to kill the thread.
                try:
                    self.report_download_status(task_download_state)
                except:
                    # I think the worst that could happen is that the Download may not get
                    # its status changed to "downloaded". This will eventually get cleaned
                    # up by the cron, and prompt a redownloading
                    download_id = task_download_state.task_download.get("download_id")
                    logger.exception(
                        "Failed to report final status for Download %s, due to error",
                        download_id,
                    )
            # Reset The task_download_state object
            task_download_state.reset()
    # If the daemon is terminated, clean up the active Download, resetting its status on the
    # app
    logger.debug("Exiting thread. Cleaning up state for Download: ")
    task_download_state.reset_bytes_downloaded()
    task_download_state.status = TaskDownloadState.STATE_ERROR
    self.report_download_status(task_download_state)
    downloading_queue.get(block=True)
    downloading_queue.task_done()
download_target(self, pending_queue, downloading_queue, task_download_state)
-
This function is called in a new thread (and many threads may be executing this at a single time). This function is responsible for downloading a single Download (essentially an entity in Datastore which represents the output data from a single Task). This may consist of many files.
This function pulls one Download from the pending queue and attempts to download it. If it fails, it places it back on the queue.
The function also spawns a child thread that is responsible for constantly updating the app with the status of the Download, such as: - status (e.g. "pending", "downloading", "downloaded") - bytes transferred (the total amout of bytes that have been transferred for the Download. Note that these bytes encompass all of the bytes that have been transferred for ALL of the files that are part of the Download (as opposed to only a single file)
task_download_state: a class, serving as global mutable object, which allows this thread to report data about its Download state, so that other threads can read and output that data. This object is persistent for each thread, and is reused every time this function is called, for each Task that a thread downloads. It's important that the state information is wiped clean (reset) every time a new task begins. This is the responsibility of this function
reporter_target¶
Source
def reporter_target(self, task_download_state, downloader_thread): while True: try: if common.SIGINT_EXIT: task_download_state.status = TaskDownloadState.STATE_ERROR task_download_state.reset_bytes_downloaded() if task_download_state.task_download != None: bytes_downloaded = task_download_state.get_bytes_downloaded() response_string, response_code = self.report_download_status( task_download_state ) except: logger.exception("failed to report download status") sleep_time = 5 time.sleep(sleep_time) # Check to make sure that that the downloader thread that this reporter thread is # reporting about is still alive. Otherwise exit the reporter loop if not downloader_thread.is_alive(): logger.warning( "Detected %s thread is dead. Exiting %s thread now", downloader_thread.name, threading.current_thread().name, ) return # Check to make sure that that the downloader thread that this reporter thread is # reporting about is still alive. Otherwise exit the reporter loop if not downloader_thread.is_alive(): logger.warning( "Detected %s thread is dead. Exiting %s thread now", downloader_thread.name, threading.current_thread().name, ) return
reporter_target(self, task_download_state, downloader_thread)
:download_file¶
Source
@common.DecRetry(tries=3, static_sleep=1)
def download_file(self, url, local_filepath, md5, file_state):
    """For the given file information, download the file to disk.

    Checks whether the file already exists and matches the expected md5 before
    downloading; otherwise downloads to a temp file, verifies the md5, then
    moves it into place. Retried up to 3 times by the decorator.
    """
    # Reset bytes downloaded to 0 (in case of retries)
    file_state.bytes_downloaded = 0
    logger.debug("Checking for existing file %s", local_filepath)
    # If the file already exists on disk
    if os.path.isfile(local_filepath):
        file_state.status = FileDownloadState.STATE_HASHING_EXISTING_FILE
        # Create a callback function that the md5 hashing function will call periodically
        callback = functools.partial(self._update_file_state_callback, file_state)
        local_md5 = common.generate_md5(
            local_filepath,
            poll_seconds=self.md5_progess_polling,
            callback=callback,
        )
        # If the local md5 matches the expected md5 then no need to download. Skip to next
        # file
        if md5 == local_md5:
            file_state.use_existing = True
            logger.info("Existing file is up to date: %s", local_filepath)
            return
        logger.debug("md5 does not match existing file: %s vs %s", md5, local_md5)
        logger.debug("Deleting dirty file: %s", local_filepath)
        delete_file(local_filepath)
    file_state.status = FileDownloadState.STATE_DOWNLOADING
    # Download to a temporary file and then move it
    dirpath, filename = os.path.split(local_filepath)
    # hack to use tempfile to generate a unique filename. close file object immediately. This
    # will get thrown out soon
    tmpfile = tempfile.NamedTemporaryFile(prefix=filename, dir=dirpath)
    # close this. otherwise we get warnings/errors about the file handler not being closed
    tmpfile.close()
    tmp_filepath = tmpfile.name
    logger.debug("tmp_filepath: %s", tmp_filepath)
    # download the file.
    new_md5 = download_file(
        url, tmp_filepath, poll_rate=self.download_progess_polling, state=file_state
    )
    logger.debug("new_md5: %s", new_md5)
    if new_md5 != md5:
        try:
            logger.debug("Cleaning up temp file: %s", tmp_filepath)
            os.remove(tmp_filepath)
        except:
            logger.warning("Could not cleanup temp file: %s", tmp_filepath)
        raise Exception(
            "Downloaded file does not have expected md5. \n%s vs %s: %s"
            % (new_md5, md5, tmp_filepath)
        )
    logger.debug("File md5 verified: %s", tmp_filepath)
    logger.debug("Moving: %s to %s", tmp_filepath, local_filepath)
    shutil.move(tmp_filepath, local_filepath)
    # Set file permissions
    logger.debug("\tsetting file perms to 666")
    chmod(local_filepath, 0o666)
download_file(self, url, local_filepath, md5, file_state)
- For the given file information, download the file to disk. Check whether the file already exists and matches the expected md5 before downloading.
add_to_history¶
Source
def add_to_history(self, file_download_state): self.history_queue.put(file_download_state, block=False) if self.history_queue_max != None: # Only save the last n file downloads while self.history_queue.qsize() > self.history_queue_max: self.history_queue.get(block=False)
add_to_history(self, file_download_state)
:report_download_status¶
Source
def report_download_status(self, task_download_state): download_id = task_download_state.task_download.get("download_id") if not download_id: return None, None data = { "download_id": download_id, "status": task_download_state.get_entity_status(), "bytes_downloaded": task_download_state.get_bytes_downloaded(), "bytes_to_download": task_download_state.task_download.get("size") or 0, } return self.api_client.make_request( self.endpoint_downloads_status, data=json.dumps(data), use_api_key=True )
report_download_status(self, task_download_state)
:print_summary¶
Source
def print_summary(self, thread_states, interval): while True: try: self._print_threads_alive() self._print_download_history() self._print_pending_queue() self._print_active_downloads(thread_states) self._print_dead_thread_warnings() except: logger.exception("Failed to report Summary") time.sleep(interval)
print_summary(self, thread_states, interval)
-
- total threads running
- thread names
- total download threads
Last 20 files downloaded Last 20 tasks downloaded Last 20 Downloads downloaded
Currently downloading jobs Currently downloading files Currently downloading Downloads
# SUMMARY¶
Active Thread Count: 14
Threads: ErrorThread MainThread MetricStore ProgressThread ProgressThread ProgressThread ProgressThread ProgressThread ProgressThread QueueThread Thread-1 Thread-2 Thread-3 Thread-4 Thread-5
ACTIVE DOWNLOADS¶
Job 08285 Task 004 - 80% (200/234MB) - Thread-1 /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/beauty/deep_lidar.deep.0005.exr HASHING EXISTING FILE 80% /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/data/deep_lidar.deep.0005.exr DOWLOADING 20%
Job 08285 Task 003 - 20% (20/234MB) - Thread-2 /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01142.exr /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01074.exr
PENDING DOWNLOADS¶
Job 08285 Task 006 Job 08285 Task 007 Job 08285 Task 008 Job 08285 Task 009
HISTORY¶
Last 20 files downloaded:
/Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01142.exr /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01074.exr /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01038.exr /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01111.exr /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01087.exr /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01143.exr /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01095.exr /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01156.exr /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01016.exr /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01039.exr /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01130.exr /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01030.exr /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01015.exr /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01138.exr /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01063.exr /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01006.exr 
/Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01065.exr /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01096.exr /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01055.exr
Last 20 tasks downloaded Job 08285 Task 004 (Download 3492394234) Job 08285 Task 002 (Download 3492394234) Job 08285 Task 003 (Download 3492394234) Job 08285 Task 001 (Download 3492394234) Job 08285 Task 000 (Download 3492394234) Job 08284 Task 065 (Download 3492394234) Job 08283 Task 064 (Download 3492394234) Job 08283 Task 063 (Download 3492394234) Job 08282 Task 032 (Download 3492394234) Job 08282 Task 025 (Download 3492394234) Job 08282 Task 001 (Download 3492394234)
¶
construct_active_downloads_summary¶
Source
def construct_active_downloads_summary(self, task_download_states): header = "##### ACTIVE DOWNLOADS #####" content = "" for task_download_state in task_download_states: if task_download_state.task_download.get("download_id"): content += "\n\n%s" % self._contruct_active_download_summary(task_download_state) if content: return header + content return ""
construct_active_downloads_summary(self, task_download_states)
-
ACTIVE DOWNLOADS¶
Job 08285 Task 004 - 80% (200/234MB) - Thread-1 /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/beauty/deep_lidar.deep.0005.exr /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/data/deep_lidar.deep.0005.exr
Job 08285 Task 003 - 20% (20/234MB) - Thread-2 /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01142.exr /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01074.exr
construct_file_downloads_history_summary¶
Source
def construct_file_downloads_history_summary(self, file_download_history): table_data = [] for file_download_state in file_download_history: row_data = {} row_data["Completed at"] = file_download_state.time_completed row_data["Download ID"] = file_download_state.download_id row_data["Job"] = file_download_state.file_info["job_id"] row_data["Task"] = file_download_state.file_info["task_id"] row_data["Size"] = file_download_state.file_info["size"] row_data["Action"] = "Reused" if file_download_state.use_existing else "DL" row_data["Duration"] = file_download_state.get_duration() row_data["Thread"] = file_download_state.thread_name row_data["Filepath"] = file_download_state.filepath table_data.append(row_data) column_names = [ "Completed at", "Download ID", "Job", "Task", "Size", "Action", "Duration", "Thread", "Filepath", ] title = " DOWNLOAD HISTORY %s " % ( ("(last %s files)" % self.history_queue_max) if self.history_queue_max else "" ) table = HistoryTableStr( data=list(table_data), column_names=column_names, title=title.center(100, "#"), footer="#" * 180, upper_headers=True, ) return table.make_table_str()
construct_file_downloads_history_summary(self, file_download_history)
-
DOWNLOAD HISTORY¶
6227709558521856 Job 08285 Task 001 20MB Reused /work/renders/light_v001/beauty/deep_lidar.deep.0005.exr
7095580349853434 Job 08285 Task 002 10MB DL /work/renders/spider_fly01/beauty/deep_lidar.deep.0005.exr 5343402947290140 Job 08284 Task 001 5MB DL /work/renders/light_v002/data/light_002.deep.0005.exr
-