little cleanup

This commit is contained in:
Ryo Nakamura
2024-02-11 21:45:34 +09:00
parent a828ca3f5a
commit b756654f6e
3 changed files with 30 additions and 41 deletions

View File

@@ -21,24 +21,27 @@
struct mscp_thread { struct mscp_thread {
struct mscp *m; struct mscp *m;
pthread_t tid;
sftp_session sftp; sftp_session sftp;
int ret;
/* attributes used by copy threads */ /* attributes used by copy threads */
size_t copied_bytes;
int id; int id;
int cpu; int cpu;
size_t copied_bytes;
/* attributes used by scan thread */ /* attributes used by scan thread */
size_t total_bytes; size_t total_bytes;
bool finished; bool finished;
/* thread-specific values */
pthread_t tid;
int ret;
}; };
struct mscp { struct mscp {
char *remote; /* remote host (and uername) */ char *remote; /* remote host (and uername) */
int direction; /* copy direction */ int direction; /* copy direction */
char dst_path[PATH_MAX];
struct mscp_opts *opts; struct mscp_opts *opts;
struct mscp_ssh_opts *ssh_opts; struct mscp_ssh_opts *ssh_opts;
@@ -49,16 +52,12 @@ struct mscp {
sftp_session first; /* first sftp session */ sftp_session first; /* first sftp session */
char dst_path[PATH_MAX];
pool *src_pool, *path_pool, *chunk_pool, *thread_pool; pool *src_pool, *path_pool, *chunk_pool, *thread_pool;
struct mscp_thread scan; /* mscp_thread for mscp_scan_thread() */ struct mscp_thread scan; /* mscp_thread for mscp_scan_thread() */
#define mscp_scan_is_finished(m) ((m)->scan.finished) #define mscp_scan_is_finished(m) ((m)->scan.finished)
}; };
#define DEFAULT_MIN_CHUNK_SZ (64 << 20) /* 64MB */ #define DEFAULT_MIN_CHUNK_SZ (64 << 20) /* 64MB */
#define DEFAULT_NR_AHEAD 32 #define DEFAULT_NR_AHEAD 32
#define DEFAULT_BUF_SZ 16384 #define DEFAULT_BUF_SZ 16384
@@ -466,8 +465,7 @@ int mscp_scan(struct mscp *m)
t->sftp = m->first; t->sftp = m->first;
t->finished = false; t->finished = false;
ret = pthread_create(&t->tid, NULL, mscp_scan_thread, t); if ((ret = pthread_create(&t->tid, NULL, mscp_scan_thread, t)) < 0) {
if (ret < 0) {
priv_set_err("pthread_create: %d", ret); priv_set_err("pthread_create: %d", ret);
return -1; return -1;
} }
@@ -502,8 +500,7 @@ static struct mscp_thread *mscp_copy_thread_spawn(struct mscp *m, int id)
struct mscp_thread *t; struct mscp_thread *t;
int ret; int ret;
t = malloc(sizeof(*t)); if (!(t = malloc(sizeof(*t)))) {
if (!t) {
priv_set_errv("malloc: %s,", strerrno()); priv_set_errv("malloc: %s,", strerrno());
return NULL; return NULL;
} }
@@ -516,8 +513,7 @@ static struct mscp_thread *mscp_copy_thread_spawn(struct mscp *m, int id)
else else
t->cpu = m->cores[id % m->nr_cores]; t->cpu = m->cores[id % m->nr_cores];
ret = pthread_create(&t->tid, NULL, mscp_copy_thread, t); if ((ret = pthread_create(&t->tid, NULL, mscp_copy_thread, t)) < 0) {
if (ret < 0) {
priv_set_errv("pthread_create: %d", ret); priv_set_errv("pthread_create: %d", ret);
free(t); free(t);
return NULL; return NULL;

View File

@@ -96,40 +96,40 @@ static struct chunk *alloc_chunk(struct path *p)
return c; return c;
} }
static int resolve_chunk(struct path *p, struct path_resolve_args *a) static int resolve_chunk(struct path *p, size_t size, struct path_resolve_args *a)
{ {
struct chunk *c; struct chunk *c;
size_t chunk_sz; size_t chunk_sz;
size_t size; size_t remaind;
if (p->size <= a->min_chunk_sz) if (size <= a->min_chunk_sz)
chunk_sz = p->size; chunk_sz = size;
else if (a->max_chunk_sz) else if (a->max_chunk_sz)
chunk_sz = a->max_chunk_sz; chunk_sz = a->max_chunk_sz;
else { else {
chunk_sz = (p->size - (p->size % a->nr_conn)) / a->nr_conn; chunk_sz = (size - (size % a->nr_conn)) / a->nr_conn;
chunk_sz &= ~a->chunk_align; /* align with page_sz */ chunk_sz &= ~a->chunk_align; /* align with page_sz */
if (chunk_sz <= a->min_chunk_sz) if (chunk_sz <= a->min_chunk_sz)
chunk_sz = a->min_chunk_sz; chunk_sz = a->min_chunk_sz;
} }
/* for (size = f->size; size > 0;) does not create a file /* for (size = size; size > 0;) does not create a file (chunk)
* (chunk) when file size is 0. This do {} while (size > 0) * when file size is 0. This do {} while (remaind > 0) creates
* creates just open/close a 0-byte file. * just open/close a 0-byte file.
*/ */
size = p->size; remaind = size;
do { do {
c = alloc_chunk(p); c = alloc_chunk(p);
if (!c) if (!c)
return -1; return -1;
c->off = p->size - size; c->off = size - remaind;
c->len = size < chunk_sz ? size : chunk_sz; c->len = remaind < chunk_sz ? remaind : chunk_sz;
size -= c->len; remaind -= c->len;
if (pool_push_lock(a->chunk_pool, c) < 0) { if (pool_push_lock(a->chunk_pool, c) < 0) {
pr_err("pool_push_lock: %s", strerrno()); pr_err("pool_push_lock: %s", strerrno());
return -1; return -1;
} }
} while (size > 0); } while (remaind > 0);
return 0; return 0;
} }
@@ -159,8 +159,6 @@ static int append_path(sftp_session sftp, const char *path, struct stat st,
pr_err("strndup: %s", strerrno()); pr_err("strndup: %s", strerrno());
goto free_out; goto free_out;
} }
p->size = st.st_size;
p->mode = st.st_mode;
p->state = FILE_STATE_INIT; p->state = FILE_STATE_INIT;
lock_init(&p->lock); lock_init(&p->lock);
@@ -168,7 +166,7 @@ static int append_path(sftp_session sftp, const char *path, struct stat st,
if (!p->dst_path) if (!p->dst_path)
goto free_out; goto free_out;
if (resolve_chunk(p, a) < 0) if (resolve_chunk(p, st.st_size, a) < 0)
return -1; /* XXX: do not free path becuase chunk(s) return -1; /* XXX: do not free path becuase chunk(s)
* was added to chunk pool already */ * was added to chunk pool already */
@@ -177,7 +175,7 @@ static int append_path(sftp_session sftp, const char *path, struct stat st,
goto free_out; goto free_out;
} }
*a->total_bytes += p->size; *a->total_bytes += st.st_size;
return 0; return 0;
@@ -490,8 +488,7 @@ int copy_chunk(struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp,
/* open src */ /* open src */
flags = O_RDONLY; flags = O_RDONLY;
mode = S_IRUSR; mode = S_IRUSR;
s = mscp_open(c->p->path, flags, mode, src_sftp); if (!(s = mscp_open(c->p->path, flags, mode, src_sftp))) {
if (!s) {
priv_set_errv("mscp_open: %s: %s", c->p->path, strerrno()); priv_set_errv("mscp_open: %s: %s", c->p->path, strerrno());
return -1; return -1;
} }
@@ -503,8 +500,7 @@ int copy_chunk(struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp,
/* open dst */ /* open dst */
flags = O_WRONLY; flags = O_WRONLY;
mode = S_IRUSR | S_IWUSR; mode = S_IRUSR | S_IWUSR;
d = mscp_open(c->p->dst_path, flags, mode, dst_sftp); if (!(d = mscp_open(c->p->dst_path, flags, mode, dst_sftp))) {
if (!d) {
mscp_close(s); mscp_close(s);
priv_set_errv("mscp_open: %s: %s", c->p->dst_path, strerrno()); priv_set_errv("mscp_open: %s: %s", c->p->dst_path, strerrno());
return -1; return -1;

View File

@@ -12,18 +12,15 @@
struct path { struct path {
char *path; /* file path */ char *path; /* file path */
size_t size; /* size of file on this path */
mode_t mode; /* permission */
char *dst_path; /* copy dst path */ char *dst_path; /* copy dst path */
int state; refcnt refcnt; /* number of associated chunks */
lock lock; lock lock;
refcnt refcnt; int state;
};
#define FILE_STATE_INIT 0 #define FILE_STATE_INIT 0
#define FILE_STATE_OPENED 1 #define FILE_STATE_OPENED 1
#define FILE_STATE_DONE 2 #define FILE_STATE_DONE 2
};
struct chunk { struct chunk {
struct path *p; struct path *p;