view env/bin/dynamodb_load @ 4:79f47841a781 draft

"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author shellac
date Thu, 14 May 2020 16:47:39 -0400
parents 26e78fe6e8c4
children
line wrap: on
line source

#!/Users/pldms/Development/Projects/2020/david-matthews-galaxy/guppy_basecaller/env/bin/python3

import argparse
import os

import boto
from boto.compat import json
from boto.compat import six
from boto.dynamodb.schema import Schema


# Program description handed to argparse (``description=DESCRIPTION``) and
# therefore shown in the command's --help output.
DESCRIPTION = """Load data into one or more DynamoDB tables.

For each table, data is read from two files:
  - {table_name}.metadata for the table's name, schema and provisioned
    throughput (only required if creating the table).
  - {table_name}.data for the table's actual contents.

Both files are searched for in the current directory. To read them from
somewhere else, use the --in-dir parameter.

This program does not wipe the tables prior to loading data. However, any
items present in the data files will overwrite the table's contents.
"""


def _json_iterload(fd):
    """Lazily load newline-separated JSON objects from a file-like object."""
    buffer = ""
    eof = False
    while not eof:
        try:
            # Add a line to the buffer
            buffer += fd.next()
        except StopIteration:
            # We can't let that exception bubble up, otherwise the last
            # object in the file will never be decoded.
            eof = True
        try:
            # Try to decode a JSON object.
            json_object = json.loads(buffer.strip())

            # Success: clear the buffer (everything was decoded).
            buffer = ""
        except ValueError:
            if eof and buffer.strip():
                # No more lines to load and the buffer contains something other
                # than whitespace: the file is, in fact, malformed.
                raise
            # We couldn't decode a complete JSON object: load more lines.
            continue

        yield json_object


def create_table(metadata_fd):
    """Placeholder for creating a table from a metadata file-like object.

    The function has no implementation: ``metadata_fd`` is never read and
    the result is always ``None``.
    """
    return None


def load_table(table, in_fd):
    """Store every JSON object read from ``in_fd`` as an item of ``table``.

    Each object is decoded with ``_json_iterload``; attribute values that
    are lists are converted to sets before the item is created and put.
    """
    for record in _json_iterload(in_fd):
        # Convert lists back to sets; every other value is kept as-is.
        attrs = {
            key: set(value) if isinstance(value, list) else value
            for key, value in six.iteritems(record)
        }
        table.new_item(attrs=attrs).put()


def dynamodb_load(tables, in_dir, create_tables):
    """Load the data file of each named table into DynamoDB.

    For every name in ``tables`` the items are read from
    ``<in_dir>/<name>.data``. When ``create_tables`` is true, the table is
    first created from ``<in_dir>/<name>.metadata`` (JSON with the keys
    ``schema``, ``read_units`` and ``write_units``); otherwise the table is
    looked up by name on the connection.
    """
    connection = boto.connect_dynamodb()
    for name in tables:
        metadata_path = os.path.join(in_dir, "%s.metadata" % name)
        data_path = os.path.join(in_dir, "%s.data" % name)

        if not create_tables:
            table = connection.get_table(name)
        else:
            with open(metadata_path) as metadata_fd:
                spec = json.load(metadata_fd)
            table = connection.create_table(
                name=name,
                schema=Schema(spec["schema"]),
                read_units=spec["read_units"],
                write_units=spec["write_units"],
            )
            # Refresh with wait_for_active so loading starts only once the
            # freshly created table reports itself as active.
            table.refresh(wait_for_active=True)

        with open(data_path) as data_fd:
            load_table(table, data_fd)


if __name__ == "__main__":
    # Command-line entry point: parse the arguments and load every table
    # named on the command line.
    parser = argparse.ArgumentParser(
        prog="dynamodb_load",
        description=DESCRIPTION,
        # DESCRIPTION is laid out by hand (paragraphs plus an indented list);
        # the default formatter would re-wrap it into one run of text.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--create-tables",
        action="store_true",
        help="Create the tables if they don't exist already (without this flag, attempts to load data into non-existing tables fail)."
    )
    parser.add_argument(
        "--in-dir",
        default=".",
        help="Directory containing the .metadata and .data files (default: current directory).",
    )
    parser.add_argument("tables", metavar="TABLES", nargs="+")

    namespace = parser.parse_args()

    dynamodb_load(namespace.tables, namespace.in_dir, namespace.create_tables)