mirror of
https://github.com/upa/mscp.git
synced 2026-02-04 03:24:58 +08:00
add -I interval option
-I INTERVAL option inserts sleep for interval (seconds) between SSH connection attempts (issue #7).
This commit is contained in:
@@ -43,7 +43,8 @@ struct mscp_opts {
|
||||
size_t max_chunk_sz; /** maximum chunk size (default file size/nr_threads) */
|
||||
size_t buf_sz; /** buffer size, default 16k. */
|
||||
char coremask[MSCP_MAX_COREMASK_STR]; /** hex to specifiy usable cpu cores */
|
||||
int max_startups; /* sshd MaxStartups concurrent connections */
|
||||
int max_startups; /** sshd MaxStartups concurrent connections */
|
||||
int interval; /** interval between SSH connection attempts */
|
||||
|
||||
int severity; /** messaging severity. set MSCP_SERVERITY_* */
|
||||
int msg_fd; /** fd to output message. default STDOUT (0),
|
||||
|
||||
13
src/main.c
13
src/main.c
@@ -18,7 +18,8 @@
|
||||
void usage(bool print_help) {
|
||||
printf("mscp " MSCP_BUILD_VERSION ": copy files over multiple ssh connections\n"
|
||||
"\n"
|
||||
"Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask] [-u max_startups]\n"
|
||||
"Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask]\n"
|
||||
" [-u max_startups] [-I interval]\n"
|
||||
" [-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]\n"
|
||||
" [-l login_name] [-p port] [-F ssh_config] [-i identity_file]\n"
|
||||
" [-c cipher_spec] [-M hmac_spec] [-C compress] [-g congestion]\n"
|
||||
@@ -31,11 +32,12 @@ void usage(bool print_help) {
|
||||
printf(" -n NR_CONNECTIONS number of connections "
|
||||
"(default: floor(log(cores)*2)+1)\n"
|
||||
" -m COREMASK hex value to specify cores where threads pinned\n"
|
||||
" -u MAX_STARTUPS number of concurrent outgoing connections "
|
||||
" -u MAX_STARTUPS number of concurrent SSH connection attempts "
|
||||
"(default: 8)\n"
|
||||
" -I INTERVAL interval between SSH connection attempts (default: 0)\n"
|
||||
"\n"
|
||||
" -s MIN_CHUNK_SIZE min chunk size (default: 64MB)\n"
|
||||
" -S MAX_CHUNK_SIZE max chunk size (default: filesize/nr_conn)\n"
|
||||
"\n"
|
||||
" -a NR_AHEAD number of inflight SFTP commands (default: 32)\n"
|
||||
" -b BUF_SZ buffer size for i/o and transfer\n"
|
||||
"\n"
|
||||
@@ -253,7 +255,7 @@ int main(int argc, char **argv)
|
||||
o.severity = MSCP_SEVERITY_WARN;
|
||||
|
||||
while ((ch = getopt(argc, argv,
|
||||
"n:m:u:s:S:a:b:vqDrl:p:i:F:c:M:C:g:HdNh")) != -1) {
|
||||
"n:m:u:I:s:S:a:b:vqDrl:p:i:F:c:M:C:g:HdNh")) != -1) {
|
||||
switch (ch) {
|
||||
case 'n':
|
||||
o.nr_threads = atoi(optarg);
|
||||
@@ -269,6 +271,9 @@ int main(int argc, char **argv)
|
||||
case 'u':
|
||||
o.max_startups = atoi(optarg);
|
||||
break;
|
||||
case 'I':
|
||||
o.interval = atoi(optarg);
|
||||
break;
|
||||
case 's':
|
||||
o.min_chunk_sz = atoi(optarg);
|
||||
break;
|
||||
|
||||
78
src/mscp.c
78
src/mscp.c
@@ -80,8 +80,6 @@ struct src {
|
||||
#define non_null_string(s) (s[0] != '\0')
|
||||
|
||||
|
||||
|
||||
|
||||
static int expand_coremask(const char *coremask, int **cores, int *nr_cores)
|
||||
{
|
||||
int n, *core_list, core_list_len = 0, nr_usable, nr_all;
|
||||
@@ -203,6 +201,11 @@ static int validate_and_set_defaut_params(struct mscp_opts *o)
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (o->interval > 0) {
|
||||
/* when the interval is set, establish SSH connections sequentially. */
|
||||
o->max_startups = 1;
|
||||
}
|
||||
|
||||
if (o->msg_fd == 0)
|
||||
o->msg_fd = STDOUT_FILENO;
|
||||
|
||||
@@ -594,7 +597,22 @@ int mscp_join(struct mscp *m)
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* copy thread related functions */
|
||||
/* copy thread-related functions */
|
||||
|
||||
static void wait_for_interval(int interval)
|
||||
{
|
||||
_Atomic static long next;
|
||||
struct timeval t;
|
||||
long now;
|
||||
|
||||
gettimeofday(&t, NULL);
|
||||
now = t.tv_sec * 1000000 + t.tv_usec;
|
||||
|
||||
if (next - now > 0)
|
||||
usleep(next - now);
|
||||
|
||||
next = now + interval * 1000000;
|
||||
}
|
||||
|
||||
static void mscp_copy_thread_cleanup(void *arg)
|
||||
{
|
||||
@@ -604,16 +622,17 @@ static void mscp_copy_thread_cleanup(void *arg)
|
||||
|
||||
void *mscp_copy_thread(void *arg)
|
||||
{
|
||||
sftp_session src_sftp, dst_sftp;
|
||||
struct mscp_thread *t = arg;
|
||||
sftp_session src_sftp, dst_sftp;
|
||||
struct mscp_thread *t = arg;
|
||||
struct mscp *m = t->m;
|
||||
struct chunk *c;
|
||||
struct chunk *c;
|
||||
bool nomore;
|
||||
|
||||
pthread_cleanup_push(mscp_copy_thread_cleanup, t);
|
||||
|
||||
if (t->cpu > -1) {
|
||||
if (set_thread_affinity(pthread_self(), t->cpu) < 0) {
|
||||
t->ret = -1;
|
||||
return NULL;
|
||||
}
|
||||
if (set_thread_affinity(pthread_self(), t->cpu) < 0)
|
||||
goto err_out;
|
||||
}
|
||||
|
||||
if (sem_wait(m->sem) < 0) {
|
||||
@@ -622,9 +641,12 @@ void *mscp_copy_thread(void *arg)
|
||||
goto err_out;
|
||||
}
|
||||
|
||||
mpr_notice(m->msg_fp, "connecting to %s for copy thread:%d...\n",
|
||||
m->remote, t->id);
|
||||
t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts);
|
||||
if (!(nomore = chunk_pool_is_empty(&m->cp))) {
|
||||
if (m->opts->interval > 0)
|
||||
wait_for_interval(m->opts->interval);
|
||||
mpr_notice(m->msg_fp, "thread:%d connecting to %s\n", t->id, m->remote);
|
||||
t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts);
|
||||
}
|
||||
|
||||
if (sem_post(m->sem) < 0) {
|
||||
mscp_set_error("sem_post: %s", strerrno());
|
||||
@@ -632,8 +654,13 @@ void *mscp_copy_thread(void *arg)
|
||||
goto err_out;
|
||||
}
|
||||
|
||||
if (nomore) {
|
||||
mpr_notice(m->msg_fp, "thread:%d no more connections needed\n", t->id);
|
||||
goto out;
|
||||
}
|
||||
|
||||
if (!t->sftp) {
|
||||
mpr_err(m->msg_fp, "copy thread:%d: %s\n", t->id, mscp_get_error());
|
||||
mpr_err(m->msg_fp, "thread:%d: %s\n", t->id, mscp_get_error());
|
||||
goto err_out;
|
||||
}
|
||||
|
||||
@@ -650,10 +677,6 @@ void *mscp_copy_thread(void *arg)
|
||||
return NULL; /* not reached */
|
||||
}
|
||||
|
||||
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
|
||||
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
|
||||
pthread_cleanup_push(mscp_copy_thread_cleanup, t);
|
||||
|
||||
while (1) {
|
||||
c = chunk_pool_pop(&m->cp);
|
||||
if (c == CHUNK_POP_WAIT) {
|
||||
@@ -673,8 +696,8 @@ void *mscp_copy_thread(void *arg)
|
||||
pthread_cleanup_pop(1);
|
||||
|
||||
if (t->ret < 0)
|
||||
mpr_err(m->msg_fp, "copy failed: chunk %s 0x%010lx-0x%010lx\n",
|
||||
c->p->path, c->off, c->off + c->len);
|
||||
mpr_err(m->msg_fp, "thread:%d copy failed: %s 0x%010lx-0x%010lx\n",
|
||||
t->id, c->p->path, c->off, c->off + c->len);
|
||||
|
||||
return NULL;
|
||||
|
||||
@@ -682,10 +705,14 @@ err_out:
|
||||
t->finished = true;
|
||||
t->ret = -1;
|
||||
return NULL;
|
||||
out:
|
||||
t->finished = true;
|
||||
t->ret = 0;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
/* cleanup related functions */
|
||||
/* cleanup-related functions */
|
||||
|
||||
static void free_src(struct list_head *list)
|
||||
{
|
||||
@@ -751,19 +778,20 @@ void mscp_free(struct mscp *m)
|
||||
|
||||
void mscp_get_stats(struct mscp *m, struct mscp_stats *s)
|
||||
{
|
||||
int nr_finished = 0, nr_threads = 0;
|
||||
struct mscp_thread *t;
|
||||
bool finished = true;
|
||||
|
||||
s->total = m->total_bytes;
|
||||
s->done = 0;
|
||||
|
||||
RWLOCK_READ_ACQUIRE(&m->thread_rwlock);
|
||||
list_for_each_entry(t, &m->thread_list, list) {
|
||||
nr_threads++;
|
||||
s->done += t->done;
|
||||
if (!t->finished)
|
||||
finished = false;
|
||||
if (t->finished)
|
||||
nr_finished++;
|
||||
}
|
||||
RWLOCK_RELEASE();
|
||||
|
||||
s->finished = finished;
|
||||
s->finished = nr_threads > 0 ? (nr_finished == nr_threads) : false;
|
||||
}
|
||||
|
||||
@@ -49,6 +49,10 @@ size_t chunk_pool_size(struct chunk_pool *cp)
|
||||
return cp->count;
|
||||
}
|
||||
|
||||
bool chunk_pool_is_empty(struct chunk_pool *cp)
|
||||
{
|
||||
return list_empty(&cp->list);
|
||||
}
|
||||
|
||||
struct chunk *chunk_pool_pop(struct chunk_pool *cp)
|
||||
{
|
||||
@@ -68,7 +72,7 @@ struct chunk *chunk_pool_pop(struct chunk_pool *cp)
|
||||
}
|
||||
LOCK_RELEASE();
|
||||
|
||||
/* return CHUNK_POP_WAIT would be very rare case, because it
|
||||
/* return CHUNK_POP_WAIT would be a rare case, because it
|
||||
* means copying over SSH is faster than traversing
|
||||
* local/remote file paths.
|
||||
*/
|
||||
|
||||
@@ -62,6 +62,9 @@ bool chunk_pool_is_filled(struct chunk_pool *cp);
|
||||
/* return number of chunks in the pool */
|
||||
size_t chunk_pool_size(struct chunk_pool *cp);
|
||||
|
||||
/* return true if chunk pool is empty (all chunks are already poped) */
|
||||
bool chunk_pool_is_empty(struct chunk_pool *cp);
|
||||
|
||||
/* free chunks in the chunk_pool */
|
||||
void chunk_pool_release(struct chunk_pool *cp);
|
||||
|
||||
|
||||
@@ -97,6 +97,7 @@ static PyObject *wrap_mscp_init(PyObject *self, PyObject *args, PyObject *kw)
|
||||
"coremask", /* const char * */
|
||||
|
||||
"max_startups", /* int */
|
||||
"interval", /* int */
|
||||
"severity", /* int, MSCP_SERVERITY_* */
|
||||
"msg_fd", /* int */
|
||||
|
||||
@@ -118,7 +119,7 @@ static PyObject *wrap_mscp_init(PyObject *self, PyObject *args, PyObject *kw)
|
||||
"enable_nagle", /* bool */
|
||||
NULL,
|
||||
};
|
||||
const char *fmt = "si" "|" "ii" "kkk" "s" "iii" "ssss" "ssssss" "ipp";
|
||||
const char *fmt = "si" "|" "ii" "kkk" "s" "iiii" "ssss" "ssssss" "ipp";
|
||||
char *coremask = NULL;
|
||||
char *login_name = NULL, *port = NULL, *config = NULL, *identity = NULL;
|
||||
char *cipher = NULL, *hmac = NULL, *compress = NULL, *ccalgo = NULL;
|
||||
@@ -146,6 +147,7 @@ static PyObject *wrap_mscp_init(PyObject *self, PyObject *args, PyObject *kw)
|
||||
&i->mo.buf_sz,
|
||||
&coremask,
|
||||
&i->mo.max_startups,
|
||||
&i->mo.interval,
|
||||
&i->mo.severity,
|
||||
&i->mo.msg_fd,
|
||||
&login_name,
|
||||
|
||||
@@ -300,6 +300,20 @@ def test_dont_truncate_dst(mscp, src_prefix, dst_prefix):
|
||||
assert md5_before == md5_after
|
||||
f.cleanup()
|
||||
|
||||
@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
|
||||
def test_set_conn_interval(mscp, src_prefix, dst_prefix):
|
||||
srcs = []
|
||||
dsts = []
|
||||
for x in range(500):
|
||||
srcs.append(File("src/file{}".format(x), size = 128).make())
|
||||
dsts.append(File("dst/file{}".format(x)))
|
||||
run2ok([mscp, "-H", "-vvv", "-I", 1, src_prefix + "src", dst_prefix + "dst"])
|
||||
|
||||
for src, dst in zip(srcs, dsts):
|
||||
assert check_same_md5sum(src, dst)
|
||||
src.cleanup()
|
||||
dst.cleanup()
|
||||
|
||||
compressions = ["yes", "no", "none"]
|
||||
@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
|
||||
@pytest.mark.parametrize("compress", compressions)
|
||||
|
||||
Reference in New Issue
Block a user