mirror of
https://github.com/upa/mscp.git
synced 2026-02-15 09:44:43 +08:00
Compare commits
23 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5ac0874621 | ||
|
|
e0e6fae296 | ||
|
|
6305f02770 | ||
|
|
ae4b848ba0 | ||
|
|
3902fb584a | ||
|
|
4ec877a290 | ||
|
|
f0c70a148b | ||
|
|
e038b3020d | ||
|
|
2fdfa7b830 | ||
|
|
f5d0f526f2 | ||
|
|
a086e6a154 | ||
|
|
3bce4ec277 | ||
|
|
a923d40ada | ||
|
|
24fef5f539 | ||
|
|
4e80b05da7 | ||
|
|
98eca409af | ||
|
|
cf99a439cb | ||
|
|
3077bb0856 | ||
|
|
72c27f16d6 | ||
|
|
9b0eb668f9 | ||
|
|
5f9f20f150 | ||
|
|
ceb9ebd5a8 | ||
|
|
3810d6314d |
27
README.md
27
README.md
@@ -38,19 +38,25 @@ brew install upa/tap/mscp
|
|||||||
|
|
||||||
- Ubuntu 22.04
|
- Ubuntu 22.04
|
||||||
```console
|
```console
|
||||||
wget https://github.com/upa/mscp/releases/download/v0.0.6/mscp_0.0.6-ubuntu-22.04-x86_64.deb
|
wget https://github.com/upa/mscp/releases/latest/download/mscp_ubuntu-22.04-x86_64.deb
|
||||||
apt-get install -f ./mscp_0.0.6-ubuntu-22.04-x86_64.deb
|
apt-get install -f ./mscp_ubuntu-22.04-x86_64.deb
|
||||||
```
|
```
|
||||||
|
|
||||||
- Ubuntu 20.04
|
- Ubuntu 20.04
|
||||||
```console
|
```console
|
||||||
wget https://github.com/upa/mscp/releases/download/v0.0.6/mscp_0.0.6-ubuntu-20.04-x86_64.deb
|
wget https://github.com/upa/mscp/releases/latest/download/mscp_ubuntu-20.04-x86_64.deb
|
||||||
apt-get install -f ./mscp_0.0.6-ubuntu-20.04-x86_64.deb
|
apt-get install -f ./mscp_ubuntu-20.04-x86_64.deb
|
||||||
```
|
```
|
||||||
|
|
||||||
- Rocky 8.6
|
- Rocky 8.6
|
||||||
```console
|
```console
|
||||||
yum install https://github.com/upa/mscp/releases/download/v0.0.6/mscp_0.0.6-rocky-8.6-x86_64.rpm
|
yum install https://github.com/upa/mscp/releases/latest/download/mscp_rocky-8.6-x86_64.rpm
|
||||||
|
```
|
||||||
|
|
||||||
|
- Linux with single binary `mscp` (x86_64 only)
|
||||||
|
```console
|
||||||
|
wget https://github.com/upa/mscp/releases/latest/download/mscp.linux.x86.static -O /usr/local/bin/mscp
|
||||||
|
chmod 755 /usr/local/bin/mscp
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
||||||
@@ -98,9 +104,9 @@ of libssh. So you can start from cmake with it.
|
|||||||
|
|
||||||
```console
|
```console
|
||||||
$ mscp
|
$ mscp
|
||||||
mscp v0.0.7: copy files over multiple ssh connections
|
mscp v0.0.8: copy files over multiple ssh connections
|
||||||
|
|
||||||
Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask]
|
Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask] [-u max_startups]
|
||||||
[-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]
|
[-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]
|
||||||
[-l login_name] [-p port] [-i identity_file]
|
[-l login_name] [-p port] [-i identity_file]
|
||||||
[-c cipher_spec] [-M hmac_spec] [-C compress] source ... target
|
[-c cipher_spec] [-M hmac_spec] [-C compress] source ... target
|
||||||
@@ -124,7 +130,7 @@ $ mscp -n 5 -m 0x1f -c aes128-gcm@openssh.com /var/ram/test.img 10.0.0.1:/var/ra
|
|||||||
- `-v` option increments verbose output level.
|
- `-v` option increments verbose output level.
|
||||||
|
|
||||||
```console
|
```console
|
||||||
$ mscp test 10.0.0.:
|
$ mscp test 10.0.0.1:
|
||||||
[=======================================] 100% 49B /49B 198.8B/s 00:00 ETA
|
[=======================================] 100% 49B /49B 198.8B/s 00:00 ETA
|
||||||
```
|
```
|
||||||
|
|
||||||
@@ -153,15 +159,16 @@ copy done: test/testdir/asdf
|
|||||||
|
|
||||||
```console
|
```console
|
||||||
$ mscp -h
|
$ mscp -h
|
||||||
mscp v0.0.7: copy files over multiple ssh connections
|
mscp v0.0.8: copy files over multiple ssh connections
|
||||||
|
|
||||||
Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask]
|
Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask] [-u max_startups]
|
||||||
[-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]
|
[-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]
|
||||||
[-l login_name] [-p port] [-i identity_file]
|
[-l login_name] [-p port] [-i identity_file]
|
||||||
[-c cipher_spec] [-M hmac_spec] [-C compress] source ... target
|
[-c cipher_spec] [-M hmac_spec] [-C compress] source ... target
|
||||||
|
|
||||||
-n NR_CONNECTIONS number of connections (default: floor(log(cores)*2)+1)
|
-n NR_CONNECTIONS number of connections (default: floor(log(cores)*2)+1)
|
||||||
-m COREMASK hex value to specify cores where threads pinned
|
-m COREMASK hex value to specify cores where threads pinned
|
||||||
|
-u MAX_STARTUPS number of concurrent outgoing connections (default: 8)
|
||||||
-s MIN_CHUNK_SIZE min chunk size (default: 64MB)
|
-s MIN_CHUNK_SIZE min chunk size (default: 64MB)
|
||||||
-S MAX_CHUNK_SIZE max chunk size (default: filesize/nr_conn)
|
-S MAX_CHUNK_SIZE max chunk size (default: filesize/nr_conn)
|
||||||
|
|
||||||
|
|||||||
@@ -18,7 +18,7 @@
|
|||||||
* 2. connect to remote host with mscp_connect()
|
* 2. connect to remote host with mscp_connect()
|
||||||
* 3. add path to source files with mscp_add_src_path()
|
* 3. add path to source files with mscp_add_src_path()
|
||||||
* 4. set path to destination with mscp_set_dst_path()
|
* 4. set path to destination with mscp_set_dst_path()
|
||||||
* 5. finish preparation with mscp_prepare()
|
* 5. start to scan source files with mscp_scan()
|
||||||
* 6. start copy with mscp_start()
|
* 6. start copy with mscp_start()
|
||||||
* 7. wait for copy finished with mscp_join()
|
* 7. wait for copy finished with mscp_join()
|
||||||
* 8. cleanup mscp instance with mscp_cleanup() and mscp_free()
|
* 8. cleanup mscp instance with mscp_cleanup() and mscp_free()
|
||||||
@@ -43,6 +43,7 @@ struct mscp_opts {
|
|||||||
size_t max_chunk_sz; /** maximum chunk size (default file size/nr_threads) */
|
size_t max_chunk_sz; /** maximum chunk size (default file size/nr_threads) */
|
||||||
size_t buf_sz; /** buffer size, default 16k. */
|
size_t buf_sz; /** buffer size, default 16k. */
|
||||||
char coremask[MSCP_MAX_COREMASK_STR]; /** hex to specifiy usable cpu cores */
|
char coremask[MSCP_MAX_COREMASK_STR]; /** hex to specifiy usable cpu cores */
|
||||||
|
int max_startups; /* sshd MaxStartups concurrent connections */
|
||||||
|
|
||||||
int severity; /** messaging severity. set MSCP_SERVERITY_* */
|
int severity; /** messaging severity. set MSCP_SERVERITY_* */
|
||||||
int msg_fd; /** fd to output message. default STDOUT (0),
|
int msg_fd; /** fd to output message. default STDOUT (0),
|
||||||
@@ -109,7 +110,7 @@ struct mscp *mscp_init(const char *remote_host, int direction,
|
|||||||
/**
|
/**
|
||||||
* @brief Connect the first SSH connection. mscp_connect connects to
|
* @brief Connect the first SSH connection. mscp_connect connects to
|
||||||
* remote host and initialize a SFTP session over the
|
* remote host and initialize a SFTP session over the
|
||||||
* connection. mscp_prepare() and mscp_start() require mscp_connect()
|
* connection. mscp_scan() and mscp_start() require mscp_connect()
|
||||||
* beforehand.
|
* beforehand.
|
||||||
*
|
*
|
||||||
* @param m mscp instance.
|
* @param m mscp instance.
|
||||||
@@ -149,20 +150,31 @@ int mscp_add_src_path(struct mscp *m, const char *src_path);
|
|||||||
*/
|
*/
|
||||||
int mscp_set_dst_path(struct mscp *m, const char *dst_path);
|
int mscp_set_dst_path(struct mscp *m, const char *dst_path);
|
||||||
|
|
||||||
/* check source files, resolve destination file paths for all source
|
/* scan source files, resolve destination file paths for all source
|
||||||
* files, and prepare chunks for all files. */
|
* files, and calculate chunks for all files. */
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Prepare for file transfer. This function checks all source
|
* @brief Scan source paths and prepare. This function checks all
|
||||||
* files (recursively), resolve paths on the destination side, and
|
* source files (recursively), resolve paths on the destination side,
|
||||||
* calculate file chunks.
|
* and calculate file chunks. This function is non-blocking.
|
||||||
*
|
*
|
||||||
* @param m mscp instance.
|
* @param m mscp instance.
|
||||||
*
|
*
|
||||||
* @return 0 on success, < 0 if an error occured.
|
* @return 0 on success, < 0 if an error occured.
|
||||||
* mscp_get_error() can be used to retrieve error message.
|
* mscp_get_error() can be used to retrieve error message.
|
||||||
*/
|
*/
|
||||||
int mscp_prepare(struct mscp *m);
|
int mscp_scan(struct mscp *m);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Join scna thread invoked by mscp_scan(). mscp_join()
|
||||||
|
* involves this, so that mscp_scan_join() should be called when
|
||||||
|
* mscp_scan() is called by mscp_start() is not.
|
||||||
|
*
|
||||||
|
* @param m mscp instance.
|
||||||
|
* @return 0 on success, < 0 if an error occured.
|
||||||
|
* mscp_get_error() can be used to retrieve error message.
|
||||||
|
*/
|
||||||
|
int mscp_scan_join(struct mscp *m);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Start to copy files. mscp_start() returns immediately. You
|
* @brief Start to copy files. mscp_start() returns immediately. You
|
||||||
@@ -245,15 +257,6 @@ enum {
|
|||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Set a file descriptor for receiving messages from mscp.
|
|
||||||
* This function has the same effect with setting mscp_opts->msg_fd.
|
|
||||||
*
|
|
||||||
* @param m mscp instance.
|
|
||||||
* @param fd fd to which libmscp writes messages.
|
|
||||||
*/
|
|
||||||
void mscp_set_msg_fd(struct mscp *m, int fd);
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Get the recent error message from libmscp. Note that this
|
* @brief Get the recent error message from libmscp. Note that this
|
||||||
|
|||||||
23
mscp/mscp.py
23
mscp/mscp.py
@@ -37,7 +37,7 @@ SEVERITY_DEBUG = pymscp.SEVERITY_DEBUG
|
|||||||
|
|
||||||
STATE_INIT = 0
|
STATE_INIT = 0
|
||||||
STATE_CONNECTED = 1
|
STATE_CONNECTED = 1
|
||||||
STATE_PREPARED = 2
|
STATE_SCANNED = 2
|
||||||
STATE_RUNNING = 3
|
STATE_RUNNING = 3
|
||||||
STATE_STOPPED = 4
|
STATE_STOPPED = 4
|
||||||
STATE_JOINED = 5
|
STATE_JOINED = 5
|
||||||
@@ -47,7 +47,7 @@ STATE_RELEASED = 7
|
|||||||
_state_str = {
|
_state_str = {
|
||||||
STATE_INIT: "init",
|
STATE_INIT: "init",
|
||||||
STATE_CONNECTED: "connected",
|
STATE_CONNECTED: "connected",
|
||||||
STATE_PREPARED: "prepared",
|
STATE_SCANNED: "scanned",
|
||||||
STATE_RUNNING: "running",
|
STATE_RUNNING: "running",
|
||||||
STATE_STOPPED: "stopped",
|
STATE_STOPPED: "stopped",
|
||||||
STATE_JOINED: "joined",
|
STATE_JOINED: "joined",
|
||||||
@@ -71,6 +71,9 @@ class mscp:
|
|||||||
self.state = STATE_INIT
|
self.state = STATE_INIT
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
|
if not hasattr(self, "state"):
|
||||||
|
# this instance failed on mscp_init
|
||||||
|
return "mscp:{}:init-failed"
|
||||||
return "mscp:{}:{}".format(self.remote, self.__state2str())
|
return "mscp:{}:{}".format(self.remote, self.__state2str())
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
@@ -100,26 +103,30 @@ class mscp:
|
|||||||
self.state = STATE_CONNECTED
|
self.state = STATE_CONNECTED
|
||||||
|
|
||||||
def add_src_path(self, src_path: str):
|
def add_src_path(self, src_path: str):
|
||||||
|
if type(src_path) != str:
|
||||||
|
raise ValueError("src_path must be str: {}".format(src_path))
|
||||||
self.src_paths.append(src_path)
|
self.src_paths.append(src_path)
|
||||||
pymscp.mscp_add_src_path(m = self.m, src_path = src_path)
|
pymscp.mscp_add_src_path(m = self.m, src_path = src_path)
|
||||||
|
|
||||||
def set_dst_path(self, dst_path: str):
|
def set_dst_path(self, dst_path: str):
|
||||||
|
if type(dst_path) != str:
|
||||||
|
raise ValueError("dst_path must be str: {}".format(dst_path))
|
||||||
self.dst_path = dst_path
|
self.dst_path = dst_path
|
||||||
pymscp.mscp_set_dst_path(m = self.m, dst_path = dst_path);
|
pymscp.mscp_set_dst_path(m = self.m, dst_path = dst_path);
|
||||||
|
|
||||||
def prepare(self):
|
def scan(self):
|
||||||
if self.state != STATE_CONNECTED:
|
if self.state != STATE_CONNECTED:
|
||||||
raise RuntimeError("invalid mscp state: {}".format(self.__state2str()))
|
raise RuntimeError("invalid mscp state: {}".format(self.__state2str()))
|
||||||
if not self.src_paths:
|
if not self.src_paths:
|
||||||
raise RuntimeError("src path list is empty")
|
raise RuntimeError("src path list is empty")
|
||||||
if not self.dst_path:
|
if self.dst_path == None:
|
||||||
raise RuntimeError("dst path is not set")
|
raise RuntimeError("dst path is not set")
|
||||||
|
|
||||||
pymscp.mscp_prepare(m = self.m)
|
pymscp.mscp_scan(m = self.m)
|
||||||
self.state = STATE_PREPARED
|
self.state = STATE_SCANNED
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
if self.state != STATE_PREPARED:
|
if self.state != STATE_SCANNED:
|
||||||
raise RuntimeError("invalid mscp state: {}".format(self.__state2str()))
|
raise RuntimeError("invalid mscp state: {}".format(self.__state2str()))
|
||||||
|
|
||||||
pymscp.mscp_start(m = self.m)
|
pymscp.mscp_start(m = self.m)
|
||||||
@@ -167,7 +174,7 @@ class mscp:
|
|||||||
|
|
||||||
self.set_dst_path(dst)
|
self.set_dst_path(dst)
|
||||||
|
|
||||||
self.prepare()
|
self.scan()
|
||||||
self.start()
|
self.start()
|
||||||
if nonblock:
|
if nonblock:
|
||||||
return
|
return
|
||||||
|
|||||||
15
src/list.h
15
src/list.h
@@ -554,5 +554,20 @@ static inline int list_count(struct list_head *head)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* list_free_f - free items in a list with a function
|
||||||
|
* @head the heaf for your list.
|
||||||
|
* @f function that releases an item in the list.
|
||||||
|
*/
|
||||||
|
static inline void list_free_f(struct list_head *head, void (*f)(struct list_head *))
|
||||||
|
{
|
||||||
|
struct list_head *p, *n;
|
||||||
|
|
||||||
|
list_for_each_safe(p, n, head) {
|
||||||
|
list_del(p);
|
||||||
|
f(p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|||||||
32
src/main.c
32
src/main.c
@@ -24,7 +24,7 @@
|
|||||||
void usage(bool print_help) {
|
void usage(bool print_help) {
|
||||||
printf("mscp v" VERSION ": copy files over multiple ssh connections\n"
|
printf("mscp v" VERSION ": copy files over multiple ssh connections\n"
|
||||||
"\n"
|
"\n"
|
||||||
"Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask]\n"
|
"Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask] [-u max_startups]\n"
|
||||||
" [-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]\n"
|
" [-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]\n"
|
||||||
" [-l login_name] [-p port] [-i identity_file]\n"
|
" [-l login_name] [-p port] [-i identity_file]\n"
|
||||||
" [-c cipher_spec] [-M hmac_spec] [-C compress] source ... target\n"
|
" [-c cipher_spec] [-M hmac_spec] [-C compress] source ... target\n"
|
||||||
@@ -36,6 +36,8 @@ void usage(bool print_help) {
|
|||||||
printf(" -n NR_CONNECTIONS number of connections "
|
printf(" -n NR_CONNECTIONS number of connections "
|
||||||
"(default: floor(log(cores)*2)+1)\n"
|
"(default: floor(log(cores)*2)+1)\n"
|
||||||
" -m COREMASK hex value to specify cores where threads pinned\n"
|
" -m COREMASK hex value to specify cores where threads pinned\n"
|
||||||
|
" -u MAX_STARTUPS number of concurrent outgoing connections "
|
||||||
|
"(default: 8)\n"
|
||||||
" -s MIN_CHUNK_SIZE min chunk size (default: 64MB)\n"
|
" -s MIN_CHUNK_SIZE min chunk size (default: 64MB)\n"
|
||||||
" -S MAX_CHUNK_SIZE max chunk size (default: filesize/nr_conn)\n"
|
" -S MAX_CHUNK_SIZE max chunk size (default: filesize/nr_conn)\n"
|
||||||
"\n"
|
"\n"
|
||||||
@@ -52,7 +54,8 @@ void usage(bool print_help) {
|
|||||||
" -i IDENTITY identity file for public key authentication\n"
|
" -i IDENTITY identity file for public key authentication\n"
|
||||||
" -c CIPHER cipher spec\n"
|
" -c CIPHER cipher spec\n"
|
||||||
" -M HMAC hmac spec\n"
|
" -M HMAC hmac spec\n"
|
||||||
" -C COMPRESS enable compression: yes, no, zlib, zlib@openssh.com\n"
|
" -C COMPRESS enable compression: "
|
||||||
|
"yes, no, zlib, zlib@openssh.com\n"
|
||||||
" -H disable hostkey check\n"
|
" -H disable hostkey check\n"
|
||||||
" -d increment ssh debug output level\n"
|
" -d increment ssh debug output level\n"
|
||||||
" -N enable Nagle's algorithm (default disabled)\n"
|
" -N enable Nagle's algorithm (default disabled)\n"
|
||||||
@@ -69,7 +72,7 @@ char *split_remote_and_path(const char *string, char **remote, char **path)
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
if (!(s = strdup(string))) {
|
if (!(s = strdup(string))) {
|
||||||
fprintf(stderr, "strdup: %s\n", strerrno());
|
fprintf(stderr, "strdup: %s\n", strerror(errno));
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -115,7 +118,7 @@ struct target *validate_targets(char **arg, int len)
|
|||||||
int n;
|
int n;
|
||||||
|
|
||||||
if ((t = calloc(len, sizeof(struct target))) == NULL) {
|
if ((t = calloc(len, sizeof(struct target))) == NULL) {
|
||||||
fprintf(stderr, "calloc: %s\n", strerrno());
|
fprintf(stderr, "calloc: %s\n", strerror(errno));
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
memset(t, 0, len * sizeof(struct target));
|
memset(t, 0, len * sizeof(struct target));
|
||||||
@@ -204,7 +207,7 @@ int main(int argc, char **argv)
|
|||||||
memset(&o, 0, sizeof(o));
|
memset(&o, 0, sizeof(o));
|
||||||
o.severity = MSCP_SEVERITY_WARN;
|
o.severity = MSCP_SEVERITY_WARN;
|
||||||
|
|
||||||
while ((ch = getopt(argc, argv, "n:m:s:S:a:b:vqDrl:p:i:c:M:C:HdNh")) != -1) {
|
while ((ch = getopt(argc, argv, "n:m:u:s:S:a:b:vqDrl:p:i:c:M:C:HdNh")) != -1) {
|
||||||
switch (ch) {
|
switch (ch) {
|
||||||
case 'n':
|
case 'n':
|
||||||
o.nr_threads = atoi(optarg);
|
o.nr_threads = atoi(optarg);
|
||||||
@@ -217,6 +220,9 @@ int main(int argc, char **argv)
|
|||||||
case 'm':
|
case 'm':
|
||||||
strncpy(o.coremask, optarg, sizeof(o.coremask));
|
strncpy(o.coremask, optarg, sizeof(o.coremask));
|
||||||
break;
|
break;
|
||||||
|
case 'u':
|
||||||
|
o.max_startups = atoi(optarg);
|
||||||
|
break;
|
||||||
case 's':
|
case 's':
|
||||||
o.min_chunk_sz = atoi(optarg);
|
o.min_chunk_sz = atoi(optarg);
|
||||||
break;
|
break;
|
||||||
@@ -323,7 +329,7 @@ int main(int argc, char **argv)
|
|||||||
|
|
||||||
if (!dryrun) {
|
if (!dryrun) {
|
||||||
if (pipe(pipe_fd) < 0) {
|
if (pipe(pipe_fd) < 0) {
|
||||||
fprintf(stderr, "pipe: %s\n", strerrno());
|
fprintf(stderr, "pipe: %s\n", strerror(errno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
msg_fd = pipe_fd[0];
|
msg_fd = pipe_fd[0];
|
||||||
@@ -352,23 +358,23 @@ int main(int argc, char **argv)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mscp_prepare(m) < 0) {
|
if (mscp_scan(m) < 0) {
|
||||||
fprintf(stderr, "mscp_prepare: %s\n", mscp_get_error());
|
fprintf(stderr, "mscp_scan: %s\n", mscp_get_error());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dryrun) {
|
if (dryrun) {
|
||||||
ret = 0;
|
ret = mscp_scan_join(m);
|
||||||
goto out;
|
goto out;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pthread_create(&tid_stat, NULL, print_stat_thread, NULL) < 0) {
|
if (pthread_create(&tid_stat, NULL, print_stat_thread, NULL) < 0) {
|
||||||
fprintf(stderr, "pthread_create: %s\n", strerrno());
|
fprintf(stderr, "pthread_create: %s\n", strerror(errno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (signal(SIGINT, sigint_handler) == SIG_ERR) {
|
if (signal(SIGINT, sigint_handler) == SIG_ERR) {
|
||||||
fprintf(stderr, "signal: %s\n", strerrno());
|
fprintf(stderr, "signal: %s\n", strerror(errno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -547,14 +553,14 @@ void *print_stat_thread(void *arg)
|
|||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if (poll(&pfd, 1, 100) < 0) {
|
if (poll(&pfd, 1, 100) < 0) {
|
||||||
fprintf(stderr, "poll: %s\n", strerrno());
|
fprintf(stderr, "poll: %s\n", strerror(errno));
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pfd.revents & POLLIN) {
|
if (pfd.revents & POLLIN) {
|
||||||
memset(buf, 0, sizeof(buf));
|
memset(buf, 0, sizeof(buf));
|
||||||
if (read(msg_fd, buf, sizeof(buf)) < 0) {
|
if (read(msg_fd, buf, sizeof(buf)) < 0) {
|
||||||
fprintf(stderr, "read: %s\n", strerrno());
|
fprintf(stderr, "read: %s\n", strerror(errno));
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
print_cli("\r\033[K" "%s", buf);
|
print_cli("\r\033[K" "%s", buf);
|
||||||
|
|||||||
@@ -4,10 +4,13 @@
|
|||||||
#include <limits.h>
|
#include <limits.h>
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
|
|
||||||
|
#include <util.h>
|
||||||
#include <message.h>
|
#include <message.h>
|
||||||
|
|
||||||
/* mscp error message buffer */
|
/* strerror_r wrapper */
|
||||||
|
__thread char thread_strerror[128];
|
||||||
|
|
||||||
|
/* mscp error message buffer */
|
||||||
#define MSCP_ERRMSG_SIZE (PATH_MAX * 2)
|
#define MSCP_ERRMSG_SIZE (PATH_MAX * 2)
|
||||||
|
|
||||||
static char errmsg[MSCP_ERRMSG_SIZE];
|
static char errmsg[MSCP_ERRMSG_SIZE];
|
||||||
@@ -30,29 +33,17 @@ const char *mscp_get_error()
|
|||||||
|
|
||||||
/* message print functions */
|
/* message print functions */
|
||||||
|
|
||||||
static int mprint_serverity = MSCP_SEVERITY_WARN;
|
static int mprint_severity = MSCP_SEVERITY_WARN;
|
||||||
static pthread_mutex_t mprint_lock = PTHREAD_MUTEX_INITIALIZER;
|
|
||||||
|
|
||||||
void mprint_set_severity(int serverity)
|
void mprint_set_severity(int serverity)
|
||||||
{
|
{
|
||||||
if (serverity < 0)
|
if (serverity < 0)
|
||||||
mprint_serverity = -1; /* no print */
|
mprint_severity = -1; /* no print */
|
||||||
mprint_serverity = serverity;
|
mprint_severity = serverity;
|
||||||
}
|
}
|
||||||
|
|
||||||
void mprint(int fd, int serverity, const char *fmt, ...)
|
int mprint_get_severity()
|
||||||
{
|
{
|
||||||
va_list va;
|
return mprint_severity;
|
||||||
int ret;
|
|
||||||
|
|
||||||
if (fd < 0)
|
|
||||||
return;
|
|
||||||
|
|
||||||
if (serverity <= mprint_serverity) {
|
|
||||||
pthread_mutex_lock(&mprint_lock);
|
|
||||||
va_start(va, fmt);
|
|
||||||
vdprintf(fd, fmt, va);
|
|
||||||
va_end(va);
|
|
||||||
pthread_mutex_unlock(&mprint_lock);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -7,23 +7,47 @@
|
|||||||
|
|
||||||
/* message print. printed messages are passed to application via msg_fd */
|
/* message print. printed messages are passed to application via msg_fd */
|
||||||
void mprint_set_severity(int severity);
|
void mprint_set_severity(int severity);
|
||||||
void mprint(int fd, int severity, const char *fmt, ...);
|
int mprint_get_severity();
|
||||||
|
|
||||||
|
#define mprint(fp, severity, fmt, ...) \
|
||||||
|
do { \
|
||||||
|
if (fp && severity <= mprint_get_severity()) { \
|
||||||
|
fprintf(fp, fmt, ##__VA_ARGS__); \
|
||||||
|
fflush(fp); \
|
||||||
|
} \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
|
#define mpr_err(fp, fmt, ...) \
|
||||||
|
mprint(fp, MSCP_SEVERITY_ERR, fmt, ##__VA_ARGS__)
|
||||||
|
#define mpr_warn(fp, fmt, ...) \
|
||||||
|
mprint(fp, MSCP_SEVERITY_WARN, fmt, ##__VA_ARGS__)
|
||||||
|
#define mpr_notice(fp, fmt, ...) \
|
||||||
|
mprint(fp, MSCP_SEVERITY_NOTICE, fmt, ##__VA_ARGS__)
|
||||||
|
#define mpr_info(fp, fmt, ...) \
|
||||||
|
mprint(fp, MSCP_SEVERITY_INFO, fmt, ##__VA_ARGS__)
|
||||||
|
#define mpr_debug(fp, fmt, ...) \
|
||||||
|
mprint(fp, MSCP_SEVERITY_DEBUG, fmt, ##__VA_ARGS__)
|
||||||
|
|
||||||
|
|
||||||
|
/* errorno wrapper */
|
||||||
|
extern __thread char thread_strerror[128];
|
||||||
|
|
||||||
|
#ifdef _GNU_SOURCE
|
||||||
|
/* GNU strerror_r */
|
||||||
|
#define strerrno() \
|
||||||
|
strerror_r(errno, thread_strerror, sizeof(thread_strerror))
|
||||||
|
#else
|
||||||
|
/* this macro assumes that strerror_r never fails. any good way? */
|
||||||
|
#define strerrno() \
|
||||||
|
(strerror_r(errno, thread_strerror, sizeof(thread_strerror)) \
|
||||||
|
? thread_strerror : thread_strerror)
|
||||||
|
#endif
|
||||||
|
|
||||||
#define mpr_err(fd, fmt, ...) \
|
|
||||||
mprint(fd, MSCP_SEVERITY_ERR, fmt, ##__VA_ARGS__)
|
|
||||||
#define mpr_warn(fd, fmt, ...) \
|
|
||||||
mprint(fd, MSCP_SEVERITY_WARN, fmt, ##__VA_ARGS__)
|
|
||||||
#define mpr_notice(fd, fmt, ...) \
|
|
||||||
mprint(fd, MSCP_SEVERITY_NOTICE, fmt, ##__VA_ARGS__)
|
|
||||||
#define mpr_info(fd, fmt, ...) \
|
|
||||||
mprint(fd, MSCP_SEVERITY_INFO, fmt, ##__VA_ARGS__)
|
|
||||||
#define mpr_debug(fd, fmt, ...) \
|
|
||||||
mprint(fd, MSCP_SEVERITY_DEBUG, fmt, ##__VA_ARGS__)
|
|
||||||
|
|
||||||
|
|
||||||
/* error message buffer */
|
/* error message buffer */
|
||||||
#define mscp_set_error(fmt, ...) \
|
#define mscp_set_error(fmt, ...) \
|
||||||
_mscp_set_error("%s:%d:%s: " fmt, \
|
_mscp_set_error("%s:%d:%s: " fmt "\0", \
|
||||||
basename(__FILE__), __LINE__, __func__, ##__VA_ARGS__)
|
basename(__FILE__), __LINE__, __func__, ##__VA_ARGS__)
|
||||||
|
|
||||||
void _mscp_set_error(const char *fmt, ...);
|
void _mscp_set_error(const char *fmt, ...);
|
||||||
|
|||||||
319
src/mscp.c
319
src/mscp.c
@@ -2,7 +2,8 @@
|
|||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <math.h>
|
#include <math.h>
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
|
#include <semaphore.h>
|
||||||
|
#include <sys/time.h>
|
||||||
|
|
||||||
#include <list.h>
|
#include <list.h>
|
||||||
#include <util.h>
|
#include <util.h>
|
||||||
@@ -13,24 +14,30 @@
|
|||||||
#include <message.h>
|
#include <message.h>
|
||||||
#include <mscp.h>
|
#include <mscp.h>
|
||||||
|
|
||||||
|
|
||||||
struct mscp {
|
struct mscp {
|
||||||
char *remote; /* remote host (and uername) */
|
char *remote; /* remote host (and uername) */
|
||||||
int direction; /* copy direction */
|
int direction; /* copy direction */
|
||||||
struct mscp_opts *opts;
|
struct mscp_opts *opts;
|
||||||
struct mscp_ssh_opts *ssh_opts;
|
struct mscp_ssh_opts *ssh_opts;
|
||||||
|
|
||||||
int msg_fd; /* writer fd for message pipe */
|
FILE *msg_fp; /* writer fd for message pipe */
|
||||||
|
|
||||||
int *cores; /* usable cpu cores by COREMASK */
|
int *cores; /* usable cpu cores by COREMASK */
|
||||||
int nr_cores; /* length of array of cores */
|
int nr_cores; /* length of array of cores */
|
||||||
|
|
||||||
|
sem_t *sem; /* semaphore for concurrent
|
||||||
|
* connecting ssh sessions */
|
||||||
|
|
||||||
sftp_session first; /* first sftp session */
|
sftp_session first; /* first sftp session */
|
||||||
|
|
||||||
char dst_path[PATH_MAX];
|
char dst_path[PATH_MAX];
|
||||||
struct list_head src_list;
|
struct list_head src_list;
|
||||||
struct list_head path_list;
|
struct list_head path_list;
|
||||||
struct list_head chunk_list;
|
struct chunk_pool cp;
|
||||||
lock chunk_lock;
|
|
||||||
|
pthread_t tid_scan; /* tid for scan thread */
|
||||||
|
int ret_scan; /* return code from scan thread */
|
||||||
|
|
||||||
size_t total_bytes; /* total bytes to be transferred */
|
size_t total_bytes; /* total bytes to be transferred */
|
||||||
struct mscp_thread *threads;
|
struct mscp_thread *threads;
|
||||||
@@ -39,6 +46,7 @@ struct mscp {
|
|||||||
|
|
||||||
struct mscp_thread {
|
struct mscp_thread {
|
||||||
struct mscp *m;
|
struct mscp *m;
|
||||||
|
int id;
|
||||||
sftp_session sftp;
|
sftp_session sftp;
|
||||||
pthread_t tid;
|
pthread_t tid;
|
||||||
int cpu;
|
int cpu;
|
||||||
@@ -62,8 +70,13 @@ struct src {
|
|||||||
* sftp_async_read returns 0.
|
* sftp_async_read returns 0.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#define DEFAULT_MAX_STARTUPS 8
|
||||||
|
|
||||||
#define non_null_string(s) (s[0] != '\0')
|
#define non_null_string(s) (s[0] != '\0')
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
static int expand_coremask(const char *coremask, int **cores, int *nr_cores)
|
static int expand_coremask(const char *coremask, int **cores, int *nr_cores)
|
||||||
{
|
{
|
||||||
int n, *core_list, core_list_len = 0, nr_usable, nr_all;
|
int n, *core_list, core_list_len = 0, nr_usable, nr_all;
|
||||||
@@ -178,6 +191,16 @@ static int validate_and_set_defaut_params(struct mscp_opts *o)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (o->max_startups == 0)
|
||||||
|
o->max_startups = DEFAULT_MAX_STARTUPS;
|
||||||
|
else if (o->max_startups < 0) {
|
||||||
|
mscp_set_error("invalid max_startups: %d", o->max_startups);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (o->msg_fd == 0)
|
||||||
|
o->msg_fd = STDOUT_FILENO;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -198,22 +221,27 @@ struct mscp *mscp_init(const char *remote_host, int direction,
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mprint_set_severity(o->severity);
|
||||||
|
|
||||||
|
if (validate_and_set_defaut_params(o) < 0) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
m = malloc(sizeof(*m));
|
m = malloc(sizeof(*m));
|
||||||
if (!m) {
|
if (!m) {
|
||||||
mscp_set_error("failed to allocate memory: %s", strerrno());
|
mscp_set_error("failed to allocate memory: %s", strerrno());
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
mprint_set_severity(o->severity);
|
|
||||||
|
|
||||||
if (validate_and_set_defaut_params(o) < 0)
|
|
||||||
goto free_out;
|
|
||||||
|
|
||||||
memset(m, 0, sizeof(*m));
|
memset(m, 0, sizeof(*m));
|
||||||
INIT_LIST_HEAD(&m->src_list);
|
INIT_LIST_HEAD(&m->src_list);
|
||||||
INIT_LIST_HEAD(&m->path_list);
|
INIT_LIST_HEAD(&m->path_list);
|
||||||
INIT_LIST_HEAD(&m->chunk_list);
|
chunk_pool_init(&m->cp);
|
||||||
lock_init(&m->chunk_lock);
|
|
||||||
|
if ((m->sem = sem_create(o->max_startups)) == NULL) {
|
||||||
|
mscp_set_error("sem_create: %s", strerrno());
|
||||||
|
goto free_out;
|
||||||
|
}
|
||||||
|
|
||||||
m->remote = strdup(remote_host);
|
m->remote = strdup(remote_host);
|
||||||
if (!m->remote) {
|
if (!m->remote) {
|
||||||
@@ -221,15 +249,22 @@ struct mscp *mscp_init(const char *remote_host, int direction,
|
|||||||
goto free_out;
|
goto free_out;
|
||||||
}
|
}
|
||||||
m->direction = direction;
|
m->direction = direction;
|
||||||
m->msg_fd = o->msg_fd;
|
if (o->msg_fd > -1) {
|
||||||
|
m->msg_fp = fdopen(o->msg_fd, "a");
|
||||||
|
if (!m->msg_fp) {
|
||||||
|
mscp_set_error("fdopen failed: %s", strerrno());
|
||||||
|
goto free_out;
|
||||||
|
}
|
||||||
|
} else
|
||||||
|
m->msg_fp = NULL;
|
||||||
|
|
||||||
if (strlen(o->coremask) > 0) {
|
if (strlen(o->coremask) > 0) {
|
||||||
if (expand_coremask(o->coremask, &m->cores, &m->nr_cores) < 0)
|
if (expand_coremask(o->coremask, &m->cores, &m->nr_cores) < 0)
|
||||||
goto free_out;
|
goto free_out;
|
||||||
mpr_notice(m->msg_fd, "usable cpu cores:");
|
mpr_notice(m->msg_fp, "usable cpu cores:");
|
||||||
for (n = 0; n < m->nr_cores; n++)
|
for (n = 0; n < m->nr_cores; n++)
|
||||||
mpr_notice(m->msg_fd, " %d", m->cores[n]);
|
mpr_notice(m->msg_fp, " %d", m->cores[n]);
|
||||||
mpr_notice(m->msg_fd, "\n");
|
mpr_notice(m->msg_fp, "\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
m->opts = o;
|
m->opts = o;
|
||||||
@@ -242,11 +277,6 @@ free_out:
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void mscp_set_msg_fd(struct mscp *m, int fd)
|
|
||||||
{
|
|
||||||
m->msg_fd = fd;
|
|
||||||
}
|
|
||||||
|
|
||||||
int mscp_connect(struct mscp *m)
|
int mscp_connect(struct mscp *m)
|
||||||
{
|
{
|
||||||
m->first = ssh_init_sftp_session(m->remote, m->ssh_opts);
|
m->first = ssh_init_sftp_session(m->remote, m->ssh_opts);
|
||||||
@@ -293,17 +323,52 @@ int mscp_set_dst_path(struct mscp *m, const char *dst_path)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int get_page_mask(void)
|
||||||
int mscp_prepare(struct mscp *m)
|
|
||||||
{
|
{
|
||||||
|
long page_sz = sysconf(_SC_PAGESIZE);
|
||||||
|
size_t page_mask = 0;
|
||||||
|
int n;
|
||||||
|
|
||||||
|
for (n = 0; page_sz > 0; page_sz >>= 1, n++) {
|
||||||
|
page_mask <<= 1;
|
||||||
|
page_mask |= 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return page_mask >> 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void mscp_stop_copy_thread(struct mscp *m)
|
||||||
|
{
|
||||||
|
int n;
|
||||||
|
for (n = 0; n < m->opts->nr_threads; n++) {
|
||||||
|
if (m->threads[n].tid && !m->threads[n].finished)
|
||||||
|
pthread_cancel(m->threads[n].tid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void mscp_stop_scan_thread(struct mscp *m)
|
||||||
|
{
|
||||||
|
if (m->tid_scan)
|
||||||
|
pthread_cancel(m->tid_scan);
|
||||||
|
}
|
||||||
|
|
||||||
|
void mscp_stop(struct mscp *m)
|
||||||
|
{
|
||||||
|
mscp_stop_scan_thread(m);
|
||||||
|
mscp_stop_copy_thread(m);
|
||||||
|
}
|
||||||
|
|
||||||
|
void *mscp_scan_thread(void *arg)
|
||||||
|
{
|
||||||
|
struct mscp *m = arg;
|
||||||
sftp_session src_sftp = NULL, dst_sftp = NULL;
|
sftp_session src_sftp = NULL, dst_sftp = NULL;
|
||||||
bool src_path_is_dir, dst_path_is_dir, dst_path_should_dir;
|
struct path_resolve_args a;
|
||||||
struct list_head tmp;
|
struct list_head tmp;
|
||||||
struct path *p;
|
struct path *p;
|
||||||
struct src *s;
|
struct src *s;
|
||||||
mstat ss, ds;
|
mstat ss, ds;
|
||||||
|
|
||||||
src_path_is_dir = dst_path_is_dir = dst_path_should_dir = false;
|
m->ret_scan = 0;
|
||||||
|
|
||||||
switch (m->direction) {
|
switch (m->direction) {
|
||||||
case MSCP_DIRECTION_L2R:
|
case MSCP_DIRECTION_L2R:
|
||||||
@@ -316,105 +381,123 @@ int mscp_prepare(struct mscp *m)
|
|||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
mscp_set_error("invalid copy direction: %d", m->direction);
|
mscp_set_error("invalid copy direction: %d", m->direction);
|
||||||
return -1;
|
goto err_out;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* initialize path_resolve_args */
|
||||||
|
memset(&a, 0, sizeof(a));
|
||||||
|
a.msg_fp = m->msg_fp;
|
||||||
|
a.total_bytes = &m->total_bytes;
|
||||||
|
|
||||||
if (list_count(&m->src_list) > 1)
|
if (list_count(&m->src_list) > 1)
|
||||||
dst_path_should_dir = true;
|
a.dst_path_should_dir = true;
|
||||||
|
|
||||||
if (mscp_stat(m->dst_path, &ds, dst_sftp) == 0) {
|
if (mscp_stat(m->dst_path, &ds, dst_sftp) == 0) {
|
||||||
if (mstat_is_dir(ds))
|
if (mstat_is_dir(ds))
|
||||||
dst_path_is_dir = true;
|
a.dst_path_is_dir = true;
|
||||||
mscp_stat_free(ds);
|
mscp_stat_free(ds);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
a.cp = &m->cp;
|
||||||
|
a.nr_conn = m->opts->nr_threads;
|
||||||
|
a.min_chunk_sz = m->opts->min_chunk_sz;
|
||||||
|
a.max_chunk_sz = m->opts->max_chunk_sz;
|
||||||
|
a.chunk_align = get_page_mask();
|
||||||
|
|
||||||
|
mpr_info(m->msg_fp, "start to walk source path(s)\n");
|
||||||
|
|
||||||
/* walk a src_path recusively, and resolve path->dst_path for each src */
|
/* walk a src_path recusively, and resolve path->dst_path for each src */
|
||||||
list_for_each_entry(s, &m->src_list, list) {
|
list_for_each_entry(s, &m->src_list, list) {
|
||||||
if (mscp_stat(s->path, &ss, src_sftp) < 0) {
|
if (mscp_stat(s->path, &ss, src_sftp) < 0) {
|
||||||
mscp_set_error("stat: %s", mscp_strerror(src_sftp));
|
mscp_set_error("stat: %s", mscp_strerror(src_sftp));
|
||||||
return -1;
|
mscp_stat_free(ss);
|
||||||
|
goto err_out;
|
||||||
}
|
}
|
||||||
src_path_is_dir = mstat_is_dir(ss);
|
|
||||||
|
/* set path specific args */
|
||||||
|
a.src_path = s->path;
|
||||||
|
a.dst_path = m->dst_path;
|
||||||
|
a.src_path_is_dir = mstat_is_dir(ss);
|
||||||
mscp_stat_free(ss);
|
mscp_stat_free(ss);
|
||||||
|
|
||||||
INIT_LIST_HEAD(&tmp);
|
INIT_LIST_HEAD(&tmp);
|
||||||
if (walk_src_path(src_sftp, s->path, &tmp) < 0)
|
if (walk_src_path(src_sftp, s->path, &tmp, &a) < 0)
|
||||||
return -1;
|
goto err_out;
|
||||||
|
|
||||||
if (list_count(&tmp) > 1)
|
|
||||||
dst_path_should_dir = true;
|
|
||||||
|
|
||||||
if (resolve_dst_path(m->msg_fd, s->path, m->dst_path, &tmp,
|
|
||||||
src_path_is_dir, dst_path_is_dir,
|
|
||||||
dst_path_should_dir) < 0)
|
|
||||||
return -1;
|
|
||||||
|
|
||||||
list_splice_tail(&tmp, m->path_list.prev);
|
list_splice_tail(&tmp, m->path_list.prev);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (resolve_chunk(&m->path_list, &m->chunk_list, m->opts->nr_threads,
|
chunk_pool_set_filled(&m->cp);
|
||||||
m->opts->min_chunk_sz, m->opts->max_chunk_sz) < 0)
|
|
||||||
return -1;
|
|
||||||
|
|
||||||
/* save total bytes to be transferred */
|
mpr_info(m->msg_fp, "walk source path(s) done\n");
|
||||||
m->total_bytes = 0;
|
|
||||||
list_for_each_entry(p, &m->path_list, list) {
|
m->ret_scan = 0;
|
||||||
m->total_bytes += p->size;
|
return NULL;
|
||||||
|
|
||||||
|
err_out:
|
||||||
|
m->ret_scan = -1;
|
||||||
|
mscp_stop_copy_thread(m);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int mscp_scan(struct mscp *m)
|
||||||
|
{
|
||||||
|
int ret = pthread_create(&m->tid_scan, NULL, mscp_scan_thread, m);
|
||||||
|
if (ret < 0) {
|
||||||
|
mscp_set_error("pthread_create_error: %d", ret);
|
||||||
|
m->tid_scan = 0;
|
||||||
|
mscp_stop(m);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* need scan finished or over nr_threads chunks to determine
|
||||||
|
* actual number of threads (and connections). If the number
|
||||||
|
* of chunks are smaller than nr_threads, we adjust nr_threads
|
||||||
|
* to the number of chunks.
|
||||||
|
*/
|
||||||
|
while (!chunk_pool_is_filled(&m->cp) &&
|
||||||
|
chunk_pool_size(&m->cp) < m->opts->nr_threads)
|
||||||
|
usleep(100);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void mscp_stop(struct mscp *m)
|
int mscp_scan_join(struct mscp *m)
|
||||||
{
|
{
|
||||||
int n;
|
if (m->tid_scan) {
|
||||||
pr("stopping...\n");
|
pthread_join(m->tid_scan, NULL);
|
||||||
for (n = 0; n < m->opts->nr_threads; n++) {
|
m->tid_scan = 0;
|
||||||
if (m->threads[n].tid && !m->threads[n].finished)
|
return m->ret_scan;
|
||||||
pthread_cancel(m->threads[n].tid);
|
}
|
||||||
}
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
static void *mscp_copy_thread(void *arg);
|
static void *mscp_copy_thread(void *arg);
|
||||||
|
|
||||||
int mscp_start(struct mscp *m)
|
int mscp_start(struct mscp *m)
|
||||||
{
|
{
|
||||||
int n, ret;
|
int n, ret;
|
||||||
|
|
||||||
if ((n = list_count(&m->chunk_list)) < m->opts->nr_threads) {
|
if ((n = chunk_pool_size(&m->cp)) < m->opts->nr_threads) {
|
||||||
mpr_notice(m->msg_fd, "we have only %d chunk(s). "
|
mpr_notice(m->msg_fp, "we have only %d chunk(s). "
|
||||||
"set number of connections to %d\n", n, n);
|
"set number of connections to %d\n", n, n);
|
||||||
m->opts->nr_threads = n;
|
m->opts->nr_threads = n;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* prepare thread instances */
|
/* scan thread instances */
|
||||||
m->threads = calloc(m->opts->nr_threads, sizeof(struct mscp_thread));
|
m->threads = calloc(m->opts->nr_threads, sizeof(struct mscp_thread));
|
||||||
memset(m->threads, 0, m->opts->nr_threads * sizeof(struct mscp_thread));
|
memset(m->threads, 0, m->opts->nr_threads * sizeof(struct mscp_thread));
|
||||||
for (n = 0; n < m->opts->nr_threads; n++) {
|
for (n = 0; n < m->opts->nr_threads; n++) {
|
||||||
struct mscp_thread *t = &m->threads[n];
|
struct mscp_thread *t = &m->threads[n];
|
||||||
t->m = m;
|
t->m = m;
|
||||||
|
t->id = n;
|
||||||
if (!m->cores)
|
if (!m->cores)
|
||||||
t->cpu = -1;
|
t->cpu = -1;
|
||||||
else
|
else
|
||||||
t->cpu = m->cores[n % m->nr_cores];
|
t->cpu = m->cores[n % m->nr_cores];
|
||||||
|
|
||||||
if (n == 0) {
|
|
||||||
t->sftp = m->first; /* reuse first sftp session */
|
|
||||||
m->first = NULL;
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
mpr_notice(m->msg_fd, "connecting to %s for a copy thread...\n",
|
|
||||||
m->remote);
|
|
||||||
t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts);
|
|
||||||
if (!t->sftp)
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* spawn copy threads */
|
|
||||||
for (n = 0; n < m->opts->nr_threads; n++) {
|
|
||||||
struct mscp_thread *t = &m->threads[n];
|
|
||||||
ret = pthread_create(&t->tid, NULL, mscp_copy_thread, t);
|
ret = pthread_create(&t->tid, NULL, mscp_copy_thread, t);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
mscp_set_error("pthread_create error: %d", ret);
|
mscp_set_error("pthread_create error: %d", ret);
|
||||||
@@ -430,7 +513,10 @@ int mscp_join(struct mscp *m)
|
|||||||
{
|
{
|
||||||
int n, ret = 0;
|
int n, ret = 0;
|
||||||
|
|
||||||
/* waiting for threads join... */
|
/* waiting for scan thread joins... */
|
||||||
|
ret = mscp_scan_join(m);
|
||||||
|
|
||||||
|
/* waiting for copy threads join... */
|
||||||
for (n = 0; n < m->opts->nr_threads; n++) {
|
for (n = 0; n < m->opts->nr_threads; n++) {
|
||||||
if (m->threads[n].tid) {
|
if (m->threads[n].tid) {
|
||||||
pthread_join(m->threads[n].tid, NULL);
|
pthread_join(m->threads[n].tid, NULL);
|
||||||
@@ -462,20 +548,6 @@ int mscp_join(struct mscp *m)
|
|||||||
|
|
||||||
/* copy thread related functions */
|
/* copy thread related functions */
|
||||||
|
|
||||||
struct chunk *acquire_chunk(struct list_head *chunk_list)
|
|
||||||
{
|
|
||||||
/* under the lock for chunk_list */
|
|
||||||
struct list_head *first = chunk_list->next;
|
|
||||||
struct chunk *c = NULL;
|
|
||||||
|
|
||||||
if (list_empty(chunk_list))
|
|
||||||
return NULL; /* list is empty */
|
|
||||||
|
|
||||||
c = list_entry(first, struct chunk, list);
|
|
||||||
list_del(first);
|
|
||||||
return c;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void mscp_copy_thread_cleanup(void *arg)
|
static void mscp_copy_thread_cleanup(void *arg)
|
||||||
{
|
{
|
||||||
struct mscp_thread *t = arg;
|
struct mscp_thread *t = arg;
|
||||||
@@ -489,6 +561,34 @@ void *mscp_copy_thread(void *arg)
|
|||||||
struct mscp *m = t->m;
|
struct mscp *m = t->m;
|
||||||
struct chunk *c;
|
struct chunk *c;
|
||||||
|
|
||||||
|
if (t->cpu > -1) {
|
||||||
|
if (set_thread_affinity(pthread_self(), t->cpu) < 0) {
|
||||||
|
t->ret = -1;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (sem_wait(m->sem) < 0) {
|
||||||
|
mscp_set_error("sem_wait: %s\n", strerrno());
|
||||||
|
mpr_err(m->msg_fp, "%s", mscp_get_error());
|
||||||
|
goto err_out;
|
||||||
|
}
|
||||||
|
|
||||||
|
mpr_notice(m->msg_fp, "connecting to %s for a copy thread[%d]...\n",
|
||||||
|
m->remote, t->id);
|
||||||
|
t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts);
|
||||||
|
|
||||||
|
if (sem_post(m->sem) < 0) {
|
||||||
|
mscp_set_error("sem_post: %s\n", strerrno());
|
||||||
|
mpr_err(m->msg_fp, "%s", mscp_get_error());
|
||||||
|
goto err_out;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!t->sftp) {
|
||||||
|
mpr_err(m->msg_fp, "copy thread[%d]: %s\n", t->id, mscp_get_error());
|
||||||
|
goto err_out;
|
||||||
|
}
|
||||||
|
|
||||||
switch (m->direction) {
|
switch (m->direction) {
|
||||||
case MSCP_DIRECTION_L2R:
|
case MSCP_DIRECTION_L2R:
|
||||||
src_sftp = NULL;
|
src_sftp = NULL;
|
||||||
@@ -502,24 +602,21 @@ void *mscp_copy_thread(void *arg)
|
|||||||
return NULL; /* not reached */
|
return NULL; /* not reached */
|
||||||
}
|
}
|
||||||
|
|
||||||
if (t->cpu > -1) {
|
|
||||||
if (set_thread_affinity(pthread_self(), t->cpu) < 0)
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
|
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
|
||||||
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
|
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
|
||||||
pthread_cleanup_push(mscp_copy_thread_cleanup, t);
|
pthread_cleanup_push(mscp_copy_thread_cleanup, t);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
LOCK_ACQUIRE_THREAD(&m->chunk_lock);
|
c = chunk_pool_pop(&m->cp);
|
||||||
c = acquire_chunk(&m->chunk_list);
|
if (c == CHUNK_POP_WAIT) {
|
||||||
LOCK_RELEASE_THREAD();
|
usleep(100); /* XXX: hard code */
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (!c)
|
if (!c)
|
||||||
break; /* no more chunks */
|
break; /* no more chunks */
|
||||||
|
|
||||||
if ((t->ret = copy_chunk(m->msg_fd,
|
if ((t->ret = copy_chunk(m->msg_fp,
|
||||||
c, src_sftp, dst_sftp, m->opts->nr_ahead,
|
c, src_sftp, dst_sftp, m->opts->nr_ahead,
|
||||||
m->opts->buf_sz, &t->done)) < 0)
|
m->opts->buf_sz, &t->done)) < 0)
|
||||||
break;
|
break;
|
||||||
@@ -532,21 +629,15 @@ void *mscp_copy_thread(void *arg)
|
|||||||
c->p->path, c->off, c->off + c->len);
|
c->p->path, c->off, c->off + c->len);
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
||||||
|
err_out:
|
||||||
|
t->ret = -1;
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/* cleanup related functions */
|
/* cleanup related functions */
|
||||||
|
|
||||||
static void release_list(struct list_head *head, void (*f)(struct list_head*))
|
|
||||||
{
|
|
||||||
struct list_head *p, *n;
|
|
||||||
|
|
||||||
list_for_each_safe(p, n, head) {
|
|
||||||
list_del(p);
|
|
||||||
f(p);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void free_src(struct list_head *list)
|
static void free_src(struct list_head *list)
|
||||||
{
|
{
|
||||||
struct src *s;
|
struct src *s;
|
||||||
@@ -576,15 +667,15 @@ void mscp_cleanup(struct mscp *m)
|
|||||||
m->first = NULL;
|
m->first = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
release_list(&m->src_list, free_src);
|
list_free_f(&m->src_list, free_src);
|
||||||
INIT_LIST_HEAD(&m->src_list);
|
INIT_LIST_HEAD(&m->src_list);
|
||||||
|
|
||||||
release_list(&m->chunk_list, free_chunk);
|
list_free_f(&m->path_list, free_path);
|
||||||
INIT_LIST_HEAD(&m->chunk_list);
|
|
||||||
|
|
||||||
release_list(&m->path_list, free_path);
|
|
||||||
INIT_LIST_HEAD(&m->path_list);
|
INIT_LIST_HEAD(&m->path_list);
|
||||||
|
|
||||||
|
chunk_pool_release(&m->cp);
|
||||||
|
chunk_pool_init(&m->cp);
|
||||||
|
|
||||||
if (m->threads) {
|
if (m->threads) {
|
||||||
free(m->threads);
|
free(m->threads);
|
||||||
m->threads = NULL;
|
m->threads = NULL;
|
||||||
@@ -598,6 +689,8 @@ void mscp_free(struct mscp *m)
|
|||||||
free(m->remote);
|
free(m->remote);
|
||||||
if (m->cores)
|
if (m->cores)
|
||||||
free(m->cores);
|
free(m->cores);
|
||||||
|
|
||||||
|
sem_release(m->sem);
|
||||||
free(m);
|
free(m);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
381
src/path.c
381
src/path.c
@@ -12,8 +12,186 @@
|
|||||||
#include <path.h>
|
#include <path.h>
|
||||||
#include <message.h>
|
#include <message.h>
|
||||||
|
|
||||||
|
|
||||||
|
/* chunk pool operations */
|
||||||
|
#define CHUNK_POOL_STATE_FILLING 0
|
||||||
|
#define CHUNK_POOL_STATE_FILLED 1
|
||||||
|
|
||||||
|
void chunk_pool_init(struct chunk_pool *cp)
|
||||||
|
{
|
||||||
|
memset(cp, 0, sizeof(*cp));
|
||||||
|
INIT_LIST_HEAD(&cp->list);
|
||||||
|
lock_init(&cp->lock);
|
||||||
|
cp->state = CHUNK_POOL_STATE_FILLING;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void chunk_pool_add(struct chunk_pool *cp, struct chunk *c)
|
||||||
|
{
|
||||||
|
LOCK_ACQUIRE_THREAD(&cp->lock);
|
||||||
|
list_add_tail(&c->list, &cp->list);
|
||||||
|
cp->count += 1;
|
||||||
|
LOCK_RELEASE_THREAD();
|
||||||
|
}
|
||||||
|
|
||||||
|
void chunk_pool_set_filled(struct chunk_pool *cp)
|
||||||
|
{
|
||||||
|
cp->state = CHUNK_POOL_STATE_FILLED;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool chunk_pool_is_filled(struct chunk_pool *cp)
|
||||||
|
{
|
||||||
|
return (cp->state == CHUNK_POOL_STATE_FILLED);
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t chunk_pool_size(struct chunk_pool *cp)
|
||||||
|
{
|
||||||
|
return cp->count;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
struct chunk *chunk_pool_pop(struct chunk_pool *cp)
|
||||||
|
{
|
||||||
|
struct list_head *first;
|
||||||
|
struct chunk *c = NULL;
|
||||||
|
|
||||||
|
LOCK_ACQUIRE_THREAD(&cp->lock);
|
||||||
|
first = cp->list.next;
|
||||||
|
if (list_empty(&cp->list)) {
|
||||||
|
if (!chunk_pool_is_filled(cp))
|
||||||
|
c = CHUNK_POP_WAIT;
|
||||||
|
else
|
||||||
|
c = NULL; /* no more chunks */
|
||||||
|
} else {
|
||||||
|
c = list_entry(first, struct chunk, list);
|
||||||
|
list_del(first);
|
||||||
|
}
|
||||||
|
LOCK_RELEASE_THREAD();
|
||||||
|
|
||||||
|
/* return CHUNK_POP_WAIT would be very rare case, because it
|
||||||
|
* means copying over SSH is faster than traversing
|
||||||
|
* local/remote file paths.
|
||||||
|
*/
|
||||||
|
|
||||||
|
return c;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void chunk_free(struct list_head *list)
|
||||||
|
{
|
||||||
|
struct chunk *c;
|
||||||
|
c = list_entry(list, typeof(*c), list);
|
||||||
|
free(c);
|
||||||
|
}
|
||||||
|
|
||||||
|
void chunk_pool_release(struct chunk_pool *cp)
|
||||||
|
{
|
||||||
|
list_free_f(&cp->list, chunk_free);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* paths of copy source resoltion */
|
||||||
|
static int resolve_dst_path(const char *src_file_path, char *dst_file_path,
|
||||||
|
struct path_resolve_args *a)
|
||||||
|
{
|
||||||
|
char copy[PATH_MAX];
|
||||||
|
char *prefix;
|
||||||
|
int offset;
|
||||||
|
|
||||||
|
strncpy(copy, a->src_path, PATH_MAX - 1);
|
||||||
|
prefix = dirname(copy);
|
||||||
|
if (!prefix) {
|
||||||
|
mscp_set_error("dirname: %s", strerrno());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (strlen(prefix) == 1 && prefix[0] == '.')
|
||||||
|
offset = 0;
|
||||||
|
else
|
||||||
|
offset = strlen(prefix) + 1;
|
||||||
|
|
||||||
|
if (!a->src_path_is_dir && !a->dst_path_is_dir) {
|
||||||
|
/* src path is file. dst path is (1) file, or (2) does not exist.
|
||||||
|
* In the second case, we need to put src under the dst.
|
||||||
|
*/
|
||||||
|
if (a->dst_path_should_dir)
|
||||||
|
snprintf(dst_file_path, PATH_MAX - 1, "%s/%s",
|
||||||
|
a->dst_path, a->src_path + offset);
|
||||||
|
else
|
||||||
|
strncpy(dst_file_path, a->dst_path, PATH_MAX - 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* src is file, and dst is dir */
|
||||||
|
if (!a->src_path_is_dir && a->dst_path_is_dir)
|
||||||
|
snprintf(dst_file_path, PATH_MAX - 1, "%s/%s",
|
||||||
|
a->dst_path, a->src_path + offset);
|
||||||
|
|
||||||
|
/* both are directory */
|
||||||
|
if (a->src_path_is_dir && a->dst_path_is_dir)
|
||||||
|
snprintf(dst_file_path, PATH_MAX - 1, "%s/%s",
|
||||||
|
a->dst_path, src_file_path + offset);
|
||||||
|
|
||||||
|
/* dst path does not exist. change dir name to dst_path */
|
||||||
|
if (a->src_path_is_dir && !a->dst_path_is_dir)
|
||||||
|
snprintf(dst_file_path, PATH_MAX - 1, "%s/%s",
|
||||||
|
a->dst_path, src_file_path + strlen(a->src_path) + 1);
|
||||||
|
|
||||||
|
mpr_debug(a->msg_fp, "file: %s -> %s\n", src_file_path, dst_file_path);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* chunk preparation */
|
||||||
|
static struct chunk *alloc_chunk(struct path *p)
|
||||||
|
{
|
||||||
|
struct chunk *c;
|
||||||
|
|
||||||
|
if (!(c = malloc(sizeof(*c)))) {
|
||||||
|
mscp_set_error("malloc %s", strerrno());
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
memset(c, 0, sizeof(*c));
|
||||||
|
|
||||||
|
c->p = p;
|
||||||
|
c->off = 0;
|
||||||
|
c->len = 0;
|
||||||
|
refcnt_inc(&p->refcnt);
|
||||||
|
return c;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int resolve_chunk(struct path *p, struct path_resolve_args *a)
|
||||||
|
{
|
||||||
|
struct chunk *c;
|
||||||
|
size_t chunk_sz;
|
||||||
|
size_t size;
|
||||||
|
|
||||||
|
if (p->size <= a->min_chunk_sz)
|
||||||
|
chunk_sz = p->size;
|
||||||
|
else if (a->max_chunk_sz)
|
||||||
|
chunk_sz = a->max_chunk_sz;
|
||||||
|
else {
|
||||||
|
chunk_sz = (p->size - (p->size % a->nr_conn)) / a->nr_conn;
|
||||||
|
chunk_sz &= ~a->chunk_align; /* align with page_sz */
|
||||||
|
if (chunk_sz <= a->min_chunk_sz)
|
||||||
|
chunk_sz = a->min_chunk_sz;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* for (size = f->size; size > 0;) does not create a file
|
||||||
|
* (chunk) when file size is 0. This do {} while (size > 0)
|
||||||
|
* creates just open/close a 0-byte file.
|
||||||
|
*/
|
||||||
|
size = p->size;
|
||||||
|
do {
|
||||||
|
c = alloc_chunk(p);
|
||||||
|
if (!c)
|
||||||
|
return -1;
|
||||||
|
c->off = p->size - size;
|
||||||
|
c->len = size < chunk_sz ? size : chunk_sz;
|
||||||
|
size -= c->len;
|
||||||
|
chunk_pool_add(a->cp, c);
|
||||||
|
} while (size > 0);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int append_path(sftp_session sftp, const char *path, mstat s,
|
static int append_path(sftp_session sftp, const char *path, mstat s,
|
||||||
struct list_head *path_list)
|
struct list_head *path_list, struct path_resolve_args *a)
|
||||||
{
|
{
|
||||||
struct path *p;
|
struct path *p;
|
||||||
|
|
||||||
@@ -29,9 +207,22 @@ static int append_path(sftp_session sftp, const char *path, mstat s,
|
|||||||
p->mode = mstat_mode(s);
|
p->mode = mstat_mode(s);
|
||||||
p->state = FILE_STATE_INIT;
|
p->state = FILE_STATE_INIT;
|
||||||
lock_init(&p->lock);
|
lock_init(&p->lock);
|
||||||
|
|
||||||
|
if (resolve_dst_path(p->path, p->dst_path, a) < 0)
|
||||||
|
goto free_out;
|
||||||
|
|
||||||
|
if (resolve_chunk(p, a) < 0)
|
||||||
|
return -1; /* XXX: do not free path becuase chunk(s)
|
||||||
|
* was added to chunk pool already */
|
||||||
|
|
||||||
list_add_tail(&p->list, path_list);
|
list_add_tail(&p->list, path_list);
|
||||||
|
*a->total_bytes += p->size;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
free_out:
|
||||||
|
free(p);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool check_path_should_skip(const char *path)
|
static bool check_path_should_skip(const char *path)
|
||||||
@@ -45,7 +236,7 @@ static bool check_path_should_skip(const char *path)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static int walk_path_recursive(sftp_session sftp, const char *path,
|
static int walk_path_recursive(sftp_session sftp, const char *path,
|
||||||
struct list_head *path_list)
|
struct list_head *path_list, struct path_resolve_args *a)
|
||||||
{
|
{
|
||||||
char next_path[PATH_MAX];
|
char next_path[PATH_MAX];
|
||||||
mdirent *e;
|
mdirent *e;
|
||||||
@@ -58,7 +249,7 @@ static int walk_path_recursive(sftp_session sftp, const char *path,
|
|||||||
|
|
||||||
if (mstat_is_regular(s)) {
|
if (mstat_is_regular(s)) {
|
||||||
/* this path is regular file. it is to be copied */
|
/* this path is regular file. it is to be copied */
|
||||||
ret = append_path(sftp, path, s, path_list);
|
ret = append_path(sftp, path, s, path_list, a);
|
||||||
mscp_stat_free(s);
|
mscp_stat_free(s);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@@ -77,15 +268,19 @@ static int walk_path_recursive(sftp_session sftp, const char *path,
|
|||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
for (e = mscp_readdir(d); !mdirent_is_null(e); e = mscp_readdir(d)) {
|
for (e = mscp_readdir(d); !mdirent_is_null(e); e = mscp_readdir(d)) {
|
||||||
if (check_path_should_skip(mdirent_name(e)))
|
if (check_path_should_skip(mdirent_name(e))) {
|
||||||
|
mscp_dirent_free(e);
|
||||||
continue;
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (strlen(path) + 1 + strlen(mdirent_name(e)) > PATH_MAX) {
|
if (strlen(path) + 1 + strlen(mdirent_name(e)) > PATH_MAX) {
|
||||||
mscp_set_error("too long path: %s/%s", path, mdirent_name(e));
|
mscp_set_error("too long path: %s/%s", path, mdirent_name(e));
|
||||||
|
mscp_dirent_free(e);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
snprintf(next_path, sizeof(next_path), "%s/%s", path, mdirent_name(e));
|
snprintf(next_path, sizeof(next_path), "%s/%s", path, mdirent_name(e));
|
||||||
ret = walk_path_recursive(sftp, next_path, path_list);
|
ret = walk_path_recursive(sftp, next_path, path_list, a);
|
||||||
|
mscp_dirent_free(e);
|
||||||
if (ret < 0)
|
if (ret < 0)
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@@ -96,75 +291,9 @@ static int walk_path_recursive(sftp_session sftp, const char *path,
|
|||||||
}
|
}
|
||||||
|
|
||||||
int walk_src_path(sftp_session src_sftp, const char *src_path,
|
int walk_src_path(sftp_session src_sftp, const char *src_path,
|
||||||
struct list_head *path_list)
|
struct list_head *path_list, struct path_resolve_args *a)
|
||||||
{
|
{
|
||||||
return walk_path_recursive(src_sftp, src_path, path_list);
|
return walk_path_recursive(src_sftp, src_path, path_list, a);
|
||||||
}
|
|
||||||
|
|
||||||
static int src2dst_path(int msg_fd, const char *src_path, const char *src_file_path,
|
|
||||||
const char *dst_path, char *dst_file_path, size_t len,
|
|
||||||
bool src_path_is_dir, bool dst_path_is_dir,
|
|
||||||
bool dst_path_should_dir)
|
|
||||||
{
|
|
||||||
char copy[PATH_MAX];
|
|
||||||
char *prefix;
|
|
||||||
int offset;
|
|
||||||
|
|
||||||
strncpy(copy, src_path, PATH_MAX - 1);
|
|
||||||
prefix = dirname(copy);
|
|
||||||
if (!prefix) {
|
|
||||||
mscp_set_error("dirname: %s", strerrno());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
if (strlen(prefix) == 1 && prefix[0] == '.')
|
|
||||||
offset = 0;
|
|
||||||
else
|
|
||||||
offset = strlen(prefix) + 1;
|
|
||||||
|
|
||||||
if (!src_path_is_dir && !dst_path_is_dir) {
|
|
||||||
/* src path is file. dst path is (1) file, or (2) does not exist.
|
|
||||||
* In the second case, we need to put src under the dst.
|
|
||||||
*/
|
|
||||||
if (dst_path_should_dir)
|
|
||||||
snprintf(dst_file_path, len, "%s/%s",
|
|
||||||
dst_path, src_path + offset);
|
|
||||||
else
|
|
||||||
strncpy(dst_file_path, dst_path, len);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* src is file, and dst is dir */
|
|
||||||
if (!src_path_is_dir && dst_path_is_dir)
|
|
||||||
snprintf(dst_file_path, len, "%s/%s", dst_path, src_path + offset);
|
|
||||||
|
|
||||||
/* both are directory */
|
|
||||||
if (src_path_is_dir && dst_path_is_dir)
|
|
||||||
snprintf(dst_file_path, len, "%s/%s", dst_path, src_file_path + offset);
|
|
||||||
|
|
||||||
/* dst path does not exist. change dir name to dst_path */
|
|
||||||
if (src_path_is_dir && !dst_path_is_dir)
|
|
||||||
snprintf(dst_file_path, len, "%s/%s",
|
|
||||||
dst_path, src_file_path + strlen(src_path) + 1);
|
|
||||||
|
|
||||||
mpr_info(msg_fd, "file: %s -> %s\n", src_file_path, dst_file_path);
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int resolve_dst_path(int msg_fd, const char *src_path, const char *dst_path,
|
|
||||||
struct list_head *path_list, bool src_path_is_dir,
|
|
||||||
bool dst_path_is_dir, bool dst_path_should_dir)
|
|
||||||
{
|
|
||||||
struct path *p;
|
|
||||||
|
|
||||||
list_for_each_entry(p, path_list, list) {
|
|
||||||
if (src2dst_path(msg_fd, src_path, p->path,
|
|
||||||
dst_path, p->dst_path, PATH_MAX,
|
|
||||||
src_path_is_dir, dst_path_is_dir,
|
|
||||||
dst_path_should_dir) < 0)
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void path_dump(struct list_head *path_list)
|
void path_dump(struct list_head *path_list)
|
||||||
@@ -177,90 +306,6 @@ void path_dump(struct list_head *path_list)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* chunk preparation */
|
|
||||||
|
|
||||||
static struct chunk *alloc_chunk(struct path *p)
|
|
||||||
{
|
|
||||||
struct chunk *c;
|
|
||||||
|
|
||||||
if (!(c = malloc(sizeof(*c)))) {
|
|
||||||
mscp_set_error("malloc %s", strerrno());
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
memset(c, 0, sizeof(*c));
|
|
||||||
|
|
||||||
c->p = p;
|
|
||||||
c->off = 0;
|
|
||||||
c->len = 0;
|
|
||||||
refcnt_inc(&p->refcnt);
|
|
||||||
return c;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int get_page_mask(void)
|
|
||||||
{
|
|
||||||
long page_sz = sysconf(_SC_PAGESIZE);
|
|
||||||
size_t page_mask = 0;
|
|
||||||
int n;
|
|
||||||
|
|
||||||
for (n = 0; page_sz > 0; page_sz >>= 1, n++) {
|
|
||||||
page_mask <<= 1;
|
|
||||||
page_mask |= 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return page_mask >> 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int resolve_chunk(struct list_head *path_list, struct list_head *chunk_list,
|
|
||||||
int nr_conn, int min_chunk_sz, int max_chunk_sz)
|
|
||||||
{
|
|
||||||
struct chunk *c;
|
|
||||||
struct path *p;
|
|
||||||
size_t page_mask;
|
|
||||||
size_t chunk_sz;
|
|
||||||
size_t size;
|
|
||||||
|
|
||||||
page_mask = get_page_mask();
|
|
||||||
|
|
||||||
list_for_each_entry(p, path_list, list) {
|
|
||||||
if (p->size <= min_chunk_sz)
|
|
||||||
chunk_sz = p->size;
|
|
||||||
else if (max_chunk_sz)
|
|
||||||
chunk_sz = max_chunk_sz;
|
|
||||||
else {
|
|
||||||
chunk_sz = (p->size - (p->size % nr_conn)) / nr_conn;
|
|
||||||
chunk_sz &= ~page_mask; /* align with page_sz */
|
|
||||||
if (chunk_sz <= min_chunk_sz)
|
|
||||||
chunk_sz = min_chunk_sz;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* for (size = f->size; size > 0;) does not create a
|
|
||||||
* file (chunk) when file size is 0. This do {} while
|
|
||||||
* (size > 0) creates just open/close a 0-byte file.
|
|
||||||
*/
|
|
||||||
size = p->size;
|
|
||||||
do {
|
|
||||||
c = alloc_chunk(p);
|
|
||||||
if (!c)
|
|
||||||
return -1;
|
|
||||||
c->off = p->size - size;
|
|
||||||
c->len = size < chunk_sz ? size : chunk_sz;
|
|
||||||
size -= c->len;
|
|
||||||
list_add_tail(&c->list, chunk_list);
|
|
||||||
} while (size > 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void chunk_dump(struct list_head *chunk_list)
|
|
||||||
{
|
|
||||||
struct chunk *c;
|
|
||||||
|
|
||||||
list_for_each_entry(c, chunk_list, list) {
|
|
||||||
printf("chunk: %s 0x%lx-%lx bytes\n",
|
|
||||||
c->p->path, c->off, c->off + c->len);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/* based on
|
/* based on
|
||||||
@@ -283,10 +328,13 @@ static int touch_dst_path(struct path *p, sftp_session sftp)
|
|||||||
|
|
||||||
mstat s;
|
mstat s;
|
||||||
if (mscp_stat(path, &s, sftp) == 0) {
|
if (mscp_stat(path, &s, sftp) == 0) {
|
||||||
if (mstat_is_dir(s))
|
if (mstat_is_dir(s)) {
|
||||||
|
mscp_stat_free(s);
|
||||||
goto next; /* directory exists. go deeper */
|
goto next; /* directory exists. go deeper */
|
||||||
else
|
} else {
|
||||||
|
mscp_stat_free(s);
|
||||||
return -1; /* path exists, but not directory. */
|
return -1; /* path exists, but not directory. */
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mscp_stat_check_err_noent(sftp) == 0) {
|
if (mscp_stat_check_err_noent(sftp) == 0) {
|
||||||
@@ -311,7 +359,7 @@ static int touch_dst_path(struct path *p, sftp_session sftp)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int prepare_dst_path(int msg_fd, struct path *p, sftp_session dst_sftp)
|
static int prepare_dst_path(FILE *msg_fp, struct path *p, sftp_session dst_sftp)
|
||||||
{
|
{
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
|
|
||||||
@@ -322,7 +370,7 @@ static int prepare_dst_path(int msg_fd, struct path *p, sftp_session dst_sftp)
|
|||||||
goto out;
|
goto out;
|
||||||
}
|
}
|
||||||
p->state = FILE_STATE_OPENED;
|
p->state = FILE_STATE_OPENED;
|
||||||
mpr_info(msg_fd, "copy start: %s\n", p->path);
|
mpr_info(msg_fp, "copy start: %s\n", p->path);
|
||||||
}
|
}
|
||||||
|
|
||||||
out:
|
out:
|
||||||
@@ -482,7 +530,8 @@ static int _copy_chunk(struct chunk *c, mfh s, mfh d,
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp,
|
int copy_chunk(FILE *msg_fp, struct chunk *c,
|
||||||
|
sftp_session src_sftp, sftp_session dst_sftp,
|
||||||
int nr_ahead, int buf_sz, size_t *counter)
|
int nr_ahead, int buf_sz, size_t *counter)
|
||||||
{
|
{
|
||||||
mode_t mode;
|
mode_t mode;
|
||||||
@@ -492,7 +541,7 @@ int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session
|
|||||||
|
|
||||||
assert((src_sftp && !dst_sftp) || (!src_sftp && dst_sftp));
|
assert((src_sftp && !dst_sftp) || (!src_sftp && dst_sftp));
|
||||||
|
|
||||||
if (prepare_dst_path(msg_fd, c->p, dst_sftp) < 0)
|
if (prepare_dst_path(msg_fp, c->p, dst_sftp) < 0)
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
/* open src */
|
/* open src */
|
||||||
@@ -511,11 +560,11 @@ int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session
|
|||||||
if (mscp_open_is_failed(d))
|
if (mscp_open_is_failed(d))
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
mpr_debug(msg_fd, "copy chunk start: %s 0x%lx-0x%lx\n",
|
mpr_debug(msg_fp, "copy chunk start: %s 0x%lx-0x%lx\n",
|
||||||
c->p->path, c->off, c->off + c->len);
|
c->p->path, c->off, c->off + c->len);
|
||||||
ret = _copy_chunk(c, s, d, nr_ahead, buf_sz, counter);
|
ret = _copy_chunk(c, s, d, nr_ahead, buf_sz, counter);
|
||||||
|
|
||||||
mpr_debug(msg_fd, "copy chunk done: %s 0x%lx-0x%lx\n",
|
mpr_debug(msg_fp, "copy chunk done: %s 0x%lx-0x%lx\n",
|
||||||
c->p->path, c->off, c->off + c->len);
|
c->p->path, c->off, c->off + c->len);
|
||||||
|
|
||||||
|
|
||||||
@@ -527,7 +576,7 @@ int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session
|
|||||||
if (refcnt_dec(&c->p->refcnt) == 0) {
|
if (refcnt_dec(&c->p->refcnt) == 0) {
|
||||||
c->p->state = FILE_STATE_DONE;
|
c->p->state = FILE_STATE_DONE;
|
||||||
mscp_chmod(c->p->dst_path, c->p->mode, dst_sftp);
|
mscp_chmod(c->p->dst_path, c->p->mode, dst_sftp);
|
||||||
mpr_info(msg_fd, "copy done: %s\n", c->p->path);
|
mpr_info(msg_fp, "copy done: %s\n", c->p->path);
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
|
|||||||
74
src/path.h
74
src/path.h
@@ -29,7 +29,7 @@ struct path {
|
|||||||
#define FILE_STATE_DONE 2
|
#define FILE_STATE_DONE 2
|
||||||
|
|
||||||
struct chunk {
|
struct chunk {
|
||||||
struct list_head list; /* mscp->chunk_list */
|
struct list_head list; /* chunk_pool->list */
|
||||||
|
|
||||||
struct path *p;
|
struct path *p;
|
||||||
size_t off; /* offset of this chunk on the file on path p */
|
size_t off; /* offset of this chunk on the file on path p */
|
||||||
@@ -37,30 +37,66 @@ struct chunk {
|
|||||||
size_t done; /* copied bytes for this chunk by a thread */
|
size_t done; /* copied bytes for this chunk by a thread */
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct chunk_pool {
|
||||||
|
struct list_head list; /* list of struct chunk */
|
||||||
|
size_t count;
|
||||||
|
lock lock;
|
||||||
|
int state;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
/* initialize chunk pool */
|
||||||
|
void chunk_pool_init(struct chunk_pool *cp);
|
||||||
|
|
||||||
|
/* acquire a chunk from pool. return value is NULL indicates no more
|
||||||
|
* chunk, GET_CHUNK_WAIT means caller should waits until a chunk is
|
||||||
|
* added, or pointer to chunk.
|
||||||
|
*/
|
||||||
|
struct chunk *chunk_pool_pop(struct chunk_pool *cp);
|
||||||
|
#define CHUNK_POP_WAIT ((void *) -1)
|
||||||
|
|
||||||
|
/* set and check fillingchunks to this pool has finished */
|
||||||
|
void chunk_pool_set_filled(struct chunk_pool *cp);
|
||||||
|
bool chunk_pool_is_filled(struct chunk_pool *cp);
|
||||||
|
|
||||||
|
/* return number of chunks in the pool */
|
||||||
|
size_t chunk_pool_size(struct chunk_pool *cp);
|
||||||
|
|
||||||
|
/* free chunks in the chunk_pool */
|
||||||
|
void chunk_pool_release(struct chunk_pool *cp);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
struct path_resolve_args {
|
||||||
|
FILE *msg_fp;
|
||||||
|
size_t *total_bytes;
|
||||||
|
|
||||||
|
/* args to resolve src path to dst path */
|
||||||
|
const char *src_path;
|
||||||
|
const char *dst_path;
|
||||||
|
bool src_path_is_dir;
|
||||||
|
bool dst_path_is_dir;
|
||||||
|
bool dst_path_should_dir;
|
||||||
|
|
||||||
|
/* args to resolve chunks for a path */
|
||||||
|
struct chunk_pool *cp;
|
||||||
|
int nr_conn;
|
||||||
|
size_t min_chunk_sz;
|
||||||
|
size_t max_chunk_sz;
|
||||||
|
size_t chunk_align;
|
||||||
|
};
|
||||||
|
|
||||||
/* recursivly walk through src_path and fill path_list for each file */
|
/* recursivly walk through src_path and fill path_list for each file */
|
||||||
int walk_src_path(sftp_session src_sftp, const char *src_path,
|
int walk_src_path(sftp_session src_sftp, const char *src_path,
|
||||||
struct list_head *path_list);
|
struct list_head *path_list, struct path_resolve_args *a);
|
||||||
|
|
||||||
/* fill path->dst_path for all files */
|
|
||||||
int resolve_dst_path(int msg_fd, const char *src_path, const char *dst_path,
|
|
||||||
struct list_head *path_list,
|
|
||||||
bool src_path_is_dir, bool dst_path_is_dir,
|
|
||||||
bool dst_path_should_dir);
|
|
||||||
|
|
||||||
/* resolve chunks from files in the path_list */
|
|
||||||
int resolve_chunk(struct list_head *path_list, struct list_head *chunk_list,
|
|
||||||
int nr_conn, int min_chunk_sz, int max_chunk_sz);
|
|
||||||
|
|
||||||
/* copy a chunk. either src_sftp or dst_sftp is not null, and another is null */
|
/* copy a chunk. either src_sftp or dst_sftp is not null, and another is null */
|
||||||
int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp,
|
int copy_chunk(FILE *msg_fp, struct chunk *c,
|
||||||
|
sftp_session src_sftp, sftp_session dst_sftp,
|
||||||
int nr_ahead, int buf_sz, size_t *counter);
|
int nr_ahead, int buf_sz, size_t *counter);
|
||||||
|
|
||||||
/* just print contents. just for debugging */
|
/* just print contents. just for debugging */
|
||||||
void path_dump(struct list_head *path_list);
|
void path_dump(struct list_head *path_list);
|
||||||
void chunk_dump(struct list_head *chunk_list);
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@@ -136,6 +172,14 @@ static mdirent *mscp_readdir(mdir *d)
|
|||||||
return &e;
|
return &e;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void mscp_dirent_free(mdirent *e)
|
||||||
|
{
|
||||||
|
if (e->r) {
|
||||||
|
sftp_attributes_free(e->r);
|
||||||
|
e->r = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* wrap retriving error */
|
/* wrap retriving error */
|
||||||
static const char *mscp_strerror(sftp_session sftp)
|
static const char *mscp_strerror(sftp_session sftp)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -1,9 +1,11 @@
|
|||||||
#ifdef __APPLE__
|
#ifdef __APPLE__
|
||||||
|
#include <stdlib.h>
|
||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
#include <sys/sysctl.h>
|
#include <sys/sysctl.h>
|
||||||
#elif linux
|
#elif linux
|
||||||
#define _GNU_SOURCE
|
#define _GNU_SOURCE
|
||||||
#include <sched.h>
|
#include <sched.h>
|
||||||
|
#include <stdlib.h>
|
||||||
#else
|
#else
|
||||||
#error unsupported platform
|
#error unsupported platform
|
||||||
#endif
|
#endif
|
||||||
@@ -12,6 +14,7 @@
|
|||||||
#include <platform.h>
|
#include <platform.h>
|
||||||
#include <message.h>
|
#include <message.h>
|
||||||
|
|
||||||
|
|
||||||
#ifdef __APPLE__
|
#ifdef __APPLE__
|
||||||
int nr_cpus()
|
int nr_cpus()
|
||||||
{
|
{
|
||||||
@@ -32,6 +35,38 @@ int set_thread_affinity(pthread_t tid, int core)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void random_string(char *buf, size_t size)
|
||||||
|
{
|
||||||
|
char chars[] = "abcdefhijklmnopqrstuvwxyz1234567890";
|
||||||
|
int n, x;
|
||||||
|
|
||||||
|
for (n = 0; n < size - 1; n++) {
|
||||||
|
x = arc4random() % (sizeof(chars) - 1);
|
||||||
|
buf[n] = chars[x];
|
||||||
|
}
|
||||||
|
buf[size - 1] = '\0';
|
||||||
|
}
|
||||||
|
|
||||||
|
sem_t *sem_create(int value)
|
||||||
|
{
|
||||||
|
char sem_name[30] = "mscp-";
|
||||||
|
sem_t *sem;
|
||||||
|
int n;
|
||||||
|
|
||||||
|
n = strlen(sem_name);
|
||||||
|
random_string(sem_name + n, sizeof(sem_name) - n - 1);
|
||||||
|
if ((sem = sem_open(sem_name, O_CREAT, 600, value)) == SEM_FAILED)
|
||||||
|
return NULL;
|
||||||
|
|
||||||
|
return sem;
|
||||||
|
}
|
||||||
|
|
||||||
|
int sem_release(sem_t *sem)
|
||||||
|
{
|
||||||
|
return sem_close(sem);
|
||||||
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#ifdef linux
|
#ifdef linux
|
||||||
@@ -56,5 +91,27 @@ int set_thread_affinity(pthread_t tid, int core)
|
|||||||
core, strerrno());
|
core, strerrno());
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sem_t *sem_create(int value)
|
||||||
|
{
|
||||||
|
sem_t *sem;
|
||||||
|
|
||||||
|
if ((sem = malloc(sizeof(*sem))) == NULL)
|
||||||
|
return NULL;
|
||||||
|
|
||||||
|
if (sem_init(sem, 0, value) < 0) {
|
||||||
|
free(sem);
|
||||||
|
return sem;
|
||||||
|
}
|
||||||
|
|
||||||
|
return sem;
|
||||||
|
}
|
||||||
|
|
||||||
|
int sem_release(sem_t *sem)
|
||||||
|
{
|
||||||
|
free(sem);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|||||||
@@ -2,8 +2,20 @@
|
|||||||
#define _PLATFORM_H_
|
#define _PLATFORM_H_
|
||||||
|
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
|
#include <semaphore.h>
|
||||||
|
|
||||||
int nr_cpus();
|
int nr_cpus(void);
|
||||||
int set_thread_affinity(pthread_t tid, int core);
|
int set_thread_affinity(pthread_t tid, int core);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* macOS does not support sem_init(). macOS (seems to) releases the
|
||||||
|
* named semaphore when associated mscp process finished. In linux,
|
||||||
|
* program (seems to) need to release named semaphore in /dev/shm by
|
||||||
|
* sem_unlink() explicitly. So, using sem_init() (unnamed semaphore)
|
||||||
|
* in linux and using sem_open() (named semaphore) in macOS without
|
||||||
|
* sem_unlink() are reasonable (?).
|
||||||
|
*/
|
||||||
|
sem_t *sem_create(int value);
|
||||||
|
int sem_release(sem_t *sem);
|
||||||
|
|
||||||
#endif /* _PLATFORM_H_ */
|
#endif /* _PLATFORM_H_ */
|
||||||
|
|||||||
18
src/pymscp.c
18
src/pymscp.c
@@ -74,7 +74,7 @@ static int release_instance(struct instance *i)
|
|||||||
|
|
||||||
/* wrapper functions */
|
/* wrapper functions */
|
||||||
|
|
||||||
static PyObject *wrap_mscp_init(PyObject *sef, PyObject *args, PyObject *kw)
|
static PyObject *wrap_mscp_init(PyObject *self, PyObject *args, PyObject *kw)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Initialize struct mscp with options. wrap_mscp_init
|
* Initialize struct mscp with options. wrap_mscp_init
|
||||||
@@ -89,10 +89,14 @@ static PyObject *wrap_mscp_init(PyObject *sef, PyObject *args, PyObject *kw)
|
|||||||
/* mscp_opts */
|
/* mscp_opts */
|
||||||
"nr_threads", /* int */
|
"nr_threads", /* int */
|
||||||
"nr_ahead", /* int */
|
"nr_ahead", /* int */
|
||||||
|
|
||||||
"min_chunk_sz", /* unsigned long */
|
"min_chunk_sz", /* unsigned long */
|
||||||
"max_chunk_sz", /* unsigned long */
|
"max_chunk_sz", /* unsigned long */
|
||||||
"buf_sz", /* unsigned long */
|
"buf_sz", /* unsigned long */
|
||||||
|
|
||||||
"coremask", /* const char * */
|
"coremask", /* const char * */
|
||||||
|
|
||||||
|
"max_startups", /* int */
|
||||||
"severity", /* int, MSCP_SERVERITY_* */
|
"severity", /* int, MSCP_SERVERITY_* */
|
||||||
"msg_fd", /* int */
|
"msg_fd", /* int */
|
||||||
|
|
||||||
@@ -100,17 +104,19 @@ static PyObject *wrap_mscp_init(PyObject *sef, PyObject *args, PyObject *kw)
|
|||||||
"login_name", /* const char * */
|
"login_name", /* const char * */
|
||||||
"port", /* const char * */
|
"port", /* const char * */
|
||||||
"identity", /* const char * */
|
"identity", /* const char * */
|
||||||
|
|
||||||
"cipher", /* const char * */
|
"cipher", /* const char * */
|
||||||
"hmac", /* const char * */
|
"hmac", /* const char * */
|
||||||
"compress", /* const char * */
|
"compress", /* const char * */
|
||||||
"password", /* const char * */
|
"password", /* const char * */
|
||||||
"passphrase", /* const char * */
|
"passphrase", /* const char * */
|
||||||
|
|
||||||
"debug_level", /* int */
|
"debug_level", /* int */
|
||||||
"no_hostkey_check", /* bool */
|
"no_hostkey_check", /* bool */
|
||||||
"enable_nagle", /* bool */
|
"enable_nagle", /* bool */
|
||||||
NULL,
|
NULL,
|
||||||
};
|
};
|
||||||
const char *fmt = "si" "|iikkksii" "ssssssssipp";
|
const char *fmt = "si" "|" "ii" "kkk" "s" "iii" "sss" "sssss" "ipp";
|
||||||
char *coremask = NULL;
|
char *coremask = NULL;
|
||||||
char *login_name = NULL, *port = NULL, *identity = NULL;
|
char *login_name = NULL, *port = NULL, *identity = NULL;
|
||||||
char *cipher = NULL, *hmac = NULL, *compress = NULL;
|
char *cipher = NULL, *hmac = NULL, *compress = NULL;
|
||||||
@@ -137,6 +143,7 @@ static PyObject *wrap_mscp_init(PyObject *sef, PyObject *args, PyObject *kw)
|
|||||||
&i->mo.max_chunk_sz,
|
&i->mo.max_chunk_sz,
|
||||||
&i->mo.buf_sz,
|
&i->mo.buf_sz,
|
||||||
&coremask,
|
&coremask,
|
||||||
|
&i->mo.max_startups,
|
||||||
&i->mo.severity,
|
&i->mo.severity,
|
||||||
&i->mo.msg_fd,
|
&i->mo.msg_fd,
|
||||||
&login_name,
|
&login_name,
|
||||||
@@ -175,6 +182,7 @@ static PyObject *wrap_mscp_init(PyObject *sef, PyObject *args, PyObject *kw)
|
|||||||
|
|
||||||
i->m = mscp_init(remote, direction, &i->mo, &i->so);
|
i->m = mscp_init(remote, direction, &i->mo, &i->so);
|
||||||
if (!i->m) {
|
if (!i->m) {
|
||||||
|
PyErr_Format(PyExc_RuntimeError, "%s", mscp_get_error());
|
||||||
free(i);
|
free(i);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@@ -260,7 +268,7 @@ static PyObject *wrap_mscp_set_dst_path(PyObject *self, PyObject *args, PyObject
|
|||||||
return Py_BuildValue("");
|
return Py_BuildValue("");
|
||||||
}
|
}
|
||||||
|
|
||||||
static PyObject *wrap_mscp_prepare(PyObject *self, PyObject *args, PyObject *kw)
|
static PyObject *wrap_mscp_scan(PyObject *self, PyObject *args, PyObject *kw)
|
||||||
{
|
{
|
||||||
char *keywords[] = { "m", NULL };
|
char *keywords[] = { "m", NULL };
|
||||||
unsigned long long addr;
|
unsigned long long addr;
|
||||||
@@ -275,7 +283,7 @@ static PyObject *wrap_mscp_prepare(PyObject *self, PyObject *args, PyObject *kw)
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mscp_prepare(m) < 0) {
|
if (mscp_scan(m) < 0) {
|
||||||
PyErr_Format(PyExc_RuntimeError, mscp_get_error());
|
PyErr_Format(PyExc_RuntimeError, mscp_get_error());
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@@ -429,7 +437,7 @@ static PyMethodDef pymscpMethods[] = {
|
|||||||
METH_VARARGS | METH_KEYWORDS, NULL
|
METH_VARARGS | METH_KEYWORDS, NULL
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"mscp_prepare", (PyCFunction)wrap_mscp_prepare,
|
"mscp_scan", (PyCFunction)wrap_mscp_scan,
|
||||||
METH_VARARGS | METH_KEYWORDS, NULL
|
METH_VARARGS | METH_KEYWORDS, NULL
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -31,8 +31,6 @@
|
|||||||
#define pr_debug(fmt, ...)
|
#define pr_debug(fmt, ...)
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#define strerrno() strerror(errno)
|
|
||||||
|
|
||||||
|
|
||||||
#define min(a, b) (((a) > (b)) ? (b) : (a))
|
#define min(a, b) (((a) > (b)) ? (b) : (a))
|
||||||
#define max(a, b) (((a) > (b)) ? (a) : (b))
|
#define max(a, b) (((a) > (b)) ? (a) : (b))
|
||||||
|
|||||||
@@ -71,6 +71,7 @@ param_kwargs = [
|
|||||||
{ "min_chunk_sz": 1 * 1024 * 1024 },
|
{ "min_chunk_sz": 1 * 1024 * 1024 },
|
||||||
{ "max_chunk_sz": 64 * 1024 * 1024 },
|
{ "max_chunk_sz": 64 * 1024 * 1024 },
|
||||||
{ "coremask": "0x0f" },
|
{ "coremask": "0x0f" },
|
||||||
|
{ "max_startups": 5 },
|
||||||
{ "severity": mscp.SEVERITY_NONE },
|
{ "severity": mscp.SEVERITY_NONE },
|
||||||
{ "cipher": "aes128-gcm@openssh.com" },
|
{ "cipher": "aes128-gcm@openssh.com" },
|
||||||
{ "compress": "yes" },
|
{ "compress": "yes" },
|
||||||
@@ -102,3 +103,22 @@ def test_login_failed():
|
|||||||
m = mscp.mscp(remote, mscp.LOCAL2REMOTE, port = "65534")
|
m = mscp.mscp(remote, mscp.LOCAL2REMOTE, port = "65534")
|
||||||
with pytest.raises(RuntimeError) as e:
|
with pytest.raises(RuntimeError) as e:
|
||||||
m.connect()
|
m.connect()
|
||||||
|
|
||||||
|
|
||||||
|
param_invalid_kwargs = [
|
||||||
|
{ "nr_threads": -1 },
|
||||||
|
{ "nr_ahead": -1 },
|
||||||
|
{ "min_chunk_sz": 1 },
|
||||||
|
{ "max_chunk_sz": 1 },
|
||||||
|
{ "coremask": "xxxxx" },
|
||||||
|
{ "max_startups": -1 },
|
||||||
|
{ "cipher": "invalid" },
|
||||||
|
{ "hmac": "invalid"},
|
||||||
|
{ "compress": "invalid"},
|
||||||
|
]
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("kw", param_invalid_kwargs)
|
||||||
|
def test_invalid_options(kw):
|
||||||
|
with pytest.raises(RuntimeError) as e:
|
||||||
|
m = mscp.mscp("localhost", mscp.LOCAL2REMOTE, **kw)
|
||||||
|
m.connect()
|
||||||
|
|||||||
@@ -16,6 +16,9 @@ class File():
|
|||||||
self.content = content
|
self.content = content
|
||||||
self.perm = perm
|
self.perm = perm
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return "<file:{} {}-bytes>".format(self.path, self.size)
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return self.path
|
return self.path
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user