diff --git a/.circleci/config.yml b/.circleci/config.yml index eda5d4c..693d3b9 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -1,26 +1,35 @@ -version: 2 +version: 2.1 + +workflows: + build: + jobs: + - build: + context: + - circleci-user + jobs: build: docker: - - image: ubuntu:16.04 + - image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/sources-python:1.1.0 steps: - checkout - - run: - name: 'Install python 3.5.2' - command: | - apt update - apt install --yes python3 python3-pip python3-venv - run: name: 'Setup virtualenv' command: | - mkdir -p ~/.virtualenvs + pyenv global 3.11.7 python3 -m venv ~/.virtualenvs/singer-python source ~/.virtualenvs/singer-python/bin/activate - pip install -U pip setuptools - make install + pip install -U 'pip==20.3.4' 'setuptools<51.0.0' + pip install .[dev] + - run: + name: 'Pylint' + command: | + source ~/.virtualenvs/singer-python/bin/activate + pip install pylint + pylint singer --extension-pkg-whitelist=ciso8601 -d missing-docstring,broad-exception-raised,broad-exception-caught,bare-except,too-many-return-statements,too-many-branches,too-many-arguments,no-else-return,too-few-public-methods,fixme,protected-access,consider-using-f-string,reimported - run: - name: 'Run tests' + name: 'Run Tests' command: | - # Need to re-activate the virtualenv source ~/.virtualenvs/singer-python/bin/activate - make test + pip install nose2 + nose2 -v -s tests diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md new file mode 100644 index 0000000..ef49bc0 --- /dev/null +++ b/.github/pull_request_template.md @@ -0,0 +1,15 @@ +# Description of change +(write a short description or paste a link to JIRA) + +# Manual QA steps + - + +# Risks + - + +# Rollback steps + - revert this branch + +#### AI generated code +https://internal.qlik.dev/general/ways-of-working/code-reviews/#guidelines-for-ai-generated-code +- [ ] this PR has been written with the help of GitHub Copilot or another generative AI tool diff 
--git a/CHANGELOG.md b/CHANGELOG.md index b0a590c..e98553c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,88 @@ # Changelog +## 6.8.0 + * Rename state key `activate_versions` to `versions` in all relevant locations [#194](https://github.com/singer-io/singer-python/pull/194) + +## 6.7.0 + * Remove `key` from set_version, get_version, and clear_version state functions [#192](https://github.com/singer-io/singer-python/pull/192) + +## 6.6.0 + * Export singer.state functions from singer + * Rename singer.state.write_bookmark to singer.state.set_bookmark + * [#190](https://github.com/singer-io/singer-python/pull/190) + +## 6.5.0 + * Add `activate_versions` state functions [#188](https://github.com/singer-io/singer-python/pull/188) + * Deprecates bookmarks.py, functions are moved to state.py + +## 6.4.0 + * Update clear_offset to remove offset key from bookmark [#185](https://github.com/singer-io/singer-python/pull/185) + +## 6.3.0 + * Support allow_nan in message JSON output [#183](https://github.com/singer-io/singer-python/pull/183) + +## 6.2.3 + * Default type for non-standard data types is string [#182](https://github.com/singer-io/singer-python/pull/182) + +## 6.2.2 + * Updates json schema generation to not emit dates + * Handle multiple schemas with anyOf and emit them in a specific order + * Do not emit error messages when checking multiple schemas and a subsequent schema passes + * [#179](https://github.com/singer-io/singer-python/pull/179) + +## 6.2.1 + * Fixes json schema generation to not treat numbers as dates + * Fixes json schema generation to handle empty arrays + * Fixes record transformation to handle fields that could be either formatted string or nested data structure + * [#177](https://github.com/singer-io/singer-python/pull/177) + +## 6.2.0 + * Adds json schema generation [#175](https://github.com/singer-io/singer-python/pull/175) + +## 6.1.0 + * Make ensure_ascii Dynamic with Default Set to True in JSON Serialization. 
Required to handle the special characters [#168](https://github.com/singer-io/singer-python/pull/168) + +## 6.0.1 + * Pin backoff and simplejson to any version greater than or equal to the previously allowed version, up to the next major version [#167](https://github.com/singer-io/singer-python/pull/167) + +## 6.0.0 + * Bump backoff version to 2.2.1. This version drops support for python 3.5, but adds it for 3.10 [#165](https://github.com/singer-io/singer-python/pull/165) + +## 5.13.0 + * Add support for dev mode argument parsing [#158](https://github.com/singer-io/singer-python/pull/158) + +## 5.12.2 + * Removes pinned `pytz` version [#152](https://github.com/singer-io/singer-python/pull/152) + +## 5.12.1 + * Removes normalize function from `singer.decimal` to avoid scientific notation [#146](https://github.com/singer-io/singer-python/pull/146) + +## 5.12.0 + * Added support for `singer.decimal` types to transformer [#125](https://github.com/singer-io/singer-python/pull/125) + +## 5.11.0 + * Make `utils.handle_top_exception()` critically log each line of the exception separately so each line is prepended with `CRITICAL` [#141](https://github.com/singer-io/singer-python/pull/141) + +## 5.10.0 + * Add exception classes [#138](https://github.com/singer-io/singer-python/pull/138) + +## 5.9.1 + * Add nested schema support to Transformer's `filter_data_by_metadata` function [#130](https://github.com/singer-io/singer-python/pull/130) + +## 5.8.1 + * Allow empty lists for `key-properties` and `valid-replication-keys` in `get_standard_metadata` [#106](https://github.com/singer-io/singer-python/pull/106) + +## 5.7.0 + * Bumping backoff dependency to 1.8.0 for aiohttp support + * Added `get_selected_streams` to the `Catalog` class that orders streams returned with `currently_syncing` from state (if present) at the front of the list. 
[#100](https://github.com/singer-io/singer-python/pull/100) + * Added helper called `write_catalog` for use in discovery mode [#101](https://github.com/singer-io/singer-python/pull/101) + +## 5.6.1 + * Retain argument paths in `parse_args` [#88](https://github.com/singer-io/singer-python/pull/88) + +## 5.5.0 + * Add the ability to specify a default value when getting a bookmark [#95](https://github.com/singer-io/singer-python/pull/95) + ## 5.4.1 * Resolve JSON Schema refs when the schema contains an `anyOf` element [#93](https://github.com/singer-io/singer-python/pull/93) diff --git a/Makefile b/Makefile index 296c20f..2fe943e 100644 --- a/Makefile +++ b/Makefile @@ -8,5 +8,5 @@ install: check_prereqs python3 -m pip install -e '.[dev]' test: install - pylint singer -d missing-docstring,broad-except,bare-except,too-many-return-statements,too-many-branches,too-many-arguments,no-else-return,too-few-public-methods,fixme,protected-access + pylint singer --extension-pkg-whitelist=ciso8601 -d missing-docstring,broad-except,bare-except,too-many-return-statements,too-many-branches,too-many-arguments,no-else-return,too-few-public-methods,fixme,protected-access nosetests --with-doctest -v diff --git a/setup.cfg b/setup.cfg index b88034e..08aedd7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,2 +1,2 @@ [metadata] -description-file = README.md +description_file = README.md diff --git a/setup.py b/setup.py index 0973369..e0b32eb 100755 --- a/setup.py +++ b/setup.py @@ -4,17 +4,18 @@ import subprocess setup(name="singer-python", - version='5.4.1', + version='6.8.0', description="Singer.io utility library", author="Stitch", classifiers=['Programming Language :: Python :: 3 :: Only'], url="http://singer.io", install_requires=[ - 'pytz==2018.4', - 'jsonschema==2.6.0', - 'simplejson==3.11.1', - 'python-dateutil>=2.6.0', - 'backoff==1.3.2', + 'pytz>=2018.4', + 'jsonschema>=2.6.0,==2.*', + 'simplejson>=3.13.2,==3.*', + 'python-dateutil>=2.7.3,==2.*', + 'backoff>=2.2.1,==2.*', + 
'ciso8601>=2.3.1,==2.*', ], extras_require={ 'dev': [ diff --git a/singer/__init__.py b/singer/__init__.py index 2ad7333..613dc99 100644 --- a/singer/__init__.py +++ b/singer/__init__.py @@ -55,11 +55,16 @@ resolve_schema_references ) -from singer.catalog import Catalog +from singer.catalog import ( + Catalog, + CatalogEntry +) from singer.schema import Schema -from singer.bookmarks import ( - write_bookmark, +from singer.state import ( + set_bookmark, + # for backwards compatibility, use set_bookmark instead + set_bookmark as write_bookmark, get_bookmark, clear_bookmark, reset_stream, @@ -68,6 +73,17 @@ get_offset, set_currently_syncing, get_currently_syncing, + set_version, + clear_version, + get_version, +) + +from singer.exceptions import ( + SingerConfigurationError, + SingerDiscoveryError, + SingerError, + SingerRetryableRequestError, + SingerSyncError, ) if __name__ == "__main__": diff --git a/singer/bookmarks.py b/singer/bookmarks.py index e72c752..b506ff2 100644 --- a/singer/bookmarks.py +++ b/singer/bookmarks.py @@ -1,46 +1,33 @@ -def ensure_bookmark_path(state, path): - submap = state - for path_component in path: - if submap.get(path_component) is None: - submap[path_component] = {} +from singer import state as st + +## Note - This file is deprecated, use state.py functions. 
- submap = submap[path_component] - return state +def ensure_bookmark_path(state, path): + return st.ensure_state_path(state, path) def write_bookmark(state, tap_stream_id, key, val): - state = ensure_bookmark_path(state, ['bookmarks', tap_stream_id]) - state['bookmarks'][tap_stream_id][key] = val - return state + return st.set_bookmark(state, tap_stream_id, key, val) def clear_bookmark(state, tap_stream_id, key): - state = ensure_bookmark_path(state, ['bookmarks', tap_stream_id]) - state['bookmarks'][tap_stream_id].pop(key, None) - return state + return st.clear_bookmark(state, tap_stream_id, key) def reset_stream(state, tap_stream_id): - state = ensure_bookmark_path(state, ['bookmarks', tap_stream_id]) - state['bookmarks'][tap_stream_id] = {} - return state + return st.reset_stream(state, tap_stream_id) -def get_bookmark(state, tap_stream_id, key): - return state.get('bookmarks', {}).get(tap_stream_id, {}).get(key) +def get_bookmark(state, tap_stream_id, key, default=None): + return st.get_bookmark(state, tap_stream_id, key, default) def set_offset(state, tap_stream_id, offset_key, offset_value): - state = ensure_bookmark_path(state, ['bookmarks', tap_stream_id, "offset", offset_key]) - state['bookmarks'][tap_stream_id]["offset"][offset_key] = offset_value - return state + return st.set_offset(state, tap_stream_id, offset_key, offset_value) def clear_offset(state, tap_stream_id): - state = ensure_bookmark_path(state, ['bookmarks', tap_stream_id, "offset"]) - state['bookmarks'][tap_stream_id]["offset"] = {} - return state + return st.clear_offset(state, tap_stream_id) -def get_offset(state, tap_stream_id): - return state.get('bookmarks', {}).get(tap_stream_id, {}).get("offset") +def get_offset(state, tap_stream_id, default=None): + return st.get_offset(state, tap_stream_id, default) def set_currently_syncing(state, tap_stream_id): - state['currently_syncing'] = tap_stream_id - return state + return st.set_currently_syncing(state, tap_stream_id) -def 
get_currently_syncing(state): - return state.get('currently_syncing') +def get_currently_syncing(state, default=None): + return st.get_currently_syncing(state, default) diff --git a/singer/catalog.py b/singer/catalog.py index 8d52262..d8ef147 100644 --- a/singer/catalog.py +++ b/singer/catalog.py @@ -1,13 +1,26 @@ '''Provides an object model for a Singer Catalog.''' - import json import sys -from singer.schema import Schema +from . import metadata as metadata_module +from .bookmarks import get_currently_syncing +from .logger import get_logger +from .schema import Schema + +LOGGER = get_logger() + + +def write_catalog(catalog): + # If the catalog has no streams, log a warning + if not catalog.streams: + LOGGER.warning("Catalog being written with no streams.") + + json.dump(catalog.to_dict(), sys.stdout, indent=2) # pylint: disable=too-many-instance-attributes class CatalogEntry(): + # pylint: disable=too-many-positional-arguments def __init__(self, tap_stream_id=None, stream=None, key_properties=None, schema=None, replication_key=None, is_view=None, database=None, table=None, row_count=None, @@ -33,7 +46,9 @@ def __eq__(self, other): return self.__dict__ == other.__dict__ def is_selected(self): - return self.schema.selected # pylint: disable=no-member + mdata = metadata_module.to_map(self.metadata) + # pylint: disable=no-member + return self.schema.selected or metadata_module.get(mdata, (), 'selected') def to_dict(self): result = {} @@ -78,7 +93,7 @@ def __eq__(self, other): @classmethod def load(cls, filename): - with open(filename) as fp: # pylint: disable=invalid-name + with open(filename, encoding="utf-8") as fp: return Catalog.from_dict(json.load(fp)) @classmethod @@ -116,3 +131,27 @@ def get_stream(self, tap_stream_id): if stream.tap_stream_id == tap_stream_id: return stream return None + + def _shuffle_streams(self, state): + currently_syncing = get_currently_syncing(state) + + if currently_syncing is None: + return self.streams + + matching_index = 0 + for 
i, catalog_entry in enumerate(self.streams): + if catalog_entry.tap_stream_id == currently_syncing: + matching_index = i + break + top_half = self.streams[matching_index:] + bottom_half = self.streams[:matching_index] + return top_half + bottom_half + + + def get_selected_streams(self, state): + for stream in self._shuffle_streams(state): + if not stream.is_selected(): + LOGGER.info('Skipping stream: %s', stream.tap_stream_id) + continue + + yield stream diff --git a/singer/exceptions.py b/singer/exceptions.py new file mode 100644 index 0000000..b13016d --- /dev/null +++ b/singer/exceptions.py @@ -0,0 +1,30 @@ +""" +The exceptions module contains Exception subclasses whose instances might be +raised by the singer library or taps that use the singer library. +""" + +class SingerError(Exception): + """The base Exception class for singer""" + def __init__(self, message): + """Create an exception with a multiline error message + + The first line is the error's class name. The subsequent lines are + the message that class was created with. 
+ """ + super().__init__(f"{self.__class__.__name__}\n{message}") + + +class SingerConfigurationError(SingerError): + """The base class of errors encountered before discovery and before sync mode""" + + +class SingerDiscoveryError(SingerError): + """The base class of errors encountered in discovery mode""" + + +class SingerSyncError(SingerError): + """The base class of errors encountered in sync mode""" + + +class SingerRetryableRequestError(SingerError): + """This error is meant to be thrown when a tap encounters a retryable request""" diff --git a/singer/messages.py b/singer/messages.py index 0602cf5..941670c 100644 --- a/singer/messages.py +++ b/singer/messages.py @@ -1,24 +1,26 @@ import sys -import dateutil.parser import pytz import simplejson as json +import ciso8601 import singer.utils as u +from .logger import get_logger +LOGGER = get_logger() class Message(): '''Base class for messages.''' - def asdict(self): # pylint: disable=no-self-use + def asdict(self): raise Exception('Not implemented') def __eq__(self, other): return isinstance(other, Message) and self.asdict() == other.asdict() def __repr__(self): - pairs = ["{}={}".format(k, v) for k, v in self.asdict().items()] + pairs = [f"{k}={v}" for k, v in self.asdict().items()] attrstr = ", ".join(pairs) - return "{}({})".format(self.__class__.__name__, attrstr) + return f"{self.__class__.__name__}({attrstr})" def __str__(self): return str(self.asdict()) @@ -167,7 +169,7 @@ def asdict(self): def _required_key(msg, k): if k not in msg: - raise Exception("Message is missing required key '{}': {}".format(k, msg)) + raise Exception(f"Message is missing required key '{k}': {msg}") return msg[k] @@ -180,13 +182,20 @@ def parse_message(msg): # lossy conversions. However, this will affect # very few data points and we have chosen to # leave conversion as is for now. 
- obj = json.loads(msg) + obj = json.loads(msg, use_decimal=True) msg_type = _required_key(obj, 'type') if msg_type == 'RECORD': time_extracted = obj.get('time_extracted') if time_extracted: - time_extracted = dateutil.parser.parse(time_extracted) + try: + time_extracted = ciso8601.parse_datetime(time_extracted) + except: + LOGGER.warning("unable to parse time_extracted with ciso8601 library") + time_extracted = None + + + # time_extracted = dateutil.parser.parse(time_extracted) return RecordMessage(stream=_required_key(obj, 'stream'), record=_required_key(obj, 'record'), version=obj.get('version'), @@ -209,12 +218,17 @@ def parse_message(msg): return None -def format_message(message): - return json.dumps(message.asdict(), use_decimal=True) +def format_message(message, ensure_ascii=True, allow_nan=False): + return json.dumps( + message.asdict(), + use_decimal=True, + ensure_ascii=ensure_ascii, + allow_nan=allow_nan + ) -def write_message(message): - sys.stdout.write(format_message(message) + '\n') +def write_message(message, ensure_ascii=True, allow_nan=False): + sys.stdout.write(format_message(message, ensure_ascii=ensure_ascii, allow_nan=allow_nan) + '\n') sys.stdout.flush() diff --git a/singer/metadata.py b/singer/metadata.py index 57be90c..41153ea 100644 --- a/singer/metadata.py +++ b/singer/metadata.py @@ -21,3 +21,26 @@ def write(compiled_metadata, breadcrumb, k, val): def get(compiled_metadata, breadcrumb, k): return compiled_metadata.get(breadcrumb, {}).get(k) + +def get_standard_metadata(schema=None, schema_name=None, key_properties=None, + valid_replication_keys=None, replication_method=None): + mdata = {} + + if key_properties is not None: + mdata = write(mdata, (), 'table-key-properties', key_properties) + if replication_method: + mdata = write(mdata, (), 'forced-replication-method', replication_method) + if valid_replication_keys is not None: + mdata = write(mdata, (), 'valid-replication-keys', valid_replication_keys) + if schema: + mdata = 
write(mdata, (), 'inclusion', 'available') + + if schema_name: + mdata = write(mdata, (), 'schema-name', schema_name) + for field_name in schema['properties'].keys(): + if key_properties and field_name in key_properties: + mdata = write(mdata, ('properties', field_name), 'inclusion', 'automatic') + else: + mdata = write(mdata, ('properties', field_name), 'inclusion', 'available') + + return to_list(mdata) diff --git a/singer/schema.py b/singer/schema.py index b4da4ac..2fcafd6 100644 --- a/singer/schema.py +++ b/singer/schema.py @@ -31,7 +31,7 @@ class Schema(): # pylint: disable=too-many-instance-attributes ''' - # pylint: disable=too-many-locals + # pylint: disable=too-many-locals,too-many-positional-arguments def __init__(self, type=None, format=None, properties=None, items=None, selected=None, inclusion=None, description=None, minimum=None, maximum=None, exclusiveMinimum=None, exclusiveMaximum=None, diff --git a/singer/schema_generation.py b/singer/schema_generation.py new file mode 100644 index 0000000..177d659 --- /dev/null +++ b/singer/schema_generation.py @@ -0,0 +1,105 @@ +def add_observation(acc, path): + + node = acc + for i in range(0, len(path) - 1): + k = path[i] + if k not in node: + node[k] = {} + node = node[k] + + node[path[-1]] = True + +# pylint: disable=too-many-branches +def add_observations(acc, path, data): + if isinstance(data, dict): + for key in data: + add_observations(acc, path + ["object", key], data[key]) + elif isinstance(data, list): + if len(data) == 0: + add_observations(acc, path + ["array"], None) + for item in data: + add_observations(acc, path + ["array"], item) + elif isinstance(data, str): + try: + # If the string parses as a int, add an observation that it's a integer + int(data) + add_observation(acc, path + ["integer"]) + return acc + except (ValueError, TypeError): + pass + try: + # If the string parses as a float, add an observation that it's a number + float(data) + add_observation(acc, path + ["number"]) + return acc + 
except (ValueError, TypeError): + pass + add_observation(acc, path + ["string"]) + elif isinstance(data, bool): + add_observation(acc, path + ["boolean"]) + elif isinstance(data, int): + add_observation(acc, path + ["integer"]) + elif isinstance(data, float): + add_observation(acc, path + ["number"]) + elif data is None: + add_observation(acc, path + ["null"]) + else: + add_observation(acc, path + ["string"]) + + return acc + +def to_json_schema(obs): + types = [] + # add schema types in a specific order to anyOf list + for key in ['array', 'object', 'number', 'integer', 'boolean', 'string', 'null']: + if key not in obs: + continue + + result = {'type': ['null']} + + if key == 'object': + result['type'] += ['object'] + if 'properties' not in result: + result['properties'] = {} + for obj_key in obs['object']: + result['properties'][obj_key] = to_json_schema(obs['object'][obj_key]) + + elif key == 'array': + result['type'] += ['array'] + result['items'] = to_json_schema(obs['array']) + + elif key == 'string': + result['type'] += ['string'] + + elif key == 'boolean': + result['type'] += ['boolean'] + + elif key == 'integer': + result['type'] += ['integer'] + + elif key == 'number': + # Use type=string, format=singer.decimal + result['type'] += ['string'] + result['format'] = 'singer.decimal' + + elif key == 'null': + pass + + else: + raise Exception("Unexpected data type " + key) + + types.append(result) + + if len(types) == 0: + return {'type': ['null', 'string']} + + if len(types) == 1: + return types[0] + + return {'anyOf': types} + +def generate_schema(records): + obs = {} + for record in records: + obs = add_observations(obs, [], record) + return to_json_schema(obs) diff --git a/singer/state.py b/singer/state.py new file mode 100644 index 0000000..f7d4357 --- /dev/null +++ b/singer/state.py @@ -0,0 +1,60 @@ +def ensure_state_path(state, path): + submap = state + for path_component in path: + if submap.get(path_component) is None: + submap[path_component] = {} + + 
submap = submap[path_component] + return state + +def set_bookmark(state, tap_stream_id, key, val): + state = ensure_state_path(state, ['bookmarks', tap_stream_id]) + state['bookmarks'][tap_stream_id][key] = val + return state + +def clear_bookmark(state, tap_stream_id, key): + state = ensure_state_path(state, ['bookmarks', tap_stream_id]) + state['bookmarks'][tap_stream_id].pop(key, None) + return state + +def reset_stream(state, tap_stream_id): + state = ensure_state_path(state, ['bookmarks', tap_stream_id]) + state['bookmarks'][tap_stream_id] = {} + if 'versions' in state: + state = ensure_state_path(state, ['versions', tap_stream_id]) + state['versions'][tap_stream_id] = {} + return state + +def get_bookmark(state, tap_stream_id, key, default=None): + return state.get('bookmarks', {}).get(tap_stream_id, {}).get(key, default) + +def set_offset(state, tap_stream_id, offset_key, offset_value): + state = ensure_state_path(state, ['bookmarks', tap_stream_id, "offset", offset_key]) + state['bookmarks'][tap_stream_id]["offset"][offset_key] = offset_value + return state + +def clear_offset(state, tap_stream_id): + return clear_bookmark(state, tap_stream_id, "offset") + +def get_offset(state, tap_stream_id, default=None): + return state.get('bookmarks', {}).get(tap_stream_id, {}).get("offset", default) + +def set_currently_syncing(state, tap_stream_id): + state['currently_syncing'] = tap_stream_id + return state + +def get_currently_syncing(state, default=None): + return state.get('currently_syncing', default) + +def set_version(state, tap_stream_id, val): + state = ensure_state_path(state, ['versions', tap_stream_id]) + state['versions'][tap_stream_id] = val + return state + +def clear_version(state, tap_stream_id): + state = ensure_state_path(state, ['versions', tap_stream_id]) + state['versions'].pop(tap_stream_id, None) + return state + +def get_version(state, tap_stream_id, default=None): + return state.get('versions', {}).get(tap_stream_id, default) diff --git 
a/singer/statediff.py b/singer/statediff.py index 3ebddcb..bc21fd5 100644 --- a/singer/statediff.py +++ b/singer/statediff.py @@ -46,8 +46,8 @@ def diff(oldstate, newstate): # Convert oldstate and newstate from a deeply nested dict into a # single-level dict, mapping a path to a value. - olddict = {k: v for (k, v) in paths(oldstate)} - newdict = {k: v for (k, v) in paths(newstate)} + olddict = dict(paths(oldstate)) + newdict = dict(paths(newstate)) # Build the list of all paths in both oldstate and newstate to iterate # over. diff --git a/singer/transform.py b/singer/transform.py index 82888c2..3a9fc96 100644 --- a/singer/transform.py +++ b/singer/transform.py @@ -1,4 +1,6 @@ import datetime +import decimal +import logging import re from jsonschema import RefResolver @@ -35,6 +37,16 @@ def unix_seconds_to_datetime(value): return strftime(datetime.datetime.fromtimestamp(int(value), datetime.timezone.utc)) +def breadcrumb_path(breadcrumb): + """ + Transform breadcrumb into familiar object dot-notation + """ + name = ".".join(breadcrumb) + name = name.replace('properties.', '') + name = name.replace('.items', '[]') + return name + + class SchemaMismatch(Exception): def __init__(self, errors): if not errors: @@ -45,7 +57,7 @@ def __init__(self, errors): msg = "Errors during transform\n\t{}".format("\n\t".join(estrs)) msg += "\n\n\nErrors during transform: [{}]".format(", ".join(estrs)) - super(SchemaMismatch, self).__init__(msg) + super().__init__(msg) class SchemaKey: ref = "$ref" @@ -55,19 +67,27 @@ class SchemaKey: any_of = 'anyOf' class Error: - def __init__(self, path, data, schema=None): + def __init__(self, path, data, schema=None, logging_level=logging.INFO): self.path = path self.data = data self.schema = schema + self.logging_level = logging_level def tostr(self): path = ".".join(map(str, self.path)) if self.schema: - msg = "does not match {}".format(self.schema) + if self.logging_level >= logging.INFO: + msg = f"data does not match {self.schema}" + else: + 
msg = f"does not match {self.schema}" else: msg = "not in schema" - return "{}: {} {}".format(path, self.data, msg) + if self.logging_level >= logging.INFO: + output = f"{path}: {msg}" + else: + output = f"{path}: {self.data} {msg}" + return output class Transformer: @@ -80,20 +100,20 @@ def __init__(self, integer_datetime_fmt=NO_INTEGER_DATETIME_PARSING, pre_hook=No def log_warning(self): if self.filtered: - LOGGER.info("Filtered %s paths during transforms " - "as they were unsupported or not selected:\n\t%s", - len(self.filtered), - "\n\t".join(sorted(self.filtered))) + LOGGER.debug("Filtered %s paths during transforms " + "as they were unsupported or not selected:\n\t%s", + len(self.filtered), + "\n\t".join(sorted(self.filtered))) # Output list format to parse for reporting - LOGGER.info("Filtered paths list: %s", - sorted(self.filtered)) + LOGGER.debug("Filtered paths list: %s", + sorted(self.filtered)) if self.removed: - LOGGER.warning("Removed %s paths during transforms:\n\t%s", - len(self.removed), - "\n\t".join(sorted(self.removed))) + LOGGER.debug("Removed %s paths during transforms:\n\t%s", + len(self.removed), + "\n\t".join(sorted(self.removed))) # Output list format to parse for reporting - LOGGER.warning("Removed paths list: %s", sorted(self.removed)) + LOGGER.debug("Removed paths list: %s", sorted(self.removed)) def __enter__(self): return self @@ -101,21 +121,27 @@ def __enter__(self): def __exit__(self, *args): self.log_warning() - def filter_data_by_metadata(self, data, metadata): + def filter_data_by_metadata(self, data, metadata, parent=()): if isinstance(data, dict) and metadata: for field_name in list(data.keys()): - selected = singer.metadata.get(metadata, ('properties', field_name), 'selected') - inclusion = singer.metadata.get(metadata, ('properties', field_name), 'inclusion') + breadcrumb = parent + ('properties', field_name) + selected = singer.metadata.get(metadata, breadcrumb, 'selected') + inclusion = singer.metadata.get(metadata, 
breadcrumb, 'inclusion') if inclusion == 'automatic': continue - if selected is False: + if (selected is False) or (inclusion == 'unsupported'): data.pop(field_name, None) - self.filtered.add(field_name) + # Track that a field was filtered because the customer + # didn't select it or the tap declared it as unsupported. + self.filtered.add(breadcrumb_path(breadcrumb)) + else: + data[field_name] = self.filter_data_by_metadata( + data[field_name], metadata, breadcrumb) - if inclusion == 'unsupported': - data.pop(field_name, None) - self.filtered.add(field_name) + if isinstance(data, list) and metadata: + breadcrumb = parent + ('items',) + data = [self.filter_data_by_metadata(d, metadata, breadcrumb) for d in data] return data @@ -150,7 +176,7 @@ def transform_recur(self, data, schema, path): return success, transformed_data else: # pylint: disable=useless-else-on-loop # exhaused all types and didn't return, so we failed :-( - self.errors.append(Error(path, data, schema)) + self.errors.append(Error(path, data, schema, logging_level=LOGGER.level)) return False, None def _transform_anyof(self, data, schema, path): @@ -159,9 +185,11 @@ def _transform_anyof(self, data, schema, path): success, transformed_data = self.transform_recur(data, subschema, path) if success: return success, transformed_data + else: + self.errors.pop() else: # pylint: disable=useless-else-on-loop # exhaused all schemas and didn't return, so we failed :-( - self.errors.append(Error(path, data, schema)) + self.errors.append(Error(path, data, schema, logging_level=LOGGER.level)) return False, None def _transform_object(self, data, schema, path, pattern_properties): @@ -188,7 +216,11 @@ def _transform_object(self, data, schema, path, pattern_properties): successes.append(success) result[key] = subdata else: - # track that field has been removed + # track that field has been removed because it wasn't + # found in the schema. 
This likely indicates some problem + # with discovery but rather than failing the run because + # new data was added we'd rather continue the sync and + # allow customers to indicate that they want the new data. self.removed.add(".".join(map(str, path + [key]))) return all(successes), result @@ -236,13 +268,31 @@ def _transform(self, data, typ, schema, path): else: return False, None - elif schema.get("format") == "date-time": + elif typ == "string" and schema.get("format") == "date-time": data = self._transform_datetime(data) if data is None: return False, None return True, data + elif typ == "string" and schema.get("format") == "singer.decimal": + if data is None: + return False, None + if isinstance(data, (str, float, int)): + try: + return True, str(decimal.Decimal(str(data))) + except: + return False, None + elif isinstance(data, decimal.Decimal): + try: + if data.is_snan(): + return True, 'NaN' + else: + return True, str(data) + except: + return False, None + + return False, None elif typ == "object": # Objects do not necessarily specify properties return self._transform_object(data, diff --git a/singer/utils.py b/singer/utils.py index 4cd179d..6620005 100644 --- a/singer/utils.py +++ b/singer/utils.py @@ -105,7 +105,7 @@ def chunk(array, num): def load_json(path): - with open(path) as fil: + with open(path, encoding="utf-8") as fil: return json.load(fil) @@ -134,6 +134,7 @@ def parse_args(required_config_keys): -d,--discover Run in discover mode -p,--properties Properties file: DEPRECATED, please use --catalog instead --catalog Catalog file + --dev Runs the tap in dev mode Returns the parsed args object from argparse. 
For each argument that point to JSON files (config, state, properties), we will automatically @@ -163,16 +164,25 @@ def parse_args(required_config_keys): action='store_true', help='Do schema discovery') + parser.add_argument( + '--dev', + action='store_true', + help='Runs tap in dev mode') + args = parser.parse_args() if args.config: + setattr(args, 'config_path', args.config) args.config = load_json(args.config) if args.state: + setattr(args, 'state_path', args.state) args.state = load_json(args.state) else: args.state = {} if args.properties: + setattr(args, 'properties_path', args.properties) args.properties = load_json(args.properties) if args.catalog: + setattr(args, 'catalog_path', args.catalog) args.catalog = Catalog.load(args.catalog) check_config(args.config, required_config_keys) @@ -183,7 +193,7 @@ def parse_args(required_config_keys): def check_config(config, required_keys): missing_keys = [key for key in required_keys if key not in config] if missing_keys: - raise Exception("Config is missing required keys: {}".format(missing_keys)) + raise Exception(f"Config is missing required keys: {missing_keys}") def backoff(exceptions, giveup): @@ -224,7 +234,8 @@ def wrapped(*args, **kwargs): try: return fnc(*args, **kwargs) except Exception as exc: - logger.critical(exc) + for line in str(exc).splitlines(): + logger.critical(line) raise return wrapped return decorator diff --git a/tests/test_catalog.py b/tests/test_catalog.py index 4cb9390..8a72e1a 100644 --- a/tests/test_catalog.py +++ b/tests/test_catalog.py @@ -1,7 +1,50 @@ import unittest from singer.schema import Schema -from singer.catalog import Catalog, CatalogEntry +from singer.catalog import Catalog, CatalogEntry, write_catalog + +class TestWriteCatalog(unittest.TestCase): + def test_write_empty_catalog(self): + catalog = Catalog([]) + write_catalog(catalog) + + def test_write_catalog_with_streams(self): + catalog = Catalog([CatalogEntry(tap_stream_id='a',schema=Schema(),metadata=[])]) + 
write_catalog(catalog) + +class TestGetSelectedStreams(unittest.TestCase): + def test_one_selected_stream(self): + selected_entry = CatalogEntry(tap_stream_id='a', + schema=Schema(), + metadata=[{'metadata': + {'selected': True}, + 'breadcrumb': []}]) + catalog = Catalog( + [selected_entry, + CatalogEntry(tap_stream_id='b',schema=Schema(),metadata=[]), + CatalogEntry(tap_stream_id='c',schema=Schema(),metadata=[])]) + state = {} + selected_streams = catalog.get_selected_streams(state) + self.assertEqual([e for e in selected_streams],[selected_entry]) + + def test_resumes_currently_syncing_stream(self): + selected_entry_a = CatalogEntry(tap_stream_id='a', + schema=Schema(), + metadata=[{'metadata': + {'selected': True}, + 'breadcrumb': []}]) + selected_entry_c = CatalogEntry(tap_stream_id='c', + schema=Schema(), + metadata=[{'metadata': + {'selected': True}, + 'breadcrumb': []}]) + catalog = Catalog( + [selected_entry_a, + CatalogEntry(tap_stream_id='b',schema=Schema(),metadata=[]), + selected_entry_c]) + state = {'currently_syncing': 'c'} + selected_streams = catalog.get_selected_streams(state) + self.assertEqual([e for e in selected_streams][0],selected_entry_c) class TestToDictAndFromDict(unittest.TestCase): @@ -89,7 +132,7 @@ def test_from_dict(self): def test_to_dict(self): self.assertEqual(self.dict_form, self.obj_form.to_dict()) - + class TestGetStream(unittest.TestCase): def test(self): @@ -98,4 +141,4 @@ def test(self): CatalogEntry(tap_stream_id='b'), CatalogEntry(tap_stream_id='c')]) entry = catalog.get_stream('b') - self.assertEquals('b', entry.tap_stream_id) + self.assertEqual('b', entry.tap_stream_id) diff --git a/tests/test_exceptions.py b/tests/test_exceptions.py new file mode 100644 index 0000000..50cf7a1 --- /dev/null +++ b/tests/test_exceptions.py @@ -0,0 +1,68 @@ +import unittest + +from singer.exceptions import SingerConfigurationError +from singer.exceptions import SingerDiscoveryError +from singer.exceptions import SingerError +from 
singer.exceptions import SingerRetryableRequestError +from singer.exceptions import SingerSyncError + +class TestSingerErrors(unittest.TestCase): + def test_SingerError_prints_correctly(self): + error_text = "An error occured" + + with self.assertRaises(SingerError) as test_run: + raise SingerError(error_text) + + expected_text = "SingerError\n" + error_text + self.assertEqual(expected_text, + str(test_run.exception)) + + def test_SingerConfigurationError_prints_correctly(self): + error_text = "An error occured" + + with self.assertRaises(SingerConfigurationError) as test_run: + raise SingerConfigurationError(error_text) + + expected_text = "SingerConfigurationError\n" + error_text + self.assertEqual(expected_text, + str(test_run.exception)) + + def test_SingerDiscoveryError_prints_correctly(self): + error_text = "An error occured" + + with self.assertRaises(SingerDiscoveryError) as test_run: + raise SingerDiscoveryError(error_text) + + expected_text = "SingerDiscoveryError\n" + error_text + self.assertEqual(expected_text, + str(test_run.exception)) + + def test_SingerSyncError_prints_correctly(self): + error_text = "An error occured" + + with self.assertRaises(SingerSyncError) as test_run: + raise SingerSyncError(error_text) + + expected_text = "SingerSyncError\n" + error_text + self.assertEqual(expected_text, + str(test_run.exception)) + + def test_SingerRetryableRequestError_prints_correctly(self): + error_text = "An error occured" + + with self.assertRaises(SingerRetryableRequestError) as test_run: + raise SingerRetryableRequestError(error_text) + + expected_text = "SingerRetryableRequestError\n" + error_text + self.assertEqual(expected_text, + str(test_run.exception)) + + def test_SingerError_prints_multiple_lines_correctly(self): + error_text = "\n".join(["Line 1", "Line 2", "Line 3"]) + + with self.assertRaises(SingerError) as test_run: + raise SingerError(error_text) + + expected_text = "SingerError\n" + error_text + self.assertEqual(expected_text, + 
str(test_run.exception)) diff --git a/tests/test_metadata.py b/tests/test_metadata.py new file mode 100644 index 0000000..9cc2bb2 --- /dev/null +++ b/tests/test_metadata.py @@ -0,0 +1,357 @@ +from pprint import pprint +import unittest +from singer.metadata import get_standard_metadata, to_map + +def make_expected_metadata(base_obj, dict_of_extras, has_pk=False): + metadata_value = {**base_obj} + metadata_value.update(dict_of_extras) + + return [ + { + 'metadata': metadata_value, + 'breadcrumb': () + }, + { + 'metadata': { + 'inclusion': 'automatic' if has_pk else 'available', + }, + 'breadcrumb': ('properties', 'id') + }, + { + 'metadata': { + 'inclusion': 'available', + }, + 'breadcrumb': ('properties', 'name') + }, + { + 'metadata': { + 'inclusion': 'available', + }, + 'breadcrumb': ('properties', 'created') + } + ] + +class TestStandardMetadata(unittest.TestCase): + + def test_standard_metadata(self): + """ + There's four inputs we want to test: schema, key_properties, replication_method, valid_replication_keys. + + When `schema` is a non-null input, we expect `"inclusion": "available"` metadata for the `()` breadcrumb. + + When `key_properties` is a non-null input, we expect `table-key-properties` metadata for the `()` breadcrumb. + + When `replication_method` is a non-null input, we expect `forced-replication-method` metadata for the `()` breadcrumb. + + When `valid_replication_keys` is a non-null input, we expect `valid-replication-keys` metadata for the `()` breadcrumb. 
+ """ + self.maxDiff = None + + # Some contants shared by a number of expected metadata objects + tap_stream_id = 'employees' + test_kp = ['id'] + test_rm = 'INCREMENTAL' + test_rk = ['id', 'created'] + metadata_kp = {'table-key-properties': ['id']} + metadata_rm = {'forced-replication-method': 'INCREMENTAL'} + metadata_rk = {'valid-replication-keys': ['id','created']} + schema_present_base_obj = {'inclusion': 'available'} + test_schema = { + 'type': ['null', 'object'], + 'additionalProperties': False, + 'properties': { + 'id': {'type': ['null', 'string']}, + 'name': {'type': ['null', 'string']}, + 'created': {'type': ['null', 'string'], + 'format': 'date-time'}, + } + } + + # test_variables is a list of tuples, where the first element is a + # dictionary of parameters for `get_standard_metadata()` and the + # second element is the expected metadata + test_variables = [ + ( # test_number=0 + { + 'schema': test_schema, + 'schema_name': tap_stream_id, + 'key_properties': None, + 'replication_method': None, + 'valid_replication_keys': None + }, + make_expected_metadata( + schema_present_base_obj, + {'schema-name': tap_stream_id,} + ) + ), + ( # test_number=1 + { + 'schema': test_schema, + 'schema_name': tap_stream_id, + 'key_properties': None, + 'replication_method': None, + 'valid_replication_keys': test_rk + }, + make_expected_metadata( + schema_present_base_obj, + {'valid-replication-keys': ['id','created'], + 'schema-name':tap_stream_id} + ) + ), + ( # test_number=2 + { + 'schema': test_schema, + 'schema_name': tap_stream_id, + 'key_properties': None, + 'replication_method': test_rm, + 'valid_replication_keys': None + }, + make_expected_metadata( + schema_present_base_obj, + {'forced-replication-method': 'INCREMENTAL', + 'schema-name':tap_stream_id} + ) + ), + ( # test_number=3 + { + 'schema': test_schema, + 'schema_name': tap_stream_id, + 'key_properties': None, + 'replication_method': test_rm, + 'valid_replication_keys': test_rk + }, + make_expected_metadata( + 
schema_present_base_obj, + {'valid-replication-keys': ['id','created'], + 'forced-replication-method': 'INCREMENTAL', + 'schema-name':tap_stream_id} + ) + ), + ( # test_number=4 + { + 'schema': test_schema, + 'schema_name': tap_stream_id, + 'key_properties': test_kp, + 'replication_method': None, + 'valid_replication_keys': None + }, + make_expected_metadata( + schema_present_base_obj, + {'table-key-properties': ['id'], + 'schema-name':tap_stream_id}, + has_pk=True + ) + ), + ( # test_number=5 + { + 'schema': test_schema, + 'schema_name': tap_stream_id, + 'key_properties': test_kp, + 'replication_method': None, + 'valid_replication_keys': test_rk + }, + make_expected_metadata( + schema_present_base_obj, + {'table-key-properties': ['id'], + 'valid-replication-keys': ['id','created'], + 'schema-name':tap_stream_id}, + has_pk=True + ) + ), + ( # test_number=6 + { + 'schema': test_schema, + 'schema_name': tap_stream_id, + 'key_properties': test_kp, + 'replication_method': test_rm, + 'valid_replication_keys': None + }, + make_expected_metadata( + schema_present_base_obj, + {'table-key-properties': ['id'], + 'forced-replication-method': 'INCREMENTAL', + 'schema-name':tap_stream_id}, + has_pk=True + ) + ), + ( # test_number=7 + { + 'schema': test_schema, + 'schema_name': tap_stream_id, + 'key_properties': test_kp, + 'replication_method': test_rm, + 'valid_replication_keys': test_rk + }, + make_expected_metadata( + schema_present_base_obj, + {'table-key-properties': ['id'], + 'forced-replication-method': 'INCREMENTAL', + 'valid-replication-keys': ['id','created'], + 'schema-name':tap_stream_id}, + has_pk=True + ) + ), + ( # test_number=8 + { + 'schema': None, + 'key_properties': None, + 'replication_method': None, + 'valid_replication_keys': None + }, + [] + ), + ( # test_number=9 + { + 'schema': None, + 'key_properties': None, + 'replication_method': None, + 'valid_replication_keys': test_rk + }, + [ + { + 'metadata': { + 'valid-replication-keys': ['id','created'] + }, + 
'breadcrumb': [] + } + ] + ), + ( # test_number=10 + { + 'schema': None, + 'key_properties': None, + 'replication_method': test_rm, + 'valid_replication_keys': None + }, + [ + { + 'metadata': { + 'forced-replication-method': 'INCREMENTAL' + }, + 'breadcrumb': [] + } + ] + ), + ( # test_number=11 + { + 'schema': None, + 'key_properties': None, + 'replication_method': test_rm, + 'valid_replication_keys': test_rk + }, + [ + { + 'metadata': { + 'forced-replication-method': 'INCREMENTAL', + 'valid-replication-keys': ['id','created'] + }, + 'breadcrumb': [] + } + ] + ), + ( # test_number=12 + { + 'schema': None, + 'key_properties': test_kp, + 'replication_method': None, + 'valid_replication_keys': None + }, + [ + { + 'metadata': { + 'table-key-properties': ['id'], + }, + 'breadcrumb': [] + } + ] + ), + ( # test_number=13 + { + 'schema': None, + 'key_properties': test_kp, + 'replication_method': None, + 'valid_replication_keys': test_rk + }, + [ + { + 'metadata': { + 'table-key-properties': ['id'], + 'valid-replication-keys': ['id','created'] + }, + 'breadcrumb': [] + } + ] + ), + ( # test_number=14 + { + 'schema': None, + 'key_properties': test_kp, + 'replication_method': test_rm, + 'valid_replication_keys': None + }, + [ + { + 'metadata': { + 'table-key-properties': ['id'], + 'forced-replication-method': 'INCREMENTAL', + }, + 'breadcrumb': [] + } + ] + ), + ( # test_number=15 + { + 'schema': None, + 'key_properties': test_kp, + 'replication_method': test_rm, + 'valid_replication_keys': test_rk + }, + [ + { + 'metadata': { + 'table-key-properties': ['id'], + 'forced-replication-method': 'INCREMENTAL', + 'valid-replication-keys': ['id','created'] + }, + 'breadcrumb': [] + } + ] + ) + ] + + for i, var in enumerate(test_variables): + with self.subTest(test_number=i): + function_params = var[0] + expected_metadata = var[1] + + test_value = get_standard_metadata(**function_params) + + expected_value = to_map(expected_metadata) + actual_value = to_map(test_value) + 
self.assertDictEqual(expected_value, actual_value) + + # Test one function call where the parameters are not splat in + test_value = get_standard_metadata(test_schema, + tap_stream_id, + test_kp, + test_rk, + test_rm) + + expected_metadata = make_expected_metadata(schema_present_base_obj, + {'table-key-properties': ['id'], + 'forced-replication-method': 'INCREMENTAL', + 'valid-replication-keys': ['id','created'], + 'schema-name':tap_stream_id}, + has_pk=True) + self.assertDictEqual( + to_map(expected_metadata), + to_map(test_value) + ) + + def test_empty_key_properties_are_written(self): + mdata = get_standard_metadata(key_properties=[]) + self.assertEqual(mdata, [{'breadcrumb': (), 'metadata': {'table-key-properties': []}}]) + + def test_empty_valid_replication_keys_are_written(self): + mdata = get_standard_metadata(valid_replication_keys=[]) + self.assertEqual(mdata, [{'breadcrumb': (), 'metadata': {'valid-replication-keys': []}}]) diff --git a/tests/test_schema.py b/tests/test_schema.py index fa28bac..5682755 100644 --- a/tests/test_schema.py +++ b/tests/test_schema.py @@ -44,38 +44,38 @@ class TestSchema(unittest.TestCase): additionalProperties=True) def test_string_to_dict(self): - self.assertEquals(self.string_dict, self.string_obj.to_dict()) + self.assertEqual(self.string_dict, self.string_obj.to_dict()) def test_integer_to_dict(self): - self.assertEquals(self.integer_dict, self.integer_obj.to_dict()) + self.assertEqual(self.integer_dict, self.integer_obj.to_dict()) def test_array_to_dict(self): - self.assertEquals(self.array_dict, self.array_obj.to_dict()) + self.assertEqual(self.array_dict, self.array_obj.to_dict()) def test_object_to_dict(self): - self.assertEquals(self.object_dict, self.object_obj.to_dict()) + self.assertEqual(self.object_dict, self.object_obj.to_dict()) def test_string_from_dict(self): - self.assertEquals(self.string_obj, Schema.from_dict(self.string_dict)) + self.assertEqual(self.string_obj, Schema.from_dict(self.string_dict)) def 
test_integer_from_dict(self): - self.assertEquals(self.integer_obj, Schema.from_dict(self.integer_dict)) + self.assertEqual(self.integer_obj, Schema.from_dict(self.integer_dict)) def test_array_from_dict(self): - self.assertEquals(self.array_obj, Schema.from_dict(self.array_dict)) + self.assertEqual(self.array_obj, Schema.from_dict(self.array_dict)) def test_object_from_dict(self): - self.assertEquals(self.object_obj, Schema.from_dict(self.object_dict)) + self.assertEqual(self.object_obj, Schema.from_dict(self.object_dict)) def test_repr_atomic(self): - self.assertEquals(self.string_obj, eval(repr(self.string_obj))) + self.assertEqual(self.string_obj, eval(repr(self.string_obj))) def test_repr_recursive(self): - self.assertEquals(self.object_obj, eval(repr(self.object_obj))) + self.assertEqual(self.object_obj, eval(repr(self.object_obj))) def test_object_from_dict_with_defaults(self): schema = Schema.from_dict(self.object_dict, inclusion='automatic') - self.assertEquals('whatever', schema.inclusion, - msg='The schema value should override the default') - self.assertEquals('automatic', schema.properties['a_string'].inclusion) - self.assertEquals('automatic', schema.properties['an_array'].items.inclusion) + self.assertEqual('whatever', schema.inclusion, + msg='The schema value should override the default') + self.assertEqual('automatic', schema.properties['a_string'].inclusion) + self.assertEqual('automatic', schema.properties['an_array'].items.inclusion) diff --git a/tests/test_schema_generation.py b/tests/test_schema_generation.py new file mode 100644 index 0000000..610ac20 --- /dev/null +++ b/tests/test_schema_generation.py @@ -0,0 +1,72 @@ +import unittest +from singer.schema_generation import generate_schema + +class TestSchemaGeneration(unittest.TestCase): + def test_simple_schema(self): + records = [{'a': 1, 'b': 'two', 'c': True, 'dt': '2000-01-01T00:11:22Z'}] + expected_schema = { + 'type': ['null', 'object'], + 'properties': { + 'a': {'type': ['null', 
'integer']}, + 'b': {'type': ['null', 'string']}, + 'c': {'type': ['null', 'boolean']}, + 'dt': {'type': ['null', 'string']} + } + } + self.assertEqual(expected_schema, generate_schema(records)) + + def test_mix_n_match_records_schema(self): + records = [ + {'a': 1, 'b': 'b'}, + {'a': 'two', 'c': 7, 'd': [1, 'two']}, + {'a': True, 'c': 7.7, 'd': {'one': 1, 'two': 'two'}} + ] + expected_schema = { + 'type': ['null', 'object'], + 'properties': {'a': {'anyOf': [{'type': ['null', 'integer']}, + {'type': ['null', 'boolean']}, + {'type': ['null', 'string']}]}, + 'b': {'type': ['null', 'string']}, + 'c': {'anyOf': [{'type': ['null', 'string'], 'format': 'singer.decimal'}, + {'type': ['null', 'integer']}]}, + 'd': {'anyOf': [{'type': ['null', 'array'], + 'items': {'anyOf': [{'type': ['null', 'integer']}, + {'type': ['null', 'string']}]}}, + {'type': ['null', 'object'], + 'properties': {'one': {'type': ['null', 'integer']}, + 'two': {'type': ['null', 'string']}}}]}} + } + actual_schema = generate_schema(records) + self.assertEqual(expected_schema, actual_schema) + + def test_nested_structue_schema(self): + records = [{'a': {'b': {'c': [{'d': 7}]}, 'e': [[1, 2, 3]]}}] + expected_schema = { + 'type': ['null', 'object'], + 'properties': { + 'a': { + 'type': ['null', 'object'], + 'properties': { + 'b': { + 'type': ['null', 'object'], + 'properties': { + 'c': { + 'type': ['null', 'array'], + 'items': { + 'type': ['null', 'object'], + 'properties': {'d': {'type': ['null', 'integer']}} + } + } + } + }, + 'e': { + 'type': ['null', 'array'], + 'items': { + 'type': ['null', 'array'], + 'items': {'type': ['null', 'integer']}} + } + } + } + } + } + self.assertEqual(expected_schema, generate_schema(records)) diff --git a/tests/test_singer.py b/tests/test_singer.py index 87c7cf3..7f69bb5 100644 --- a/tests/test_singer.py +++ b/tests/test_singer.py @@ -1,7 +1,10 @@ import singer import unittest +from unittest.mock import patch import datetime import dateutil +from decimal import Decimal + 
class TestSinger(unittest.TestCase): def test_parse_message_record_good(self): @@ -89,7 +92,6 @@ def test_parse_message_state_missing_value(self): singer.parse_message('{"type": "STATE"}') def test_round_trip(self): - record_message = singer.RecordMessage( record={'name': 'foo'}, stream='users') @@ -102,7 +104,7 @@ def test_round_trip(self): 'name': {'type': 'string'}}}) state_message = singer.StateMessage(value={'seq': 1}) - + self.assertEqual(record_message, singer.parse_message(singer.format_message(record_message))) self.assertEqual(schema_message, @@ -124,5 +126,85 @@ def test_write_schema(self): def test_write_state(self): singer.write_state({"foo": 1}) +class TestParsingNumbers(unittest.TestCase): + def create_record(self, value): + raw = '{"type": "RECORD", "stream": "test", "record": {"value": ' + value + '}}' + parsed = singer.parse_message(raw) + return parsed.record['value'] + + def test_parse_int_zero(self): + value = self.create_record('0') + self.assertEqual(type(value), int) + self.assertEqual(value, 0) + + def test_parse_regular_decimal(self): + value = self.create_record('3.14') + self.assertEqual(Decimal('3.14'), value) + + def test_parse_large_decimal(self): + value = self.create_record('9999999999999999.9999') + self.assertEqual(Decimal('9999999999999999.9999'), value) + + def test_parse_small_decimal(self): + value = self.create_record('-9999999999999999.9999') + self.assertEqual(Decimal('-9999999999999999.9999'), value) + + def test_parse_absurdly_large_decimal(self): + value_str = '9' * 1024 + '.' 
+ '9' * 1024 + value = self.create_record(value_str) + self.assertEqual(Decimal(value_str), value) + + def test_parse_absurdly_large_int(self): + value_str = '9' * 1024 + value = self.create_record(value_str) + self.assertEqual(int(value_str), value) + self.assertEqual(int, type(value)) + + def test_parse_bulk_decs(self): + value_strs = [ + '-9999999999999999.9999999999999999999999', + '0', + '9999999999999999.9999999999999999999999', + '-7187498962233394.3739812942138415666763', + '9273972760690975.2044306442955715221042', + '29515565286974.1188802122612813004366', + '9176089101347578.2596296292040288441238', + '-8416853039392703.306423225471199148379', + '1285266411314091.3002668125515694162268', + '6051872750342125.3812886238958681227336', + '-1132031605459408.5571559429308939781468', + '-6387836755056303.0038029604189860431045', + '4526059300505414' + ] + for value_str in value_strs: + value = self.create_record(value_str) + self.assertEqual(Decimal(value_str), value) + + @patch('sys.stdout') + def test_ensure_ascii_false(self, mock_stdout): + """ + Setting ensure_ascii=False will preserve special characters like é + in their original form. 
+ """ + rec = {"name": "José"} + expected_output = '{"type": "RECORD", "stream": "test_stream", "record": {"name": "José"}}\n' + rec_message = singer.RecordMessage(stream="test_stream", record=rec) + result = singer.write_message(rec_message, ensure_ascii=False) + mock_stdout.write.assert_called_once_with(expected_output) + mock_stdout.flush.assert_called_once() + + @patch('sys.stdout') + def test_ensure_ascii_true(self, mock_stdout): + """ + ensure_ascii defaults to True, special characters like é are + escaped into their ASCII representation (e.g., \u00e9) + """ + rec = {"name": "José"} + expected_output = '{"type": "RECORD", "stream": "test_stream", "record": {"name": "Jos\\u00e9"}}\n' + rec_message = singer.RecordMessage(stream="test_stream", record=rec) + result = singer.write_message(rec_message) + mock_stdout.write.assert_called_once_with(expected_output) + mock_stdout.flush.assert_called_once() + if __name__ == '__main__': unittest.main() diff --git a/tests/test_state.py b/tests/test_state.py new file mode 100644 index 0000000..4d30322 --- /dev/null +++ b/tests/test_state.py @@ -0,0 +1,271 @@ +import unittest +from singer import state as st + +class TestBookmark(unittest.TestCase): + def test_empty_state(self): + empty_state = {} + + # Case with no value to fall back on + self.assertIsNone(st.get_bookmark(empty_state, 'some_stream', 'my_key')) + + # Case with a given default + self.assertEqual(st.get_bookmark(empty_state, 'some_stream', 'my_key', 'default_value'), + 'default_value') + + def test_empty_bookmark(self): + empty_bookmark = {'bookmarks':{}} + + # Case with no value to fall back on + self.assertIsNone(st.get_bookmark(empty_bookmark, 'some_stream', 'my_key')) + + # Case with a given default + self.assertEqual(st.get_bookmark(empty_bookmark, 'some_stream', 'my_key', 'default_value'), + 'default_value') + + def test_non_empty_state(self): + stream_id_1 = 'customers' + bookmark_key_1 = 'datetime' + bookmark_val_1 = 123456789 + + non_empty_state = { + 
'bookmarks' : { + stream_id_1 : { + bookmark_key_1 : bookmark_val_1 + } + } + } + + # + # Cases with no value to fall back on + # + + # Bad stream, bad key + self.assertIsNone(st.get_bookmark(non_empty_state, 'some_stream', 'my_key')) + + # Good stream, bad key + self.assertIsNone(st.get_bookmark(non_empty_state, stream_id_1, 'my_key')) + + # Good stream, good key + self.assertEqual(st.get_bookmark(non_empty_state, stream_id_1, bookmark_key_1), + bookmark_val_1) + + # + # Cases with a given default + # + + # Bad stream, bad key + self.assertEqual(st.get_bookmark(non_empty_state, 'some_stream', 'my_key', 'default_value'), + 'default_value') + + # Bad stream, good key + self.assertEqual(st.get_bookmark(non_empty_state, 'some_stream', bookmark_key_1, 'default_value'), + 'default_value') + + # Good stream, bad key + self.assertEqual(st.get_bookmark(non_empty_state, stream_id_1, 'my_key', 'default_value'), + 'default_value') + + # Good stream, good key + self.assertEqual(st.get_bookmark(non_empty_state, stream_id_1, bookmark_key_1, 'default_value'), + bookmark_val_1) + + def test_set_bookmark(self): + stream_id_1 = 'customers' + bookmark_key_1 = 'datetime' + bookmark_val_1 = 123456789 + + result = st.set_bookmark({'bookmarks': {stream_id_1: {bookmark_key_1: 'old-value'}}}, stream_id_1, bookmark_key_1, bookmark_val_1) + self.assertEqual(result, {'bookmarks': {stream_id_1: {bookmark_key_1: bookmark_val_1}}}) + + def test_clear_bookmark(self): + stream_id_1 = 'customers' + bookmark_key_1 = 'datetime' + bookmark_val_1 = 123456789 + + result = st.clear_bookmark({'bookmarks': {stream_id_1: {bookmark_key_1: bookmark_val_1}}}, stream_id_1, bookmark_key_1) + self.assertEqual(result, {'bookmarks': {stream_id_1: {}}}) + + +class TestOffset(unittest.TestCase): + def test_empty_state(self): + empty_state = {} + + # Case with no value to fall back on + self.assertIsNone(st.get_offset(empty_state, 'some_stream')) + + # Case with a given default + 
self.assertEqual(st.get_offset(empty_state, 'some_stream', 'default_value'), + 'default_value') + + def test_empty_bookmark(self): + empty_bookmark = {'bookmarks':{}} + + # Case with no value to fall back on + self.assertIsNone(st.get_offset(empty_bookmark, 'some_stream')) + + # Case with a given default + self.assertEqual(st.get_offset(empty_bookmark, 'some_stream', 'default_value'), + 'default_value') + + def test_non_empty_state(self): + stream_id_1 = 'customers' + bookmark_key_1 = 'datetime' + bookmark_val_1 = 123456789 + offset_val = 'fizzy water' + + non_empty_state = { + 'bookmarks' : { + stream_id_1 : { + bookmark_key_1 : bookmark_val_1, + 'offset' : offset_val + } + } + } + + # + # Cases with no value to fall back on + # + + # Bad stream + self.assertIsNone(st.get_offset(non_empty_state, 'some_stream')) + + # Good stream + self.assertEqual(st.get_offset(non_empty_state, stream_id_1), + offset_val) + + # + # Case with a given default + # + + # Bad stream + self.assertEqual(st.get_offset(non_empty_state, 'some_stream', 'default_value'), + 'default_value') + + # Good stream + self.assertEqual(st.get_offset(non_empty_state, stream_id_1, 'default_value'), + offset_val) + + def test_set_offset(self): + stream_id_1 = 'customers' + offset_key_1 = 'datetime' + offset_val_1 = 123456789 + + result = st.set_offset({'bookmarks': {stream_id_1: {'offset': {offset_key_1: 'old-value'}}}}, stream_id_1, offset_key_1, offset_val_1) + self.assertEqual(result, {'bookmarks': {stream_id_1: {'offset': {offset_key_1: offset_val_1}}}}) + + def test_clear_offset(self): + stream_id_1 = 'customers' + offset_key_1 = 'datetime' + offset_val_1 = 123456789 + + result = st.clear_offset({'bookmarks': {stream_id_1: {'offset': {offset_key_1: offset_val_1}}}}, stream_id_1) + self.assertEqual(result, {'bookmarks': {stream_id_1: {}}}) + + +class TestCurrentlySyncing(unittest.TestCase): + def test_empty_state(self): + empty_state = {} + + # Case with no value to fall back on + 
self.assertIsNone(st.get_currently_syncing(empty_state)) + + # Case with a given default + self.assertEqual(st.get_currently_syncing(empty_state, 'default_value'), + 'default_value') + + def test_non_empty_state(self): + stream_id_1 = 'customers' + bookmark_key_1 = 'datetime' + bookmark_val_1 = 123456789 + offset_val = 'fizzy water' + + non_empty_state = { + 'bookmarks' : { + stream_id_1 : { + bookmark_key_1 : bookmark_val_1, + 'offset' : offset_val + } + }, + 'currently_syncing' : stream_id_1 + } + + # Case with no value to fall back on + self.assertEqual(st.get_currently_syncing(non_empty_state), + stream_id_1) + + # Case with a given default + self.assertEqual(st.get_currently_syncing(non_empty_state, 'default_value'), + stream_id_1) + + def test_set_currently_syncing(self): + result = st.set_currently_syncing({'currently_syncing': 'foo'}, 'bar') + self.assertEqual(result, {'currently_syncing': 'bar'}) + + +class TestVersions(unittest.TestCase): + def test_empty_state(self): + empty_state = {} + + # Case with no value to fall back on + self.assertIsNone(st.get_version(empty_state, 'some_stream')) + + # Case with a given default + self.assertEqual(st.get_version(empty_state, 'some_stream', 'default_value'), + 'default_value') + + def test_empty_versions(self): + empty_versions = {'versions':{}} + + # Case with no value to fall back on + self.assertIsNone(st.get_version(empty_versions, 'some_stream')) + + # Case with a given default + self.assertEqual(st.get_version(empty_versions, 'some_stream', 'default_value'), + 'default_value') + + def test_non_empty_state(self): + stream_id_1 = 'customers' + version_val_1 = 123456789 + + non_empty_state = { + 'versions' : { + stream_id_1 : version_val_1 + } + } + + # + # Cases with no value to fall back on + # + + # Bad stream + self.assertIsNone(st.get_version(non_empty_state, 'some_stream')) + + # Good stream + self.assertEqual(st.get_version(non_empty_state, stream_id_1), + version_val_1) + + # + # Cases with a given 
default + # + + # Bad stream + self.assertEqual(st.get_version(non_empty_state, 'some_stream', 'default_value'), + 'default_value') + + # Good stream + self.assertEqual(st.get_version(non_empty_state, stream_id_1, 'default_value'), + version_val_1) + + def test_set_version(self): + stream_id_1 = 'customers' + version_val_1 = 123456789 + + result = st.set_version({'versions': {stream_id_1: 'old-value'}}, stream_id_1, version_val_1) + self.assertEqual(result, {'versions': {stream_id_1: version_val_1}}) + + def test_clear_version(self): + stream_id_1 = 'customers' + version_val_1 = 123456789 + + result = st.clear_version({'versions': {stream_id_1: version_val_1}}, stream_id_1) + self.assertEqual(result, {'versions': {}}) diff --git a/tests/test_transform.py b/tests/test_transform.py index 3ba57fa..b398e93 100644 --- a/tests/test_transform.py +++ b/tests/test_transform.py @@ -1,8 +1,12 @@ +import io +import sys import unittest +import decimal +import simplejson as json +import singer.messages as messages from singer import transform from singer.transform import * - class TestTransform(unittest.TestCase): def test_integer_transform(self): schema = {'type': 'integer'} @@ -24,7 +28,7 @@ def test_nested_transform(self): def test_multi_type_object_transform(self): schema = {"type": ["null", "object", "string"], - "properties": {"whatever": {"type": "date-time", + "properties": {"whatever": {"type": "string", "format": "date-time"}}} data = {"whatever": "2017-01-01"} expected = {"whatever": "2017-01-01T00:00:00.000000Z"} @@ -35,7 +39,7 @@ def test_multi_type_object_transform(self): def test_multi_type_array_transform(self): schema = {"type": ["null", "array", "integer"], - "items": {"type": "date-time", "format": "date-time"}} + "items": {"type": "string", "format": "date-time"}} data = ["2017-01-01"] expected = ["2017-01-01T00:00:00.000000Z"] self.assertEqual(expected, transform(data, schema)) @@ -252,6 +256,72 @@ def test_null_object_transform(self): empty_data = {'addrs': 
{}} self.assertDictEqual(empty_data, transform(empty_data, schema)) + def test_decimal_types_transform(self): + schema = {"type": "object", + "properties": {"percentage": {"type": ["string"], + "format": "singer.decimal"}}} + + inf = {'percentage': 'Infinity'} + negative_inf = {'percentage': '-Infinity'} + root2 = {'percentage': 1.4142135623730951} + nan = {'percentage': decimal.Decimal('NaN')} + snan = {'percentage': decimal.Decimal('sNaN')} + + self.assertEqual(inf, transform(inf, schema)) + self.assertEqual(negative_inf, transform(negative_inf, schema)) + self.assertEqual({'percentage': '1.4142135623730951'}, transform(root2, schema)) + self.assertEqual({'percentage': 'NaN'}, transform(nan, schema)) + self.assertEqual({'percentage': 'NaN'}, transform(snan, schema)) + + + str1 = {'percentage':'0.1'} + str2 = {'percentage': '0.0000000000001'} + str3 = {'percentage': '1E+13'} + str4 = {'percentage': '100'} + str5 = {'percentage': '-100'} + self.assertEqual(str1, transform(str1, schema)) + self.assertEqual({'percentage': '1E-13'}, transform(str2, schema)) + self.assertEqual({'percentage': '1E+13'}, transform(str3, schema)) + self.assertEqual({'percentage': '100'}, transform(str4, schema)) + self.assertEqual({'percentage': '-100'}, transform(str5, schema)) + + float1 = {'percentage': 12.0000000000000000000000000001234556} + float2 = {'percentage': 0.0123} + float3 = {'percentage': 100.0123} + float4 = {'percentage': -100.0123} + float5 = {'percentage': 0.000001} + float6 = {'percentage': 0.0000001} + self.assertEqual({'percentage':'12.0'}, transform(float1, schema)) + self.assertEqual({'percentage':'0.0123'}, transform(float2, schema)) + self.assertEqual({'percentage':'100.0123'}, transform(float3, schema)) + self.assertEqual({'percentage':'-100.0123'}, transform(float4, schema)) + self.assertEqual({'percentage':'0.000001'}, transform(float5, schema)) + self.assertEqual({'percentage':'1E-7'}, transform(float6, schema)) + + int1 = {'percentage': 123} + int2 = 
{'percentage': 0} + int3 = {'percentage': -1000} + self.assertEqual({'percentage':'123'}, transform(int1, schema)) + self.assertEqual({'percentage':'0'}, transform(int2, schema)) + self.assertEqual({'percentage':'-1000'}, transform(int3, schema)) + + dec1 = {'percentage': decimal.Decimal('1.1010101')} + dec2 = {'percentage': decimal.Decimal('.111111111111111111111111')} + dec3 = {'percentage': decimal.Decimal('-.111111111111111111111111')} + dec4 = {'percentage': decimal.Decimal('100')} + self.assertEqual({'percentage':'1.1010101'}, transform(dec1, schema)) + self.assertEqual({'percentage':'0.111111111111111111111111'}, transform(dec2, schema)) + self.assertEqual({'percentage':'-0.111111111111111111111111'}, transform(dec3, schema)) + self.assertEqual({'percentage':'100'}, transform(dec4, schema)) + + bad1 = {'percentage': 'fsdkjl'} + with self.assertRaises(SchemaMismatch): + transform(bad1, schema) + + badnull = {'percentage': None} + with self.assertRaises(SchemaMismatch): + self.assertEqual({'percentage':None}, transform(badnull, schema)) + class TestTransformsWithMetadata(unittest.TestCase): def test_drops_no_data_when_not_dict(self): @@ -295,6 +365,49 @@ def test_drops_fields_which_are_unsupported(self): dict_value = {"name": "chicken"} self.assertEqual({}, transform(dict_value, schema, NO_INTEGER_DATETIME_PARSING, metadata=metadata)) + def test_drops_nested_object_fields_which_are_unselected(self): + schema = {"type": "object", + "properties": {"addr": {"type": "object", + "properties": {"addr1": {"type": "string"}, + "city": {"type": "string"}, + "state": {"type": "string"}, + 'amount': {'type': 'integer'}}}}} + metadata = { + ('properties','addr'): {"selected": True}, + ('properties','addr', 'properties','amount'): {"selected": False} + } + data = {'addr': + {'addr1': 'address_1', 'city': 'city_1', 'state': 'state_1', 'amount': '123'} + } + expected = {'addr': + {'addr1': 'address_1', 'city': 'city_1', 'state': 'state_1'}, + } + 
self.assertDictEqual(expected, transform(data, schema, NO_INTEGER_DATETIME_PARSING, metadata=metadata)) + + def test_drops_nested_array_fields_which_are_unselected(self): + schema = {"type": "object", + "properties": {"addrs": {"type": "array", + "items": {"type": "object", + "properties": {"addr1": {"type": "string"}, + "city": {"type": "string"}, + "state": {"type": "string"}, + 'amount': {'type': 'integer'}}}}}} + metadata = { + ('properties','addrs'): {"selected": True}, + ('properties','addrs','items','properties','amount'): {"selected": False} + } + data = {'addrs': [ + {'addr1': 'address_1', 'city': 'city_1', 'state': 'state_1', 'amount': '123'}, + {'addr1': 'address_2', 'city': 'city_2', 'state': 'state_2', 'amount': '456'} + ] + } + expected = {'addrs': [ + {'addr1': 'address_1', 'city': 'city_1', 'state': 'state_1'}, + {'addr1': 'address_2', 'city': 'city_2', 'state': 'state_2'} + ] + } + self.assertDictEqual(expected, transform(data, schema, NO_INTEGER_DATETIME_PARSING, metadata=metadata)) + class TestResolveSchemaReferences(unittest.TestCase): def test_internal_refs_resolve(self): schema = {"type": "object", @@ -376,3 +489,58 @@ def test_pattern_properties_match_multiple(self): dict_value = {"name": "chicken", "unit_cost": 1.45, "SKU": '123456'} expected = dict(dict_value) self.assertEqual(expected, transform(dict_value, schema)) + +class DummyMessage: + """A dummy message object with an asdict() method.""" + def __init__(self, value): + self.value = value + + def asdict(self): + return {"value": self.value} + + +class TestAllowNan(unittest.TestCase): + """Unit tests for allow_nan support in singer.messages.""" + + def test_format_message_allow_nan_true(self): + """Should serialize NaN successfully when allow_nan=True.""" + msg = DummyMessage(float("nan")) + result = messages.format_message(msg, allow_nan=True) + + # The output JSON should contain NaN literal (not quoted) + self.assertIn("NaN", result) + + # Replace NaN with null to make it valid JSON 
for parsing check + json.loads(result.replace("NaN", "null")) + + def test_format_message_allow_nan_false(self): + """Should raise ValueError when allow_nan=False and value is NaN.""" + msg = DummyMessage(float("nan")) + with self.assertRaises(ValueError): + messages.format_message(msg, allow_nan=False) + + def test_write_message_allow_nan_true(self): + """Should write to stdout successfully when allow_nan=True.""" + msg = DummyMessage(float("nan")) + fake_stdout = io.StringIO() + original_stdout = sys.stdout + sys.stdout = fake_stdout + try: + messages.write_message(msg, allow_nan=True) + output = fake_stdout.getvalue() + self.assertIn("NaN", output) + self.assertTrue(output.endswith("\n")) + finally: + sys.stdout = original_stdout + + def test_write_message_allow_nan_false(self): + """Should raise ValueError when allow_nan=False and message has NaN.""" + msg = DummyMessage(float("nan")) + fake_stdout = io.StringIO() + original_stdout = sys.stdout + sys.stdout = fake_stdout + try: + with self.assertRaises(ValueError): + messages.write_message(msg, allow_nan=False) + finally: + sys.stdout = original_stdout