-
-
Notifications
You must be signed in to change notification settings - Fork 31
/
channel.pyx
218 lines (183 loc) · 8.48 KB
/
channel.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
#
# This file is part of the pylibssh library
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see file LICENSE.rst in this
# repository.
#
import signal
import time
from io import BytesIO
from cpython.bytes cimport PyBytes_AS_STRING
from libc.string cimport memset
from pylibsshext.errors cimport LibsshChannelException
from pylibsshext.errors import LibsshChannelReadFailure
from pylibsshext.session cimport get_libssh_session
from subprocess import CompletedProcess
cdef int _process_outputs(libssh.ssh_session session,
libssh.ssh_channel channel,
void *data,
libssh.uint32_t len,
int is_stderr,
void *userdata) with gil:
if len == 0:
return 0
data_b = <bytes>(<char *>data)[:len]
result = <object>userdata
if is_stderr:
result.stderr += data_b
else:
result.stdout += data_b
return len
cdef class Channel:
def __cinit__(self, session):
self._session = session
self._libssh_session = get_libssh_session(session)
self._libssh_channel = libssh.ssh_channel_new(self._libssh_session)
if self._libssh_channel is NULL:
raise MemoryError
rc = libssh.ssh_channel_open_session(self._libssh_channel)
if rc != libssh.SSH_OK:
libssh.ssh_channel_free(self._libssh_channel)
self._libssh_channel = NULL
raise LibsshChannelException("Failed to open_session: [%d]" % rc)
def __dealloc__(self):
if self._libssh_channel is not NULL:
libssh.ssh_channel_close(self._libssh_channel)
libssh.ssh_channel_free(self._libssh_channel)
self._libssh_channel = NULL
def request_shell(self):
self.request_pty()
rc = libssh.ssh_channel_request_shell(self._libssh_channel)
if rc != libssh.SSH_OK:
raise LibsshChannelException("Failed to request_shell: [%d]" % rc)
def request_exec(self, command):
"""Run a shell command without an interactive shell."""
rc = libssh.ssh_channel_request_exec(self._libssh_channel, command.encode("utf-8"))
if rc != libssh.SSH_OK:
raise LibsshChannelException("Failed to request_exec: [%d]" % rc)
def request_pty(self):
rc = libssh.ssh_channel_request_pty(self._libssh_channel)
if rc != libssh.SSH_OK:
raise LibsshChannelException("Failed to request pty: [%d]" % rc)
def request_pty_size(self, terminal, col, row):
rc = libssh.ssh_channel_request_pty_size(self._libssh_channel, terminal, col, row)
if rc != libssh.SSH_OK:
raise LibsshChannelException("Failed to request pty with [%d] for terminal [%s], "
"columns [%d] and rows [%d]" % (rc, terminal, col, row))
rc = libssh.ssh_channel_request_shell(self._libssh_channel)
if rc != libssh.SSH_OK:
raise LibsshChannelException("Failed to request_shell: [%d]" % rc)
def poll(self, timeout=-1, stderr=0):
if timeout < 0:
rc = libssh.ssh_channel_poll(self._libssh_channel, stderr)
else:
rc = libssh.ssh_channel_poll_timeout(self._libssh_channel, timeout, stderr)
if rc == libssh.SSH_ERROR:
raise LibsshChannelException("Failed to poll channel: [{0}]".format(rc))
return rc
def read_nonblocking(self, size=1024, stderr=0):
cdef char buffer[1024]
size_m = size
if size_m > sizeof(buffer):
size_m = sizeof(buffer)
nbytes = libssh.ssh_channel_read_nonblocking(self._libssh_channel, buffer, size_m, stderr)
if nbytes == libssh.SSH_ERROR:
# This is what Session._get_session_error_str() does, but we don't have the Python object
error = libssh.ssh_get_error(<void*>self._libssh_session).decode()
raise LibsshChannelReadFailure(error)
elif nbytes == libssh.SSH_EOF:
return None
return <bytes>buffer[:nbytes]
def recv(self, size=1024, stderr=0):
return self.read_nonblocking(size=size, stderr=stderr)
def write(self, data):
written = libssh.ssh_channel_write(self._libssh_channel, PyBytes_AS_STRING(data), len(data))
if written == libssh.SSH_ERROR:
raise LibsshChannelException("Failed to write to ssh channel")
return written
def sendall(self, data):
return self.write(data)
def read_bulk_response(self, stderr=0, timeout=0.001, retry=5):
if retry <= 0:
raise ValueError(
'Got arg `retry={arg!r}` but it must be greater than 0'.
format(arg=retry),
)
response = b""
with BytesIO() as recv_buff:
for _ in range(retry, 0, -1):
data = self.read_nonblocking(size=1024, stderr=stderr)
if not data:
if timeout:
time.sleep(timeout)
continue
recv_buff.write(data)
response = recv_buff.getvalue()
return response
def exec_command(self, command):
# request_exec requires a fresh channel each run, so do not use the existing channel
cdef libssh.ssh_channel channel = libssh.ssh_channel_new(self._libssh_session)
if channel is NULL:
raise MemoryError
rc = libssh.ssh_channel_open_session(channel)
if rc != libssh.SSH_OK:
libssh.ssh_channel_free(channel)
raise LibsshChannelException("Failed to open_session: [{0}]".format(rc))
rc = libssh.ssh_channel_request_exec(channel, command.encode("utf-8"))
if rc != libssh.SSH_OK:
libssh.ssh_channel_close(channel)
libssh.ssh_channel_free(channel)
raise LibsshChannelException("Failed to execute command [{0}]: [{1}]".format(command, rc))
result = CompletedProcess(args=command, returncode=-1, stdout=b'', stderr=b'')
cdef callbacks.ssh_channel_callbacks_struct cb
memset(&cb, 0, sizeof(cb))
cb.channel_data_function = <callbacks.ssh_channel_data_callback>&_process_outputs
cb.userdata = <void *>result
callbacks.ssh_callbacks_init(&cb)
callbacks.ssh_set_channel_callbacks(channel, &cb)
libssh.ssh_channel_send_eof(channel)
result.returncode = libssh.ssh_channel_get_exit_status(channel)
if channel is not NULL:
libssh.ssh_channel_close(channel)
libssh.ssh_channel_free(channel)
return result
def send_eof(self):
"""Send EOF to the channel, this will close stdin."""
rc = libssh.ssh_channel_send_eof(self._libssh_channel)
if rc != libssh.SSH_OK:
raise LibsshChannelException("Failed to ssh_channel_send_eof: [%d]" % rc)
def send_signal(self, sig):
"""
Send signal to the remote process.
:param sig: a signal constant from ``signal``, e.g. ``signal.SIGUSR1``.
:type sig: signal.Signals
"""
if not isinstance(sig, signal.Signals):
raise TypeError(f"Expecting signal.Signals not {type(sig)}")
sshsig = sig.name.replace("SIG", "") # FIXME: replace w/ `str.removeprefix()` once Python 3.8 support is dropped
rc = libssh.ssh_channel_request_send_signal(self._libssh_channel, sshsig.encode("utf-8"))
if rc != libssh.SSH_OK:
raise LibsshChannelException("Failed to ssh_channel_request_send_signal: [%d]" % rc)
def get_channel_exit_status(self):
return libssh.ssh_channel_get_exit_status(self._libssh_channel)
@property
def is_eof(self):
"""True if remote has sent an EOF."""
rc = libssh.ssh_channel_is_eof(self._libssh_channel)
return rc != 0
def close(self):
if self._libssh_channel is not NULL:
libssh.ssh_channel_close(self._libssh_channel)
libssh.ssh_channel_free(self._libssh_channel)
self._libssh_channel = NULL