change path_list to path_pool

This commit is contained in:
Ryo Nakamura
2024-02-10 21:29:07 +09:00
parent d2e061fd97
commit bfc955a9a7
6 changed files with 81 additions and 42 deletions

View File

@@ -35,9 +35,7 @@ struct mscp {
sftp_session first; /* first sftp session */ sftp_session first; /* first sftp session */
char dst_path[PATH_MAX]; char dst_path[PATH_MAX];
pool *src_pool; pool *src_pool, *path_pool;
struct list_head src_list;
struct list_head path_list;
struct chunk_pool cp; struct chunk_pool cp;
pthread_t tid_scan; /* tid for scan thread */ pthread_t tid_scan; /* tid for scan thread */
@@ -240,7 +238,12 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts
goto free_out; goto free_out;
} }
INIT_LIST_HEAD(&m->path_list); m->path_pool = pool_new();
if (!m->path_pool) {
priv_set_errv("pool_new: %s", strerrno());
goto free_out;
}
chunk_pool_init(&m->cp); chunk_pool_init(&m->cp);
INIT_LIST_HEAD(&m->thread_list); INIT_LIST_HEAD(&m->thread_list);
@@ -279,6 +282,8 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts
free_out: free_out:
if (m->src_pool) if (m->src_pool)
pool_free(m->src_pool); pool_free(m->src_pool);
if (m->path_pool)
pool_free(m->path_pool);
free(m); free(m);
return NULL; return NULL;
} }
@@ -400,6 +405,7 @@ void *mscp_scan_thread(void *arg)
} }
a.cp = &m->cp; a.cp = &m->cp;
a.path_pool = m->path_pool;
a.nr_conn = m->opts->nr_threads; a.nr_conn = m->opts->nr_threads;
a.min_chunk_sz = m->opts->min_chunk_sz; a.min_chunk_sz = m->opts->min_chunk_sz;
a.max_chunk_sz = m->opts->max_chunk_sz; a.max_chunk_sz = m->opts->max_chunk_sz;
@@ -429,11 +435,8 @@ void *mscp_scan_thread(void *arg)
a.dst_path = m->dst_path; a.dst_path = m->dst_path;
a.src_path_is_dir = S_ISDIR(ss.st_mode); a.src_path_is_dir = S_ISDIR(ss.st_mode);
INIT_LIST_HEAD(&tmp); if (walk_src_path(src_sftp, pglob.gl_pathv[n], &a) < 0)
if (walk_src_path(src_sftp, pglob.gl_pathv[n], &tmp, &a) < 0)
goto err_out; goto err_out;
list_splice_tail(&tmp, m->path_list.prev);
} }
mscp_globfree(&pglob); mscp_globfree(&pglob);
} }
@@ -565,12 +568,14 @@ int mscp_join(struct mscp *m)
} }
/* count up number of transferred files */ /* count up number of transferred files */
list_for_each_entry(p, &m->path_list, list) { pool_lock(m->path_pool);
pool_iter_for_each(m->path_pool, p) {
nr_tobe_copied++; nr_tobe_copied++;
if (p->state == FILE_STATE_DONE) { if (p->state == FILE_STATE_DONE) {
nr_copied++; nr_copied++;
} }
} }
pool_unlock();
pr_notice("%lu/%lu bytes copied for %lu/%lu files", done, m->total_bytes, pr_notice("%lu/%lu bytes copied for %lu/%lu files", done, m->total_bytes,
nr_copied, nr_tobe_copied); nr_copied, nr_tobe_copied);
@@ -701,13 +706,6 @@ out:
/* cleanup-related functions */ /* cleanup-related functions */
static void list_free_path(struct list_head *list)
{
struct path *p;
p = list_entry(list, typeof(*p), list);
free_path(p);
}
static void list_free_thread(struct list_head *list) static void list_free_thread(struct list_head *list)
{ {
struct mscp_thread *t; struct mscp_thread *t;
@@ -722,9 +720,8 @@ void mscp_cleanup(struct mscp *m)
m->first = NULL; m->first = NULL;
} }
list_free_f(&m->path_list, list_free_path); pool_zeroize(m->src_pool, free);
INIT_LIST_HEAD(&m->path_list); pool_zeroize(m->path_pool, (pool_map_f)free_path);
chunk_pool_release(&m->cp); chunk_pool_release(&m->cp);
chunk_pool_init(&m->cp); chunk_pool_init(&m->cp);
@@ -735,7 +732,9 @@ void mscp_cleanup(struct mscp *m)
void mscp_free(struct mscp *m) void mscp_free(struct mscp *m)
{ {
mscp_cleanup(m); pool_destroy(m->src_pool, free);
pool_destroy(m->path_pool, (pool_map_f)free_path);
if (m->remote) if (m->remote)
free(m->remote); free(m->remote);
if (m->cores) if (m->cores)

View File

@@ -9,7 +9,6 @@
#include <ssh.h> #include <ssh.h>
#include <minmax.h> #include <minmax.h>
#include <fileops.h> #include <fileops.h>
#include <list.h>
#include <atomic.h> #include <atomic.h>
#include <path.h> #include <path.h>
#include <strerrno.h> #include <strerrno.h>
@@ -219,17 +218,16 @@ void free_path(struct path *p)
} }
static int append_path(sftp_session sftp, const char *path, struct stat st, static int append_path(sftp_session sftp, const char *path, struct stat st,
struct list_head *path_list, struct path_resolve_args *a) struct path_resolve_args *a)
{ {
struct path *p; struct path *p;
if (!(p = malloc(sizeof(*p)))) { if (!(p = malloc(sizeof(*p)))) {
priv_set_errv("failed to allocate memory: %s", strerrno()); pr_err("malloc: %s", strerrno());
return -1; return -1;
} }
memset(p, 0, sizeof(*p)); memset(p, 0, sizeof(*p));
INIT_LIST_HEAD(&p->list);
p->path = strndup(path, PATH_MAX); p->path = strndup(path, PATH_MAX);
if (!p->path) { if (!p->path) {
pr_err("strndup: %s", strerrno()); pr_err("strndup: %s", strerrno());
@@ -248,7 +246,11 @@ static int append_path(sftp_session sftp, const char *path, struct stat st,
return -1; /* XXX: do not free path becuase chunk(s) return -1; /* XXX: do not free path becuase chunk(s)
* was added to chunk pool already */ * was added to chunk pool already */
list_add_tail(&p->list, path_list); if (pool_push_lock(a->path_pool, p) < 0) {
pr_err("pool_push: %s", strerrno());
goto free_out;
}
*a->total_bytes += p->size; *a->total_bytes += p->size;
return 0; return 0;
@@ -269,7 +271,7 @@ static bool check_path_should_skip(const char *path)
} }
static int walk_path_recursive(sftp_session sftp, const char *path, static int walk_path_recursive(sftp_session sftp, const char *path,
struct list_head *path_list, struct path_resolve_args *a) struct path_resolve_args *a)
{ {
char next_path[PATH_MAX + 1]; char next_path[PATH_MAX + 1];
struct dirent *e; struct dirent *e;
@@ -284,7 +286,7 @@ static int walk_path_recursive(sftp_session sftp, const char *path,
if (S_ISREG(st.st_mode)) { if (S_ISREG(st.st_mode)) {
/* this path is regular file. it is to be copied */ /* this path is regular file. it is to be copied */
return append_path(sftp, path, st, path_list, a); return append_path(sftp, path, st, a);
} }
if (!S_ISDIR(st.st_mode)) if (!S_ISDIR(st.st_mode))
@@ -306,7 +308,7 @@ static int walk_path_recursive(sftp_session sftp, const char *path,
continue; continue;
} }
walk_path_recursive(sftp, next_path, path_list, a); walk_path_recursive(sftp, next_path, a);
/* do not stop even when walk_path_recursive returns /* do not stop even when walk_path_recursive returns
* -1 due to an unreadable file. go to a next * -1 due to an unreadable file. go to a next
* file. Thus, do not pass error messages via * file. Thus, do not pass error messages via
@@ -321,9 +323,9 @@ static int walk_path_recursive(sftp_session sftp, const char *path,
} }
int walk_src_path(sftp_session src_sftp, const char *src_path, int walk_src_path(sftp_session src_sftp, const char *src_path,
struct list_head *path_list, struct path_resolve_args *a) struct path_resolve_args *a)
{ {
return walk_path_recursive(src_sftp, src_path, path_list, a); return walk_path_recursive(src_sftp, src_path, a);
} }
/* based on /* based on

View File

@@ -6,14 +6,12 @@
#include <fcntl.h> #include <fcntl.h>
#include <dirent.h> #include <dirent.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <list.h> #include <list.h>
#include <pool.h>
#include <atomic.h> #include <atomic.h>
#include <ssh.h> #include <ssh.h>
struct path { struct path {
struct list_head list; /* mscp->path_list */
char *path; /* file path */ char *path; /* file path */
size_t size; /* size of file on this path */ size_t size; /* size of file on this path */
mode_t mode; /* permission */ mode_t mode; /* permission */
@@ -78,6 +76,7 @@ struct path_resolve_args {
bool dst_path_should_dir; bool dst_path_should_dir;
/* args to resolve chunks for a path */ /* args to resolve chunks for a path */
pool *path_pool;
struct chunk_pool *cp; struct chunk_pool *cp;
int nr_conn; int nr_conn;
size_t min_chunk_sz; size_t min_chunk_sz;
@@ -85,9 +84,9 @@ struct path_resolve_args {
size_t chunk_align; size_t chunk_align;
}; };
/* recursivly walk through src_path and fill path_list for each file */ /* walk src_path recursivly and fill a->path_pool with found files */
int walk_src_path(sftp_session src_sftp, const char *src_path, int walk_src_path(sftp_session src_sftp, const char *src_path,
struct list_head *path_list, struct path_resolve_args *a); struct path_resolve_args *a);
/* free struct path */ /* free struct path */
void free_path(struct path *p); void free_path(struct path *p);
@@ -96,7 +95,4 @@ void free_path(struct path *p);
int copy_chunk(struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp, int copy_chunk(struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp,
int nr_ahead, int buf_sz, bool preserve_ts, size_t *counter); int nr_ahead, int buf_sz, bool preserve_ts, size_t *counter);
/* just print contents. just for debugging */
void path_dump(struct list_head *path_list);
#endif /* _PATH_H_ */ #endif /* _PATH_H_ */

View File

@@ -1,7 +1,7 @@
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
#include "pool.h" #include <pool.h>
#define DEFAULT_START_SIZE 16 #define DEFAULT_START_SIZE 16
@@ -27,17 +27,34 @@ pool *pool_new(void)
void pool_free(pool *p) void pool_free(pool *p)
{ {
if (p->array) if (p->array) {
free(p->array); free(p->array);
p->array = NULL;
}
free(p); free(p);
} }
void pool_zeroize(pool *p, pool_map_f f)
{
void *v;
pool_iter_for_each(p, v) {
f(v);
}
p->num = 0;
}
void pool_destroy(pool *p, pool_map_f f)
{
pool_zeroize(p, f);
pool_free(p);
}
int pool_push(pool *p, void *v) int pool_push(pool *p, void *v)
{ {
if (p->num == p->len) { if (p->num == p->len) {
/* expand array */ /* expand array */
size_t newlen = p->len * 2 * sizeof(void *); size_t newlen = p->len * 2;
void **new = realloc(p->array, newlen); void *new = realloc(p->array, newlen * sizeof(void *));
if (new == NULL) if (new == NULL)
return -1; return -1;
p->len = newlen; p->len = newlen;

View File

@@ -21,9 +21,21 @@ struct pool_struct {
typedef struct pool_struct pool; typedef struct pool_struct pool;
/* allocate a new pool */
pool *pool_new(void); pool *pool_new(void);
/* func type applied to each item in a pool*/
typedef void (*pool_map_f)(void *v);
/* apply f, which free an item, to all items and set num to 0 */
void pool_zeroize(pool *p, pool_map_f f);
/* free pool->array and pool */
void pool_free(pool *p); void pool_free(pool *p);
/* free pool->array and pool after applying f to all items in p->array */
void pool_destroy(pool *p, pool_map_f f);
#define pool_lock(p) LOCK_ACQUIRE(&(p->lock)) #define pool_lock(p) LOCK_ACQUIRE(&(p->lock))
#define pool_unlock(p) LOCK_RELEASE() #define pool_unlock(p) LOCK_RELEASE()

View File

@@ -7,6 +7,7 @@ import platform
import pytest import pytest
import getpass import getpass
import os import os
import shutil
from subprocess import check_call, CalledProcessError, PIPE from subprocess import check_call, CalledProcessError, PIPE
from util import File, check_same_md5sum from util import File, check_same_md5sum
@@ -475,3 +476,15 @@ def test_specify_invalid_password_via_env(mscp):
src.path, "localhost:" + dst.path], env = env) src.path, "localhost:" + dst.path], env = env)
src.cleanup() src.cleanup()
@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
def test_10k_files(mscp, src_prefix, dst_prefix):
srcs = []
dsts = []
for n in range(10000):
srcs.append(File("src/src-{:06d}".format(n), size=1024).make())
dsts.append(File("dst/src-{:06d}".format(n)))
run2ok([mscp, "-H", "-v", src_prefix + "src/*", dst_prefix + "dst"])
for s, d in zip(srcs, dsts):
assert check_same_md5sum(s, d)
shutil.rmtree("src")
shutil.rmtree("dst")