mirror of
https://github.com/upa/mscp.git
synced 2026-02-11 15:34:53 +08:00
Compare commits
23 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5ac0874621 | ||
|
|
e0e6fae296 | ||
|
|
6305f02770 | ||
|
|
ae4b848ba0 | ||
|
|
3902fb584a | ||
|
|
4ec877a290 | ||
|
|
f0c70a148b | ||
|
|
e038b3020d | ||
|
|
2fdfa7b830 | ||
|
|
f5d0f526f2 | ||
|
|
a086e6a154 | ||
|
|
3bce4ec277 | ||
|
|
a923d40ada | ||
|
|
24fef5f539 | ||
|
|
4e80b05da7 | ||
|
|
98eca409af | ||
|
|
cf99a439cb | ||
|
|
3077bb0856 | ||
|
|
72c27f16d6 | ||
|
|
9b0eb668f9 | ||
|
|
5f9f20f150 | ||
|
|
ceb9ebd5a8 | ||
|
|
3810d6314d |
27
README.md
27
README.md
@@ -38,19 +38,25 @@ brew install upa/tap/mscp
|
||||
|
||||
- Ubuntu 22.04
|
||||
```console
|
||||
wget https://github.com/upa/mscp/releases/download/v0.0.6/mscp_0.0.6-ubuntu-22.04-x86_64.deb
|
||||
apt-get install -f ./mscp_0.0.6-ubuntu-22.04-x86_64.deb
|
||||
wget https://github.com/upa/mscp/releases/latest/download/mscp_ubuntu-22.04-x86_64.deb
|
||||
apt-get install -f ./mscp_ubuntu-22.04-x86_64.deb
|
||||
```
|
||||
|
||||
- Ubuntu 20.04
|
||||
```console
|
||||
wget https://github.com/upa/mscp/releases/download/v0.0.6/mscp_0.0.6-ubuntu-20.04-x86_64.deb
|
||||
apt-get install -f ./mscp_0.0.6-ubuntu-20.04-x86_64.deb
|
||||
wget https://github.com/upa/mscp/releases/latest/download/mscp_ubuntu-20.04-x86_64.deb
|
||||
apt-get install -f ./mscp_ubuntu-20.04-x86_64.deb
|
||||
```
|
||||
|
||||
- Rocky 8.6
|
||||
```console
|
||||
yum install https://github.com/upa/mscp/releases/download/v0.0.6/mscp_0.0.6-rocky-8.6-x86_64.rpm
|
||||
yum install https://github.com/upa/mscp/releases/latest/download/mscp_rocky-8.6-x86_64.rpm
|
||||
```
|
||||
|
||||
- Linux with single binary `mscp` (x86_64 only)
|
||||
```console
|
||||
wget https://github.com/upa/mscp/releases/latest/download/mscp.linux.x86.static -O /usr/local/bin/mscp
|
||||
chmod 755 /usr/local/bin/mscp
|
||||
```
|
||||
|
||||
|
||||
@@ -98,9 +104,9 @@ of libssh. So you can start from cmake with it.
|
||||
|
||||
```console
|
||||
$ mscp
|
||||
mscp v0.0.7: copy files over multiple ssh connections
|
||||
mscp v0.0.8: copy files over multiple ssh connections
|
||||
|
||||
Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask]
|
||||
Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask] [-u max_startups]
|
||||
[-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]
|
||||
[-l login_name] [-p port] [-i identity_file]
|
||||
[-c cipher_spec] [-M hmac_spec] [-C compress] source ... target
|
||||
@@ -124,7 +130,7 @@ $ mscp -n 5 -m 0x1f -c aes128-gcm@openssh.com /var/ram/test.img 10.0.0.1:/var/ra
|
||||
- `-v` option increments verbose output level.
|
||||
|
||||
```console
|
||||
$ mscp test 10.0.0.:
|
||||
$ mscp test 10.0.0.1:
|
||||
[=======================================] 100% 49B /49B 198.8B/s 00:00 ETA
|
||||
```
|
||||
|
||||
@@ -153,15 +159,16 @@ copy done: test/testdir/asdf
|
||||
|
||||
```console
|
||||
$ mscp -h
|
||||
mscp v0.0.7: copy files over multiple ssh connections
|
||||
mscp v0.0.8: copy files over multiple ssh connections
|
||||
|
||||
Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask]
|
||||
Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask] [-u max_startups]
|
||||
[-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]
|
||||
[-l login_name] [-p port] [-i identity_file]
|
||||
[-c cipher_spec] [-M hmac_spec] [-C compress] source ... target
|
||||
|
||||
-n NR_CONNECTIONS number of connections (default: floor(log(cores)*2)+1)
|
||||
-m COREMASK hex value to specify cores where threads pinned
|
||||
-u MAX_STARTUPS number of concurrent outgoing connections (default: 8)
|
||||
-s MIN_CHUNK_SIZE min chunk size (default: 64MB)
|
||||
-S MAX_CHUNK_SIZE max chunk size (default: filesize/nr_conn)
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@
|
||||
* 2. connect to remote host with mscp_connect()
|
||||
* 3. add path to source files with mscp_add_src_path()
|
||||
* 4. set path to destination with mscp_set_dst_path()
|
||||
* 5. finish preparation with mscp_prepare()
|
||||
* 5. start to scan source files with mscp_scan()
|
||||
* 6. start copy with mscp_start()
|
||||
* 7. wait for copy finished with mscp_join()
|
||||
* 8. cleanup mscp instance with mscp_cleanup() and mscp_free()
|
||||
@@ -43,6 +43,7 @@ struct mscp_opts {
|
||||
size_t max_chunk_sz; /** maximum chunk size (default file size/nr_threads) */
|
||||
size_t buf_sz; /** buffer size, default 16k. */
|
||||
char coremask[MSCP_MAX_COREMASK_STR]; /** hex to specifiy usable cpu cores */
|
||||
int max_startups; /* sshd MaxStartups concurrent connections */
|
||||
|
||||
int severity; /** messaging severity. set MSCP_SERVERITY_* */
|
||||
int msg_fd; /** fd to output message. default STDOUT (0),
|
||||
@@ -109,7 +110,7 @@ struct mscp *mscp_init(const char *remote_host, int direction,
|
||||
/**
|
||||
* @brief Connect the first SSH connection. mscp_connect connects to
|
||||
* remote host and initialize a SFTP session over the
|
||||
* connection. mscp_prepare() and mscp_start() require mscp_connect()
|
||||
* connection. mscp_scan() and mscp_start() require mscp_connect()
|
||||
* beforehand.
|
||||
*
|
||||
* @param m mscp instance.
|
||||
@@ -149,20 +150,31 @@ int mscp_add_src_path(struct mscp *m, const char *src_path);
|
||||
*/
|
||||
int mscp_set_dst_path(struct mscp *m, const char *dst_path);
|
||||
|
||||
/* check source files, resolve destination file paths for all source
|
||||
* files, and prepare chunks for all files. */
|
||||
/* scan source files, resolve destination file paths for all source
|
||||
* files, and calculate chunks for all files. */
|
||||
|
||||
/**
|
||||
* @brief Prepare for file transfer. This function checks all source
|
||||
* files (recursively), resolve paths on the destination side, and
|
||||
* calculate file chunks.
|
||||
* @brief Scan source paths and prepare. This function checks all
|
||||
* source files (recursively), resolve paths on the destination side,
|
||||
* and calculate file chunks. This function is non-blocking.
|
||||
*
|
||||
* @param m mscp instance.
|
||||
*
|
||||
* @return 0 on success, < 0 if an error occured.
|
||||
* mscp_get_error() can be used to retrieve error message.
|
||||
*/
|
||||
int mscp_prepare(struct mscp *m);
|
||||
int mscp_scan(struct mscp *m);
|
||||
|
||||
/**
|
||||
* @brief Join scna thread invoked by mscp_scan(). mscp_join()
|
||||
* involves this, so that mscp_scan_join() should be called when
|
||||
* mscp_scan() is called by mscp_start() is not.
|
||||
*
|
||||
* @param m mscp instance.
|
||||
* @return 0 on success, < 0 if an error occured.
|
||||
* mscp_get_error() can be used to retrieve error message.
|
||||
*/
|
||||
int mscp_scan_join(struct mscp *m);
|
||||
|
||||
/**
|
||||
* @brief Start to copy files. mscp_start() returns immediately. You
|
||||
@@ -245,15 +257,6 @@ enum {
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* @brief Set a file descriptor for receiving messages from mscp.
|
||||
* This function has the same effect with setting mscp_opts->msg_fd.
|
||||
*
|
||||
* @param m mscp instance.
|
||||
* @param fd fd to which libmscp writes messages.
|
||||
*/
|
||||
void mscp_set_msg_fd(struct mscp *m, int fd);
|
||||
|
||||
|
||||
/**
|
||||
* @brief Get the recent error message from libmscp. Note that this
|
||||
|
||||
23
mscp/mscp.py
23
mscp/mscp.py
@@ -37,7 +37,7 @@ SEVERITY_DEBUG = pymscp.SEVERITY_DEBUG
|
||||
|
||||
STATE_INIT = 0
|
||||
STATE_CONNECTED = 1
|
||||
STATE_PREPARED = 2
|
||||
STATE_SCANNED = 2
|
||||
STATE_RUNNING = 3
|
||||
STATE_STOPPED = 4
|
||||
STATE_JOINED = 5
|
||||
@@ -47,7 +47,7 @@ STATE_RELEASED = 7
|
||||
_state_str = {
|
||||
STATE_INIT: "init",
|
||||
STATE_CONNECTED: "connected",
|
||||
STATE_PREPARED: "prepared",
|
||||
STATE_SCANNED: "scanned",
|
||||
STATE_RUNNING: "running",
|
||||
STATE_STOPPED: "stopped",
|
||||
STATE_JOINED: "joined",
|
||||
@@ -71,6 +71,9 @@ class mscp:
|
||||
self.state = STATE_INIT
|
||||
|
||||
def __str__(self):
|
||||
if not hasattr(self, "state"):
|
||||
# this instance failed on mscp_init
|
||||
return "mscp:{}:init-failed"
|
||||
return "mscp:{}:{}".format(self.remote, self.__state2str())
|
||||
|
||||
def __repr__(self):
|
||||
@@ -100,26 +103,30 @@ class mscp:
|
||||
self.state = STATE_CONNECTED
|
||||
|
||||
def add_src_path(self, src_path: str):
|
||||
if type(src_path) != str:
|
||||
raise ValueError("src_path must be str: {}".format(src_path))
|
||||
self.src_paths.append(src_path)
|
||||
pymscp.mscp_add_src_path(m = self.m, src_path = src_path)
|
||||
|
||||
def set_dst_path(self, dst_path: str):
|
||||
if type(dst_path) != str:
|
||||
raise ValueError("dst_path must be str: {}".format(dst_path))
|
||||
self.dst_path = dst_path
|
||||
pymscp.mscp_set_dst_path(m = self.m, dst_path = dst_path);
|
||||
|
||||
def prepare(self):
|
||||
def scan(self):
|
||||
if self.state != STATE_CONNECTED:
|
||||
raise RuntimeError("invalid mscp state: {}".format(self.__state2str()))
|
||||
if not self.src_paths:
|
||||
raise RuntimeError("src path list is empty")
|
||||
if not self.dst_path:
|
||||
if self.dst_path == None:
|
||||
raise RuntimeError("dst path is not set")
|
||||
|
||||
pymscp.mscp_prepare(m = self.m)
|
||||
self.state = STATE_PREPARED
|
||||
pymscp.mscp_scan(m = self.m)
|
||||
self.state = STATE_SCANNED
|
||||
|
||||
def start(self):
|
||||
if self.state != STATE_PREPARED:
|
||||
if self.state != STATE_SCANNED:
|
||||
raise RuntimeError("invalid mscp state: {}".format(self.__state2str()))
|
||||
|
||||
pymscp.mscp_start(m = self.m)
|
||||
@@ -167,7 +174,7 @@ class mscp:
|
||||
|
||||
self.set_dst_path(dst)
|
||||
|
||||
self.prepare()
|
||||
self.scan()
|
||||
self.start()
|
||||
if nonblock:
|
||||
return
|
||||
|
||||
15
src/list.h
15
src/list.h
@@ -554,5 +554,20 @@ static inline int list_count(struct list_head *head)
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* list_free_f - free items in a list with a function
|
||||
* @head the heaf for your list.
|
||||
* @f function that releases an item in the list.
|
||||
*/
|
||||
static inline void list_free_f(struct list_head *head, void (*f)(struct list_head *))
|
||||
{
|
||||
struct list_head *p, *n;
|
||||
|
||||
list_for_each_safe(p, n, head) {
|
||||
list_del(p);
|
||||
f(p);
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
32
src/main.c
32
src/main.c
@@ -24,7 +24,7 @@
|
||||
void usage(bool print_help) {
|
||||
printf("mscp v" VERSION ": copy files over multiple ssh connections\n"
|
||||
"\n"
|
||||
"Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask]\n"
|
||||
"Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask] [-u max_startups]\n"
|
||||
" [-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]\n"
|
||||
" [-l login_name] [-p port] [-i identity_file]\n"
|
||||
" [-c cipher_spec] [-M hmac_spec] [-C compress] source ... target\n"
|
||||
@@ -36,6 +36,8 @@ void usage(bool print_help) {
|
||||
printf(" -n NR_CONNECTIONS number of connections "
|
||||
"(default: floor(log(cores)*2)+1)\n"
|
||||
" -m COREMASK hex value to specify cores where threads pinned\n"
|
||||
" -u MAX_STARTUPS number of concurrent outgoing connections "
|
||||
"(default: 8)\n"
|
||||
" -s MIN_CHUNK_SIZE min chunk size (default: 64MB)\n"
|
||||
" -S MAX_CHUNK_SIZE max chunk size (default: filesize/nr_conn)\n"
|
||||
"\n"
|
||||
@@ -52,7 +54,8 @@ void usage(bool print_help) {
|
||||
" -i IDENTITY identity file for public key authentication\n"
|
||||
" -c CIPHER cipher spec\n"
|
||||
" -M HMAC hmac spec\n"
|
||||
" -C COMPRESS enable compression: yes, no, zlib, zlib@openssh.com\n"
|
||||
" -C COMPRESS enable compression: "
|
||||
"yes, no, zlib, zlib@openssh.com\n"
|
||||
" -H disable hostkey check\n"
|
||||
" -d increment ssh debug output level\n"
|
||||
" -N enable Nagle's algorithm (default disabled)\n"
|
||||
@@ -69,7 +72,7 @@ char *split_remote_and_path(const char *string, char **remote, char **path)
|
||||
*/
|
||||
|
||||
if (!(s = strdup(string))) {
|
||||
fprintf(stderr, "strdup: %s\n", strerrno());
|
||||
fprintf(stderr, "strdup: %s\n", strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@@ -115,7 +118,7 @@ struct target *validate_targets(char **arg, int len)
|
||||
int n;
|
||||
|
||||
if ((t = calloc(len, sizeof(struct target))) == NULL) {
|
||||
fprintf(stderr, "calloc: %s\n", strerrno());
|
||||
fprintf(stderr, "calloc: %s\n", strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
memset(t, 0, len * sizeof(struct target));
|
||||
@@ -204,7 +207,7 @@ int main(int argc, char **argv)
|
||||
memset(&o, 0, sizeof(o));
|
||||
o.severity = MSCP_SEVERITY_WARN;
|
||||
|
||||
while ((ch = getopt(argc, argv, "n:m:s:S:a:b:vqDrl:p:i:c:M:C:HdNh")) != -1) {
|
||||
while ((ch = getopt(argc, argv, "n:m:u:s:S:a:b:vqDrl:p:i:c:M:C:HdNh")) != -1) {
|
||||
switch (ch) {
|
||||
case 'n':
|
||||
o.nr_threads = atoi(optarg);
|
||||
@@ -217,6 +220,9 @@ int main(int argc, char **argv)
|
||||
case 'm':
|
||||
strncpy(o.coremask, optarg, sizeof(o.coremask));
|
||||
break;
|
||||
case 'u':
|
||||
o.max_startups = atoi(optarg);
|
||||
break;
|
||||
case 's':
|
||||
o.min_chunk_sz = atoi(optarg);
|
||||
break;
|
||||
@@ -323,7 +329,7 @@ int main(int argc, char **argv)
|
||||
|
||||
if (!dryrun) {
|
||||
if (pipe(pipe_fd) < 0) {
|
||||
fprintf(stderr, "pipe: %s\n", strerrno());
|
||||
fprintf(stderr, "pipe: %s\n", strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
msg_fd = pipe_fd[0];
|
||||
@@ -352,23 +358,23 @@ int main(int argc, char **argv)
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mscp_prepare(m) < 0) {
|
||||
fprintf(stderr, "mscp_prepare: %s\n", mscp_get_error());
|
||||
if (mscp_scan(m) < 0) {
|
||||
fprintf(stderr, "mscp_scan: %s\n", mscp_get_error());
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (dryrun) {
|
||||
ret = 0;
|
||||
ret = mscp_scan_join(m);
|
||||
goto out;
|
||||
}
|
||||
|
||||
if (pthread_create(&tid_stat, NULL, print_stat_thread, NULL) < 0) {
|
||||
fprintf(stderr, "pthread_create: %s\n", strerrno());
|
||||
fprintf(stderr, "pthread_create: %s\n", strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (signal(SIGINT, sigint_handler) == SIG_ERR) {
|
||||
fprintf(stderr, "signal: %s\n", strerrno());
|
||||
fprintf(stderr, "signal: %s\n", strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
@@ -547,14 +553,14 @@ void *print_stat_thread(void *arg)
|
||||
|
||||
while (true) {
|
||||
if (poll(&pfd, 1, 100) < 0) {
|
||||
fprintf(stderr, "poll: %s\n", strerrno());
|
||||
fprintf(stderr, "poll: %s\n", strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (pfd.revents & POLLIN) {
|
||||
memset(buf, 0, sizeof(buf));
|
||||
if (read(msg_fd, buf, sizeof(buf)) < 0) {
|
||||
fprintf(stderr, "read: %s\n", strerrno());
|
||||
fprintf(stderr, "read: %s\n", strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
print_cli("\r\033[K" "%s", buf);
|
||||
|
||||
@@ -4,10 +4,13 @@
|
||||
#include <limits.h>
|
||||
#include <pthread.h>
|
||||
|
||||
#include <util.h>
|
||||
#include <message.h>
|
||||
|
||||
/* mscp error message buffer */
|
||||
/* strerror_r wrapper */
|
||||
__thread char thread_strerror[128];
|
||||
|
||||
/* mscp error message buffer */
|
||||
#define MSCP_ERRMSG_SIZE (PATH_MAX * 2)
|
||||
|
||||
static char errmsg[MSCP_ERRMSG_SIZE];
|
||||
@@ -30,29 +33,17 @@ const char *mscp_get_error()
|
||||
|
||||
/* message print functions */
|
||||
|
||||
static int mprint_serverity = MSCP_SEVERITY_WARN;
|
||||
static pthread_mutex_t mprint_lock = PTHREAD_MUTEX_INITIALIZER;
|
||||
static int mprint_severity = MSCP_SEVERITY_WARN;
|
||||
|
||||
void mprint_set_severity(int serverity)
|
||||
{
|
||||
if (serverity < 0)
|
||||
mprint_serverity = -1; /* no print */
|
||||
mprint_serverity = serverity;
|
||||
mprint_severity = -1; /* no print */
|
||||
mprint_severity = serverity;
|
||||
}
|
||||
|
||||
void mprint(int fd, int serverity, const char *fmt, ...)
|
||||
int mprint_get_severity()
|
||||
{
|
||||
va_list va;
|
||||
int ret;
|
||||
|
||||
if (fd < 0)
|
||||
return;
|
||||
|
||||
if (serverity <= mprint_serverity) {
|
||||
pthread_mutex_lock(&mprint_lock);
|
||||
va_start(va, fmt);
|
||||
vdprintf(fd, fmt, va);
|
||||
va_end(va);
|
||||
pthread_mutex_unlock(&mprint_lock);
|
||||
}
|
||||
return mprint_severity;
|
||||
}
|
||||
|
||||
|
||||
@@ -7,23 +7,47 @@
|
||||
|
||||
/* message print. printed messages are passed to application via msg_fd */
|
||||
void mprint_set_severity(int severity);
|
||||
void mprint(int fd, int severity, const char *fmt, ...);
|
||||
int mprint_get_severity();
|
||||
|
||||
#define mprint(fp, severity, fmt, ...) \
|
||||
do { \
|
||||
if (fp && severity <= mprint_get_severity()) { \
|
||||
fprintf(fp, fmt, ##__VA_ARGS__); \
|
||||
fflush(fp); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define mpr_err(fp, fmt, ...) \
|
||||
mprint(fp, MSCP_SEVERITY_ERR, fmt, ##__VA_ARGS__)
|
||||
#define mpr_warn(fp, fmt, ...) \
|
||||
mprint(fp, MSCP_SEVERITY_WARN, fmt, ##__VA_ARGS__)
|
||||
#define mpr_notice(fp, fmt, ...) \
|
||||
mprint(fp, MSCP_SEVERITY_NOTICE, fmt, ##__VA_ARGS__)
|
||||
#define mpr_info(fp, fmt, ...) \
|
||||
mprint(fp, MSCP_SEVERITY_INFO, fmt, ##__VA_ARGS__)
|
||||
#define mpr_debug(fp, fmt, ...) \
|
||||
mprint(fp, MSCP_SEVERITY_DEBUG, fmt, ##__VA_ARGS__)
|
||||
|
||||
|
||||
/* errorno wrapper */
|
||||
extern __thread char thread_strerror[128];
|
||||
|
||||
#ifdef _GNU_SOURCE
|
||||
/* GNU strerror_r */
|
||||
#define strerrno() \
|
||||
strerror_r(errno, thread_strerror, sizeof(thread_strerror))
|
||||
#else
|
||||
/* this macro assumes that strerror_r never fails. any good way? */
|
||||
#define strerrno() \
|
||||
(strerror_r(errno, thread_strerror, sizeof(thread_strerror)) \
|
||||
? thread_strerror : thread_strerror)
|
||||
#endif
|
||||
|
||||
#define mpr_err(fd, fmt, ...) \
|
||||
mprint(fd, MSCP_SEVERITY_ERR, fmt, ##__VA_ARGS__)
|
||||
#define mpr_warn(fd, fmt, ...) \
|
||||
mprint(fd, MSCP_SEVERITY_WARN, fmt, ##__VA_ARGS__)
|
||||
#define mpr_notice(fd, fmt, ...) \
|
||||
mprint(fd, MSCP_SEVERITY_NOTICE, fmt, ##__VA_ARGS__)
|
||||
#define mpr_info(fd, fmt, ...) \
|
||||
mprint(fd, MSCP_SEVERITY_INFO, fmt, ##__VA_ARGS__)
|
||||
#define mpr_debug(fd, fmt, ...) \
|
||||
mprint(fd, MSCP_SEVERITY_DEBUG, fmt, ##__VA_ARGS__)
|
||||
|
||||
|
||||
/* error message buffer */
|
||||
#define mscp_set_error(fmt, ...) \
|
||||
_mscp_set_error("%s:%d:%s: " fmt, \
|
||||
_mscp_set_error("%s:%d:%s: " fmt "\0", \
|
||||
basename(__FILE__), __LINE__, __func__, ##__VA_ARGS__)
|
||||
|
||||
void _mscp_set_error(const char *fmt, ...);
|
||||
|
||||
319
src/mscp.c
319
src/mscp.c
@@ -2,7 +2,8 @@
|
||||
#include <unistd.h>
|
||||
#include <math.h>
|
||||
#include <pthread.h>
|
||||
|
||||
#include <semaphore.h>
|
||||
#include <sys/time.h>
|
||||
|
||||
#include <list.h>
|
||||
#include <util.h>
|
||||
@@ -13,24 +14,30 @@
|
||||
#include <message.h>
|
||||
#include <mscp.h>
|
||||
|
||||
|
||||
struct mscp {
|
||||
char *remote; /* remote host (and uername) */
|
||||
int direction; /* copy direction */
|
||||
struct mscp_opts *opts;
|
||||
struct mscp_ssh_opts *ssh_opts;
|
||||
|
||||
int msg_fd; /* writer fd for message pipe */
|
||||
FILE *msg_fp; /* writer fd for message pipe */
|
||||
|
||||
int *cores; /* usable cpu cores by COREMASK */
|
||||
int nr_cores; /* length of array of cores */
|
||||
int *cores; /* usable cpu cores by COREMASK */
|
||||
int nr_cores; /* length of array of cores */
|
||||
|
||||
sem_t *sem; /* semaphore for concurrent
|
||||
* connecting ssh sessions */
|
||||
|
||||
sftp_session first; /* first sftp session */
|
||||
|
||||
char dst_path[PATH_MAX];
|
||||
struct list_head src_list;
|
||||
struct list_head path_list;
|
||||
struct list_head chunk_list;
|
||||
lock chunk_lock;
|
||||
struct chunk_pool cp;
|
||||
|
||||
pthread_t tid_scan; /* tid for scan thread */
|
||||
int ret_scan; /* return code from scan thread */
|
||||
|
||||
size_t total_bytes; /* total bytes to be transferred */
|
||||
struct mscp_thread *threads;
|
||||
@@ -39,6 +46,7 @@ struct mscp {
|
||||
|
||||
struct mscp_thread {
|
||||
struct mscp *m;
|
||||
int id;
|
||||
sftp_session sftp;
|
||||
pthread_t tid;
|
||||
int cpu;
|
||||
@@ -62,8 +70,13 @@ struct src {
|
||||
* sftp_async_read returns 0.
|
||||
*/
|
||||
|
||||
#define DEFAULT_MAX_STARTUPS 8
|
||||
|
||||
#define non_null_string(s) (s[0] != '\0')
|
||||
|
||||
|
||||
|
||||
|
||||
static int expand_coremask(const char *coremask, int **cores, int *nr_cores)
|
||||
{
|
||||
int n, *core_list, core_list_len = 0, nr_usable, nr_all;
|
||||
@@ -178,6 +191,16 @@ static int validate_and_set_defaut_params(struct mscp_opts *o)
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (o->max_startups == 0)
|
||||
o->max_startups = DEFAULT_MAX_STARTUPS;
|
||||
else if (o->max_startups < 0) {
|
||||
mscp_set_error("invalid max_startups: %d", o->max_startups);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (o->msg_fd == 0)
|
||||
o->msg_fd = STDOUT_FILENO;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -198,22 +221,27 @@ struct mscp *mscp_init(const char *remote_host, int direction,
|
||||
return NULL;
|
||||
}
|
||||
|
||||
mprint_set_severity(o->severity);
|
||||
|
||||
if (validate_and_set_defaut_params(o) < 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
m = malloc(sizeof(*m));
|
||||
if (!m) {
|
||||
mscp_set_error("failed to allocate memory: %s", strerrno());
|
||||
return NULL;
|
||||
}
|
||||
|
||||
mprint_set_severity(o->severity);
|
||||
|
||||
if (validate_and_set_defaut_params(o) < 0)
|
||||
goto free_out;
|
||||
|
||||
memset(m, 0, sizeof(*m));
|
||||
INIT_LIST_HEAD(&m->src_list);
|
||||
INIT_LIST_HEAD(&m->path_list);
|
||||
INIT_LIST_HEAD(&m->chunk_list);
|
||||
lock_init(&m->chunk_lock);
|
||||
chunk_pool_init(&m->cp);
|
||||
|
||||
if ((m->sem = sem_create(o->max_startups)) == NULL) {
|
||||
mscp_set_error("sem_create: %s", strerrno());
|
||||
goto free_out;
|
||||
}
|
||||
|
||||
m->remote = strdup(remote_host);
|
||||
if (!m->remote) {
|
||||
@@ -221,15 +249,22 @@ struct mscp *mscp_init(const char *remote_host, int direction,
|
||||
goto free_out;
|
||||
}
|
||||
m->direction = direction;
|
||||
m->msg_fd = o->msg_fd;
|
||||
if (o->msg_fd > -1) {
|
||||
m->msg_fp = fdopen(o->msg_fd, "a");
|
||||
if (!m->msg_fp) {
|
||||
mscp_set_error("fdopen failed: %s", strerrno());
|
||||
goto free_out;
|
||||
}
|
||||
} else
|
||||
m->msg_fp = NULL;
|
||||
|
||||
if (strlen(o->coremask) > 0) {
|
||||
if (expand_coremask(o->coremask, &m->cores, &m->nr_cores) < 0)
|
||||
goto free_out;
|
||||
mpr_notice(m->msg_fd, "usable cpu cores:");
|
||||
mpr_notice(m->msg_fp, "usable cpu cores:");
|
||||
for (n = 0; n < m->nr_cores; n++)
|
||||
mpr_notice(m->msg_fd, " %d", m->cores[n]);
|
||||
mpr_notice(m->msg_fd, "\n");
|
||||
mpr_notice(m->msg_fp, " %d", m->cores[n]);
|
||||
mpr_notice(m->msg_fp, "\n");
|
||||
}
|
||||
|
||||
m->opts = o;
|
||||
@@ -242,11 +277,6 @@ free_out:
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void mscp_set_msg_fd(struct mscp *m, int fd)
|
||||
{
|
||||
m->msg_fd = fd;
|
||||
}
|
||||
|
||||
int mscp_connect(struct mscp *m)
|
||||
{
|
||||
m->first = ssh_init_sftp_session(m->remote, m->ssh_opts);
|
||||
@@ -293,17 +323,52 @@ int mscp_set_dst_path(struct mscp *m, const char *dst_path)
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int mscp_prepare(struct mscp *m)
|
||||
static int get_page_mask(void)
|
||||
{
|
||||
long page_sz = sysconf(_SC_PAGESIZE);
|
||||
size_t page_mask = 0;
|
||||
int n;
|
||||
|
||||
for (n = 0; page_sz > 0; page_sz >>= 1, n++) {
|
||||
page_mask <<= 1;
|
||||
page_mask |= 1;
|
||||
}
|
||||
|
||||
return page_mask >> 1;
|
||||
}
|
||||
|
||||
static void mscp_stop_copy_thread(struct mscp *m)
|
||||
{
|
||||
int n;
|
||||
for (n = 0; n < m->opts->nr_threads; n++) {
|
||||
if (m->threads[n].tid && !m->threads[n].finished)
|
||||
pthread_cancel(m->threads[n].tid);
|
||||
}
|
||||
}
|
||||
|
||||
static void mscp_stop_scan_thread(struct mscp *m)
|
||||
{
|
||||
if (m->tid_scan)
|
||||
pthread_cancel(m->tid_scan);
|
||||
}
|
||||
|
||||
void mscp_stop(struct mscp *m)
|
||||
{
|
||||
mscp_stop_scan_thread(m);
|
||||
mscp_stop_copy_thread(m);
|
||||
}
|
||||
|
||||
void *mscp_scan_thread(void *arg)
|
||||
{
|
||||
struct mscp *m = arg;
|
||||
sftp_session src_sftp = NULL, dst_sftp = NULL;
|
||||
bool src_path_is_dir, dst_path_is_dir, dst_path_should_dir;
|
||||
struct path_resolve_args a;
|
||||
struct list_head tmp;
|
||||
struct path *p;
|
||||
struct src *s;
|
||||
mstat ss, ds;
|
||||
|
||||
src_path_is_dir = dst_path_is_dir = dst_path_should_dir = false;
|
||||
m->ret_scan = 0;
|
||||
|
||||
switch (m->direction) {
|
||||
case MSCP_DIRECTION_L2R:
|
||||
@@ -316,105 +381,123 @@ int mscp_prepare(struct mscp *m)
|
||||
break;
|
||||
default:
|
||||
mscp_set_error("invalid copy direction: %d", m->direction);
|
||||
return -1;
|
||||
goto err_out;
|
||||
}
|
||||
|
||||
/* initialize path_resolve_args */
|
||||
memset(&a, 0, sizeof(a));
|
||||
a.msg_fp = m->msg_fp;
|
||||
a.total_bytes = &m->total_bytes;
|
||||
|
||||
if (list_count(&m->src_list) > 1)
|
||||
dst_path_should_dir = true;
|
||||
a.dst_path_should_dir = true;
|
||||
|
||||
if (mscp_stat(m->dst_path, &ds, dst_sftp) == 0) {
|
||||
if (mstat_is_dir(ds))
|
||||
dst_path_is_dir = true;
|
||||
a.dst_path_is_dir = true;
|
||||
mscp_stat_free(ds);
|
||||
}
|
||||
|
||||
a.cp = &m->cp;
|
||||
a.nr_conn = m->opts->nr_threads;
|
||||
a.min_chunk_sz = m->opts->min_chunk_sz;
|
||||
a.max_chunk_sz = m->opts->max_chunk_sz;
|
||||
a.chunk_align = get_page_mask();
|
||||
|
||||
mpr_info(m->msg_fp, "start to walk source path(s)\n");
|
||||
|
||||
/* walk a src_path recusively, and resolve path->dst_path for each src */
|
||||
list_for_each_entry(s, &m->src_list, list) {
|
||||
if (mscp_stat(s->path, &ss, src_sftp) < 0) {
|
||||
mscp_set_error("stat: %s", mscp_strerror(src_sftp));
|
||||
return -1;
|
||||
mscp_stat_free(ss);
|
||||
goto err_out;
|
||||
}
|
||||
src_path_is_dir = mstat_is_dir(ss);
|
||||
|
||||
/* set path specific args */
|
||||
a.src_path = s->path;
|
||||
a.dst_path = m->dst_path;
|
||||
a.src_path_is_dir = mstat_is_dir(ss);
|
||||
mscp_stat_free(ss);
|
||||
|
||||
INIT_LIST_HEAD(&tmp);
|
||||
if (walk_src_path(src_sftp, s->path, &tmp) < 0)
|
||||
return -1;
|
||||
|
||||
if (list_count(&tmp) > 1)
|
||||
dst_path_should_dir = true;
|
||||
|
||||
if (resolve_dst_path(m->msg_fd, s->path, m->dst_path, &tmp,
|
||||
src_path_is_dir, dst_path_is_dir,
|
||||
dst_path_should_dir) < 0)
|
||||
return -1;
|
||||
if (walk_src_path(src_sftp, s->path, &tmp, &a) < 0)
|
||||
goto err_out;
|
||||
|
||||
list_splice_tail(&tmp, m->path_list.prev);
|
||||
}
|
||||
|
||||
if (resolve_chunk(&m->path_list, &m->chunk_list, m->opts->nr_threads,
|
||||
m->opts->min_chunk_sz, m->opts->max_chunk_sz) < 0)
|
||||
return -1;
|
||||
chunk_pool_set_filled(&m->cp);
|
||||
|
||||
/* save total bytes to be transferred */
|
||||
m->total_bytes = 0;
|
||||
list_for_each_entry(p, &m->path_list, list) {
|
||||
m->total_bytes += p->size;
|
||||
mpr_info(m->msg_fp, "walk source path(s) done\n");
|
||||
|
||||
m->ret_scan = 0;
|
||||
return NULL;
|
||||
|
||||
err_out:
|
||||
m->ret_scan = -1;
|
||||
mscp_stop_copy_thread(m);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int mscp_scan(struct mscp *m)
|
||||
{
|
||||
int ret = pthread_create(&m->tid_scan, NULL, mscp_scan_thread, m);
|
||||
if (ret < 0) {
|
||||
mscp_set_error("pthread_create_error: %d", ret);
|
||||
m->tid_scan = 0;
|
||||
mscp_stop(m);
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* need scan finished or over nr_threads chunks to determine
|
||||
* actual number of threads (and connections). If the number
|
||||
* of chunks are smaller than nr_threads, we adjust nr_threads
|
||||
* to the number of chunks.
|
||||
*/
|
||||
while (!chunk_pool_is_filled(&m->cp) &&
|
||||
chunk_pool_size(&m->cp) < m->opts->nr_threads)
|
||||
usleep(100);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void mscp_stop(struct mscp *m)
|
||||
int mscp_scan_join(struct mscp *m)
|
||||
{
|
||||
int n;
|
||||
pr("stopping...\n");
|
||||
for (n = 0; n < m->opts->nr_threads; n++) {
|
||||
if (m->threads[n].tid && !m->threads[n].finished)
|
||||
pthread_cancel(m->threads[n].tid);
|
||||
}
|
||||
if (m->tid_scan) {
|
||||
pthread_join(m->tid_scan, NULL);
|
||||
m->tid_scan = 0;
|
||||
return m->ret_scan;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
|
||||
static void *mscp_copy_thread(void *arg);
|
||||
|
||||
int mscp_start(struct mscp *m)
|
||||
{
|
||||
int n, ret;
|
||||
|
||||
if ((n = list_count(&m->chunk_list)) < m->opts->nr_threads) {
|
||||
mpr_notice(m->msg_fd, "we have only %d chunk(s). "
|
||||
if ((n = chunk_pool_size(&m->cp)) < m->opts->nr_threads) {
|
||||
mpr_notice(m->msg_fp, "we have only %d chunk(s). "
|
||||
"set number of connections to %d\n", n, n);
|
||||
m->opts->nr_threads = n;
|
||||
}
|
||||
|
||||
/* prepare thread instances */
|
||||
/* scan thread instances */
|
||||
m->threads = calloc(m->opts->nr_threads, sizeof(struct mscp_thread));
|
||||
memset(m->threads, 0, m->opts->nr_threads * sizeof(struct mscp_thread));
|
||||
for (n = 0; n < m->opts->nr_threads; n++) {
|
||||
struct mscp_thread *t = &m->threads[n];
|
||||
t->m = m;
|
||||
t->id = n;
|
||||
if (!m->cores)
|
||||
t->cpu = -1;
|
||||
else
|
||||
t->cpu = m->cores[n % m->nr_cores];
|
||||
|
||||
if (n == 0) {
|
||||
t->sftp = m->first; /* reuse first sftp session */
|
||||
m->first = NULL;
|
||||
}
|
||||
else {
|
||||
mpr_notice(m->msg_fd, "connecting to %s for a copy thread...\n",
|
||||
m->remote);
|
||||
t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts);
|
||||
if (!t->sftp)
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
/* spawn copy threads */
|
||||
for (n = 0; n < m->opts->nr_threads; n++) {
|
||||
struct mscp_thread *t = &m->threads[n];
|
||||
ret = pthread_create(&t->tid, NULL, mscp_copy_thread, t);
|
||||
if (ret < 0) {
|
||||
mscp_set_error("pthread_create error: %d", ret);
|
||||
@@ -430,7 +513,10 @@ int mscp_join(struct mscp *m)
|
||||
{
|
||||
int n, ret = 0;
|
||||
|
||||
/* waiting for threads join... */
|
||||
/* waiting for scan thread joins... */
|
||||
ret = mscp_scan_join(m);
|
||||
|
||||
/* waiting for copy threads join... */
|
||||
for (n = 0; n < m->opts->nr_threads; n++) {
|
||||
if (m->threads[n].tid) {
|
||||
pthread_join(m->threads[n].tid, NULL);
|
||||
@@ -462,20 +548,6 @@ int mscp_join(struct mscp *m)
|
||||
|
||||
/* copy thread related functions */
|
||||
|
||||
struct chunk *acquire_chunk(struct list_head *chunk_list)
|
||||
{
|
||||
/* under the lock for chunk_list */
|
||||
struct list_head *first = chunk_list->next;
|
||||
struct chunk *c = NULL;
|
||||
|
||||
if (list_empty(chunk_list))
|
||||
return NULL; /* list is empty */
|
||||
|
||||
c = list_entry(first, struct chunk, list);
|
||||
list_del(first);
|
||||
return c;
|
||||
}
|
||||
|
||||
static void mscp_copy_thread_cleanup(void *arg)
|
||||
{
|
||||
struct mscp_thread *t = arg;
|
||||
@@ -489,6 +561,34 @@ void *mscp_copy_thread(void *arg)
|
||||
struct mscp *m = t->m;
|
||||
struct chunk *c;
|
||||
|
||||
if (t->cpu > -1) {
|
||||
if (set_thread_affinity(pthread_self(), t->cpu) < 0) {
|
||||
t->ret = -1;
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
if (sem_wait(m->sem) < 0) {
|
||||
mscp_set_error("sem_wait: %s\n", strerrno());
|
||||
mpr_err(m->msg_fp, "%s", mscp_get_error());
|
||||
goto err_out;
|
||||
}
|
||||
|
||||
mpr_notice(m->msg_fp, "connecting to %s for a copy thread[%d]...\n",
|
||||
m->remote, t->id);
|
||||
t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts);
|
||||
|
||||
if (sem_post(m->sem) < 0) {
|
||||
mscp_set_error("sem_post: %s\n", strerrno());
|
||||
mpr_err(m->msg_fp, "%s", mscp_get_error());
|
||||
goto err_out;
|
||||
}
|
||||
|
||||
if (!t->sftp) {
|
||||
mpr_err(m->msg_fp, "copy thread[%d]: %s\n", t->id, mscp_get_error());
|
||||
goto err_out;
|
||||
}
|
||||
|
||||
switch (m->direction) {
|
||||
case MSCP_DIRECTION_L2R:
|
||||
src_sftp = NULL;
|
||||
@@ -502,24 +602,21 @@ void *mscp_copy_thread(void *arg)
|
||||
return NULL; /* not reached */
|
||||
}
|
||||
|
||||
if (t->cpu > -1) {
|
||||
if (set_thread_affinity(pthread_self(), t->cpu) < 0)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
|
||||
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
|
||||
pthread_cleanup_push(mscp_copy_thread_cleanup, t);
|
||||
|
||||
while (1) {
|
||||
LOCK_ACQUIRE_THREAD(&m->chunk_lock);
|
||||
c = acquire_chunk(&m->chunk_list);
|
||||
LOCK_RELEASE_THREAD();
|
||||
c = chunk_pool_pop(&m->cp);
|
||||
if (c == CHUNK_POP_WAIT) {
|
||||
usleep(100); /* XXX: hard code */
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!c)
|
||||
break; /* no more chunks */
|
||||
|
||||
if ((t->ret = copy_chunk(m->msg_fd,
|
||||
if ((t->ret = copy_chunk(m->msg_fp,
|
||||
c, src_sftp, dst_sftp, m->opts->nr_ahead,
|
||||
m->opts->buf_sz, &t->done)) < 0)
|
||||
break;
|
||||
@@ -532,21 +629,15 @@ void *mscp_copy_thread(void *arg)
|
||||
c->p->path, c->off, c->off + c->len);
|
||||
|
||||
return NULL;
|
||||
|
||||
err_out:
|
||||
t->ret = -1;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
/* cleanup related functions */
|
||||
|
||||
static void release_list(struct list_head *head, void (*f)(struct list_head*))
|
||||
{
|
||||
struct list_head *p, *n;
|
||||
|
||||
list_for_each_safe(p, n, head) {
|
||||
list_del(p);
|
||||
f(p);
|
||||
}
|
||||
}
|
||||
|
||||
static void free_src(struct list_head *list)
|
||||
{
|
||||
struct src *s;
|
||||
@@ -576,15 +667,15 @@ void mscp_cleanup(struct mscp *m)
|
||||
m->first = NULL;
|
||||
}
|
||||
|
||||
release_list(&m->src_list, free_src);
|
||||
list_free_f(&m->src_list, free_src);
|
||||
INIT_LIST_HEAD(&m->src_list);
|
||||
|
||||
release_list(&m->chunk_list, free_chunk);
|
||||
INIT_LIST_HEAD(&m->chunk_list);
|
||||
|
||||
release_list(&m->path_list, free_path);
|
||||
list_free_f(&m->path_list, free_path);
|
||||
INIT_LIST_HEAD(&m->path_list);
|
||||
|
||||
chunk_pool_release(&m->cp);
|
||||
chunk_pool_init(&m->cp);
|
||||
|
||||
if (m->threads) {
|
||||
free(m->threads);
|
||||
m->threads = NULL;
|
||||
@@ -598,6 +689,8 @@ void mscp_free(struct mscp *m)
|
||||
free(m->remote);
|
||||
if (m->cores)
|
||||
free(m->cores);
|
||||
|
||||
sem_release(m->sem);
|
||||
free(m);
|
||||
}
|
||||
|
||||
|
||||
381
src/path.c
381
src/path.c
@@ -12,8 +12,186 @@
|
||||
#include <path.h>
|
||||
#include <message.h>
|
||||
|
||||
|
||||
/* chunk pool operations */
|
||||
#define CHUNK_POOL_STATE_FILLING 0
|
||||
#define CHUNK_POOL_STATE_FILLED 1
|
||||
|
||||
void chunk_pool_init(struct chunk_pool *cp)
|
||||
{
|
||||
memset(cp, 0, sizeof(*cp));
|
||||
INIT_LIST_HEAD(&cp->list);
|
||||
lock_init(&cp->lock);
|
||||
cp->state = CHUNK_POOL_STATE_FILLING;
|
||||
}
|
||||
|
||||
static void chunk_pool_add(struct chunk_pool *cp, struct chunk *c)
|
||||
{
|
||||
LOCK_ACQUIRE_THREAD(&cp->lock);
|
||||
list_add_tail(&c->list, &cp->list);
|
||||
cp->count += 1;
|
||||
LOCK_RELEASE_THREAD();
|
||||
}
|
||||
|
||||
void chunk_pool_set_filled(struct chunk_pool *cp)
|
||||
{
|
||||
cp->state = CHUNK_POOL_STATE_FILLED;
|
||||
}
|
||||
|
||||
bool chunk_pool_is_filled(struct chunk_pool *cp)
|
||||
{
|
||||
return (cp->state == CHUNK_POOL_STATE_FILLED);
|
||||
}
|
||||
|
||||
size_t chunk_pool_size(struct chunk_pool *cp)
|
||||
{
|
||||
return cp->count;
|
||||
}
|
||||
|
||||
|
||||
struct chunk *chunk_pool_pop(struct chunk_pool *cp)
|
||||
{
|
||||
struct list_head *first;
|
||||
struct chunk *c = NULL;
|
||||
|
||||
LOCK_ACQUIRE_THREAD(&cp->lock);
|
||||
first = cp->list.next;
|
||||
if (list_empty(&cp->list)) {
|
||||
if (!chunk_pool_is_filled(cp))
|
||||
c = CHUNK_POP_WAIT;
|
||||
else
|
||||
c = NULL; /* no more chunks */
|
||||
} else {
|
||||
c = list_entry(first, struct chunk, list);
|
||||
list_del(first);
|
||||
}
|
||||
LOCK_RELEASE_THREAD();
|
||||
|
||||
/* return CHUNK_POP_WAIT would be very rare case, because it
|
||||
* means copying over SSH is faster than traversing
|
||||
* local/remote file paths.
|
||||
*/
|
||||
|
||||
return c;
|
||||
}
|
||||
|
||||
static void chunk_free(struct list_head *list)
|
||||
{
|
||||
struct chunk *c;
|
||||
c = list_entry(list, typeof(*c), list);
|
||||
free(c);
|
||||
}
|
||||
|
||||
void chunk_pool_release(struct chunk_pool *cp)
|
||||
{
|
||||
list_free_f(&cp->list, chunk_free);
|
||||
}
|
||||
|
||||
/* paths of copy source resoltion */
|
||||
static int resolve_dst_path(const char *src_file_path, char *dst_file_path,
|
||||
struct path_resolve_args *a)
|
||||
{
|
||||
char copy[PATH_MAX];
|
||||
char *prefix;
|
||||
int offset;
|
||||
|
||||
strncpy(copy, a->src_path, PATH_MAX - 1);
|
||||
prefix = dirname(copy);
|
||||
if (!prefix) {
|
||||
mscp_set_error("dirname: %s", strerrno());
|
||||
return -1;
|
||||
}
|
||||
if (strlen(prefix) == 1 && prefix[0] == '.')
|
||||
offset = 0;
|
||||
else
|
||||
offset = strlen(prefix) + 1;
|
||||
|
||||
if (!a->src_path_is_dir && !a->dst_path_is_dir) {
|
||||
/* src path is file. dst path is (1) file, or (2) does not exist.
|
||||
* In the second case, we need to put src under the dst.
|
||||
*/
|
||||
if (a->dst_path_should_dir)
|
||||
snprintf(dst_file_path, PATH_MAX - 1, "%s/%s",
|
||||
a->dst_path, a->src_path + offset);
|
||||
else
|
||||
strncpy(dst_file_path, a->dst_path, PATH_MAX - 1);
|
||||
}
|
||||
|
||||
/* src is file, and dst is dir */
|
||||
if (!a->src_path_is_dir && a->dst_path_is_dir)
|
||||
snprintf(dst_file_path, PATH_MAX - 1, "%s/%s",
|
||||
a->dst_path, a->src_path + offset);
|
||||
|
||||
/* both are directory */
|
||||
if (a->src_path_is_dir && a->dst_path_is_dir)
|
||||
snprintf(dst_file_path, PATH_MAX - 1, "%s/%s",
|
||||
a->dst_path, src_file_path + offset);
|
||||
|
||||
/* dst path does not exist. change dir name to dst_path */
|
||||
if (a->src_path_is_dir && !a->dst_path_is_dir)
|
||||
snprintf(dst_file_path, PATH_MAX - 1, "%s/%s",
|
||||
a->dst_path, src_file_path + strlen(a->src_path) + 1);
|
||||
|
||||
mpr_debug(a->msg_fp, "file: %s -> %s\n", src_file_path, dst_file_path);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* chunk preparation */
|
||||
static struct chunk *alloc_chunk(struct path *p)
|
||||
{
|
||||
struct chunk *c;
|
||||
|
||||
if (!(c = malloc(sizeof(*c)))) {
|
||||
mscp_set_error("malloc %s", strerrno());
|
||||
return NULL;
|
||||
}
|
||||
memset(c, 0, sizeof(*c));
|
||||
|
||||
c->p = p;
|
||||
c->off = 0;
|
||||
c->len = 0;
|
||||
refcnt_inc(&p->refcnt);
|
||||
return c;
|
||||
}
|
||||
|
||||
static int resolve_chunk(struct path *p, struct path_resolve_args *a)
|
||||
{
|
||||
struct chunk *c;
|
||||
size_t chunk_sz;
|
||||
size_t size;
|
||||
|
||||
if (p->size <= a->min_chunk_sz)
|
||||
chunk_sz = p->size;
|
||||
else if (a->max_chunk_sz)
|
||||
chunk_sz = a->max_chunk_sz;
|
||||
else {
|
||||
chunk_sz = (p->size - (p->size % a->nr_conn)) / a->nr_conn;
|
||||
chunk_sz &= ~a->chunk_align; /* align with page_sz */
|
||||
if (chunk_sz <= a->min_chunk_sz)
|
||||
chunk_sz = a->min_chunk_sz;
|
||||
}
|
||||
|
||||
/* for (size = f->size; size > 0;) does not create a file
|
||||
* (chunk) when file size is 0. This do {} while (size > 0)
|
||||
* creates just open/close a 0-byte file.
|
||||
*/
|
||||
size = p->size;
|
||||
do {
|
||||
c = alloc_chunk(p);
|
||||
if (!c)
|
||||
return -1;
|
||||
c->off = p->size - size;
|
||||
c->len = size < chunk_sz ? size : chunk_sz;
|
||||
size -= c->len;
|
||||
chunk_pool_add(a->cp, c);
|
||||
} while (size > 0);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int append_path(sftp_session sftp, const char *path, mstat s,
|
||||
struct list_head *path_list)
|
||||
struct list_head *path_list, struct path_resolve_args *a)
|
||||
{
|
||||
struct path *p;
|
||||
|
||||
@@ -29,9 +207,22 @@ static int append_path(sftp_session sftp, const char *path, mstat s,
|
||||
p->mode = mstat_mode(s);
|
||||
p->state = FILE_STATE_INIT;
|
||||
lock_init(&p->lock);
|
||||
|
||||
if (resolve_dst_path(p->path, p->dst_path, a) < 0)
|
||||
goto free_out;
|
||||
|
||||
if (resolve_chunk(p, a) < 0)
|
||||
return -1; /* XXX: do not free path becuase chunk(s)
|
||||
* was added to chunk pool already */
|
||||
|
||||
list_add_tail(&p->list, path_list);
|
||||
*a->total_bytes += p->size;
|
||||
|
||||
return 0;
|
||||
|
||||
free_out:
|
||||
free(p);
|
||||
return -1;
|
||||
}
|
||||
|
||||
static bool check_path_should_skip(const char *path)
|
||||
@@ -45,7 +236,7 @@ static bool check_path_should_skip(const char *path)
|
||||
}
|
||||
|
||||
static int walk_path_recursive(sftp_session sftp, const char *path,
|
||||
struct list_head *path_list)
|
||||
struct list_head *path_list, struct path_resolve_args *a)
|
||||
{
|
||||
char next_path[PATH_MAX];
|
||||
mdirent *e;
|
||||
@@ -58,7 +249,7 @@ static int walk_path_recursive(sftp_session sftp, const char *path,
|
||||
|
||||
if (mstat_is_regular(s)) {
|
||||
/* this path is regular file. it is to be copied */
|
||||
ret = append_path(sftp, path, s, path_list);
|
||||
ret = append_path(sftp, path, s, path_list, a);
|
||||
mscp_stat_free(s);
|
||||
return ret;
|
||||
}
|
||||
@@ -77,15 +268,19 @@ static int walk_path_recursive(sftp_session sftp, const char *path,
|
||||
return -1;
|
||||
|
||||
for (e = mscp_readdir(d); !mdirent_is_null(e); e = mscp_readdir(d)) {
|
||||
if (check_path_should_skip(mdirent_name(e)))
|
||||
if (check_path_should_skip(mdirent_name(e))) {
|
||||
mscp_dirent_free(e);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (strlen(path) + 1 + strlen(mdirent_name(e)) > PATH_MAX) {
|
||||
mscp_set_error("too long path: %s/%s", path, mdirent_name(e));
|
||||
mscp_dirent_free(e);
|
||||
return -1;
|
||||
}
|
||||
snprintf(next_path, sizeof(next_path), "%s/%s", path, mdirent_name(e));
|
||||
ret = walk_path_recursive(sftp, next_path, path_list);
|
||||
ret = walk_path_recursive(sftp, next_path, path_list, a);
|
||||
mscp_dirent_free(e);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
}
|
||||
@@ -96,75 +291,9 @@ static int walk_path_recursive(sftp_session sftp, const char *path,
|
||||
}
|
||||
|
||||
int walk_src_path(sftp_session src_sftp, const char *src_path,
|
||||
struct list_head *path_list)
|
||||
struct list_head *path_list, struct path_resolve_args *a)
|
||||
{
|
||||
return walk_path_recursive(src_sftp, src_path, path_list);
|
||||
}
|
||||
|
||||
static int src2dst_path(int msg_fd, const char *src_path, const char *src_file_path,
|
||||
const char *dst_path, char *dst_file_path, size_t len,
|
||||
bool src_path_is_dir, bool dst_path_is_dir,
|
||||
bool dst_path_should_dir)
|
||||
{
|
||||
char copy[PATH_MAX];
|
||||
char *prefix;
|
||||
int offset;
|
||||
|
||||
strncpy(copy, src_path, PATH_MAX - 1);
|
||||
prefix = dirname(copy);
|
||||
if (!prefix) {
|
||||
mscp_set_error("dirname: %s", strerrno());
|
||||
return -1;
|
||||
}
|
||||
if (strlen(prefix) == 1 && prefix[0] == '.')
|
||||
offset = 0;
|
||||
else
|
||||
offset = strlen(prefix) + 1;
|
||||
|
||||
if (!src_path_is_dir && !dst_path_is_dir) {
|
||||
/* src path is file. dst path is (1) file, or (2) does not exist.
|
||||
* In the second case, we need to put src under the dst.
|
||||
*/
|
||||
if (dst_path_should_dir)
|
||||
snprintf(dst_file_path, len, "%s/%s",
|
||||
dst_path, src_path + offset);
|
||||
else
|
||||
strncpy(dst_file_path, dst_path, len);
|
||||
}
|
||||
|
||||
/* src is file, and dst is dir */
|
||||
if (!src_path_is_dir && dst_path_is_dir)
|
||||
snprintf(dst_file_path, len, "%s/%s", dst_path, src_path + offset);
|
||||
|
||||
/* both are directory */
|
||||
if (src_path_is_dir && dst_path_is_dir)
|
||||
snprintf(dst_file_path, len, "%s/%s", dst_path, src_file_path + offset);
|
||||
|
||||
/* dst path does not exist. change dir name to dst_path */
|
||||
if (src_path_is_dir && !dst_path_is_dir)
|
||||
snprintf(dst_file_path, len, "%s/%s",
|
||||
dst_path, src_file_path + strlen(src_path) + 1);
|
||||
|
||||
mpr_info(msg_fd, "file: %s -> %s\n", src_file_path, dst_file_path);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int resolve_dst_path(int msg_fd, const char *src_path, const char *dst_path,
|
||||
struct list_head *path_list, bool src_path_is_dir,
|
||||
bool dst_path_is_dir, bool dst_path_should_dir)
|
||||
{
|
||||
struct path *p;
|
||||
|
||||
list_for_each_entry(p, path_list, list) {
|
||||
if (src2dst_path(msg_fd, src_path, p->path,
|
||||
dst_path, p->dst_path, PATH_MAX,
|
||||
src_path_is_dir, dst_path_is_dir,
|
||||
dst_path_should_dir) < 0)
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
return walk_path_recursive(src_sftp, src_path, path_list, a);
|
||||
}
|
||||
|
||||
void path_dump(struct list_head *path_list)
|
||||
@@ -177,90 +306,6 @@ void path_dump(struct list_head *path_list)
|
||||
}
|
||||
}
|
||||
|
||||
/* chunk preparation */
|
||||
|
||||
static struct chunk *alloc_chunk(struct path *p)
|
||||
{
|
||||
struct chunk *c;
|
||||
|
||||
if (!(c = malloc(sizeof(*c)))) {
|
||||
mscp_set_error("malloc %s", strerrno());
|
||||
return NULL;
|
||||
}
|
||||
memset(c, 0, sizeof(*c));
|
||||
|
||||
c->p = p;
|
||||
c->off = 0;
|
||||
c->len = 0;
|
||||
refcnt_inc(&p->refcnt);
|
||||
return c;
|
||||
}
|
||||
|
||||
static int get_page_mask(void)
|
||||
{
|
||||
long page_sz = sysconf(_SC_PAGESIZE);
|
||||
size_t page_mask = 0;
|
||||
int n;
|
||||
|
||||
for (n = 0; page_sz > 0; page_sz >>= 1, n++) {
|
||||
page_mask <<= 1;
|
||||
page_mask |= 1;
|
||||
}
|
||||
|
||||
return page_mask >> 1;
|
||||
}
|
||||
|
||||
int resolve_chunk(struct list_head *path_list, struct list_head *chunk_list,
|
||||
int nr_conn, int min_chunk_sz, int max_chunk_sz)
|
||||
{
|
||||
struct chunk *c;
|
||||
struct path *p;
|
||||
size_t page_mask;
|
||||
size_t chunk_sz;
|
||||
size_t size;
|
||||
|
||||
page_mask = get_page_mask();
|
||||
|
||||
list_for_each_entry(p, path_list, list) {
|
||||
if (p->size <= min_chunk_sz)
|
||||
chunk_sz = p->size;
|
||||
else if (max_chunk_sz)
|
||||
chunk_sz = max_chunk_sz;
|
||||
else {
|
||||
chunk_sz = (p->size - (p->size % nr_conn)) / nr_conn;
|
||||
chunk_sz &= ~page_mask; /* align with page_sz */
|
||||
if (chunk_sz <= min_chunk_sz)
|
||||
chunk_sz = min_chunk_sz;
|
||||
}
|
||||
|
||||
/* for (size = f->size; size > 0;) does not create a
|
||||
* file (chunk) when file size is 0. This do {} while
|
||||
* (size > 0) creates just open/close a 0-byte file.
|
||||
*/
|
||||
size = p->size;
|
||||
do {
|
||||
c = alloc_chunk(p);
|
||||
if (!c)
|
||||
return -1;
|
||||
c->off = p->size - size;
|
||||
c->len = size < chunk_sz ? size : chunk_sz;
|
||||
size -= c->len;
|
||||
list_add_tail(&c->list, chunk_list);
|
||||
} while (size > 0);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void chunk_dump(struct list_head *chunk_list)
|
||||
{
|
||||
struct chunk *c;
|
||||
|
||||
list_for_each_entry(c, chunk_list, list) {
|
||||
printf("chunk: %s 0x%lx-%lx bytes\n",
|
||||
c->p->path, c->off, c->off + c->len);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* based on
|
||||
@@ -283,10 +328,13 @@ static int touch_dst_path(struct path *p, sftp_session sftp)
|
||||
|
||||
mstat s;
|
||||
if (mscp_stat(path, &s, sftp) == 0) {
|
||||
if (mstat_is_dir(s))
|
||||
if (mstat_is_dir(s)) {
|
||||
mscp_stat_free(s);
|
||||
goto next; /* directory exists. go deeper */
|
||||
else
|
||||
} else {
|
||||
mscp_stat_free(s);
|
||||
return -1; /* path exists, but not directory. */
|
||||
}
|
||||
}
|
||||
|
||||
if (mscp_stat_check_err_noent(sftp) == 0) {
|
||||
@@ -311,7 +359,7 @@ static int touch_dst_path(struct path *p, sftp_session sftp)
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int prepare_dst_path(int msg_fd, struct path *p, sftp_session dst_sftp)
|
||||
static int prepare_dst_path(FILE *msg_fp, struct path *p, sftp_session dst_sftp)
|
||||
{
|
||||
int ret = 0;
|
||||
|
||||
@@ -322,7 +370,7 @@ static int prepare_dst_path(int msg_fd, struct path *p, sftp_session dst_sftp)
|
||||
goto out;
|
||||
}
|
||||
p->state = FILE_STATE_OPENED;
|
||||
mpr_info(msg_fd, "copy start: %s\n", p->path);
|
||||
mpr_info(msg_fp, "copy start: %s\n", p->path);
|
||||
}
|
||||
|
||||
out:
|
||||
@@ -482,7 +530,8 @@ static int _copy_chunk(struct chunk *c, mfh s, mfh d,
|
||||
return -1;
|
||||
}
|
||||
|
||||
int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp,
|
||||
int copy_chunk(FILE *msg_fp, struct chunk *c,
|
||||
sftp_session src_sftp, sftp_session dst_sftp,
|
||||
int nr_ahead, int buf_sz, size_t *counter)
|
||||
{
|
||||
mode_t mode;
|
||||
@@ -492,7 +541,7 @@ int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session
|
||||
|
||||
assert((src_sftp && !dst_sftp) || (!src_sftp && dst_sftp));
|
||||
|
||||
if (prepare_dst_path(msg_fd, c->p, dst_sftp) < 0)
|
||||
if (prepare_dst_path(msg_fp, c->p, dst_sftp) < 0)
|
||||
return -1;
|
||||
|
||||
/* open src */
|
||||
@@ -511,11 +560,11 @@ int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session
|
||||
if (mscp_open_is_failed(d))
|
||||
return -1;
|
||||
|
||||
mpr_debug(msg_fd, "copy chunk start: %s 0x%lx-0x%lx\n",
|
||||
mpr_debug(msg_fp, "copy chunk start: %s 0x%lx-0x%lx\n",
|
||||
c->p->path, c->off, c->off + c->len);
|
||||
ret = _copy_chunk(c, s, d, nr_ahead, buf_sz, counter);
|
||||
|
||||
mpr_debug(msg_fd, "copy chunk done: %s 0x%lx-0x%lx\n",
|
||||
mpr_debug(msg_fp, "copy chunk done: %s 0x%lx-0x%lx\n",
|
||||
c->p->path, c->off, c->off + c->len);
|
||||
|
||||
|
||||
@@ -527,7 +576,7 @@ int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session
|
||||
if (refcnt_dec(&c->p->refcnt) == 0) {
|
||||
c->p->state = FILE_STATE_DONE;
|
||||
mscp_chmod(c->p->dst_path, c->p->mode, dst_sftp);
|
||||
mpr_info(msg_fd, "copy done: %s\n", c->p->path);
|
||||
mpr_info(msg_fp, "copy done: %s\n", c->p->path);
|
||||
}
|
||||
|
||||
return ret;
|
||||
|
||||
74
src/path.h
74
src/path.h
@@ -29,7 +29,7 @@ struct path {
|
||||
#define FILE_STATE_DONE 2
|
||||
|
||||
struct chunk {
|
||||
struct list_head list; /* mscp->chunk_list */
|
||||
struct list_head list; /* chunk_pool->list */
|
||||
|
||||
struct path *p;
|
||||
size_t off; /* offset of this chunk on the file on path p */
|
||||
@@ -37,30 +37,66 @@ struct chunk {
|
||||
size_t done; /* copied bytes for this chunk by a thread */
|
||||
};
|
||||
|
||||
struct chunk_pool {
|
||||
struct list_head list; /* list of struct chunk */
|
||||
size_t count;
|
||||
lock lock;
|
||||
int state;
|
||||
};
|
||||
|
||||
|
||||
/* initialize chunk pool */
|
||||
void chunk_pool_init(struct chunk_pool *cp);
|
||||
|
||||
/* acquire a chunk from pool. return value is NULL indicates no more
|
||||
* chunk, GET_CHUNK_WAIT means caller should waits until a chunk is
|
||||
* added, or pointer to chunk.
|
||||
*/
|
||||
struct chunk *chunk_pool_pop(struct chunk_pool *cp);
|
||||
#define CHUNK_POP_WAIT ((void *) -1)
|
||||
|
||||
/* set and check fillingchunks to this pool has finished */
|
||||
void chunk_pool_set_filled(struct chunk_pool *cp);
|
||||
bool chunk_pool_is_filled(struct chunk_pool *cp);
|
||||
|
||||
/* return number of chunks in the pool */
|
||||
size_t chunk_pool_size(struct chunk_pool *cp);
|
||||
|
||||
/* free chunks in the chunk_pool */
|
||||
void chunk_pool_release(struct chunk_pool *cp);
|
||||
|
||||
|
||||
|
||||
struct path_resolve_args {
|
||||
FILE *msg_fp;
|
||||
size_t *total_bytes;
|
||||
|
||||
/* args to resolve src path to dst path */
|
||||
const char *src_path;
|
||||
const char *dst_path;
|
||||
bool src_path_is_dir;
|
||||
bool dst_path_is_dir;
|
||||
bool dst_path_should_dir;
|
||||
|
||||
/* args to resolve chunks for a path */
|
||||
struct chunk_pool *cp;
|
||||
int nr_conn;
|
||||
size_t min_chunk_sz;
|
||||
size_t max_chunk_sz;
|
||||
size_t chunk_align;
|
||||
};
|
||||
|
||||
/* recursivly walk through src_path and fill path_list for each file */
|
||||
int walk_src_path(sftp_session src_sftp, const char *src_path,
|
||||
struct list_head *path_list);
|
||||
|
||||
/* fill path->dst_path for all files */
|
||||
int resolve_dst_path(int msg_fd, const char *src_path, const char *dst_path,
|
||||
struct list_head *path_list,
|
||||
bool src_path_is_dir, bool dst_path_is_dir,
|
||||
bool dst_path_should_dir);
|
||||
|
||||
/* resolve chunks from files in the path_list */
|
||||
int resolve_chunk(struct list_head *path_list, struct list_head *chunk_list,
|
||||
int nr_conn, int min_chunk_sz, int max_chunk_sz);
|
||||
struct list_head *path_list, struct path_resolve_args *a);
|
||||
|
||||
/* copy a chunk. either src_sftp or dst_sftp is not null, and another is null */
|
||||
int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp,
|
||||
int copy_chunk(FILE *msg_fp, struct chunk *c,
|
||||
sftp_session src_sftp, sftp_session dst_sftp,
|
||||
int nr_ahead, int buf_sz, size_t *counter);
|
||||
|
||||
/* just print contents. just for debugging */
|
||||
void path_dump(struct list_head *path_list);
|
||||
void chunk_dump(struct list_head *chunk_list);
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -136,6 +172,14 @@ static mdirent *mscp_readdir(mdir *d)
|
||||
return &e;
|
||||
}
|
||||
|
||||
static void mscp_dirent_free(mdirent *e)
|
||||
{
|
||||
if (e->r) {
|
||||
sftp_attributes_free(e->r);
|
||||
e->r = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
/* wrap retriving error */
|
||||
static const char *mscp_strerror(sftp_session sftp)
|
||||
{
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
#ifdef __APPLE__
|
||||
#include <stdlib.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/sysctl.h>
|
||||
#elif linux
|
||||
#define _GNU_SOURCE
|
||||
#include <sched.h>
|
||||
#include <stdlib.h>
|
||||
#else
|
||||
#error unsupported platform
|
||||
#endif
|
||||
@@ -12,6 +14,7 @@
|
||||
#include <platform.h>
|
||||
#include <message.h>
|
||||
|
||||
|
||||
#ifdef __APPLE__
|
||||
int nr_cpus()
|
||||
{
|
||||
@@ -32,6 +35,38 @@ int set_thread_affinity(pthread_t tid, int core)
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static void random_string(char *buf, size_t size)
|
||||
{
|
||||
char chars[] = "abcdefhijklmnopqrstuvwxyz1234567890";
|
||||
int n, x;
|
||||
|
||||
for (n = 0; n < size - 1; n++) {
|
||||
x = arc4random() % (sizeof(chars) - 1);
|
||||
buf[n] = chars[x];
|
||||
}
|
||||
buf[size - 1] = '\0';
|
||||
}
|
||||
|
||||
sem_t *sem_create(int value)
|
||||
{
|
||||
char sem_name[30] = "mscp-";
|
||||
sem_t *sem;
|
||||
int n;
|
||||
|
||||
n = strlen(sem_name);
|
||||
random_string(sem_name + n, sizeof(sem_name) - n - 1);
|
||||
if ((sem = sem_open(sem_name, O_CREAT, 600, value)) == SEM_FAILED)
|
||||
return NULL;
|
||||
|
||||
return sem;
|
||||
}
|
||||
|
||||
int sem_release(sem_t *sem)
|
||||
{
|
||||
return sem_close(sem);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
#ifdef linux
|
||||
@@ -56,5 +91,27 @@ int set_thread_affinity(pthread_t tid, int core)
|
||||
core, strerrno());
|
||||
return ret;
|
||||
}
|
||||
|
||||
sem_t *sem_create(int value)
|
||||
{
|
||||
sem_t *sem;
|
||||
|
||||
if ((sem = malloc(sizeof(*sem))) == NULL)
|
||||
return NULL;
|
||||
|
||||
if (sem_init(sem, 0, value) < 0) {
|
||||
free(sem);
|
||||
return sem;
|
||||
}
|
||||
|
||||
return sem;
|
||||
}
|
||||
|
||||
int sem_release(sem_t *sem)
|
||||
{
|
||||
free(sem);
|
||||
return 0;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
@@ -2,8 +2,20 @@
|
||||
#define _PLATFORM_H_
|
||||
|
||||
#include <pthread.h>
|
||||
#include <semaphore.h>
|
||||
|
||||
int nr_cpus();
|
||||
int nr_cpus(void);
|
||||
int set_thread_affinity(pthread_t tid, int core);
|
||||
|
||||
/*
|
||||
* macOS does not support sem_init(). macOS (seems to) releases the
|
||||
* named semaphore when associated mscp process finished. In linux,
|
||||
* program (seems to) need to release named semaphore in /dev/shm by
|
||||
* sem_unlink() explicitly. So, using sem_init() (unnamed semaphore)
|
||||
* in linux and using sem_open() (named semaphore) in macOS without
|
||||
* sem_unlink() are reasonable (?).
|
||||
*/
|
||||
sem_t *sem_create(int value);
|
||||
int sem_release(sem_t *sem);
|
||||
|
||||
#endif /* _PLATFORM_H_ */
|
||||
|
||||
18
src/pymscp.c
18
src/pymscp.c
@@ -74,7 +74,7 @@ static int release_instance(struct instance *i)
|
||||
|
||||
/* wrapper functions */
|
||||
|
||||
static PyObject *wrap_mscp_init(PyObject *sef, PyObject *args, PyObject *kw)
|
||||
static PyObject *wrap_mscp_init(PyObject *self, PyObject *args, PyObject *kw)
|
||||
{
|
||||
/*
|
||||
* Initialize struct mscp with options. wrap_mscp_init
|
||||
@@ -89,10 +89,14 @@ static PyObject *wrap_mscp_init(PyObject *sef, PyObject *args, PyObject *kw)
|
||||
/* mscp_opts */
|
||||
"nr_threads", /* int */
|
||||
"nr_ahead", /* int */
|
||||
|
||||
"min_chunk_sz", /* unsigned long */
|
||||
"max_chunk_sz", /* unsigned long */
|
||||
"buf_sz", /* unsigned long */
|
||||
|
||||
"coremask", /* const char * */
|
||||
|
||||
"max_startups", /* int */
|
||||
"severity", /* int, MSCP_SERVERITY_* */
|
||||
"msg_fd", /* int */
|
||||
|
||||
@@ -100,17 +104,19 @@ static PyObject *wrap_mscp_init(PyObject *sef, PyObject *args, PyObject *kw)
|
||||
"login_name", /* const char * */
|
||||
"port", /* const char * */
|
||||
"identity", /* const char * */
|
||||
|
||||
"cipher", /* const char * */
|
||||
"hmac", /* const char * */
|
||||
"compress", /* const char * */
|
||||
"password", /* const char * */
|
||||
"passphrase", /* const char * */
|
||||
|
||||
"debug_level", /* int */
|
||||
"no_hostkey_check", /* bool */
|
||||
"enable_nagle", /* bool */
|
||||
NULL,
|
||||
};
|
||||
const char *fmt = "si" "|iikkksii" "ssssssssipp";
|
||||
const char *fmt = "si" "|" "ii" "kkk" "s" "iii" "sss" "sssss" "ipp";
|
||||
char *coremask = NULL;
|
||||
char *login_name = NULL, *port = NULL, *identity = NULL;
|
||||
char *cipher = NULL, *hmac = NULL, *compress = NULL;
|
||||
@@ -137,6 +143,7 @@ static PyObject *wrap_mscp_init(PyObject *sef, PyObject *args, PyObject *kw)
|
||||
&i->mo.max_chunk_sz,
|
||||
&i->mo.buf_sz,
|
||||
&coremask,
|
||||
&i->mo.max_startups,
|
||||
&i->mo.severity,
|
||||
&i->mo.msg_fd,
|
||||
&login_name,
|
||||
@@ -175,6 +182,7 @@ static PyObject *wrap_mscp_init(PyObject *sef, PyObject *args, PyObject *kw)
|
||||
|
||||
i->m = mscp_init(remote, direction, &i->mo, &i->so);
|
||||
if (!i->m) {
|
||||
PyErr_Format(PyExc_RuntimeError, "%s", mscp_get_error());
|
||||
free(i);
|
||||
return NULL;
|
||||
}
|
||||
@@ -260,7 +268,7 @@ static PyObject *wrap_mscp_set_dst_path(PyObject *self, PyObject *args, PyObject
|
||||
return Py_BuildValue("");
|
||||
}
|
||||
|
||||
static PyObject *wrap_mscp_prepare(PyObject *self, PyObject *args, PyObject *kw)
|
||||
static PyObject *wrap_mscp_scan(PyObject *self, PyObject *args, PyObject *kw)
|
||||
{
|
||||
char *keywords[] = { "m", NULL };
|
||||
unsigned long long addr;
|
||||
@@ -275,7 +283,7 @@ static PyObject *wrap_mscp_prepare(PyObject *self, PyObject *args, PyObject *kw)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (mscp_prepare(m) < 0) {
|
||||
if (mscp_scan(m) < 0) {
|
||||
PyErr_Format(PyExc_RuntimeError, mscp_get_error());
|
||||
return NULL;
|
||||
}
|
||||
@@ -429,7 +437,7 @@ static PyMethodDef pymscpMethods[] = {
|
||||
METH_VARARGS | METH_KEYWORDS, NULL
|
||||
},
|
||||
{
|
||||
"mscp_prepare", (PyCFunction)wrap_mscp_prepare,
|
||||
"mscp_scan", (PyCFunction)wrap_mscp_scan,
|
||||
METH_VARARGS | METH_KEYWORDS, NULL
|
||||
},
|
||||
{
|
||||
|
||||
@@ -31,8 +31,6 @@
|
||||
#define pr_debug(fmt, ...)
|
||||
#endif
|
||||
|
||||
#define strerrno() strerror(errno)
|
||||
|
||||
|
||||
#define min(a, b) (((a) > (b)) ? (b) : (a))
|
||||
#define max(a, b) (((a) > (b)) ? (a) : (b))
|
||||
|
||||
@@ -71,6 +71,7 @@ param_kwargs = [
|
||||
{ "min_chunk_sz": 1 * 1024 * 1024 },
|
||||
{ "max_chunk_sz": 64 * 1024 * 1024 },
|
||||
{ "coremask": "0x0f" },
|
||||
{ "max_startups": 5 },
|
||||
{ "severity": mscp.SEVERITY_NONE },
|
||||
{ "cipher": "aes128-gcm@openssh.com" },
|
||||
{ "compress": "yes" },
|
||||
@@ -102,3 +103,22 @@ def test_login_failed():
|
||||
m = mscp.mscp(remote, mscp.LOCAL2REMOTE, port = "65534")
|
||||
with pytest.raises(RuntimeError) as e:
|
||||
m.connect()
|
||||
|
||||
|
||||
param_invalid_kwargs = [
|
||||
{ "nr_threads": -1 },
|
||||
{ "nr_ahead": -1 },
|
||||
{ "min_chunk_sz": 1 },
|
||||
{ "max_chunk_sz": 1 },
|
||||
{ "coremask": "xxxxx" },
|
||||
{ "max_startups": -1 },
|
||||
{ "cipher": "invalid" },
|
||||
{ "hmac": "invalid"},
|
||||
{ "compress": "invalid"},
|
||||
]
|
||||
|
||||
@pytest.mark.parametrize("kw", param_invalid_kwargs)
|
||||
def test_invalid_options(kw):
|
||||
with pytest.raises(RuntimeError) as e:
|
||||
m = mscp.mscp("localhost", mscp.LOCAL2REMOTE, **kw)
|
||||
m.connect()
|
||||
|
||||
@@ -16,6 +16,9 @@ class File():
|
||||
self.content = content
|
||||
self.perm = perm
|
||||
|
||||
def __repr__(self):
|
||||
return "<file:{} {}-bytes>".format(self.path, self.size)
|
||||
|
||||
def __str__(self):
|
||||
return self.path
|
||||
|
||||
|
||||
Reference in New Issue
Block a user