Module ciocore.client_db

Functions

get_default_db_filepath

Source
def get_default_db_filepath():

    return os.path.join(tempfile.gettempdir(), DB_FILENAME)
get_default_db_filepath()

Return a default filepath to use for storing the sqlite database.

Depending on the platform, this will be located in some sort of temporary directory, such as:

  • /usr/temp (linux)
  • c:\users\<username>\appdata\local\temp (windows)
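A minimal usage sketch (assuming ciocore is importable; the actual DB_FILENAME value is defined in the module source):

from ciocore import client_db

db_path = client_db.get_default_db_filepath()
print(db_path)  # e.g. "/tmp/<DB_FILENAME>" on linux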

chunker

Source
def chunker(input_list, chunk_size):

    # Yield successive chunk_size-length slices of input_list.
    return (input_list[pos : pos + chunk_size] for pos in range(0, len(input_list), chunk_size))
chunker(input_list, chunk_size)
For the given list, return a generator that yields successive smaller lists (chunks) of the given size, containing the contents of the original list.
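For example:

list(chunker([1, 2, 3, 4, 5, 6, 7], 3))
# -> [[1, 2, 3], [4, 5, 6], [7]]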

Classes

TableDB

Source
def __init__(self, db_filepath, thread_safe=True):
    self.thread_safe = thread_safe
    self.db_filepath = db_filepath
    self.connection = self.connnect_to_db(db_filepath)
    self.create_table()
TableDB(db_filepath, thread_safe=True)

Represents a single sql table to query/operate upon. This has admittedly limited functionality (as there can only be a single table to interact with).
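Subclasses are expected to define the table_name and columns class variables. A hypothetical sketch (this column schema is illustrative, not the actual FilesDB definition):

class ExampleDB(TableDB):
    table_name = "example"
    columns = [
        # "primary_key" marks the column targeted by OR REPLACE inserts.
        {"name": "filepath", "sqlite_type": "TEXT", "python_type": str, "primary_key": True},
        {"name": "modtime", "sqlite_type": "REAL", "python_type": float},
        {"name": "filesize", "sqlite_type": "INTEGER", "python_type": int},
    ]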

Descendants

  • ciocore.client_db.FilesDB

Class variables

table_name

table_name :

columns

columns :

column_parameters

column_parameters :

Static methods

sqlite_dict_factory

Source
@classmethod
def sqlite_dict_factory(cls, cursor, row):
    d = {}
    for idx, col in enumerate(cursor.description):
        d[col[0]] = row[idx]
    return d

sqlite_dict_factory(cursor, row)

Row factory that returns each fetched row as a dict keyed by column name.

connnect_to_db

Source
@classmethod
def connnect_to_db(cls, db_filepath, timeout=300):

    # If the db filepath does not exist, create one with open permissions
    if not os.path.exists(db_filepath):
        file_utils.create_file(db_filepath)

    # Check to make sure we have write permissions
    if not os.access(db_filepath, os.W_OK):
        raise Exception("database filepath is not writable: %s" % db_filepath)

    connection = sqlite3.connect(
        db_filepath, detect_types=sqlite3.PARSE_DECLTYPES, timeout=timeout
    )
    connection.row_factory = cls.sqlite_dict_factory
    # Override the text type to return str rather than unicode.
    # TODO: this may not be a good idea; ideally all text would be converted to unicode first.
    connection.text_factory = str
    return connection
connnect_to_db(db_filepath, timeout=300)

Create a connection to the database at the specified filepath and return the connection object.

timeout: float. The number of seconds that a connection will wait to establish itself before it times out and raises an "OperationalError: database is locked" exception. This is important when threading, because sqlite can't handle many concurrent connections and will quickly throw that exception unless the timeout is high enough. Note that this is something of a workaround and may not work in all circumstances; querying the db from a single thread would be more reliable.
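For example, opening a connection with a longer timeout for heavily threaded use (the path here is illustrative):

connection = TableDB.connnect_to_db("/tmp/example.db", timeout=600)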

get_table_sql

Source
@classmethod
def get_table_sql(cls):

    sql = "CREATE TABLE IF NOT EXISTS %s (" % cls.table_name
    for column in cls.columns:
        sql += "\n%s %s NOT NULL %s," % (
            column["name"],
            column["sqlite_type"],
            "PRIMARY KEY" if column.get("primary_key") else "",
        )

    sql = sql.rstrip(",")
    sql += ");"
    return sql
get_table_sql()
Return the SQL statement that creates the table (if it does not already exist), with the columns defined in cls.columns.
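Using the hypothetical ExampleDB above, this would return something like:

CREATE TABLE IF NOT EXISTS example (
filepath TEXT NOT NULL PRIMARY KEY,
modtime REAL NOT NULL,
filesize INTEGER NOT NULL);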

row_to_dict

Source
@classmethod
def row_to_dict(cls, row):

    row_dict = {}
    # Pair each value in the row tuple with its corresponding column name.
    for idx, value in enumerate(row):
        column_name = cls.columns[idx]["name"]
        row_dict[column_name] = value

    return row_dict
row_to_dict(row)
Return the given sqlite row object as a python dict.

dict_to_row_tuple

Source
@classmethod
def dict_to_row_tuple(cls, input_dict):

    row_data = []
    for column in cls.columns:
        column_name = column["name"]
        assert (
            column_name in input_dict
        ), "input dict does not have expected key (%s). Got %s" % (column_name, input_dict)
        column_data = input_dict[column_name]
        assert isinstance(
            column_data, column["python_type"]
        ), "Wrong type (%s). Expected %s: %s" % (
            type(column_data),
            column["python_type"],
            column_data,
        )
        row_data.append(column_data)

    return tuple(row_data)
dict_to_row_tuple(input_dict)
Convert the given dictionary of data into a tuple of data that is suitable to use for a db row insert.
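For instance, with the hypothetical ExampleDB columns above:

ExampleDB.dict_to_row_tuple({"filepath": "/tmp/a.txt", "modtime": 1234.5, "filesize": 10})
# -> ("/tmp/a.txt", 1234.5, 10)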

get_column_names

Source
@classmethod
def get_column_names(cls):

    names = [column["name"] for column in cls.columns]
    return names
get_column_names()
Return the names of all columns in the table.

Methods

close_connection

Source
def close_connection(self):
    self.connection.close()

close_connection(self)

Close the connection to the database.

sql_execute

Source
def sql_execute(self, sql, params=None, many=False):

    params = params or []

    if self.thread_safe:
        self.connection = self.connnect_to_db(self.db_filepath)

    cursor = self.connection.cursor()

    if many:
        cursor.executemany(sql, params)
    else:
        cursor.execute(sql, params)

    self.connection.commit()
    cursor.close()
sql_execute(self, sql, params=None, many=False)

Execute the given sql command.

params: list. The parameters to bind to the sql statement. Note that when the instance was created with thread_safe=True, a new connection is instantiated for each call, which is necessary when running this method across multiple threads.

many: bool. If True, will execute the given sql command in batch (via executemany), using the given params as a list of parameter tuples, one per call.
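For example, a parameterized statement (table and path are illustrative):

db.sql_execute("DELETE FROM files WHERE filepath = ?", ["/tmp/old.txt"])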

sql_fetch

Source
def sql_fetch(self, sql, params=None):

    params = params or []
    if self.thread_safe:
        self.connection = self.connnect_to_db(self.db_filepath)

    cursor = self.connection.cursor()
    cursor.execute(sql, params)
    data = cursor.fetchall()
    cursor.close()
    return data
sql_fetch(self, sql, params=None)
Fetch data from the db via the given sql string and parameters.
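Because the connection uses sqlite_dict_factory, each returned row is a dict keyed by column name (the query here is illustrative):

rows = db.sql_fetch("SELECT * FROM files WHERE filesize > ?", [1024])
# e.g. [{"filepath": "/tmp/big.txt", "modtime": 1234.5, "filesize": 2048}]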

create_table

Source
def create_table(self):

    sql = self.get_table_sql()
    self.sql_execute(sql)
create_table(self)
Create the table (if it does not already exist in the db), with the columns defined in self.columns.

insert_row

Source
def insert_row(self, row_dict, replace=True):

    return self.insert_rows([row_dict], replace=replace)
insert_row(self, row_dict, replace=True)

Add the given row data (dictionary) to the db.

row_dict: dict, where the keys are the column names.

replace: bool. When True, will replace the existing row in the db (if there is one) that matches the row's primary key.
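For example, with the hypothetical ExampleDB:

db = ExampleDB("/tmp/example.db")
db.insert_row({"filepath": "/tmp/a.txt", "modtime": 1234.5, "filesize": 10})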

insert_rows

Source
def insert_rows(self, row_dicts, replace=True):

    or_replace = "OR REPLACE" if replace else ""
    sql = "INSERT %s INTO %s VALUES (%s)" % (
        or_replace,
        self.table_name,
        ",".join("?" * len(self.columns)),
    )
    row_tuples = [self.dict_to_row_tuple(row_dict) for row_dict in row_dicts]
    return self.sql_execute(sql, row_tuples, many=True)
insert_rows(self, row_dicts, replace=True)

Add the given list of row data (dictionaries) to the db.

row_dicts: a list of dicts, where the keys are the column names.

replace: bool. When True, will replace the existing rows in the db (if there are any) that match each row's primary key.
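For example, inserting several rows in one batch:

db.insert_rows([
    {"filepath": "/tmp/a.txt", "modtime": 1234.5, "filesize": 10},
    {"filepath": "/tmp/b.txt", "modtime": 1234.6, "filesize": 20},
])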

FilesDB

Source
def __init__(self, db_filepath, thread_safe=True):
    super(FilesDB, self).__init__(db_filepath, thread_safe=thread_safe)
FilesDB(db_filepath, thread_safe=True)

Represents a single sql table to query/operate upon. This has admittedly limited functionality (as there can only be a single table to interact with).

Ancestors (in MRO)

  • ciocore.client_db.TableDB

Class variables

table_name

table_name :

columns

columns :

Static methods

add_file

Source
@classmethod
def add_file(cls, file_info, db_filepath=None, thread_safe=True):

    cls.add_files([file_info], db_filepath=db_filepath, thread_safe=thread_safe)
add_file(file_info, db_filepath=None, thread_safe=True)

Add the given file to the files table

file_info: a dictionary with the following keys: "filepath", "modtime", "filesize"

add_files

Source
@classmethod
def add_files(cls, files_info, db_filepath=None, thread_safe=True):

    if not db_filepath:
        db_filepath = get_default_db_filepath()

    db = cls(db_filepath, thread_safe=thread_safe)
    db.insert_rows(files_info, replace=True)

    # close the connection.  Not sure if this is actually necessary
    db.close_connection()
add_files(files_info, db_filepath=None, thread_safe=True)

Add the given list of files to the files db table.

files_info: a list of dictionaries, each with the following keys: "filepath", "modtime", "filesize"
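A usage sketch, assuming the documented keys (the actual FilesDB column schema may require additional keys, such as "md5"):

FilesDB.add_files([
    {"filepath": "/tmp/a.txt", "modtime": 1234.5, "filesize": 10},
    {"filepath": "/tmp/b.txt", "modtime": 1234.6, "filesize": 20},
])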

query_files

Source
@classmethod
def query_files(cls, filepaths, return_dict=False, db_filepath=None, thread_safe=True):

    if not db_filepath:
        db_filepath = get_default_db_filepath()

    db = cls(db_filepath, thread_safe=thread_safe)

    files = []
    chunk_size = 500
    for filepaths_chunk in chunker(filepaths, chunk_size):
        # generate a sql string of files to match against
        files_sql = "(%s)" % ",".join("?" * len(filepaths_chunk))
        query = "SELECT * FROM files WHERE filepath IN %s" % files_sql
        file_rows = db.sql_fetch(query, filepaths_chunk)
        for file_ in file_rows:
            files.append(file_)

    # close the connection.  Not sure if this is actually necessary
    db.close_connection()

    if return_dict:
        files = dict([(entry["filepath"], entry) for entry in files])

    return files
query_files(filepaths, return_dict=False, db_filepath=None, thread_safe=True)

Query the db for all files which match the given filepaths.

Note that this is achieved through chunked queries, so as not to breach sqlite's maximum of 999 bound parameters.

return_dict: bool. If True, return the results as a dict keyed by "filepath", rather than as a list of rows.
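For example:

files = FilesDB.query_files(["/tmp/a.txt", "/tmp/b.txt"], return_dict=True)
# e.g. {"/tmp/a.txt": {"filepath": "/tmp/a.txt", "modtime": 1234.5, "filesize": 10}, ...}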

query_file

Source
@classmethod
def query_file(cls, filepath, db_filepath=None, thread_safe=True):

    filepaths = [filepath]
    files = cls.query_files(
        filepaths, return_dict=False, db_filepath=db_filepath, thread_safe=thread_safe
    )
    if not files:
        return
    assert len(files) == 1, "More than one file entry found: %s" % files
    return files[0]
query_file(filepath, db_filepath=None, thread_safe=True)

Query the db for the single file entry that matches the given filepath. Return None if no matching entry is found.

get_comparison_column_names

Source
@classmethod
def get_comparison_column_names(cls):

    return [name for name in cls.get_column_names() if name != "md5"]
get_comparison_column_names()
Return a list of column names whose data (for a given file row) should be used to compare against the data of a file on disk (e.g. to check whether a file's cache entry is stale). This returns names such as "filepath", "modtime", and "filesize", but not "md5", because the file on disk doesn't have md5 info available (which is the whole point of querying the db for it).

get_cached_file

Source
@classmethod
def get_cached_file(cls, file_info, db_filepath=None, thread_safe=True):

    filepath = file_info["filepath"]
    file_entry = cls.query_file(filepath, db_filepath=db_filepath, thread_safe=thread_safe)

    # if there is a file entry, check to see whether it's stale info
    if file_entry:
        # Cycle through column of the file data to ensure that the db cache matches the current
        # file info
        for column_name in cls.get_comparison_column_names():
            # If there is any mismatch then break the loop
            if file_info.get(column_name) != file_entry[column_name]:
                return

    # Return the entry if it exists and all of its columns match (otherwise None)
    return file_entry
get_cached_file(file_info, db_filepath=None, thread_safe=True)
For the given file (file_info), return its db entry if it is not considered "stale", otherwise return None.
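A typical caching workflow sketch (compute_md5 is a hypothetical helper, not part of this module):

file_info = {"filepath": "/tmp/a.txt", "modtime": 1234.5, "filesize": 10}
entry = FilesDB.get_cached_file(file_info)
if entry:
    md5 = entry["md5"]  # cache hit: reuse the stored md5
else:
    md5 = compute_md5(file_info["filepath"])  # hypothetical helper
    file_info["md5"] = md5
    FilesDB.add_file(file_info)  # refresh the cache entry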