readonly conflicts with default (and rule_filter) #272

Closed
wants to merge 4 commits into from

144 changes: 144 additions & 0 deletions cerberus/tests/rulefilter.py
@@ -0,0 +1,144 @@
# -*- coding: utf-8 -*-

if __name__ == '__main__':
    import os
    import sys
    import unittest  # TODO pytest
    sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__),
                                                 '..', '..')))

from cerberus import errors  # noqa
from cerberus.tests import TestBase  # noqa


ValidationError = errors.ValidationError


class TestRuleFilter(TestBase):

    def test_process_no_validation_rules(self):
        self.validator.rule_filter = lambda f: False
        document = {
            'a_string': 'a',                # too short
            'a_binary': None,               # not nullable
            'an_integer': 200,              # too big
            'a_boolean': 'foo',             # wrong type
            'a_regex_email': '@',           # wrong pattern
            'a_readonly_string': 'bar',     # readonly
            'a_restricted_string': 'baz',   # forbidden value
            'an_array': ['foo', 'bar'],     # forbidden values
            'a_list_of_dicts': [
                {}, {'sku': '123'}          # missing required field price
            ],
            'a_dict_with_valueschema': {
                'foo': '1',                 # not of type 'integer'
                'bar': '2',                 # not of type 'integer'
            },
            'a_dict_with_keyschema': {
                '1': 'foo',                 # key does not match keyschema
                '2': 'bar',                 # key does not match keyschema
            }
        }
        self.assertSuccess(document)

    def test_disable_coerce(self):
        self.validator.rule_filter = lambda f: f != 'coerce'
        self.schema['an_integer']['coerce'] = lambda v: int(v)
        # The `type` rule will still be processed and let the validation fail
        self.assertFail({'an_integer': '7'})
        self.assertError('an_integer', ('an_integer', 'type'),
                         errors.BAD_TYPE, 'integer')
        self.assertEqual(self.validator.document['an_integer'], '7')

    def test_disable_default(self):
        self.validator.rule_filter = \
            lambda f: f not in ('default', 'default_setter')
        self.schema['a_string']['default'] = 'default'
        self.schema['a_number']['default_setter'] = lambda d: 2
        self.assertSuccess({})
        self.assertNotIn('a_string', self.validator.document)
        self.assertNotIn('a_number', self.validator.document)

    def test_disable_rename(self):
        self.validator.rule_filter = \
            lambda f: f not in ('rename', 'rename_handler')
        self.schema['a_string']['rename'] = 'new_string'
        self.schema['a_dict']['allow_unknown'] = {'rename_handler': int}
        document = {
            'a_string': 'foo',
            'a_dict': {'city': 'test', '2': 'foo'}
        }
        self.assertSuccess(document)
        self.assertIn('a_string', self.validator.document)
        self.assertIn('2', self.validator.document['a_dict'])

    def test_still_disallow_unknown_keys(self):
        self.validator.rule_filter = lambda f: False
        self.assertFail({'unknown': 'unknown'})
        self.assertError('unknown', (), errors.UNKNOWN_FIELD, None)

    def test_still_purge_unknown_keys(self):
        self.validator.rule_filter = lambda f: False
        self.validator.purge_unknown = True
        self.assertSuccess({'unknown': 'unknown'})
        self.assertNotIn('unknown', self.validator.document)

    def test_recurse_schema(self):
        """ We expect to validate all subdocuments even if `rule_filter`
            returns False for 'schema'. """
        self.validator.rule_filter = lambda f: f == 'required'
        self.assertSuccess({'a_list_of_dicts': []})
        self.assertFail({'a_list_of_dicts': [{}]})
        self.assertError('a_list_of_dicts', ('a_list_of_dicts', 'schema'),
                         errors.SEQUENCE_SCHEMA,
                         self.schema['a_list_of_dicts']['schema'])

    def test_recurse_schema_without_failing(self):
        self.validator.rule_filter = lambda f: f == 'readonly'
        self.assertSuccess({'a_dict': 'wrong type'})

    def test_recurse_items(self):
        """ We expect to validate all subdocuments (here: items of a list)
            even if `rule_filter` returns False for 'items'. """
        self.validator.rule_filter = lambda f: f == 'type'
        self.assertFail({'a_list_of_values': [1, 2]})
        self.assertError('a_list_of_values', ('a_list_of_values', 'items'),
                         errors.BAD_ITEMS, [{'type': 'string'},
                                            {'type': 'integer'}])

    def test_recurse_items_without_failing(self):
        """ Processing `items` would result in failure because the list
            contains the wrong number of items. But because we don't process
            `items` we don't expect the validation to fail. """
        self.validator.rule_filter = lambda f: f == 'readonly'
        self.assertSuccess({'a_list_of_values': 'wrong type'})

    def test_recurse_keyschema(self):
        self.validator.rule_filter = lambda f: f in ('type', 'regex')
        self.assertFail({'a_dict_with_keyschema': {'AAA': 1}})
        self.assertError('a_dict_with_keyschema',
                         ('a_dict_with_keyschema', 'keyschema'),
                         errors.KEYSCHEMA,
                         self.schema['a_dict_with_keyschema']['keyschema'])

    def test_recurse_keyschema_without_failing(self):
        self.validator.rule_filter = lambda f: f == 'readonly'
        self.assertSuccess({'a_dict_with_keyschema': 'wrong type'})

    def test_recurse_valueschema(self):
        self.validator.rule_filter = lambda f: f == 'type'
        self.assertFail({'a_dict_with_valueschema': {'foo': 'bar'}})
        self.assertError('a_dict_with_valueschema',
                         ('a_dict_with_valueschema', 'valueschema'),
                         errors.VALUESCHEMA,
                         self.schema['a_dict_with_valueschema']['valueschema'])

    def test_recurse_valueschema_without_failing(self):
        self.validator.rule_filter = lambda f: f == 'readonly'
        self.assertSuccess({'a_dict_with_valueschema': 'wrong type'})


if __name__ == '__main__':
    # TODO get pytest.main() working before tackling
    # https://github.com/nicolaiarocci/cerberus/issues/213
    unittest.main()
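
For orientation, a minimal sketch of how the rule_filter option exercised by the tests above could be used from application code; the schema and document are invented for illustration and assume the behaviour introduced on this branch:

from cerberus import Validator

schema = {'created': {'type': 'string', 'readonly': True}}

# Skip the 'readonly' rule; every other rule stays active.
v = Validator(schema, rule_filter=lambda rule: rule != 'readonly')

# The document violates 'readonly', but the filtered validator accepts it.
print(v.validate({'created': 'today'}))   # expected: True

# The filter can also be swapped at runtime via the new property.
v.rule_filter = lambda rule: True
print(v.validate({'created': 'today'}))   # expected: False, readonly applies again
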
29 changes: 29 additions & 0 deletions cerberus/tests/tests.py
@@ -97,6 +97,35 @@ def test_readonly_field_first_rule(self):
        # instead.
        assert 'read-only' in v.errors['a_readonly_number'][0]

    def test_readonly_field_with_default_value(self):
        schema = {
            'created': {
                'type': 'string',
                'readonly': True,
                'default': 'today'
            }
        }
        self.assertSuccess({}, schema)
        self.assertFail({'created': 'tomorrow'}, schema)
        self.assertFail({'created': 'today'}, schema)

    def test_nested_readonly_field_with_default_value(self):
        schema = {
            'some_field': {
                'type': 'dict',
                'schema': {
                    'created': {
                        'type': 'string',
                        'readonly': True,
                        'default': 'today'
                    }
                }
            }
        }
        self.assertSuccess({'some_field': {}}, schema)
        self.assertFail({'some_field': {'created': 'tomorrow'}}, schema)
        self.assertFail({'some_field': {'created': 'today'}}, schema)

    def test_not_a_string(self):
        self.assertBadType('a_string', 'string', 1)

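
The two new tests above pin down the readonly/default interplay named in the pull request title. A quick sketch of the behaviour they assert, using the same schema as the test and assuming this branch:

from cerberus import Validator

schema = {'created': {'type': 'string', 'readonly': True, 'default': 'today'}}
v = Validator(schema)

print(v.validate({}))                        # True: the default is applied
print(v.document)                            # {'created': 'today'}
print(v.validate({'created': 'tomorrow'}))   # False: the field is readonly
print(v.validate({'created': 'today'}))      # False: even the default value
                                             # may not be supplied by the caller
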
106 changes: 88 additions & 18 deletions cerberus/validator.py
@@ -37,6 +37,15 @@ def dummy(self, constraint, field, value):
    return f


def true(*args, **kwargs):
    """ Return ``True``, ignoring all arguments.

    It is used as the default value of
    :attr:`~cerberus.Validator.rule_filter`. We don't use a lambda function
    because the debug output is nicer when using a named function. """
    return True


class DocumentError(Exception):
    """ Raised when the target document is missing or has the wrong format """
    pass
@@ -87,16 +96,24 @@ class and a dictionary, the latter is passed to the
    :type error_handler: class or instance based on
        :class:`~cerberus.errors.BaseErrorHandler` or
        :class:`tuple`
    :param rule_filter: See :attr:`~cerberus.Validator.rule_filter`.
        Defaults to ``lambda f: True``.
    :type rule_filter: :class:`function`
    """  # noqa

    mandatory_validations = ('nullable', )
    """ Rules that are evaluated on any field, regardless of whether they are
    defined in the schema or not.
    Type: :class:`tuple` """
    priority_validations = ('nullable', 'readonly', 'type')
    preceding_normalization_validations = ('readonly', )
    """ Rules that will be processed before normalization. If any of these
    fails, no further normalization or validation will be done. """
    priority_validations = ('nullable', 'type')
    """ Rules that will be processed in that order before any others and abort
    validation of a document's field if they return ``True``.
    Type: :class:`tuple` """
    recursing_rules = ('schema', 'items', 'keyschema', 'valueschema')
    """ Rules that will create child validators to process subdocuments. """
    _valid_schemas = set()
    """ A :class:`set` of hashed validation schemas that are legit for a
    particular ``Validator`` class. """
@@ -142,6 +159,7 @@ def __init__(self, *args, **kwargs):
        self.__store_config(args, kwargs)
        self.schema = kwargs.get('schema', None)
        self.allow_unknown = kwargs.get('allow_unknown', False)
        self.rule_filter = kwargs.get('rule_filter', true)

    def __init_error_handler(self, kwargs):
        error_handler = kwargs.pop('error_handler', errors.BasicErrorHandler)
@@ -409,6 +427,18 @@ def rules_set_registry(self):
    def rules_set_registry(self, registry):
        self._config['rules_set_registry'] = registry

    @property
    def rule_filter(self):
        """ A function which returns ``True`` if the rule should be processed.
        Rules in :attr:`~cerberus.Validator.recursing_rules` are always
        processed, but will not fail on their own if the filter function
        returns ``False`` for them. """
        return self._config.get('rule_filter', true)

    @rule_filter.setter
    def rule_filter(self, rule_filter):
        self._config['rule_filter'] = rule_filter

    @property
    def root_schema(self):
        """ The :attr:`~cerberus.Validator.schema` attribute of the
@@ -494,8 +524,12 @@ def __normalize_mapping(self, mapping, schema):
        self.__normalize_rename_fields(mapping, schema)
        if self.purge_unknown:
            self._normalize_purge_unknown(mapping, schema)
        self.__normalize_default_fields(mapping, schema)
        self._normalize_coerce(mapping, schema)
        if self.rule_filter('default'):
            self.__normalize_default_fields(mapping, schema)
        if self.rule_filter('default_setter'):
            self.__normalize_default_setter_fields(mapping, schema)
        if self.rule_filter('coerce'):
            self._normalize_coerce(mapping, schema)
        self.__normalize_containers(mapping, schema)
        return mapping

@@ -643,7 +677,7 @@ def __normalize_rename_fields(self, mapping, schema):

    def _normalize_rename(self, mapping, schema, field):
        """ {'type': 'hashable'} """
        if 'rename' in schema[field]:
        if self.rule_filter('rename') and 'rename' in schema[field]:
            mapping[schema[field]['rename']] = mapping[field]
            del mapping[field]

@@ -655,7 +689,8 @@ def _normalize_rename_handler(self, mapping, schema, field):
                                      {'type': 'string'}]}},
                {'type': 'string'}
            ]} """
        if 'rename_handler' not in schema[field]:
        if not self.rule_filter('rename_handler') or \
                'rename_handler' not in schema[field]:
            return
        new_name = self.__normalize_coerce(
            schema[field]['rename_handler'], field, field,
@@ -674,6 +709,9 @@ def __normalize_default_fields(self, mapping, schema):
        for field in fields_with_default:
            self._normalize_default(mapping, schema, field)

    def __normalize_default_setter_fields(self, mapping, schema):
        fields = [x for x in schema if x not in mapping or
                  mapping[x] is None and not schema[x].get('nullable', False)]
        known_fields_states = set()
        fields = [x for x in fields if 'default_setter' in schema[x]]
        while fields:
@@ -735,9 +773,41 @@ class instantiation.
        self._unrequired_by_excludes = set()

        self.__init_processing(document, schema)

        if normalize:
            self.__normalize_mapping(self.document, self.schema)
            self.__process_rules_preceding_normalization()
            if not bool(self._errors):
                self.__normalize_mapping(self.document, self.schema)
            if not bool(self._errors):
                self.__process_rules_following_normalization()
        else:
            self.__process_all_rules()

        self.error_handler.end(self)

        return not bool(self._errors)

    __call__ = validate

    def __process_rules_preceding_normalization(self):
        rule_filter = lambda f: f in self.preceding_normalization_validations \
            and self.rule_filter(f)
        validator = self._get_child_validator(rule_filter=rule_filter,
                                              allow_unknown=True)
        if not validator(self.document, self.schema, normalize=False,
                         update=self.update):
            self._error(validator._errors)

    def __process_rules_following_normalization(self):
        rule_filter = \
            lambda f: f not in self.preceding_normalization_validations and \
            self.rule_filter(f)
        validator = self._get_child_validator(rule_filter=rule_filter)
        if not validator(self.document, self.schema, normalize=False,
                         update=self.update):
            self._error(validator._errors)

    def __process_all_rules(self):
        for field in self.document:
            if self.ignore_none_values and self.document[field] is None:
                continue
@@ -746,16 +816,9 @@ class instantiation.
                self.__validate_definitions(definitions, field)
            else:
                self.__validate_unknown_fields(field)

        if not self.update:
        if not self.update and self.rule_filter('required'):
            self.__validate_required_fields(self.document)

        self.error_handler.end(self)

        return not bool(self._errors)

    __call__ = validate

    def validated(self, *args, **kwargs):
        """ Wrapper around :func:`validate` that returns the normalized and
        validated document or :obj:`None` if validation failed. """
@@ -800,15 +863,22 @@ def validate_rule(rule):
        prior_rules = tuple((x for x in self.priority_validations
                             if x in definitions or
                             x in self.mandatory_validations))
        for rule in prior_rules:
        for rule in filter(self.rule_filter, prior_rules):
            if validate_rule(rule):
                return

        rules = set(definitions)
        rules |= set(self.mandatory_validations)
        rules -= set(prior_rules + ('allow_unknown', 'required'))
        rules -= set(self.normalization_rules)
        for rule in rules:
        rules -= set(self.recursing_rules)
        for rule in filter(self.rule_filter, rules):
            try:
                validate_rule(rule)
            except _SchemaRuleTypeError:
                break

        for rule in (x for x in self.recursing_rules if x in definitions):
            try:
                validate_rule(rule)
            except _SchemaRuleTypeError:
@@ -917,9 +987,9 @@ def _validate_forbidden(self, forbidden_values, field, value):

    def _validate_items(self, items, field, values):
        """ {'type': 'list', 'validator': 'items'} """
        if len(items) != len(values):
        if self.rule_filter('items') and len(items) != len(values):
            self._error(field, errors.ITEMS_LENGTH, len(items), len(values))
        else:
        elif isinstance(values, list):
            schema = dict((i, definition) for i, definition in enumerate(items))  # noqa
            validator = self._get_child_validator(document_crumb=field,
                                                  schema_crumb=(field, 'items'),  # noqa
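
Stripped of the removed/added interleaving, the control flow that the validate() changes above introduce is roughly the following. This is a condensed paraphrase of the diff, not the verbatim source, and it omits the surrounding setup and error-handling details:

def validate(self, document, schema=None, update=False, normalize=True):
    self.__init_processing(document, schema)

    if normalize:
        # 1. The rules in preceding_normalization_validations ('readonly')
        #    run against the raw document, before any defaults are injected.
        self.__process_rules_preceding_normalization()
        # 2. Only if that produced no errors: normalize (defaults, coercion,
        #    renaming, purging of unknown fields).
        if not bool(self._errors):
            self.__normalize_mapping(self.document, self.schema)
        # 3. Only if there are still no errors: run the remaining rules.
        if not bool(self._errors):
            self.__process_rules_following_normalization()
    else:
        self.__process_all_rules()

    self.error_handler.end(self)
    return not bool(self._errors)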