Module ciocore.worker

Classes

Reporter

Source
def __init__(self, metric_store=None):
    self.metric_store = metric_store
    self.api_helper = ciocore.api_client.ApiClient()
    self.thread = None
    self.terminate = False
Reporter(metric_store=None)
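
Reporter is an abstract base for background status threads: subclasses override target() with a loop that exits once kill() sets self.terminate. A minimal sketch of a concrete subclass (the metric name and poll interval below are illustrative, not part of the API):

import logging
import time

logger = logging.getLogger(__name__)

class ProgressReporter(Reporter):
    """Hypothetical subclass that periodically logs an upload counter."""

    def target(self):
        # Poll until kill() flips the terminate flag.
        while not self.terminate:
            if self.metric_store:
                # 'bytes_uploaded' is an illustrative metric name.
                logger.info("uploaded: %s", self.metric_store.get('bytes_uploaded'))
            time.sleep(2)

reporter = ProgressReporter(metric_store=None)
reporter.start()           # runs target() in a daemon thread
reporter.kill(block=True)  # sets terminate, then joins the thread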

Static methods

working

Source
@staticmethod
def working():
    return WORKING

working()

Methods

kill

Source
def kill(self, block=False):
    self.terminate = True
    if block:
        logger.debug('joining reporter thread...')
        self.thread.join()
        logger.debug('reporter_thread exited')

kill(self, block=False)

target

Source
def target(self):
    raise NotImplementedError

target(self)

start

Source
def start(self):
    if self.thread:
        logger.error('threads already started. will not start more')
        return self.thread

    logger.debug('starting reporter thread')
    thd = threading.Thread(target=self.target, name=self.__class__.__name__)
    thd.daemon = True
    thd.start()
    self.thread = thd
    return self.thread

start(self)

ThreadWorker

Source
def __init__(self, **kwargs):

    # the in_queue provides work for us to do
    self.in_queue = kwargs['in_queue']

    # results of work are put into the out_queue
    self.out_queue = kwargs['out_queue']

    # exceptions will be put here if provided
    self.error_queue = kwargs['error_queue']

    # set the thread count (default: 1)
    self.thread_count = int(kwargs.get('thread_count', 1))

    # an optional metric store to share counters between threads
    self.metric_store = kwargs['metric_store']

    # create a list to hold the threads that we create
    self.threads = []

    self.thread_complete_counter = Counter()
    self._worker_started = Marker()
    self._job_counter = Counter()
ThreadWorker(**kwargs)

Abstract worker class.

The class defines the basic functions and data structures that all workers need.

TODO: move this into its own lib

Descendants

  • ciocore.uploader._uploader.FileStatWorker
  • ciocore.uploader._uploader.HttpBatchWorker
  • ciocore.uploader._uploader.MD5OutputWorker
  • ciocore.uploader._uploader.MD5Worker
  • ciocore.uploader._uploader.UploadWorker

Static methods

PoisonPill

Source
@staticmethod
def PoisonPill():
    return 'PoisonPill'

PoisonPill()

Methods

do_work

Source
def do_work(self, job):
    '''
    This needs to be implemented for each worker type. The work task from the in_queue is passed
    as the job argument.

    Returns the result to be passed to the out_queue
    '''

    raise NotImplementedError
do_work(self, job)

This needs to be implemented for each worker type. The work task from the in_queue is passed as the job argument.

Returns the result to be passed to the out_queue
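
As a sketch, a hypothetical subclass that checksums each file path it receives might look like the following. Note that target() below invokes do_work(job, thread_int), so concrete implementations should accept the extra thread index argument; the class name and job payload here are illustrative:

import hashlib

class ChecksumWorker(ThreadWorker):
    """Hypothetical worker: reads a file path from in_queue, emits its MD5."""

    def do_work(self, job, thread_int=None):
        # 'job' is assumed to be a file path for this sketch.
        md5 = hashlib.md5()
        with open(job, 'rb') as f:
            for chunk in iter(lambda: f.read(1024 * 1024), b''):
                md5.update(chunk)
        # The return value is forwarded to out_queue by put_job().
        return (job, md5.hexdigest())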

check_for_poison_pill

Source
def check_for_poison_pill(self, job):
    if job == self.PoisonPill():
        self.mark_done()
        exit()

check_for_poison_pill(self, job)

kill

Source
def kill(self, block=False):
    logger.debug('killing workers %s (%s threads)', self.__class__.__name__, len(self.threads))
    for _ in self.threads:
        self.in_queue.put(self.PoisonPill())

    if block:
        for index, thd in enumerate(self.threads):
            logger.debug('waiting for thread %s (%s)', index, self)
            thd.join(0.1)
            logger.debug('thread %s (%s) joined', index, self)

    return True

kill(self, block=False)

join

Source
def join(self):
    logger.debug("Waiting for in_queue to join. ({}-{}). {} items left.".format(self.__class__.__name__, self, self.in_queue.qsize()))

    while True:
        try:
            logger.debug("Getting remaining task from in_queue ({}-{}). {} items left.".format(self.__class__.__name__, self, self.in_queue.qsize()))
            job = self.in_queue.get(block=False)
            logger.debug("Dropping task {} from in_queue ({}-{}). {} items left.".format(job, self.__class__.__name__, self, self.in_queue.qsize()))
            self.in_queue.task_done()
        except queue.Empty:
            logger.debug("in_queue is empty ({}-{}). {} items left.".format(self.__class__.__name__, self, self.in_queue.qsize()))
            break

    self.kill(True)

join(self)

target

Source
@ciocore.common.dec_catch_exception(raise_=True)
def target(self, thread_int):

    while not ciocore.common.SIGINT_EXIT:
        try:

            job = None

            try:
                logger.debug("Worker querying for job")
                job = self.in_queue.get(timeout=2)
                queue_size = self.in_queue.qsize()                     

            except queue.Empty:

                if self._job_counter.value > 0:
                    logger.debug("Worker has completed all of its tasks.".format(job))
                    self.thread_complete_counter.decrement()
                    break

                else:
                    logger.debug("Worker waiting for first job")
                    continue

            logger.debug("Worker got job {}".format(job))
            self._job_counter.increment()
            logger.debug("Processing Job '{}' #{} on {}. {} tasks remaining in queue".format( job, 
                                                                                              self._job_counter.value, 
                                                                                              self, 
                                                                                              queue_size))

            # exit if we were passed 'PoisonPill'
            self.check_for_poison_pill(job)

            # start working on job
            try:
                output = None
                output = self.do_work(job, thread_int)
            except Exception as exception:
                logger.exception('CAUGHT EXCEPTION on job "{}" [{}]:\n'.format(job, self))

                # if there is no error queue to dump data into, then simply raise the exception
                if not self.error_queue:
                    raise

                # Otherwise put the exception in the error queue
                self.mark_done()
                self.error_queue.put(sys.exc_info())
                # exit the while loop to stop the thread
                break

            # put result in out_queue
            self.put_job(output)

            # signal that we are done with this task (needed for the
            # Queue.join() operation to work)
            self.mark_done()

        except Exception:
            logger.error('[thread %s]+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=', thread_int)
            logger.error(traceback.format_exc())
            raise

    logger.info("Worker {} completed".format(self))

target(self, thread_int)

start

Source
def start(self):
    '''
    Start thread_count threads.
    '''
    if self.threads:
        logger.error('threads already started. will not start more')
        return self.threads

    for thread_int in range(self.thread_count):
        name = "%s-%s" % (self.__class__.__name__, thread_int)
        logger.debug('starting thread %s', name)

        # thread will begin execution on self.target()
        thd = threading.Thread(target=self.target, args=(thread_int,), name=name)

        # make sure threads don't stop the program from exiting
        thd.daemon = True

        # start thread
        thd.start()
        self.threads.append(thd)

    self.thread_complete_counter.value = self.thread_count

    return self.threads
start(self)
Start thread_count threads.

mark_done

Source
def mark_done(self):
    try:
        self.in_queue.task_done()

    except ValueError:
        # this will happen if we are draining queues
        logger.debug('WORKING: %s', WORKING)

        if WORKING:
            logger.error('error hit when marking task_done')
            # this should not happen if we are still working
            raise
    return

mark_done(self)

put_job

Source
def put_job(self, job):
    # don't do anything if we were not provided an out_queue
    logger.debug("Attempting to put job to out_queue ({} -> {} -> {})".format(job, self, self.out_queue))

    if not self.out_queue:
        return

    # if were not supposed to be working, don't create new jobs
    if not WORKING:
        return

    logger.debug("Adding job to out_queue ({} -> {})".format(job,self, self.out_queue))

    # add item to job
    self.out_queue.put(job)
    return True

put_job(self, job)

is_complete

Source
def is_complete(self):        
    return self.thread_complete_counter.value == 0

is_complete(self)

MetricStore

Source
def __init__(self):
    self.metric_store = {}
    self.update_queue = queue.Queue()
    self.thread = None
    self.started = False
MetricStore()

This provides a thread-safe store that workers can use to share atomic counters, dictionaries, and lists.

Note: writes are eventually consistent
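
Reads (get, get_dict, get_list) hit the store directly, while writes are queued and applied by a single consumer thread, which is why they are only eventually consistent. A minimal usage sketch (the key names are illustrative):

store = MetricStore()
store.start()                              # single consumer thread applies writes

store.increment('bytes_uploaded', 1024)    # queued; applied asynchronously
store.set_dict('status', '/tmp/a.txt', 'uploading')

store.update_queue.join()                  # wait for queued writes to be applied
print(store.get('bytes_uploaded'))         # -> 1024
print(store.get_dict('status', '/tmp/a.txt'))

store.stop = True                          # shut down, mirroring JobManager.join()
store.join()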

Methods

join

Source
def join(self):
    self.update_queue.join()
    self.thread.join()
    return True

join(self)

start

Source
def start(self):
    '''
    needs to be single-threaded for atomic updates
    '''
    if self.started:
        logger.debug('metric_store already started')
        return None

    logger.debug('starting metric_store')

    self.stop = False
    self.thread = threading.Thread(target=self.target, name=self.__class__.__name__)
    self.thread.daemon = True
    self.thread.start()
    self.started = True

    return self.thread
start(self)
needs to be single-threaded for atomic updates

set

Source
def set(self, key, value):
    self.metric_store[key] = value

set(self, key, value)

get

Source
def get(self, variable):
    return self.metric_store.get(variable, 0)

get(self, variable)

increment

Source
def increment(self, variable, step_size=1, filename=""):
    self.update_queue.put(('increment', variable, step_size, filename))

increment(self, variable, step_size=1, filename='')

do_increment

Source
def do_increment(self, *args):
    variable, step_size, filename = args

    # initialize variable to 0 if not set
    if not variable in self.metric_store:
        self.metric_store[variable] = 0

    # increment variable by step_size
    self.metric_store[variable] += step_size

    if filename:
        if 'files' not in self.metric_store:
            self.metric_store['files'] = {}

        if filename not in self.metric_store['files']:
            self.metric_store['files'][filename] = {'bytes_to_upload': 0, 'bytes_uploaded': 0, 'already_uploaded': False}

        if variable == 'bytes_uploaded':
            self.metric_store['files'][filename]['bytes_uploaded'] += step_size

        elif variable == 'bytes_to_upload':
            self.metric_store['files'][filename]['bytes_to_upload'] = step_size

        elif variable == 'already_uploaded':
            self.metric_store['files'][filename]['already_uploaded'] = step_size # True/False            

do_increment(self, *args)
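
When a filename is supplied, do_increment also maintains a per-file record under the 'files' key so progress can be reported per file. A sketch of what accumulates (the file path is illustrative; note that bytes_to_upload is overwritten while bytes_uploaded accumulates, per the source above):

store.increment('bytes_to_upload', 5000, filename='/tmp/a.txt')
store.increment('bytes_uploaded', 1024, filename='/tmp/a.txt')
store.increment('bytes_uploaded', 1024, filename='/tmp/a.txt')

store.update_queue.join()
store.get_dict('files', '/tmp/a.txt')
# -> {'bytes_to_upload': 5000, 'bytes_uploaded': 2048, 'already_uploaded': False}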

set_dict

Source
def set_dict(self, dict_name, key, value):
    self.update_queue.put(('set_dict', dict_name, key, value))

set_dict(self, dict_name, key, value)

do_set_dict

Source
def do_set_dict(self, *args):
    dict_name, key, value = args

    if not dict_name in self.metric_store:
        self.metric_store[dict_name] = {}

    self.metric_store[dict_name][key] = value

do_set_dict(self, *args)

get_dict

Source
def get_dict(self, dict_name, key=None):
    # if dict_name does not exist, return an empty dict
    if not dict_name in self.metric_store:
        return {}

    # if key was not provided, return full dict
    if not key:
        return self.metric_store[dict_name]

    # return value of key
    return self.metric_store[dict_name].get(key)

get_dict(self, dict_name, key=None)

append

Source
def append(self, list_name, value):
    self.update_queue.put(('append', list_name, value))

append(self, list_name, value)

do_append

Source
def do_append(self, *args):
    list_name, value = args

    # initialize to empty list if not yet created
    if not list_name in self.metric_store:
        self.metric_store[list_name] = []

    # append value to list
    self.metric_store[list_name].append(value)

do_append(self, *args)

get_list

Source
def get_list(self, list_name):
    return self.metric_store.get(list_name, [])

get_list(self, list_name)

target

Source
@ciocore.common.dec_catch_exception(raise_=True)
def target(self):
    logger.debug('created metric_store target thread')

    while not self.stop:

        logger.debug("Metric store self.stop={}".format(self.stop))

        try:
            # wait up to 2 seconds for an update
            update_tuple = self.update_queue.get(True, 2)

        except queue.Empty:
            continue                

        method = update_tuple[0]
        method_args = update_tuple[1:]
        # check to see what action is to be carried out
        if method == 'increment':
            self.do_increment(*method_args)
        elif method == 'append':
            self.do_append(*method_args)
        elif method == 'set_dict':
            self.do_set_dict(*method_args)
        else:
            raise "method '%s' not valid" % method

        # mark task done
        self.update_queue.task_done()

target(self)

JobManager

Source
def __init__(self, job_description, reporter_description=None):
    self.error = []
    self.workers = []
    self.reporters = []
    self.error_queue = queue.Queue()
    self.metric_store = MetricStore()
    self.work_queues = [queue.Queue()]
    self.job_description = job_description
    self.reporter_description = reporter_description
    self._queue_started = False
    self.error_handler_stop = False
    self.error_handler_thread = None
JobManager(job_description, reporter_description=None)
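
Inferring from start() below, job_description is an ordered sequence of (worker_class, args, kwargs) tuples. Each stage's out_queue is chained to the next stage's in_queue, and the last stage gets no out_queue. A hedged sketch of a two-stage pipeline, reusing the hypothetical ChecksumWorker from the ThreadWorker section:

class LoggingWorker(ThreadWorker):
    """Hypothetical final stage: prints each result it receives."""
    def do_work(self, job, thread_int=None):
        print(job)

job_description = [
    (ChecksumWorker, [], {'thread_count': 4}),
    (LoggingWorker, [], {'thread_count': 1}),
]

manager = JobManager(job_description)
manager.start()                   # wires the queues and spawns all threads

for path in ['/tmp/a.txt', '/tmp/b.txt']:
    manager.add_task(path)        # tasks enter the first stage's in_queue

errors = manager.join()           # block until the pipeline drains
if errors:
    errors = manager.stop_work()  # force shutdown; returns accumulated errors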

Methods

drain_queues

Source
def drain_queues(self):
    logger.error('draining queues')
    # http://stackoverflow.com/questions/6517953/clear-all-items-from-the-queue
    for the_queue in self.work_queues:
        the_queue.mutex.acquire()
        the_queue.queue.clear()
        the_queue.mutex.release()
    return True

drain_queues(self)

mark_all_tasks_complete

Source
def mark_all_tasks_complete(self):
    logger.error('clearing out all tasks')
    # http://stackoverflow.com/questions/6517953/clear-all-items-from-the-queue
    for the_queue in self.work_queues:
        the_queue.mutex.acquire()
        the_queue.all_tasks_done.notify_all()
        the_queue.unfinished_tasks = 0
        the_queue.mutex.release()
    return True

mark_all_tasks_complete(self)

kill_workers

Source
def kill_workers(self):
    global WORKING
    WORKING = False
    for worker in self.workers:
        logger.debug("Killing worker: %s", worker)
        worker.kill(block=True) # Wait to ensure the worker was killed

kill_workers(self)

kill_reporters

Source
def kill_reporters(self):
    for reporter in self.reporters:
        logger.debug('killing reporter %s', reporter)
        reporter.kill()

kill_reporters(self)

stop_work

Source
def stop_work(self, force=False):

    global WORKING

    if WORKING:

        logger.info("Stopping Worker Manager")

        WORKING = False  # stop any new jobs from being created
        self.drain_queues()  # clear out any jobs in queue
        self.kill_workers()  # kill all threads
        self.kill_reporters()
        self.mark_all_tasks_complete()  # reset task counts

        self.metric_store.stop = True
        self.metric_store.join()

        self.error_handler_stop = True
        self.error_handler_thread.join()

    else:
        logger.info("Worker Manager has already been stopped.")

    return self.error

stop_work(self, force=False)

error_handler_target

Source
@ciocore.common.dec_catch_exception(raise_=True)
def error_handler_target(self):

    while not self.error_handler_stop:

        try:
            error = self.error_queue.get(True, 2)

        except queue.Empty:
            continue

        logger.error('Got something from the error queue: {}'.format(error))
        self.error.append(error)
        try:
            self.error_queue.task_done()
        except ValueError:
            pass

error_handler_target(self)

start_error_handler

Source
def start_error_handler(self):

    logger.debug('Creating error handler thread')
    self.error_handler_thread = threading.Thread(target=self.error_handler_target, name="ErrorThread")
    self.error_handler_thread.daemon = True
    self.error_handler_thread.start()

    return None

start_error_handler(self)

add_task

Source
def add_task(self, task):

    # This lets us distinguish a queue that is empty because no tasks have
    # been added yet from one that is empty because all tasks have completed.
    if not self._queue_started:            
        self._queue_started = True

    self.work_queues[0].put(task)

    return True

add_task(self, task)

start

Source
def start(self):
    global WORKING
    WORKING = True

    # start shared metric store
    self.metric_store.start()

    # create error handler
    self.start_error_handler()

    # create worker pools based on job_description
    next_queue = None
    last_queue = self.work_queues[0]
    last_worker = next(reversed(self.job_description))

    for worker_description in self.job_description:
        worker_class = worker_description[0]
        args = []
        kwargs = {}

        if len(worker_description) > 1:
            args = worker_description[1]

        if len(worker_description) > 2:
            kwargs = worker_description[2]

        kwargs['in_queue'] = last_queue

        if worker_description == last_worker:
            # the last worker does not need an output queue
            kwargs['out_queue'] = None
        else:
            next_queue = queue.Queue()
            self.work_queues.append(next_queue)
            kwargs['out_queue'] = next_queue

        kwargs['error_queue'] = self.error_queue
        kwargs['metric_store'] = self.metric_store

        worker = worker_class(*args, **kwargs)

        logger.debug('starting worker %s', worker_class.__name__)
        worker.start()
        self.workers.append(worker)
        last_queue = next_queue

    # start reporters
    if self.reporter_description:
        for reporter_class, download_id in self.reporter_description:
            reporter = reporter_class(self.metric_store)
            logger.debug('starting reporter %s', reporter_class.__name__)
            reporter.start(download_id)
            self.reporters.append(reporter)

    return True

start(self)

join

Source
def join(self):
    ''' Block until all work is complete '''

    for worker in self.workers:
        worker_class_name = worker.__class__.__name__
        logger.debug('waiting for %s workers to finish', worker_class_name)
        worker.join()
    logger.debug('all workers finished')
    self.metric_store.stop = True
    self.metric_store.join()
    logger.debug('metric store in sync')

    self.error_handler_stop = True
    self.error_handler_thread.join()

    if self.error:
        return self.error

    self.kill_workers()
    self.kill_reporters()

    return None
join(self)
Block until all work is complete

worker_queue_status_text

Source
def worker_queue_status_text(self):
    msg = '\n' + '#' * 80 + '\n'
    for index, worker_info in enumerate(self.job_description):
        worker_class = worker_info[0]
        q_size = self.work_queues[index].qsize()
        worker_threads = self.workers[index].threads

        # Thread.isAlive() was removed in Python 3.9 in favor of is_alive()
        try:
            num_active_threads = len([thd for thd in worker_threads if thd.is_alive()])
        except AttributeError:
            num_active_threads = len([thd for thd in worker_threads if thd.isAlive()])

        msg += '%s\titems in queue: %s' % (worker_class.__name__, q_size)
        msg += '\t\t%s threads' % num_active_threads
        msg += '\n'
    return msg

worker_queue_status_text(self)

is_complete

Source
def is_complete(self):

    is_complete = True

    # Manager has completed if all workers have completed
    for worker in self.workers:
        logger.debug("Worker {} is complete: {}".format(worker.__class__.__name__, worker.is_complete()))
        is_complete &= worker.is_complete()

    logger.debug("All Workers have completed: {}".format(is_complete))

    return is_complete

is_complete(self)

Counter

Source
def __init__(self):
    self.value = 0
    self._lock = threading.Lock()
Counter()

Methods

increment

Source
def increment(self):
    with self._lock:
        self.value += 1

increment(self)

decrement

Source
def decrement(self):
    with self._lock:
        self.value -= 1            

decrement(self)

Marker

Source
def __init__(self):
    self.marker = False
    self._lock = threading.Lock()
Marker()

Methods

mark

Source
def mark(self):

    with self._lock:

        logger.debug("Lock aquired (marker={})".format(self.marker))

        if self.marker:
            return True

        else:            

            self.marker = True
            return False        

mark(self)
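
mark() is an atomic test-and-set: the first caller sees False and flips the marker; every later caller sees True. This lets exactly one thread claim a one-time action, for example:

marker = Marker()

def initialize_once():
    # Only the first thread to call mark() gets False and does the work.
    if not marker.mark():
        print("running one-time initialization")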
