Skip to content

Commit

Permalink
parse streaming JSON from HTTP Response Bodies when passed a block
Browse files Browse the repository at this point in the history
  • Loading branch information
ilackarms committed Sep 19, 2017
1 parent a676edf commit b7f8758
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 24 deletions.
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,22 @@ pods = client.get_pods as: :raw
node = client.get_node "127.0.0.1", as: :raw
```

### Yielding lists entity-by-entity
By passing a block to any `get_*` method that returns a list (e.g.: `get_pods`, `get_nodes`, `get_namespaces`), entities will be yielded one-by-one to the passed block rather than returned as an array. This permits handling entities in large lists asynchronously while parsing takes place.

For example:
```ruby
client.get_services do |service, resource_version|
puts service
puts resource_version
end

#<Kubeclient::Service metadata={:name=>"kubernetes", :namespace=>"default", :selfLink=>"/api/v1/services/kubernetes?namespace=default", :uid=>"016e9dcd-ce39-11e4-ac24-3c970e4a436a", :resourceVersion=>"6", :creationTimestamp=>"2015-03-19T15:08:16+02:00", :labels=>{:component=>"apiserver", :provider=>"kubernetes"}}, spec={:port=>443, :protocol=>"TCP", :selector=>nil, :clusterIP=>"10.0.0.2", :containerPort=>0, :sessionAffinity=>"None"}, status={}>
59
#<Kubeclient::Service metadata={:name=>"kubernetes-ro", :namespace=>"default", :selfLink=>"/api/v1/services/kubernetes-ro?namespace=default", :uid=>"015b78bf-ce39-11e4-ac24-3c970e4a436a", :resourceVersion=>"5", :creationTimestamp=>"2015-03-19T15:08:15+02:00", :labels=>{:component=>"apiserver", :provider=>"kubernetes"}}, spec={:port=>80, :protocol=>"TCP", :selector=>nil, :clusterIP=>"10.0.0.1", :containerPort=>0, :sessionAffinity=>"None"}, status={}>
59
```

#### Delete an entity (by name)

For example: `delete_pod "pod name"` , `delete_replication_controller "rc name"`, `delete_node "node name"`, `delete_secret "secret name"`
Expand Down
1 change: 1 addition & 0 deletions kubeclient.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ Gem::Specification.new do |spec|
spec.add_dependency 'rest-client'
spec.add_dependency 'recursive-open-struct', '~> 1.0.4'
spec.add_dependency 'http', '~> 2.2.2'
spec.add_dependency 'json-streamer'
end
105 changes: 86 additions & 19 deletions lib/kubeclient/common.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
require 'json'
require 'rest-client'
require 'json/streamer'

module Kubeclient
# Common methods
# this is mixed in by other gems
Expand Down Expand Up @@ -113,9 +115,7 @@ def handle_exception
rescue JSON::ParserError
{}
end
err_message = json_error_msg['message'] || e.message
error_klass = e.http_code == 404 ? ResourceNotFoundError : HttpError
raise error_klass.new(e.http_code, err_message, e.response)
raise_http_err(json_error_msg, e)
end

def discover
Expand Down Expand Up @@ -176,8 +176,8 @@ def define_entity_methods
@entities.values.each do |entity|
klass = ClientMixin.resource_class(@class_owner, entity.entity_type)
# get all entities of a type e.g. get_nodes, get_pods, etc.
define_singleton_method("get_#{entity.method_names[1]}") do |options = {}|
get_entities(entity.entity_type, klass, entity.resource_name, options)
define_singleton_method("get_#{entity.method_names[1]}") do |options = {}, &block|
get_entities(entity.entity_type, klass, entity.resource_name, options, &block)
end

# watch all entities of a type e.g. watch_nodes, watch_pods, etc.
Expand Down Expand Up @@ -218,19 +218,7 @@ def self.underscore_entity(entity_name)

def create_rest_client(path = nil)
path ||= @api_endpoint.path
options = {
ssl_ca_file: @ssl_options[:ca_file],
ssl_cert_store: @ssl_options[:cert_store],
verify_ssl: @ssl_options[:verify_ssl],
ssl_client_cert: @ssl_options[:client_cert],
ssl_client_key: @ssl_options[:client_key],
proxy: @http_proxy_uri,
user: @auth_options[:username],
password: @auth_options[:password],
open_timeout: @timeouts[:open],
ClientMixin.restclient_read_timeout_option => @timeouts[:read]
}
RestClient::Resource.new(@api_endpoint.merge(path).to_s, options)
RestClient::Resource.new(@api_endpoint.merge(path).to_s, rest_client_options)
end

def rest_client
Expand Down Expand Up @@ -267,7 +255,8 @@ def watch_entities(resource_name, options = {})
#
# Default response type will return a collection RecursiveOpenStruct
# (:ros) objects, unless `:as` is passed with `:raw`.
def get_entities(entity_type, klass, resource_name, options = {})
def get_entities(entity_type, klass, resource_name, options = {}, &block)
return get_entities_async(klass, resource_name, options, &block) if block_given?
params = {}
SEARCH_ARGUMENTS.each { |k, v| params[k] = options[v] if options[v] }

Expand Down Expand Up @@ -455,6 +444,78 @@ def self.restclient_read_timeout_option

private

def do_rest_request(args)
uri = @api_endpoint.merge("#{@api_endpoint.path}/#{@api_version}/#{args[:path]}")
options = {
url: uri.to_s,
method: args[:method],
headers: (args[:headers] || {}).merge('params' => args[:params])
}.merge(rest_client_options)
options[:block_response] = args[:block] if args[:block]
resp = RestClient::Request.execute(options)
return unless resp.instance_of?(Net::HTTPClientError)
code = resp.code
code = code.to_i if code.instance_of?(String)
raise HttpError.new(code, resp.message, resp)
end

def rest_client_options
{
ssl_ca_file: @ssl_options[:ca_file],
ssl_cert_store: @ssl_options[:cert_store],
verify_ssl: @ssl_options[:verify_ssl],
ssl_client_cert: @ssl_options[:client_cert],
ssl_client_key: @ssl_options[:client_key],
proxy: @http_proxy_uri,
user: @auth_options[:username],
password: @auth_options[:password],
open_timeout: @timeouts[:open],
ClientMixin.restclient_read_timeout_option => @timeouts[:read]
}
end

def get_entities_async(klass, resource_name, options = {})
if options[:as] == :raw
raise ArgumentError('Cannot pass block to get_entities when requesting options[:as] == raw')
end

params = {}
SEARCH_ARGUMENTS.each { |k, v| params[k] = options[v] if options[v] }

ns_prefix = build_namespace_prefix(options[:namespace])

resource_version = nil
entity_streamer = Json::Streamer::JsonStreamer.new
resource_version_streamer = Json::Streamer::JsonStreamer.new

resource_version_streamer.get(key: 'resourceVersion') do |value|
resource_version = value.to_i unless resource_version
end

# result['items'] might be nil due to https://github.com/kubernetes/kubernetes/issues/13096
entity_streamer.get(nesting_level: 2, yield_values: false) do |object|
entity = new_entity(object, klass)
yield(entity, resource_version)
end

block = proc do |response|
if response.instance_of?(Net::HTTPNotFound)
raise ResourceNotFoundError.new(404,
'Not Found',
response)
end
response.read_body do |chunk|
resource_version_streamer.parser << chunk
entity_streamer.parser << chunk
end
end

handle_exception do
do_rest_request(path: ns_prefix + resource_name,
block: block, method: :get, headers: @headers, params: params)
end
end

def load_entities
@entities = {}
fetch_entities['resources'].each do |resource|
Expand Down Expand Up @@ -520,5 +581,11 @@ def http_options(uri)

options.merge(@socket_options)
end

def raise_http_err(json_error_msg, e)
err_message = json_error_msg['message'] || e.message
error_klass = e.http_code == 404 ? ResourceNotFoundError : HttpError
raise error_klass.new((e.http_code || 404), err_message, e.response)
end
end
end
68 changes: 63 additions & 5 deletions test/test_kubeclient.rb
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,59 @@ def test_nonjson_exception_raw
assert_equal(404, exception.error_code)
end

def test_entity_list
def test_entity_list_with_block
stub_request(:get, %r{/api/v1$})
.to_return(body: open_test_file('core_api_resource_list.json'), status: 200)
stub_request(:get, %r{/services})
.to_return(body: open_test_file('entity_list.json'), status: 200)

client = Kubeclient::Client.new('http://localhost:8080/api/', 'v1')
services = []
resource_version = nil
client.get_services do |service, rv|
services << service
resource_version ||= rv
end

refute_empty(services)
assert_equal(2, services.size)
assert_equal(59, resource_version)
assert_instance_of(Kubeclient::Service, services[0])
assert_instance_of(Kubeclient::Service, services[1])

assert_requested(:get, 'http://localhost:8080/api/v1/services', times: 1)
end

def test_entity_list_with_block_404
stub_request(:get, %r{/api/v1$})
.to_return(body: open_test_file('core_api_resource_list.json'), status: 200)
stub_request(:get, %r{/services})
.to_return(status: 404)

client = Kubeclient::Client.new('http://localhost:8080/api/', 'v1')

exception = assert_raises(Kubeclient::ResourceNotFoundError) do
client.get_services {}
end
assert(exception.message.include?('Not Found'))
assert_equal(404, exception.error_code)
end

def test_entity_list_with_block_http_err
stub_request(:get, %r{/api/v1$})
.to_return(body: open_test_file('core_api_resource_list.json'), status: 200)
stub_request(:get, %r{/services})
.to_return(body: '{"err": "I\'m a teapot!"}', status: 418)

client = Kubeclient::Client.new('http://localhost:8080/api/', 'v1')

exception = assert_raises(Kubeclient::HttpError) do
client.get_services {}
end
assert_equal(418, exception.error_code)
end

def test_entity_list_without_block
stub_request(:get, %r{/api/v1$})
.to_return(body: open_test_file('core_api_resource_list.json'), status: 200)
stub_request(:get, %r{/services})
Expand Down Expand Up @@ -573,10 +625,13 @@ def test_api_basic_auth_back_comp_success
auth_options: { user: 'username', password: 'password' }
)

pods = client.get_pods
pods = []
client.get_pods do |pod|
pods << pod
end

assert_equal('Pod', pods.kind)
assert_equal(1, pods.size)
assert_equal('Kubeclient::Pod', pods[0].class.to_s)
assert_requested(:get, 'http://localhost:8080/api/v1/pods', times: 1)
end

Expand Down Expand Up @@ -704,10 +759,13 @@ def test_api_bearer_token_file_success
auth_options: { bearer_token_file: file }
)

pods = client.get_pods
pods = []
client.get_pods do |pod|
pods << pod
end

assert_equal('Pod', pods.kind)
assert_equal(1, pods.size)
assert_equal('Kubeclient::Pod', pods[0].class.to_s)
end

def test_proxy_url
Expand Down

0 comments on commit b7f8758

Please sign in to comment.