mirror of
https://github.com/upa/mscp.git
synced 2026-02-13 00:24:42 +08:00
add -L limit bitrate option (#14)
This commit is contained in:
94
src/bwlimit.c
Normal file
94
src/bwlimit.c
Normal file
@@ -0,0 +1,94 @@
|
||||
/* SPDX-License-Identifier: GPL-3.0-only */
|
||||
#include <errno.h>
|
||||
|
||||
#include <bwlimit.h>
|
||||
#include <platform.h>
|
||||
|
||||
#define timespeczerorize(ts) \
|
||||
do { \
|
||||
ts.tv_sec = 0; \
|
||||
ts.tv_nsec = 0; \
|
||||
} while (0)
|
||||
|
||||
int bwlimit_init(struct bwlimit *bw, uint64_t bps, uint64_t win)
|
||||
{
|
||||
if (!(bw->sem = sem_create(1)))
|
||||
return -1;
|
||||
|
||||
bw->bps = bps;
|
||||
bw->win = win; /* msec window */
|
||||
bw->amt = (double)bps / 8 / 1000 * win; /* bytes in a window (msec) */
|
||||
bw->credit = bw->amt;
|
||||
timespeczerorize(bw->wstart);
|
||||
timespeczerorize(bw->wend);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
#define timespecisset(ts) ((ts).tv_sec || (ts).tv_nsec)
|
||||
|
||||
#define timespecmsadd(a, msec, r) \
|
||||
do { \
|
||||
(r).tv_sec = (a).tv_sec; \
|
||||
(r).tv_nsec = (a).tv_nsec + (msec * 1000000); \
|
||||
if ((r).tv_nsec > 1000000000) { \
|
||||
(r).tv_sec += (r.tv_nsec) / 1000000000L; \
|
||||
(r).tv_nsec = (r.tv_nsec) % 1000000000L; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define timespecsub(a, b, r) \
|
||||
do { \
|
||||
(r).tv_sec = (a).tv_sec - (b).tv_sec; \
|
||||
(r).tv_nsec = (a).tv_nsec - (b).tv_nsec; \
|
||||
if ((r).tv_nsec < 0) { \
|
||||
(r).tv_sec -= 1; \
|
||||
(r).tv_nsec += 1000000000; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define timespeccmp(a, b, expr) \
|
||||
((a.tv_sec * 1000000000 + a.tv_nsec) expr(b.tv_sec * 1000000000 + b.tv_nsec))
|
||||
|
||||
#include <stdio.h>
|
||||
|
||||
int bwlimit_wait(struct bwlimit *bw, size_t nr_bytes)
|
||||
{
|
||||
struct timespec now, end, rq, rm;
|
||||
|
||||
if (bw->bps == 0)
|
||||
return 0; /* no bandwidth limit */
|
||||
|
||||
if (sem_wait(bw->sem) < 0)
|
||||
return -1;
|
||||
|
||||
clock_gettime(CLOCK_MONOTONIC, &now);
|
||||
|
||||
if (!timespecisset(bw->wstart)) {
|
||||
bw->wstart = now;
|
||||
timespecmsadd(bw->wstart, bw->win, bw->wend);
|
||||
}
|
||||
|
||||
bw->credit -= nr_bytes;
|
||||
|
||||
if (bw->credit < 0) {
|
||||
/* no more credit on this window. sleep until the end
|
||||
* of this window + additional time for the remaining
|
||||
* bytes. */
|
||||
uint64_t addition = (double)(bw->credit * -1) / (bw->bps / 8);
|
||||
timespecmsadd(bw->wend, addition * 1000, end);
|
||||
if (timespeccmp(end, now, >)) {
|
||||
timespecsub(end, now, rq);
|
||||
while (nanosleep(&rq, &rm) == -1) {
|
||||
if (errno != EINTR)
|
||||
break;
|
||||
rq = rm;
|
||||
}
|
||||
}
|
||||
bw->credit = bw->amt;
|
||||
timespeczerorize(bw->wstart);
|
||||
}
|
||||
|
||||
sem_post(bw->sem);
|
||||
return 0;
|
||||
}
|
||||
28
src/bwlimit.h
Normal file
28
src/bwlimit.h
Normal file
@@ -0,0 +1,28 @@
|
||||
/* SPDX-License-Identifier: GPL-3.0-only */
|
||||
#ifndef _BWLIMIT_H_
|
||||
#define _BWLIMIT_H_
|
||||
|
||||
#include <stdbool.h>
|
||||
#include <stdint.h>
|
||||
#include <sys/types.h>
|
||||
#include <time.h>
|
||||
#include <semaphore.h>
|
||||
|
||||
struct bwlimit {
|
||||
sem_t *sem; /* semaphore */
|
||||
uint64_t bps; /* limit bit-rate (bps) */
|
||||
uint64_t win; /* window size (msec) */
|
||||
size_t amt; /* amount of bytes can be sent in a window */
|
||||
|
||||
ssize_t credit; /* remaining bytes can be sent in a window */
|
||||
struct timespec wstart, wend; /* window start time and end time */
|
||||
};
|
||||
|
||||
int bwlimit_init(struct bwlimit *bw, uint64_t bps, uint64_t win);
|
||||
/* if bps is 0, it means that bwlimit is not active. If so,
|
||||
* bwlimit_wait() returns immediately. */
|
||||
|
||||
int bwlimit_wait(struct bwlimit *bw, size_t nr_bytes);
|
||||
|
||||
|
||||
#endif /* _BWLIMIT_H_ */
|
||||
27
src/main.c
27
src/main.c
@@ -26,7 +26,8 @@ void usage(bool print_help)
|
||||
"\n"
|
||||
"Usage: mscp [-46vqDpHdNh] [-n nr_conns] [-m coremask]\n"
|
||||
" [-u max_startups] [-I interval] [-W checkpoint] [-R checkpoint]\n"
|
||||
" [-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]\n"
|
||||
" [-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead]\n"
|
||||
" [-b buf_sz] [-L limit_bitrate]\n"
|
||||
" [-l login_name] [-P port] [-F ssh_config] [-i identity_file]\n"
|
||||
" [-c cipher_spec] [-M hmac_spec] [-C compress] [-g congestion]\n"
|
||||
" source ... target\n"
|
||||
@@ -48,6 +49,7 @@ void usage(bool print_help)
|
||||
" -S MAX_CHUNK_SIZE max chunk size (default: filesize/nr_conn)\n"
|
||||
" -a NR_AHEAD number of inflight SFTP commands (default: 32)\n"
|
||||
" -b BUF_SZ buffer size for i/o and transfer\n"
|
||||
" -L LIMIT_BITRATE Limit the bitrate, n[KMG] (default: 0, no limit)\n"
|
||||
"\n"
|
||||
" -4 use IPv4\n"
|
||||
" -6 use IPv6\n"
|
||||
@@ -266,12 +268,14 @@ int main(int argc, char **argv)
|
||||
int direction = 0;
|
||||
char *remote = NULL, *checkpoint_save = NULL, *checkpoint_load = NULL;
|
||||
bool dryrun = false, resume = false;
|
||||
char *u;
|
||||
size_t mag = 0;
|
||||
|
||||
memset(&s, 0, sizeof(s));
|
||||
memset(&o, 0, sizeof(o));
|
||||
o.severity = MSCP_SEVERITY_WARN;
|
||||
|
||||
#define mscpopts "n:m:u:I:W:R:s:S:a:b:46vqDrl:P:i:F:c:M:C:g:pHdNh"
|
||||
#define mscpopts "n:m:u:I:W:R:s:S:a:b:L:46vqDrl:P:i:F:c:M:C:g:pHdNh"
|
||||
while ((ch = getopt(argc, argv, mscpopts)) != -1) {
|
||||
switch (ch) {
|
||||
case 'n':
|
||||
@@ -309,6 +313,25 @@ int main(int argc, char **argv)
|
||||
case 'b':
|
||||
o.buf_sz = atoi(optarg);
|
||||
break;
|
||||
case 'L':
|
||||
u = optarg + (strlen(optarg) - 1);
|
||||
if (*u == 'k' || *u == 'K') {
|
||||
mag = 1000;
|
||||
*u = '\0';
|
||||
} else if (*u == 'm' || *u == 'M') {
|
||||
mag = 1000000;
|
||||
*u = '\0';
|
||||
} else if (*u == 'g' || *u == 'G') {
|
||||
mag = 1000000000;
|
||||
*u = '\0';
|
||||
}
|
||||
o.bitrate = atol(optarg);
|
||||
if (o.bitrate == 0) {
|
||||
pr_err("invalid bitrate: %s", optarg);
|
||||
return 1;
|
||||
}
|
||||
o.bitrate *= mag;
|
||||
break;
|
||||
case '4':
|
||||
s.ai_family = AF_INET;
|
||||
break;
|
||||
|
||||
15
src/mscp.c
15
src/mscp.c
@@ -17,6 +17,7 @@
|
||||
#include <print.h>
|
||||
#include <strerrno.h>
|
||||
#include <mscp.h>
|
||||
#include <bwlimit.h>
|
||||
|
||||
#include <openbsd-compat/openbsd-compat.h>
|
||||
|
||||
@@ -56,6 +57,8 @@ struct mscp {
|
||||
#define chunk_pool_is_ready(m) ((m)->chunk_pool_ready)
|
||||
#define chunk_pool_set_ready(m, b) ((m)->chunk_pool_ready = b)
|
||||
|
||||
struct bwlimit bw; /* bandwidth limit mechanism */
|
||||
|
||||
struct mscp_thread scan; /* mscp_thread for mscp_scan_thread() */
|
||||
};
|
||||
|
||||
@@ -281,6 +284,12 @@ struct mscp *mscp_init(struct mscp_opts *o, struct mscp_ssh_opts *s)
|
||||
pr_notice("usable cpu cores:%s", b);
|
||||
}
|
||||
|
||||
if (bwlimit_init(&m->bw, o->bitrate, 100) < 0) { /* 100ms window (hardcoded) */
|
||||
priv_set_errv("bwlimit_init: %s", strerrno());
|
||||
goto free_out;
|
||||
}
|
||||
pr_notice("bitrate limit: %lu bps", o->bitrate);
|
||||
|
||||
return m;
|
||||
|
||||
free_out:
|
||||
@@ -522,8 +531,8 @@ int mscp_checkpoint_load(struct mscp *m, const char *pathname)
|
||||
|
||||
int mscp_checkpoint_save(struct mscp *m, const char *pathname)
|
||||
{
|
||||
return checkpoint_save(pathname, m->direction, m->ssh_opts->login_name,
|
||||
m->remote, m->path_pool, m->chunk_pool);
|
||||
return checkpoint_save(pathname, m->direction, m->ssh_opts->login_name, m->remote,
|
||||
m->path_pool, m->chunk_pool);
|
||||
}
|
||||
|
||||
static void *mscp_copy_thread(void *arg);
|
||||
@@ -712,7 +721,7 @@ void *mscp_copy_thread(void *arg)
|
||||
}
|
||||
|
||||
if ((t->ret = copy_chunk(c, src_sftp, dst_sftp, m->opts->nr_ahead,
|
||||
m->opts->buf_sz, m->opts->preserve_ts,
|
||||
m->opts->buf_sz, m->opts->preserve_ts, &m->bw,
|
||||
&t->copied_bytes)) < 0)
|
||||
break;
|
||||
}
|
||||
|
||||
21
src/path.c
21
src/path.c
@@ -348,7 +348,7 @@ static ssize_t read_to_buf(void *ptr, size_t len, void *userdata)
|
||||
}
|
||||
|
||||
static int copy_chunk_l2r(struct chunk *c, int fd, sftp_file sf, int nr_ahead, int buf_sz,
|
||||
size_t *counter)
|
||||
struct bwlimit *bw, size_t *counter)
|
||||
{
|
||||
ssize_t read_bytes, remaind, thrown;
|
||||
int idx, ret;
|
||||
@@ -371,6 +371,7 @@ static int copy_chunk_l2r(struct chunk *c, int fd, sftp_file sf, int nr_ahead, i
|
||||
return -1;
|
||||
}
|
||||
thrown -= reqs[idx].len;
|
||||
bwlimit_wait(bw, reqs[idx].len);
|
||||
}
|
||||
|
||||
for (idx = 0; remaind > 0; idx = (idx + 1) % nr_ahead) {
|
||||
@@ -399,6 +400,7 @@ static int copy_chunk_l2r(struct chunk *c, int fd, sftp_file sf, int nr_ahead, i
|
||||
return -1;
|
||||
}
|
||||
thrown -= reqs[idx].len;
|
||||
bwlimit_wait(bw, reqs[idx].len);
|
||||
}
|
||||
|
||||
if (remaind < 0) {
|
||||
@@ -412,7 +414,7 @@ static int copy_chunk_l2r(struct chunk *c, int fd, sftp_file sf, int nr_ahead, i
|
||||
}
|
||||
|
||||
static int copy_chunk_r2l(struct chunk *c, sftp_file sf, int fd, int nr_ahead, int buf_sz,
|
||||
size_t *counter)
|
||||
struct bwlimit *bw, size_t *counter)
|
||||
{
|
||||
ssize_t read_bytes, write_bytes, remaind, thrown;
|
||||
char buf[buf_sz];
|
||||
@@ -436,6 +438,7 @@ static int copy_chunk_r2l(struct chunk *c, sftp_file sf, int fd, int nr_ahead, i
|
||||
return -1;
|
||||
}
|
||||
thrown -= reqs[idx].len;
|
||||
bwlimit_wait(bw, reqs[idx].len);
|
||||
}
|
||||
|
||||
for (idx = 0; remaind > 0; idx = (idx + 1) % nr_ahead) {
|
||||
@@ -449,6 +452,7 @@ static int copy_chunk_r2l(struct chunk *c, sftp_file sf, int fd, int nr_ahead, i
|
||||
reqs[idx].len = min(thrown, sizeof(buf));
|
||||
reqs[idx].id = sftp_async_read_begin(sf, reqs[idx].len);
|
||||
thrown -= reqs[idx].len;
|
||||
bwlimit_wait(bw, reqs[idx].len);
|
||||
}
|
||||
|
||||
write_bytes = write(fd, buf, read_bytes);
|
||||
@@ -477,19 +481,22 @@ static int copy_chunk_r2l(struct chunk *c, sftp_file sf, int fd, int nr_ahead, i
|
||||
}
|
||||
|
||||
static int _copy_chunk(struct chunk *c, mf *s, mf *d, int nr_ahead, int buf_sz,
|
||||
size_t *counter)
|
||||
struct bwlimit *bw, size_t *counter)
|
||||
{
|
||||
if (s->local && d->remote) /* local to remote copy */
|
||||
return copy_chunk_l2r(c, s->local, d->remote, nr_ahead, buf_sz, counter);
|
||||
return copy_chunk_l2r(c, s->local, d->remote, nr_ahead, buf_sz, bw,
|
||||
counter);
|
||||
else if (s->remote && d->local) /* remote to local copy */
|
||||
return copy_chunk_r2l(c, s->remote, d->local, nr_ahead, buf_sz, counter);
|
||||
return copy_chunk_r2l(c, s->remote, d->local, nr_ahead, buf_sz, bw,
|
||||
counter);
|
||||
|
||||
assert(false);
|
||||
return -1; /* not reached */
|
||||
}
|
||||
|
||||
int copy_chunk(struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp,
|
||||
int nr_ahead, int buf_sz, bool preserve_ts, size_t *counter)
|
||||
int nr_ahead, int buf_sz, bool preserve_ts, struct bwlimit *bw,
|
||||
size_t *counter)
|
||||
{
|
||||
mode_t mode;
|
||||
int flags;
|
||||
@@ -529,7 +536,7 @@ int copy_chunk(struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp,
|
||||
c->state = CHUNK_STATE_COPING;
|
||||
pr_debug("copy chunk start: %s 0x%lx-0x%lx", c->p->path, c->off, c->off + c->len);
|
||||
|
||||
ret = _copy_chunk(c, s, d, nr_ahead, buf_sz, counter);
|
||||
ret = _copy_chunk(c, s, d, nr_ahead, buf_sz, bw, counter);
|
||||
|
||||
pr_debug("copy chunk done: %s 0x%lx-0x%lx", c->p->path, c->off, c->off + c->len);
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
#include <pool.h>
|
||||
#include <atomic.h>
|
||||
#include <ssh.h>
|
||||
#include <bwlimit.h>
|
||||
|
||||
struct path {
|
||||
char *path; /* file path */
|
||||
@@ -66,6 +67,7 @@ void free_path(struct path *p);
|
||||
|
||||
/* copy a chunk. either src_sftp or dst_sftp is not null, and another is null */
|
||||
int copy_chunk(struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp,
|
||||
int nr_ahead, int buf_sz, bool preserve_ts, size_t *counter);
|
||||
int nr_ahead, int buf_sz, bool preserve_ts, struct bwlimit *bw,
|
||||
size_t *counter);
|
||||
|
||||
#endif /* _PATH_H_ */
|
||||
|
||||
Reference in New Issue
Block a user