mirror of
https://github.com/upa/mscp.git
synced 2026-02-04 03:24:58 +08:00
cleanup mscp_scan_thread related codes
This commit is contained in:
22
src/main.c
22
src/main.c
@@ -233,12 +233,12 @@ free_target_out:
|
||||
|
||||
struct mscp *m = NULL;
|
||||
pthread_t tid_stat = 0;
|
||||
bool interrupted = false;
|
||||
|
||||
void sigint_handler(int sig)
|
||||
{
|
||||
interrupted = true;
|
||||
mscp_stop(m);
|
||||
if (tid_stat > 0)
|
||||
pthread_cancel(tid_stat);
|
||||
}
|
||||
|
||||
void *print_stat_thread(void *arg);
|
||||
@@ -252,6 +252,8 @@ void print_cli(const char *fmt, ...)
|
||||
va_end(va);
|
||||
}
|
||||
|
||||
void print_stat(bool final);
|
||||
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
struct mscp_ssh_opts s;
|
||||
@@ -437,10 +439,15 @@ int main(int argc, char **argv)
|
||||
pthread_cancel(tid_stat);
|
||||
pthread_join(tid_stat, NULL);
|
||||
|
||||
print_stat(true);
|
||||
print_cli("\n"); /* final output */
|
||||
out:
|
||||
mscp_cleanup(m);
|
||||
mscp_free(m);
|
||||
|
||||
if (interrupted)
|
||||
ret = 1;
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@@ -612,12 +619,6 @@ void print_stat(bool final)
|
||||
}
|
||||
}
|
||||
|
||||
void print_stat_thread_cleanup(void *arg)
|
||||
{
|
||||
print_stat(true);
|
||||
print_cli("\n"); /* final output */
|
||||
}
|
||||
|
||||
void *print_stat_thread(void *arg)
|
||||
{
|
||||
struct mscp_stats s;
|
||||
@@ -627,15 +628,10 @@ void *print_stat_thread(void *arg)
|
||||
gettimeofday(&x.start, NULL);
|
||||
x.before = x.start;
|
||||
|
||||
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
|
||||
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
|
||||
pthread_cleanup_push(print_stat_thread_cleanup, NULL);
|
||||
|
||||
while (true) {
|
||||
print_stat(false);
|
||||
sleep(1);
|
||||
}
|
||||
|
||||
pthread_cleanup_pop(1);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
106
src/mscp.c
106
src/mscp.c
@@ -20,6 +20,22 @@
|
||||
|
||||
#include <openbsd-compat/openbsd-compat.h>
|
||||
|
||||
struct mscp_thread {
|
||||
struct mscp *m;
|
||||
pthread_t tid;
|
||||
sftp_session sftp;
|
||||
|
||||
int ret;
|
||||
|
||||
/* attributes used by copy threads */
|
||||
int id;
|
||||
int cpu;
|
||||
size_t copied_bytes;
|
||||
|
||||
/* attributes used by scan thread */
|
||||
size_t total_bytes;
|
||||
};
|
||||
|
||||
struct mscp {
|
||||
char *remote; /* remote host (and uername) */
|
||||
int direction; /* copy direction */
|
||||
@@ -29,32 +45,17 @@ struct mscp {
|
||||
int *cores; /* usable cpu cores by COREMASK */
|
||||
int nr_cores; /* length of array of cores */
|
||||
|
||||
sem_t *sem; /* semaphore for concurrent
|
||||
* connecting ssh sessions */
|
||||
sem_t *sem; /* semaphore for concurrent connecting ssh sessions */
|
||||
|
||||
sftp_session first; /* first sftp session */
|
||||
|
||||
char dst_path[PATH_MAX];
|
||||
pool *src_pool, *path_pool;
|
||||
pool *thread_pool; /* mscp_threads for copy thread */
|
||||
|
||||
struct chunk_pool cp;
|
||||
|
||||
pthread_t tid_scan; /* tid for scan thread */
|
||||
int ret_scan; /* return code from scan thread */
|
||||
|
||||
size_t total_bytes; /* total bytes to be transferred */
|
||||
|
||||
pool *thread_pool;
|
||||
};
|
||||
|
||||
struct mscp_thread {
|
||||
struct mscp *m;
|
||||
int id;
|
||||
sftp_session sftp;
|
||||
pthread_t tid;
|
||||
int cpu;
|
||||
size_t done;
|
||||
bool finished;
|
||||
int ret;
|
||||
struct mscp_thread scan; /* mscp_thread for mscp_scan_thread() */
|
||||
};
|
||||
|
||||
#define DEFAULT_MIN_CHUNK_SZ (64 << 20) /* 64MB */
|
||||
@@ -348,7 +349,7 @@ static void mscp_stop_copy_thread(struct mscp *m)
|
||||
unsigned int idx;
|
||||
pool_lock(m->thread_pool);
|
||||
pool_for_each(m->thread_pool, t, idx) {
|
||||
if (!t->finished)
|
||||
if (t->tid)
|
||||
pthread_cancel(t->tid);
|
||||
}
|
||||
pool_unlock(m->thread_pool);
|
||||
@@ -356,8 +357,8 @@ static void mscp_stop_copy_thread(struct mscp *m)
|
||||
|
||||
static void mscp_stop_scan_thread(struct mscp *m)
|
||||
{
|
||||
if (m->tid_scan)
|
||||
pthread_cancel(m->tid_scan);
|
||||
if (m->scan.tid)
|
||||
pthread_cancel(m->scan.tid);
|
||||
}
|
||||
|
||||
void mscp_stop(struct mscp *m)
|
||||
@@ -368,7 +369,8 @@ void mscp_stop(struct mscp *m)
|
||||
|
||||
void *mscp_scan_thread(void *arg)
|
||||
{
|
||||
struct mscp *m = arg;
|
||||
struct mscp_thread *t = arg;
|
||||
struct mscp *m = t->m;
|
||||
sftp_session src_sftp = NULL, dst_sftp = NULL;
|
||||
struct path_resolve_args a;
|
||||
struct path *p;
|
||||
@@ -377,15 +379,13 @@ void *mscp_scan_thread(void *arg)
|
||||
glob_t pglob;
|
||||
int n;
|
||||
|
||||
m->ret_scan = 0;
|
||||
|
||||
switch (m->direction) {
|
||||
case MSCP_DIRECTION_L2R:
|
||||
src_sftp = NULL;
|
||||
dst_sftp = m->first;
|
||||
dst_sftp = t->sftp;
|
||||
break;
|
||||
case MSCP_DIRECTION_R2L:
|
||||
src_sftp = m->first;
|
||||
src_sftp = t->sftp;
|
||||
dst_sftp = NULL;
|
||||
break;
|
||||
default:
|
||||
@@ -395,7 +395,7 @@ void *mscp_scan_thread(void *arg)
|
||||
|
||||
/* initialize path_resolve_args */
|
||||
memset(&a, 0, sizeof(a));
|
||||
a.total_bytes = &m->total_bytes;
|
||||
a.total_bytes = &t->total_bytes;
|
||||
|
||||
if (pool_size(m->src_pool) > 1)
|
||||
a.dst_path_should_dir = true;
|
||||
@@ -444,22 +444,27 @@ void *mscp_scan_thread(void *arg)
|
||||
|
||||
pr_info("walk source path(s) done");
|
||||
chunk_pool_set_filled(&m->cp);
|
||||
m->ret_scan = 0;
|
||||
t->ret = 0;
|
||||
return NULL;
|
||||
|
||||
err_out:
|
||||
chunk_pool_set_filled(&m->cp);
|
||||
m->ret_scan = -1;
|
||||
t->ret = -1;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int mscp_scan(struct mscp *m)
|
||||
{
|
||||
int ret = pthread_create(&m->tid_scan, NULL, mscp_scan_thread, m);
|
||||
struct mscp_thread *t = &m->scan;
|
||||
int ret;
|
||||
|
||||
memset(t, 0, sizeof(*t));
|
||||
t->m = m;
|
||||
t->sftp = m->first;
|
||||
|
||||
ret = pthread_create(&t->tid, NULL, mscp_scan_thread, t);
|
||||
if (ret < 0) {
|
||||
priv_set_err("pthread_create: %d", ret);
|
||||
m->tid_scan = 0;
|
||||
mscp_stop(m);
|
||||
return -1;
|
||||
}
|
||||
|
||||
@@ -477,10 +482,11 @@ int mscp_scan(struct mscp *m)
|
||||
|
||||
int mscp_scan_join(struct mscp *m)
|
||||
{
|
||||
if (m->tid_scan) {
|
||||
pthread_join(m->tid_scan, NULL);
|
||||
m->tid_scan = 0;
|
||||
return m->ret_scan;
|
||||
struct mscp_thread *t = &m->scan;
|
||||
if (t->tid) {
|
||||
pthread_join(t->tid, NULL);
|
||||
t->tid = 0;
|
||||
return t->ret;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
@@ -544,7 +550,7 @@ int mscp_join(struct mscp *m)
|
||||
struct mscp_thread *t;
|
||||
struct path *p;
|
||||
unsigned int idx;
|
||||
size_t done = 0, nr_copied = 0, nr_tobe_copied = 0;
|
||||
size_t total_copied_bytes = 0, nr_copied = 0, nr_tobe_copied = 0;
|
||||
int n, ret = 0;
|
||||
|
||||
/* waiting for scan thread joins... */
|
||||
@@ -556,7 +562,7 @@ int mscp_join(struct mscp *m)
|
||||
}
|
||||
|
||||
pool_for_each(m->thread_pool, t, idx) {
|
||||
done += t->done;
|
||||
total_copied_bytes += t->copied_bytes;
|
||||
if (t->ret != 0)
|
||||
ret = t->ret;
|
||||
if (t->sftp) {
|
||||
@@ -578,8 +584,8 @@ int mscp_join(struct mscp *m)
|
||||
m->first = NULL;
|
||||
}
|
||||
|
||||
pr_notice("%lu/%lu bytes copied for %lu/%lu files", done, m->total_bytes,
|
||||
nr_copied, nr_tobe_copied);
|
||||
pr_notice("%lu/%lu bytes copied for %lu/%lu files", total_copied_bytes,
|
||||
m->scan.total_bytes, nr_copied, nr_tobe_copied);
|
||||
|
||||
return ret;
|
||||
}
|
||||
@@ -601,12 +607,6 @@ static void wait_for_interval(int interval)
|
||||
next = now + interval * 1000000;
|
||||
}
|
||||
|
||||
static void mscp_copy_thread_cleanup(void *arg)
|
||||
{
|
||||
struct mscp_thread *t = arg;
|
||||
t->finished = true;
|
||||
}
|
||||
|
||||
void *mscp_copy_thread(void *arg)
|
||||
{
|
||||
sftp_session src_sftp, dst_sftp;
|
||||
@@ -618,8 +618,6 @@ void *mscp_copy_thread(void *arg)
|
||||
/* when error occurs, each thread prints error messages
|
||||
* immediately with pr_* functions. */
|
||||
|
||||
pthread_cleanup_push(mscp_copy_thread_cleanup, t);
|
||||
|
||||
if (t->cpu > -1) {
|
||||
if (set_thread_affinity(pthread_self(), t->cpu) < 0) {
|
||||
pr_err("set_thread_affinity: %s", priv_get_err());
|
||||
@@ -681,12 +679,10 @@ void *mscp_copy_thread(void *arg)
|
||||
|
||||
if ((t->ret = copy_chunk(c, src_sftp, dst_sftp, m->opts->nr_ahead,
|
||||
m->opts->buf_sz, m->opts->preserve_ts,
|
||||
&t->done)) < 0)
|
||||
&t->copied_bytes)) < 0)
|
||||
break;
|
||||
}
|
||||
|
||||
pthread_cleanup_pop(1);
|
||||
|
||||
if (t->ret < 0) {
|
||||
pr_err("thread[%d]: copy failed: %s -> %s, 0x%010lx-0x%010lx, %s", t->id,
|
||||
c->p->path, c->p->dst_path, c->off, c->off + c->len,
|
||||
@@ -696,11 +692,9 @@ void *mscp_copy_thread(void *arg)
|
||||
return NULL;
|
||||
|
||||
err_out:
|
||||
t->finished = true;
|
||||
t->ret = -1;
|
||||
return NULL;
|
||||
out:
|
||||
t->finished = true;
|
||||
t->ret = 0;
|
||||
return NULL;
|
||||
}
|
||||
@@ -742,10 +736,10 @@ void mscp_get_stats(struct mscp *m, struct mscp_stats *s)
|
||||
struct mscp_thread *t;
|
||||
unsigned int idx;
|
||||
|
||||
s->total = m->total_bytes;
|
||||
s->total = m->scan.total_bytes;
|
||||
s->done = 0;
|
||||
|
||||
pool_for_each(m->thread_pool, t, idx) {
|
||||
s->done += t->done;
|
||||
s->done += t->copied_bytes;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user