Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process JSON Body of ListObjects using streaming json parser #254

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,22 @@ pods = client.get_pods as: :raw
node = client.get_node "127.0.0.1", as: :raw
```

### Yielding lists entity-by-entity
By passing a block to any `get_*` method that returns a list (e.g.: `get_pods`, `get_nodes`, `get_namespaces`), entities will be yielded one-by-one to the passed block rather than returned as an array. This permits handling entities in large lists asynchronously while parsing takes place.

For example:
```ruby
client.get_services do |service, resource_version|
puts service
puts resource_version
end

#<Kubeclient::Service metadata={:name=>"kubernetes", :namespace=>"default", :selfLink=>"/api/v1/services/kubernetes?namespace=default", :uid=>"016e9dcd-ce39-11e4-ac24-3c970e4a436a", :resourceVersion=>"6", :creationTimestamp=>"2015-03-19T15:08:16+02:00", :labels=>{:component=>"apiserver", :provider=>"kubernetes"}}, spec={:port=>443, :protocol=>"TCP", :selector=>nil, :clusterIP=>"10.0.0.2", :containerPort=>0, :sessionAffinity=>"None"}, status={}>
59
#<Kubeclient::Service metadata={:name=>"kubernetes-ro", :namespace=>"default", :selfLink=>"/api/v1/services/kubernetes-ro?namespace=default", :uid=>"015b78bf-ce39-11e4-ac24-3c970e4a436a", :resourceVersion=>"5", :creationTimestamp=>"2015-03-19T15:08:15+02:00", :labels=>{:component=>"apiserver", :provider=>"kubernetes"}}, spec={:port=>80, :protocol=>"TCP", :selector=>nil, :clusterIP=>"10.0.0.1", :containerPort=>0, :sessionAffinity=>"None"}, status={}>
59
```

#### Delete an entity (by name)

For example: `delete_pod "pod name"` , `delete_replication_controller "rc name"`, `delete_node "node name"`, `delete_secret "secret name"`
Expand Down
1 change: 1 addition & 0 deletions kubeclient.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ Gem::Specification.new do |spec|
spec.add_dependency 'rest-client', '~> 2.0'
spec.add_dependency 'recursive-open-struct', '~> 1.0.4'
spec.add_dependency 'http', '~> 2.2.2'
spec.add_dependency 'json-streamer', '~> 2.0.0'
end
123 changes: 104 additions & 19 deletions lib/kubeclient/common.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
require 'json'
require 'rest-client'
require 'json/streamer'

module Kubeclient
# Common methods
# this is mixed in by other gems
Expand Down Expand Up @@ -113,9 +115,7 @@ def handle_exception
rescue JSON::ParserError
{}
end
err_message = json_error_msg['message'] || e.message
error_klass = e.http_code == 404 ? ResourceNotFoundError : HttpError
raise error_klass.new(e.http_code, err_message, e.response)
raise_http_err(json_error_msg, e)
end

def discover
Expand Down Expand Up @@ -176,8 +176,8 @@ def define_entity_methods
@entities.values.each do |entity|
klass = ClientMixin.resource_class(@class_owner, entity.entity_type)
# get all entities of a type e.g. get_nodes, get_pods, etc.
define_singleton_method("get_#{entity.method_names[1]}") do |options = {}|
get_entities(entity.entity_type, klass, entity.resource_name, options)
define_singleton_method("get_#{entity.method_names[1]}") do |options = {}, &block|
get_entities(entity.entity_type, klass, entity.resource_name, options, &block)
end

# watch all entities of a type e.g. watch_nodes, watch_pods, etc.
Expand Down Expand Up @@ -218,19 +218,7 @@ def self.underscore_entity(entity_name)

def create_rest_client(path = nil)
path ||= @api_endpoint.path
options = {
ssl_ca_file: @ssl_options[:ca_file],
ssl_cert_store: @ssl_options[:cert_store],
verify_ssl: @ssl_options[:verify_ssl],
ssl_client_cert: @ssl_options[:client_cert],
ssl_client_key: @ssl_options[:client_key],
proxy: @http_proxy_uri,
user: @auth_options[:username],
password: @auth_options[:password],
open_timeout: @timeouts[:open],
read_timeout: @timeouts[:read]
}
RestClient::Resource.new(@api_endpoint.merge(path).to_s, options)
RestClient::Resource.new(@api_endpoint.merge(path).to_s, rest_client_options)
end

def rest_client
Expand Down Expand Up @@ -267,7 +255,8 @@ def watch_entities(resource_name, options = {})
#
# Default response type will return a collection RecursiveOpenStruct
# (:ros) objects, unless `:as` is passed with `:raw`.
def get_entities(entity_type, klass, resource_name, options = {})
def get_entities(entity_type, klass, resource_name, options = {}, &block)
return get_entities_async(klass, resource_name, options, &block) if block_given?
params = {}
SEARCH_ARGUMENTS.each { |k, v| params[k] = options[v] if options[v] }

Expand Down Expand Up @@ -443,6 +432,96 @@ def api

private

def do_rest_request(args)
uri = @api_endpoint.merge("#{@api_endpoint.path}/#{@api_version}/#{args[:path]}")
options = {
url: uri.to_s,
method: args[:method],
headers: (args[:headers] || {}).merge('params' => args[:params])
}.merge(rest_client_options)
options[:block_response] = args[:block] if args[:block]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: To avoid confusion let's call it block_response everywhere.

Because Request.execute supports both regular ruby block and a :block_response param, with different semantics (regular block gets RestClient::Response after redirect/error processing/decoding).
https://github.com/rest-client/rest-client/blob/v2.0.2/lib/restclient/request.rb#L718-L726

resp = RestClient::Request.execute(options)
return unless resp.instance_of?(Net::HTTPClientError)
code = resp.code
code = code.to_i if code.instance_of?(String)
raise HttpError.new(code, resp.message, resp)
end

def rest_client_options
{
ssl_ca_file: @ssl_options[:ca_file],
ssl_cert_store: @ssl_options[:cert_store],
verify_ssl: @ssl_options[:verify_ssl],
ssl_client_cert: @ssl_options[:client_cert],
ssl_client_key: @ssl_options[:client_key],
proxy: @http_proxy_uri,
user: @auth_options[:username],
password: @auth_options[:password],
open_timeout: @timeouts[:open],
read_timeout: @timeouts[:read]
}
end

def get_entities_async(klass, resource_name, options = {})
if options[:as] == :raw
raise ArgumentError('Cannot pass block to get_entities when requesting options[:as] == raw')
end

params = {}
SEARCH_ARGUMENTS.each { |k, v| params[k] = options[v] if options[v] }

ns_prefix = build_namespace_prefix(options[:namespace])

conditions = Json::Streamer::Conditions.new
conditions.yield_object = lambda do |aggregator:, object:|
aggregator.level.eql?(2) && aggregator.key_for_level(1).eql?('items')
end

entity_streamer = Json::Streamer::JsonStreamer.new
# result['items'] might be nil due to https://github.com/kubernetes/kubernetes/issues/13096
entity_streamer.get_with_conditions(conditions) do |object|
entity = new_entity(object, klass)
yield(entity)
end

resource_version = nil
resource_version_streamer = Json::Streamer::JsonStreamer.new
resource_version_streamer.get(key: 'resourceVersion') do |value|
resource_version = value.to_i unless resource_version
end

kind = nil
kind_streamer = Json::Streamer::JsonStreamer.new
kind_streamer.get(key: 'kind') do |value|
kind = value unless kind
end

block = proc do |response|
if response.instance_of?(Net::HTTPNotFound)
raise ResourceNotFoundError.new(404,
'Not Found',
response)
end
response.read_body do |chunk|
entity_streamer.parser << chunk
resource_version_streamer.parser << chunk unless resource_version
kind_streamer.parser << chunk unless kind
end
end

handle_exception do
# disable compression by setting Accept-Encoding to empty string
do_rest_request(path: ns_prefix + resource_name,
block: block, method: :get,
headers: @headers.merge('Accept-Encoding' => ''), params: params)
end

{
resourceVersion: resource_version,
kind: kind
}
end

def load_entities
@entities = {}
fetch_entities['resources'].each do |resource|
Expand Down Expand Up @@ -508,5 +587,11 @@ def http_options(uri)

options.merge(@socket_options)
end

def raise_http_err(json_error_msg, e)
err_message = json_error_msg['message'] || e.message
error_klass = e.http_code == 404 ? ResourceNotFoundError : HttpError
raise error_klass.new((e.http_code || 404), err_message, e.response)
end
end
end
67 changes: 62 additions & 5 deletions test/test_kubeclient.rb
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,57 @@ def test_nonjson_exception_raw
assert_equal(404, exception.error_code)
end

def test_entity_list
def test_entity_list_with_block
stub_request(:get, %r{/api/v1$})
.to_return(body: open_test_file('core_api_resource_list.json'), status: 200)
stub_request(:get, %r{/services})
.to_return(body: open_test_file('entity_list.json'), status: 200)

client = Kubeclient::Client.new('http://localhost:8080/api/', 'v1')
services = []
metadata_object = client.get_services do |service|
services << service
end

assert_equal(metadata_object, resourceVersion: 59, kind: 'ServiceList')
refute_empty(services)
assert_equal(2, services.size)
assert_instance_of(Kubeclient::Service, services[0])
assert_instance_of(Kubeclient::Service, services[1])

assert_requested(:get, 'http://localhost:8080/api/v1/services', times: 1)
end

def test_entity_list_with_block_404
stub_request(:get, %r{/api/v1$})
.to_return(body: open_test_file('core_api_resource_list.json'), status: 200)
stub_request(:get, %r{/services})
.to_return(status: 404)

client = Kubeclient::Client.new('http://localhost:8080/api/', 'v1')

exception = assert_raises(Kubeclient::ResourceNotFoundError) do
client.get_services {}
end
assert(exception.message.include?('Not Found'))
assert_equal(404, exception.error_code)
end

def test_entity_list_with_block_http_err
stub_request(:get, %r{/api/v1$})
.to_return(body: open_test_file('core_api_resource_list.json'), status: 200)
stub_request(:get, %r{/services})
.to_return(body: '{"err": "I\'m a teapot!"}', status: 418)
Copy link
Collaborator

@cben cben Sep 14, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😆 🍵 👍


client = Kubeclient::Client.new('http://localhost:8080/api/', 'v1')

exception = assert_raises(Kubeclient::HttpError) do
client.get_services {}
end
assert_equal(418, exception.error_code)
end

def test_entity_list_without_block
stub_request(:get, %r{/api/v1$})
.to_return(body: open_test_file('core_api_resource_list.json'), status: 200)
stub_request(:get, %r{/services})
Expand Down Expand Up @@ -573,10 +623,13 @@ def test_api_basic_auth_back_comp_success
auth_options: { user: 'username', password: 'password' }
)

pods = client.get_pods
pods = []
client.get_pods do |pod|
pods << pod
end

assert_equal('Pod', pods.kind)
assert_equal(1, pods.size)
assert_equal('Kubeclient::Pod', pods[0].class.to_s)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this changes?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a kubernetes PodList object has a field Kind which is essentially its class name. Since we're no longer returning a PodList (if block_given?), Kind is no longer available to us. Since i removed the line assert_equal('Pod', pods.kind), I wanted to preserve the logic.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh so this is for the old code
👍

assert_requested(:get, 'http://localhost:8080/api/v1/pods', times: 1)
end

Expand Down Expand Up @@ -704,10 +757,14 @@ def test_api_bearer_token_file_success
auth_options: { bearer_token_file: file }
)

pods = client.get_pods
pods = []
metadata_object = client.get_pods do |pod|
pods << pod
end

assert_equal('Pod', pods.kind)
assert_equal(metadata_object, resourceVersion: 1315, kind: 'PodList')
assert_equal(1, pods.size)
assert_equal('Kubeclient::Pod', pods[0].class.to_s)
end

def test_proxy_url
Expand Down