Module ciocore.downloader

Command-line process to run downloads.

Functions

random_exeption

Source
def random_exeption(percentage_chance):
    if random.random() < percentage_chance:
        raise Exception("Random exception raised (%s percent chance)" % percentage_chance)
    logger.debug("Skipped random exception (%s chance)", percentage_chance)
random_exeption(percentage_chance)
Raise an exception with the given probability; otherwise log that the exception was skipped. Note that percentage_chance is compared directly against random.random(), so despite the name it should be expressed as a fraction between 0 and 1.

dec_random_exception

Source
def dec_random_exception(percentage_chance):


    def catch_decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            random_exeption(percentage_chance)
            return func(*args, **kwds)

        return wrapper

    return catch_decorator
dec_random_exception(percentage_chance)
Decorator for raising random exceptions from the wrapped function. This is used for simulating errors to test downloader recovery behavior/robustness.
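
For example, a hypothetical helper can be wrapped so that each call has a 5% chance of raising (fetch_chunk is illustrative, not part of the module):

import requests

@dec_random_exception(0.05)
def fetch_chunk(url):
    # Each call first passes through random_exeption(0.05), which may raise.
    return requests.get(url, stream=True)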

delete_file

Source
@common.DecRetry(tries=3, static_sleep=1)
def delete_file(filepath):
    os.remove(filepath)
delete_file(filepath)

chmod

Source
@common.DecRetry(tries=3, static_sleep=1)
def chmod(filepath, mode):
    os.chmod(filepath, mode)
chmod(filepath, mode)

download_file

Source
@common.DecRetry(retry_exceptions=CONNECTION_EXCEPTIONS)
def download_file(download_url, filepath, poll_rate=2, state=None):

    logger.info("Downloading: %s", filepath)
    response = requests.get(download_url, stream=True)

    logger.debug("download_url: %s", download_url)

    hash_obj = hashlib.md5()

    progress_bytes = 0
    last_poll = time.time()
    with open(filepath, "wb") as file_pointer:
        for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
            total_bytes = float(response.headers.get("content-length", 0))
            if chunk:
                progress_bytes += len(chunk)
                file_pointer.write(chunk)
                if state != None:
                    state.bytes_downloaded += len(chunk)
                now = time.time()
                if now >= last_poll + poll_rate:
                    progress_percentage = common.get_progress_percentage(
                        progress_bytes, total_bytes
                    )
                    logger.debug("Downloading %s %s", progress_percentage, filepath)
                    last_poll = now

                hash_obj.update(chunk)
    response.raise_for_status()
    logger.debug("Downloading 100%% %s", filepath)
    new_md5 = hash_obj.digest()
    logger.debug("Md5 Checking: %s", filepath)
    return base64.b64encode(new_md5).decode("utf-8")
download_file(download_url, filepath, poll_rate=2, state=None)
state: a class instance with a .bytes_downloaded property, reflecting the number of bytes downloaded so far. This can be read by other threads to report progress. Note that this must be a mutable object (hence a class instance), so that this function and other threads read/write the same object.
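
For example, a monitoring thread can poll the same state object while download_file runs in a worker thread (a minimal sketch; SimpleState and the url/filepath values are illustrative):

import threading
import time

class SimpleState:
    # Stand-in for FileDownloadState: a mutable object shared across threads.
    def __init__(self):
        self.bytes_downloaded = 0

state = SimpleState()
url, filepath = "https://example.com/render.exr", "/tmp/render.exr"  # illustrative
worker = threading.Thread(target=download_file, args=(url, filepath), kwargs={"state": state})
worker.start()
while worker.is_alive():
    print("downloaded so far: %s bytes" % state.bytes_downloaded)
    time.sleep(2)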

safe_mkdirs

Source
def safe_mkdirs(dirpath):

    try:
        os.makedirs(dirpath)
    except OSError:
        if not os.path.isdir(dirpath):
            raise
safe_mkdirs(dirpath)
Create the given directory. If it already exists, suppress the exception. This function is useful when handling concurrency issues where it's not possible to reliably check whether a directory exists before creating it.
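
On Python 3, the same race-safe behavior is available via the exist_ok flag; safe_mkdirs keeps compatibility with older interpreters:

import os

dirpath = "/tmp/conductor_dl"  # illustrative
# Equivalent on Python 3: never raises just because the directory already exists.
os.makedirs(dirpath, exist_ok=True)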

run_downloader

Source
def run_downloader(args):


    # Set up logging
    log_level_name = args.get("log_level")
    log_level = loggeria.LEVEL_MAP.get(log_level_name)
    log_dirpath = args.get("log_dir")
    set_logging(log_level, log_dirpath)

    api_client.ApiClient.register_client(
        client_name=Downloader.CLIENT_NAME, client_version=ciocore.__version__
    )

    logger.debug("Downloader args: %s", args)

    job_ids = args.get("job_id")
    thread_count = args.get("thread_count")

    if job_ids:
        Downloader.download_jobs(
            job_ids,
            task_id=args.get("task_id"),
            thread_count=thread_count,
            output_dir=args.get("output"),
        )

    else:
        Downloader.start_daemon(
            thread_count=thread_count, location=args.get("location"), output_dir=args.get("output")
        )
run_downloader(args)
Start the downloader. If one or more job ids were given, exit the downloader upon completion. Otherwise, run the downloader indefinitely (daemon mode), polling the Conductor cloud app for files that need to be downloaded.
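
A sketch of the args dict, using only the keys that run_downloader reads (all values are illustrative):

args = {
    "log_level": "DEBUG",             # name looked up in loggeria.LEVEL_MAP
    "log_dir": "/tmp/conductor_logs",
    "job_id": ["00208"],              # empty/None to run in daemon mode instead
    "task_id": None,
    "thread_count": 4,
    "output": "/tmp/conductor_dl",
    "location": None,                 # only consulted in daemon mode
}
run_downloader(args)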

set_logging

Source
def set_logging(level=None, log_dirpath=None):
    log_filepath = None
    if log_dirpath:
        log_filepath = os.path.join(log_dirpath, "conductor_dl_log")
    loggeria.setup_conductor_logging(
        logger_level=level,
        console_formatter=LOG_FORMATTER,
        file_formatter=LOG_FORMATTER,
        log_filepath=log_filepath,
    )
set_logging(level=None, log_dirpath=None)

report_error

Source
def report_error(self, download_id, error_message):
    try:
        logger.error("failing upload due to: \n%s" % error_message)
        # report error_message to the app
        resp_str, resp_code = self.api_helper.make_request(
            "/downloads/%s/fail" % download_id, data=error_message, verb="POST", use_api_key=True
        )
    except Exception:
        pass
    return True
report_error(self, download_id, error_message)

Classes

ThreadState

ThreadState()

Use a class as mutable datatype so that it can be used across threads
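
The point of the class is reference semantics: every thread holds a reference to the same instance, so a write in one thread is visible to readers in another. A minimal sketch (the attribute name follows FileDownloadState):

import threading

state = ThreadState()
state.bytes_downloaded = 0

def worker(shared_state):
    # Mutating an attribute is visible to every thread holding the reference.
    shared_state.bytes_downloaded += 1024

thread = threading.Thread(target=worker, args=(state,))
thread.start()
thread.join()
print(state.bytes_downloaded)  # 1024: the worker's write is visible here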

Descendants

  • ciocore.downloader.FileDownloadState
  • ciocore.downloader.TaskDownloadState

FileDownloadState

FileDownloadState()

Use a class as mutable datatype so that it can be used across threads

Ancestors (in MRO)

  • ciocore.downloader.ThreadState

Class variables

STATE_QUEUED

STATE_QUEUED :

STATE_PREPARING_DIRECTORY

STATE_PREPARING_DIRECTORY :

STATE_HASHING_EXISTING_FILE

STATE_HASHING_EXISTING_FILE :

STATE_DOWNLOADING

STATE_DOWNLOADING :

STATE_COMPLETE

STATE_COMPLETE :

bytes_downloaded

bytes_downloaded :

hash_progress

hash_progress :

status

status :

use_existing

use_existing :

filepath

filepath :

time_started

time_started :

time_completed

time_completed :

file_info

file_info :

thread_name

thread_name :

download_id

download_id :

Methods

get_duration

Source
def get_duration(self):

    if self.time_started:
        if self.time_completed:
            return self.time_completed - self.time_started
        else:
            return time.time() - self.time_started

    return 0
get_duration(self)
Return the elapsed time in seconds: the total duration if the download completed, the time elapsed so far if it is still running, or 0 if it never started.

HistoryTableStr

Source
def __init__(self, data, column_names, title="", footer="", upper_headers=True):

    self.cell_modifiers = {
        "Completed at": functools.partial(self.human_time),
        "Size": functools.partial(self.human_bytes, rjust=8),
        "Duration": functools.partial(common.get_human_duration),
    }

    super(HistoryTableStr, self).__init__(
        data=data,
        column_names=column_names,
        title=title,
        footer=footer,
        upper_headers=upper_headers,
    )
HistoryTableStr(data, column_names, title='', footer='', upper_headers=True)

A class to help log/print tables of data

######## DOWNLOAD HISTORY ########

COMPLETED AT         DOWNLOAD ID       JOB    TASK  SIZE      ACTION  DURATION  THREAD     FILEPATH
2016-01-16 01:12:46  5228833175240704  00208  010   137.51MB  DL      0:00:57   Thread-12  /tmp/conductor_daemon_dl/04/cental/cental.010.exr
2016-01-16 01:12:42  6032237141164032  00208  004   145.48MB  DL      0:02:24   Thread-2   /tmp/conductor_daemon_dl/04/cental/cental.004.exr
2016-01-16 01:12:40  5273802288136192  00208  012   140.86MB  DL      0:02:02   Thread-16  /tmp/conductor_daemon_dl/04/cental/cental.012.exr

args: data: list of dicts. Each dict represents a row, where the key is the column name and the value is the cell value.

column_names: list of str. The columns of data to show (and the order in which they are shown)

title: str. If provided, will be printed above the table

footer: str. If provided, will be printed below the table

upper_headers: bool. If True, will automatically uppercase the column header names
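
A minimal construction sketch (row values are illustrative; the column names shown match the built-in cell modifiers):

rows = [
    {
        "Completed at": 1452906766,  # epoch seconds; rendered by human_time
        "Size": 144180000,           # bytes; rendered by human_bytes
        "Duration": 57,              # seconds; rendered as a human duration
        "Filepath": "/tmp/conductor_daemon_dl/04/cental/cental.010.exr",
    },
]
table = HistoryTableStr(
    data=rows,
    column_names=["Completed at", "Size", "Duration", "Filepath"],
    title="#### DOWNLOAD HISTORY ####",
)
print(table.make_table_str())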

Ancestors (in MRO)

  • ciocore.loggeria.TableStr

Methods

human_time

Source
def human_time(self, timestamp, ljust=0, rjust=0):
    return common.get_human_timestamp(timestamp).ljust(ljust).rjust(rjust)

human_time(self, timestamp, ljust=0, rjust=0) :

human_bytes

Source
def human_bytes(self, bytes_, ljust=0, rjust=0):
    return common.get_human_bytes(bytes_).ljust(ljust).rjust(rjust)

human_bytes(self, bytes_, ljust=0, rjust=0) :

TaskDownloadState

Source
def __init__(self):
    self.reset()
TaskDownloadState()

Use a class as mutable datatype so that it can be used across threads

Ancestors (in MRO)

  • ciocore.downloader.ThreadState

Class variables

STATE_QUEUED

STATE_QUEUED :

STATE_PREPARING_DIRECTORY

STATE_PREPARING_DIRECTORY :

STATE_HASHING_EXISTING_FILE

STATE_HASHING_EXISTING_FILE :

STATE_DOWNLOADING

STATE_DOWNLOADING :

STATE_COMPLETE

STATE_COMPLETE :

STATE_ERROR

STATE_ERROR :

ENTITY_STATES

ENTITY_STATES :

Methods

get_bytes_downloaded

Source
def get_bytes_downloaded(self):
    bytes_list = [file_state.bytes_downloaded for file_state in self.file_download_states]
    return sum(bytes_list or [0])

get_bytes_downloaded(self) :

reset_bytes_downloaded

Source
def reset_bytes_downloaded(self):
    for file_state in self.file_download_states:
        file_state.bytes_downloaded = 0

reset_bytes_downloaded(self) :

get_duration

Source
def get_duration(self):

    if self.time_started:
        if self.time_completed:
            return self.time_completed - self.time_started
        else:
            return time.time() - self.time_started

    return 0
get_duration(self)
Return the elapsed time in seconds: the total duration if the download completed, the time elapsed so far if it is still running, or 0 if it never started.

get_entity_status

Source
def get_entity_status(self):

    return self.ENTITY_STATES[self.status]
get_entity_status(self)
Return corresponding entity status for the TaskDownloadState's current status

initialize

Source
def initialize(self, task_download):

    self.reset()
    self.task_download = task_download
initialize(self, task_download)
Reset/initialize the properties

reset

Source
def reset(self):
    self.status = self.STATE_QUEUED
    self.task_download = {}
    self.file_download_states = {}
    self.time_started = None
    self.time_completed = None

reset(self) :

Downloader

Source
def __init__(self, thread_count=None, location=None, output_dir=None):
    cfg = config.config().config
    # Turn on the SIGINT handler. This will catch ctrl-c and set common.SIGINT_EXIT so threads can exit gracefully.
    common.register_sigint_signal_handler()

    self.api_client = api_client.ApiClient()

    self.thread_count = int(thread_count or cfg["thread_count"])
    logger.debug("thread_count: %s", self.thread_count)

    self.location = location
    logger.debug("location: %s", self.location)

    self.output_dir = output_dir
Downloader(thread_count=None, location=None, output_dir=None)

A Downloader daemon which downloads completed frames from finished tasks.

Each task has an associated Download entity that represents all images/files that were produced from that task. A task may have more than one file to download.

  1. Query the app for the "next" download. The app will return the Download that it deems most appropriate (probably of the lowest job id). Note that the app will automatically change the status of that Download entity to "downloading". This is probably a terrible model (but it's what I'm inheriting). Note that there is a cron job that resets Download entities that are in a state of "downloading" but have not "dialed home" after x amount of time.

  2. Place this Download in the queue to be downloaded.

  3. Spawn threads to actively take "work" (Downloads) from the queue.

  4. Each thread is responsible for downloading the Download that it took from the queue.

  5. Each thread is responsible for updating the app with its downloading status: "downloading", etc.

  6. Each thread is responsible for catching its own exceptions and placing the Download back into the queue.

  7. Because the downloader is threaded, it makes communication/tracking of data more difficult. In order to facilitate this, ThreadState objects are shared across threads to read/write data to. This allows download progress to be communicated from multiple threads into a single object, which can then be read from a single process and "summarized".

A TaskDownloadState.files = [FileDownloadState, FileDownloadState]

To complicate matters further, if the downloader is killed (via keyboard, etc.), it should clean up after itself. So we need to catch the SIGINT exit signal at any point in the code and handle it gracefully.
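
Stripped of the Conductor-specific calls, the shape of this design is a standard producer/consumer queue. A minimal runnable sketch (SharedState, poll_app_for_downloads, and perform_download are hypothetical stand-ins):

import queue
import threading
import time

pending = queue.Queue()

class SharedState:
    # Stand-in for TaskDownloadState: mutable and shared across threads (step 7).
    def __init__(self):
        self.bytes_downloaded = 0

def poll_app_for_downloads():
    # Stand-in for the "next download" endpoint (steps 1-2).
    return [{"download_id": i} for i in range(2)]

def perform_download(download, state):
    # Stand-in for the real per-Download work (step 4).
    state.bytes_downloaded += 1024

def consumer(state):
    # Steps 3-6: take work, download it, and requeue on failure.
    while True:
        download = pending.get()
        try:
            perform_download(download, state)
        except Exception:
            pending.put(download)  # back on the queue to be retried

states = [SharedState() for _ in range(4)]
for state in states:
    threading.Thread(target=consumer, args=(state,), daemon=True).start()

for download in poll_app_for_downloads():
    pending.put(download)
time.sleep(0.1)  # give the daemon threads a moment to drain the queue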

Class variables

CLIENT_NAME

CLIENT_NAME :

naptime

naptime :

endpoint_downloads_next

endpoint_downloads_next :

endpoint_downloads_job

endpoint_downloads_job :

endpoint_downloads_status

endpoint_downloads_status :

download_progess_polling

download_progess_polling :

md5_progess_polling

md5_progess_polling :

history_queue_max

history_queue_max :

start_time

start_time :

Static methods

start_daemon

Source
@classmethod
def start_daemon(cls, thread_count=None, location=None, output_dir=None, summary_interval=10):

    downloader = cls(thread_count=thread_count, location=location, output_dir=output_dir)
    thread_states = downloader.start(summary_interval=summary_interval)

    while not common.SIGINT_EXIT:

        # Give the other threads a chance to do their work
        time.sleep(1)

    downloader._print_download_history()
    downloader.print_uptime()
start_daemon(thread_count=None, location=None, output_dir=None, summary_interval=10)
Run the downloader as a daemon
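
Typical invocation (values illustrative); the call blocks until SIGINT:

Downloader.start_daemon(thread_count=8, output_dir="/tmp/conductor_daemon_dl")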

download_jobs

Source
@classmethod
def download_jobs(cls, job_ids, task_id=None, thread_count=None, output_dir=None):

    downloader = cls(thread_count=thread_count, output_dir=output_dir)
    thread_states = downloader.start(job_ids, task_id=task_id)
    while not common.SIGINT_EXIT and (
        not downloader.pending_queue.empty() or not downloader.downloading_queue.empty()
    ):
        sleep_time = 2
        time.sleep(sleep_time)

    downloader._print_download_history()
    downloader.print_uptime()
download_jobs(job_ids, task_id=None, thread_count=None, output_dir=None)
Run the downloader for explicit jobs, and terminate afterwards.
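
Typical invocation (ids illustrative); task_id accepts a comma-separated list and, per the arg validation, only applies when a single job is given:

Downloader.download_jobs(["00208"], task_id="004,010", thread_count=4, output_dir="/tmp/conductor_dl")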

Methods

start

Source
def start(self, job_ids=None, task_id=None, summary_interval=10):
    # Create new queues
    self.start_time = time.time()
    self.pending_queue = queue.Queue()
    self.downloading_queue = queue.Queue()
    self.history_queue = queue.Queue()

    # If a job id has been specified then only load the queue up with that work
    if job_ids:
        self.history_queue_max = None
        self.get_jobs_downloads(job_ids, task_id)

    # otherwise create a queue thread that polls the app for work
    else:
        self.start_queue_thread()

    task_download_states = self.start_download_threads(
        self.downloading_queue, self.pending_queue
    )
    thread_states = {"task_downloads": task_download_states}

    self.start_summary_thread(thread_states, interval=summary_interval)

    # Record all of the original threads immediately so that we can monitor their state change
    self._original_threads = threading.enumerate()
    return thread_states

start(self, job_ids=None, task_id=None, summary_interval=10) :

print_uptime

Source
def print_uptime(self):

    seconds = time.time() - self.start_time
    human_duration = common.get_human_duration(seconds)
    logger.info("Uptime: %s", human_duration)
print_uptime(self)
Log the amount of time that the downloader has been running, e.g. "0:01:28"

start_queue_thread

Source
def start_queue_thread(self):


    thread = threading.Thread(
        name="QueueThread",
        target=self.queue_target,
        args=(self.pending_queue, self.downloading_queue),
    )
    thread.setDaemon(True)
    thread.start()
    return thread
start_queue_thread(self)
Start and return a thread that is responsible for pinging the app for Downloads to download (and populating the queue)

start_summary_thread

Source
def start_summary_thread(self, thread_states, interval):

    # logger.debug("thread_states: %s", thread_states)
    thread = threading.Thread(
        name="SummaryThread", target=self.print_summary, args=(thread_states, interval)
    )
    thread.setDaemon(True)
    thread.start()
    return thread
start_summary_thread(self, thread_states, interval)
Start and return a thread that periodically logs a summary of downloader activity (threads alive, download history, pending queue, active downloads).

start_download_threads

Source
def start_download_threads(self, downloading_queue, pending_queue):
    threads = []

    task_download_states = []

    for thread_number in range(self.thread_count):

        # create a task download state. This object is persistent and reused over and over
        # again for each Task that a thread downloads. It's important that the state information
        # is wiped clean (reset) every time a new task begins.
        task_download_state = TaskDownloadState()
        task_download_states.append(task_download_state)
        thread = threading.Thread(
            target=self.download_target,
            args=(pending_queue, downloading_queue, task_download_state),
        )
        thread.setDaemon(True)
        thread.start()
        threads.append(thread)
    return task_download_states

start_download_threads(self, downloading_queue, pending_queue) :

start_reporter_thread

Source
def start_reporter_thread(self, download_data):
    reporter_thread_name = "ReporterThread"
    current_thread_name = threading.current_thread().name
    thread_number_match = re.match(r"Thread-(\d+)", current_thread_name)
    if thread_number_match:
        reporter_thread_name += "-%s" % thread_number_match.groups()[0]

    thread = threading.Thread(
        name=reporter_thread_name,
        target=self.reporter_target,
        args=(download_data, threading.current_thread()),
    )
    thread.setDaemon(True)
    thread.start()
    return thread

start_reporter_thread(self, download_data) :

queue_target

Source
def queue_target(self, pending_queue, downloading_queue):


    while True:

        try:

            empty_queue_slots = (self.thread_count * 2) - pending_queue.qsize()
            # If the queue is full, then sleep
            if empty_queue_slots <= 0:
                self.nap()
                continue


            logger.debug("empty_queue_slots: %s", empty_queue_slots)

            # Get the next download. Cap the request to 20 Downloads at a time (this keeps
            # the request from taking a very long time). This is an arbitrary number and
            # can be adjusted as needed
            max_count = 20
            downloads = self.get_next_downloads(count=min([empty_queue_slots, max_count]))

            # If there is not a download, then sleep
            if not downloads:
                logger.debug("No more downloads to queue. sleeping %s seconds...", self.naptime)
                self.nap()
                continue

            # Ensure that each Download is not already in the pending/downloading queues
            for download in downloads:
                if _in_queue(pending_queue, download, "download_id") or _in_queue(
                    downloading_queue, download, "download_id"
                ):
                    logger.warning("Omitting Redundant Download: %s", download)
                    continue

                # Add the download to the pending queue
                logger.debug("adding to pending queue: %s", download)
                self.pending_queue.put(download)
        except:
            logger.exception("Exception occurred in QueueThead:\n")

        # Wait for the queue to be consumed before querying for more
        self.nap()
queue_target(self, pending_queue, downloading_queue)

Fill the download queue by querying the app for the "next" Download. Only fill the queue to twice the number of download threads.

Run this function perpetually until the daemon has been terminated.

Note: The size of the Queue can vary quite a lot. It's filled with Download objects, not file paths. A Download object represents all the files from a given Task. If a Task generates a single file, then a Queue of 20 will only contain 20 file paths. However, if a Task generates 100 files, then a Queue of 20 Downloads will actually contain 2000 paths to download.

nap

Source
def nap(self):
    while not common.SIGINT_EXIT:

        time.sleep(self.naptime)
        return

nap(self) :

get_next_downloads

Source
@common.dec_timer_exit(log_level=logging.DEBUG)
def get_next_downloads(self, count):
    try:
        downloads = _get_next_downloads(
            self.location, self.endpoint_downloads_next, self.api_client, count=count
        )
        return downloads
    except Exception as e:
        logger.exception("Could not get next download")

get_next_downloads(self, count) :

get_jobs_downloads

Source
def get_jobs_downloads(self, job_ids, task_id):

    task_ids = [t for t in task_id.split(",") if t] if task_id else [None]

    for job_id in job_ids:
        endpoint = self.endpoint_downloads_job % job_id
        for tid in task_ids:
            downloads = _get_job_download(endpoint, self.api_client, job_id, tid)
            if downloads:
                for task_download in downloads.get("downloads", []):
                    print("putting in queue: %s" % task_download)
                    self.pending_queue.put(task_download, block=True)
get_jobs_downloads(self, job_ids, task_id)

Get each job and optional comma-separated task list.

There will only be a task list if there is just one job, due to earlier arg validation.

If there is no task list, _get_job_download is called with tid=None (i.e. the whole job)

download_target

Source
@common.dec_catch_exception(raise_=True)
def download_target(self, pending_queue, downloading_queue, task_download_state):

    task_download_state.thread_name = threading.currentThread().name

    # Start the reporter sub thread (feeding it empty data) Setup the reporter child thread to
    # update the apps' Download entity
    self.start_reporter_thread(task_download_state)

    while not common.SIGINT_EXIT:
        task_download = None

        try:
            # Get the next task download from the pending queue.
            task_download = pending_queue.get(block=True)

            # Tell the pending queue that the task has been completed (to remove the task from
            # the queue)
            pending_queue.task_done()

            # Transfer the task download into the downloading queue
            downloading_queue.put(task_download)

            logger.debug("download: %s", task_download)

            # The task_download_state variable is constructed as a mutable object (a class) so
            # that it can be passed into multiple functions/threads and have them read/write to
            # it like a global variable. Initialize/reset the task_download_state
            task_download_state.initialize(task_download)
            task_download_state.status = TaskDownloadState.STATE_DOWNLOADING

            # Explicitly report the Download status (to indicate that it's now "downloading")
            self.report_download_status(task_download_state)

            output_dir = task_download["output_dir"]
            logger.info("output directory: %s", output_dir)

            if self.output_dir:
                output_dir = self.output_dir
                logger.info("Overriding default output directory: %s", output_dir)

            # If the output folder cannot be created, create one in the current directory.
            try:
                safe_mkdirs(output_dir)
            except OSError:
                output_dir = os.path.join(os.getcwd(), DOWNLOADS_FOLDER)
                logger.warning("Can't create desired output directory! Using '{}' instead.".format(output_dir))

            for file_info in task_download["files"]:

                # Create file state object
                file_download_state = FileDownloadState()
                file_download_state.file_info = file_info
                file_download_state.thread_name = threading.currentThread().name
                file_download_state.download_id = task_download["download_id"]

                # attach the file state object to the task state object
                task_download_state.file_download_states[file_download_state] = file_info

                # Define the local filepath to download to
                local_filepath = os.path.join(output_dir, file_info["relative_path"])

                # Record the filepath to the state object
                file_download_state.filepath = local_filepath

                logger.debug("Handling task file: %s", local_filepath)

                # Get the immediate parent directory for the file to be downloaded. Note that
                # this is not necessarily the same as the output_dir. The output_dir is simply
                # the top-level directory to transfer the files to. It does not necessarily
                # account for a render's generated subdirectories
                dirpath = os.path.dirname(local_filepath)

                # Ensure that the destination directory exists and set open permissions
                logger.debug("Creating destination directory if necessary: %s", dirpath)
                file_download_state.status = FileDownloadState.STATE_PREPARING_DIRECTORY
                safe_mkdirs(dirpath)

                logger.debug("chmodding directory: %s", dirpath)
                try:
                    chmod(dirpath, 0o777)
                except:
                    logger.warning("Failed to chmod filepath  %s", dirpath)

                file_download_state.time_started = time.time()
                self.download_file(
                    file_info["url"], local_filepath, file_info["md5"], file_download_state
                )

                file_download_state.status = FileDownloadState.STATE_COMPLETE
                file_download_state.time_completed = time.time()
                self.add_to_history(file_download_state)

            # Update the status to downloaded
            task_download_state.status = TaskDownloadState.STATE_COMPLETE

            # Explicitly report_download_status to app.  We can't always rely on the reporter
            # thread because it only pings the app every x seconds.
            self.report_download_status(task_download_state)

            # Remove the Download from the downloading queue.
            downloading_queue.get(block=False)
            downloading_queue.task_done()

        # If there is an error when handling the task_download, put it back into the pending
        # queue, and report to the app that it's pending again.
        except:
            if task_download:
                task_download_state.reset_bytes_downloaded()
                task_download_state.status = TaskDownloadState.STATE_ERROR
                logger.exception(
                    "Failed to download Download %s", task_download.get("download_id")
                )
                logger.debug("Putting Download back on queue: %s", task_download)
                pending_queue.put(task_download)
                # Remove the Download from the downloading queue.
                downloading_queue.get(block=False)
                downloading_queue.task_done()

                # Explicitly report_download_status to app.  We can't always rely on the
                # reporter thread because it only pings the app every x seconds. Make sure this
                # doesn't throw an exception. Don't want to kill the thread.
                try:
                    self.report_download_status(task_download_state)
                except:
                    # I think the worst that could happen is that the Download may not get its
                    # status changed to "downloaded".  This will eventually get cleaned up by
                    # the cron, and prompt a redownloading
                    download_id = task_download_state.task_download.get("download_id")
                    logger.exception(
                        "Failed to report final status for Download %s, due to error",
                        download_id,
                    )

        # Reset The task_download_state object
        task_download_state.reset()

    # If the daemon is terminated, clean up the active Download, resetting it's status on the
    # app
    logger.debug("Exiting thread. Cleaning up state for Download: ")
    task_download_state.reset_bytes_downloaded()
    task_download_state.status = TaskDownloadState.STATE_ERROR
    self.report_download_status(task_download_state)
    downloading_queue.get(block=True)
    downloading_queue.task_done()
download_target(self, pending_queue, downloading_queue, task_download_state)

This function is called in a new thread (and many threads may be executing it at the same time). It is responsible for downloading a single Download (essentially an entity in Datastore which represents the output data from a single Task). This may consist of many files.

This function pulls one Download from the pending queue and attempts to download it. If it fails, it places it back on the queue.

The function also spawns a child thread that is responsible for constantly updating the app with the status of the Download, such as:

  • status (e.g. "pending", "downloading", "downloaded")
  • bytes transferred (the total amount of bytes that have been transferred for the Download; note that this encompasses the bytes for ALL of the files that are part of the Download, as opposed to only a single file)

task_download_state: a class instance, serving as a global mutable object, which allows this thread to report data about its Download state so that other threads can read and output that data. This object is persistent for each thread and is reused every time this function is called, for each Task that the thread downloads. It's important that the state information is wiped clean (reset) every time a new task begins. That is the responsibility of this function.

reporter_target

Source
def reporter_target(self, task_download_state, downloader_thread):

    while True:

        try:

            if common.SIGINT_EXIT:
                task_download_state.status = TaskDownloadState.STATE_ERROR
                task_download_state.reset_bytes_downloaded()

            if task_download_state.task_download != None:
                bytes_downloaded = task_download_state.get_bytes_downloaded()
                response_string, response_code = self.report_download_status(
                    task_download_state
                )
        except:
            logger.exception("failed to report download status")

        sleep_time = 5

        time.sleep(sleep_time)

        # Check to make sure that the downloader thread that this reporter thread is
        # reporting about is still alive. Otherwise exit the reporter loop
        if not downloader_thread.is_alive():
            logger.warning(
                "Detected %s thread is dead. Exiting %s thread now",
                downloader_thread.name,
                threading.current_thread().name,
            )
            return

reporter_target(self, task_download_state, downloader_thread) :

download_file

Source
@common.DecRetry(tries=3, static_sleep=1)
def download_file(self, url, local_filepath, md5, file_state):

    # Reset bytes downloaded to 0 (in case of retries)
    file_state.bytes_downloaded = 0

    logger.debug("Checking for existing file %s", local_filepath)

    # If the file already exists on disk
    if os.path.isfile(local_filepath):
        file_state.status = FileDownloadState.STATE_HASHING_EXISTING_FILE

        # Create a callback function that the md5 hashing function will call periodically
        callback = functools.partial(self._update_file_state_callback, file_state)

        local_md5 = common.generate_md5(
            local_filepath,
            poll_seconds=self.md5_progess_polling,
            callback=callback,
        )
        # If the local md5 matches the expected md5, then there is no need to download. Skip to the next file
        if md5 == local_md5:
            file_state.use_existing = True
            logger.info("Existing file is up to date: %s", local_filepath)
            return

        logger.debug("md5 does not match existing file: %s vs %s", md5, local_md5)

        logger.debug("Deleting dirty file: %s", local_filepath)
        delete_file(local_filepath)

    file_state.status = FileDownloadState.STATE_DOWNLOADING

    # Download to a temporary file and then move it
    dirpath, filename = os.path.split(local_filepath)
    # hack to use tempfile to generate a unique filename.  close file object immediately.  This
    # will get thrown out soon
    tmpfile = tempfile.NamedTemporaryFile(prefix=filename, dir=dirpath)
    # close this. otherwise we get warnings/errors about the file handler not being closed
    tmpfile.close()

    tmp_filepath = tmpfile.name
    logger.debug("tmp_filepath: %s", tmp_filepath)
    # download the file.
    new_md5 = download_file(
        url, tmp_filepath, poll_rate=self.download_progess_polling, state=file_state
    )

    logger.debug("new_md5: %s", new_md5)
    if new_md5 != md5:
        try:
            logger.debug("Cleaning up temp file: %s", tmp_filepath)
            os.remove(tmp_filepath)
        except:
            logger.warning("Could not cleanup temp file: %s", tmp_filepath)
        raise Exception(
            "Downloaded file does not have expected md5. %s vs %s: %s"
            % (new_md5, md5, tmp_filepath)
        )

    logger.debug("File md5 verified: %s", tmp_filepath)

    logger.debug("Moving: %s to %s", tmp_filepath, local_filepath)
    shutil.move(tmp_filepath, local_filepath)

    # Set file permissions
    logger.debug("\tsetting file perms to 666")
    chmod(local_filepath, 0o666)
download_file(self, url, local_filepath, md5, file_state)
For the given file information, download the file to disk. Check whether the file already exists and matches the expected md5 before downloading.
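
Both md5 values in this comparison are base64-encoded digests (the format the module-level download_file returns). A sketch of computing the same value for a file already on disk (file_md5_b64 is a hypothetical helper, not part of the module):

import base64
import hashlib

def file_md5_b64(filepath, chunk_size=1024 * 1024):
    # Hash the file in chunks and return the base64-encoded digest,
    # matching the format returned by the module-level download_file().
    hash_obj = hashlib.md5()
    with open(filepath, "rb") as fp:
        for chunk in iter(lambda: fp.read(chunk_size), b""):
            hash_obj.update(chunk)
    return base64.b64encode(hash_obj.digest()).decode("utf-8")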

add_to_history

Source
def add_to_history(self, file_download_state):

    self.history_queue.put(file_download_state, block=False)

    if self.history_queue_max != None:
        # Only save the last n file downloads
        while self.history_queue.qsize() > self.history_queue_max:
            self.history_queue.get(block=False)

add_to_history(self, file_download_state) :

report_download_status

Source
def report_download_status(self, task_download_state):
    download_id = task_download_state.task_download.get("download_id")
    if not download_id:
        return None, None

    data = {
        "download_id": download_id,
        "status": task_download_state.get_entity_status(),
        "bytes_downloaded": task_download_state.get_bytes_downloaded(),
        "bytes_to_download": task_download_state.task_download.get("size") or 0,
    }

    return self.api_client.make_request(
        self.endpoint_downloads_status, data=json.dumps(data), use_api_key=True
    )

report_download_status(self, task_download_state) :

print_summary

Source
def print_summary(self, thread_states, interval):


    while True:
        try:
            self._print_threads_alive()
            self._print_download_history()
            self._print_pending_queue()
            self._print_active_downloads(thread_states)
            self._print_dead_thread_warnings()
        except:
            logger.exception("Failed to report Summary")

        time.sleep(interval)
print_summary(self, thread_states, interval)
  • total threads running
  • thread names
  • total download threads

Last 20 files downloaded
Last 20 tasks downloaded
Last 20 Downloads downloaded

Currently downloading jobs
Currently downloading files
Currently downloading Downloads

# SUMMARY

Active Thread Count: 14

Threads: ErrorThread MainThread MetricStore ProgressThread ProgressThread ProgressThread ProgressThread ProgressThread ProgressThread QueueThread Thread-1 Thread-2 Thread-3 Thread-4 Thread-5

ACTIVE DOWNLOADS

Job 08285 Task 004 - 80% (200/234MB) - Thread-1
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/beauty/deep_lidar.deep.0005.exr  HASHING EXISTING FILE 80%
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/data/deep_lidar.deep.0005.exr  DOWNLOADING 20%

Job 08285 Task 003 - 20% (20/234MB) - Thread-2
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01142.exr
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01074.exr

PENDING DOWNLOADS

Job 08285 Task 006
Job 08285 Task 007
Job 08285 Task 008
Job 08285 Task 009

HISTORY

Last 20 files downloaded:

/Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01142.exr
/Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01074.exr
/Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01038.exr
/Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01111.exr
/Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01087.exr
/Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01143.exr
/Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01095.exr
/Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01156.exr
/Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01016.exr
/Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01039.exr
/Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01130.exr
/Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01030.exr
/Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01015.exr
/Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01138.exr
/Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01063.exr
/Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01006.exr
/Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01065.exr
/Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01096.exr
/Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01055.exr

Last 20 tasks downloaded:

Job 08285 Task 004 (Download 3492394234)
Job 08285 Task 002 (Download 3492394234)
Job 08285 Task 003 (Download 3492394234)
Job 08285 Task 001 (Download 3492394234)
Job 08285 Task 000 (Download 3492394234)
Job 08284 Task 065 (Download 3492394234)
Job 08283 Task 064 (Download 3492394234)
Job 08283 Task 063 (Download 3492394234)
Job 08282 Task 032 (Download 3492394234)
Job 08282 Task 025 (Download 3492394234)
Job 08282 Task 001 (Download 3492394234)

construct_active_downloads_summary

Source
def construct_active_downloads_summary(self, task_download_states):

    header = "##### ACTIVE DOWNLOADS #####"
    content = ""

    for task_download_state in task_download_states:
        if task_download_state.task_download.get("download_id"):
            content += "\n\n%s" % self._contruct_active_download_summary(task_download_state)

    if content:
        return header + content

    return ""
construct_active_downloads_summary(self, task_download_states)

ACTIVE DOWNLOADS

Job 08285 Task 004 - 80% (200/234MB) - Thread-1
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/beauty/deep_lidar.deep.0005.exr
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/data/deep_lidar.deep.0005.exr

Job 08285 Task 003 - 20% (20/234MB) - Thread-2
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01142.exr
    /Volumes/af/show/wash/shots/MJ/MJ_0080/sandbox/jfunk/katana/renders/MJ_0080_light_v001/deep_lidar/deep_lidar.deep.01074.exr

construct_file_downloads_history_summary

Source
def construct_file_downloads_history_summary(self, file_download_history):


    table_data = []

    for file_download_state in file_download_history:
        row_data = {}
        row_data["Completed at"] = file_download_state.time_completed
        row_data["Download ID"] = file_download_state.download_id
        row_data["Job"] = file_download_state.file_info["job_id"]
        row_data["Task"] = file_download_state.file_info["task_id"]
        row_data["Size"] = file_download_state.file_info["size"]
        row_data["Action"] = "Reused" if file_download_state.use_existing else "DL"
        row_data["Duration"] = file_download_state.get_duration()
        row_data["Thread"] = file_download_state.thread_name
        row_data["Filepath"] = file_download_state.filepath
        table_data.append(row_data)

    column_names = [
        "Completed at",
        "Download ID",
        "Job",
        "Task",
        "Size",
        "Action",
        "Duration",
        "Thread",
        "Filepath",
    ]
    title = " DOWNLOAD HISTORY %s " % (
        ("(last %s files)" % self.history_queue_max) if self.history_queue_max else ""
    )
    table = HistoryTableStr(
        data=list(table_data),
        column_names=column_names,
        title=title.center(100, "#"),
        footer="#" * 180,
        upper_headers=True,
    )

    return table.make_table_str()
construct_file_downloads_history_summary(self, file_download_history)

DOWNLOAD HISTORY

6227709558521856  Job 08285  Task 001  20MB  CACHED  /work/renders/light_v001/beauty/deep_lidar.deep.0005.exr
7095580349853434  Job 08285  Task 002  10MB  DL      /work/renders/spider_fly01/beauty/deep_lidar.deep.0005.exr
5343402947290140  Job 08284  Task 001   5MB  DL      /work/renders/light_v002/data/light_002.deep.0005.exr
