diff --git a/CMakeLists.txt b/CMakeLists.txt index d99e4f9..e9329ba 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -109,7 +109,7 @@ list(APPEND MSCP_BUILD_INCLUDE_DIRS ${CMAKE_CURRENT_BINARY_DIR}/include) # libmscp.a set(LIBMSCP_SRC src/mscp.c src/ssh.c src/fileops.c src/path.c src/checkpoint.c - src/platform.c src/print.c src/pool.c src/strerrno.c + src/bwlimit.c src/platform.c src/print.c src/pool.c src/strerrno.c ${OPENBSD_COMPAT_SRC}) add_library(mscp-static STATIC ${LIBMSCP_SRC}) target_include_directories(mscp-static @@ -203,7 +203,7 @@ foreach(x RANGE ${DIST_LISTLEN}) COMMENT "Test mscp in ${DOCKER_IMAGE} container" WORKING_DIRECTORY ${CMAKE_BINARY_DIR} COMMAND - ${CE} run --init --rm --sysctl net.ipv6.conf.all.disable_ipv6=0 + ${CE} run --init --rm -it --sysctl net.ipv6.conf.all.disable_ipv6=0 ${DOCKER_IMAGE} /mscp/scripts/test-in-container.sh) list(APPEND DOCKER_BUILDS docker-build-${DOCKER_INDEX}) diff --git a/doc/mscp.1.in b/doc/mscp.1.in index a99bb88..92d8651 100644 --- a/doc/mscp.1.in +++ b/doc/mscp.1.in @@ -38,6 +38,9 @@ mscp \- copy files over multiple SSH connections .BI \-b \ BUF_SIZE\c ] [\c +.BI \-L \ LIMIT_BITRATE\c +] +[\c .BI \-l \ LOGIN_NAME\c ] [\c @@ -198,6 +201,11 @@ Specifies the buffer size for I/O and transfer over SFTP. The default value is 16384. Note that the SSH specification restricts buffer size delivered over SSH. Changing this value is not recommended at present. +.TP +.B \-L \fILIMIT_BITRATE\fR +Limits the bitrate, specified with k (K), m (M), and g (G), e.g., 100m +indicates 100 Mbps. + .TP .B \-4 Uses IPv4 addresses only. diff --git a/doc/mscp.rst b/doc/mscp.rst index 9a53134..60d32a4 100644 --- a/doc/mscp.rst +++ b/doc/mscp.rst @@ -2,7 +2,7 @@ MSCP ==== -:Date: v0.1.4-28-g0d248c5 +:Date: v0.1.5-4-g9b8ba69 NAME ==== @@ -12,14 +12,14 @@ mscp - copy files over multiple SSH connections SYNOPSIS ======== -**mscp** [**-46vqDpHdNh**] [ **-n**\ *NR_CONNECTIONS* ] [ -**-m**\ *COREMASK* ] [ **-u**\ *MAX_STARTUPS* ] [ **-I**\ *INTERVAL* ] [ -**-W**\ *CHECKPOINT* ] [ **-R**\ *CHECKPOINT* ] [ -**-s**\ *MIN_CHUNK_SIZE* ] [ **-S**\ *MAX_CHUNK_SIZE* ] [ -**-a**\ *NR_AHEAD* ] [ **-b**\ *BUF_SIZE* ] [ **-l**\ *LOGIN_NAME* ] [ -**-P**\ *PORT* ] [ **-F**\ *CONFIG* ] [ **-i**\ *IDENTITY* ] [ -**-c**\ *CIPHER* ] [ **-M**\ *HMAC* ] [ **-C**\ *COMPRESS* ] [ -**-g**\ *CONGESTION* ] *source ... target* +**mscp** [**-46vqDpHdNh**] [ **-n** *NR_CONNECTIONS* ] [ **-m** +*COREMASK* ] [ **-u** *MAX_STARTUPS* ] [ **-I** *INTERVAL* ] [ **-W** +*CHECKPOINT* ] [ **-R** *CHECKPOINT* ] [ **-s** *MIN_CHUNK_SIZE* ] [ +**-S** *MAX_CHUNK_SIZE* ] [ **-a** *NR_AHEAD* ] [ **-b** *BUF_SIZE* ] [ +**-L** *LIMIT_BITRATE* ] [ **-l** *LOGIN_NAME* ] [ **-P** *PORT* ] [ +**-F** *CONFIG* ] [ **-i** *IDENTITY* ] [ **-c** *CIPHER* ] [ **-M** +*HMAC* ] [ **-C** *COMPRESS* ] [ **-g** *CONGESTION* ] *source ... +target* DESCRIPTION =========== @@ -111,6 +111,10 @@ OPTIONS delivered over SSH. Changing this value is not recommended at present. +**-L LIMIT_BITRATE** + Limits the bitrate, specified with k (K), m (M), and g (G), e.g., + 100m indicates 100 Mbps. + **-4** Uses IPv4 addresses only. diff --git a/include/mscp.h b/include/mscp.h index e8af736..5931665 100644 --- a/include/mscp.h +++ b/include/mscp.h @@ -42,7 +42,8 @@ struct mscp_opts { size_t min_chunk_sz; /** minimum chunk size (default 64MB) */ size_t max_chunk_sz; /** maximum chunk size (default file size/nr_threads) */ size_t buf_sz; /** buffer size, default 16k. */ - char *coremask; /** hex to specifiy usable cpu cores */ + size_t bitrate; /** bits-per-seconds to limit bandwidth */ + char *coremask; /** hex to specifiy usable cpu cores */ int max_startups; /** sshd MaxStartups concurrent connections */ int interval; /** interval between SSH connection attempts */ bool preserve_ts; /** preserve file timestamps */ diff --git a/src/bwlimit.c b/src/bwlimit.c new file mode 100644 index 0000000..f51823d --- /dev/null +++ b/src/bwlimit.c @@ -0,0 +1,94 @@ +/* SPDX-License-Identifier: GPL-3.0-only */ +#include + +#include +#include + +#define timespeczerorize(ts) \ + do { \ + ts.tv_sec = 0; \ + ts.tv_nsec = 0; \ + } while (0) + +int bwlimit_init(struct bwlimit *bw, uint64_t bps, uint64_t win) +{ + if (!(bw->sem = sem_create(1))) + return -1; + + bw->bps = bps; + bw->win = win; /* msec window */ + bw->amt = (double)bps / 8 / 1000 * win; /* bytes in a window (msec) */ + bw->credit = bw->amt; + timespeczerorize(bw->wstart); + timespeczerorize(bw->wend); + + return 0; +} + +#define timespecisset(ts) ((ts).tv_sec || (ts).tv_nsec) + +#define timespecmsadd(a, msec, r) \ + do { \ + (r).tv_sec = (a).tv_sec; \ + (r).tv_nsec = (a).tv_nsec + (msec * 1000000); \ + if ((r).tv_nsec > 1000000000) { \ + (r).tv_sec += (r.tv_nsec) / 1000000000L; \ + (r).tv_nsec = (r.tv_nsec) % 1000000000L; \ + } \ + } while (0) + +#define timespecsub(a, b, r) \ + do { \ + (r).tv_sec = (a).tv_sec - (b).tv_sec; \ + (r).tv_nsec = (a).tv_nsec - (b).tv_nsec; \ + if ((r).tv_nsec < 0) { \ + (r).tv_sec -= 1; \ + (r).tv_nsec += 1000000000; \ + } \ + } while (0) + +#define timespeccmp(a, b, expr) \ + ((a.tv_sec * 1000000000 + a.tv_nsec) expr(b.tv_sec * 1000000000 + b.tv_nsec)) + +#include + +int bwlimit_wait(struct bwlimit *bw, size_t nr_bytes) +{ + struct timespec now, end, rq, rm; + + if (bw->bps == 0) + return 0; /* no bandwidth limit */ + + if (sem_wait(bw->sem) < 0) + return -1; + + clock_gettime(CLOCK_MONOTONIC, &now); + + if (!timespecisset(bw->wstart)) { + bw->wstart = now; + timespecmsadd(bw->wstart, bw->win, bw->wend); + } + + bw->credit -= nr_bytes; + + if (bw->credit < 0) { + /* no more credit on this window. sleep until the end + * of this window + additional time for the remaining + * bytes. */ + uint64_t addition = (double)(bw->credit * -1) / (bw->bps / 8); + timespecmsadd(bw->wend, addition * 1000, end); + if (timespeccmp(end, now, >)) { + timespecsub(end, now, rq); + while (nanosleep(&rq, &rm) == -1) { + if (errno != EINTR) + break; + rq = rm; + } + } + bw->credit = bw->amt; + timespeczerorize(bw->wstart); + } + + sem_post(bw->sem); + return 0; +} diff --git a/src/bwlimit.h b/src/bwlimit.h new file mode 100644 index 0000000..58cdd8d --- /dev/null +++ b/src/bwlimit.h @@ -0,0 +1,28 @@ +/* SPDX-License-Identifier: GPL-3.0-only */ +#ifndef _BWLIMIT_H_ +#define _BWLIMIT_H_ + +#include +#include +#include +#include +#include + +struct bwlimit { + sem_t *sem; /* semaphore */ + uint64_t bps; /* limit bit-rate (bps) */ + uint64_t win; /* window size (msec) */ + size_t amt; /* amount of bytes can be sent in a window */ + + ssize_t credit; /* remaining bytes can be sent in a window */ + struct timespec wstart, wend; /* window start time and end time */ +}; + +int bwlimit_init(struct bwlimit *bw, uint64_t bps, uint64_t win); +/* if bps is 0, it means that bwlimit is not active. If so, + * bwlimit_wait() returns immediately. */ + +int bwlimit_wait(struct bwlimit *bw, size_t nr_bytes); + + +#endif /* _BWLIMIT_H_ */ diff --git a/src/main.c b/src/main.c index 81b53e8..4919818 100644 --- a/src/main.c +++ b/src/main.c @@ -26,7 +26,8 @@ void usage(bool print_help) "\n" "Usage: mscp [-46vqDpHdNh] [-n nr_conns] [-m coremask]\n" " [-u max_startups] [-I interval] [-W checkpoint] [-R checkpoint]\n" - " [-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]\n" + " [-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead]\n" + " [-b buf_sz] [-L limit_bitrate]\n" " [-l login_name] [-P port] [-F ssh_config] [-i identity_file]\n" " [-c cipher_spec] [-M hmac_spec] [-C compress] [-g congestion]\n" " source ... target\n" @@ -48,6 +49,7 @@ void usage(bool print_help) " -S MAX_CHUNK_SIZE max chunk size (default: filesize/nr_conn)\n" " -a NR_AHEAD number of inflight SFTP commands (default: 32)\n" " -b BUF_SZ buffer size for i/o and transfer\n" + " -L LIMIT_BITRATE Limit the bitrate, n[KMG] (default: 0, no limit)\n" "\n" " -4 use IPv4\n" " -6 use IPv6\n" @@ -266,12 +268,14 @@ int main(int argc, char **argv) int direction = 0; char *remote = NULL, *checkpoint_save = NULL, *checkpoint_load = NULL; bool dryrun = false, resume = false; + char *u; + size_t mag = 0; memset(&s, 0, sizeof(s)); memset(&o, 0, sizeof(o)); o.severity = MSCP_SEVERITY_WARN; -#define mscpopts "n:m:u:I:W:R:s:S:a:b:46vqDrl:P:i:F:c:M:C:g:pHdNh" +#define mscpopts "n:m:u:I:W:R:s:S:a:b:L:46vqDrl:P:i:F:c:M:C:g:pHdNh" while ((ch = getopt(argc, argv, mscpopts)) != -1) { switch (ch) { case 'n': @@ -309,6 +313,25 @@ int main(int argc, char **argv) case 'b': o.buf_sz = atoi(optarg); break; + case 'L': + u = optarg + (strlen(optarg) - 1); + if (*u == 'k' || *u == 'K') { + mag = 1000; + *u = '\0'; + } else if (*u == 'm' || *u == 'M') { + mag = 1000000; + *u = '\0'; + } else if (*u == 'g' || *u == 'G') { + mag = 1000000000; + *u = '\0'; + } + o.bitrate = atol(optarg); + if (o.bitrate == 0) { + pr_err("invalid bitrate: %s", optarg); + return 1; + } + o.bitrate *= mag; + break; case '4': s.ai_family = AF_INET; break; diff --git a/src/mscp.c b/src/mscp.c index 34dd8db..857f002 100644 --- a/src/mscp.c +++ b/src/mscp.c @@ -17,6 +17,7 @@ #include #include #include +#include #include @@ -56,6 +57,8 @@ struct mscp { #define chunk_pool_is_ready(m) ((m)->chunk_pool_ready) #define chunk_pool_set_ready(m, b) ((m)->chunk_pool_ready = b) + struct bwlimit bw; /* bandwidth limit mechanism */ + struct mscp_thread scan; /* mscp_thread for mscp_scan_thread() */ }; @@ -281,6 +284,12 @@ struct mscp *mscp_init(struct mscp_opts *o, struct mscp_ssh_opts *s) pr_notice("usable cpu cores:%s", b); } + if (bwlimit_init(&m->bw, o->bitrate, 100) < 0) { /* 100ms window (hardcoded) */ + priv_set_errv("bwlimit_init: %s", strerrno()); + goto free_out; + } + pr_notice("bitrate limit: %lu bps", o->bitrate); + return m; free_out: @@ -522,8 +531,8 @@ int mscp_checkpoint_load(struct mscp *m, const char *pathname) int mscp_checkpoint_save(struct mscp *m, const char *pathname) { - return checkpoint_save(pathname, m->direction, m->ssh_opts->login_name, - m->remote, m->path_pool, m->chunk_pool); + return checkpoint_save(pathname, m->direction, m->ssh_opts->login_name, m->remote, + m->path_pool, m->chunk_pool); } static void *mscp_copy_thread(void *arg); @@ -712,7 +721,7 @@ void *mscp_copy_thread(void *arg) } if ((t->ret = copy_chunk(c, src_sftp, dst_sftp, m->opts->nr_ahead, - m->opts->buf_sz, m->opts->preserve_ts, + m->opts->buf_sz, m->opts->preserve_ts, &m->bw, &t->copied_bytes)) < 0) break; } diff --git a/src/path.c b/src/path.c index 219ec53..f85e20f 100644 --- a/src/path.c +++ b/src/path.c @@ -348,7 +348,7 @@ static ssize_t read_to_buf(void *ptr, size_t len, void *userdata) } static int copy_chunk_l2r(struct chunk *c, int fd, sftp_file sf, int nr_ahead, int buf_sz, - size_t *counter) + struct bwlimit *bw, size_t *counter) { ssize_t read_bytes, remaind, thrown; int idx, ret; @@ -371,6 +371,7 @@ static int copy_chunk_l2r(struct chunk *c, int fd, sftp_file sf, int nr_ahead, i return -1; } thrown -= reqs[idx].len; + bwlimit_wait(bw, reqs[idx].len); } for (idx = 0; remaind > 0; idx = (idx + 1) % nr_ahead) { @@ -399,6 +400,7 @@ static int copy_chunk_l2r(struct chunk *c, int fd, sftp_file sf, int nr_ahead, i return -1; } thrown -= reqs[idx].len; + bwlimit_wait(bw, reqs[idx].len); } if (remaind < 0) { @@ -412,7 +414,7 @@ static int copy_chunk_l2r(struct chunk *c, int fd, sftp_file sf, int nr_ahead, i } static int copy_chunk_r2l(struct chunk *c, sftp_file sf, int fd, int nr_ahead, int buf_sz, - size_t *counter) + struct bwlimit *bw, size_t *counter) { ssize_t read_bytes, write_bytes, remaind, thrown; char buf[buf_sz]; @@ -436,6 +438,7 @@ static int copy_chunk_r2l(struct chunk *c, sftp_file sf, int fd, int nr_ahead, i return -1; } thrown -= reqs[idx].len; + bwlimit_wait(bw, reqs[idx].len); } for (idx = 0; remaind > 0; idx = (idx + 1) % nr_ahead) { @@ -449,6 +452,7 @@ static int copy_chunk_r2l(struct chunk *c, sftp_file sf, int fd, int nr_ahead, i reqs[idx].len = min(thrown, sizeof(buf)); reqs[idx].id = sftp_async_read_begin(sf, reqs[idx].len); thrown -= reqs[idx].len; + bwlimit_wait(bw, reqs[idx].len); } write_bytes = write(fd, buf, read_bytes); @@ -477,19 +481,22 @@ static int copy_chunk_r2l(struct chunk *c, sftp_file sf, int fd, int nr_ahead, i } static int _copy_chunk(struct chunk *c, mf *s, mf *d, int nr_ahead, int buf_sz, - size_t *counter) + struct bwlimit *bw, size_t *counter) { if (s->local && d->remote) /* local to remote copy */ - return copy_chunk_l2r(c, s->local, d->remote, nr_ahead, buf_sz, counter); + return copy_chunk_l2r(c, s->local, d->remote, nr_ahead, buf_sz, bw, + counter); else if (s->remote && d->local) /* remote to local copy */ - return copy_chunk_r2l(c, s->remote, d->local, nr_ahead, buf_sz, counter); + return copy_chunk_r2l(c, s->remote, d->local, nr_ahead, buf_sz, bw, + counter); assert(false); return -1; /* not reached */ } int copy_chunk(struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp, - int nr_ahead, int buf_sz, bool preserve_ts, size_t *counter) + int nr_ahead, int buf_sz, bool preserve_ts, struct bwlimit *bw, + size_t *counter) { mode_t mode; int flags; @@ -529,7 +536,7 @@ int copy_chunk(struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp, c->state = CHUNK_STATE_COPING; pr_debug("copy chunk start: %s 0x%lx-0x%lx", c->p->path, c->off, c->off + c->len); - ret = _copy_chunk(c, s, d, nr_ahead, buf_sz, counter); + ret = _copy_chunk(c, s, d, nr_ahead, buf_sz, bw, counter); pr_debug("copy chunk done: %s 0x%lx-0x%lx", c->p->path, c->off, c->off + c->len); diff --git a/src/path.h b/src/path.h index aa61c44..c3d3158 100644 --- a/src/path.h +++ b/src/path.h @@ -9,6 +9,7 @@ #include #include #include +#include struct path { char *path; /* file path */ @@ -66,6 +67,7 @@ void free_path(struct path *p); /* copy a chunk. either src_sftp or dst_sftp is not null, and another is null */ int copy_chunk(struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp, - int nr_ahead, int buf_sz, bool preserve_ts, size_t *counter); + int nr_ahead, int buf_sz, bool preserve_ts, struct bwlimit *bw, + size_t *counter); #endif /* _PATH_H_ */ diff --git a/test/test_e2e.py b/test/test_e2e.py index 808558d..c9a64bb 100644 --- a/test/test_e2e.py +++ b/test/test_e2e.py @@ -6,6 +6,7 @@ import platform import pytest import getpass +import datetime import time import os import shutil @@ -356,6 +357,21 @@ def test_dont_make_conns_more_than_chunks(mscp, src_prefix, dst_prefix): assert((end - start) < 10) +@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix) +def test_bwlimit(mscp, src_prefix, dst_prefix): + """Copy 100MB file with 100Mbps bitrate, this requires 8 seconds.""" + src = File("src", size = 100 * 1024 * 1024).make() + dst = File("dst") + + start = datetime.datetime.now().timestamp() + run2ok([mscp, "-H", "-vvv", "-L", "100m", src_prefix + "src", dst_prefix + "dst"]) + end = datetime.datetime.now().timestamp() + assert check_same_md5sum(src, dst) + src.cleanup() + dst.cleanup() + assert end - start > 7 + + @pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix) @pytest.mark.parametrize("src, dst", param_single_copy) def test_set_port_ng(mscp, src_prefix, dst_prefix, src, dst): @@ -553,15 +569,15 @@ def test_checkpoint_dump_and_resume(mscp, src_prefix, dst_prefix): dst2.cleanup() os.remove("checkpoint") -@pytest.mark.parametrize("timeout", [1,2,3]) +@pytest.mark.parametrize("timeout", [1, 2, 3, 4, 5, 6]) @pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix) def test_checkpoint_interrupt_and_resume(mscp, timeout, src_prefix, dst_prefix): - src1 = File("src1", size = 1024 * 1024 * 1024).make() - src2 = File("src2", size = 1024 * 1024 * 1024).make() + """Copy two 100MB files with 200Mbps -> 4 sec + 4 sec """ + src1 = File("src1", size = 100 * 1024 * 1024).make() + src2 = File("src2", size = 100 * 1024 * 1024).make() dst1 = File("dst/src1") dst2 = File("dst/src2") - run2ng([mscp, "-H", "-vv", "-W", "checkpoint", - "-n", 1, "-s", 8192, "-S", 16384, + run2ng([mscp, "-H", "-vv", "-W", "checkpoint", "-L", "200m", src_prefix + "src1", src_prefix + "src2", dst_prefix + "dst"], timeout = timeout) assert os.path.exists("checkpoint") @@ -574,3 +590,4 @@ def test_checkpoint_interrupt_and_resume(mscp, timeout, src_prefix, dst_prefix): dst1.cleanup() dst2.cleanup() os.remove("checkpoint") +