import logging

logger = logging.getLogger(__name__)


def register_table_methods(base_classes, **kwargs):
    base_classes.insert(0, TableResource)
|
|
class TableResource: |
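    """Additional methods that are mixed into the DynamoDB Table resource.

    ``register_table_methods`` above inserts this class at the front of the
    Table resource's base classes, so anything defined here becomes available
    on Table objects, e.g. ``table.batch_writer()``.
    """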
|
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
|
    def batch_writer(self, overwrite_by_pkeys=None):
        """Create a batch writer object.

        This method creates a context manager for writing
        objects to Amazon DynamoDB in batch.

        The batch writer will automatically handle buffering and sending items
        in batches. In addition, the batch writer will also automatically
        handle any unprocessed items and resend them as needed. All you need
        to do is call ``put_item`` for any items you want to add, and
        ``delete_item`` for any items you want to delete.

        Example usage::

            with table.batch_writer() as batch:
                for _ in range(1000000):
                    batch.put_item(Item={'HashKey': '...',
                                         'Otherstuff': '...'})
                # You can also delete items in a batch.
                batch.delete_item(Key={'HashKey': 'SomeHashKey'})

        :type overwrite_by_pkeys: list(string)
        :param overwrite_by_pkeys: De-duplicate request items in the buffer
            if they match a new request item on the specified primary keys,
            e.g. ``["partition_key1", "sort_key2", "sort_key3"]``.

        """
        return BatchWriter(
            self.name, self.meta.client, overwrite_by_pkeys=overwrite_by_pkeys
        )
|
|
class BatchWriter: |
    """Automatically handle batch writes to DynamoDB for a single table."""

    def __init__(
        self, table_name, client, flush_amount=25, overwrite_by_pkeys=None
    ):
        """

        :type table_name: str
        :param table_name: The name of the table. The class handles
            batch writes to a single table.

        :type client: ``botocore.client.Client``
        :param client: A botocore client. Note this client
            **must** have the dynamodb customizations applied
            to it for transforming AttributeValues into the
            wire protocol. What this means in practice is that
            you need to use a client that comes from a DynamoDB
            resource if you're going to instantiate this class
            directly, i.e.
            ``boto3.resource('dynamodb').Table('foo').meta.client``.

        :type flush_amount: int
        :param flush_amount: The number of items to keep in
            a local buffer before sending a batch_write_item
            request to DynamoDB.

        :type overwrite_by_pkeys: list(string)
        :param overwrite_by_pkeys: De-duplicate request items in the buffer
            if they match a new request item on the specified primary keys,
            e.g. ``["partition_key1", "sort_key2", "sort_key3"]``.
|
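        Example of instantiating the class directly (a sketch only; the
        table name ``'foo'`` and the key/attribute names are placeholders)::

            import boto3

            table = boto3.resource('dynamodb').Table('foo')
            with BatchWriter('foo', table.meta.client,
                             flush_amount=25) as writer:
                writer.put_item(Item={'partition_key1': 'pk-value',
                                      'data': '...'})
                writer.delete_item(Key={'partition_key1': 'other-pk'})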
|
""" |
        self._table_name = table_name
        self._client = client
        self._items_buffer = []
        self._flush_amount = flush_amount
        self._overwrite_by_pkeys = overwrite_by_pkeys
|
def put_item(self, Item): |
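        # Queue a PutRequest for this item; the buffer is flushed
        # automatically once it reaches flush_amount entries.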
self._add_request_and_process({'PutRequest': {'Item': Item}}) |
|
def delete_item(self, Key): |
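        # Queue a DeleteRequest for this key; flushed the same way as puts.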
self._add_request_and_process({'DeleteRequest': {'Key': Key}}) |
|
def _add_request_and_process(self, request): |
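        # De-duplicate against the buffer if requested, then buffer the
        # request and flush once enough requests have accumulated.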
        if self._overwrite_by_pkeys:
            self._remove_dup_pkeys_request_if_any(request)
        self._items_buffer.append(request)
        self._flush_if_needed()
|
def _remove_dup_pkeys_request_if_any(self, request): |
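        # Drop any buffered request whose primary key values match the new
        # request, so only the most recent write for that key is sent.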
        pkey_values_new = self._extract_pkey_values(request)
        for item in self._items_buffer:
            if self._extract_pkey_values(item) == pkey_values_new:
                self._items_buffer.remove(item)
                logger.debug(
                    "With overwrite_by_pkeys enabled, skipping request:%s",
                    item,
                )
|
def _extract_pkey_values(self, request): |
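        # Return the values of the configured primary keys for a buffered
        # request, or None if the request type is unrecognized.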
        if request.get('PutRequest'):
            return [
                request['PutRequest']['Item'][key]
                for key in self._overwrite_by_pkeys
            ]
        elif request.get('DeleteRequest'):
            return [
                request['DeleteRequest']['Key'][key]
                for key in self._overwrite_by_pkeys
            ]
        return None
|
def _flush_if_needed(self): |
        if len(self._items_buffer) >= self._flush_amount:
            self._flush()
|
def _flush(self): |
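        # Send up to flush_amount buffered requests in a single
        # batch_write_item call.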
        items_to_send = self._items_buffer[: self._flush_amount]
        self._items_buffer = self._items_buffer[self._flush_amount :]
        response = self._client.batch_write_item(
            RequestItems={self._table_name: items_to_send}
        )
        unprocessed_items = response['UnprocessedItems']
        if not unprocessed_items:
            unprocessed_items = {}
        item_list = unprocessed_items.get(self._table_name, [])
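        # Any unprocessed items are put back into the local buffer so they
        # will be retried on a subsequent flush.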
self._items_buffer.extend(item_list) |
        logger.debug(
            "Batch write sent %s, unprocessed: %s",
            len(items_to_send),
            len(self._items_buffer),
        )
|
def __enter__(self): |
return self |
|
def __exit__(self, exc_type, exc_value, tb): |
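        # On exit, keep flushing until the buffer is completely drained so
        # that no queued writes are lost when the context manager closes.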
while self._items_buffer: |
self._flush() |
|
|