Skip to content

Module ciocore.conductor_submit

General Submitter for sending jobs and uploads to Conductor

Classes

Submit

Source
def __init__(self, args):
    """
    Initialize the payload and the other properties needed for preparing uploads.

    `args` is a dict-like object of submission arguments. Payload entries whose
    default is None ("instance_type", "project", "tasks_data", "output_path")
    are required: if one is absent from `args`, an error is logged and the
    KeyError is re-raised. Every other payload entry keeps its default when
    `args` does not supply it.
    """

    # Upload settings that are kept on the instance rather than in the payload.
    self.upload_paths = list(args.get("upload_paths", []))
    self.md5_caching = args.get("md5_caching", True)
    self.enforced_md5s = args.get("enforced_md5s", {})
    self.database_filepath = args.get("database_filepath", "")
    self.api_client = api_client.ApiClient()

    self.progress_handler = None
    self.uploader_ = None

    self.payload = {
        # A default of None marks the corresponding arg as required.
        "instance_type": None,
        "project": None,
        "tasks_data": None,
        "output_path": None,
        # Optional attributes.
        "autoretry_policy": {},
        "docker_image": "",
        "environment": {},
        "job_title": "Unknown job title",
        "local_upload": True,
        "location": "",
        "preemptible": False,
        "metadata": {},
        "priority": 5,
        "scout_frames": "",
        "software_package_ids": [],
        "max_instances": 0,
        "chunk_size": 1,
        "owner": getpass.getuser(),
        "notify": [],
        # Attributes populated during the main run.
        "upload_files": [],
        "upload_size": 0,
    }

    # Overlay caller-supplied values on the defaults. Iterate over a snapshot
    # because the payload values are reassigned inside the loop.
    for key, fallback in list(self.payload.items()):
        try:
            self.payload[key] = args[key]
        except KeyError:
            if fallback is None:
                logger.error("Submit: You must provide the '{}' argument.".format(key))
                raise

    # HACK: Posix -> Windows submission - must windowize output_path. Only available for
    # developers. If a customer tries to submit from Mac to Windows, then they have access to
    # Windows instances by mistake. Yes this code could get them out of a bind, but it will
    # generate support tickets when they try to use the uploader daemon for example.
    instance_type = self.payload["instance_type"]
    self.ensure_windows_drive_letters = FEATURE_DEV and instance_type.endswith("-w")
    self.payload["output_path"] = self._ensure_windows_drive_letter(self.payload["output_path"])

    # The notify entry is sent as a dict wrapping the list of emails.
    self.payload["notify"] = {"emails": self.payload["notify"]}

    # Log every payload entry except the ones excluded below.
    unlogged = ("tasks_data", "upload_paths")
    for key in self.payload:
        if key in unlogged:
            continue
        logger.debug("{}:{}".format(key, self.payload[key]))
Submit(args)

Conductor Submission object.

Initialize the payload and some other properties needed for preparing uploads.

Payload properties must not be None. Any property that has None for the default value must be provided in the args.

Methods

upload_progress_callback

Source
def upload_progress_callback(self, upload_stats):
    '''
    Pass upload_stats on to the progress handler.

    Does nothing when no progress handler has been set.
    '''

    handler = self.progress_handler
    if not handler:
        return

    logger.debug("Sending progress update to {}".format(handler))
    handler(upload_stats)
upload_progress_callback(self, upload_stats)
Call the progress handler

stop_work

Source
def stop_work(self):
    '''
    Cancel the submission process.

    If an uploader is present, its cancel attribute is set to True.
    '''

    logger.debug("Submitter was requested to stop work.")

    uploader = self.uploader_
    if not uploader:
        return

    logger.debug("Uploader set to cancel.")
    uploader.cancel = True
stop_work(self)
Cancel the submission process

main

Source
def main(self):
    """
    Submit the job.

    Resolves the upload paths, runs the local-upload step (when the payload's
    "local_upload" is truthy) or the md5-enforcement step (when enforced md5s
    were given), records each file and its size in the payload, then posts the
    payload to the "jobs/" endpoint.

    Returns a tuple of (decoded JSON response, response code).
    Raises Exception when the response code is neither 201 nor 204.
    """

    self._log_threads(message_template="{thread_count} threads before starting upload")

    # Every resolved path starts out mapped to None.
    file_map = dict.fromkeys(file_utils.process_upload_filepaths(self.upload_paths))

    if self.payload["local_upload"]:
        file_map = self._handle_local_upload(file_map)
    elif self.enforced_md5s:
        file_map = self._enforce_md5s(file_map)

    # Add each file's expanded record to the payload and accumulate the total size.
    for filepath in file_map:
        file_entry = self._expand_stats(filepath, file_map[filepath])
        self.payload["upload_files"].append(file_entry)
        self.payload["upload_size"] += file_entry["st_size"]

    self._log_threads(message_template="{thread_count} threads after upload")

    logger.info("Sending Job...")

    # raise_on_error is off so that the failure message below is built here.
    request_kwargs = {
        "uri_path": "jobs/",
        "data": json.dumps(self.payload),
        "raise_on_error": False,
        "use_api_key": True,
    }
    response, response_code = self.api_client.make_request(**request_kwargs)

    if response_code in (201, 204):
        return json.loads(response), response_code

    raise Exception("Job Submission failed: Error %s ...\n%s" % (response_code, response))
main(self)

Submit the job

There are two possible submission flows:

  1. local_upload=True: md5 calcs and uploads are performed on the artist's machine in the session.
  2. local_upload=False: md5 calcs and uploads are performed on any machine with access to the filesystem on which the files reside, and by the same paths as the submission machine.
Back to top