Managed Yarn AppMaster & ClusterManager. Credentials and ContainerId implementations. #46

Merged: 3 commits, Jan 7, 2020
6 changes: 4 additions & 2 deletions Project.toml
@@ -3,9 +3,10 @@ uuid = "0bb624de-12df-571d-ad84-47aef8b93290"
keywords = ["hadoop", "hdfs", "yarn", "client"]
license = "MIT"
desc = "Hadoop HDFS and Yarn client"
version = "0.3.1"
version = "0.4.0"

[deps]
Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f"
CRC32c = "8bf52ea8-c179-5cab-976a-9e18b702a9bc"
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
@@ -25,9 +26,10 @@ ProtoBuf = "0.7"
URIParser = "0.4"

[extras]
Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"

[targets]
test = ["Test", "Random", "Distributed"]
test = ["Test", "Random", "Distributed", "Logging"]
4 changes: 3 additions & 1 deletion src/Elly.jl
@@ -26,7 +26,7 @@ import URIParser: URI

export show, convert, URI

-export UserGroupInformation, add_token, find_tokens, username
+export UserGroupInformation, add_token!, find_tokens, username

export HDFSClient, HDFSFile, HDFSFileInfo,
hdfs_server_defaults, hdfs_default_block_size, hdfs_default_replication, hdfs_blocks, hdfs_set_replication,
@@ -53,6 +53,8 @@ using Elly.hadoop.yarn

const ELLY_CLIENTNAME = "elly"

include("containerid.jl")
include("credentials.jl")
include("ugi.jl")
include("rpc.jl")
include("sasl.jl")
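The `add_token` rename to `add_token!` follows the Julia convention that functions mutating their arguments end in `!`. A minimal sketch of the renamed API as this PR uses it elsewhere (the alias string below is illustrative; the PR derives the real one with `token_alias`):

```
using Elly

ugi = UserGroupInformation()   # assumes tokens were populated, e.g. from HADOOP_TOKEN_FILE_LOCATION

for token in find_tokens(ugi; kind="YARN_AM_RM_TOKEN")
    add_token!(ugi, "rm-scheduler-alias", token)   # mutates ugi, hence the `!`
end
```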
2 changes: 1 addition & 1 deletion src/api_hdfs_base.jl
@@ -36,7 +36,7 @@ mutable struct HDFSClient
end

function show(io::IO, client::HDFSClient)
-show(client.nn_conn)
+show(io, client.nn_conn)
println(io, " pwd: $(client.wd)")
nothing
end
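This one-liner threads the caller's `io` into the nested `show`, so the namenode connection is printed to the same stream as the rest of the output rather than the default stream. A quick sketch of the visible effect, assuming a connected `client::HDFSClient`:

```
buf = IOBuffer()
show(buf, client)             # the nn_conn line now lands in buf as well
output = String(take!(buf))   # connection info followed by " pwd: ..."
```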
69 changes: 48 additions & 21 deletions src/api_yarn_appmaster.jl
@@ -8,16 +8,24 @@
# - Some related debate about this here: https://issues.apache.org/jira/browse/HADOOP-6685
# Since that is unusable in anything other than Java, we are forced to operate only in unmanaged mode, where we get it from the application report.

-const YARN_CONTAINER_MEM_DEFAULT = 128
-const YARN_CONTAINER_CPU_DEFAULT = 1
-const YARN_CONTAINER_LOCATION_DEFAULT = "*"
-const YARN_CONTAINER_PRIORITY_DEFAULT = 1
-const YARN_NM_CONN_KEEPALIVE_SECS = 5*60


"""
YarnAppMaster is a skeleton application master. It provides the generic scaffolding methods which can be used to create specific
application masters for different purposes.

+When initializing a YarnAppMaster instance as a managed app master, the scheduler address is picked up from the environment variable `JULIA_YARN_RESOURCEMANAGER_SCHEDULER_ADDRESS`.
+Tokens set by Yarn in the file pointed to by `HADOOP_TOKEN_FILE_LOCATION` are also read in automatically.
+
+When run as a managed app master, if a function is provided to be executed, then the application master is registered, the function is executed, and then the application master is deregistered.
+This provides a convenient way to run simple Julia applications in a Yarn cluster. E.g.:

+```
+using Elly
+
+YarnAppMaster() do
+    ...
+    # execute Julia code
+end
+```
"""
mutable struct YarnAppMaster
amrm_conn::YarnAMRMProtocol
@@ -32,16 +40,13 @@ mutable struct YarnAppMaster
available_cores::Int32
nodes::YarnNodes
containers::YarnContainers

queue::AbstractString
-managed::Bool

response_id::Int32 # initial value must be 0, update with response_id sent from server on every response

registration::Union{Nothing,RegisterApplicationMasterResponseProto}
+am_rm_task::Union{Nothing,Task}
lck::Lock

-function YarnAppMaster(rmhost::AbstractString, rmport::Integer, ugi::UserGroupInformation,
+function YarnAppMaster(rmhost::AbstractString, rmport::Integer, ugi::UserGroupInformation=UserGroupInformation(),
amhost::AbstractString="", amport::Integer=0, amurl::AbstractString="")
amrm_conn = YarnAMRMProtocol(rmhost, rmport, ugi)
lck = makelock()
@@ -50,8 +55,31 @@
new(amrm_conn, amhost, amport, amurl,
Int32(0), Int32(0), Int32(0), Int32(0),
YarnNodes(ugi), YarnContainers(),
"", true, 0,
nothing, lck)
"", 0,
nothing, nothing, lck)
end

+function YarnAppMaster(ugi::UserGroupInformation=UserGroupInformation())
+    rmschedaddress = ENV["JULIA_YARN_RESOURCEMANAGER_SCHEDULER_ADDRESS"]
+    @debug("got rm address", rmschedaddress)
+    parts = split(rmschedaddress, ":")
+    host = parts[1]
+    schedport = parse(Int, parts[2])
+    am = YarnAppMaster(host, schedport, ugi)
+    for token in find_tokens(ugi; kind="YARN_AM_RM_TOKEN")
+        add_token!(ugi, token_alias(am.amrm_conn.channel), token)
+    end
+    am
+end
end

+function YarnAppMaster(fn::Function, ugi::UserGroupInformation=UserGroupInformation())
+    yam = YarnAppMaster(ugi)
+    register(yam)
+    try
+        fn()
+    finally
+        unregister(yam, true)
+    end
+end

@@ -88,18 +116,16 @@ function submit(client::YarnClient, unmanagedappmaster::YarnAppMaster)
# keep the am_rm token
tok = am_rm_token(app)
channel = unmanagedappmaster.amrm_conn.channel
-add_token(channel.ugi, token_alias(channel), tok)
+@debug("adding token", alias=token_alias(channel))
+add_token!(channel.ugi, token_alias(channel), tok)

# register the unmanaged appmaster
-unmanagedappmaster.managed = false
register(unmanagedappmaster)
wait_for_attempt_state(app, Int32(1), YarnApplicationAttemptStateProto.APP_ATTEMPT_RUNNING) || throw(YarnException("Application attempt could not be launched"))

# initialize complete node list for appmaster
nodes(client; nodelist=unmanagedappmaster.nodes)

-# start the am_rm processing task once the app attempt starts running if it is an unmanaged application master
-@async(process_am_rm(unmanagedappmaster))
app
end

@@ -126,7 +152,8 @@ function register(yam::YarnAppMaster)
end

# start the am_rm processing task on registration if it is a managed application master
-yam.managed && @async(process_am_rm(yam))
+yam.am_rm_task = @async process_am_rm(yam)

nothing
end

@@ -232,10 +259,10 @@ function _update_rm(yam::YarnAppMaster)
# store/update tokens
channel = yam.amrm_conn.channel
ugi = channel.ugi
-isfilled(resp, :am_rm_token) && add_token(ugi, token_alias(channel), resp.am_rm_token)
+isfilled(resp, :am_rm_token) && add_token!(ugi, token_alias(channel), resp.am_rm_token)
if isfilled(resp, :nm_tokens)
for nmtok in resp.nm_tokens
-add_token(ugi, token_alias(nmtok.nodeId), nmtok.token)
+add_token!(ugi, token_alias(nmtok.nodeId), nmtok.token)
end
end

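Taken together, these changes let a Yarn-launched container bootstrap itself as a managed app master: the zero-argument constructor reads the scheduler address that `submit` (in src/api_yarn_client.jl below) injects into the container environment and attaches the `YARN_AM_RM_TOKEN` from the UGI, while the function-first method brackets user code with `register`/`unregister`. A sketch of a managed app master script, assuming it runs inside a container where the launcher set the relevant variables:

```
using Elly

# expects JULIA_YARN_RESOURCEMANAGER_SCHEDULER_ADDRESS and HADOOP_TOKEN_FILE_LOCATION
# in ENV, both provided by the launcher for a managed app master
YarnAppMaster() do
    # runs after register(yam); unregister(yam, true) is invoked in the
    # finally block even if this body throws
    @info("managed app master running")
end
```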
19 changes: 12 additions & 7 deletions src/api_yarn_base.jl
@@ -149,6 +149,7 @@ function update(nodes::YarnNodes, arp::AllocateResponseProto)
isfilled(arp, :num_cluster_nodes) && (nodes.count = arp.num_cluster_nodes)
if isfilled(arp, :updated_nodes)
for nrep in arp.updated_nodes
@debug("updating node status", nodeid=nrep.nodeId)
nodes.status[nrep.nodeId] = YarnNode(nrep)
end
end
@@ -161,6 +162,7 @@ function update(nodes::YarnNodes, gcnrp::GetClusterNodesResponseProto)
nlist = gcnrp.nodeReports
nodes.count = length(nlist)
for nrep in nlist
@debug("updating node status", nodeid=nrep.nodeId)
nodes.status[nrep.nodeId] = YarnNode(nrep)
end
nothing
@@ -191,9 +193,12 @@ function _new_or_existing_conn(nodes::YarnNodes, nodeid::NodeIdProto)
end

function get_connection(nodes::YarnNodes, nodeid::NodeIdProto)
-(nodeid in keys(nodes.status)) || throw(YarnException("Unknown Yarn node: $(nodeid.host):$(nodeid.port)"))
-node = nodes.status[nodeid]
-node.isrunning || throw(YarnException("Yarn node $(nodeid.host):$(nodeid.port) is not running"))
+if !haskey(nodes.status, nodeid)
+    @debug("Unknown Yarn node, will attempt connection anyway...", expected="$(nodeid.host):$(nodeid.port)", have=collect(keys(nodes.status)))
+else
+    node = nodes.status[nodeid]
+    node.isrunning || throw(YarnException("Yarn node $(nodeid.host):$(nodeid.port) is not running"))
+end

conn, lastusetime, lck = _new_or_existing_conn(nodes, nodeid)
# lock the connection to mark in use
@@ -376,24 +381,24 @@ haverequests(containers::YarnContainers) = containers.ndesired != length(contain

# TODO: support local resources
# TODO: support tokens
-function launchcontext(;cmd::AbstractString="", env::Dict=Dict(), service_data::Dict=Dict())
+function launchcontext(; cmd::Union{AbstractString,Vector}="", env::Dict=Dict(), service_data::Dict=Dict())
clc = ContainerLaunchContextProto()
if !isempty(cmd)
-setproperty!(clc, :command, AbstractString[cmd])
+setproperty!(clc, :command, isa(cmd, Vector) ? convert(Vector{AbstractString},cmd) : AbstractString[cmd])
end
if !isempty(env)
envproto = StringStringMapProto[]
for (n,v) in env
(isa(n, AbstractString) && isa(v, AbstractString)) || throw(ArgumentError("non string environment variable specified: $(typeof(n)) => $(typeof(v))"))
-push!(envproto, protobuild(StringStringMapProto, Dict(:key => n, :value => v)))
+push!(envproto, StringStringMapProto(; key=n, value=v))
end
setproperty!(clc, :environment, envproto)
end
if !isempty(service_data)
svcdataproto = StringBytesMapProto[]
for (n,v) in service_data
(isa(n, AbstractString) && isa(v, Vector{UInt8})) || throw(ArgumentError("incompatible service data type specified: $(typeof(n)) => $(typeof(v))"))
-push!(svcdataproto, protobuild(StringBytesMapProto, Dict(:key => n, :value => v)))
+push!(svcdataproto, StringBytesMapProto(; key=n, value=v))
end
setproperty!(clc, :service_data, svcdataproto)
end
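`launchcontext` now accepts either a single command string or a vector of command strings, and builds the protobuf map entries with keyword constructors rather than `protobuild`. A small sketch of the extended call, with all values illustrative:

```
clc = launchcontext(;
    cmd = ["/bin/sh", "-c", "julia task.jl"],         # vector form is new in this PR
    env = Dict("JULIA_NUM_THREADS" => "2"),           # entries must be string => string
    service_data = Dict("svc" => UInt8[0x01, 0x02]))  # entries must be string => Vector{UInt8}
```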
34 changes: 32 additions & 2 deletions src/api_yarn_client.jl
@@ -4,6 +4,12 @@
# - applicationclient_protocol.proto
# - application_history_client.proto

+const YARN_CONTAINER_MEM_DEFAULT = 128
+const YARN_CONTAINER_CPU_DEFAULT = 1
+const YARN_CONTAINER_LOCATION_DEFAULT = "*"
+const YARN_CONTAINER_PRIORITY_DEFAULT = 1
+const YARN_NM_CONN_KEEPALIVE_SECS = 5*60

"""
YarnClient holds a connection to the Yarn Resource Manager and provides
APIs for application clients to interact with Yarn.
@@ -14,6 +20,14 @@ mutable struct YarnClient
function YarnClient(host::AbstractString, port::Integer, ugi::UserGroupInformation=UserGroupInformation())
new(YarnClientProtocol(host, port, ugi))
end

+function YarnClient(ugi::UserGroupInformation=UserGroupInformation())
+    rmaddress = ENV["JULIA_YARN_RESOURCEMANAGER_ADDRESS"]
+    parts = split(rmaddress, ":")
+    host = parts[1]
+    port = parse(Int, parts[2])
+    YarnClient(host, port, ugi)
+end
end

show(io::IO, client::YarnClient) = show(io, client.rm_conn)
@@ -159,9 +173,25 @@ function _new_app(client::YarnClient)
resp.application_id, resp.maximumCapability.memory, resp.maximumCapability.virtual_cores
end

-function submit(client::YarnClient, container_spec::ContainerLaunchContextProto, mem::Integer, cores::Integer;
+function submit(client::YarnClient, cmd::Union{AbstractString,Vector}, mem::Integer=YARN_CONTAINER_MEM_DEFAULT, cores::Integer=YARN_CONTAINER_CPU_DEFAULT, env::Dict{String,String}=Dict{String,String}(); kwargs...)
+    container_spec = launchcontext(cmd=cmd, env=env)
+    submit(client, container_spec, mem, cores; kwargs...)
+end
+
+function submit(client::YarnClient, container_spec::ContainerLaunchContextProto, mem::Integer=YARN_CONTAINER_MEM_DEFAULT, cores::Integer=YARN_CONTAINER_CPU_DEFAULT;
        priority::Int32=one(Int32), appname::AbstractString="EllyApp", queue::AbstractString="default", apptype::AbstractString="YARN",
-        reuse::Bool=false, unmanaged::Bool=false)
+        reuse::Bool=false, unmanaged::Bool=false, schedaddr::String="")
+    @debug("submitting application", unmanaged=unmanaged, cmd=container_spec.command)
+
+    if !unmanaged
+        # the application master would need these environment variables to initialize itself
+        # TODO: we need ability to read hadoop configuration to avoid this
+        rm_chan = client.rm_conn.channel
+        isdefined(container_spec, :environment) || (container_spec.environment = StringStringMapProto[])
+        push!(container_spec.environment, StringStringMapProto(; key="JULIA_YARN_RESOURCEMANAGER_ADDRESS", value="$(rm_chan.host):$(rm_chan.port)"))
+        push!(container_spec.environment, StringStringMapProto(; key="JULIA_YARN_RESOURCEMANAGER_SCHEDULER_ADDRESS", value=schedaddr))
+    end

appid, maxmem, maxcores = _new_app(client)

prio = protobuild(PriorityProto, Dict(:priority => priority))
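The convenience `submit` accepts the command directly and, for managed app masters, injects `JULIA_YARN_RESOURCEMANAGER_ADDRESS` and `JULIA_YARN_RESOURCEMANAGER_SCHEDULER_ADDRESS` into the container environment, which is exactly what the environment-driven `YarnClient()` and `YarnAppMaster()` constructors consume. A client-side sketch, with the host name and ports as placeholder assumptions (8032 and 8030 are the conventional Yarn RM and scheduler ports):

```
using Elly

# placeholder address; substitute the cluster's resource manager
client = YarnClient("resourcemanager.example.com", 8032)

# launch a Julia script as a managed app master; schedaddr must be passed
# explicitly until Elly can read it from the Hadoop configuration (see TODO above)
app = submit(client, ["julia", "appmaster.jl"], 512, 1;
             appname="EllyManagedApp",
             schedaddr="resourcemanager.example.com:8030")
```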