mscp_prepare() scans source paths in a thread.

This commit runs mscp_prepare() in a pthread. mscp copy threads
run asynchronously with mscp_prepare(). So, even when mscp_prepare()
has not finished yet (due to too many source files), we can start
to copy files.
This commit is contained in:
Ryo Nakamura
2023-03-13 22:35:51 +09:00
parent ceb9ebd5a8
commit 5f9f20f150
5 changed files with 130 additions and 46 deletions

View File

@@ -155,7 +155,7 @@ int mscp_set_dst_path(struct mscp *m, const char *dst_path);
/** /**
* @brief Prepare for file transfer. This function checks all source * @brief Prepare for file transfer. This function checks all source
* files (recursively), resolve paths on the destination side, and * files (recursively), resolve paths on the destination side, and
* calculate file chunks. * calculate file chunks. This function is non-blocking.
* *
* @param m mscp instance. * @param m mscp instance.
* *
@@ -164,6 +164,17 @@ int mscp_set_dst_path(struct mscp *m, const char *dst_path);
*/ */
int mscp_prepare(struct mscp *m); int mscp_prepare(struct mscp *m);
/**
 * @brief Join the prepare thread spawned by mscp_prepare(). mscp_join()
 * calls this function itself, so mscp_prepare_join() only needs to be
 * called directly when mscp_prepare() has been called but mscp_start()
 * has not.
 *
 * @param m	mscp instance.
 * @return	0 on success, < 0 if an error occurred.
 *		mscp_get_error() can be used to retrieve error message.
 */
int mscp_prepare_join(struct mscp *m);
/** /**
* @brief Start to copy files. mscp_start() returns immediately. You * @brief Start to copy files. mscp_start() returns immediately. You
* can get statistics via mscp_get_stats() or messages via pipe set by * can get statistics via mscp_get_stats() or messages via pipe set by

View File

@@ -358,7 +358,7 @@ int main(int argc, char **argv)
} }
if (dryrun) { if (dryrun) {
ret = 0; ret = mscp_prepare_join(m);
goto out; goto out;
} }

View File

@@ -31,6 +31,8 @@ struct mscp {
struct list_head path_list; struct list_head path_list;
struct chunk_pool cp; struct chunk_pool cp;
pthread_t tid_prepare; /* tid for prepare thread */
int ret_prepare; /* return code from prepare thread */
size_t total_bytes; /* total bytes to be transferred */ size_t total_bytes; /* total bytes to be transferred */
struct mscp_thread *threads; struct mscp_thread *threads;
@@ -292,9 +294,30 @@ int mscp_set_dst_path(struct mscp *m, const char *dst_path)
return 0; return 0;
} }
static void mscp_stop_copy_thread(struct mscp *m)
int mscp_prepare(struct mscp *m)
{ {
int n;
for (n = 0; n < m->opts->nr_threads; n++) {
if (m->threads[n].tid && !m->threads[n].finished)
pthread_cancel(m->threads[n].tid);
}
}
/* Request cancellation of the prepare thread, if one has been recorded
 * in m->tid_prepare. Does nothing when tid_prepare is zero. */
static void mscp_stop_prepare_thread(struct mscp *m)
{
	if (!m->tid_prepare)
		return;	/* no prepare thread to cancel */

	pthread_cancel(m->tid_prepare);
}
/* Stop all work belonging to this mscp instance: the prepare thread is
 * cancelled first, then every copy thread that is still running. */
void mscp_stop(struct mscp *m)
{
	/* stop the prepare thread (scanning source paths) */
	mscp_stop_prepare_thread(m);

	/* stop the copy threads */
	mscp_stop_copy_thread(m);
}
void *mscp_prepare_thread(void *arg)
{
struct mscp *m = arg;
sftp_session src_sftp = NULL, dst_sftp = NULL; sftp_session src_sftp = NULL, dst_sftp = NULL;
struct path_resolve_args a; struct path_resolve_args a;
struct list_head tmp; struct list_head tmp;
@@ -302,6 +325,8 @@ int mscp_prepare(struct mscp *m)
struct src *s; struct src *s;
mstat ss, ds; mstat ss, ds;
m->ret_prepare = 0;
switch (m->direction) { switch (m->direction) {
case MSCP_DIRECTION_L2R: case MSCP_DIRECTION_L2R:
src_sftp = NULL; src_sftp = NULL;
@@ -313,7 +338,7 @@ int mscp_prepare(struct mscp *m)
break; break;
default: default:
mscp_set_error("invalid copy direction: %d", m->direction); mscp_set_error("invalid copy direction: %d", m->direction);
return -1; goto err_out;
} }
memset(&a, 0, sizeof(a)); memset(&a, 0, sizeof(a));
@@ -329,11 +354,14 @@ int mscp_prepare(struct mscp *m)
mscp_stat_free(ds); mscp_stat_free(ds);
} }
mpr_info(m->msg_fd, "start to walk source path(s)\n");
/* walk a src_path recusively, and resolve path->dst_path for each src */ /* walk a src_path recusively, and resolve path->dst_path for each src */
list_for_each_entry(s, &m->src_list, list) { list_for_each_entry(s, &m->src_list, list) {
if (mscp_stat(s->path, &ss, src_sftp) < 0) { if (mscp_stat(s->path, &ss, src_sftp) < 0) {
mscp_set_error("stat: %s", mscp_strerror(src_sftp)); mscp_set_error("stat: %s", mscp_strerror(src_sftp));
return -1; mscp_stat_free(ss);
goto err_out;
} }
/* fill path_resolve_args */ /* fill path_resolve_args */
@@ -350,27 +378,54 @@ int mscp_prepare(struct mscp *m)
INIT_LIST_HEAD(&tmp); INIT_LIST_HEAD(&tmp);
if (walk_src_path(src_sftp, s->path, &tmp, &a) < 0) if (walk_src_path(src_sftp, s->path, &tmp, &a) < 0)
return -1; goto err_out;
list_splice_tail(&tmp, m->path_list.prev); list_splice_tail(&tmp, m->path_list.prev);
} }
chunk_pool_done(&m->cp); chunk_pool_set_filled(&m->cp);
mpr_info(m->msg_fd, "walk source path(s) done\n");
m->ret_prepare = 0;
return NULL;
err_out:
m->ret_prepare = -1;
mscp_stop_copy_thread(m);
return NULL;
}
/*
 * Spawn the prepare thread (mscp_prepare_thread) and wait until copying
 * can reasonably begin: either the chunk pool has been marked filled, or
 * it holds at least nr_threads chunks.
 *
 * Returns 0 once the thread is running and enough chunks are available,
 * or -1 if the thread could not be created or the prepare thread reported
 * a failure through m->ret_prepare. On a prepare failure the thread is
 * joined here and m->tid_prepare is cleared.
 */
int mscp_prepare(struct mscp *m)
{
	int ret;

	/* the wait loop below polls ret_prepare, so clear any stale value
	 * before the thread gets a chance to run */
	m->ret_prepare = 0;

	/* pthread_create() returns 0 on success and a positive error number
	 * on failure; it never returns a negative value */
	ret = pthread_create(&m->tid_prepare, NULL, mscp_prepare_thread, m);
	if (ret != 0) {
		mscp_set_error("pthread_create_error: %d", ret);
		m->tid_prepare = 0;
		mscp_stop(m);
		return -1;
	}

	/* wait until preparation has finished, has failed, or at least
	 * nr_threads chunks have been added to the pool. The failure check is
	 * required because the prepare thread's error path does not mark the
	 * pool as filled, so without it this loop would never terminate. */
	while (m->ret_prepare == 0 &&
	       !chunk_pool_is_filled(&m->cp) &&
	       chunk_pool_size(&m->cp) < m->opts->nr_threads)
		usleep(100);

	if (m->ret_prepare < 0) {
		/* reap the failed thread so that it no longer touches m after
		 * the caller handles the error */
		pthread_join(m->tid_prepare, NULL);
		m->tid_prepare = 0;
		return -1;
	}

	return 0;
}
void mscp_stop(struct mscp *m) int mscp_prepare_join(struct mscp *m)
{ {
int n; if (m->tid_prepare) {
pr("stopping...\n"); pthread_join(m->tid_prepare, NULL);
for (n = 0; n < m->opts->nr_threads; n++) { return m->ret_prepare;
if (m->threads[n].tid && !m->threads[n].finished) }
pthread_cancel(m->threads[n].tid); return 0;
}
} }
static void *mscp_copy_thread(void *arg); static void *mscp_copy_thread(void *arg);
int mscp_start(struct mscp *m) int mscp_start(struct mscp *m)
@@ -394,17 +449,11 @@ int mscp_start(struct mscp *m)
else else
t->cpu = m->cores[n % m->nr_cores]; t->cpu = m->cores[n % m->nr_cores];
if (n == 0) { mpr_notice(m->msg_fd, "connecting to %s for a copy thread...\n",
t->sftp = m->first; /* reuse first sftp session */ m->remote);
m->first = NULL; t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts);
} if (!t->sftp)
else { return -1;
mpr_notice(m->msg_fd, "connecting to %s for a copy thread...\n",
m->remote);
t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts);
if (!t->sftp)
return -1;
}
} }
/* spawn copy threads */ /* spawn copy threads */
@@ -425,7 +474,10 @@ int mscp_join(struct mscp *m)
{ {
int n, ret = 0; int n, ret = 0;
/* waiting for threads join... */ /* waiting for prepare thread joins... */
ret = mscp_prepare_join(m);
/* waiting for copy threads join... */
for (n = 0; n < m->opts->nr_threads; n++) { for (n = 0; n < m->opts->nr_threads; n++) {
if (m->threads[n].tid) { if (m->threads[n].tid) {
pthread_join(m->threads[n].tid, NULL); pthread_join(m->threads[n].tid, NULL);

View File

@@ -30,46 +30,50 @@ static int get_page_mask(void)
/* chunk pool operations */ /* chunk pool operations */
#define CHUNK_POOL_STATE_ADDING 0 #define CHUNK_POOL_STATE_FILLING 0
#define CHUNK_POOL_STATE_DONE 1 #define CHUNK_POOL_STATE_FILLED 1
void chunk_pool_init(struct chunk_pool *cp) void chunk_pool_init(struct chunk_pool *cp)
{ {
memset(cp, 0, sizeof(*cp)); memset(cp, 0, sizeof(*cp));
INIT_LIST_HEAD(&cp->list); INIT_LIST_HEAD(&cp->list);
lock_init(&cp->lock); lock_init(&cp->lock);
cp->state = CHUNK_POOL_STATE_ADDING; cp->state = CHUNK_POOL_STATE_FILLING;
} }
static void chunk_pool_add(struct chunk_pool *cp, struct chunk *c) static void chunk_pool_add(struct chunk_pool *cp, struct chunk *c)
{ {
LOCK_ACQUIRE_THREAD(&cp->lock); LOCK_ACQUIRE_THREAD(&cp->lock);
list_add_tail(&c->list, &cp->list); list_add_tail(&c->list, &cp->list);
cp->count += 1;
LOCK_RELEASE_THREAD(); LOCK_RELEASE_THREAD();
} }
void chunk_pool_done(struct chunk_pool *cp) void chunk_pool_set_filled(struct chunk_pool *cp)
{ {
cp->state = CHUNK_POOL_STATE_DONE; cp->state = CHUNK_POOL_STATE_FILLED;
} }
int chunk_pool_size(struct chunk_pool *cp) bool chunk_pool_is_filled(struct chunk_pool *cp)
{ {
int n; return (cp->state == CHUNK_POOL_STATE_FILLED);
LOCK_ACQUIRE_THREAD(&cp->lock);
n = list_count(&cp->list);
LOCK_RELEASE_THREAD();
return n;
} }
/* Return the pool's chunk counter (cp->count). The counter is read here
 * without taking cp->lock. */
size_t chunk_pool_size(struct chunk_pool *cp)
{
	size_t nr_chunks = cp->count;

	return nr_chunks;
}
struct chunk *chunk_pool_pop(struct chunk_pool *cp) struct chunk *chunk_pool_pop(struct chunk_pool *cp)
{ {
struct list_head *first = cp->list.next; struct list_head *first;
struct chunk *c = NULL; struct chunk *c = NULL;
LOCK_ACQUIRE_THREAD(&cp->lock); LOCK_ACQUIRE_THREAD(&cp->lock);
first = cp->list.next;
if (list_empty(&cp->list)) { if (list_empty(&cp->list)) {
if (cp->state == CHUNK_POOL_STATE_ADDING) if (!chunk_pool_is_filled(cp))
c = CHUNK_POP_WAIT; c = CHUNK_POP_WAIT;
else else
c = NULL; /* no more chunks */ c = NULL; /* no more chunks */
@@ -283,15 +287,19 @@ static int walk_path_recursive(sftp_session sftp, const char *path,
return -1; return -1;
for (e = mscp_readdir(d); !mdirent_is_null(e); e = mscp_readdir(d)) { for (e = mscp_readdir(d); !mdirent_is_null(e); e = mscp_readdir(d)) {
if (check_path_should_skip(mdirent_name(e))) if (check_path_should_skip(mdirent_name(e))) {
mscp_dirent_free(e);
continue; continue;
}
if (strlen(path) + 1 + strlen(mdirent_name(e)) > PATH_MAX) { if (strlen(path) + 1 + strlen(mdirent_name(e)) > PATH_MAX) {
mscp_set_error("too long path: %s/%s", path, mdirent_name(e)); mscp_set_error("too long path: %s/%s", path, mdirent_name(e));
mscp_dirent_free(e);
return -1; return -1;
} }
snprintf(next_path, sizeof(next_path), "%s/%s", path, mdirent_name(e)); snprintf(next_path, sizeof(next_path), "%s/%s", path, mdirent_name(e));
ret = walk_path_recursive(sftp, next_path, path_list, a); ret = walk_path_recursive(sftp, next_path, path_list, a);
mscp_dirent_free(e);
if (ret < 0) if (ret < 0)
return ret; return ret;
} }
@@ -339,10 +347,13 @@ static int touch_dst_path(struct path *p, sftp_session sftp)
mstat s; mstat s;
if (mscp_stat(path, &s, sftp) == 0) { if (mscp_stat(path, &s, sftp) == 0) {
if (mstat_is_dir(s)) if (mstat_is_dir(s)) {
mscp_stat_free(s);
goto next; /* directory exists. go deeper */ goto next; /* directory exists. go deeper */
else } else {
mscp_stat_free(s);
return -1; /* path exists, but not directory. */ return -1; /* path exists, but not directory. */
}
} }
if (mscp_stat_check_err_noent(sftp) == 0) { if (mscp_stat_check_err_noent(sftp) == 0) {

View File

@@ -39,6 +39,7 @@ struct chunk {
struct chunk_pool { struct chunk_pool {
struct list_head list; /* list of struct chunk */ struct list_head list; /* list of struct chunk */
size_t count;
lock lock; lock lock;
int state; int state;
}; };
@@ -54,11 +55,12 @@ void chunk_pool_init(struct chunk_pool *cp);
struct chunk *chunk_pool_pop(struct chunk_pool *cp); struct chunk *chunk_pool_pop(struct chunk_pool *cp);
#define CHUNK_POP_WAIT ((void *) -1) #define CHUNK_POP_WAIT ((void *) -1)
/* set adding chunks to this pool has finished */ /* set and check fillingchunks to this pool has finished */
void chunk_pool_done(struct chunk_pool *cp); void chunk_pool_set_filled(struct chunk_pool *cp);
bool chunk_pool_is_filled(struct chunk_pool *cp);
/* return number of chunks in the pool */ /* return number of chunks in the pool */
int chunk_pool_size(struct chunk_pool *cp); size_t chunk_pool_size(struct chunk_pool *cp);
/* free chunks in the chunk_pool */ /* free chunks in the chunk_pool */
void chunk_pool_release(struct chunk_pool *cp); void chunk_pool_release(struct chunk_pool *cp);
@@ -168,6 +170,14 @@ static mdirent *mscp_readdir(mdir *d)
return &e; return &e;
} }
/* Release the sftp attributes held in e->r, if any, and reset the
 * pointer to NULL so that a repeated call is harmless. */
static void mscp_dirent_free(mdirent *e)
{
	if (!e->r)
		return;	/* nothing attached to this entry */

	sftp_attributes_free(e->r);
	e->r = NULL;
}
/* wrap retriving error */ /* wrap retriving error */
static const char *mscp_strerror(sftp_session sftp) static const char *mscp_strerror(sftp_session sftp)
{ {