add checkpoint.c and .h

This commit is contained in:
Ryo Nakamura
2024-02-16 14:54:24 +09:00
parent 4e895bb72e
commit f71c7a145a
11 changed files with 502 additions and 71 deletions

View File

@@ -10,6 +10,7 @@
#include <minmax.h>
#include <ssh.h>
#include <path.h>
#include <checkpoint.h>
#include <fileops.h>
#include <atomic.h>
#include <platform.h>
@@ -28,10 +29,6 @@ struct mscp_thread {
int id;
int cpu;
/* attributes used by scan thread */
size_t total_bytes;
bool finished;
/* thread-specific values */
pthread_t tid;
int ret;
@@ -54,8 +51,12 @@ struct mscp {
pool *src_pool, *path_pool, *chunk_pool, *thread_pool;
size_t total_bytes; /* total_bytes to be copied */
bool chunk_pool_ready;
#define chunk_pool_is_ready(m) ((m)->chunk_pool_ready)
#define chunk_pool_set_ready(m, b) ((m)->chunk_pool_ready = b)
struct mscp_thread scan; /* mscp_thread for mscp_scan_thread() */
#define mscp_scan_is_finished(m) ((m)->scan.finished)
};
#define DEFAULT_MIN_CHUNK_SZ (64 << 20) /* 64MB */
@@ -228,6 +229,10 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts
return NULL;
}
memset(m, 0, sizeof(*m));
m->direction = direction;
m->opts = o;
m->ssh_opts = s;
chunk_pool_set_ready(m, false);
if (!(m->src_pool = pool_new())) {
priv_set_errv("pool_new: %s", strerrno());
@@ -258,7 +263,6 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts
priv_set_errv("strdup: %s", strerrno());
goto free_out;
}
m->direction = direction;
if (o->coremask) {
if (expand_coremask(o->coremask, &m->cores, &m->nr_cores) < 0)
@@ -273,9 +277,6 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts
pr_notice("usable cpu cores:%s", b);
}
m->opts = o;
m->ssh_opts = s;
return m;
free_out:
@@ -397,7 +398,7 @@ void *mscp_scan_thread(void *arg)
/* initialize path_resolve_args */
memset(&a, 0, sizeof(a));
a.total_bytes = &t->total_bytes;
a.total_bytes = &m->total_bytes;
if (pool_size(m->src_pool) > 1)
a.dst_path_should_dir = true;
@@ -446,12 +447,12 @@ void *mscp_scan_thread(void *arg)
pr_info("walk source path(s) done");
t->ret = 0;
t->finished = true;
chunk_pool_set_ready(m, true);
return NULL;
err_out:
t->ret = -1;
t->finished = true;
chunk_pool_set_ready(m, true);
return NULL;
}
@@ -463,7 +464,6 @@ int mscp_scan(struct mscp *m)
memset(t, 0, sizeof(*t));
t->m = m;
t->sftp = m->first;
t->finished = false;
if ((ret = pthread_create(&t->tid, NULL, mscp_scan_thread, t)) < 0) {
priv_set_err("pthread_create: %d", ret);
@@ -475,8 +475,7 @@ int mscp_scan(struct mscp *m)
* finished. If the number of chunks are smaller than
* nr_threads, we adjust nr_threads to the number of chunks.
*/
while (!mscp_scan_is_finished(m) &&
pool_size(m->chunk_pool) < m->opts->nr_threads)
while (!chunk_pool_is_ready(m) && pool_size(m->chunk_pool) < m->opts->nr_threads)
usleep(100);
return 0;
@@ -493,6 +492,39 @@ int mscp_scan_join(struct mscp *m)
return 0;
}
int mscp_load_checkpoint(struct mscp *m, const char *pathname)
{
size_t total_bytes = 0;
unsigned int idx;
struct chunk *c;
char remote[1024];
if (checkpoint_load(pathname, remote, sizeof(remote), &m->direction, m->path_pool,
m->chunk_pool) < 0)
return -1;
if (!(m->remote = strdup(remote))) {
priv_set_errv("malloc: %s", strerrno());
return -1;
}
pool_for_each(m->chunk_pool, c, idx) {
total_bytes += c->len;
}
m->total_bytes = total_bytes;
__sync_synchronize();
chunk_pool_set_ready(m, true);
return 0;
}
int mscp_save_checkpoint(struct mscp *m, const char *pathname)
{
return checkpoint_save(pathname, m->direction, m->remote, m->path_pool,
m->chunk_pool);
}
static void *mscp_copy_thread(void *arg);
static struct mscp_thread *mscp_copy_thread_spawn(struct mscp *m, int id)
@@ -501,7 +533,7 @@ static struct mscp_thread *mscp_copy_thread_spawn(struct mscp *m, int id)
int ret;
if (!(t = malloc(sizeof(*t)))) {
priv_set_errv("malloc: %s,", strerrno());
priv_set_errv("malloc: %s", strerrno());
return NULL;
}
@@ -585,7 +617,7 @@ int mscp_join(struct mscp *m)
}
pr_notice("%lu/%lu bytes copied for %lu/%lu files", total_copied_bytes,
m->scan.total_bytes, nr_copied, nr_tobe_copied);
m->total_bytes, nr_copied, nr_tobe_copied);
return ret;
}
@@ -670,13 +702,12 @@ void *mscp_copy_thread(void *arg)
while (1) {
c = pool_iter_next_lock(m->chunk_pool);
if (c == NULL) {
if (!mscp_scan_is_finished(m)) {
/* scan is not finished, wait. */
if (!chunk_pool_is_ready(m)) {
/* a new chunk will be added. wait for it. */
usleep(100);
continue;
}
/* scan is finished, and no more chunks */
break;
break; /* no more chunks */
}
if ((t->ret = copy_chunk(c, src_sftp, dst_sftp, m->opts->nr_ahead,
@@ -735,7 +766,7 @@ void mscp_get_stats(struct mscp *m, struct mscp_stats *s)
struct mscp_thread *t;
unsigned int idx;
s->total = m->scan.total_bytes;
s->total = m->total_bytes;
s->done = 0;
pool_for_each(m->thread_pool, t, idx) {