Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

k8s logtail #3758

Merged
merged 13 commits into from
Aug 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 49 additions & 2 deletions sky/templates/kubernetes-ray.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,54 @@ available_node_types:
# Do not change this command - it keeps the pod alive until it is
# explicitly killed.
command: ["/bin/bash", "-c", "--"]
args: ['trap : TERM INT; sleep infinity & wait;']
args:
- |
# Tails file and checks every 5 sec for
# open file handlers with write access
# closes if none exist
monitor_file() {
tail -f $file &
TAIL_PID=$!
while kill -0 $TAIL_PID 2> /dev/null; do
# only two PIDs should be accessing the file
# the log appender and log tailer
if [ $(lsof -w $file | wc -l) -lt 3 ]; then
kill $TAIL_PID
break
fi
# Sleep for 5 seconds before checking again. Do not make this
# too short as it will consume CPU, and too long will cause
# the file to be closed too late keeping the pod alive.
sleep 5
done
}

log_tail() {
FILE_PATTERN="~/sky_logs/*/tasks/*.log"
while ! ls $(eval echo $FILE_PATTERN) 1> /dev/null 2>&1; do
sleep 1
done

# Keep track of already monitored files
already_monitored=""

# Infinite loop to continuously check for new files
while true; do
for file in $(eval echo $FILE_PATTERN); do
if echo $already_monitored | grep -q $file; then
# File is already being monitored
continue
fi

# Monitor the new file
monitor_file $file &
already_monitored="${already_monitored} ${file}"
done
sleep 0.1
done
}
trap : TERM INT; log_tail || sleep infinity & wait

ports:
- containerPort: 22 # Used for SSH
- containerPort: {{ray_port}} # Redis port
Expand Down Expand Up @@ -365,7 +412,7 @@ setup_commands:
# Line 'sudo grep ..': set the number of threads per process to unlimited to avoid ray job submit stucking issue when the number of running ray jobs increase.
# Line 'mkdir -p ..': disable host key check
# Line 'python3 -c ..': patch the buggy ray files and enable `-o allow_other` option for `goofys`
- sudo DEBIAN_FRONTEND=noninteractive apt install gcc patch pciutils rsync fuse curl -y;
- sudo DEBIAN_FRONTEND=noninteractive apt install lsof gcc patch pciutils rsync fuse curl -y;
mkdir -p ~/.ssh; touch ~/.ssh/config;
{%- for initial_setup_command in initial_setup_commands %}
{{ initial_setup_command }}
Expand Down
86 changes: 86 additions & 0 deletions tests/test_smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -2044,6 +2044,91 @@ def test_task_labels_kubernetes():
run_one_test(test)


# ---------- Container logs from task on Kubernetes ----------
@pytest.mark.kubernetes
def test_container_logs_multinode_kubernetes():
name = _get_cluster_name()
task_yaml = 'tests/test_yamls/test_k8s_logs.yaml'
head_logs = ('kubectl get pods '
f' | grep {name} | grep head | '
" awk '{print $1}' | xargs -I {} kubectl logs {}")
worker_logs = ('kubectl get pods '
f' | grep {name} | grep worker |'
" awk '{print $1}' | xargs -I {} kubectl logs {}")
with tempfile.NamedTemporaryFile(suffix='.yaml', mode='w') as f:
test = Test(
'container_logs_multinode_kubernetes',
[
f'sky launch -y -c {name} {task_yaml} --num-nodes 2',
f'{head_logs} | wc -l | grep 9',
f'{worker_logs} | wc -l | grep 9',
],
f'sky down -y {name}',
)
run_one_test(test)


@pytest.mark.kubernetes
def test_container_logs_two_jobs_kubernetes():
name = _get_cluster_name()
task_yaml = 'tests/test_yamls/test_k8s_logs.yaml'
pod_logs = ('kubectl get pods '
f' | grep {name} | grep head |'
" awk '{print $1}' | xargs -I {} kubectl logs {}")
with tempfile.NamedTemporaryFile(suffix='.yaml', mode='w') as f:
test = Test(
'test_container_logs_two_jobs_kubernetes',
[
f'sky launch -y -c {name} {task_yaml}',
f'{pod_logs} | wc -l | grep 9',
f'sky launch -y -c {name} {task_yaml}',
f'{pod_logs} | wc -l | grep 18',
f'{pod_logs} | grep 1 | wc -l | grep 2',
f'{pod_logs} | grep 2 | wc -l | grep 2',
f'{pod_logs} | grep 3 | wc -l | grep 2',
f'{pod_logs} | grep 4 | wc -l | grep 2',
f'{pod_logs} | grep 5 | wc -l | grep 2',
f'{pod_logs} | grep 6 | wc -l | grep 2',
f'{pod_logs} | grep 7 | wc -l | grep 2',
f'{pod_logs} | grep 8 | wc -l | grep 2',
f'{pod_logs} | grep 9 | wc -l | grep 2',
],
f'sky down -y {name}',
)
run_one_test(test)


@pytest.mark.kubernetes
def test_container_logs_two_simultaneous_jobs_kubernetes():
name = _get_cluster_name()
task_yaml = 'tests/test_yamls/test_k8s_logs.yaml '
pod_logs = ('kubectl get pods '
f' | grep {name} | grep head |'
" awk '{print $1}' | xargs -I {} kubectl logs {}")
with tempfile.NamedTemporaryFile(suffix='.yaml', mode='w') as f:
test = Test(
'test_container_logs_two_simultaneous_jobs_kubernetes',
[
f'sky launch -y -c {name}',
f'sky exec -c {name} -d {task_yaml}',
f'sky exec -c {name} -d {task_yaml}',
'sleep 30',
f'{pod_logs} | wc -l | grep 18',
f'{pod_logs} | grep 1 | wc -l | grep 2',
f'{pod_logs} | grep 2 | wc -l | grep 2',
f'{pod_logs} | grep 3 | wc -l | grep 2',
f'{pod_logs} | grep 4 | wc -l | grep 2',
f'{pod_logs} | grep 5 | wc -l | grep 2',
f'{pod_logs} | grep 6 | wc -l | grep 2',
f'{pod_logs} | grep 7 | wc -l | grep 2',
f'{pod_logs} | grep 8 | wc -l | grep 2',
f'{pod_logs} | grep 9 | wc -l | grep 2',
],
f'sky down -y {name}',
)
run_one_test(test)


# ---------- Task: n=2 nodes with setups. ----------
@pytest.mark.no_lambda_cloud # Lambda Cloud does not have V100 gpus
@pytest.mark.no_ibm # IBM cloud currently doesn't provide public image with CUDA
Expand Down Expand Up @@ -3140,6 +3225,7 @@ def test_kubernetes_custom_image(image_id):
run_one_test(test)


@pytest.mark.azure
def test_azure_start_stop_two_nodes():
name = _get_cluster_name()
test = Test(
Expand Down
8 changes: 8 additions & 0 deletions tests/test_yamls/test_k8s_logs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
name: test-k8s-logs

run: |
for i in $(seq 1 9)
do
echo "$i"
sleep 0.1
done
Loading