diff --git a/Project.toml b/Project.toml index f67e672..5da6ae1 100644 --- a/Project.toml +++ b/Project.toml @@ -3,9 +3,10 @@ uuid = "0bb624de-12df-571d-ad84-47aef8b93290" keywords = ["hadoop", "hdfs", "yarn", "client"] license = "MIT" desc = "Hadoop HDFS and Yarn client" -version = "0.3.1" +version = "0.4.0" [deps] +Logging = "56ddb016-857b-54e1-b83d-db4d58db5568" Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f" CRC32c = "8bf52ea8-c179-5cab-976a-9e18b702a9bc" Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" @@ -25,9 +26,10 @@ ProtoBuf = "0.7" URIParser = "0.4" [extras] +Logging = "56ddb016-857b-54e1-b83d-db4d58db5568" Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [targets] -test = ["Test", "Random", "Distributed"] +test = ["Test", "Random", "Distributed", "Logging"] diff --git a/src/Elly.jl b/src/Elly.jl index 809163a..39f54e1 100644 --- a/src/Elly.jl +++ b/src/Elly.jl @@ -26,7 +26,7 @@ import URIParser: URI export show, convert, URI -export UserGroupInformation, add_token, find_tokens, username +export UserGroupInformation, add_token!, find_tokens, username export HDFSClient, HDFSFile, HDFSFileInfo, hdfs_server_defaults, hdfs_default_block_size, hdfs_default_replication, hdfs_blocks, hdfs_set_replication, @@ -53,6 +53,8 @@ using Elly.hadoop.yarn const ELLY_CLIENTNAME = "elly" +include("containerid.jl") +include("credentials.jl") include("ugi.jl") include("rpc.jl") include("sasl.jl") diff --git a/src/api_hdfs_base.jl b/src/api_hdfs_base.jl index 2398b1b..2a9e842 100644 --- a/src/api_hdfs_base.jl +++ b/src/api_hdfs_base.jl @@ -36,7 +36,7 @@ mutable struct HDFSClient end function show(io::IO, client::HDFSClient) - show(client.nn_conn) + show(io, client.nn_conn) println(io, " pwd: $(client.wd)") nothing end diff --git a/src/api_yarn_appmaster.jl b/src/api_yarn_appmaster.jl index cc9d7a6..5ccd247 100644 --- a/src/api_yarn_appmaster.jl +++ b/src/api_yarn_appmaster.jl @@ -8,16 +8,24 @@ # - Some related debate about this here: https://issues.apache.org/jira/browse/HADOOP-6685 # Since that is unusable in anything other than Java, we are forced to operate only in unmanaged mode, where we get it from the application report. -const YARN_CONTAINER_MEM_DEFAULT = 128 -const YARN_CONTAINER_CPU_DEFAULT = 1 -const YARN_CONTAINER_LOCATION_DEFAULT = "*" -const YARN_CONTAINER_PRIORITY_DEFAULT = 1 -const YARN_NM_CONN_KEEPALIVE_SECS = 5*60 - - """ YarnAppMaster is a skeleton application master. It provides the generic scafolding methods which can be used to create specific application masters for different purposes. + +When initializing a YarnAppMaster instance as a managed app master, the scheduler address is picked up from the environment variable `JULIA_YARN_RESOURCEMANAGER_SCHEDULER_ADDRESS`. +Tokens set by Yarn in the file pointed to by `HADOOP_TOKEN_FILE_LOCATION` are also read in automatically. + +When run as a managed app master, if a function is provided to be executed, then the application master is registered, the function is executed and then the application master is deregistered. +This provides a convenient way to run simple Julia applications in a Yarn cluster. E.g.: + +``` +using Elly + +YarnAppMaster() do + ... 
+ # execute Julia code +end +``` """ mutable struct YarnAppMaster amrm_conn::YarnAMRMProtocol @@ -32,16 +40,13 @@ mutable struct YarnAppMaster available_cores::Int32 nodes::YarnNodes containers::YarnContainers - queue::AbstractString - managed::Bool - response_id::Int32 # initial value must be 0, update with response_id sent from server on every response - registration::Union{Nothing,RegisterApplicationMasterResponseProto} + am_rm_task::Union{Nothing,Task} lck::Lock - function YarnAppMaster(rmhost::AbstractString, rmport::Integer, ugi::UserGroupInformation, + function YarnAppMaster(rmhost::AbstractString, rmport::Integer, ugi::UserGroupInformation=UserGroupInformation(), amhost::AbstractString="", amport::Integer=0, amurl::AbstractString="") amrm_conn = YarnAMRMProtocol(rmhost, rmport, ugi) lck = makelock() @@ -50,8 +55,31 @@ mutable struct YarnAppMaster new(amrm_conn, amhost, amport, amurl, Int32(0), Int32(0), Int32(0), Int32(0), YarnNodes(ugi), YarnContainers(), - "", true, 0, - nothing, lck) + "", 0, + nothing, nothing, lck) + end + + function YarnAppMaster(ugi::UserGroupInformation=UserGroupInformation()) + rmschedaddress = ENV["JULIA_YARN_RESOURCEMANAGER_SCHEDULER_ADDRESS"] + @debug("got rm address", rmschedaddress) + parts = split(rmschedaddress, ":") + host = parts[1] + schedport = parse(Int, parts[2]) + am = YarnAppMaster(host, schedport, ugi) + for token in find_tokens(ugi; kind="YARN_AM_RM_TOKEN") + add_token!(ugi, token_alias(am.amrm_conn.channel), token) + end + am + end +end + +function YarnAppMaster(fn::Function, ugi::UserGroupInformation=UserGroupInformation()) + yam = YarnAppMaster(ugi) + register(yam) + try + fn() + finally + unregister(yam, true) end end @@ -88,18 +116,16 @@ function submit(client::YarnClient, unmanagedappmaster::YarnAppMaster) # keep the am_rm token tok = am_rm_token(app) channel = unmanagedappmaster.amrm_conn.channel - add_token(channel.ugi, token_alias(channel), tok) + @debug("adding token", alias=token_alias(channel)) + add_token!(channel.ugi, token_alias(channel), tok) # register the unmanaged appmaster - unmanagedappmaster.managed = false register(unmanagedappmaster) wait_for_attempt_state(app, Int32(1), YarnApplicationAttemptStateProto.APP_ATTEMPT_RUNNING) || throw(YarnException("Application attempt could not be launched")) # initialize complete node list for appmaster nodes(client; nodelist=unmanagedappmaster.nodes) - # start the am_rm processing task once the app attempt starts running if it is an unmanaged application master - @async(process_am_rm(unmanagedappmaster)) app end @@ -126,7 +152,8 @@ function register(yam::YarnAppMaster) end # start the am_rm processing task on registration if it is a managed application master - yam.managed && @async(process_am_rm(yam)) + yam.am_rm_task = @async process_am_rm(yam) + nothing end @@ -232,10 +259,10 @@ function _update_rm(yam::YarnAppMaster) # store/update tokens channel = yam.amrm_conn.channel ugi = channel.ugi - isfilled(resp, :am_rm_token) && add_token(ugi, token_alias(channel), resp.am_rm_token) + isfilled(resp, :am_rm_token) && add_token!(ugi, token_alias(channel), resp.am_rm_token) if isfilled(resp, :nm_tokens) for nmtok in resp.nm_tokens - add_token(ugi, token_alias(nmtok.nodeId), nmtok.token) + add_token!(ugi, token_alias(nmtok.nodeId), nmtok.token) end end diff --git a/src/api_yarn_base.jl b/src/api_yarn_base.jl index 7a6d06d..fd2f9e6 100644 --- a/src/api_yarn_base.jl +++ b/src/api_yarn_base.jl @@ -149,6 +149,7 @@ function update(nodes::YarnNodes, arp::AllocateResponseProto) isfilled(arp, 
:num_cluster_nodes) && (nodes.count = arp.num_cluster_nodes) if isfilled(arp, :updated_nodes) for nrep in arp.updated_nodes + @debug("updating node status", nodeid=nrep.nodeId) nodes.status[nrep.nodeId] = YarnNode(nrep) end end @@ -161,6 +162,7 @@ function update(nodes::YarnNodes, gcnrp::GetClusterNodesResponseProto) nlist = gcnrp.nodeReports nodes.count = length(nlist) for nrep in nlist + @debug("updating node status", nodeid=nrep.nodeId) nodes.status[nrep.nodeId] = YarnNode(nrep) end nothing @@ -191,9 +193,12 @@ function _new_or_existing_conn(nodes::YarnNodes, nodeid::NodeIdProto) end function get_connection(nodes::YarnNodes, nodeid::NodeIdProto) - (nodeid in keys(nodes.status)) || throw(YarnException("Unknown Yarn node: $(nodeid.host):$(nodeid.port)")) - node = nodes.status[nodeid] - node.isrunning || throw(YarnException("Yarn node $(nodeid.host):$(nodeid.port) is not running")) + if !haskey(nodes.status, nodeid) + @debug("Unknown Yarn node, will attempt connection anyway...", expected="$(nodeid.host):$(nodeid.port)", have=collect(keys(nodes.status))) + else + node = nodes.status[nodeid] + node.isrunning || throw(YarnException("Yarn node $(nodeid.host):$(nodeid.port) is not running")) + end conn, lastusetime, lck = _new_or_existing_conn(nodes, nodeid) # lock the connection to mark in use @@ -376,16 +381,16 @@ haverequests(containers::YarnContainers) = containers.ndesired != length(contain # TODO: support local resources # TODO: support tokens -function launchcontext(;cmd::AbstractString="", env::Dict=Dict(), service_data::Dict=Dict()) +function launchcontext(; cmd::Union{AbstractString,Vector}="", env::Dict=Dict(), service_data::Dict=Dict()) clc = ContainerLaunchContextProto() if !isempty(cmd) - setproperty!(clc, :command, AbstractString[cmd]) + setproperty!(clc, :command, isa(cmd, Vector) ? convert(Vector{AbstractString},cmd) : AbstractString[cmd]) end if !isempty(env) envproto = StringStringMapProto[] for (n,v) in env (isa(n, AbstractString) && isa(v, AbstractString)) || throw(ArgumentError("non string environment variable specified: $(typeof(n)) => $(typeof(v))")) - push!(envproto, protobuild(StringStringMapProto, Dict(:key => n, :value => v))) + push!(envproto, StringStringMapProto(; key=n, value=v)) end setproperty!(clc, :environment, envproto) end @@ -393,7 +398,7 @@ function launchcontext(;cmd::AbstractString="", env::Dict=Dict(), service_data:: svcdataproto = StringBytesMapProto[] for (n,v) in service_data (isa(n, AbstractString) && isa(v, Vector{UInt8})) || throw(ArgumentError("incompatible service data type specified: $(typeof(n)) => $(typeof(v))")) - push!(svcdataproto, protobuild(StringBytesMapProto, Dict(:key => n, :value => v))) + push!(svcdataproto, StringBytesMapProto(; key=n, value=v)) end setproperty!(clc, :service_data, servicedataproto) end diff --git a/src/api_yarn_client.jl b/src/api_yarn_client.jl index 28241d7..df6942c 100644 --- a/src/api_yarn_client.jl +++ b/src/api_yarn_client.jl @@ -4,6 +4,12 @@ # - applicationclient_protocol.proto # - application_history_client.proto +const YARN_CONTAINER_MEM_DEFAULT = 128 +const YARN_CONTAINER_CPU_DEFAULT = 1 +const YARN_CONTAINER_LOCATION_DEFAULT = "*" +const YARN_CONTAINER_PRIORITY_DEFAULT = 1 +const YARN_NM_CONN_KEEPALIVE_SECS = 5*60 + """ YarnClient holds a connection to the Yarn Resource Manager and provides APIs for application clients to interact with Yarn. 
@@ -14,6 +20,14 @@ mutable struct YarnClient function YarnClient(host::AbstractString, port::Integer, ugi::UserGroupInformation=UserGroupInformation()) new(YarnClientProtocol(host, port, ugi)) end + + function YarnClient(ugi::UserGroupInformation=UserGroupInformation()) + rmaddress = ENV["JULIA_YARN_RESOURCEMANAGER_ADDRESS"] + parts = split(rmaddress, ":") + host = parts[1] + port = parse(Int, parts[2]) + YarnClient(host, port, ugi) + end end show(io::IO, client::YarnClient) = show(io, client.rm_conn) @@ -159,9 +173,25 @@ function _new_app(client::YarnClient) resp.application_id, resp.maximumCapability.memory, resp.maximumCapability.virtual_cores end -function submit(client::YarnClient, container_spec::ContainerLaunchContextProto, mem::Integer, cores::Integer; +function submit(client::YarnClient, cmd::Union{AbstractString,Vector}, mem::Integer=YARN_CONTAINER_MEM_DEFAULT, cores::Integer=YARN_CONTAINER_CPU_DEFAULT, env::Dict{String,String}=Dict{String,String}(); kwargs...) + container_spec = launchcontext(cmd=cmd, env=env) + submit(client, container_spec, mem, cores; kwargs...) +end + +function submit(client::YarnClient, container_spec::ContainerLaunchContextProto, mem::Integer=YARN_CONTAINER_MEM_DEFAULT, cores::Integer=YARN_CONTAINER_CPU_DEFAULT; priority::Int32=one(Int32), appname::AbstractString="EllyApp", queue::AbstractString="default", apptype::AbstractString="YARN", - reuse::Bool=false, unmanaged::Bool=false) + reuse::Bool=false, unmanaged::Bool=false, schedaddr::String="") + @debug("submitting application", unmanaged=unmanaged, cmd=container_spec.command) + + if !unmanaged + # the application master would need these environment variables to initialize itself + # TODO: we need ability to read hadoop configuration to avoid this + rm_chan = client.rm_conn.channel + isdefined(container_spec, :environment) || (container_spec.environment = StringStringMapProto[]) + push!(container_spec.environment, StringStringMapProto(; key="JULIA_YARN_RESOURCEMANAGER_ADDRESS", value="$(rm_chan.host):$(rm_chan.port)")) + push!(container_spec.environment, StringStringMapProto(; key="JULIA_YARN_RESOURCEMANAGER_SCHEDULER_ADDRESS", value=schedaddr)) + end + appid, maxmem, maxcores = _new_app(client) prio = protobuild(PriorityProto, Dict(:priority => priority)) diff --git a/src/cluster_manager.jl b/src/cluster_manager.jl index f9d3a0a..370a0bb 100644 --- a/src/cluster_manager.jl +++ b/src/cluster_manager.jl @@ -10,39 +10,57 @@ keep_connected: if false, YarnManager will disconnect from the cluster once all """ struct YarnManager <: ClusterManager ugi::UserGroupInformation - clnt::YarnClient + clnt::Union{YarnClient,Nothing} am::YarnAppMaster - app::YarnApp + app::Union{YarnApp,ContainerIdProto} launch_timeout::Integer keep_connected::Bool function YarnManager(; kwargs...) params = Dict(kwargs) paramkeys = keys(params) + unmanaged = (:unmanaged in paramkeys) ? params[:unmanaged] : true + launch_timeout = (:launch_timeout in paramkeys) ? params[:launch_timeout] : 60 + keep_connected = (:keep_connected in paramkeys) ? params[:keep_connected] : true @debug("YarnManager constructor", params) - - ugi = (:ugi in paramkeys) ? params[:ugi] : UserGroupInformation() - rmport = (:rmport in paramkeys) ? params[:rmport] : 8032 - yarnhost = (:yarnhost in paramkeys) ? params[:yarnhost] : "localhost" - schedport = (:schedport in paramkeys) ? params[:schedport] : 8030 - launch_timeout = (:launch_timeout in paramkeys) ? params[:launch_timeout] : 60 - keep_connected = (:keep_connected in paramkeys) ? 
params[:keep_connected] : true - - clnt = YarnClient(yarnhost, rmport, ugi) - am = YarnAppMaster(yarnhost, schedport, ugi) - app = submit(clnt, am) + + if unmanaged + ugi = (:ugi in paramkeys) ? params[:ugi] : UserGroupInformation() + rmport = (:rmport in paramkeys) ? params[:rmport] : 8032 + yarnhost = (:yarnhost in paramkeys) ? params[:yarnhost] : "localhost" + schedport = (:schedport in paramkeys) ? params[:schedport] : 8030 + + clnt = YarnClient(yarnhost, rmport, ugi) + am = YarnAppMaster(yarnhost, schedport, ugi) + app = submit(clnt, am) + else + ugi = UserGroupInformation() + am = YarnAppMaster(ugi) + clnt = nothing + register(am) + app = parse_container_id(ENV["CONTAINER_ID"]) + end new(ugi, clnt, am, app, launch_timeout, keep_connected) end end +function YarnManager(fn::Function; kwargs...) + yam = YarnManager(; kwargs...) + try + fn(yam) + finally + disconnect(yam) + end +end + function disconnect(yarncm::YarnManager) ret = unregister(yarncm.am, true) end function show(io::IO, yarncm::YarnManager) print(io, "YarnManager for ") - show(io, yarncm.clnt) + show(io, yarncm.am) end function setup_worker(host, port) @@ -55,7 +73,7 @@ function setup_worker(host, port) Sockets.wait_connected(c) # >= Julia 1.3 end # identify container id so that rmprocs can clean things up nicely if required - serialize(c, ENV["JULIA_YARN_CID"]) + serialize(c, ENV["CONTAINER_ID"]) redirect_stdout(c) redirect_stderr(c) @@ -73,19 +91,22 @@ function _envdict(envhash::Base.EnvDict) end function _currprocname() - p = joinpath(Sys.BINDIR, Sys.get_process_title()) - exists(p) && (return p) + # Julia throws ENOBUFS sometimes while fetching process title + pt = try + Sys.get_process_title() + catch + "julia" + end + p = joinpath(Sys.BINDIR, pt) + isfile(p) && (return p) - ("_" in keys(ENV)) && contains(ENV["_"], "julia") && (return ENV["_"]) + haskey(ENV, "_") && contains(ENV["_"], "julia") && (return ENV["_"]) "julia" end function container_start(manager::YarnManager, cmd::String, env::Dict{String,String}, ipaddr::IPv4, port::UInt16, cid::ContainerIdProto) try - iob = IOBuffer() - serialize(iob, cid) - env["JULIA_YARN_CID"] = base64encode(take!(iob)) env["JULIA_YARN_KUKI"] = cluster_cookie() initargs = "using Elly; Elly.setup_worker($(ipaddr.host), $(port))" @@ -125,7 +146,7 @@ function launch(manager::YarnManager, params::Dict, instances_arr::Array, c::Con # keep the container id in userdata to be used with rmprocs if required cidstr = deserialize(sock) @debug("got container id string", cidstr) - cid = deserialize(IOBuffer(base64decode(cidstr))) + cid = parse_container_id(cidstr) @debug("got container id", cid) config.userdata = Dict(:container_id => cid) push!(instances_arr, config) diff --git a/src/containerid.jl b/src/containerid.jl new file mode 100644 index 0000000..543aef6 --- /dev/null +++ b/src/containerid.jl @@ -0,0 +1,37 @@ +const CONTAINER_ID_BITMASK = 0xffffffffff +const CONTAINER_ID_SPLITTER = '_' +const CONTAINER_PREFIX = "container" +const EPOCH_PREFIX = "e" + +parse_container_id() = parse_container_id(ENV["CONTAINER_ID"]) + +function parse_container_id(cidstr::String) + parts = split(cidstr, CONTAINER_ID_SPLITTER) + partidx = 1 + (parts[partidx] == CONTAINER_PREFIX) || error("Invalid ContainerId prefix: $cidstr") + partidx += 1 + epoch_or_ts = parts[partidx] + epoch = Int64(0) + appid = Int32(0) + clusterts = Int64(0) + if startswith(epoch_or_ts, EPOCH_PREFIX) + epoch = parse(Int64, epoch_or_ts[2:end]) + partidx += 1 + clusterts = parse(Int64, parts[partidx]) + partidx += 1 + appid = parse(Int32, 
parts[partidx]) + else + clusterts = parse(Int64, epoch_or_ts) + partidx += 1 + appid = parse(Int32, parts[partidx]) + end + partidx += 1 + attemptid = parse(Int32, parts[partidx]) + partidx += 1 + cid = parse(Int64, parts[partidx]) + cid = (epoch << 40) | cid + + appid_proto = ApplicationIdProto(; id=appid, cluster_timestamp=clusterts) + attemptid_proto = ApplicationAttemptIdProto(; application_id=appid_proto, attemptId=attemptid) + ContainerIdProto(; app_id=appid_proto, app_attempt_id=attemptid_proto, id=cid) +end diff --git a/src/credentials.jl b/src/credentials.jl new file mode 100644 index 0000000..f7d9f26 --- /dev/null +++ b/src/credentials.jl @@ -0,0 +1,62 @@ +const TOKEN_STORAGE_MAGIC = b"HDTS" +const TOKEN_STORAGE_VERSION = UInt8(0) + +# token protocol: https://github.com/hanborq/hadoop/blob/master/src/core/org/apache/hadoop/security/Credentials.java +struct Credentials + tokens::Dict{AbstractString,TokenProto} + secrets::Dict{AbstractString,Vector{UInt8}} + + function Credentials() + new(Dict{AbstractString,TokenProto}(), Dict{AbstractString,Vector{UInt8}}()) + end +end + +function read_credentials!(token_file::String; credentials::Credentials=Credentials()) + open(token_file, "r") do io + read_credentials!(io; credentials=credentials) + end + credentials +end + +function read_credentials!(io::IO; credentials::Credentials=Credentials()) + hdts = Vector{UInt8}(undef, 4) + readbytes!(io, hdts) + (hdts == TOKEN_STORAGE_MAGIC) || error("Bad header found in token storage") + version = read(io, UInt8) + (version == TOKEN_STORAGE_VERSION) || error("Unknown version $version in token storage (expected $TOKEN_STORAGE_VERSION)") + + # read tokens + size = ProtoBuf.read_varint(io, Int32) + for idx in 1:size + alias = ProtoBuf.read_string(io) + + identifier = ProtoBuf.read_bytes(io) + password = ProtoBuf.read_bytes(io) + kind = ProtoBuf.read_string(io) + service = ProtoBuf.read_string(io) + token = TokenProto(; identifier=identifier, password=password, kind=kind, service=service) + + if isempty(alias) + alias = string(token.service, token.kind) + end + + credentials.tokens[alias] = token + end + + # read secrets + size = ProtoBuf.read_varint(io, Int32) + for idx in 1:size + alias = ProtoBuf.read_string(io) + secret = ProtoBuf.read_bytes(io) + credentials.secrets[alias] = secret + end + credentials +end + +add_token!(credentials::Credentials, token::TokenProto) = add_token!(credentials, string(token.service, token.kind), token) +add_token!(credentials::Credentials, alias::AbstractString, token::TokenProto) = (credentials.tokens[alias] = token; nothing) + +_match_alias(alias, pattern) = isempty(pattern) || (pattern == alias) +_match_kind(token, pattern) = isempty(pattern) || (pattern == token.kind) +_match_token(dict_entry, alias_pattern, kind_pattern) = _match_alias(first(dict_entry), alias_pattern) && _match_kind(last(dict_entry), kind_pattern) +find_tokens(credentials::Credentials; alias::AbstractString="", kind::AbstractString="") = collect(values(filter(x->_match_token(x, alias, kind), credentials.tokens))) diff --git a/src/ugi.jl b/src/ugi.jl index 9d5d492..19cce59 100644 --- a/src/ugi.jl +++ b/src/ugi.jl @@ -4,10 +4,13 @@ # ref: https://www.opencore.com/blog/2016/5/user-name-handling-in-hadoop/ mutable struct UserGroupInformation userinfo::UserInformationProto - tokens::Dict{AbstractString,TokenProto} + credentials::Credentials + function UserGroupInformation(username::AbstractString=default_username(); proxy::Bool=false, proxyuser::AbstractString=username) userinfo = proxy ? 
protobuild(UserInformationProto, Dict(:realUser => username, :effectiveUser => proxyuser)) : protobuild(UserInformationProto, Dict(:realUser => username)) - new(userinfo, Dict{AbstractString,TokenProto}()) + ugi = new(userinfo, Credentials()) + haskey(ENV, "HADOOP_TOKEN_FILE_LOCATION") && read_credentials!(ENV["HADOOP_TOKEN_FILE_LOCATION"]; credentials=ugi.credentials) + ugi end end @@ -17,30 +20,19 @@ function default_username() error("Can not determine user information. Either HADOOP_USER_NAME or USER must be set.") end -add_token(ugi::UserGroupInformation, token::TokenProto) = add_token(ugi, token.service, token) -add_token(ugi::UserGroupInformation, alias::AbstractString, token::TokenProto) = (ugi.tokens[alias] = token; nothing) +add_token!(ugi::UserGroupInformation, token::TokenProto) = add_token!(ugi.credentials, token) +add_token!(ugi::UserGroupInformation, alias::AbstractString, token::TokenProto) = add_token!(ugi.credentials, alias, token) username(userinfo::UserInformationProto) = isfilled(userinfo, :realUser) ? userinfo.realUser : userinfo.effectiveUser username(ugi::UserGroupInformation) = username(ugi.userinfo) -function find_tokens(ugi::UserGroupInformation; alias::AbstractString="", kind::AbstractString="") - isempty(alias) && isempty(kind) && (return collect(values(ugi.tokens))) - - result = TokenProto[] - !isempty(alias) && (alias in keys(ugi.tokens)) && push!(result, ugi.tokens[alias]) - if !isempty(kind) - for tok in values(ugi.tokens) - (tok.kind == kind) && push!(result, tok) - end - end - result -end +find_tokens(ugi::UserGroupInformation; alias::AbstractString="", kind::AbstractString="") = find_tokens(ugi.credentials; alias=alias, kind=kind) function show(io::IO, ugi::UserGroupInformation) uinfo = ugi.userinfo print(io, "User:") isfilled(uinfo, :realUser) && print(io, ' ', uinfo.realUser) isfilled(uinfo, :effectiveUser) && print(io, " (", uinfo.effectiveUser, ')') - isempty(ugi.tokens) || print(io, " with ", length(ugi.tokens), " tokens") + isempty(ugi.credentials.tokens) || print(io, " with ", length(ugi.credentials.tokens), " tokens") nothing end diff --git a/test/hdfstests.jl b/test/hdfstests.jl index 84bd70e..df670f9 100644 --- a/test/hdfstests.jl +++ b/test/hdfstests.jl @@ -3,18 +3,34 @@ using Test using Random function test_ugi() + @info("test UserGroupInformation...") ugi = UserGroupInformation() @test !isempty(ugi.userinfo.realUser) + iob = IOBuffer() + show(iob, ugi) + @test !isempty(take!(iob)) ugi = UserGroupInformation(; proxy=true) @test !isempty(ugi.userinfo.realUser) @test !isempty(ugi.userinfo.effectiveUser) + iob = IOBuffer() + show(iob, ugi) + @test !isempty(take!(iob)) proxyuser = ugi.userinfo.realUser * "proxy" ugi = UserGroupInformation(; proxy=true, proxyuser=proxyuser) @test !isempty(ugi.userinfo.realUser) @test !isempty(ugi.userinfo.effectiveUser) @test ugi.userinfo.effectiveUser == proxyuser + iob = IOBuffer() + show(iob, ugi) + @test !isempty(take!(iob)) + + user = ENV["USER"] + delete!(ENV, "USER") + @test_throws Exception Elly.default_username() + ENV["USER"] = user + nothing end @@ -26,43 +42,42 @@ function test_hdfs(host="localhost", port=9000) exists(hdfsclnt, "/tmp") || mkdir(hdfsclnt, "/tmp") - println("listing files in root folder...") - dirtree = readdir(hdfsclnt, "/") - println(dirtree) - @test "tmp" in dirtree + result = readdir(hdfsclnt, "/") + @info("listing files in root folder", result) + @test "tmp" in result - println("getting server defaults...") - defs = hdfs_server_defaults(hdfsclnt) - for k in keys(defs) - println("\t$k 
=> $(defs[k])") - end + result = hdfs_server_defaults(hdfsclnt) + @info("server defaults", result) @test hdfs_default_block_size(hdfsclnt) > 0 @test hdfs_default_replication(hdfsclnt) > 0 - println("getting file system status...") - fs_status = hdfs_status(hdfsclnt) - for k in keys(fs_status) - println("\t$k => $(fs_status[k])") - end - @test hdfs_capacity(hdfsclnt) > 0 + result = hdfs_status(hdfsclnt) + @info("file system status", result) + + result = hdfs_capacity(hdfsclnt) + @info("hdfs capacity", result) + @test result > 0 - println("/tmp should be a directory") - @test isdir(hdfsclnt, "/tmp") + result = isdir(hdfsclnt, "/tmp") + @info("/tmp should be a directory", result) + @test result - println("du should be >= 0") - @test du(hdfsclnt, "/") >= 0 + result = du(hdfsclnt, "/") + @info("du should be >= 0", result) + @test result >= 0 - println("stat /tmp") st = stat(hdfsclnt, "/tmp") - println(st) + @info("stat /tmp", st) @test st.name == "/tmp" - println("create a temporary dir...") + @info("create a temporary dir...") + t1 = time() cd(hdfsclnt, "/tmp") foo_dir = HDFSFile(hdfsclnt, "foo") mkdir(foo_dir, Elly.DEFAULT_FOLDER_MODE) + @info("...done in $(time() - t1) secs") - println("create a temporary file...") + @info("create a temporary file...") cd(hdfsclnt, "foo") bar_file = HDFSFile(hdfsclnt, "bar") teststr = "hello world\n" @@ -73,28 +88,30 @@ function test_hdfs(host="localhost", port=9000) write(f, teststr) end end - println("...done in $(time() - t1) secs") - println("verify file size to be $(length(teststr)*nloops)...") - @test filesize(bar_file) == length(teststr) * nloops + @info("...done in $(time() - t1) secs") + expected_len = length(teststr)*nloops + actual_len = filesize(bar_file) + @info("verify file size", expected_len, actual_len=Int(actual_len)) + @test expected_len == actual_len - println("touch, move and delete file...") + @info("touch, move and delete file...") touch(bar_file) @test isfile(bar_file) st = stat(bar_file) - println(st) + @info("stat file", st) @test st.name == "bar" mv(bar_file, "/tmp/foo/bar2") bar2_file = HDFSFile("hdfs://$(host):$(port)/tmp/foo/bar2"; ugi=ugi) @test isfile(bar2_file) rm(bar2_file) - println("touch and delete a new file...") + @info("touch and delete a new file...") touch(bar_file) @test exists(bar_file) @test isfile(bar_file) rm(bar_file) - println("create a large file...") + @info("create a large file...") size_bytes = limitedtestenv ? (128 * 10 * 1000) : (128 * 1000 * 1000) nloops = limitedtestenv ? 
2 : 5 A = rand(UInt8, size_bytes) @@ -102,44 +119,45 @@ function test_hdfs(host="localhost", port=9000) for idx in 1:nloops t1 = time() write(f, A) - println("...block written in $(time() - t1) secs") + @info("...block written in $(time() - t1) secs") end end - println("verify file size to be $(sizeof(A) * nloops)...") - @test filesize(bar_file) == sizeof(A) * nloops + expected_size = sizeof(A) * nloops + actual_size = filesize(bar_file) + @info("verify file size", expected_size, actual_size) + @test expected_size == actual_size - println("read and verify...") + @info("read and verify...") B = Array{UInt8}(undef, size_bytes) open(bar_file, "r") do f for idx in 1:nloops t1 = time() read!(f, B) - println("...block read in $(time() - t1) secs") + @info("...block read in $(time() - t1) secs") @test A == B end end - println("read and verify with crc...") + @info("read and verify with crc...") B = Array{UInt8}(undef, size_bytes) open(bar_file, "r"; crc=true) do f for idx in 1:nloops t1 = time() read!(f, B) - println("...block read in $(time() - t1) secs") + @info("...block read in $(time() - t1) secs") @test A == B end end - println("blocks for $bar_file") blocks = hdfs_blocks(bar_file) - println(blocks) + @info("file blocks", file=bar_file, blocks) - println("setting replication factor for $bar_file") + @info("setting replication factor for $bar_file") @test hdfs_set_replication(bar_file, 2) cd(hdfsclnt, "/tmp/foo") NFILES = limitedtestenv ? 10 : 1000 - println("create many ($NFILES) files...") + @info("create many ($NFILES) files...") for fidx in 1:NFILES bar_file = HDFSFile(hdfsclnt, "bar$fidx") open(bar_file, "w") do f @@ -147,19 +165,19 @@ function test_hdfs(host="localhost", port=9000) write(f, teststr) end end - ((fidx % 10) == 0) && println("...created file #$fidx") + ((fidx % 10) == 0) && @info("...created file #$fidx") end - println("reading directory...") + @info("reading directory...") allfiles = readdir(hdfsclnt, "/tmp/foo") - println("delete many ($NFILES) files...") + @info("delete many ($NFILES) files...") for idx in 1:NFILES bar_file = HDFSFile(hdfsclnt, "bar$idx") rm(bar_file) - ((idx % 10) == 0) && println("...deleted file #$idx") + ((idx % 10) == 0) && @info("...deleted file #$idx") end @test length(allfiles) >= NFILES - println("test renew lease") + @info("test renew lease") hdfs_renewlease(hdfsclnt) iob = IOBuffer() show(iob, hdfsclnt) diff --git a/test/runtests.jl b/test/runtests.jl index 558c342..df85772 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -4,4 +4,9 @@ include("yarntests.jl") test_ugi() test_hdfs() -test_yarn() +test_credential_tokens() +test_container_id() +test_yarn_client() +test_unmanaged_yarn_clustermanager() +test_managed_yarn_clustermanager() +@info("runtests.jl done") diff --git a/test/token/container_tokens b/test/token/container_tokens new file mode 100644 index 0000000..bb20496 Binary files /dev/null and b/test/token/container_tokens differ diff --git a/test/yarnmanagedcm.jl b/test/yarnmanagedcm.jl new file mode 100644 index 0000000..8dbef53 --- /dev/null +++ b/test/yarnmanagedcm.jl @@ -0,0 +1,6 @@ +using Logging +include("yarntests.jl") +open("/tmp/ellytest.log", "a") do io + global_logger(SimpleLogger(io, Logging.Info)) + run_managed_yarn_clustermanager() +end diff --git a/test/yarntests.jl b/test/yarntests.jl index fc7ff13..95438e7 100644 --- a/test/yarntests.jl +++ b/test/yarntests.jl @@ -2,22 +2,48 @@ using Elly using Test using Distributed -function test_yarn(host="localhost", rmport=8032, schedport=8030) - limitedtestenv = (get(ENV, 
"CI", "false") == "true") +function test_credential_tokens() + @info("testing credential token interpretation") + token_file = joinpath(@__DIR__, "token", "container_tokens") + creds = Elly.read_credentials!(token_file) + tokens = find_tokens(creds, alias="YARN_AM_RM_TOKEN") + @test !isempty(tokens) + tokens = find_tokens(creds, kind="YARN_AM_RM_TOKEN") + @test !isempty(tokens) + nothing +end + +function test_container_id() + @info("testing container ids") + cid = Elly.parse_container_id("container_1577681661884_0005_01_000001") + @test cid.id == 1 + @test cid.app_id.cluster_timestamp == 1577681661884 + @test cid.app_id.id == 5 + @test cid.app_attempt_id.attemptId == 1 + + cid = Elly.parse_container_id("container_e17_1410901177871_0001_01_000005") + @test cid.id == 18691697672197 + @test cid.app_id.cluster_timestamp == 1410901177871 + @test cid.app_id.id == 1 + @test cid.app_attempt_id.attemptId == 1 + + nothing +end +function test_yarn_client(host="localhost", rmport=8032) + @info("testing yarn client") yarnclnt = YarnClient(host, rmport) nnodes = nodecount(yarnclnt) @test nnodes > 0 - println("number of yarn nodes: $nnodes") - nlist = nodes(yarnclnt) - show(stdout, nlist) - - yarncm = YarnManager(yarnhost=host, rmport=rmport, schedport=schedport, launch_timeout=60); + @info("yarn nodes", nnodes, nlist) + nothing +end +function make_julia_env() env = Dict{String,String}() - for envname in ("USER", "LD_LIBRARY_PATH", "USERNAME", "HOME", "PATH", "LOGNAME", "JULIA_LOAD_PATH", "LIBDIR") - if envname in keys(ENV) + for envname in ("USER", "LD_LIBRARY_PATH", "USERNAME", "HOME", "PATH", "LOGNAME", "JULIA_LOAD_PATH", "LIBDIR", "CI") + if haskey(ENV, envname) env[envname] = ENV[envname] end end @@ -28,10 +54,12 @@ function test_yarn(host="localhost", rmport=8032, schedport=8030) if !("JULIA_DEPOT_PATH" in keys(env)) env["JULIA_DEPOT_PATH"] = join(Base.DEPOT_PATH, ':') end - println("starting workers with environment:") - for (n,v) in env - println(" - $n => $v") - end + env +end + +function test_yarn_clustermanager(yarncm::YarnManager, limitedtestenv::Bool) + env = make_julia_env() + @info("starting workers with environment", env) addprocs(yarncm; np=1, env=env); @test nprocs() == 2 @@ -44,6 +72,39 @@ function test_yarn(host="localhost", rmport=8032, schedport=8030) @everywhere println("hi") rmprocs(workers()) @test nprocs() == 1 + nothing +end + +function test_unmanaged_yarn_clustermanager(host="localhost", rmport=8032, schedport=8030) + @info("testing unmanaged yarn clustermanager") + limitedtestenv = (get(ENV, "CI", "false") == "true") + YarnManager(yarnhost=host, rmport=rmport, schedport=schedport, launch_timeout=60) do yarncm + test_yarn_clustermanager(yarncm, limitedtestenv) + end + nothing +end + +function test_managed_yarn_clustermanager(host="localhost", rmport=8032, schedport=8030) + @info("testing managed yarn clustermanager") + ugi = UserGroupInformation() + clnt = YarnClient(host, rmport, ugi) - Elly.disconnect(yarncm) + env = make_julia_env() + @info("starting managed julia with environment", env) + + testscript = joinpath(@__DIR__, "yarnmanagedcm.jl") + app = submit(clnt, [Elly._currprocname(), testscript], Elly.YARN_CONTAINER_MEM_DEFAULT, Elly.YARN_CONTAINER_CPU_DEFAULT, env; schedaddr="$(host):$(schedport)") + Elly.wait_for_state(app, Elly.YarnApplicationStateProto.FINISHED) + @info("app complete", status=status(app)) + @test isfile("/tmp/ellytest.log") + @info("app output", testlogs=read("/tmp/ellytest.log", String)) + nothing +end + +function run_managed_yarn_clustermanager() + 
limitedtestenv = (get(ENV, "CI", "false") == "true") + YarnManager(launch_timeout=60, unmanaged=false) do yarncm + test_yarn_clustermanager(yarncm, limitedtestenv) + end + nothing end
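
For reference (not part of the patch): a minimal end-to-end sketch of the managed-mode flow this change set enables, mirroring `test_managed_yarn_clustermanager` and `run_managed_yarn_clustermanager` above. The host/ports, driver script path, and environment entries are illustrative assumptions for a local single-node cluster.

```julia
using Elly

# client side: submit a Julia driver script as a managed Yarn application.
# submit() injects JULIA_YARN_RESOURCEMANAGER_ADDRESS and
# JULIA_YARN_RESOURCEMANAGER_SCHEDULER_ADDRESS into the container environment,
# and Yarn provides HADOOP_TOKEN_FILE_LOCATION, so the driver needs no explicit
# resource manager configuration.
ugi = UserGroupInformation()
clnt = YarnClient("localhost", 8032, ugi)
app = submit(clnt, ["julia", "/tmp/driver.jl"],
             Elly.YARN_CONTAINER_MEM_DEFAULT, Elly.YARN_CONTAINER_CPU_DEFAULT,
             Dict("JULIA_LOAD_PATH" => join(Base.LOAD_PATH, ':'));
             schedaddr="localhost:8030")
Elly.wait_for_state(app, Elly.YarnApplicationStateProto.FINISHED)

# inside /tmp/driver.jl (running in the app master container), the managed side
# would look roughly like:
#
#   using Elly, Distributed
#   YarnManager(launch_timeout=60, unmanaged=false) do yarncm
#       addprocs(yarncm; np=1)
#       @everywhere println("hello from a Yarn container")
#       rmprocs(workers())
#   end
```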