Source code for orion.core.io.database.pickleddb

# -*- coding: utf-8 -*-
"""
:mod:`orion.core.io.database.pickleddb` -- Pickled Database
===========================================================

.. module:: database
   :platform: Unix
   :synopsis: Implement permanent version of :class:`orion.core.io.database.EphemeralDB`

"""

from contextlib import contextmanager
import logging
import os
import pickle
from pickle import PicklingError

from filelock import FileLock

import orion.core
from orion.core.io.database import AbstractDB
from orion.core.io.database.ephemeraldb import EphemeralDB

log = logging.getLogger(__name__)

DEFAULT_HOST = os.path.join(orion.core.DIRS.user_data_dir, 'orion', 'orion_db.pkl')


def find_unpickable_doc(dict_of_dict):
    """Look for a dictionary that cannot be pickled."""
    for name, collection in dict_of_dict.items():
        for document in collection.find():
            try:
                pickle.dumps(document)
            except (PicklingError, AttributeError):
                # First offender wins; report it with its collection name.
                return name, document

    return None, None
def find_unpickable_field(doc):
    """Look for a field in a dictionary that cannot be pickled"""
    if not isinstance(doc, dict):
        # Project documents expose their data through to_dict().
        doc = doc.to_dict()

    for key, value in doc.items():
        try:
            pickle.dumps(value)
        except (PicklingError, AttributeError):
            return key, value

    return None, None
# pylint: disable=too-many-public-methods
class PickledDB(AbstractDB):
    """Pickled EphemeralDB to support permanancy and concurrency

    This is a very simple and inefficient implementation of a permanent
    database on disk for Oríon. The data is loaded from disk for every
    operation, and every operation is protected with a filelock.

    Parameters
    ----------
    host: str
        File path to save pickled ephemeraldb.  Default is
        {user data dir}/orion/orion_db.pkl ex: $HOME/.local/share/orion/orion_db.pkl

    """

    # pylint: disable=unused-argument
    def __init__(self, host=DEFAULT_HOST, *args, **kwargs):
        super(PickledDB, self).__init__(host)

        # Create the parent directory lazily so a fresh install works
        # without any manual setup.
        if os.path.dirname(host):
            os.makedirs(os.path.dirname(host), exist_ok=True)

    @property
    def is_connected(self):
        """Return true, always."""
        return True

    def initiate_connection(self):
        """Do nothing"""
        pass

    def close_connection(self):
        """Do nothing"""
        pass

    def ensure_index(self, collection_name, keys, unique=False):
        """Create given indexes if they do not already exist in database.

        Indexes are only created if `unique` is True.
        """
        with self.locked_database() as database:
            database.ensure_index(collection_name, keys, unique=unique)

    def index_information(self, collection_name):
        """Return dict of names and sorting order of indexes"""
        with self.locked_database(write=False) as database:
            return database.index_information(collection_name)

    def drop_index(self, collection_name, name):
        """Remove index from the database"""
        with self.locked_database() as database:
            return database.drop_index(collection_name, name)

    def write(self, collection_name, data, query=None):
        """Write new information to a collection. Perform insert or update.

        .. seealso:: :meth:`AbstractDB.write` for argument documentation.
        """
        with self.locked_database() as database:
            return database.write(collection_name, data, query=query)

    def read(self, collection_name, query=None, selection=None):
        """Read a collection and return a value according to the query.

        .. seealso:: :meth:`AbstractDB.read` for argument documentation.
        """
        with self.locked_database(write=False) as database:
            return database.read(collection_name, query=query, selection=selection)

    def read_and_write(self, collection_name, query, data, selection=None):
        """Read a collection's document and update the found document.

        Returns the updated document, or None if nothing found.

        .. seealso:: :meth:`AbstractDB.read_and_write` for argument documentation.
        """
        with self.locked_database() as database:
            return database.read_and_write(collection_name, query=query, data=data,
                                           selection=selection)

    def count(self, collection_name, query=None):
        """Count the number of documents in a collection which match the `query`.

        .. seealso:: :meth:`AbstractDB.count` for argument documentation.
        """
        with self.locked_database(write=False) as database:
            return database.count(collection_name, query=query)

    def remove(self, collection_name, query):
        """Delete from a collection document[s] which match the `query`.

        .. seealso:: :meth:`AbstractDB.remove` for argument documentation.
        """
        with self.locked_database() as database:
            return database.remove(collection_name, query=query)

    def _get_database(self):
        """Read fresh DB state from pickled file"""
        # A missing or empty file both mean "no data yet": start blank.
        if not os.path.exists(self.host):
            return EphemeralDB()

        with open(self.host, 'rb') as f:
            data = f.read()

        if not data:
            return EphemeralDB()

        return pickle.loads(data)

    def _dump_database(self, database):
        """Write pickled DB on disk"""
        # Write to a temporary file first so a crash mid-dump cannot
        # corrupt the existing database file.
        tmp_file = self.host + '.tmp'

        try:
            with open(tmp_file, 'wb') as f:
                pickle.dump(database, f)
        except (PicklingError, AttributeError):
            # Pinpoint the offending document and field to aid debugging.
            collection, doc = find_unpickable_doc(database._db)  # pylint: disable=protected-access
            log.error('Document in (collection: %s) is not pickable\ndoc: %s',
                      collection, doc.to_dict())
            key, value = find_unpickable_field(doc)
            log.error('because (value %s) in (field: %s) is not pickable',
                      value, key)
            raise

        # os.replace is atomic like os.rename on POSIX but, unlike
        # os.rename, also succeeds on Windows when the destination file
        # already exists (which is the case on every save after the first).
        os.replace(tmp_file, self.host)

    @contextmanager
    def locked_database(self, write=True):
        """Lock database file during wrapped operation call."""
        lock = FileLock(self.host + '.lock')

        with lock.acquire(timeout=60):
            database = self._get_database()

            yield database

            if write:
                self._dump_database(database)