mirror of
https://github.com/upa/mscp.git
synced 2026-02-15 01:34:44 +08:00
revise walk_src_path.
In new walk_src_path, resolve dst path and resolve chunks are invoked when adding a path.
This commit is contained in:
95
src/mscp.c
95
src/mscp.c
@@ -29,8 +29,8 @@ struct mscp {
|
||||
char dst_path[PATH_MAX];
|
||||
struct list_head src_list;
|
||||
struct list_head path_list;
|
||||
struct list_head chunk_list;
|
||||
lock chunk_lock;
|
||||
struct chunk_pool cp;
|
||||
|
||||
|
||||
size_t total_bytes; /* total bytes to be transferred */
|
||||
struct mscp_thread *threads;
|
||||
@@ -212,8 +212,7 @@ struct mscp *mscp_init(const char *remote_host, int direction,
|
||||
memset(m, 0, sizeof(*m));
|
||||
INIT_LIST_HEAD(&m->src_list);
|
||||
INIT_LIST_HEAD(&m->path_list);
|
||||
INIT_LIST_HEAD(&m->chunk_list);
|
||||
lock_init(&m->chunk_lock);
|
||||
chunk_pool_init(&m->cp);
|
||||
|
||||
m->remote = strdup(remote_host);
|
||||
if (!m->remote) {
|
||||
@@ -297,14 +296,12 @@ int mscp_set_dst_path(struct mscp *m, const char *dst_path)
|
||||
int mscp_prepare(struct mscp *m)
|
||||
{
|
||||
sftp_session src_sftp = NULL, dst_sftp = NULL;
|
||||
bool src_path_is_dir, dst_path_is_dir, dst_path_should_dir;
|
||||
struct path_resolve_args a;
|
||||
struct list_head tmp;
|
||||
struct path *p;
|
||||
struct src *s;
|
||||
mstat ss, ds;
|
||||
|
||||
src_path_is_dir = dst_path_is_dir = dst_path_should_dir = false;
|
||||
|
||||
switch (m->direction) {
|
||||
case MSCP_DIRECTION_L2R:
|
||||
src_sftp = NULL;
|
||||
@@ -319,12 +316,16 @@ int mscp_prepare(struct mscp *m)
|
||||
return -1;
|
||||
}
|
||||
|
||||
memset(&a, 0, sizeof(a));
|
||||
a.msg_fd = m->msg_fd;
|
||||
a.total_bytes = &m->total_bytes;
|
||||
a.nr_conn = m->opts->nr_threads;
|
||||
if (list_count(&m->src_list) > 1)
|
||||
dst_path_should_dir = true;
|
||||
a.dst_path_should_dir = true;
|
||||
|
||||
if (mscp_stat(m->dst_path, &ds, dst_sftp) == 0) {
|
||||
if (mstat_is_dir(ds))
|
||||
dst_path_is_dir = true;
|
||||
a.dst_path_is_dir = true;
|
||||
mscp_stat_free(ds);
|
||||
}
|
||||
|
||||
@@ -334,33 +335,27 @@ int mscp_prepare(struct mscp *m)
|
||||
mscp_set_error("stat: %s", mscp_strerror(src_sftp));
|
||||
return -1;
|
||||
}
|
||||
src_path_is_dir = mstat_is_dir(ss);
|
||||
|
||||
/* fill path_resolve_args */
|
||||
a.src_path = s->path;
|
||||
a.dst_path = m->dst_path;
|
||||
a.src_path_is_dir = mstat_is_dir(ss);
|
||||
|
||||
a.cp = &m->cp;
|
||||
a.min_chunk_sz = m->opts->min_chunk_sz;
|
||||
a.max_chunk_sz = m->opts->max_chunk_sz;
|
||||
|
||||
mscp_stat_free(ss);
|
||||
|
||||
INIT_LIST_HEAD(&tmp);
|
||||
if (walk_src_path(src_sftp, s->path, &tmp) < 0)
|
||||
return -1;
|
||||
|
||||
if (list_count(&tmp) > 1)
|
||||
dst_path_should_dir = true;
|
||||
|
||||
if (resolve_dst_path(m->msg_fd, s->path, m->dst_path, &tmp,
|
||||
src_path_is_dir, dst_path_is_dir,
|
||||
dst_path_should_dir) < 0)
|
||||
INIT_LIST_HEAD(&tmp);
|
||||
if (walk_src_path(src_sftp, s->path, &tmp, &a) < 0)
|
||||
return -1;
|
||||
|
||||
list_splice_tail(&tmp, m->path_list.prev);
|
||||
}
|
||||
|
||||
if (resolve_chunk(&m->path_list, &m->chunk_list, m->opts->nr_threads,
|
||||
m->opts->min_chunk_sz, m->opts->max_chunk_sz) < 0)
|
||||
return -1;
|
||||
|
||||
/* save total bytes to be transferred */
|
||||
m->total_bytes = 0;
|
||||
list_for_each_entry(p, &m->path_list, list) {
|
||||
m->total_bytes += p->size;
|
||||
}
|
||||
chunk_pool_done(&m->cp);
|
||||
|
||||
return 0;
|
||||
}
|
||||
@@ -382,7 +377,7 @@ int mscp_start(struct mscp *m)
|
||||
{
|
||||
int n, ret;
|
||||
|
||||
if ((n = list_count(&m->chunk_list)) < m->opts->nr_threads) {
|
||||
if ((n = chunk_pool_size(&m->cp)) < m->opts->nr_threads) {
|
||||
mpr_notice(m->msg_fd, "we have only %d chunk(s). "
|
||||
"set number of connections to %d\n", n, n);
|
||||
m->opts->nr_threads = n;
|
||||
@@ -462,20 +457,6 @@ int mscp_join(struct mscp *m)
|
||||
|
||||
/* copy thread related functions */
|
||||
|
||||
struct chunk *acquire_chunk(struct list_head *chunk_list)
|
||||
{
|
||||
/* under the lock for chunk_list */
|
||||
struct list_head *first = chunk_list->next;
|
||||
struct chunk *c = NULL;
|
||||
|
||||
if (list_empty(chunk_list))
|
||||
return NULL; /* list is empty */
|
||||
|
||||
c = list_entry(first, struct chunk, list);
|
||||
list_del(first);
|
||||
return c;
|
||||
}
|
||||
|
||||
static void mscp_copy_thread_cleanup(void *arg)
|
||||
{
|
||||
struct mscp_thread *t = arg;
|
||||
@@ -512,9 +493,11 @@ void *mscp_copy_thread(void *arg)
|
||||
pthread_cleanup_push(mscp_copy_thread_cleanup, t);
|
||||
|
||||
while (1) {
|
||||
LOCK_ACQUIRE_THREAD(&m->chunk_lock);
|
||||
c = acquire_chunk(&m->chunk_list);
|
||||
LOCK_RELEASE_THREAD();
|
||||
c = chunk_pool_pop(&m->cp);
|
||||
if (c == CHUNK_POP_WAIT) {
|
||||
usleep(100); /* XXX: hard code */
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!c)
|
||||
break; /* no more chunks */
|
||||
@@ -537,16 +520,6 @@ void *mscp_copy_thread(void *arg)
|
||||
|
||||
/* cleanup related functions */
|
||||
|
||||
static void release_list(struct list_head *head, void (*f)(struct list_head*))
|
||||
{
|
||||
struct list_head *p, *n;
|
||||
|
||||
list_for_each_safe(p, n, head) {
|
||||
list_del(p);
|
||||
f(p);
|
||||
}
|
||||
}
|
||||
|
||||
static void free_src(struct list_head *list)
|
||||
{
|
||||
struct src *s;
|
||||
@@ -576,15 +549,15 @@ void mscp_cleanup(struct mscp *m)
|
||||
m->first = NULL;
|
||||
}
|
||||
|
||||
release_list(&m->src_list, free_src);
|
||||
list_free_f(&m->src_list, free_src);
|
||||
INIT_LIST_HEAD(&m->src_list);
|
||||
|
||||
release_list(&m->chunk_list, free_chunk);
|
||||
INIT_LIST_HEAD(&m->chunk_list);
|
||||
|
||||
release_list(&m->path_list, free_path);
|
||||
list_free_f(&m->path_list, free_path);
|
||||
INIT_LIST_HEAD(&m->path_list);
|
||||
|
||||
chunk_pool_release(&m->cp);
|
||||
chunk_pool_init(&m->cp);
|
||||
|
||||
if (m->threads) {
|
||||
free(m->threads);
|
||||
m->threads = NULL;
|
||||
|
||||
Reference in New Issue
Block a user