diff --git a/lang/ruby/Manifest b/lang/ruby/Manifest index 87bfd98aa51..9fc48c2a3b7 100644 --- a/lang/ruby/Manifest +++ b/lang/ruby/Manifest @@ -9,6 +9,7 @@ lib/avro.rb lib/avro/data_file.rb lib/avro/io.rb lib/avro/ipc.rb +lib/avro/logical_types.rb lib/avro/protocol.rb lib/avro/schema.rb lib/avro/schema_compatibility.rb @@ -24,6 +25,7 @@ test/test_datafile.rb test/test_fingerprints.rb test/test_help.rb test/test_io.rb +test/test_logical_types.rb test/test_protocol.rb test/test_schema.rb test/test_schema_compatibility.rb diff --git a/lang/ruby/lib/avro/io.rb b/lang/ruby/lib/avro/io.rb index b04a19a7884..26bda973a17 100644 --- a/lang/ruby/lib/avro/io.rb +++ b/lang/ruby/lib/avro/io.rb @@ -254,7 +254,7 @@ def read_data(writers_schema, readers_schema, decoder) # function dispatch for reading data based on type of writer's # schema - case writers_schema.type_sym + datum = case writers_schema.type_sym when :null; decoder.read_null when :boolean; decoder.read_boolean when :string; decoder.read_string @@ -272,6 +272,8 @@ def read_data(writers_schema, readers_schema, decoder) else raise AvroError, "Cannot read unknown schema type: #{writers_schema.type}" end + + readers_schema.type_adapter.decode(datum) end def read_fixed(writers_schema, readers_schema, decoder) @@ -499,8 +501,10 @@ def write(datum, encoder) write_data(writers_schema, datum, encoder) end - def write_data(writers_schema, datum, encoder) - unless Schema.validate(writers_schema, datum) + def write_data(writers_schema, logical_datum, encoder) + datum = writers_schema.type_adapter.encode(logical_datum) + + unless Schema.validate(writers_schema, datum, encoded = true) raise AvroTypeError.new(writers_schema, datum) end diff --git a/lang/ruby/lib/avro/logical_types.rb b/lang/ruby/lib/avro/logical_types.rb new file mode 100644 index 00000000000..e1b219d7241 --- /dev/null +++ b/lang/ruby/lib/avro/logical_types.rb @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +require 'date' + +module Avro + module LogicalTypes + module IntDate + EPOCH_START = Date.new(1970, 1, 1) + + def self.encode(date) + return date.to_i if date.is_a?(Numeric) + + (date - EPOCH_START).to_i + end + + def self.decode(int) + EPOCH_START + int + end + end + + module TimestampMillis + def self.encode(value) + return value.to_i if value.is_a?(Numeric) + + time = value.to_time + time.to_i * 1000 + time.usec / 1000 + end + + def self.decode(int) + s, ms = int / 1000, int % 1000 + Time.at(s, ms * 1000).utc + end + end + + module TimestampMicros + def self.encode(value) + return value.to_i if value.is_a?(Numeric) + + time = value.to_time + time.to_i * 1000_000 + time.usec + end + + def self.decode(int) + s, us = int / 1000_000, int % 1000_000 + Time.at(s, us).utc + end + end + + module Identity + def self.encode(datum) + datum + end + + def self.decode(datum) + datum + end + end + + TYPES = { + "int" => { + "date" => IntDate + }, + "long" => { + "timestamp-millis" => TimestampMillis, + "timestamp-micros" => TimestampMicros + }, + }.freeze + + def self.type_adapter(type, logical_type) + return unless logical_type + + TYPES.fetch(type, {}.freeze).fetch(logical_type, Identity) + end + end +end diff --git a/lang/ruby/lib/avro/schema.rb b/lang/ruby/lib/avro/schema.rb index 024d5623067..3acd07b7f5a 100644 --- a/lang/ruby/lib/avro/schema.rb +++ b/lang/ruby/lib/avro/schema.rb @@ -14,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +require 'avro/logical_types' + module Avro class Schema # Sets of strings, for backwards compatibility. See below for sets of symbols, @@ -40,6 +42,7 @@ def self.parse(json_string) def self.real_parse(json_obj, names=nil, default_namespace=nil) if json_obj.is_a? Hash type = json_obj['type'] + logical_type = json_obj['logicalType'] raise SchemaParseError, %Q(No "type" property: #{json_obj}) if type.nil? # Check that the type is valid before calling #to_sym, since symbols are never garbage @@ -50,7 +53,7 @@ def self.real_parse(json_obj, names=nil, default_namespace=nil) type_sym = type.to_sym if PRIMITIVE_TYPES_SYM.include?(type_sym) - return PrimitiveSchema.new(type_sym) + return PrimitiveSchema.new(type_sym, logical_type) elsif NAMED_TYPES_SYM.include? type_sym name = json_obj['name'] @@ -58,7 +61,7 @@ def self.real_parse(json_obj, names=nil, default_namespace=nil) case type_sym when :fixed size = json_obj['size'] - return FixedSchema.new(name, namespace, size, names) + return FixedSchema.new(name, namespace, size, names, logical_type) when :enum symbols = json_obj['symbols'] doc = json_obj['doc'] @@ -93,23 +96,29 @@ def self.real_parse(json_obj, names=nil, default_namespace=nil) end # Determine if a ruby datum is an instance of a schema - def self.validate(expected_schema, datum) - SchemaValidator.validate!(expected_schema, datum) + def self.validate(expected_schema, logical_datum, encoded = false) + SchemaValidator.validate!(expected_schema, logical_datum, encoded) true rescue SchemaValidator::ValidationError false end - def initialize(type) + def initialize(type, logical_type=nil) @type_sym = type.is_a?(Symbol) ? type : type.to_sym + @logical_type = logical_type end attr_reader :type_sym + attr_reader :logical_type # Returns the type as a string (rather than a symbol), for backwards compatibility. # Deprecated in favor of {#type_sym}. def type; @type_sym.to_s; end + def type_adapter + @type_adapter ||= LogicalTypes.type_adapter(type, logical_type) || LogicalTypes::Identity + end + # Returns the MD5 fingerprint of the schema as an Integer. def md5_fingerprint parsing_form = SchemaNormalization.to_parsing_form(self) @@ -157,7 +166,9 @@ def subparse(json_obj, names=nil, namespace=nil) end def to_avro(names=nil) - {'type' => type} + props = {'type' => type} + props['logicalType'] = logical_type if logical_type + props end def to_s @@ -166,8 +177,9 @@ def to_s class NamedSchema < Schema attr_reader :name, :namespace - def initialize(type, name, namespace=nil, names=nil, doc=nil) - super(type) + + def initialize(type, name, namespace=nil, names=nil, doc=nil, logical_type=nil) + super(type, logical_type) @name, @namespace = Name.extract_namespace(name, namespace) @doc = doc names = Name.add_name(names, self) @@ -318,11 +330,11 @@ def to_avro(names=Set.new) # Valid primitive types are in PRIMITIVE_TYPES. class PrimitiveSchema < Schema - def initialize(type) + def initialize(type, logical_type=nil) if PRIMITIVE_TYPES_SYM.include?(type) - super(type) + super(type, logical_type) elsif PRIMITIVE_TYPES.include?(type) - super(type.to_sym) + super(type.to_sym, logical_type) else raise AvroError.new("#{type} is not a valid primitive type.") end @@ -336,12 +348,12 @@ def to_avro(names=nil) class FixedSchema < NamedSchema attr_reader :size - def initialize(name, space, size, names=nil) + def initialize(name, space, size, names=nil, logical_type=nil) # Ensure valid cto args unless size.is_a?(Integer) raise AvroError, 'Fixed Schema requires a valid integer for size property.' end - super(:fixed, name, space, names) + super(:fixed, name, space, names, logical_type) @size = size end diff --git a/lang/ruby/lib/avro/schema_validator.rb b/lang/ruby/lib/avro/schema_validator.rb index 89b0a9c1e27..67464fbfe6c 100644 --- a/lang/ruby/lib/avro/schema_validator.rb +++ b/lang/ruby/lib/avro/schema_validator.rb @@ -62,16 +62,22 @@ def to_s TypeMismatchError = Class.new(ValidationError) class << self - def validate!(expected_schema, datum) + def validate!(expected_schema, logical_datum, encoded = false) result = Result.new - validate_recursive(expected_schema, datum, ROOT_IDENTIFIER, result) + validate_recursive(expected_schema, logical_datum, ROOT_IDENTIFIER, result, encoded) fail ValidationError, result if result.failure? result end private - def validate_recursive(expected_schema, datum, path, result) + def validate_recursive(expected_schema, logical_datum, path, result, encoded = false) + datum = if encoded + logical_datum + else + expected_schema.type_adapter.encode(logical_datum) rescue nil + end + case expected_schema.type_sym when :null fail TypeMismatchError unless datum.nil? diff --git a/lang/ruby/test/random_data.rb b/lang/ruby/test/random_data.rb index 9d276f7d2e5..54fa8781d6e 100644 --- a/lang/ruby/test/random_data.rb +++ b/lang/ruby/test/random_data.rb @@ -27,15 +27,17 @@ def next end def nextdata(schm, d=0) + return logical_nextdata(schm, d=0) unless schm.type_adapter.eql?(Avro::LogicalTypes::Identity) + case schm.type_sym when :boolean rand > 0.5 when :string randstr() when :int - rand(Avro::Schema::INT_MAX_VALUE - Avro::Schema::INT_MIN_VALUE) + Avro::Schema::INT_MIN_VALUE + rand_int when :long - rand(Avro::Schema::LONG_MAX_VALUE - Avro::Schema::LONG_MIN_VALUE) + Avro::Schema::LONG_MIN_VALUE + rand_long when :float (-1024 + 2048 * rand).round.to_f when :double @@ -79,6 +81,15 @@ def nextdata(schm, d=0) end end + def logical_nextdata(schm, _d=0) + case schm.logical_type + when 'date' + Avro::LogicalTypes::IntDate.decode(rand_int) + when 'timestamp-millis', 'timestamp-micros' + Avro::LogicalTypes::TimestampMicros.decode(rand_long) + end + end + CHARPOOL = 'abcdefghjkmnpqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ23456789' BYTEPOOL = '12345abcd' @@ -87,4 +98,12 @@ def randstr(chars=CHARPOOL, length=20) rand(length+1).times { str << chars[rand(chars.size)] } str end + + def rand_int + rand(Avro::Schema::INT_MAX_VALUE - Avro::Schema::INT_MIN_VALUE) + Avro::Schema::INT_MIN_VALUE + end + + def rand_long + rand(Avro::Schema::LONG_MAX_VALUE - Avro::Schema::LONG_MIN_VALUE) + Avro::Schema::LONG_MIN_VALUE + end end diff --git a/lang/ruby/test/test_io.rb b/lang/ruby/test/test_io.rb index fc0088b4184..70bb4d60cef 100644 --- a/lang/ruby/test/test_io.rb +++ b/lang/ruby/test/test_io.rb @@ -84,6 +84,17 @@ def test_record check_default(record_schema, '{"f": 11}', {"f" => 11}) end + def test_record_with_logical_type + record_schema = < 'record', 'name' => 'has_logical', + 'fields' => [ + {'name' => 'dt', 'type' => {'type' => 'int', 'logicalType' => 'date'}} + ] + } + end + def test_unknown_named_type error = assert_raise Avro::UnknownSchemaError do Avro::Schema.parse <<-SCHEMA