implement ssh_connect_flag

Each copy thread establishes SSH/SFTP connection to remote host.
A delay is inserted between SSH connecting to the remote.
This commit is contained in:
Ryo Nakamura
2023-03-14 00:43:53 +09:00
parent 9b0eb668f9
commit 72c27f16d6
2 changed files with 80 additions and 24 deletions

View File

@@ -2,7 +2,7 @@
#include <unistd.h>
#include <math.h>
#include <pthread.h>
#include <sys/time.h>
#include <list.h>
#include <util.h>
@@ -13,6 +13,56 @@
#include <message.h>
#include <mscp.h>
struct ssh_connect_flag {
lock lock;
struct timeval enter;
long delay; /* msec */
};
static long timeval_sub(struct timeval a, struct timeval b)
{
unsigned long sec, usec;
if (a.tv_usec < b.tv_usec) {
a.tv_usec += 1000000;
a.tv_sec--;
}
sec = a.tv_sec - b.tv_sec;
usec = a.tv_usec - b.tv_usec;
return sec * 1000000 + usec;
}
static void ssh_connect_flag_init(struct ssh_connect_flag *f)
{
memset(f, 0, sizeof(f));
lock_init(&f->lock);
f->delay = 10000; /* To be configurable */
}
static void ssh_connect_ready(struct ssh_connect_flag *f)
{
struct timeval now;
long delta;
LOCK_ACQUIRE_THREAD(&f->lock);
if (f->enter.tv_sec == 0 && f->enter.tv_usec == 0) {
/* I'm the first one. */
goto ready;
}
gettimeofday(&now, NULL);
delta = timeval_sub(now, f->enter);
if (delta <= f->delay) {
/* wait until enter + delay time */
usleep(f->delay - delta);
}
ready:
gettimeofday(&f->enter, NULL);
LOCK_RELEASE_THREAD();
}
struct mscp {
char *remote; /* remote host (and uername) */
int direction; /* copy direction */
@@ -21,8 +71,10 @@ struct mscp {
int msg_fd; /* writer fd for message pipe */
int *cores; /* usable cpu cores by COREMASK */
int nr_cores; /* length of array of cores */
int *cores; /* usable cpu cores by COREMASK */
int nr_cores; /* length of array of cores */
struct ssh_connect_flag ssh_flag;
sftp_session first; /* first sftp session */
@@ -66,6 +118,9 @@ struct src {
#define non_null_string(s) (s[0] != '\0')
static int expand_coremask(const char *coremask, int **cores, int *nr_cores)
{
int n, *core_list, core_list_len = 0, nr_usable, nr_all;
@@ -200,21 +255,22 @@ struct mscp *mscp_init(const char *remote_host, int direction,
return NULL;
}
mprint_set_severity(o->severity);
if (validate_and_set_defaut_params(o) < 0)
goto free_out;
m = malloc(sizeof(*m));
if (!m) {
mscp_set_error("failed to allocate memory: %s", strerrno());
return NULL;
}
mprint_set_severity(o->severity);
if (validate_and_set_defaut_params(o) < 0)
goto free_out;
memset(m, 0, sizeof(*m));
INIT_LIST_HEAD(&m->src_list);
INIT_LIST_HEAD(&m->path_list);
chunk_pool_init(&m->cp);
ssh_connect_flag_init(&m->ssh_flag);
m->remote = strdup(remote_host);
if (!m->remote) {
@@ -464,16 +520,6 @@ int mscp_start(struct mscp *m)
else
t->cpu = m->cores[n % m->nr_cores];
mpr_notice(m->msg_fd, "connecting to %s for a copy thread...\n",
m->remote);
t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts);
if (!t->sftp)
return -1;
}
/* spawn copy threads */
for (n = 0; n < m->opts->nr_threads; n++) {
struct mscp_thread *t = &m->threads[n];
ret = pthread_create(&t->tid, NULL, mscp_copy_thread, t);
if (ret < 0) {
mscp_set_error("pthread_create error: %d", ret);
@@ -537,6 +583,21 @@ void *mscp_copy_thread(void *arg)
struct mscp *m = t->m;
struct chunk *c;
if (t->cpu > -1) {
if (set_thread_affinity(pthread_self(), t->cpu) < 0) {
t->ret = -1;
return NULL;
}
}
ssh_connect_ready(&m->ssh_flag);
mpr_notice(m->msg_fd, "connecting to %s for a copy thread...\n", m->remote);
t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts);
if (!t->sftp) {
t->ret = -1;
return NULL;
}
switch (m->direction) {
case MSCP_DIRECTION_L2R:
src_sftp = NULL;
@@ -550,11 +611,6 @@ void *mscp_copy_thread(void *arg)
return NULL; /* not reached */
}
if (t->cpu > -1) {
if (set_thread_affinity(pthread_self(), t->cpu) < 0)
return NULL;
}
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
pthread_cleanup_push(mscp_copy_thread_cleanup, t);

View File

@@ -132,7 +132,7 @@ static int resolve_dst_path(const char *src_file_path, char *dst_file_path,
snprintf(dst_file_path, PATH_MAX - 1, "%s/%s",
a->dst_path, src_file_path + strlen(a->src_path) + 1);
mpr_info(a->msg_fd, "file: %s -> %s\n", src_file_path, dst_file_path);
mpr_debug(a->msg_fd, "file: %s -> %s\n", src_file_path, dst_file_path);
return 0;
}