add pool structure and move src list to pool

Ryo Nakamura
2024-02-10 13:57:37 +09:00
parent c5afb99d67
commit d2e061fd97
5 changed files with 190 additions and 38 deletions

.clang-format

@@ -689,6 +689,8 @@ ForEachMacros:
- 'xbc_node_for_each_key_value'
- 'xbc_node_for_each_subkey'
- 'zorro_for_each_dev'
- 'pool_iter_for_each'
- 'pool_for_each'
IncludeBlocks: Preserve
IncludeCategories:

CMakeLists.txt

@@ -99,7 +99,7 @@ list(APPEND MSCP_BUILD_INCLUDE_DIRS ${CMAKE_CURRENT_BINARY_DIR}/include)
# libmscp.a
set(LIBMSCP_SRC
src/mscp.c src/ssh.c src/fileops.c src/path.c src/platform.c
src/print.c src/strerrno.c
src/print.c src/pool.c src/strerrno.c
${OPENBSD_COMPAT_SRC})
add_library(mscp-static STATIC ${LIBMSCP_SRC})
target_include_directories(mscp-static

src/mscp.c

@@ -7,6 +7,7 @@
#include <sys/time.h>
#include <list.h>
#include <pool.h>
#include <minmax.h>
#include <ssh.h>
#include <path.h>
@@ -34,6 +35,7 @@ struct mscp {
sftp_session first; /* first sftp session */
char dst_path[PATH_MAX];
pool *src_pool;
struct list_head src_list;
struct list_head path_list;
struct chunk_pool cp;
@@ -60,11 +62,6 @@ struct mscp_thread {
int ret;
};
struct src {
struct list_head list; /* mscp->src_list */
char *path;
};
#define DEFAULT_MIN_CHUNK_SZ (64 << 20) /* 64MB */
#define DEFAULT_NR_AHEAD 32
#define DEFAULT_BUF_SZ 16384
@@ -235,9 +232,14 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts
priv_set_errv("malloc: %s", strerrno());
return NULL;
}
memset(m, 0, sizeof(*m));
INIT_LIST_HEAD(&m->src_list);
m->src_pool = pool_new();
if (!m->src_pool) {
priv_set_errv("pool_new: %s", strerrno());
goto free_out;
}
INIT_LIST_HEAD(&m->path_list);
chunk_pool_init(&m->cp);
@@ -275,6 +277,8 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts
return m;
free_out:
if (m->src_pool)
pool_free(m->src_pool);
free(m);
return NULL;
}
@@ -290,23 +294,15 @@ int mscp_connect(struct mscp *m)
int mscp_add_src_path(struct mscp *m, const char *src_path)
{
struct src *s;
s = malloc(sizeof(*s));
char *s = strdup(src_path);
if (!s) {
priv_set_errv("malloc: %s", strerrno());
priv_set_errv("strdup: %s", strerrno());
return -1;
}
memset(s, 0, sizeof(*s));
s->path = strdup(src_path);
if (!s->path) {
priv_set_errv("malloc: %s", strerrno());
free(s);
if (pool_push(m->src_pool, s) < 0) {
priv_set_errv("pool_push: %s", strerrno());
return -1;
}
list_add_tail(&s->list, &m->src_list);
return 0;
}
@@ -370,8 +366,8 @@ void *mscp_scan_thread(void *arg)
struct path_resolve_args a;
struct list_head tmp;
struct path *p;
struct src *s;
struct stat ss, ds;
char *src_path;
glob_t pglob;
int n;
@@ -395,7 +391,7 @@ void *mscp_scan_thread(void *arg)
memset(&a, 0, sizeof(a));
a.total_bytes = &m->total_bytes;
if (list_count(&m->src_list) > 1)
if (pool_size(m->src_pool) > 1)
a.dst_path_should_dir = true;
if (mscp_stat(m->dst_path, &ds, dst_sftp) == 0) {
@@ -411,22 +407,22 @@ void *mscp_scan_thread(void *arg)
pr_info("start to walk source path(s)");
/* walk a src_path recursively, and resolve path->dst_path for each src */
list_for_each_entry(s, &m->src_list, list) {
/* walk each src_path recursively, and resolve path->dst_path for each src */
pool_iter_for_each(m->src_pool, src_path) {
memset(&pglob, 0, sizeof(pglob));
if (mscp_glob(s->path, GLOB_NOCHECK, &pglob, src_sftp) < 0) {
if (mscp_glob(src_path, GLOB_NOCHECK, &pglob, src_sftp) < 0) {
pr_err("mscp_glob: %s", strerrno());
goto err_out;
}
for (n = 0; n < pglob.gl_pathc; n++) {
if (mscp_stat(pglob.gl_pathv[n], &ss, src_sftp) < 0) {
pr_err("stat: %s %s", s->path, strerrno());
pr_err("stat: %s %s", src_path, strerrno());
goto err_out;
}
if (!a.dst_path_should_dir && pglob.gl_pathc > 1)
a.dst_path_should_dir = true; /* we have over 1 src */
a.dst_path_should_dir = true; /* we have more than one src */
/* set path specific args */
a.src_path = pglob.gl_pathv[n];
@@ -705,14 +701,6 @@ out:
/* cleanup-related functions */
static void list_free_src(struct list_head *list)
{
struct src *s;
s = list_entry(list, typeof(*s), list);
free(s->path);
free(s);
}
static void list_free_path(struct list_head *list)
{
struct path *p;
@@ -734,9 +722,6 @@ void mscp_cleanup(struct mscp *m)
m->first = NULL;
}
list_free_f(&m->src_list, list_free_src);
INIT_LIST_HEAD(&m->src_list);
list_free_f(&m->path_list, list_free_path);
INIT_LIST_HEAD(&m->path_list);
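The hunk above drops list_free_src(), and the excerpt does not show where the strdup()ed paths now held in m->src_pool are released. A minimal sketch of such a cleanup, assuming only the pool API added below; free_src_pool() is a hypothetical helper name, not part of this commit:

#include <stdlib.h>
#include "pool.h"

/* free each strdup()ed source path stored in the pool, then the pool
 * itself; pool_free() releases only the backing array */
static void free_src_pool(pool *p)
{
	char *src_path;

	pool_iter_for_each(p, src_path)
		free(src_path);
	pool_free(p);
}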

src/pool.c (new file, +91)

@@ -0,0 +1,91 @@
#include <string.h>
#include <stdlib.h>
#include "pool.h"
#define DEFAULT_START_SIZE 16
pool *pool_new(void)
{
pool *p;
p = malloc(sizeof(*p));
if (!p)
return NULL;
memset(p, 0, sizeof(*p));
p->array = calloc(DEFAULT_START_SIZE, sizeof(void *));
if (!p->array) {
free(p);
return NULL;
}
p->len = DEFAULT_START_SIZE;
p->num = 0;
lock_init(&p->lock);
return p;
}
void pool_free(pool *p)
{
if (p->array)
free(p->array);
free(p);
}
int pool_push(pool *p, void *v)
{
if (p->num == p->len) {
/* expand the array; p->len counts elements, not bytes */
size_t newlen = p->len * 2;
void **new = realloc(p->array, newlen * sizeof(void *));
if (new == NULL)
return -1;
p->len = newlen;
p->array = new;
}
p->array[p->num] = v;
p->num++;
return 0;
}
int pool_push_lock(pool *p, void *v)
{
int ret = -1;
pool_lock(p);
ret = pool_push(p, v);
pool_unlock(p);
return ret;
}
void *pool_pop(pool *p)
{
return p->num == 0 ? NULL : p->array[--p->num];
}
void *pool_pop_lock(pool *p)
{
void *v;
pool_lock(p);
v = pool_pop(p);
pool_unlock(p);
return v;
}
void *pool_iter_next(pool *p)
{
if (p->num <= p->idx)
return NULL;
void *v = p->array[p->idx];
p->idx++;
return v;
}
void *pool_iter_next_lock(pool *p)
{
void *v = NULL;
pool_lock(p);
v = pool_iter_next(p);
pool_unlock(p);
return v;
}
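A minimal, illustrative check of the behaviour implemented above; this is not part of the commit and assumes pool.c and pool.h are compiled into a test program together with the project's lock wrappers from atomic.h. It exercises growth of the backing array past DEFAULT_START_SIZE and the LIFO order of pool_pop():

#include <assert.h>
#include <stdint.h>
#include "pool.h"

int main(void)
{
	pool *p = pool_new();
	uintptr_t i;

	assert(p != NULL);

	/* push enough items to force pool_push() to expand the array */
	for (i = 1; i <= 100; i++) {
		int ret = pool_push(p, (void *)i);
		assert(ret == 0);
	}
	assert(pool_size(p) == 100);

	/* pool_pop() returns items in LIFO order */
	for (i = 100; i >= 1; i--) {
		void *v = pool_pop(p);
		assert((uintptr_t)v == i);
	}
	assert(pool_pop(p) == NULL); /* an empty pool yields NULL */

	pool_free(p);
	return 0;
}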

src/pool.h (new file, +74)

@@ -0,0 +1,74 @@
#ifndef _POOL_H_
#define _POOL_H_
#include <stdbool.h>
#include <stddef.h>
#include <atomic.h>
/* A pool is like a stack with an iterator that walks from the bottom to
* the top. The memory footprint of a pool never shrinks, so it is not
* suitable for long-term use. */
struct pool_struct {
void **array;
size_t len; /* length of array */
size_t num; /* number of items in the array */
size_t idx; /* index used by the iterator */
int state;
lock lock;
};
typedef struct pool_struct pool;
pool *pool_new(void);
void pool_free(pool *p);
#define pool_lock(p) LOCK_ACQUIRE(&(p->lock))
#define pool_unlock(p) LOCK_RELEASE()
/*
* pool_push() pushes *v to pool *p. pool_push_lock() does this while
* locking *p.
*/
int pool_push(pool *p, void *v);
int pool_push_lock(pool *p, void *v);
/*
* pool_pop() pops the last *v pushed to *p. pool_pop_lock() does this
* while locking *p.
*/
void *pool_pop(pool *p);
void *pool_pop_lock(pool *p);
#define pool_size(p) ((p)->num)
#define pool_get(p, idx) (((p)->num <= (idx)) ? NULL : (p)->array[(idx)])
/*
* pool->idx indicates the next *v in an iteration. This supports two
* use-cases.
*
* (1) A simple list: a single thread owns the pool and walks it with
* pool_iter_for_each() (not thread safe).
*
* (2) A thread-safe queue: one thread initializes the iterator for a
* pool with pool_iter_init(). Multiple threads then fetch the next *v
* concurrently with pool_iter_next_lock(), which amounts to dequeuing,
* while another thread adds new *v with pool_push_lock(), which amounts
* to enqueuing. Meanwhile, no other thread may touch the pool through
* the pool_iter_* functions.
*/
#define pool_iter_init(p) (p->idx = 0)
void *pool_iter_next(pool *p);
void *pool_iter_next_lock(pool *p);
#define pool_iter_for_each(p, v) \
for (pool_iter_init(p), (v) = pool_iter_next(p); (v) != NULL; \
(v) = pool_iter_next(p))
#define pool_for_each(p, v, idx) \
for ((idx) = 0; ((v) = pool_get(p, (idx))) != NULL; (idx)++)
#endif /* _POOL_H_ */
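The two use-cases described in the header comment above might look like the following sketch (illustrative only, not code from this commit; it assumes the same build environment as pool.c):

#include <stdio.h>
#include "pool.h"

int main(void)
{
	pool *p = pool_new();
	char *v;

	if (!p)
		return 1;

	/* use-case (1): a simple single-threaded list */
	pool_push(p, "alpha");
	pool_push(p, "beta");
	pool_push(p, "gamma");
	pool_iter_for_each(p, v)
		printf("%s\n", v);

	/* use-case (2): queue-style consumption. One thread resets the
	 * iterator; workers would then call pool_iter_next_lock()
	 * concurrently while a producer adds items via pool_push_lock(). */
	pool_iter_init(p);
	while ((v = pool_iter_next_lock(p)) != NULL)
		printf("dequeued %s\n", v);

	pool_free(p);
	return 0;
}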