Source code for dp_tornado.helper.web.aws.dynamodb.item

# -*- coding: utf-8 -*-


import boto3

from dp_tornado.engine.helper import Helper as dpHelper


[docs]class ItemHelper(dpHelper):
[docs] def put( self, access_key_id, secret_access_key, region_name, table_name, items, overwrite_by_pkeys=None): items = items if self.helper.misc.type.check.array(items) else (items, ) return self._batch( access_key_id=access_key_id, secret_access_key=secret_access_key, region_name=region_name, table_name=table_name, items=items, fn_key='Item', fn_proc='put_item', overwrite_by_pkeys=overwrite_by_pkeys)
[docs] def remove( self, access_key_id, secret_access_key, region_name, table_name, keys, overwrite_by_pkeys=None): keys = keys if self.helper.misc.type.check.array(keys) else (keys, ) return self._batch( access_key_id=access_key_id, secret_access_key=secret_access_key, region_name=region_name, table_name=table_name, items=keys, fn_key='Key', fn_proc='delete_item', overwrite_by_pkeys=overwrite_by_pkeys)
[docs] def get( self, access_key_id, secret_access_key, region_name, table_name, keys): multi = True if self.helper.misc.type.check.array(keys) else False keys = keys if self.helper.misc.type.check.array(keys) else (keys, ) output = [] try: table = self.helper.web.aws.dynamodb.table.get( access_key_id=access_key_id, secret_access_key=secret_access_key, region_name=region_name, table_name=table_name) if not table: return False for key in keys: got = table.get_item(Key=key) if not got or 'Item' not in got: continue output.append(got['Item']) return output if multi else (output[0] if output else []) except Exception as e: self.logging.exception(e) return False
[docs] def query( self, access_key_id, secret_access_key, region_name, table_name, count=False, **kwargs): table = self.helper.web.aws.dynamodb.table.get( access_key_id=access_key_id, secret_access_key=secret_access_key, region_name=region_name, table_name=table_name) if count: kwargs['Select'] = 'COUNT' try: result = table.query(**kwargs) except Exception as e: self.logging.exception(e) return False if count: if 'Count' not in result: return False return result['Count'] if not result or 'Items' not in result: return [] return result['Items']
[docs] def scan( self, access_key_id, secret_access_key, region_name, table_name, count=False, **kwargs): table = self.helper.web.aws.dynamodb.table.get( access_key_id=access_key_id, secret_access_key=secret_access_key, region_name=region_name, table_name=table_name) if count: kwargs['Select'] = 'COUNT' try: result = table.scan(**kwargs) except Exception as e: self.logging.exception(e) return False if count: if 'Count' not in result: return False return result['Count'] if not result or 'Items' not in result: return [] return result['Items']
def _batch( self, access_key_id, secret_access_key, region_name, table_name, items, fn_key, fn_proc, overwrite_by_pkeys=None): try: table = self.helper.web.aws.dynamodb.table.get( access_key_id=access_key_id, secret_access_key=secret_access_key, region_name=region_name, table_name=table_name) if not table: return False with table.batch_writer(overwrite_by_pkeys=overwrite_by_pkeys) as batch: fn_proc = getattr(batch, fn_proc) for item in items: fn_proc(**{fn_key: item}) return True except Exception as e: self.logging.exception(e) return False def _payloadize(self, payload): output = {} for k, v in payload.items(): if v is None: output[k] = {'NULL': v} # A Null data type. elif v is True or v is False: output[k] = {'BOOL': v} # A Boolean data type. elif self.helper.misc.type.check.numeric(v): output[k] = {'N': str(v)} # A Number data type. elif self.helper.misc.type.check.string(v): output[k] = {'S': str(v)} # A String data type. elif self.helper.misc.type.check.dict(v): output[k] = {'L': self._payloadize(v)} # A List of attribute values. elif self.helper.misc.type.check.array(v): tk = 'NS' # A Number Set data type. for e in v: if self.helper.misc.type.check.string(e): tk = 'SS' # A String Set data type. output[k] = {tk: [str(e) for e in v]} else: output[k] = {'S': str(v)} return output