Encryption at rest#

Data security is a big thing these days, and it's especially relevant in my life as I work in cyber security. However, I am also not a massive business that can afford to pay for Mongo Enterprise to get encryption at rest, so this is my next best thing.

The new EncryptedDocument subclass supports both encrypting fields with AES and hashing fields with SHA512.

This does not replace password hashing. Do not use this for storing passwords; use an algorithm such as bcrypt or Argon2id instead.
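For passwords themselves, a dedicated password hashing library is the right tool. A minimal sketch using the third-party bcrypt package (not part of Alaric):

import bcrypt

# Hash the password with a freshly generated salt
hashed: bytes = bcrypt.hashpw(b"my secret password", bcrypt.gensalt())

# Later, verify a login attempt against the stored hash
assert bcrypt.checkpw(b"my secret password", hashed)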

Database design impacts#

Q: I want to encrypt my fields, but I need to be able to run query filters on them. What do I do?

A: I suggest mirroring the fields, one hashed and one encrypted.

You can run filters against the hashed field as hashes don’t change, and when you need to gain access to that data you can fetch it via the encrypted field.

Note that if your database is leaked, the only barrier to retrieving that data is cracking the hash. The hashing used in Alaric should not be considered secure against brute-force methods, so you accept that data may be recovered from hashed fields.
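As a sketch of that mirroring with EncryptedDocument (the import path and constructor arguments for the field marker classes here are assumptions based on the class reference below, not confirmed API):

from motor.motor_asyncio import AsyncIOMotorClient

from alaric import EncryptedDocument
from alaric.encryption import AutomaticHashedFields, EncryptedFields

client = AsyncIOMotorClient(connection_url)
database = client["my_database"]

users = EncryptedDocument(
    database,
    "users",
    encryption_key=encryption_key,
    # Store email encrypted so the real value can be read back
    encrypted_fields=EncryptedFields("email"),
    # Also keep a hashed mirror of email purely for filtering
    automatic_hashed_fields=AutomaticHashedFields("email"),
)

Filters then run against the hashed mirror (see the HQF question below), while normal reads decrypt the email field.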

Q: I want to hash my XXX field, but I also need to know the value sometimes?

A: See above.

Q: How do I deal with encrypted items in cursors?

A: Cursors have first-class support for encrypted fields.

Q: Why are only the field values encrypted?

A: Because this is encryption support implemented in the driver layer, not built into MongoDB itself.

If you want to hide keys as well, consider nesting your data behind a single key whose value is encrypted. It's not a great idea, but it'd work.
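A rough sketch of that workaround, again assuming EncryptedFields lives in alaric.encryption and accepts field names:

from alaric import EncryptedDocument
from alaric.encryption import EncryptedFields

settings = EncryptedDocument(
    database,
    "settings",
    encryption_key=encryption_key,
    # Only the outer "payload" key is visible in the database;
    # everything nested inside it is stored as one encrypted value
    encrypted_fields=EncryptedFields("payload"),
)
await settings.insert({"_id": 1, "payload": {"theme": "dark", "token": "abc123"}})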

Q: I want my as_filter method on my converter to be a hashed filter.

A: You can use alaric.util.hash_field to hash your returned dictionary values in the same way that Alaric hashes them in the database. This means you can filter on hashed fields easily.

For example:

from alaric import util

class Test:
    def __init__(self, data, id, _id=None):
        self.data = data
        self.id = id
        self._id = _id

    def as_dict(self):
        # The data that gets saved to the database
        return {"data": self.data, "id": self.id}

    def as_filter(self):
        # Hash the id the same way Alaric hashes it in the database,
        # so this filter matches the stored hashed field
        return {"id_hashed": util.hash_field("id", self.id)}

Q: I encrypted my data using a generated key and lost it. Help!

A: Your data is gone if you lose your key.

The whole point of encrypting fields is so people without the key are unable to decrypt the data. When you lose the key, you also fall into this group of people.

Q: How do I query a hashed field if I don’t know the hash?

A: Create your query as usual, just wrap your comparison object in HQF(...) and Alaric will handle the hashing for you.

from alaric import AQ
from alaric.comparison import EQ
from alaric.encryption import HQF

query = AQ(HQF(EQ("_id", 1)))
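The resulting AQ can then be passed anywhere a filter is accepted, for example (with document being an EncryptedDocument instance):

# Alaric hashes the comparison value for us because of HQF,
# so this matches the hashed _id stored in the database
result = await document.find(query)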

Class Reference#

class alaric.EncryptedDocument(database: AsyncIOMotorDatabase, document_name: str, *, encryption_key: bytes, hashed_fields: HashedFields | None = None, automatic_hashed_fields: AutomaticHashedFields | None = None, encrypted_fields: EncryptedFields | None = None, converter: Type[T] | None = None, encrypt_all_fields: bool = False)#
__init__(database: AsyncIOMotorDatabase, document_name: str, *, encryption_key: bytes, hashed_fields: HashedFields | None = None, automatic_hashed_fields: AutomaticHashedFields | None = None, encrypted_fields: EncryptedFields | None = None, converter: Type[T] | None = None, encrypt_all_fields: bool = False)#
Parameters:
  • database (AsyncIOMotorDatabase) – The database we are connected to

  • document_name (str) – What this collection should be called

  • encryption_key (bytes) – The key to use for AES encryption

  • hashed_fields (Optional[HashedFields]) – A list of fields to SHA512 hash when encountered

  • automatic_hashed_fields (Optional[AutomaticHashedFields]) – A list of fields for which an additional, hashed column is created in the database, without exposing the hashed data to the end user.

  • encrypted_fields (Optional[EncryptedFields]) – A list of fields to AES encrypt when encountered

  • converter (Optional[Type[T]]) – An optional class to attempt to convert all returned Dict or List data into

  • encrypt_all_fields (bool) –

    If set to True, encrypt all fields regardless of hashed_fields and encrypted_fields options.

    This option respects ignored fields.

from motor.motor_asyncio import AsyncIOMotorClient

from alaric import EncryptedDocument

client = AsyncIOMotorClient(connection_url)
database = client["my_database"]
config_document = EncryptedDocument(
    database, "config", encryption_key=encryption_key
)
async bulk_insert(data: List[Dict], ignore_fields: IgnoreFields | None = None) None#

Given a List of Dictionaries, bulk insert all the given dictionaries in a single call.

Notes

Supports encrypted and hashed fields.

Parameters:
  • data (List[Dict]) – The data to bulk insert

  • ignore_fields (Optional[IgnoreFields]) –

    Any fields to ignore during the hashing / encryption step.

    Useful if you're passing this method an already-hashed value and you don't want to hash the hash.

# Insert 25 documents
await Document.bulk_insert(
    [{"_id": i} for i in range(25)]
)
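A sketch of ignore_fields in action; the import path for IgnoreFields, its constructor arguments and the already_hashed values are assumptions for illustration only:

from alaric.encryption import IgnoreFields  # assumed import path

# The "token" values are already hashed upstream, so skip them
# during the hashing / encryption step
await Document.bulk_insert(
    [{"_id": i, "token": already_hashed[i]} for i in range(25)],
    ignore_fields=IgnoreFields("token"),
)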
async change_field_to(filter_dict: Dict[str, Any] | Buildable | Filterable, field: str, new_value: Any) None#

Modify a single field and change the value.

Parameters:
  • filter_dict (Union[Dict[Any, Any], Buildable, Filterable]) – A dictionary to use as a filter or AQ object.

  • field (str) – The key for the field to change

  • new_value (Any) – What the field should get changed to

# Assuming a data structure of
# {"_id": 1, "prefix": "!"}
await Document.change_field_to({"_id": 1}, "prefix", "?")

# This will now look like
# {"_id": 1, "prefix": "?"}
property collection_name: str#

The connected collection's name.

async count(filter_dict: Dict[Any, Any] | Buildable | Filterable) int#

Return a count of how many items match the filter.

Parameters:

filter_dict (Union[Dict[Any, Any], Buildable, Filterable]) – The count filter.

Returns:

How many items matched the filter.

Return type:

int

# How many items have the `enabled` field set to True
count: int = await Document.count({"enabled": True})
create_cursor() Cursor#

Create a new Cursor instance for working with this collection.
async delete(filter_dict: Dict | Buildable | Filterable) DeleteResult | None#

Delete an item from the Document if an item with the provided filter exists.

Parameters:

filter_dict (Union[Dict, Buildable, Filterable]) – A dictionary to use as a filter or AQ object.

Returns:

The result of deletion if it occurred.

Return type:

Optional[DeleteResult]

# Delete items with a `prefix` of `!`
await Document.delete({"prefix": "!"})
async delete_all() None#

Delete all data associated with this document.

Notes

This will attempt to complete the operation in a single call, however, if that fails it will fall back to manually deleting items one by one.

Warning

There is no going back if you call this accidentally.

This also currently doesn’t appear to work.

property document_name: str#

Same as collection_name()

async find(filter_dict: Dict[str, Any] | Buildable | Filterable, projections: Dict[str, Any] | Projection | None = None, *, try_convert: bool = True) Dict[str, Any] | Type[T] | None#

Find and return one item.

Parameters:
  • filter_dict (Union[Dict, Buildable, Filterable]) – A dictionary to use as a filter or AQ object.

  • projections (Optional[Union[Dict[str, Any], Projection]]) – Specify the data you want returned from matching queries.

  • try_convert (bool) –

    Whether to attempt to run convertors on returned data.

    Defaults to True

Returns:

The result of the query

Return type:

Optional[Union[Dict[str, Any], Type[T]]]

# Find the document where the `_id` field is equal to `my_id`
data: dict = await Document.find({"_id": "my_id"})
async find_many(filter_dict: Dict[str, Any] | Buildable | Filterable, projections: Dict[str, Any] | Projection | None = None, *, try_convert: bool = True) List[Dict[str, Any] | Type[T]]#

Find and return all items matching the given filter.

Parameters:
  • filter_dict (Union[Dict[str, Any], Buildable, Filterable]) – A dictionary to use as a filter or AQ object.

  • projections (Optional[Union[Dict[str, Any], Projection]]) – Specify the data you want returned from matching queries.

  • try_convert (bool) –

    Whether to attempt to run convertors on returned data.

    Defaults to True

Returns:

The result of the query

Return type:

List[Union[Dict[str, Any], Type[T]]]

Notes

This uses a cursor internally; consider using cursors directly for more complicated queries.

# Find all documents where the key `my_field` is `true`
data: list[dict] = await Document.find_many({"my_field": True})
classmethod generate_aes_key() bytes#

Generate a valid AES key for usage with this class.

The output should be stored for future usage (for example in an environment variable), as otherwise you will lose access to your data.

For storage purposes, you may find the following methods useful:
  • bytes.hex()

  • bytes.fromhex()

Returns:

A valid AES key

Return type:

bytes
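For example, one way to round-trip the key (the environment variable name here is just an example):

import os

from alaric import EncryptedDocument

# One-off: generate a key and print its hex form so it can be
# placed in an environment variable or secret store
print(EncryptedDocument.generate_aes_key().hex())

# On startup: load the key back from the environment
encryption_key = bytes.fromhex(os.environ["ALARIC_AES_KEY"])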

async get_all(filter_dict: Dict[str, Any] | Buildable | Filterable | None = None, projections: Dict[str, Any] | Projection | None = None, *args: Any, try_convert: bool = True, **kwargs: Any) List[Dict[str, Any] | Type[T] | None]#

Fetches and returns all items which match the given filter.

Parameters:
  • filter_dict (Optional[Union[Dict[str, Any], Buildable, Filterable]]) – A dictionary to use as a filter or AQ object.

  • projections (Optional[Union[Dict[str, Any], Projection]]) – Specify the data you want returned from matching queries.

  • try_convert (bool) –

    Whether to attempt to run convertors on returned data.

    Defaults to True

Returns:

The items matching the filter

Return type:

List[Optional[Union[Dict[str, Any], Type[T]]]]

data: list[dict] = await Document.get_all()
async increment(filter_dict: Dict[str, Any] | Buildable | Filterable, field: str, amount: int | float) None#

Increment the provided field.

Parameters:
  • filter_dict (Union[Dict[str, Any], Buildable, Filterable]) – The ‘thing’ we want to increment

  • field (str) – The key for the field to increment

  • amount (Union[int, float]) – How much to increment (or decrement) by

Notes

This seamlessly handles incrementing encrypted fields.

# Assuming a data structure of
# {"_id": 1, "counter": 4}
await Document.increment({"_id": 1}, "counter", 1)

# Now looks like
# {"_id": 1, "counter": 5}
Raises:
  • ValueError – Nested field updates on encrypted fields are not supported.

  • ValueError – Item to increment didn’t exist with this filter.

Notes

You can also use negative numbers to decrease the count of a field.
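For example:

# Decrease the counter by 1
await Document.increment({"_id": 1}, "counter", -1)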

async insert(data: Dict[str, Any] | Saveable, *, ignore_fields: IgnoreFields | None = None) None#

Insert the provided data into the document.

Parameters:
  • data (Union[Dict[str, Any], Saveable]) – The data to insert

  • ignore_fields (Optional[IgnoreFields]) –

    Any fields to ignore during the hashing / encryption step.

    Useful if you're passing this method an already-hashed value and you don't want to hash the hash.

# If you don't provide an _id,
# Mongo will generate one for you automatically
await Document.insert({"_id": 1, "data": "hello world"})

property raw_collection: AsyncIOMotorCollection#

The connected collection instance.

property raw_database: AsyncIOMotorDatabase#

Access to the database instance.

async unset(filter_dict: Dict[str, Any] | Buildable | Filterable, field: Any) None#

Delete a given field from the matching document.

Parameters:
  • filter_dict (Union[Dict[str, Any], Buildable, Filterable]) – The fields to match on (Think _id)

  • field (Any) – The field to remove

# Assuming we have a document that looks like
# {"_id": 1, "field_one": True, "field_two": False}
await Document.unset({"_id": 1}, "field_two")

# This data will now look like the following
# {"_id": 1, "field_one": True}
async update(filter_dict: Dict[str, Any] | Buildable | Filterable, update_data: Dict[str, Any] | Saveable, option: str = 'set', *args: Any, ignore_fields: IgnoreFields | None = None, **kwargs: Any) None#

Performs an UPDATE operation.

Parameters:
  • filter_dict (Union[Dict[str, Any], Buildable, Filterable]) – A dictionary to use as a filter or AQ object.

  • update_data (Union[Dict[str, Any], Saveable]) – The data to update the document with

  • option (str) – The update option to use, defaults to 'set'

  • ignore_fields (Optional[IgnoreFields]) – Any fields to ignore during the hashing / encryption step.

# Update the document with an `_id` of 1
# So that it now equals the second argument
await Document.update({"_id": 1}, {"_id": 1, "data": "new data"})
async upsert(filter_dict: Dict[str, Any] | Buildable | Filterable, update_data: Dict[str, Any] | Saveable, option: str = 'set', *args: Any, ignore_fields: IgnoreFields | None = None, **kwargs: Any) None#

Performs an UPSERT operation.

Parameters:
  • filter_dict (Union[Dict[str, Any], Buildable, Filterable]) – A dictionary to use as a filter or AQ object.

  • update_data (Union[Dict[str, Any], Saveable]) – The data to upsert into the document

  • option (str) – The update option to use, defaults to 'set'

  • ignore_fields (Optional[IgnoreFields]) – Any fields to ignore during the hashing / encryption step.

# Update the document with an `_id` of `1`
# So that it now equals the second argument
# NOTE: If a document with an `_id` of `1`
# does not exist, then this method will
# insert the data instead.
await Document.upsert({"_id": 1}, {"_id": 1, "data": "new data"})