Skip to content

Commit

Permalink
parse json from streaming http in get_entities
Browse files Browse the repository at this point in the history
  • Loading branch information
ilackarms committed Aug 22, 2017
1 parent 1e7e4f4 commit 4d94845
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 19 deletions.
1 change: 1 addition & 0 deletions kubeclient.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ Gem::Specification.new do |spec|
spec.add_dependency 'rest-client'
spec.add_dependency 'recursive-open-struct', '~> 1.0.4'
spec.add_dependency 'http', '~> 2.2.2'
spec.add_dependency 'json-streamer'
end
79 changes: 65 additions & 14 deletions lib/kubeclient/common.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
require 'json'
require 'rest-client'
require 'json/streamer'

module Kubeclient
# Common methods
# this is mixed in by other gems
Expand Down Expand Up @@ -113,9 +115,7 @@ def handle_exception
rescue JSON::ParserError
{}
end
err_message = json_error_msg['message'] || e.message
error_klass = e.http_code == 404 ? ResourceNotFoundError : HttpError
raise error_klass.new(e.http_code, err_message, e.response)
http_err(json_error_msg, e)
end

def discover
Expand Down Expand Up @@ -176,8 +176,8 @@ def define_entity_methods
@entities.values.each do |entity|
klass = ClientMixin.resource_class(@class_owner, entity.entity_type)
# get all entities of a type e.g. get_nodes, get_pods, etc.
define_singleton_method("get_#{entity.method_names[1]}") do |options = {}|
get_entities(entity.entity_type, klass, entity.resource_name, options)
define_singleton_method("get_#{entity.method_names[1]}") do |options = {}, &block|
get_entities(entity.entity_type, klass, entity.resource_name, options, &block)
end

# watch all entities of a type e.g. watch_nodes, watch_pods, etc.
Expand Down Expand Up @@ -233,6 +233,27 @@ def create_rest_client(path = nil)
RestClient::Resource.new(@api_endpoint.merge(path).to_s, options)
end

def do_rest_request(args)
uri = @api_endpoint.merge("#{@api_endpoint.path}/#{@api_version}/#{args[:path]}")
options = {
url: uri.to_s,
method: args[:method],
headers: (args[:headers] || {}).merge('params' => args[:params]),
ssl_ca_file: @ssl_options[:ca_file],
ssl_cert_store: @ssl_options[:cert_store],
verify_ssl: @ssl_options[:verify_ssl],
ssl_client_cert: @ssl_options[:client_cert],
ssl_client_key: @ssl_options[:client_key],
proxy: @http_proxy_uri,
user: @auth_options[:username],
password: @auth_options[:password],
open_timeout: @timeouts[:open],
ClientMixin.restclient_read_timeout_option => @timeouts[:read]
}
options[:block_response] = args[:block] if args[:block]
RestClient::Request.execute(options)
end

def rest_client
@rest_client ||= begin
create_rest_client("#{@api_endpoint.path}/#{@api_version}")
Expand Down Expand Up @@ -268,21 +289,45 @@ def get_entities(entity_type, klass, resource_name, options = {})
SEARCH_ARGUMENTS.each { |k, v| params[k] = options[v] if options[v] }

ns_prefix = build_namespace_prefix(options[:namespace])
response = handle_exception do
rest_client[ns_prefix + resource_name]
.get({ 'params' => params }.merge(@headers))

resource_version = nil
entity_streamer = Json::Streamer::JsonStreamer.new
resource_version_streamer = Json::Streamer::JsonStreamer.new

resource_version_streamer.get(key: 'resourceVersion') do |value|
resource_version = value unless resource_version
end

result = JSON.parse(response)
entities = []
# result['items'] might be nil due to https://github.com/kubernetes/kubernetes/issues/13096
entity_streamer.get(nesting_level: 2, yield_values: false) do |object|
entity = new_entity(object, klass)
if block_given?
yield(entity, resource_version)
else
entities << entity
end
end

resource_version =
result.fetch('resourceVersion') do
result.fetch('metadata', {}).fetch('resourceVersion', nil)
block = proc do |response|
if response.instance_of?(Net::HTTPNotFound)
raise ResourceNotFoundError.new(404,
'Not Found',
response)
end
response.read_body do |chunk|
entity_streamer.parser << chunk
end
end

# result['items'] might be nil due to https://github.com/kubernetes/kubernetes/issues/13096
collection = result['items'].to_a.map { |item| new_entity(item, klass) }
handle_exception do
do_rest_request(path: ns_prefix + resource_name,
block: block, method: :get, headers: @headers, params: params)
end

return if block_given?

collection = entities.to_a
Kubeclient::Common::EntityList.new(entity_type, resource_version, collection)
end

Expand Down Expand Up @@ -508,5 +553,11 @@ def http_options(uri)

options.merge(@socket_options)
end

def http_err(json_error_msg, e)
err_message = json_error_msg['message'] || e.message
error_klass = e.http_code == 404 ? ResourceNotFoundError : HttpError
raise error_klass.new((e.http_code || 404), err_message, e.response)
end
end
end
36 changes: 31 additions & 5 deletions test/test_kubeclient.rb
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,27 @@ def test_nonjson_exception
assert_equal(404, exception.error_code)
end

def test_entity_list
def test_entity_list_with_block
stub_request(:get, %r{/api/v1$})
.to_return(body: open_test_file('core_api_resource_list.json'), status: 200)
stub_request(:get, %r{/services})
.to_return(body: open_test_file('entity_list.json'), status: 200)

client = Kubeclient::Client.new('http://localhost:8080/api/', 'v1')
services = []
client.get_services do |service|
services << service
end

refute_empty(services)
assert_equal(2, services.size)
assert_instance_of(Kubeclient::Service, services[0])
assert_instance_of(Kubeclient::Service, services[1])

assert_requested(:get, 'http://localhost:8080/api/v1/services', times: 1)
end

def test_entity_list_without_block
stub_request(:get, %r{/api/v1$})
.to_return(body: open_test_file('core_api_resource_list.json'), status: 200)
stub_request(:get, %r{/services})
Expand Down Expand Up @@ -440,10 +460,13 @@ def test_api_basic_auth_back_comp_success
auth_options: { user: 'username', password: 'password' }
)

pods = client.get_pods
pods = []
client.get_pods do |pod|
pods << pod
end

assert_equal('Pod', pods.kind)
assert_equal(1, pods.size)
assert_equal('Kubeclient::Pod', pods[0].class.to_s)
assert_requested(:get, 'http://localhost:8080/api/v1/pods', times: 1)
end

Expand Down Expand Up @@ -550,10 +573,13 @@ def test_api_bearer_token_file_success
auth_options: { bearer_token_file: file }
)

pods = client.get_pods
pods = []
client.get_pods do |pod|
pods << pod
end

assert_equal('Pod', pods.kind)
assert_equal(1, pods.size)
assert_equal('Kubeclient::Pod', pods[0].class.to_s)
end

def test_proxy_url
Expand Down

0 comments on commit 4d94845

Please sign in to comment.