Module ciocore.client_db¶
Functions¶
get_default_db_filepath¶
Source
def get_default_db_filepath():
    """Return the default filepath for the sqlite db file.

    The file lives in the platform's temporary directory (as reported by
    tempfile.gettempdir()), joined with the module-level DB_FILENAME.
    """
    temp_dir = tempfile.gettempdir()
    return os.path.join(temp_dir, DB_FILENAME)
get_default_db_filepath()
-
Return a default filepath to use for storing the sqlite.
Depending on the platform, this will be located in some sort of temporary directory, such as: * /tmp (linux) * C:\Users\&lt;username&gt;\AppData\Local\Temp (windows)
chunker¶
Source
def chunker(input_list, chunk_size):
    """Return a generator that yields successive sublists of ``input_list``,
    each at most ``chunk_size`` items long (the final chunk may be shorter).

    input_list: list to be split.
    chunk_size: int. Maximum length of each yielded sublist.
    """
    return (input_list[pos : pos + chunk_size] for pos in range(0, len(input_list), chunk_size))
chunker(input_list, chunk_size)
- For the given list, return a generator which yields smaller lists of the given size (length), containing the contents of the original list
Classes¶
TableDB¶
Source
def __init__(self, db_filepath, thread_safe=True):
    """Connect to the sqlite db at db_filepath and ensure this table exists.

    db_filepath: str. Path to the sqlite database file.
    thread_safe: bool. When True, sql_execute/sql_fetch open a fresh
        connection per call so the object can be used across threads.
    """
    self.thread_safe = thread_safe
    self.db_filepath = db_filepath
    # Open (creating the file first if necessary) the sqlite connection.
    self.connection = self.connnect_to_db(db_filepath)
    # Create the table described by cls.columns if it doesn't exist yet.
    self.create_table()
TableDB(db_filepath, thread_safe=True)
-
Represents a single sql table to query/operate upon. This has admittedly limited functionality (as there can only be a single table to interact with).
Descendants¶
- ciocore.client_db.FilesDB
Class variables¶
table_name¶
table_name
:columns¶
columns
:column_parameters¶
column_parameters
:Static methods¶
sqlite_dict_factory¶
Source
@classmethod
def sqlite_dict_factory(cls, cursor, row):
    """sqlite3 row factory: map a result row to a dict keyed by column name.

    cursor: sqlite3 cursor (cursor.description supplies the column names).
    row: the raw row tuple being converted.
    """
    # col[0] is the column name in each cursor.description entry.
    return {col[0]: row[idx] for idx, col in enumerate(cursor.description)}
sqlite_dict_factory(cursor, row)
:connnect_to_db¶
Source
@classmethod
def connnect_to_db(cls, db_filepath, timeout=300):
    """Create and return a sqlite connection to the given database filepath.

    timeout: float. Seconds a connection waits (e.g. on a locked database)
        before raising "OperationalError: database is locked". A generous
        value matters when many threads connect concurrently.
    """
    # Make sure a writable db file exists before sqlite touches it.
    if not os.path.exists(db_filepath):
        file_utils.create_file(db_filepath)
    if not os.access(db_filepath, os.W_OK):
        raise Exception("database filepath is not writable: %s" % db_filepath)

    conn = sqlite3.connect(db_filepath, detect_types=sqlite3.PARSE_DECLTYPES, timeout=timeout)
    # Rows come back as dicts keyed by column name rather than plain tuples.
    conn.row_factory = cls.sqlite_dict_factory
    # Override the text type so text columns come back as str.
    # TODO:(this) may not be a good idea, but too lazy to convert all text to unicode first
    conn.text_factory = str
    return conn
connnect_to_db(db_filepath, timeout=300)
-
Create a connection to the database with the specified database filepath and return the connection object
timeout: float. The amount of seconds that a connection will wait to establish itself before it times out and raises an "OperationalError: database is locked" exception. This is important when threading bc sqlite can't handle that many concurrent connections and will quickly throw that exception unless the timeout is high enough. Honestly this is kind of a hack and may not work in all circumstances. We should really just query the db in a single thread (IMO -lws)
get_table_sql¶
Source
@classmethod
def get_table_sql(cls):
    """Build and return the CREATE TABLE IF NOT EXISTS statement for
    cls.table_name, with one column definition per entry in cls.columns.
    """
    column_defs = ""
    for col in cls.columns:
        constraint = "PRIMARY KEY" if col.get("primary_key") else ""
        column_defs += "\n%s %s NOT NULL %s," % (col["name"], col["sqlite_type"], constraint)
    # Drop the trailing comma left by the last column definition.
    return "CREATE TABLE IF NOT EXISTS %s (%s);" % (cls.table_name, column_defs.rstrip(","))
get_table_sql()
- create a table with the columns defined in self.columns
row_to_dict¶
Source
@classmethod
def row_to_dict(cls, row):
    """Return the given sqlite row (a sequence of values in cls.columns
    order) as a dict keyed by column name.

    Bug fix: the original iterated ``cls.rows`` (an attribute that is never
    defined) and shadowed the ``row`` parameter in the loop; it must
    enumerate the ``row`` argument itself.
    """
    return {cls.columns[idx]["name"]: value for idx, value in enumerate(row)}
row_to_dict(row)
- Return the given sqlite row object as a python dict
dict_to_row_tuple¶
Source
@classmethod
def dict_to_row_tuple(cls, input_dict):
    """Convert the given dict of row data into a tuple ordered by
    cls.columns, suitable for a db row insert.

    Each column must be present in input_dict and its value must be an
    instance of the column's declared python_type.
    """
    values = []
    for column in cls.columns:
        column_name = column["name"]
        assert (
            column_name in input_dict
        ), "input dict does not have expected key (%s). Got %s" % (column_name, input_dict)
        value = input_dict[column_name]
        assert isinstance(value, column["python_type"]), "Wrong type (%s). Expected %s: %s" % (
            type(value),
            column["python_type"],
            value,
        )
        values.append(value)
    return tuple(values)
dict_to_row_tuple(input_dict)
- Convert the given dictionary of data into a tuple of data that is suitable to use for a db row insert.
get_column_names¶
Source
@classmethod
def get_column_names(cls):
    """Return the names of all columns in the table, in schema order."""
    return [col["name"] for col in cls.columns]
get_column_names()
- Return the name of all columns in the table
Methods¶
close_connection¶
Source
def close_connection(self):
    """Close this object's underlying sqlite connection."""
    self.connection.close()
close_connection(self)
:sql_execute¶
Source
def sql_execute(self, sql, params=None, many=False):
    """Execute the given sql command and commit.

    sql: str. The sql statement to execute.
    params: sequence of bind parameters (or, with many=True, a list of
        parameter sequences, one per execution). Defaults to [].
    many: bool. If True, execute the statement in batch via
        cursor.executemany, using params as the list of parameter rows.
    """
    params = params or []
    if self.thread_safe:
        # Open a fresh connection for this call so the method is safe to
        # run across multiple threads.
        self.connection = self.connnect_to_db(self.db_filepath)
    cursor = self.connection.cursor()
    try:
        if many:
            cursor.executemany(sql, params)
        else:
            cursor.execute(sql, params)
        self.connection.commit()
    finally:
        # Close the cursor even if execution raises, so it isn't leaked.
        cursor.close()
sql_execute(self, sql, params=None, many=False)
-
Execute the given sql command
Note: when the instance was constructed with thread_safe=True, a new sql connection object is instantiated for each call. This is necessary when running this method across multiple threads.
many: bool. If True, will execute the given sql command in batch, using the given params as a list of variables for each call.
sql_fetch¶
Source
def sql_fetch(self, sql, params=None):
    """Execute the given sql query and return all fetched rows.

    sql: str. The sql query to run.
    params: sequence of bind parameters. Defaults to [].
    """
    params = params or []
    if self.thread_safe:
        # Open a fresh connection for this call so the method is safe to
        # run across multiple threads.
        self.connection = self.connnect_to_db(self.db_filepath)
    cursor = self.connection.cursor()
    try:
        cursor.execute(sql, params)
        return cursor.fetchall()
    finally:
        # Close the cursor even if the query raises, so it isn't leaked.
        cursor.close()
sql_fetch(self, sql, params=None)
- Fetch data from the db via the given sql string and parameters
create_table¶
Source
def create_table(self):
    """Create this object's table (if it does not already exist in the db),
    using the schema described by self.columns.
    """
    self.sql_execute(self.get_table_sql())
create_table(self)
- create the table (if it does not already exist in the db), with the self.columns
insert_row¶
Source
def insert_row(self, row_dict, replace=True):
    """Add the given row data (dict keyed by column name) to the db.

    replace: bool. When True, an existing row with the same primary key
        is replaced.
    """
    return self.insert_rows([row_dict], replace=replace)
insert_row(self, row_dict, replace=True)
-
Add the given row data (dictionary) to the db
row_dict: dict, where the keys are the column names. replace: bool. When True, will replace the existing row in the db (if there is one) that matches the row's Primary Key.
insert_rows¶
Source
def insert_rows(self, row_dicts, replace=True):
    """Add the given list of row data (dicts keyed by column name) to the db.

    replace: bool. When True, rows whose primary key already exists in the
        table are replaced (INSERT OR REPLACE).
    """
    conflict_clause = "OR REPLACE" if replace else ""
    placeholders = ",".join("?" * len(self.columns))
    sql = "INSERT %s INTO %s VALUES (%s)" % (conflict_clause, self.table_name, placeholders)
    param_rows = [self.dict_to_row_tuple(row_dict) for row_dict in row_dicts]
    return self.sql_execute(sql, param_rows, many=True)
insert_rows(self, row_dicts, replace=True)
-
Add the given list of row data (dictionaries) to the db
row_dicts: list of dicts, where the keys are the column names. replace: bool. When True, will replace the existing row in the db (if there is one) that matches the row's Primary Key.
FilesDB¶
Source
def __init__(self, db_filepath, thread_safe=True):
    """Connect to the sqlite db at db_filepath and ensure the files table exists."""
    super(FilesDB, self).__init__(db_filepath, thread_safe=thread_safe)
FilesDB(db_filepath, thread_safe=True)
-
Represents a single sql table to query/operate upon. This has admittedly limited functionality (as there can only be as single table to interact with).
Ancestors (in MRO)¶
- ciocore.client_db.TableDB
Class variables¶
table_name¶
table_name
:columns¶
columns
:Static methods¶
add_file¶
Source
@classmethod
def add_file(cls, file_info, db_filepath=None, thread_safe=True):
    """Add a single file's info dict to the files table.

    file_info: dict with the keys "filepath", "modtime" and "filesize".
    """
    cls.add_files([file_info], db_filepath=db_filepath, thread_safe=thread_safe)
add_file(file_info, db_filepath=None, thread_safe=True)
-
Add the given file to the files table
file_info: a dictionaries with the following keys: "filepath", "modtime" "filesize"
add_files¶
Source
@classmethod
def add_files(cls, files_info, db_filepath=None, thread_safe=True):
    """Add the given list of file info dicts to the files db table.

    files_info: list of dicts, each with the keys "filepath", "modtime"
        and "filesize".
    db_filepath: str or None. Path to the sqlite db file; falls back to
        the default temp-directory location when not provided.
    """
    if not db_filepath:
        db_filepath = get_default_db_filepath()
    db = cls(db_filepath, thread_safe=thread_safe)
    db.insert_rows(files_info, replace=True)
    # Release the sqlite connection immediately rather than waiting for GC.
    db.close_connection()
add_files(files_info, db_filepath=None, thread_safe=True)
-
Add the given list of files to the files db table.
files_info: a list of dictionaries, each with the following keys: "filepath", "modtime" "filesize"
query_files¶
Source
@classmethod
# @common.dec_timer_exit(log_level=logging.DEBUG)
def query_files(cls, filepaths, return_dict=False, db_filepath=None, thread_safe=True):
    """Query the db for all file rows whose filepath is in the given list.

    The query is chunked so as not to exceed sqlite's maximum of 999 bound
    parameters per statement.

    filepaths: list of str filepaths to look up.
    return_dict: bool. If True, return a dict of row dicts keyed by
        filepath instead of a list.
    db_filepath: str or None. Path to the sqlite db file; falls back to
        the default temp-directory location when not provided.
    """
    if not db_filepath:
        db_filepath = get_default_db_filepath()
    db = cls(db_filepath, thread_safe=thread_safe)
    files = []
    chunk_size = 500  # comfortably below sqlite's 999 bound-variable limit
    for filepaths_chunk in chunker(filepaths, chunk_size):
        # One "?" placeholder per filepath in this chunk.
        files_sql = "(%s)" % ",".join("?" * len(filepaths_chunk))
        # Use cls.table_name (consistent with insert_rows) rather than a
        # hard-coded table name.
        query = "SELECT * FROM %s WHERE filepath IN %s" % (cls.table_name, files_sql)
        files.extend(db.sql_fetch(query, filepaths_chunk))
    # Release the sqlite connection immediately rather than waiting for GC.
    db.close_connection()
    if return_dict:
        files = {entry["filepath"]: entry for entry in files}
    return files
query_files(filepaths, return_dict=False, db_filepath=None, thread_safe=True)
-
Query the db for all files which match the given filepaths.
Note that this is achieved through chunked queries so as not to exceed sqlite's maximum of 999 bound parameters
return_dict: bool. If True, return the results as a dict keyed by filepath (rather than a list).
query_file¶
Source
@classmethod
def query_file(cls, filepath, db_filepath=None, thread_safe=True):
    """Query the db for the single file entry matching the given filepath.

    Returns the row dict, or None if no entry exists. Asserts that at most
    one entry matches.
    """
    files = cls.query_files(
        [filepath], return_dict=False, db_filepath=db_filepath, thread_safe=thread_safe
    )
    if not files:
        return
    assert len(files) == 1, "More than one file entry found: %s" % files
    return files[0]
query_file(filepath, db_filepath=None, thread_safe=True)
-
Query the db for the single file entry which matches the given filepath.
Note that this is achieved through chunked queries so as not to exceed sqlite's maximum of 999 bound parameters
get_comparison_column_names¶
Source
@classmethod
def get_comparison_column_names(cls):
    """Return the column names used to compare a db row against a file on
    disk (e.g. "filepath", "modtime", "filesize") — i.e. every column
    except "md5", which is not available for the on-disk file.
    """
    excluded = ("md5",)
    return [name for name in cls.get_column_names() if name not in excluded]
get_comparison_column_names()
- Return a list of column names whose data (for a given file row), should be used to compare against the data of a file from disk (e.g. to check whether a file on disk has stale cache or not). This should return names such as "filepath", "modtime, and "size", but not "md5" (bc the file on disk doesn't have the md5 info available...which is the whole point of quering the db for it :) )
get_cached_file¶
Source
@classmethod
def get_cached_file(cls, file_info, db_filepath=None, thread_safe=True):
    """Return the db entry for the given file (file_info dict) if its cached
    data is still current, otherwise None.

    The cache is considered fresh only when every comparison column
    (see get_comparison_column_names) matches the on-disk file_info.
    """
    file_entry = cls.query_file(
        file_info["filepath"], db_filepath=db_filepath, thread_safe=thread_safe
    )
    if not file_entry:
        return None
    for column_name in cls.get_comparison_column_names():
        # Any mismatch means the cached entry is stale.
        if file_info.get(column_name) != file_entry[column_name]:
            return None
    return file_entry
get_cached_file(file_info, db_filepath=None, thread_safe=True)
- For the given file (file_info), return its db entry if it is not considered "stale", otherwise return None