mirror of
https://github.com/upa/mscp.git
synced 2026-02-08 22:04:44 +08:00
Compare commits
33 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8ab06c9531 | ||
|
|
a847ef1ea8 | ||
|
|
24e86f58d8 | ||
|
|
1d3b3a2261 | ||
|
|
575c920b6e | ||
|
|
1bd832a135 | ||
|
|
834407379d | ||
|
|
6be61e8adf | ||
|
|
8192151154 | ||
|
|
3f00bd2c7b | ||
|
|
5ac0874621 | ||
|
|
e0e6fae296 | ||
|
|
6305f02770 | ||
|
|
ae4b848ba0 | ||
|
|
3902fb584a | ||
|
|
4ec877a290 | ||
|
|
f0c70a148b | ||
|
|
e038b3020d | ||
|
|
2fdfa7b830 | ||
|
|
f5d0f526f2 | ||
|
|
a086e6a154 | ||
|
|
3bce4ec277 | ||
|
|
a923d40ada | ||
|
|
24fef5f539 | ||
|
|
4e80b05da7 | ||
|
|
98eca409af | ||
|
|
cf99a439cb | ||
|
|
3077bb0856 | ||
|
|
72c27f16d6 | ||
|
|
9b0eb668f9 | ||
|
|
5f9f20f150 | ||
|
|
ceb9ebd5a8 | ||
|
|
3810d6314d |
4
.github/workflows/release.yml
vendored
4
.github/workflows/release.yml
vendored
@@ -45,8 +45,8 @@ jobs:
|
||||
files: |
|
||||
${{github.workspace}}/build/mscp_ubuntu-20.04-x86_64.deb
|
||||
${{github.workspace}}/build/mscp_ubuntu-22.04-x86_64.deb
|
||||
${{github.workspace}}/build/mscp_centos-8-x86_64.rpm
|
||||
${{github.workspace}}/build/mscp_rocky-8.6-x86_64.rpm
|
||||
${{github.workspace}}/build/mscp_rocky-8.8-x86_64.rpm
|
||||
${{github.workspace}}/build/mscp_almalinux-8.8-x86_64.rpm
|
||||
${{github.workspace}}/build/mscp_alpine-3.17-x86_64.static
|
||||
${{github.workspace}}/build/mscp.linux.x86.static
|
||||
|
||||
|
||||
@@ -121,9 +121,8 @@ enable_testing()
|
||||
|
||||
|
||||
# CPACK Rules
|
||||
set(CPACK_SET_DESTDIR true)
|
||||
set(CPACK_PROJECT_NAME ${PROJECT_NAME})
|
||||
set(CPACK_PROJECT_VERSION ${PROJECT_VERSION})
|
||||
#set(CPACK_SET_DESTDIR true)
|
||||
set(CPACK_PACKAGE_NAME ${PROJECT_NAME})
|
||||
set(CPACK_PACKAGE_CONTACT "Ryo Nakamura <upa@haeena.net>")
|
||||
set(CPACK_PACKAGE_DESCRIPTION
|
||||
"mscp, copy files over multiple ssh connections")
|
||||
@@ -153,6 +152,7 @@ if(UNIX AND NOT APPLE) # on linux
|
||||
set(CPACK_RPM_PACKAGE_REQUIRES ${DIST_DEP})
|
||||
set(CPACK_RPM_PACKAGE_HOMEPAGE "https://github.com/upa/mscp")
|
||||
set(CPACK_RPM_PACKAGE_DESCRIPTION ${CPACK_PACKAGE_DESCRIPTION})
|
||||
set(CPACK_RPM_PACKAGE_LICENSE "GPLv3")
|
||||
endif() # on linux
|
||||
|
||||
include(CPack)
|
||||
@@ -162,9 +162,9 @@ include(CPack)
|
||||
# Custom targets to build and test mscp in docker containers.
|
||||
# foreach(IN ZIP_LISTS) (cmake >= 3.17) can shorten the following lists.
|
||||
# However, ubuntu 20.04 has cmake 3.16.3. So this is a roundabout trick.
|
||||
list(APPEND DIST_NAMES ubuntu ubuntu centos rocky alpine)
|
||||
list(APPEND DIST_VERS 20.04 22.04 8 8.6 3.17)
|
||||
list(APPEND DIST_PKGS deb deb rpm rpm static)
|
||||
list(APPEND DIST_NAMES ubuntu ubuntu rocky almalinux alpine)
|
||||
list(APPEND DIST_VERS 20.04 22.04 8.8 8.8 3.17)
|
||||
list(APPEND DIST_PKGS deb deb rpm rpm static)
|
||||
|
||||
list(LENGTH DIST_NAMES _DIST_LISTLEN)
|
||||
math(EXPR DIST_LISTLEN "${_DIST_LISTLEN} - 1")
|
||||
|
||||
36
README.md
36
README.md
@@ -38,19 +38,30 @@ brew install upa/tap/mscp
|
||||
|
||||
- Ubuntu 22.04
|
||||
```console
|
||||
wget https://github.com/upa/mscp/releases/download/v0.0.6/mscp_0.0.6-ubuntu-22.04-x86_64.deb
|
||||
apt-get install -f ./mscp_0.0.6-ubuntu-22.04-x86_64.deb
|
||||
wget https://github.com/upa/mscp/releases/latest/download/mscp_ubuntu-22.04-x86_64.deb
|
||||
apt-get install -f ./mscp_ubuntu-22.04-x86_64.deb
|
||||
```
|
||||
|
||||
- Ubuntu 20.04
|
||||
```console
|
||||
wget https://github.com/upa/mscp/releases/download/v0.0.6/mscp_0.0.6-ubuntu-20.04-x86_64.deb
|
||||
apt-get install -f ./mscp_0.0.6-ubuntu-20.04-x86_64.deb
|
||||
wget https://github.com/upa/mscp/releases/latest/download/mscp_ubuntu-20.04-x86_64.deb
|
||||
apt-get install -f ./mscp_ubuntu-20.04-x86_64.deb
|
||||
```
|
||||
|
||||
- Rocky 8.6
|
||||
- Rocky 8.8
|
||||
```console
|
||||
yum install https://github.com/upa/mscp/releases/download/v0.0.6/mscp_0.0.6-rocky-8.6-x86_64.rpm
|
||||
yum install https://github.com/upa/mscp/releases/latest/download/mscp_rocky-8.8-x86_64.rpm
|
||||
```
|
||||
|
||||
- Alma 8.8
|
||||
```console
|
||||
yum install https://github.com/upa/mscp/releases/latest/download/mscp_almalinux-8.8-x86_64.rpm
|
||||
```
|
||||
|
||||
- Linux with single binary `mscp` (x86_64 only)
|
||||
```console
|
||||
wget https://github.com/upa/mscp/releases/latest/download/mscp.linux.x86.static -O /usr/local/bin/mscp
|
||||
chmod 755 /usr/local/bin/mscp
|
||||
```
|
||||
|
||||
|
||||
@@ -61,7 +72,7 @@ patch introduces asynchronous SFTP Write, which is derived from
|
||||
https://github.com/limes-datentechnik-gmbh/libssh (see [Re: SFTP Write
|
||||
async](https://archive.libssh.org/libssh/2020-06/0000004.html)).
|
||||
|
||||
Currently macOS and Linux (Ubuntu, CentOS, Rocky) are supported.
|
||||
Currently macOS and Linux (Ubuntu, Rocky and Alma) are supported.
|
||||
|
||||
```console
|
||||
# clone this repository
|
||||
@@ -98,9 +109,9 @@ of libssh. So you can start from cmake with it.
|
||||
|
||||
```console
|
||||
$ mscp
|
||||
mscp v0.0.7: copy files over multiple ssh connections
|
||||
mscp v0.0.8: copy files over multiple ssh connections
|
||||
|
||||
Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask]
|
||||
Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask] [-u max_startups]
|
||||
[-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]
|
||||
[-l login_name] [-p port] [-i identity_file]
|
||||
[-c cipher_spec] [-M hmac_spec] [-C compress] source ... target
|
||||
@@ -124,7 +135,7 @@ $ mscp -n 5 -m 0x1f -c aes128-gcm@openssh.com /var/ram/test.img 10.0.0.1:/var/ra
|
||||
- `-v` option increments verbose output level.
|
||||
|
||||
```console
|
||||
$ mscp test 10.0.0.:
|
||||
$ mscp test 10.0.0.1:
|
||||
[=======================================] 100% 49B /49B 198.8B/s 00:00 ETA
|
||||
```
|
||||
|
||||
@@ -153,15 +164,16 @@ copy done: test/testdir/asdf
|
||||
|
||||
```console
|
||||
$ mscp -h
|
||||
mscp v0.0.7: copy files over multiple ssh connections
|
||||
mscp v0.0.8: copy files over multiple ssh connections
|
||||
|
||||
Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask]
|
||||
Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask] [-u max_startups]
|
||||
[-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]
|
||||
[-l login_name] [-p port] [-i identity_file]
|
||||
[-c cipher_spec] [-M hmac_spec] [-C compress] source ... target
|
||||
|
||||
-n NR_CONNECTIONS number of connections (default: floor(log(cores)*2)+1)
|
||||
-m COREMASK hex value to specify cores where threads pinned
|
||||
-u MAX_STARTUPS number of concurrent outgoing connections (default: 8)
|
||||
-s MIN_CHUNK_SIZE min chunk size (default: 64MB)
|
||||
-S MAX_CHUNK_SIZE max chunk size (default: filesize/nr_conn)
|
||||
|
||||
|
||||
@@ -8,9 +8,7 @@ docker build -t mscp-ubuntu:20.04 -f docker/ubuntu-20.04.Dockerfile .
|
||||
|
||||
docker build -t mscp-ubuntu:22.04 -f docker/ubuntu-22.04.Dockerfile .
|
||||
|
||||
docker build -t mscp-centos:8 -f docker/centos-8.Dockerfile .
|
||||
|
||||
docker build -t mscp-rocky:8.6 -f docker/rocky-8.6.Dockerfile .
|
||||
docker build -t mscp-rocky:8.8 -f docker/rocky-8.Dockerfile .
|
||||
```
|
||||
|
||||
Test `mscp` in the containers.
|
||||
@@ -20,25 +18,20 @@ docker run --init --rm mscp-ubuntu:20.04 /mscp/scripts/test-in-container.sh
|
||||
|
||||
docker run --init --rm mscp-ubuntu:22.04 /mscp/scripts/test-in-container.sh
|
||||
|
||||
docker run --init --rm mscp-centos:8 /mscp/scripts/test-in-container.sh
|
||||
|
||||
docker run --init --rm mscp-rocky:8.6 /mscp/scripts/test-in-container.sh
|
||||
docker run --init --rm mscp-rocky:8.9 /mscp/scripts/test-in-container.sh
|
||||
```
|
||||
|
||||
Retrieve deb/rpm packages.
|
||||
|
||||
```console
|
||||
docker run --rm -v (pwd):/out mscp-ubuntu:20.04 \
|
||||
cp /mscp/build/mscp_0.0.0-ubuntu-20.04-x86_64.deb /out/
|
||||
cp /mscp/build/mscp_ubuntu-20.04-x86_64.deb /out/
|
||||
|
||||
docker run --rm -v (pwd):/out mscp-ubuntu:22.04 \
|
||||
cp /mscp/build/mscp_0.0.0-ubuntu-22.04-x86_64.deb /out/
|
||||
cp /mscp/build/mscp_ubuntu-22.04-x86_64.deb /out/
|
||||
|
||||
docker run --rm -v (pwd):/out mscp-centos:8 \
|
||||
cp /mscp/build/mscp_0.0.0-centos-8-x86_64.rpm /out/
|
||||
|
||||
docker run --rm -v (pwd):/out mscp-rocky:8.6 \
|
||||
cp /mscp/build/mscp_0.0.0-rocky-8.6-x86_64.rpm /out/
|
||||
docker run --rm -v (pwd):/out mscp-rocky:8.8 \
|
||||
cp /mscp/build/mscp_rocky-8.8-x86_64.rpm /out/
|
||||
```
|
||||
|
||||
I don't know whether these are good way.
|
||||
@@ -1,8 +1,4 @@
|
||||
FROM rockylinux:8.6
|
||||
|
||||
ARG mscpdir="/mscp"
|
||||
|
||||
COPY . ${mscpdir}
|
||||
FROM almalinux:8.8
|
||||
|
||||
# install pytest, sshd for test, and rpm-build
|
||||
RUN set -ex && yum -y install \
|
||||
@@ -17,6 +13,11 @@ RUN mkdir /var/run/sshd \
|
||||
&& ssh-keygen -f /root/.ssh/id_rsa -N "" \
|
||||
&& mv /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
|
||||
|
||||
|
||||
ARG mscpdir="/mscp"
|
||||
|
||||
COPY . ${mscpdir}
|
||||
|
||||
# install build dependency
|
||||
RUN ${mscpdir}/scripts/install-build-deps.sh
|
||||
|
||||
@@ -2,18 +2,26 @@ FROM alpine:3.17
|
||||
|
||||
# Build mscp with conan to create single binary mscp
|
||||
|
||||
ARG mscpdir="/mscp"
|
||||
|
||||
COPY . ${mscpdir}
|
||||
|
||||
RUN apk add --no-cache \
|
||||
gcc make cmake python3 py3-pip perl linux-headers libc-dev \
|
||||
openssh bash python3-dev g++
|
||||
|
||||
RUN pip3 install conan pytest
|
||||
|
||||
# preparation for sshd
|
||||
RUN ssh-keygen -A
|
||||
RUN mkdir /var/run/sshd \
|
||||
&& ssh-keygen -f /root/.ssh/id_rsa -N "" \
|
||||
&& mv /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
|
||||
|
||||
|
||||
# Build mscp as a single binary
|
||||
RUN conan profile detect --force
|
||||
|
||||
ARG mscpdir="/mscp"
|
||||
|
||||
COPY . ${mscpdir}
|
||||
|
||||
RUN cd ${mscpdir} \
|
||||
&& rm -rf build \
|
||||
&& conan install . --output-folder=build --build=missing \
|
||||
@@ -25,15 +33,8 @@ RUN cd ${mscpdir} \
|
||||
&& make \
|
||||
&& cp mscp /usr/bin/ \
|
||||
&& cp mscp /mscp/build/mscp_alpine-3.17-x86_64.static
|
||||
|
||||
# copy mscp to PKG FILE NAME because this build doesn't use CPACK
|
||||
|
||||
# preparation for sshd
|
||||
RUN ssh-keygen -A
|
||||
RUN mkdir /var/run/sshd \
|
||||
&& ssh-keygen -f /root/.ssh/id_rsa -N "" \
|
||||
&& mv /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
|
||||
|
||||
# install mscp python module
|
||||
RUN cd ${mscpdir} \
|
||||
&& python3 setup.py install --user
|
||||
|
||||
@@ -1,16 +1,7 @@
|
||||
FROM centos:8
|
||||
|
||||
ARG mscpdir="/mscp"
|
||||
|
||||
COPY . ${mscpdir}
|
||||
|
||||
# from https://stackoverflow.com/questions/70963985/error-failed-to-download-metadata-for-repo-appstream-cannot-prepare-internal
|
||||
RUN cd /etc/yum.repos.d/
|
||||
RUN sed -i 's/mirrorlist/#mirrorlist/g' /etc/yum.repos.d/CentOS-*
|
||||
RUN sed -i 's|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g' /etc/yum.repos.d/CentOS-*
|
||||
FROM rockylinux:8.8
|
||||
|
||||
# install pytest, sshd for test, and rpm-build
|
||||
RUN set -ex && yum -y update && yum -y install \
|
||||
RUN set -ex && yum -y install \
|
||||
python3 python3-pip python3-devel openssh openssh-server openssh-clients rpm-build
|
||||
|
||||
RUN python3 -m pip install pytest
|
||||
@@ -22,13 +13,17 @@ RUN mkdir /var/run/sshd \
|
||||
&& ssh-keygen -f /root/.ssh/id_rsa -N "" \
|
||||
&& mv /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
|
||||
|
||||
ARG mscpdir="/mscp"
|
||||
|
||||
COPY . ${mscpdir}
|
||||
|
||||
# install build dependency
|
||||
RUN ${mscpdir}/scripts/install-build-deps.sh
|
||||
|
||||
# build
|
||||
RUN cd ${mscpdir} \
|
||||
&& rm -rf build \
|
||||
&& cmake -B build \
|
||||
&& cmake -B build \
|
||||
&& cd ${mscpdir}/build \
|
||||
&& make \
|
||||
&& cpack -G RPM CPackConfig.cmake \
|
||||
@@ -37,3 +32,4 @@ RUN cd ${mscpdir} \
|
||||
# install mscp python module
|
||||
RUN cd ${mscpdir} \
|
||||
&& python3 setup.py install --user
|
||||
|
||||
@@ -1,10 +1,6 @@
|
||||
FROM ubuntu:20.04
|
||||
|
||||
ARG DEBIAN_FRONTEND=noninteractive
|
||||
ARG mscpdir="/mscp"
|
||||
|
||||
COPY . ${mscpdir}
|
||||
|
||||
RUN set -ex && apt-get update && apt-get install -y --no-install-recommends \
|
||||
ca-certificates
|
||||
|
||||
@@ -21,6 +17,10 @@ RUN mkdir /var/run/sshd \
|
||||
&& mv /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
|
||||
|
||||
|
||||
ARG mscpdir="/mscp"
|
||||
|
||||
COPY . ${mscpdir}
|
||||
|
||||
# install build dependency
|
||||
RUN ${mscpdir}/scripts/install-build-deps.sh
|
||||
|
||||
|
||||
@@ -1,10 +1,6 @@
|
||||
FROM ubuntu:22.04
|
||||
|
||||
ARG DEBIAN_FRONTEND=noninteractive
|
||||
ARG mscpdir="/mscp"
|
||||
|
||||
COPY . ${mscpdir}
|
||||
|
||||
RUN set -ex && apt-get update && apt-get install -y --no-install-recommends \
|
||||
ca-certificates
|
||||
|
||||
@@ -20,6 +16,9 @@ RUN mkdir /var/run/sshd \
|
||||
&& ssh-keygen -f /root/.ssh/id_rsa -N "" \
|
||||
&& mv /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
|
||||
|
||||
ARG mscpdir="/mscp"
|
||||
|
||||
COPY . ${mscpdir}
|
||||
|
||||
# install build dependency
|
||||
RUN ${mscpdir}/scripts/install-build-deps.sh
|
||||
|
||||
@@ -18,7 +18,7 @@
|
||||
* 2. connect to remote host with mscp_connect()
|
||||
* 3. add path to source files with mscp_add_src_path()
|
||||
* 4. set path to destination with mscp_set_dst_path()
|
||||
* 5. finish preparation with mscp_prepare()
|
||||
* 5. start to scan source files with mscp_scan()
|
||||
* 6. start copy with mscp_start()
|
||||
* 7. wait for copy finished with mscp_join()
|
||||
* 8. cleanup mscp instance with mscp_cleanup() and mscp_free()
|
||||
@@ -43,6 +43,7 @@ struct mscp_opts {
|
||||
size_t max_chunk_sz; /** maximum chunk size (default file size/nr_threads) */
|
||||
size_t buf_sz; /** buffer size, default 16k. */
|
||||
char coremask[MSCP_MAX_COREMASK_STR]; /** hex to specifiy usable cpu cores */
|
||||
int max_startups; /* sshd MaxStartups concurrent connections */
|
||||
|
||||
int severity; /** messaging severity. set MSCP_SERVERITY_* */
|
||||
int msg_fd; /** fd to output message. default STDOUT (0),
|
||||
@@ -109,7 +110,7 @@ struct mscp *mscp_init(const char *remote_host, int direction,
|
||||
/**
|
||||
* @brief Connect the first SSH connection. mscp_connect connects to
|
||||
* remote host and initialize a SFTP session over the
|
||||
* connection. mscp_prepare() and mscp_start() require mscp_connect()
|
||||
* connection. mscp_scan() and mscp_start() require mscp_connect()
|
||||
* beforehand.
|
||||
*
|
||||
* @param m mscp instance.
|
||||
@@ -149,20 +150,31 @@ int mscp_add_src_path(struct mscp *m, const char *src_path);
|
||||
*/
|
||||
int mscp_set_dst_path(struct mscp *m, const char *dst_path);
|
||||
|
||||
/* check source files, resolve destination file paths for all source
|
||||
* files, and prepare chunks for all files. */
|
||||
/* scan source files, resolve destination file paths for all source
|
||||
* files, and calculate chunks for all files. */
|
||||
|
||||
/**
|
||||
* @brief Prepare for file transfer. This function checks all source
|
||||
* files (recursively), resolve paths on the destination side, and
|
||||
* calculate file chunks.
|
||||
* @brief Scan source paths and prepare. This function checks all
|
||||
* source files (recursively), resolve paths on the destination side,
|
||||
* and calculate file chunks. This function is non-blocking.
|
||||
*
|
||||
* @param m mscp instance.
|
||||
*
|
||||
* @return 0 on success, < 0 if an error occured.
|
||||
* mscp_get_error() can be used to retrieve error message.
|
||||
*/
|
||||
int mscp_prepare(struct mscp *m);
|
||||
int mscp_scan(struct mscp *m);
|
||||
|
||||
/**
|
||||
* @brief Join scna thread invoked by mscp_scan(). mscp_join()
|
||||
* involves this, so that mscp_scan_join() should be called when
|
||||
* mscp_scan() is called by mscp_start() is not.
|
||||
*
|
||||
* @param m mscp instance.
|
||||
* @return 0 on success, < 0 if an error occured.
|
||||
* mscp_get_error() can be used to retrieve error message.
|
||||
*/
|
||||
int mscp_scan_join(struct mscp *m);
|
||||
|
||||
/**
|
||||
* @brief Start to copy files. mscp_start() returns immediately. You
|
||||
@@ -172,7 +184,7 @@ int mscp_prepare(struct mscp *m);
|
||||
*
|
||||
* @param m mscp instance.
|
||||
*
|
||||
* @return 0 on success, < 0 if an error occured.
|
||||
* @return number of threads on success, < 0 if an error occured.
|
||||
* mscp_get_error() can be used to retrieve error message.
|
||||
*
|
||||
* @see mscp_join()
|
||||
@@ -245,15 +257,6 @@ enum {
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* @brief Set a file descriptor for receiving messages from mscp.
|
||||
* This function has the same effect with setting mscp_opts->msg_fd.
|
||||
*
|
||||
* @param m mscp instance.
|
||||
* @param fd fd to which libmscp writes messages.
|
||||
*/
|
||||
void mscp_set_msg_fd(struct mscp *m, int fd);
|
||||
|
||||
|
||||
/**
|
||||
* @brief Get the recent error message from libmscp. Note that this
|
||||
|
||||
23
mscp/mscp.py
23
mscp/mscp.py
@@ -37,7 +37,7 @@ SEVERITY_DEBUG = pymscp.SEVERITY_DEBUG
|
||||
|
||||
STATE_INIT = 0
|
||||
STATE_CONNECTED = 1
|
||||
STATE_PREPARED = 2
|
||||
STATE_SCANNED = 2
|
||||
STATE_RUNNING = 3
|
||||
STATE_STOPPED = 4
|
||||
STATE_JOINED = 5
|
||||
@@ -47,7 +47,7 @@ STATE_RELEASED = 7
|
||||
_state_str = {
|
||||
STATE_INIT: "init",
|
||||
STATE_CONNECTED: "connected",
|
||||
STATE_PREPARED: "prepared",
|
||||
STATE_SCANNED: "scanned",
|
||||
STATE_RUNNING: "running",
|
||||
STATE_STOPPED: "stopped",
|
||||
STATE_JOINED: "joined",
|
||||
@@ -71,6 +71,9 @@ class mscp:
|
||||
self.state = STATE_INIT
|
||||
|
||||
def __str__(self):
|
||||
if not hasattr(self, "state"):
|
||||
# this instance failed on mscp_init
|
||||
return "mscp:{}:init-failed"
|
||||
return "mscp:{}:{}".format(self.remote, self.__state2str())
|
||||
|
||||
def __repr__(self):
|
||||
@@ -100,26 +103,30 @@ class mscp:
|
||||
self.state = STATE_CONNECTED
|
||||
|
||||
def add_src_path(self, src_path: str):
|
||||
if type(src_path) != str:
|
||||
raise ValueError("src_path must be str: {}".format(src_path))
|
||||
self.src_paths.append(src_path)
|
||||
pymscp.mscp_add_src_path(m = self.m, src_path = src_path)
|
||||
|
||||
def set_dst_path(self, dst_path: str):
|
||||
if type(dst_path) != str:
|
||||
raise ValueError("dst_path must be str: {}".format(dst_path))
|
||||
self.dst_path = dst_path
|
||||
pymscp.mscp_set_dst_path(m = self.m, dst_path = dst_path);
|
||||
|
||||
def prepare(self):
|
||||
def scan(self):
|
||||
if self.state != STATE_CONNECTED:
|
||||
raise RuntimeError("invalid mscp state: {}".format(self.__state2str()))
|
||||
if not self.src_paths:
|
||||
raise RuntimeError("src path list is empty")
|
||||
if not self.dst_path:
|
||||
if self.dst_path == None:
|
||||
raise RuntimeError("dst path is not set")
|
||||
|
||||
pymscp.mscp_prepare(m = self.m)
|
||||
self.state = STATE_PREPARED
|
||||
pymscp.mscp_scan(m = self.m)
|
||||
self.state = STATE_SCANNED
|
||||
|
||||
def start(self):
|
||||
if self.state != STATE_PREPARED:
|
||||
if self.state != STATE_SCANNED:
|
||||
raise RuntimeError("invalid mscp state: {}".format(self.__state2str()))
|
||||
|
||||
pymscp.mscp_start(m = self.m)
|
||||
@@ -167,7 +174,7 @@ class mscp:
|
||||
|
||||
self.set_dst_path(dst)
|
||||
|
||||
self.prepare()
|
||||
self.scan()
|
||||
self.start()
|
||||
if nonblock:
|
||||
return
|
||||
|
||||
@@ -19,7 +19,7 @@ case $platform in
|
||||
apt-get install -y \
|
||||
gcc make cmake zlib1g-dev libssl-dev libkrb5-dev
|
||||
;;
|
||||
Linux-centos* | Linux-rhel* | Linux-rocky*)
|
||||
Linux-centos* | Linux-rhel* | Linux-rocky* | Linux-almalinux)
|
||||
yum install -y \
|
||||
gcc make cmake zlib-devel openssl-devel rpm-build
|
||||
;;
|
||||
|
||||
@@ -14,10 +14,10 @@ case $release in
|
||||
ubuntu-22.04*)
|
||||
echo "libc6 (>= 2.33), libgssapi-krb5-2 (>= 1.17), libssl3 (>= 3.0.0~~alpha1), zlib1g (>= 1:1.1.4)"
|
||||
;;
|
||||
centos* | rhel* | rocky*)
|
||||
centos* | rhel* | rocky* | almalinux*)
|
||||
echo "glibc crypto-policies krb5-libs openssl-libs libcom_err"
|
||||
;;
|
||||
*)
|
||||
echo "unsupported install dependency: $release"
|
||||
echo "$(basename $0): unsupported install dependency: $release"
|
||||
exit 1
|
||||
esac
|
||||
|
||||
@@ -11,6 +11,7 @@ set -x
|
||||
# Run sshd
|
||||
if [ ! -e /var/run/sshd.pid ]; then
|
||||
/usr/sbin/sshd
|
||||
sleep 1
|
||||
fi
|
||||
|
||||
ssh-keyscan localhost >> ${HOME}/.ssh/known_hosts
|
||||
|
||||
58
src/atomic.h
58
src/atomic.h
@@ -20,6 +20,8 @@ static inline refcnt refcnt_dec(refcnt *cnt)
|
||||
}
|
||||
|
||||
|
||||
/* mutex */
|
||||
|
||||
typedef pthread_mutex_t lock;
|
||||
|
||||
static inline void lock_init(lock *l)
|
||||
@@ -44,12 +46,58 @@ static inline void lock_release_via_cleanup(void *l)
|
||||
lock_release(l);
|
||||
}
|
||||
|
||||
#define LOCK_ACQUIRE_THREAD(l) \
|
||||
lock_acquire(l); \
|
||||
pthread_cleanup_push(lock_release_via_cleanup, l)
|
||||
#define LOCK_ACQUIRE(l) \
|
||||
lock_acquire(l); \
|
||||
pthread_cleanup_push(lock_release_via_cleanup, l)
|
||||
|
||||
|
||||
#define LOCK_RELEASE_THREAD() \
|
||||
#define LOCK_RELEASE() \
|
||||
pthread_cleanup_pop(1)
|
||||
|
||||
|
||||
|
||||
/* read/write lock */
|
||||
typedef pthread_rwlock_t rwlock;
|
||||
|
||||
static inline void rwlock_init(rwlock *rw)
|
||||
{
|
||||
pthread_rwlock_init(rw, NULL);
|
||||
}
|
||||
|
||||
static inline void rwlock_read_acquire(rwlock *rw)
|
||||
{
|
||||
int ret = pthread_rwlock_rdlock(rw);
|
||||
assert(ret == 0);
|
||||
}
|
||||
|
||||
static inline void rwlock_write_acquire(rwlock *rw)
|
||||
{
|
||||
int ret = pthread_rwlock_wrlock(rw);
|
||||
assert(ret == 0);
|
||||
}
|
||||
|
||||
static inline void rwlock_release(rwlock *rw)
|
||||
{
|
||||
int ret = pthread_rwlock_unlock(rw);
|
||||
assert(ret == 0);
|
||||
}
|
||||
|
||||
static inline void rwlock_release_via_cleanup(void *rw)
|
||||
{
|
||||
rwlock_release(rw);
|
||||
}
|
||||
|
||||
#define RWLOCK_READ_ACQUIRE(rw) \
|
||||
rwlock_read_acquire(rw); \
|
||||
pthread_cleanup_push(rwlock_release_via_cleanup, rw)
|
||||
|
||||
#define RWLOCK_WRITE_ACQUIRE(rw) \
|
||||
rwlock_write_acquire(rw); \
|
||||
pthread_cleanup_push(rwlock_release_via_cleanup, rw)
|
||||
|
||||
|
||||
#define RWLOCK_RELEASE() \
|
||||
pthread_cleanup_pop(1)
|
||||
|
||||
|
||||
|
||||
#endif /* _ATOMIC_H_ */
|
||||
|
||||
15
src/list.h
15
src/list.h
@@ -554,5 +554,20 @@ static inline int list_count(struct list_head *head)
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* list_free_f - free items in a list with a function
|
||||
* @head the heaf for your list.
|
||||
* @f function that releases an item in the list.
|
||||
*/
|
||||
static inline void list_free_f(struct list_head *head, void (*f)(struct list_head *))
|
||||
{
|
||||
struct list_head *p, *n;
|
||||
|
||||
list_for_each_safe(p, n, head) {
|
||||
list_del(p);
|
||||
f(p);
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
54
src/main.c
54
src/main.c
@@ -24,7 +24,7 @@
|
||||
void usage(bool print_help) {
|
||||
printf("mscp v" VERSION ": copy files over multiple ssh connections\n"
|
||||
"\n"
|
||||
"Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask]\n"
|
||||
"Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask] [-u max_startups]\n"
|
||||
" [-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]\n"
|
||||
" [-l login_name] [-p port] [-i identity_file]\n"
|
||||
" [-c cipher_spec] [-M hmac_spec] [-C compress] source ... target\n"
|
||||
@@ -36,6 +36,8 @@ void usage(bool print_help) {
|
||||
printf(" -n NR_CONNECTIONS number of connections "
|
||||
"(default: floor(log(cores)*2)+1)\n"
|
||||
" -m COREMASK hex value to specify cores where threads pinned\n"
|
||||
" -u MAX_STARTUPS number of concurrent outgoing connections "
|
||||
"(default: 8)\n"
|
||||
" -s MIN_CHUNK_SIZE min chunk size (default: 64MB)\n"
|
||||
" -S MAX_CHUNK_SIZE max chunk size (default: filesize/nr_conn)\n"
|
||||
"\n"
|
||||
@@ -52,7 +54,8 @@ void usage(bool print_help) {
|
||||
" -i IDENTITY identity file for public key authentication\n"
|
||||
" -c CIPHER cipher spec\n"
|
||||
" -M HMAC hmac spec\n"
|
||||
" -C COMPRESS enable compression: yes, no, zlib, zlib@openssh.com\n"
|
||||
" -C COMPRESS enable compression: "
|
||||
"yes, no, zlib, zlib@openssh.com\n"
|
||||
" -H disable hostkey check\n"
|
||||
" -d increment ssh debug output level\n"
|
||||
" -N enable Nagle's algorithm (default disabled)\n"
|
||||
@@ -69,7 +72,7 @@ char *split_remote_and_path(const char *string, char **remote, char **path)
|
||||
*/
|
||||
|
||||
if (!(s = strdup(string))) {
|
||||
fprintf(stderr, "strdup: %s\n", strerrno());
|
||||
fprintf(stderr, "strdup: %s\n", strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@@ -115,7 +118,7 @@ struct target *validate_targets(char **arg, int len)
|
||||
int n;
|
||||
|
||||
if ((t = calloc(len, sizeof(struct target))) == NULL) {
|
||||
fprintf(stderr, "calloc: %s\n", strerrno());
|
||||
fprintf(stderr, "calloc: %s\n", strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
memset(t, 0, len * sizeof(struct target));
|
||||
@@ -204,7 +207,7 @@ int main(int argc, char **argv)
|
||||
memset(&o, 0, sizeof(o));
|
||||
o.severity = MSCP_SEVERITY_WARN;
|
||||
|
||||
while ((ch = getopt(argc, argv, "n:m:s:S:a:b:vqDrl:p:i:c:M:C:HdNh")) != -1) {
|
||||
while ((ch = getopt(argc, argv, "n:m:u:s:S:a:b:vqDrl:p:i:c:M:C:HdNh")) != -1) {
|
||||
switch (ch) {
|
||||
case 'n':
|
||||
o.nr_threads = atoi(optarg);
|
||||
@@ -217,6 +220,9 @@ int main(int argc, char **argv)
|
||||
case 'm':
|
||||
strncpy(o.coremask, optarg, sizeof(o.coremask));
|
||||
break;
|
||||
case 'u':
|
||||
o.max_startups = atoi(optarg);
|
||||
break;
|
||||
case 's':
|
||||
o.min_chunk_sz = atoi(optarg);
|
||||
break;
|
||||
@@ -323,7 +329,7 @@ int main(int argc, char **argv)
|
||||
|
||||
if (!dryrun) {
|
||||
if (pipe(pipe_fd) < 0) {
|
||||
fprintf(stderr, "pipe: %s\n", strerrno());
|
||||
fprintf(stderr, "pipe: %s\n", strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
msg_fd = pipe_fd[0];
|
||||
@@ -352,33 +358,33 @@ int main(int argc, char **argv)
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mscp_prepare(m) < 0) {
|
||||
fprintf(stderr, "mscp_prepare: %s\n", mscp_get_error());
|
||||
if (mscp_scan(m) < 0) {
|
||||
fprintf(stderr, "mscp_scan: %s\n", mscp_get_error());
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (dryrun) {
|
||||
ret = 0;
|
||||
ret = mscp_scan_join(m);
|
||||
goto out;
|
||||
}
|
||||
|
||||
if (pthread_create(&tid_stat, NULL, print_stat_thread, NULL) < 0) {
|
||||
fprintf(stderr, "pthread_create: %s\n", strerrno());
|
||||
fprintf(stderr, "pthread_create: %s\n", strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (signal(SIGINT, sigint_handler) == SIG_ERR) {
|
||||
fprintf(stderr, "signal: %s\n", strerrno());
|
||||
fprintf(stderr, "signal: %s\n", strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
ret = mscp_start(m);
|
||||
if (ret < 0)
|
||||
fprintf(stderr, "%s\n", mscp_get_error());
|
||||
fprintf(stderr, "mscp_start: %s\n", mscp_get_error());
|
||||
|
||||
ret = mscp_join(m);
|
||||
if (ret != 0)
|
||||
fprintf(stderr, "%s\n", mscp_get_error());
|
||||
fprintf(stderr, "mscp_join: %s\n", mscp_get_error());
|
||||
|
||||
pthread_cancel(tid_stat);
|
||||
pthread_join(tid_stat, NULL);
|
||||
@@ -415,7 +421,8 @@ double calculate_bps(size_t diff, struct timeval *b, struct timeval *a)
|
||||
return (double)diff / calculate_timedelta(b, a);
|
||||
}
|
||||
|
||||
char *calculate_eta(size_t remain, size_t diff, struct timeval *b, struct timeval *a)
|
||||
char *calculate_eta(size_t remain, size_t diff, struct timeval *b, struct timeval *a,
|
||||
bool final)
|
||||
{
|
||||
static char buf[16];
|
||||
double elapsed = calculate_timedelta(b, a);
|
||||
@@ -423,7 +430,10 @@ char *calculate_eta(size_t remain, size_t diff, struct timeval *b, struct timeva
|
||||
|
||||
if (diff == 0)
|
||||
snprintf(buf, sizeof(buf), "--:-- ETA");
|
||||
else {
|
||||
else if (final) {
|
||||
snprintf(buf, sizeof(buf), "%02d:%02d ",
|
||||
(int)(floor(elapsed / 60)), (int)round(elapsed) % 60);
|
||||
} else {
|
||||
eta = remain / (diff / elapsed);
|
||||
snprintf(buf, sizeof(buf), "%02d:%02d ETA",
|
||||
(int)floor(eta / 60), (int)round(eta) % 60);
|
||||
@@ -468,7 +478,7 @@ void print_progress_bar(double percent, char *suffix)
|
||||
}
|
||||
|
||||
void print_progress(struct timeval *b, struct timeval *a,
|
||||
size_t total, size_t last, size_t done)
|
||||
size_t total, size_t last, size_t done, bool final)
|
||||
{
|
||||
char *bps_units[] = { "B/s ", "KB/s", "MB/s", "GB/s" };
|
||||
char *byte_units[] = { "B ", "KB", "MB", "GB", "TB", "PB" };
|
||||
@@ -503,7 +513,8 @@ void print_progress(struct timeval *b, struct timeval *a,
|
||||
|
||||
snprintf(suffix, sizeof(suffix), "%4lu%s/%lu%s %6.1f%s %s",
|
||||
done_round, byte_units[byte_du], total_round, byte_units[byte_tu],
|
||||
bps, bps_units[bps_u], calculate_eta(total - done, done - last, b, a));
|
||||
bps, bps_units[bps_u],
|
||||
calculate_eta(total - done, done - last, b, a, final));
|
||||
|
||||
print_progress_bar(percent, suffix);
|
||||
}
|
||||
@@ -527,7 +538,7 @@ void print_stat_thread_cleanup(void *arg)
|
||||
x.done = s.done;
|
||||
|
||||
/* print progress from the beginning */
|
||||
print_progress(&x.start, &x.after, x.total, 0, x.done);
|
||||
print_progress(&x.start, &x.after, x.total, 0, x.done, true);
|
||||
print_cli("\n"); /* final output */
|
||||
}
|
||||
|
||||
@@ -547,14 +558,14 @@ void *print_stat_thread(void *arg)
|
||||
|
||||
while (true) {
|
||||
if (poll(&pfd, 1, 100) < 0) {
|
||||
fprintf(stderr, "poll: %s\n", strerrno());
|
||||
fprintf(stderr, "poll: %s\n", strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (pfd.revents & POLLIN) {
|
||||
memset(buf, 0, sizeof(buf));
|
||||
if (read(msg_fd, buf, sizeof(buf)) < 0) {
|
||||
fprintf(stderr, "read: %s\n", strerrno());
|
||||
fprintf(stderr, "read: %s\n", strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
print_cli("\r\033[K" "%s", buf);
|
||||
@@ -566,7 +577,8 @@ void *print_stat_thread(void *arg)
|
||||
x.total = s.total;
|
||||
x.done = s.done;
|
||||
|
||||
print_progress(&x.before, &x.after, x.total, x.last, x.done);
|
||||
print_progress(&x.before, &x.after, x.total, x.last, x.done,
|
||||
false);
|
||||
x.before = x.after;
|
||||
x.last = x.done;
|
||||
}
|
||||
|
||||
@@ -4,10 +4,13 @@
|
||||
#include <limits.h>
|
||||
#include <pthread.h>
|
||||
|
||||
#include <util.h>
|
||||
#include <message.h>
|
||||
|
||||
/* mscp error message buffer */
|
||||
/* strerror_r wrapper */
|
||||
__thread char thread_strerror[128];
|
||||
|
||||
/* mscp error message buffer */
|
||||
#define MSCP_ERRMSG_SIZE (PATH_MAX * 2)
|
||||
|
||||
static char errmsg[MSCP_ERRMSG_SIZE];
|
||||
@@ -30,29 +33,17 @@ const char *mscp_get_error()
|
||||
|
||||
/* message print functions */
|
||||
|
||||
static int mprint_serverity = MSCP_SEVERITY_WARN;
|
||||
static pthread_mutex_t mprint_lock = PTHREAD_MUTEX_INITIALIZER;
|
||||
static int mprint_severity = MSCP_SEVERITY_WARN;
|
||||
|
||||
void mprint_set_severity(int serverity)
|
||||
{
|
||||
if (serverity < 0)
|
||||
mprint_serverity = -1; /* no print */
|
||||
mprint_serverity = serverity;
|
||||
mprint_severity = -1; /* no print */
|
||||
mprint_severity = serverity;
|
||||
}
|
||||
|
||||
void mprint(int fd, int serverity, const char *fmt, ...)
|
||||
int mprint_get_severity()
|
||||
{
|
||||
va_list va;
|
||||
int ret;
|
||||
|
||||
if (fd < 0)
|
||||
return;
|
||||
|
||||
if (serverity <= mprint_serverity) {
|
||||
pthread_mutex_lock(&mprint_lock);
|
||||
va_start(va, fmt);
|
||||
vdprintf(fd, fmt, va);
|
||||
va_end(va);
|
||||
pthread_mutex_unlock(&mprint_lock);
|
||||
}
|
||||
return mprint_severity;
|
||||
}
|
||||
|
||||
|
||||
@@ -7,23 +7,47 @@
|
||||
|
||||
/* message print. printed messages are passed to application via msg_fd */
|
||||
void mprint_set_severity(int severity);
|
||||
void mprint(int fd, int severity, const char *fmt, ...);
|
||||
int mprint_get_severity();
|
||||
|
||||
#define mprint(fp, severity, fmt, ...) \
|
||||
do { \
|
||||
if (fp && severity <= mprint_get_severity()) { \
|
||||
fprintf(fp, fmt, ##__VA_ARGS__); \
|
||||
fflush(fp); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define mpr_err(fp, fmt, ...) \
|
||||
mprint(fp, MSCP_SEVERITY_ERR, fmt, ##__VA_ARGS__)
|
||||
#define mpr_warn(fp, fmt, ...) \
|
||||
mprint(fp, MSCP_SEVERITY_WARN, fmt, ##__VA_ARGS__)
|
||||
#define mpr_notice(fp, fmt, ...) \
|
||||
mprint(fp, MSCP_SEVERITY_NOTICE, fmt, ##__VA_ARGS__)
|
||||
#define mpr_info(fp, fmt, ...) \
|
||||
mprint(fp, MSCP_SEVERITY_INFO, fmt, ##__VA_ARGS__)
|
||||
#define mpr_debug(fp, fmt, ...) \
|
||||
mprint(fp, MSCP_SEVERITY_DEBUG, fmt, ##__VA_ARGS__)
|
||||
|
||||
|
||||
/* errorno wrapper */
|
||||
extern __thread char thread_strerror[128];
|
||||
|
||||
#ifdef _GNU_SOURCE
|
||||
/* GNU strerror_r */
|
||||
#define strerrno() \
|
||||
strerror_r(errno, thread_strerror, sizeof(thread_strerror))
|
||||
#else
|
||||
/* this macro assumes that strerror_r never fails. any good way? */
|
||||
#define strerrno() \
|
||||
(strerror_r(errno, thread_strerror, sizeof(thread_strerror)) \
|
||||
? thread_strerror : thread_strerror)
|
||||
#endif
|
||||
|
||||
#define mpr_err(fd, fmt, ...) \
|
||||
mprint(fd, MSCP_SEVERITY_ERR, fmt, ##__VA_ARGS__)
|
||||
#define mpr_warn(fd, fmt, ...) \
|
||||
mprint(fd, MSCP_SEVERITY_WARN, fmt, ##__VA_ARGS__)
|
||||
#define mpr_notice(fd, fmt, ...) \
|
||||
mprint(fd, MSCP_SEVERITY_NOTICE, fmt, ##__VA_ARGS__)
|
||||
#define mpr_info(fd, fmt, ...) \
|
||||
mprint(fd, MSCP_SEVERITY_INFO, fmt, ##__VA_ARGS__)
|
||||
#define mpr_debug(fd, fmt, ...) \
|
||||
mprint(fd, MSCP_SEVERITY_DEBUG, fmt, ##__VA_ARGS__)
|
||||
|
||||
|
||||
/* error message buffer */
|
||||
#define mscp_set_error(fmt, ...) \
|
||||
_mscp_set_error("%s:%d:%s: " fmt, \
|
||||
_mscp_set_error("%s:%d:%s: " fmt "\0", \
|
||||
basename(__FILE__), __LINE__, __func__, ##__VA_ARGS__)
|
||||
|
||||
void _mscp_set_error(const char *fmt, ...);
|
||||
|
||||
445
src/mscp.c
445
src/mscp.c
@@ -2,7 +2,8 @@
|
||||
#include <unistd.h>
|
||||
#include <math.h>
|
||||
#include <pthread.h>
|
||||
|
||||
#include <semaphore.h>
|
||||
#include <sys/time.h>
|
||||
|
||||
#include <list.h>
|
||||
#include <util.h>
|
||||
@@ -13,32 +14,43 @@
|
||||
#include <message.h>
|
||||
#include <mscp.h>
|
||||
|
||||
|
||||
struct mscp {
|
||||
char *remote; /* remote host (and uername) */
|
||||
int direction; /* copy direction */
|
||||
struct mscp_opts *opts;
|
||||
struct mscp_ssh_opts *ssh_opts;
|
||||
|
||||
int msg_fd; /* writer fd for message pipe */
|
||||
FILE *msg_fp; /* writer fd for message pipe */
|
||||
|
||||
int *cores; /* usable cpu cores by COREMASK */
|
||||
int nr_cores; /* length of array of cores */
|
||||
int *cores; /* usable cpu cores by COREMASK */
|
||||
int nr_cores; /* length of array of cores */
|
||||
|
||||
sem_t *sem; /* semaphore for concurrent
|
||||
* connecting ssh sessions */
|
||||
|
||||
sftp_session first; /* first sftp session */
|
||||
|
||||
char dst_path[PATH_MAX];
|
||||
struct list_head src_list;
|
||||
struct list_head path_list;
|
||||
struct list_head chunk_list;
|
||||
lock chunk_lock;
|
||||
struct chunk_pool cp;
|
||||
|
||||
pthread_t tid_scan; /* tid for scan thread */
|
||||
int ret_scan; /* return code from scan thread */
|
||||
|
||||
size_t total_bytes; /* total bytes to be transferred */
|
||||
struct mscp_thread *threads;
|
||||
|
||||
struct list_head thread_list;
|
||||
rwlock thread_rwlock;
|
||||
};
|
||||
|
||||
|
||||
struct mscp_thread {
|
||||
struct list_head list; /* mscp->thread_list */
|
||||
|
||||
struct mscp *m;
|
||||
int id;
|
||||
sftp_session sftp;
|
||||
pthread_t tid;
|
||||
int cpu;
|
||||
@@ -48,7 +60,7 @@ struct mscp_thread {
|
||||
};
|
||||
|
||||
struct src {
|
||||
struct list_head list;
|
||||
struct list_head list; /* mscp->src_list */
|
||||
char *path;
|
||||
};
|
||||
|
||||
@@ -62,8 +74,13 @@ struct src {
|
||||
* sftp_async_read returns 0.
|
||||
*/
|
||||
|
||||
#define DEFAULT_MAX_STARTUPS 8
|
||||
|
||||
#define non_null_string(s) (s[0] != '\0')
|
||||
|
||||
|
||||
|
||||
|
||||
static int expand_coremask(const char *coremask, int **cores, int *nr_cores)
|
||||
{
|
||||
int n, *core_list, core_list_len = 0, nr_usable, nr_all;
|
||||
@@ -178,6 +195,16 @@ static int validate_and_set_defaut_params(struct mscp_opts *o)
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (o->max_startups == 0)
|
||||
o->max_startups = DEFAULT_MAX_STARTUPS;
|
||||
else if (o->max_startups < 0) {
|
||||
mscp_set_error("invalid max_startups: %d", o->max_startups);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (o->msg_fd == 0)
|
||||
o->msg_fd = STDOUT_FILENO;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -188,7 +215,7 @@ struct mscp *mscp_init(const char *remote_host, int direction,
|
||||
int n;
|
||||
|
||||
if (!remote_host) {
|
||||
mscp_set_error("empty remote host\n");
|
||||
mscp_set_error("empty remote host");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@@ -198,22 +225,30 @@ struct mscp *mscp_init(const char *remote_host, int direction,
|
||||
return NULL;
|
||||
}
|
||||
|
||||
mprint_set_severity(o->severity);
|
||||
|
||||
if (validate_and_set_defaut_params(o) < 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
m = malloc(sizeof(*m));
|
||||
if (!m) {
|
||||
mscp_set_error("failed to allocate memory: %s", strerrno());
|
||||
return NULL;
|
||||
}
|
||||
|
||||
mprint_set_severity(o->severity);
|
||||
|
||||
if (validate_and_set_defaut_params(o) < 0)
|
||||
goto free_out;
|
||||
|
||||
memset(m, 0, sizeof(*m));
|
||||
INIT_LIST_HEAD(&m->src_list);
|
||||
INIT_LIST_HEAD(&m->path_list);
|
||||
INIT_LIST_HEAD(&m->chunk_list);
|
||||
lock_init(&m->chunk_lock);
|
||||
chunk_pool_init(&m->cp);
|
||||
|
||||
INIT_LIST_HEAD(&m->thread_list);
|
||||
rwlock_init(&m->thread_rwlock);
|
||||
|
||||
if ((m->sem = sem_create(o->max_startups)) == NULL) {
|
||||
mscp_set_error("sem_create: %s", strerrno());
|
||||
goto free_out;
|
||||
}
|
||||
|
||||
m->remote = strdup(remote_host);
|
||||
if (!m->remote) {
|
||||
@@ -221,15 +256,22 @@ struct mscp *mscp_init(const char *remote_host, int direction,
|
||||
goto free_out;
|
||||
}
|
||||
m->direction = direction;
|
||||
m->msg_fd = o->msg_fd;
|
||||
if (o->msg_fd > -1) {
|
||||
m->msg_fp = fdopen(o->msg_fd, "a");
|
||||
if (!m->msg_fp) {
|
||||
mscp_set_error("fdopen failed: %s", strerrno());
|
||||
goto free_out;
|
||||
}
|
||||
} else
|
||||
m->msg_fp = NULL;
|
||||
|
||||
if (strlen(o->coremask) > 0) {
|
||||
if (expand_coremask(o->coremask, &m->cores, &m->nr_cores) < 0)
|
||||
goto free_out;
|
||||
mpr_notice(m->msg_fd, "usable cpu cores:");
|
||||
mpr_notice(m->msg_fp, "usable cpu cores:");
|
||||
for (n = 0; n < m->nr_cores; n++)
|
||||
mpr_notice(m->msg_fd, " %d", m->cores[n]);
|
||||
mpr_notice(m->msg_fd, "\n");
|
||||
mpr_notice(m->msg_fp, " %d", m->cores[n]);
|
||||
mpr_notice(m->msg_fp, "\n");
|
||||
}
|
||||
|
||||
m->opts = o;
|
||||
@@ -242,11 +284,6 @@ free_out:
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void mscp_set_msg_fd(struct mscp *m, int fd)
|
||||
{
|
||||
m->msg_fd = fd;
|
||||
}
|
||||
|
||||
int mscp_connect(struct mscp *m)
|
||||
{
|
||||
m->first = ssh_init_sftp_session(m->remote, m->ssh_opts);
|
||||
@@ -293,17 +330,55 @@ int mscp_set_dst_path(struct mscp *m, const char *dst_path)
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int mscp_prepare(struct mscp *m)
|
||||
static int get_page_mask(void)
|
||||
{
|
||||
long page_sz = sysconf(_SC_PAGESIZE);
|
||||
size_t page_mask = 0;
|
||||
int n;
|
||||
|
||||
for (n = 0; page_sz > 0; page_sz >>= 1, n++) {
|
||||
page_mask <<= 1;
|
||||
page_mask |= 1;
|
||||
}
|
||||
|
||||
return page_mask >> 1;
|
||||
}
|
||||
|
||||
static void mscp_stop_copy_thread(struct mscp *m)
|
||||
{
|
||||
struct mscp_thread *t;
|
||||
|
||||
RWLOCK_READ_ACQUIRE(&m->thread_rwlock);
|
||||
list_for_each_entry(t, &m->thread_list, list) {
|
||||
if (!t->finished)
|
||||
pthread_cancel(t->tid);
|
||||
}
|
||||
RWLOCK_RELEASE();
|
||||
}
|
||||
|
||||
static void mscp_stop_scan_thread(struct mscp *m)
|
||||
{
|
||||
if (m->tid_scan)
|
||||
pthread_cancel(m->tid_scan);
|
||||
}
|
||||
|
||||
void mscp_stop(struct mscp *m)
|
||||
{
|
||||
mscp_stop_scan_thread(m);
|
||||
mscp_stop_copy_thread(m);
|
||||
}
|
||||
|
||||
void *mscp_scan_thread(void *arg)
|
||||
{
|
||||
struct mscp *m = arg;
|
||||
sftp_session src_sftp = NULL, dst_sftp = NULL;
|
||||
bool src_path_is_dir, dst_path_is_dir, dst_path_should_dir;
|
||||
struct path_resolve_args a;
|
||||
struct list_head tmp;
|
||||
struct path *p;
|
||||
struct src *s;
|
||||
mstat ss, ds;
|
||||
|
||||
src_path_is_dir = dst_path_is_dir = dst_path_should_dir = false;
|
||||
m->ret_scan = 0;
|
||||
|
||||
switch (m->direction) {
|
||||
case MSCP_DIRECTION_L2R:
|
||||
@@ -316,166 +391,185 @@ int mscp_prepare(struct mscp *m)
|
||||
break;
|
||||
default:
|
||||
mscp_set_error("invalid copy direction: %d", m->direction);
|
||||
return -1;
|
||||
goto err_out;
|
||||
}
|
||||
|
||||
/* initialize path_resolve_args */
|
||||
memset(&a, 0, sizeof(a));
|
||||
a.msg_fp = m->msg_fp;
|
||||
a.total_bytes = &m->total_bytes;
|
||||
|
||||
if (list_count(&m->src_list) > 1)
|
||||
dst_path_should_dir = true;
|
||||
a.dst_path_should_dir = true;
|
||||
|
||||
if (mscp_stat(m->dst_path, &ds, dst_sftp) == 0) {
|
||||
if (mstat_is_dir(ds))
|
||||
dst_path_is_dir = true;
|
||||
a.dst_path_is_dir = true;
|
||||
mscp_stat_free(ds);
|
||||
}
|
||||
|
||||
a.cp = &m->cp;
|
||||
a.nr_conn = m->opts->nr_threads;
|
||||
a.min_chunk_sz = m->opts->min_chunk_sz;
|
||||
a.max_chunk_sz = m->opts->max_chunk_sz;
|
||||
a.chunk_align = get_page_mask();
|
||||
|
||||
mpr_info(m->msg_fp, "start to walk source path(s)\n");
|
||||
|
||||
/* walk a src_path recusively, and resolve path->dst_path for each src */
|
||||
list_for_each_entry(s, &m->src_list, list) {
|
||||
if (mscp_stat(s->path, &ss, src_sftp) < 0) {
|
||||
mscp_set_error("stat: %s", mscp_strerror(src_sftp));
|
||||
return -1;
|
||||
mscp_stat_free(ss);
|
||||
goto err_out;
|
||||
}
|
||||
src_path_is_dir = mstat_is_dir(ss);
|
||||
|
||||
/* set path specific args */
|
||||
a.src_path = s->path;
|
||||
a.dst_path = m->dst_path;
|
||||
a.src_path_is_dir = mstat_is_dir(ss);
|
||||
mscp_stat_free(ss);
|
||||
|
||||
INIT_LIST_HEAD(&tmp);
|
||||
if (walk_src_path(src_sftp, s->path, &tmp) < 0)
|
||||
return -1;
|
||||
|
||||
if (list_count(&tmp) > 1)
|
||||
dst_path_should_dir = true;
|
||||
|
||||
if (resolve_dst_path(m->msg_fd, s->path, m->dst_path, &tmp,
|
||||
src_path_is_dir, dst_path_is_dir,
|
||||
dst_path_should_dir) < 0)
|
||||
return -1;
|
||||
if (walk_src_path(src_sftp, s->path, &tmp, &a) < 0)
|
||||
goto err_out;
|
||||
|
||||
list_splice_tail(&tmp, m->path_list.prev);
|
||||
}
|
||||
|
||||
if (resolve_chunk(&m->path_list, &m->chunk_list, m->opts->nr_threads,
|
||||
m->opts->min_chunk_sz, m->opts->max_chunk_sz) < 0)
|
||||
return -1;
|
||||
mpr_info(m->msg_fp, "walk source path(s) done\n");
|
||||
chunk_pool_set_filled(&m->cp);
|
||||
m->ret_scan = 0;
|
||||
return NULL;
|
||||
|
||||
/* save total bytes to be transferred */
|
||||
m->total_bytes = 0;
|
||||
list_for_each_entry(p, &m->path_list, list) {
|
||||
m->total_bytes += p->size;
|
||||
err_out:
|
||||
chunk_pool_set_filled(&m->cp);
|
||||
m->ret_scan = -1;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int mscp_scan(struct mscp *m)
|
||||
{
|
||||
int ret = pthread_create(&m->tid_scan, NULL, mscp_scan_thread, m);
|
||||
if (ret < 0) {
|
||||
mscp_set_error("pthread_create_error: %d", ret);
|
||||
m->tid_scan = 0;
|
||||
mscp_stop(m);
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* We wait for there are over nr_threads chunks to determine
|
||||
* actual number of threads (and connections), or scan
|
||||
* finished. If the number of chunks are smaller than
|
||||
* nr_threads, we adjust nr_threads to the number of chunks.
|
||||
*/
|
||||
while (!chunk_pool_is_filled(&m->cp) &&
|
||||
chunk_pool_size(&m->cp) < m->opts->nr_threads)
|
||||
usleep(100);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void mscp_stop(struct mscp *m)
|
||||
int mscp_scan_join(struct mscp *m)
|
||||
{
|
||||
int n;
|
||||
pr("stopping...\n");
|
||||
for (n = 0; n < m->opts->nr_threads; n++) {
|
||||
if (m->threads[n].tid && !m->threads[n].finished)
|
||||
pthread_cancel(m->threads[n].tid);
|
||||
}
|
||||
if (m->tid_scan) {
|
||||
pthread_join(m->tid_scan, NULL);
|
||||
m->tid_scan = 0;
|
||||
return m->ret_scan;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
|
||||
static void *mscp_copy_thread(void *arg);
|
||||
|
||||
static struct mscp_thread *mscp_copy_thread_spawn(struct mscp *m, int id)
|
||||
{
|
||||
struct mscp_thread *t;
|
||||
int ret;
|
||||
|
||||
t = malloc(sizeof(*t));
|
||||
if (!t){
|
||||
mscp_set_error("malloc: %s,", strerrno());
|
||||
return NULL;
|
||||
}
|
||||
|
||||
memset(t, 0, sizeof(*t));
|
||||
t->m = m;
|
||||
t->id = id;
|
||||
if (m->cores == NULL)
|
||||
t->cpu = -1; /* not pinned to cpu */
|
||||
else
|
||||
t->cpu = m->cores[id % m->nr_cores];
|
||||
|
||||
ret = pthread_create(&t->tid, NULL, mscp_copy_thread, t);
|
||||
if (ret < 0) {
|
||||
mscp_set_error("pthread_create error: %d", ret);
|
||||
free(t);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return t;
|
||||
}
|
||||
|
||||
|
||||
int mscp_start(struct mscp *m)
|
||||
{
|
||||
int n, ret;
|
||||
struct mscp_thread *t;
|
||||
int n, ret = 0;
|
||||
|
||||
if ((n = list_count(&m->chunk_list)) < m->opts->nr_threads) {
|
||||
mpr_notice(m->msg_fd, "we have only %d chunk(s). "
|
||||
if ((n = chunk_pool_size(&m->cp)) < m->opts->nr_threads) {
|
||||
mpr_notice(m->msg_fp, "we have only %d chunk(s). "
|
||||
"set number of connections to %d\n", n, n);
|
||||
m->opts->nr_threads = n;
|
||||
}
|
||||
|
||||
/* prepare thread instances */
|
||||
m->threads = calloc(m->opts->nr_threads, sizeof(struct mscp_thread));
|
||||
memset(m->threads, 0, m->opts->nr_threads * sizeof(struct mscp_thread));
|
||||
for (n = 0; n < m->opts->nr_threads; n++) {
|
||||
struct mscp_thread *t = &m->threads[n];
|
||||
t->m = m;
|
||||
if (!m->cores)
|
||||
t->cpu = -1;
|
||||
else
|
||||
t->cpu = m->cores[n % m->nr_cores];
|
||||
|
||||
if (n == 0) {
|
||||
t->sftp = m->first; /* reuse first sftp session */
|
||||
m->first = NULL;
|
||||
t = mscp_copy_thread_spawn(m, n);
|
||||
if (!t) {
|
||||
mpr_err(m->msg_fp, "failed to spawn copy thread\n");
|
||||
break;
|
||||
}
|
||||
else {
|
||||
mpr_notice(m->msg_fd, "connecting to %s for a copy thread...\n",
|
||||
m->remote);
|
||||
t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts);
|
||||
if (!t->sftp)
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
/* spawn copy threads */
|
||||
for (n = 0; n < m->opts->nr_threads; n++) {
|
||||
struct mscp_thread *t = &m->threads[n];
|
||||
ret = pthread_create(&t->tid, NULL, mscp_copy_thread, t);
|
||||
if (ret < 0) {
|
||||
mscp_set_error("pthread_create error: %d", ret);
|
||||
mscp_stop(m);
|
||||
return -1;
|
||||
}
|
||||
RWLOCK_WRITE_ACQUIRE(&m->thread_rwlock);
|
||||
list_add_tail(&t->list, &m->thread_list);
|
||||
RWLOCK_RELEASE();
|
||||
}
|
||||
|
||||
return 0;
|
||||
return n;
|
||||
}
|
||||
|
||||
int mscp_join(struct mscp *m)
|
||||
{
|
||||
struct mscp_thread *t;
|
||||
int n, ret = 0;
|
||||
|
||||
/* waiting for threads join... */
|
||||
for (n = 0; n < m->opts->nr_threads; n++) {
|
||||
if (m->threads[n].tid) {
|
||||
pthread_join(m->threads[n].tid, NULL);
|
||||
if (m->threads[n].ret < 0)
|
||||
ret = m->threads[n].ret;
|
||||
}
|
||||
}
|
||||
/* waiting for scan thread joins... */
|
||||
ret = mscp_scan_join(m);
|
||||
|
||||
/* waiting for copy threads join... */
|
||||
RWLOCK_READ_ACQUIRE(&m->thread_rwlock);
|
||||
list_for_each_entry(t, &m->thread_list, list) {
|
||||
pthread_join(t->tid, NULL);
|
||||
if (t->ret < 0)
|
||||
ret = t->ret;
|
||||
if (t->sftp) {
|
||||
ssh_sftp_close(t->sftp);
|
||||
t->sftp = NULL;
|
||||
}
|
||||
}
|
||||
RWLOCK_RELEASE();
|
||||
|
||||
if (m->first) {
|
||||
ssh_sftp_close(m->first);
|
||||
m->first = NULL;
|
||||
}
|
||||
|
||||
if (m->threads) {
|
||||
for (n = 0; n < m->opts->nr_threads; n++) {
|
||||
struct mscp_thread *t = &m->threads[n];
|
||||
if (t->ret != 0)
|
||||
ret = ret;
|
||||
|
||||
if (t->sftp) {
|
||||
ssh_sftp_close(t->sftp);
|
||||
t->sftp = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* copy thread related functions */
|
||||
|
||||
struct chunk *acquire_chunk(struct list_head *chunk_list)
|
||||
{
|
||||
/* under the lock for chunk_list */
|
||||
struct list_head *first = chunk_list->next;
|
||||
struct chunk *c = NULL;
|
||||
|
||||
if (list_empty(chunk_list))
|
||||
return NULL; /* list is empty */
|
||||
|
||||
c = list_entry(first, struct chunk, list);
|
||||
list_del(first);
|
||||
return c;
|
||||
}
|
||||
|
||||
static void mscp_copy_thread_cleanup(void *arg)
|
||||
{
|
||||
struct mscp_thread *t = arg;
|
||||
@@ -489,6 +583,34 @@ void *mscp_copy_thread(void *arg)
|
||||
struct mscp *m = t->m;
|
||||
struct chunk *c;
|
||||
|
||||
if (t->cpu > -1) {
|
||||
if (set_thread_affinity(pthread_self(), t->cpu) < 0) {
|
||||
t->ret = -1;
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
if (sem_wait(m->sem) < 0) {
|
||||
mscp_set_error("sem_wait: %s", strerrno());
|
||||
mpr_err(m->msg_fp, "%s", mscp_get_error());
|
||||
goto err_out;
|
||||
}
|
||||
|
||||
mpr_notice(m->msg_fp, "connecting to %s for a copy thread[%d]...\n",
|
||||
m->remote, t->id);
|
||||
t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts);
|
||||
|
||||
if (sem_post(m->sem) < 0) {
|
||||
mscp_set_error("sem_post: %s", strerrno());
|
||||
mpr_err(m->msg_fp, "%s", mscp_get_error());
|
||||
goto err_out;
|
||||
}
|
||||
|
||||
if (!t->sftp) {
|
||||
mpr_err(m->msg_fp, "copy thread[%d]: %s\n", t->id, mscp_get_error());
|
||||
goto err_out;
|
||||
}
|
||||
|
||||
switch (m->direction) {
|
||||
case MSCP_DIRECTION_L2R:
|
||||
src_sftp = NULL;
|
||||
@@ -502,24 +624,21 @@ void *mscp_copy_thread(void *arg)
|
||||
return NULL; /* not reached */
|
||||
}
|
||||
|
||||
if (t->cpu > -1) {
|
||||
if (set_thread_affinity(pthread_self(), t->cpu) < 0)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
|
||||
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
|
||||
pthread_cleanup_push(mscp_copy_thread_cleanup, t);
|
||||
|
||||
while (1) {
|
||||
LOCK_ACQUIRE_THREAD(&m->chunk_lock);
|
||||
c = acquire_chunk(&m->chunk_list);
|
||||
LOCK_RELEASE_THREAD();
|
||||
c = chunk_pool_pop(&m->cp);
|
||||
if (c == CHUNK_POP_WAIT) {
|
||||
usleep(100); /* XXX: hard code */
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!c)
|
||||
break; /* no more chunks */
|
||||
|
||||
if ((t->ret = copy_chunk(m->msg_fd,
|
||||
if ((t->ret = copy_chunk(m->msg_fp,
|
||||
c, src_sftp, dst_sftp, m->opts->nr_ahead,
|
||||
m->opts->buf_sz, &t->done)) < 0)
|
||||
break;
|
||||
@@ -532,21 +651,16 @@ void *mscp_copy_thread(void *arg)
|
||||
c->p->path, c->off, c->off + c->len);
|
||||
|
||||
return NULL;
|
||||
|
||||
err_out:
|
||||
t->finished = true;
|
||||
t->ret = -1;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
/* cleanup related functions */
|
||||
|
||||
static void release_list(struct list_head *head, void (*f)(struct list_head*))
|
||||
{
|
||||
struct list_head *p, *n;
|
||||
|
||||
list_for_each_safe(p, n, head) {
|
||||
list_del(p);
|
||||
f(p);
|
||||
}
|
||||
}
|
||||
|
||||
static void free_src(struct list_head *list)
|
||||
{
|
||||
struct src *s;
|
||||
@@ -569,6 +683,13 @@ static void free_chunk(struct list_head *list)
|
||||
free(c);
|
||||
}
|
||||
|
||||
static void free_thread(struct list_head *list)
|
||||
{
|
||||
struct mscp_thread *t;
|
||||
t = list_entry(list, typeof(*t), list);
|
||||
free(t);
|
||||
}
|
||||
|
||||
void mscp_cleanup(struct mscp *m)
|
||||
{
|
||||
if (m->first) {
|
||||
@@ -576,19 +697,18 @@ void mscp_cleanup(struct mscp *m)
|
||||
m->first = NULL;
|
||||
}
|
||||
|
||||
release_list(&m->src_list, free_src);
|
||||
list_free_f(&m->src_list, free_src);
|
||||
INIT_LIST_HEAD(&m->src_list);
|
||||
|
||||
release_list(&m->chunk_list, free_chunk);
|
||||
INIT_LIST_HEAD(&m->chunk_list);
|
||||
|
||||
release_list(&m->path_list, free_path);
|
||||
list_free_f(&m->path_list, free_path);
|
||||
INIT_LIST_HEAD(&m->path_list);
|
||||
|
||||
if (m->threads) {
|
||||
free(m->threads);
|
||||
m->threads = NULL;
|
||||
}
|
||||
chunk_pool_release(&m->cp);
|
||||
chunk_pool_init(&m->cp);
|
||||
|
||||
RWLOCK_WRITE_ACQUIRE(&m->thread_rwlock);
|
||||
list_free_f(&m->thread_list, free_thread);
|
||||
RWLOCK_RELEASE();
|
||||
}
|
||||
|
||||
void mscp_free(struct mscp *m)
|
||||
@@ -598,21 +718,26 @@ void mscp_free(struct mscp *m)
|
||||
free(m->remote);
|
||||
if (m->cores)
|
||||
free(m->cores);
|
||||
|
||||
sem_release(m->sem);
|
||||
free(m);
|
||||
}
|
||||
|
||||
void mscp_get_stats(struct mscp *m, struct mscp_stats *s)
|
||||
{
|
||||
struct mscp_thread *t;
|
||||
bool finished = true;
|
||||
int n;
|
||||
|
||||
s->total = m->total_bytes;
|
||||
for (s->done = 0, n = 0; n < m->opts->nr_threads; n++) {
|
||||
s->done += m->threads[n].done;
|
||||
s->done = 0;
|
||||
|
||||
if (!m->threads[n].done)
|
||||
RWLOCK_READ_ACQUIRE(&m->thread_rwlock);
|
||||
list_for_each_entry(t, &m->thread_list, list) {
|
||||
s->done += t->done;
|
||||
if (!t->finished)
|
||||
finished = false;
|
||||
}
|
||||
RWLOCK_RELEASE();
|
||||
|
||||
s->finished = finished;
|
||||
}
|
||||
|
||||
385
src/path.c
385
src/path.c
@@ -12,8 +12,186 @@
|
||||
#include <path.h>
|
||||
#include <message.h>
|
||||
|
||||
|
||||
/* chunk pool operations */
|
||||
#define CHUNK_POOL_STATE_FILLING 0
|
||||
#define CHUNK_POOL_STATE_FILLED 1
|
||||
|
||||
void chunk_pool_init(struct chunk_pool *cp)
|
||||
{
|
||||
memset(cp, 0, sizeof(*cp));
|
||||
INIT_LIST_HEAD(&cp->list);
|
||||
lock_init(&cp->lock);
|
||||
cp->state = CHUNK_POOL_STATE_FILLING;
|
||||
}
|
||||
|
||||
static void chunk_pool_add(struct chunk_pool *cp, struct chunk *c)
|
||||
{
|
||||
LOCK_ACQUIRE(&cp->lock);
|
||||
list_add_tail(&c->list, &cp->list);
|
||||
cp->count += 1;
|
||||
LOCK_RELEASE();
|
||||
}
|
||||
|
||||
void chunk_pool_set_filled(struct chunk_pool *cp)
|
||||
{
|
||||
cp->state = CHUNK_POOL_STATE_FILLED;
|
||||
}
|
||||
|
||||
bool chunk_pool_is_filled(struct chunk_pool *cp)
|
||||
{
|
||||
return (cp->state == CHUNK_POOL_STATE_FILLED);
|
||||
}
|
||||
|
||||
size_t chunk_pool_size(struct chunk_pool *cp)
|
||||
{
|
||||
return cp->count;
|
||||
}
|
||||
|
||||
|
||||
struct chunk *chunk_pool_pop(struct chunk_pool *cp)
|
||||
{
|
||||
struct list_head *first;
|
||||
struct chunk *c = NULL;
|
||||
|
||||
LOCK_ACQUIRE(&cp->lock);
|
||||
first = cp->list.next;
|
||||
if (list_empty(&cp->list)) {
|
||||
if (!chunk_pool_is_filled(cp))
|
||||
c = CHUNK_POP_WAIT;
|
||||
else
|
||||
c = NULL; /* no more chunks */
|
||||
} else {
|
||||
c = list_entry(first, struct chunk, list);
|
||||
list_del(first);
|
||||
}
|
||||
LOCK_RELEASE();
|
||||
|
||||
/* return CHUNK_POP_WAIT would be very rare case, because it
|
||||
* means copying over SSH is faster than traversing
|
||||
* local/remote file paths.
|
||||
*/
|
||||
|
||||
return c;
|
||||
}
|
||||
|
||||
static void chunk_free(struct list_head *list)
|
||||
{
|
||||
struct chunk *c;
|
||||
c = list_entry(list, typeof(*c), list);
|
||||
free(c);
|
||||
}
|
||||
|
||||
void chunk_pool_release(struct chunk_pool *cp)
|
||||
{
|
||||
list_free_f(&cp->list, chunk_free);
|
||||
}
|
||||
|
||||
/* paths of copy source resoltion */
|
||||
static int resolve_dst_path(const char *src_file_path, char *dst_file_path,
|
||||
struct path_resolve_args *a)
|
||||
{
|
||||
char copy[PATH_MAX];
|
||||
char *prefix;
|
||||
int offset;
|
||||
|
||||
strncpy(copy, a->src_path, PATH_MAX - 1);
|
||||
prefix = dirname(copy);
|
||||
if (!prefix) {
|
||||
mscp_set_error("dirname: %s", strerrno());
|
||||
return -1;
|
||||
}
|
||||
if (strlen(prefix) == 1 && prefix[0] == '.')
|
||||
offset = 0;
|
||||
else
|
||||
offset = strlen(prefix) + 1;
|
||||
|
||||
if (!a->src_path_is_dir && !a->dst_path_is_dir) {
|
||||
/* src path is file. dst path is (1) file, or (2) does not exist.
|
||||
* In the second case, we need to put src under the dst.
|
||||
*/
|
||||
if (a->dst_path_should_dir)
|
||||
snprintf(dst_file_path, PATH_MAX - 1, "%s/%s",
|
||||
a->dst_path, a->src_path + offset);
|
||||
else
|
||||
strncpy(dst_file_path, a->dst_path, PATH_MAX - 1);
|
||||
}
|
||||
|
||||
/* src is file, and dst is dir */
|
||||
if (!a->src_path_is_dir && a->dst_path_is_dir)
|
||||
snprintf(dst_file_path, PATH_MAX - 1, "%s/%s",
|
||||
a->dst_path, a->src_path + offset);
|
||||
|
||||
/* both are directory */
|
||||
if (a->src_path_is_dir && a->dst_path_is_dir)
|
||||
snprintf(dst_file_path, PATH_MAX - 1, "%s/%s",
|
||||
a->dst_path, src_file_path + offset);
|
||||
|
||||
/* dst path does not exist. change dir name to dst_path */
|
||||
if (a->src_path_is_dir && !a->dst_path_is_dir)
|
||||
snprintf(dst_file_path, PATH_MAX - 1, "%s/%s",
|
||||
a->dst_path, src_file_path + strlen(a->src_path) + 1);
|
||||
|
||||
mpr_debug(a->msg_fp, "file: %s -> %s\n", src_file_path, dst_file_path);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* chunk preparation */
|
||||
static struct chunk *alloc_chunk(struct path *p)
|
||||
{
|
||||
struct chunk *c;
|
||||
|
||||
if (!(c = malloc(sizeof(*c)))) {
|
||||
mscp_set_error("malloc %s", strerrno());
|
||||
return NULL;
|
||||
}
|
||||
memset(c, 0, sizeof(*c));
|
||||
|
||||
c->p = p;
|
||||
c->off = 0;
|
||||
c->len = 0;
|
||||
refcnt_inc(&p->refcnt);
|
||||
return c;
|
||||
}
|
||||
|
||||
static int resolve_chunk(struct path *p, struct path_resolve_args *a)
|
||||
{
|
||||
struct chunk *c;
|
||||
size_t chunk_sz;
|
||||
size_t size;
|
||||
|
||||
if (p->size <= a->min_chunk_sz)
|
||||
chunk_sz = p->size;
|
||||
else if (a->max_chunk_sz)
|
||||
chunk_sz = a->max_chunk_sz;
|
||||
else {
|
||||
chunk_sz = (p->size - (p->size % a->nr_conn)) / a->nr_conn;
|
||||
chunk_sz &= ~a->chunk_align; /* align with page_sz */
|
||||
if (chunk_sz <= a->min_chunk_sz)
|
||||
chunk_sz = a->min_chunk_sz;
|
||||
}
|
||||
|
||||
/* for (size = f->size; size > 0;) does not create a file
|
||||
* (chunk) when file size is 0. This do {} while (size > 0)
|
||||
* creates just open/close a 0-byte file.
|
||||
*/
|
||||
size = p->size;
|
||||
do {
|
||||
c = alloc_chunk(p);
|
||||
if (!c)
|
||||
return -1;
|
||||
c->off = p->size - size;
|
||||
c->len = size < chunk_sz ? size : chunk_sz;
|
||||
size -= c->len;
|
||||
chunk_pool_add(a->cp, c);
|
||||
} while (size > 0);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int append_path(sftp_session sftp, const char *path, mstat s,
|
||||
struct list_head *path_list)
|
||||
struct list_head *path_list, struct path_resolve_args *a)
|
||||
{
|
||||
struct path *p;
|
||||
|
||||
@@ -29,9 +207,22 @@ static int append_path(sftp_session sftp, const char *path, mstat s,
|
||||
p->mode = mstat_mode(s);
|
||||
p->state = FILE_STATE_INIT;
|
||||
lock_init(&p->lock);
|
||||
|
||||
if (resolve_dst_path(p->path, p->dst_path, a) < 0)
|
||||
goto free_out;
|
||||
|
||||
if (resolve_chunk(p, a) < 0)
|
||||
return -1; /* XXX: do not free path becuase chunk(s)
|
||||
* was added to chunk pool already */
|
||||
|
||||
list_add_tail(&p->list, path_list);
|
||||
*a->total_bytes += p->size;
|
||||
|
||||
return 0;
|
||||
|
||||
free_out:
|
||||
free(p);
|
||||
return -1;
|
||||
}
|
||||
|
||||
static bool check_path_should_skip(const char *path)
|
||||
@@ -45,7 +236,7 @@ static bool check_path_should_skip(const char *path)
|
||||
}
|
||||
|
||||
static int walk_path_recursive(sftp_session sftp, const char *path,
|
||||
struct list_head *path_list)
|
||||
struct list_head *path_list, struct path_resolve_args *a)
|
||||
{
|
||||
char next_path[PATH_MAX];
|
||||
mdirent *e;
|
||||
@@ -58,7 +249,7 @@ static int walk_path_recursive(sftp_session sftp, const char *path,
|
||||
|
||||
if (mstat_is_regular(s)) {
|
||||
/* this path is regular file. it is to be copied */
|
||||
ret = append_path(sftp, path, s, path_list);
|
||||
ret = append_path(sftp, path, s, path_list, a);
|
||||
mscp_stat_free(s);
|
||||
return ret;
|
||||
}
|
||||
@@ -77,15 +268,19 @@ static int walk_path_recursive(sftp_session sftp, const char *path,
|
||||
return -1;
|
||||
|
||||
for (e = mscp_readdir(d); !mdirent_is_null(e); e = mscp_readdir(d)) {
|
||||
if (check_path_should_skip(mdirent_name(e)))
|
||||
if (check_path_should_skip(mdirent_name(e))) {
|
||||
mscp_dirent_free(e);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (strlen(path) + 1 + strlen(mdirent_name(e)) > PATH_MAX) {
|
||||
mscp_set_error("too long path: %s/%s", path, mdirent_name(e));
|
||||
mscp_dirent_free(e);
|
||||
return -1;
|
||||
}
|
||||
snprintf(next_path, sizeof(next_path), "%s/%s", path, mdirent_name(e));
|
||||
ret = walk_path_recursive(sftp, next_path, path_list);
|
||||
ret = walk_path_recursive(sftp, next_path, path_list, a);
|
||||
mscp_dirent_free(e);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
}
|
||||
@@ -96,75 +291,9 @@ static int walk_path_recursive(sftp_session sftp, const char *path,
|
||||
}
|
||||
|
||||
int walk_src_path(sftp_session src_sftp, const char *src_path,
|
||||
struct list_head *path_list)
|
||||
struct list_head *path_list, struct path_resolve_args *a)
|
||||
{
|
||||
return walk_path_recursive(src_sftp, src_path, path_list);
|
||||
}
|
||||
|
||||
static int src2dst_path(int msg_fd, const char *src_path, const char *src_file_path,
|
||||
const char *dst_path, char *dst_file_path, size_t len,
|
||||
bool src_path_is_dir, bool dst_path_is_dir,
|
||||
bool dst_path_should_dir)
|
||||
{
|
||||
char copy[PATH_MAX];
|
||||
char *prefix;
|
||||
int offset;
|
||||
|
||||
strncpy(copy, src_path, PATH_MAX - 1);
|
||||
prefix = dirname(copy);
|
||||
if (!prefix) {
|
||||
mscp_set_error("dirname: %s", strerrno());
|
||||
return -1;
|
||||
}
|
||||
if (strlen(prefix) == 1 && prefix[0] == '.')
|
||||
offset = 0;
|
||||
else
|
||||
offset = strlen(prefix) + 1;
|
||||
|
||||
if (!src_path_is_dir && !dst_path_is_dir) {
|
||||
/* src path is file. dst path is (1) file, or (2) does not exist.
|
||||
* In the second case, we need to put src under the dst.
|
||||
*/
|
||||
if (dst_path_should_dir)
|
||||
snprintf(dst_file_path, len, "%s/%s",
|
||||
dst_path, src_path + offset);
|
||||
else
|
||||
strncpy(dst_file_path, dst_path, len);
|
||||
}
|
||||
|
||||
/* src is file, and dst is dir */
|
||||
if (!src_path_is_dir && dst_path_is_dir)
|
||||
snprintf(dst_file_path, len, "%s/%s", dst_path, src_path + offset);
|
||||
|
||||
/* both are directory */
|
||||
if (src_path_is_dir && dst_path_is_dir)
|
||||
snprintf(dst_file_path, len, "%s/%s", dst_path, src_file_path + offset);
|
||||
|
||||
/* dst path does not exist. change dir name to dst_path */
|
||||
if (src_path_is_dir && !dst_path_is_dir)
|
||||
snprintf(dst_file_path, len, "%s/%s",
|
||||
dst_path, src_file_path + strlen(src_path) + 1);
|
||||
|
||||
mpr_info(msg_fd, "file: %s -> %s\n", src_file_path, dst_file_path);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int resolve_dst_path(int msg_fd, const char *src_path, const char *dst_path,
|
||||
struct list_head *path_list, bool src_path_is_dir,
|
||||
bool dst_path_is_dir, bool dst_path_should_dir)
|
||||
{
|
||||
struct path *p;
|
||||
|
||||
list_for_each_entry(p, path_list, list) {
|
||||
if (src2dst_path(msg_fd, src_path, p->path,
|
||||
dst_path, p->dst_path, PATH_MAX,
|
||||
src_path_is_dir, dst_path_is_dir,
|
||||
dst_path_should_dir) < 0)
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
return walk_path_recursive(src_sftp, src_path, path_list, a);
|
||||
}
|
||||
|
||||
void path_dump(struct list_head *path_list)
|
||||
@@ -177,90 +306,6 @@ void path_dump(struct list_head *path_list)
|
||||
}
|
||||
}
|
||||
|
||||
/* chunk preparation */
|
||||
|
||||
static struct chunk *alloc_chunk(struct path *p)
|
||||
{
|
||||
struct chunk *c;
|
||||
|
||||
if (!(c = malloc(sizeof(*c)))) {
|
||||
mscp_set_error("malloc %s", strerrno());
|
||||
return NULL;
|
||||
}
|
||||
memset(c, 0, sizeof(*c));
|
||||
|
||||
c->p = p;
|
||||
c->off = 0;
|
||||
c->len = 0;
|
||||
refcnt_inc(&p->refcnt);
|
||||
return c;
|
||||
}
|
||||
|
||||
static int get_page_mask(void)
|
||||
{
|
||||
long page_sz = sysconf(_SC_PAGESIZE);
|
||||
size_t page_mask = 0;
|
||||
int n;
|
||||
|
||||
for (n = 0; page_sz > 0; page_sz >>= 1, n++) {
|
||||
page_mask <<= 1;
|
||||
page_mask |= 1;
|
||||
}
|
||||
|
||||
return page_mask >> 1;
|
||||
}
|
||||
|
||||
int resolve_chunk(struct list_head *path_list, struct list_head *chunk_list,
|
||||
int nr_conn, int min_chunk_sz, int max_chunk_sz)
|
||||
{
|
||||
struct chunk *c;
|
||||
struct path *p;
|
||||
size_t page_mask;
|
||||
size_t chunk_sz;
|
||||
size_t size;
|
||||
|
||||
page_mask = get_page_mask();
|
||||
|
||||
list_for_each_entry(p, path_list, list) {
|
||||
if (p->size <= min_chunk_sz)
|
||||
chunk_sz = p->size;
|
||||
else if (max_chunk_sz)
|
||||
chunk_sz = max_chunk_sz;
|
||||
else {
|
||||
chunk_sz = (p->size - (p->size % nr_conn)) / nr_conn;
|
||||
chunk_sz &= ~page_mask; /* align with page_sz */
|
||||
if (chunk_sz <= min_chunk_sz)
|
||||
chunk_sz = min_chunk_sz;
|
||||
}
|
||||
|
||||
/* for (size = f->size; size > 0;) does not create a
|
||||
* file (chunk) when file size is 0. This do {} while
|
||||
* (size > 0) creates just open/close a 0-byte file.
|
||||
*/
|
||||
size = p->size;
|
||||
do {
|
||||
c = alloc_chunk(p);
|
||||
if (!c)
|
||||
return -1;
|
||||
c->off = p->size - size;
|
||||
c->len = size < chunk_sz ? size : chunk_sz;
|
||||
size -= c->len;
|
||||
list_add_tail(&c->list, chunk_list);
|
||||
} while (size > 0);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void chunk_dump(struct list_head *chunk_list)
|
||||
{
|
||||
struct chunk *c;
|
||||
|
||||
list_for_each_entry(c, chunk_list, list) {
|
||||
printf("chunk: %s 0x%lx-%lx bytes\n",
|
||||
c->p->path, c->off, c->off + c->len);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* based on
|
||||
@@ -283,10 +328,13 @@ static int touch_dst_path(struct path *p, sftp_session sftp)
|
||||
|
||||
mstat s;
|
||||
if (mscp_stat(path, &s, sftp) == 0) {
|
||||
if (mstat_is_dir(s))
|
||||
if (mstat_is_dir(s)) {
|
||||
mscp_stat_free(s);
|
||||
goto next; /* directory exists. go deeper */
|
||||
else
|
||||
} else {
|
||||
mscp_stat_free(s);
|
||||
return -1; /* path exists, but not directory. */
|
||||
}
|
||||
}
|
||||
|
||||
if (mscp_stat_check_err_noent(sftp) == 0) {
|
||||
@@ -311,22 +359,22 @@ static int touch_dst_path(struct path *p, sftp_session sftp)
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int prepare_dst_path(int msg_fd, struct path *p, sftp_session dst_sftp)
|
||||
static int prepare_dst_path(FILE *msg_fp, struct path *p, sftp_session dst_sftp)
|
||||
{
|
||||
int ret = 0;
|
||||
|
||||
LOCK_ACQUIRE_THREAD(&p->lock);
|
||||
LOCK_ACQUIRE(&p->lock);
|
||||
if (p->state == FILE_STATE_INIT) {
|
||||
if (touch_dst_path(p, dst_sftp) < 0) {
|
||||
ret = -1;
|
||||
goto out;
|
||||
}
|
||||
p->state = FILE_STATE_OPENED;
|
||||
mpr_info(msg_fd, "copy start: %s\n", p->path);
|
||||
mpr_info(msg_fp, "copy start: %s\n", p->path);
|
||||
}
|
||||
|
||||
out:
|
||||
LOCK_RELEASE_THREAD();
|
||||
LOCK_RELEASE();
|
||||
return ret;
|
||||
}
|
||||
|
||||
@@ -482,7 +530,8 @@ static int _copy_chunk(struct chunk *c, mfh s, mfh d,
|
||||
return -1;
|
||||
}
|
||||
|
||||
int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp,
|
||||
int copy_chunk(FILE *msg_fp, struct chunk *c,
|
||||
sftp_session src_sftp, sftp_session dst_sftp,
|
||||
int nr_ahead, int buf_sz, size_t *counter)
|
||||
{
|
||||
mode_t mode;
|
||||
@@ -492,7 +541,7 @@ int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session
|
||||
|
||||
assert((src_sftp && !dst_sftp) || (!src_sftp && dst_sftp));
|
||||
|
||||
if (prepare_dst_path(msg_fd, c->p, dst_sftp) < 0)
|
||||
if (prepare_dst_path(msg_fp, c->p, dst_sftp) < 0)
|
||||
return -1;
|
||||
|
||||
/* open src */
|
||||
@@ -511,11 +560,11 @@ int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session
|
||||
if (mscp_open_is_failed(d))
|
||||
return -1;
|
||||
|
||||
mpr_debug(msg_fd, "copy chunk start: %s 0x%lx-0x%lx\n",
|
||||
mpr_debug(msg_fp, "copy chunk start: %s 0x%lx-0x%lx\n",
|
||||
c->p->path, c->off, c->off + c->len);
|
||||
ret = _copy_chunk(c, s, d, nr_ahead, buf_sz, counter);
|
||||
|
||||
mpr_debug(msg_fd, "copy chunk done: %s 0x%lx-0x%lx\n",
|
||||
mpr_debug(msg_fp, "copy chunk done: %s 0x%lx-0x%lx\n",
|
||||
c->p->path, c->off, c->off + c->len);
|
||||
|
||||
|
||||
@@ -527,7 +576,7 @@ int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session
|
||||
if (refcnt_dec(&c->p->refcnt) == 0) {
|
||||
c->p->state = FILE_STATE_DONE;
|
||||
mscp_chmod(c->p->dst_path, c->p->mode, dst_sftp);
|
||||
mpr_info(msg_fd, "copy done: %s\n", c->p->path);
|
||||
mpr_info(msg_fp, "copy done: %s\n", c->p->path);
|
||||
}
|
||||
|
||||
return ret;
|
||||
|
||||
83
src/path.h
83
src/path.h
@@ -29,7 +29,7 @@ struct path {
|
||||
#define FILE_STATE_DONE 2
|
||||
|
||||
struct chunk {
|
||||
struct list_head list; /* mscp->chunk_list */
|
||||
struct list_head list; /* chunk_pool->list */
|
||||
|
||||
struct path *p;
|
||||
size_t off; /* offset of this chunk on the file on path p */
|
||||
@@ -37,30 +37,66 @@ struct chunk {
|
||||
size_t done; /* copied bytes for this chunk by a thread */
|
||||
};
|
||||
|
||||
struct chunk_pool {
|
||||
struct list_head list; /* list of struct chunk */
|
||||
size_t count;
|
||||
lock lock;
|
||||
int state;
|
||||
};
|
||||
|
||||
|
||||
/* initialize chunk pool */
|
||||
void chunk_pool_init(struct chunk_pool *cp);
|
||||
|
||||
/* acquire a chunk from pool. return value is NULL indicates no more
|
||||
* chunk, GET_CHUNK_WAIT means caller should waits until a chunk is
|
||||
* added, or pointer to chunk.
|
||||
*/
|
||||
struct chunk *chunk_pool_pop(struct chunk_pool *cp);
|
||||
#define CHUNK_POP_WAIT ((void *) -1)
|
||||
|
||||
/* set and check fillingchunks to this pool has finished */
|
||||
void chunk_pool_set_filled(struct chunk_pool *cp);
|
||||
bool chunk_pool_is_filled(struct chunk_pool *cp);
|
||||
|
||||
/* return number of chunks in the pool */
|
||||
size_t chunk_pool_size(struct chunk_pool *cp);
|
||||
|
||||
/* free chunks in the chunk_pool */
|
||||
void chunk_pool_release(struct chunk_pool *cp);
|
||||
|
||||
|
||||
|
||||
struct path_resolve_args {
|
||||
FILE *msg_fp;
|
||||
size_t *total_bytes;
|
||||
|
||||
/* args to resolve src path to dst path */
|
||||
const char *src_path;
|
||||
const char *dst_path;
|
||||
bool src_path_is_dir;
|
||||
bool dst_path_is_dir;
|
||||
bool dst_path_should_dir;
|
||||
|
||||
/* args to resolve chunks for a path */
|
||||
struct chunk_pool *cp;
|
||||
int nr_conn;
|
||||
size_t min_chunk_sz;
|
||||
size_t max_chunk_sz;
|
||||
size_t chunk_align;
|
||||
};
|
||||
|
||||
/* recursivly walk through src_path and fill path_list for each file */
|
||||
int walk_src_path(sftp_session src_sftp, const char *src_path,
|
||||
struct list_head *path_list);
|
||||
|
||||
/* fill path->dst_path for all files */
|
||||
int resolve_dst_path(int msg_fd, const char *src_path, const char *dst_path,
|
||||
struct list_head *path_list,
|
||||
bool src_path_is_dir, bool dst_path_is_dir,
|
||||
bool dst_path_should_dir);
|
||||
|
||||
/* resolve chunks from files in the path_list */
|
||||
int resolve_chunk(struct list_head *path_list, struct list_head *chunk_list,
|
||||
int nr_conn, int min_chunk_sz, int max_chunk_sz);
|
||||
struct list_head *path_list, struct path_resolve_args *a);
|
||||
|
||||
/* copy a chunk. either src_sftp or dst_sftp is not null, and another is null */
|
||||
int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp,
|
||||
int copy_chunk(FILE *msg_fp, struct chunk *c,
|
||||
sftp_session src_sftp, sftp_session dst_sftp,
|
||||
int nr_ahead, int buf_sz, size_t *counter);
|
||||
|
||||
/* just print contents. just for debugging */
|
||||
void path_dump(struct list_head *path_list);
|
||||
void chunk_dump(struct list_head *chunk_list);
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -136,6 +172,14 @@ static mdirent *mscp_readdir(mdir *d)
|
||||
return &e;
|
||||
}
|
||||
|
||||
static void mscp_dirent_free(mdirent *e)
|
||||
{
|
||||
if (e->r) {
|
||||
sftp_attributes_free(e->r);
|
||||
e->r = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
/* wrap retriving error */
|
||||
static const char *mscp_strerror(sftp_session sftp)
|
||||
{
|
||||
@@ -157,11 +201,16 @@ static int mscp_stat(const char *path, mstat *s, sftp_session sftp)
|
||||
|
||||
if (sftp) {
|
||||
s->r = sftp_stat(sftp, path);
|
||||
if (!s->r)
|
||||
if (!s->r) {
|
||||
mscp_set_error("sftp_stat: %s %s",
|
||||
sftp_get_ssh_error(sftp), path);
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
if (stat(path, &s->l) < 0)
|
||||
if (stat(path, &s->l) < 0) {
|
||||
mscp_set_error("stat: %s %s", strerrno(), path);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
#ifdef __APPLE__
|
||||
#include <stdlib.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/sysctl.h>
|
||||
#elif linux
|
||||
#define _GNU_SOURCE
|
||||
#include <sched.h>
|
||||
#include <stdlib.h>
|
||||
#else
|
||||
#error unsupported platform
|
||||
#endif
|
||||
@@ -12,6 +14,7 @@
|
||||
#include <platform.h>
|
||||
#include <message.h>
|
||||
|
||||
|
||||
#ifdef __APPLE__
|
||||
int nr_cpus()
|
||||
{
|
||||
@@ -32,6 +35,38 @@ int set_thread_affinity(pthread_t tid, int core)
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static void random_string(char *buf, size_t size)
|
||||
{
|
||||
char chars[] = "abcdefhijklmnopqrstuvwxyz1234567890";
|
||||
int n, x;
|
||||
|
||||
for (n = 0; n < size - 1; n++) {
|
||||
x = arc4random() % (sizeof(chars) - 1);
|
||||
buf[n] = chars[x];
|
||||
}
|
||||
buf[size - 1] = '\0';
|
||||
}
|
||||
|
||||
sem_t *sem_create(int value)
|
||||
{
|
||||
char sem_name[30] = "mscp-";
|
||||
sem_t *sem;
|
||||
int n;
|
||||
|
||||
n = strlen(sem_name);
|
||||
random_string(sem_name + n, sizeof(sem_name) - n - 1);
|
||||
if ((sem = sem_open(sem_name, O_CREAT, 600, value)) == SEM_FAILED)
|
||||
return NULL;
|
||||
|
||||
return sem;
|
||||
}
|
||||
|
||||
int sem_release(sem_t *sem)
|
||||
{
|
||||
return sem_close(sem);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
#ifdef linux
|
||||
@@ -56,5 +91,27 @@ int set_thread_affinity(pthread_t tid, int core)
|
||||
core, strerrno());
|
||||
return ret;
|
||||
}
|
||||
|
||||
sem_t *sem_create(int value)
|
||||
{
|
||||
sem_t *sem;
|
||||
|
||||
if ((sem = malloc(sizeof(*sem))) == NULL)
|
||||
return NULL;
|
||||
|
||||
if (sem_init(sem, 0, value) < 0) {
|
||||
free(sem);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return sem;
|
||||
}
|
||||
|
||||
int sem_release(sem_t *sem)
|
||||
{
|
||||
free(sem);
|
||||
return 0;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
@@ -2,8 +2,20 @@
|
||||
#define _PLATFORM_H_
|
||||
|
||||
#include <pthread.h>
|
||||
#include <semaphore.h>
|
||||
|
||||
int nr_cpus();
|
||||
int nr_cpus(void);
|
||||
int set_thread_affinity(pthread_t tid, int core);
|
||||
|
||||
/*
|
||||
* macOS does not support sem_init(). macOS (seems to) releases the
|
||||
* named semaphore when associated mscp process finished. In linux,
|
||||
* program (seems to) need to release named semaphore in /dev/shm by
|
||||
* sem_unlink() explicitly. So, using sem_init() (unnamed semaphore)
|
||||
* in linux and using sem_open() (named semaphore) in macOS without
|
||||
* sem_unlink() are reasonable (?).
|
||||
*/
|
||||
sem_t *sem_create(int value);
|
||||
int sem_release(sem_t *sem);
|
||||
|
||||
#endif /* _PLATFORM_H_ */
|
||||
|
||||
18
src/pymscp.c
18
src/pymscp.c
@@ -74,7 +74,7 @@ static int release_instance(struct instance *i)
|
||||
|
||||
/* wrapper functions */
|
||||
|
||||
static PyObject *wrap_mscp_init(PyObject *sef, PyObject *args, PyObject *kw)
|
||||
static PyObject *wrap_mscp_init(PyObject *self, PyObject *args, PyObject *kw)
|
||||
{
|
||||
/*
|
||||
* Initialize struct mscp with options. wrap_mscp_init
|
||||
@@ -89,10 +89,14 @@ static PyObject *wrap_mscp_init(PyObject *sef, PyObject *args, PyObject *kw)
|
||||
/* mscp_opts */
|
||||
"nr_threads", /* int */
|
||||
"nr_ahead", /* int */
|
||||
|
||||
"min_chunk_sz", /* unsigned long */
|
||||
"max_chunk_sz", /* unsigned long */
|
||||
"buf_sz", /* unsigned long */
|
||||
|
||||
"coremask", /* const char * */
|
||||
|
||||
"max_startups", /* int */
|
||||
"severity", /* int, MSCP_SERVERITY_* */
|
||||
"msg_fd", /* int */
|
||||
|
||||
@@ -100,17 +104,19 @@ static PyObject *wrap_mscp_init(PyObject *sef, PyObject *args, PyObject *kw)
|
||||
"login_name", /* const char * */
|
||||
"port", /* const char * */
|
||||
"identity", /* const char * */
|
||||
|
||||
"cipher", /* const char * */
|
||||
"hmac", /* const char * */
|
||||
"compress", /* const char * */
|
||||
"password", /* const char * */
|
||||
"passphrase", /* const char * */
|
||||
|
||||
"debug_level", /* int */
|
||||
"no_hostkey_check", /* bool */
|
||||
"enable_nagle", /* bool */
|
||||
NULL,
|
||||
};
|
||||
const char *fmt = "si" "|iikkksii" "ssssssssipp";
|
||||
const char *fmt = "si" "|" "ii" "kkk" "s" "iii" "sss" "sssss" "ipp";
|
||||
char *coremask = NULL;
|
||||
char *login_name = NULL, *port = NULL, *identity = NULL;
|
||||
char *cipher = NULL, *hmac = NULL, *compress = NULL;
|
||||
@@ -137,6 +143,7 @@ static PyObject *wrap_mscp_init(PyObject *sef, PyObject *args, PyObject *kw)
|
||||
&i->mo.max_chunk_sz,
|
||||
&i->mo.buf_sz,
|
||||
&coremask,
|
||||
&i->mo.max_startups,
|
||||
&i->mo.severity,
|
||||
&i->mo.msg_fd,
|
||||
&login_name,
|
||||
@@ -175,6 +182,7 @@ static PyObject *wrap_mscp_init(PyObject *sef, PyObject *args, PyObject *kw)
|
||||
|
||||
i->m = mscp_init(remote, direction, &i->mo, &i->so);
|
||||
if (!i->m) {
|
||||
PyErr_Format(PyExc_RuntimeError, "%s", mscp_get_error());
|
||||
free(i);
|
||||
return NULL;
|
||||
}
|
||||
@@ -260,7 +268,7 @@ static PyObject *wrap_mscp_set_dst_path(PyObject *self, PyObject *args, PyObject
|
||||
return Py_BuildValue("");
|
||||
}
|
||||
|
||||
static PyObject *wrap_mscp_prepare(PyObject *self, PyObject *args, PyObject *kw)
|
||||
static PyObject *wrap_mscp_scan(PyObject *self, PyObject *args, PyObject *kw)
|
||||
{
|
||||
char *keywords[] = { "m", NULL };
|
||||
unsigned long long addr;
|
||||
@@ -275,7 +283,7 @@ static PyObject *wrap_mscp_prepare(PyObject *self, PyObject *args, PyObject *kw)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (mscp_prepare(m) < 0) {
|
||||
if (mscp_scan(m) < 0) {
|
||||
PyErr_Format(PyExc_RuntimeError, mscp_get_error());
|
||||
return NULL;
|
||||
}
|
||||
@@ -429,7 +437,7 @@ static PyMethodDef pymscpMethods[] = {
|
||||
METH_VARARGS | METH_KEYWORDS, NULL
|
||||
},
|
||||
{
|
||||
"mscp_prepare", (PyCFunction)wrap_mscp_prepare,
|
||||
"mscp_scan", (PyCFunction)wrap_mscp_scan,
|
||||
METH_VARARGS | METH_KEYWORDS, NULL
|
||||
},
|
||||
{
|
||||
|
||||
@@ -31,8 +31,6 @@
|
||||
#define pr_debug(fmt, ...)
|
||||
#endif
|
||||
|
||||
#define strerrno() strerror(errno)
|
||||
|
||||
|
||||
#define min(a, b) (((a) > (b)) ? (b) : (a))
|
||||
#define max(a, b) (((a) > (b)) ? (a) : (b))
|
||||
|
||||
@@ -61,6 +61,11 @@ def test_single_copy(mscp, src_prefix, dst_prefix, src, dst):
|
||||
src.cleanup()
|
||||
dst.cleanup()
|
||||
|
||||
@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
|
||||
def test_failed_to_copy_nonexistent_file(mscp, src_prefix, dst_prefix):
|
||||
src = "nonexistent_src"
|
||||
dst = "nonexistent_dst"
|
||||
run2ng([mscp, "-H", src_prefix + src, dst_prefix + dst])
|
||||
|
||||
param_double_copy = [
|
||||
(File("src1", size = 1024 * 1024), File("src2", size = 1024 * 1024),
|
||||
@@ -139,7 +144,7 @@ def test_min_chunk(mscp, src_prefix, dst_prefix):
|
||||
src = File("src", size = 16 * 1024).make()
|
||||
dst = File("dst")
|
||||
|
||||
run2ok([mscp, "-H", "-s", 8192, src_prefix + src.path, dst_prefix + dst.path])
|
||||
run2ok([mscp, "-H", "-s", 32768, src_prefix + src.path, dst_prefix + dst.path])
|
||||
assert check_same_md5sum(src, dst)
|
||||
|
||||
src.cleanup()
|
||||
@@ -150,7 +155,7 @@ def test_thread_affinity(mscp, src_prefix, dst_prefix):
|
||||
src = File("src", size = 64 * 1024).make()
|
||||
dst = File("dst")
|
||||
|
||||
run2ok([mscp, "-H", "-n", 4, "-m", "0x01", "-s", 8192, "-S", 65536,
|
||||
run2ok([mscp, "-H", "-n", 4, "-m", "0x01",
|
||||
src_prefix + src.path, dst_prefix + dst.path])
|
||||
assert check_same_md5sum(src, dst)
|
||||
|
||||
|
||||
@@ -71,6 +71,7 @@ param_kwargs = [
|
||||
{ "min_chunk_sz": 1 * 1024 * 1024 },
|
||||
{ "max_chunk_sz": 64 * 1024 * 1024 },
|
||||
{ "coremask": "0x0f" },
|
||||
{ "max_startups": 5 },
|
||||
{ "severity": mscp.SEVERITY_NONE },
|
||||
{ "cipher": "aes128-gcm@openssh.com" },
|
||||
{ "compress": "yes" },
|
||||
@@ -102,3 +103,28 @@ def test_login_failed():
|
||||
m = mscp.mscp(remote, mscp.LOCAL2REMOTE, port = "65534")
|
||||
with pytest.raises(RuntimeError) as e:
|
||||
m.connect()
|
||||
|
||||
def test_get_stat_before_copy_start():
|
||||
m = mscp.mscp("localhost", mscp.LOCAL2REMOTE)
|
||||
m.connect()
|
||||
(total, done, finished) = m.stats()
|
||||
assert total == 0 and done == 0
|
||||
|
||||
|
||||
param_invalid_kwargs = [
|
||||
{ "nr_threads": -1 },
|
||||
{ "nr_ahead": -1 },
|
||||
{ "min_chunk_sz": 1 },
|
||||
{ "max_chunk_sz": 1 },
|
||||
{ "coremask": "xxxxx" },
|
||||
{ "max_startups": -1 },
|
||||
{ "cipher": "invalid" },
|
||||
{ "hmac": "invalid"},
|
||||
{ "compress": "invalid"},
|
||||
]
|
||||
|
||||
@pytest.mark.parametrize("kw", param_invalid_kwargs)
|
||||
def test_invalid_options(kw):
|
||||
with pytest.raises(RuntimeError) as e:
|
||||
m = mscp.mscp("localhost", mscp.LOCAL2REMOTE, **kw)
|
||||
m.connect()
|
||||
|
||||
@@ -16,6 +16,9 @@ class File():
|
||||
self.content = content
|
||||
self.perm = perm
|
||||
|
||||
def __repr__(self):
|
||||
return "<file:{} {}-bytes>".format(self.path, self.size)
|
||||
|
||||
def __str__(self):
|
||||
return self.path
|
||||
|
||||
|
||||
Reference in New Issue
Block a user