51 Commits

Author SHA1 Message Date
Ryo Nakamura
9227938297 bump version to 0.1.0 2023-09-07 15:06:06 +09:00
Ryo Nakamura
ccc4dedf30 fix docker/alpine-3.17: no need to make install
because libmscp is installed by data_files.
2023-09-05 21:20:38 +09:00
Ryo Nakamura
49e8e26f2a add jupyter example
ToDo: refactor state handling of pymscp
2023-09-02 17:23:57 +09:00
Ryo Nakamura
11e024c1da fix libmscp python bindings.
- fix libmscp install path by setup.py with data_files
- fix return values of mscp_get_stats()
- add examples directory for mscp python binding
2023-08-30 21:24:00 +09:00
Ryo Nakamura
5466a8b9e1 setup.py: fix data_files to isntall libmscp to python library path 2023-08-30 20:35:27 +09:00
Ryo Nakamura
13ec652195 fix mscp_opendir, do not use tls_sftp, use sftp isntead.
The fixed issue causes mscp_opendir wrongly calls opendir() for
local when tls_sftp is NULL although sftp is not NULL.
2023-08-30 19:09:29 +09:00
Ryo Nakamura
6b45cf7c9c update README: adjust to the current usage 2023-08-04 16:12:36 +09:00
Ryo Nakamura
58026790d9 fix usage: "none" is not supported for -F 2023-08-04 16:11:29 +09:00
Ryo Nakamura
23d9577bde introduce git-based versioning
MSCP_BUILD_VERSION (`git describe --tags --dirty --match "v*"`) is
passed through include/mscp_version.h.in and cmake.

When git is failed, use VERSION file instead (for building from
source tar balls that excludes .git).
2023-08-04 16:07:37 +09:00
Ryo Nakamura
24c1bc9149 do not set O_TRUNC when opening destination file.
It prevents `mscp localhost:hoge ~/hoge` from truncating the source
file. See https://bugzilla.mindrot.org/show_bug.cgi?id=3431.

https://github.com/upa/mscp/issues/1
2023-08-04 15:06:14 +09:00
Ryo Nakamura
16f2f88cc9 update README: adjust -h output to HEAD 2023-08-04 14:11:58 +09:00
Ryo Nakamura
2773c7b4d6 add test for specifying ssh_config 2023-08-04 14:04:46 +09:00
Ryo Nakamura
518aa42208 add -F ssh_config option 2023-08-04 13:31:10 +09:00
Ryo Nakamura
3b26c7c719 update README: glob is now supported 2023-08-04 01:53:48 +09:00
Ryo Nakamura
fbc817213b use pseudo glob/globfree for remote-glob when musl
musllibc does not implement GLOB_ALTDIRFUNC, so do not call
glob for remote sides when libc is musl.

test_e2e.py skips test_glob_src_path when running on alpine.
2023-08-03 21:59:54 +09:00
Ryo Nakamura
5a4c043889 cmake: add docker-build no-cache target 2023-08-03 21:58:59 +09:00
Ryo Nakamura
ba6f53d253 add glob for source paths
https://github.com/upa/mscp/issues/3
2023-08-03 20:26:13 +09:00
Ryo Nakamura
9f7c135b15 cleanup wrappers for file operations
Previously wrapper functions for open(), opendir(), and stat(), etc,
are implemneted in path.h, and now they are in fileops.h and fileops.c.
This commit is a reparation for remote glob.
2023-08-03 17:07:39 +09:00
Ryo Nakamura
8ab06c9531 bump version to 0.0.9 2023-07-20 22:11:48 +09:00
Ryo Nakamura
a847ef1ea8 drop centos8, add almalinux 8.8, update rocky to 8.8
And cleanup Docker files
2023-07-20 21:54:43 +09:00
Ryo Nakamura
24e86f58d8 mscp: maintain mscp_thread structs in list
Instead of m->threads array, struct mscp_thread instanes are
maintained in m->thread_list. This enables stable counter access
via mscp_get_stats().
2023-05-07 21:05:05 +09:00
Ryo Nakamura
1d3b3a2261 main: add a white space to the elapsed time output
It adjusts the position of XX:XX in elapsed timeou output.
2023-04-05 19:07:10 +09:00
Ryo Nakamura
575c920b6e main: print elapsed time instead ETA at the end 2023-04-05 19:00:29 +09:00
Ryo Nakamura
1bd832a135 Merge branch 'main' of github.com:upa/mscp 2023-03-26 01:50:00 +09:00
Ryo Nakamura
834407379d fix error handling when scan thread failed.
set chunk pool to fill to invoke copy threads when scan failed.
2023-03-25 22:29:09 +09:00
Ryo Nakamura
6be61e8adf test: add sleep -1 before ssh-keyscan 2023-03-22 19:24:14 +09:00
Ryo Nakamura
8192151154 fix invalid return sem 2023-03-22 18:06:19 +09:00
Ryo Nakamura
3f00bd2c7b test: set min_chunk_sz to 32768 on test_min_chunk
Page size of arm mac is 16384.
2023-03-22 18:00:52 +09:00
Ryo Nakamura
5ac0874621 bump version to 0.0.8 2023-03-19 16:41:15 +09:00
Ryo Nakamura
e0e6fae296 do not sem_close() for unnamed semaphore 2023-03-16 01:01:46 +09:00
Ryo Nakamura
6305f02770 fix semaphore handling for macOS 2023-03-16 00:03:22 +09:00
Ryo Nakamura
ae4b848ba0 add sem_create(), wrappign sem_init() for linux and sem_open() for macOS 2023-03-15 23:54:57 +09:00
Ryo Nakamura
3902fb584a linux also needs stdlib.h for random() 2023-03-15 23:26:11 +09:00
Ryo Nakamura
4ec877a290 test: add __repr__ to File 2023-03-15 23:23:14 +09:00
Ryo Nakamura
f0c70a148b macOS does not support sem_init. use sem_open instead 2023-03-15 23:18:33 +09:00
Ryo Nakamura
e038b3020d fix readme 2023-03-15 22:28:23 +09:00
Ryo Nakamura
2fdfa7b830 test: add invalid kwargs test 2023-03-15 22:20:00 +09:00
Ryo Nakamura
f5d0f526f2 add comment to mscp_scan(), why usleep(100) 2023-03-15 22:19:09 +09:00
Ryo Nakamura
a086e6a154 rename mscp_prepare to mscp_scan 2023-03-15 22:03:14 +09:00
Ryo Nakamura
3bce4ec277 set m->tid_prepare 0 to avoid duble join 2023-03-15 21:56:46 +09:00
Ryo Nakamura
a923d40ada mscp: add -u max_startups option.
pymscp also accepts keyword 'max_startups' (int).
2023-03-15 21:53:34 +09:00
Ryo Nakamura
24fef5f539 fix: when msg_fd is 0, use STDOUT_FILENO 2023-03-15 01:35:55 +09:00
Ryo Nakamura
4e80b05da7 do not fdopen(msg_fd) if msg_fd < 0 2023-03-15 00:39:56 +09:00
Ryo Nakamura
98eca409af introduce semaphore for concurrent connecting ssh
instead of ssh_estab_queue (delay-based approach). MaxStartups in
sshd_config limits number of conccurent incoming ssh connections.
mscp_opts->max_startups adjusts this value.
2023-03-15 00:35:48 +09:00
Ryo Nakamura
cf99a439cb cleanup message print functions 2023-03-15 00:00:23 +09:00
Ryo Nakamura
3077bb0856 rename ssh_connect_flag to ssh_estab_queue 2023-03-14 01:20:55 +09:00
Ryo Nakamura
72c27f16d6 implement ssh_connect_flag
Each copy thread establishes SSH/SFTP connection to remote host.
A delay is inserted between SSH connecting to the remote.
2023-03-14 00:43:53 +09:00
Ryo Nakamura
9b0eb668f9 cleanup mscp_prepare-related code 2023-03-14 00:11:13 +09:00
Ryo Nakamura
5f9f20f150 mscp_prepare() scans source paths in a thread.
This commit runs mscp_prepare() in a pthread. mscp copy threads
run aysnchronously with mscp_prepare(). So, when mscp_prepare()
has not finished yet (due to too many source files), we can start
to copy files.
2023-03-13 22:35:51 +09:00
Ryo Nakamura
ceb9ebd5a8 revise walk_src_path.
In new walk_src_path, resolve dst path and resolve chunks are
invoked when adding a path.
2023-03-13 21:02:26 +09:00
Ryo Nakamura
3810d6314d Update README.md 2023-03-13 15:37:56 +09:00
39 changed files with 1933 additions and 854 deletions

View File

@@ -45,8 +45,8 @@ jobs:
files: |
${{github.workspace}}/build/mscp_ubuntu-20.04-x86_64.deb
${{github.workspace}}/build/mscp_ubuntu-22.04-x86_64.deb
${{github.workspace}}/build/mscp_centos-8-x86_64.rpm
${{github.workspace}}/build/mscp_rocky-8.6-x86_64.rpm
${{github.workspace}}/build/mscp_rocky-8.8-x86_64.rpm
${{github.workspace}}/build/mscp_almalinux-8.8-x86_64.rpm
${{github.workspace}}/build/mscp_alpine-3.17-x86_64.static
${{github.workspace}}/build/mscp.linux.x86.static

2
.gitignore vendored
View File

@@ -4,6 +4,8 @@ compile_commands.json
CMakeUserPresets.json
.*.swp
include/mscp_version.h
dist
*.egg-info
__pycache__

View File

@@ -6,6 +6,26 @@ project(mscp
VERSION ${MSCP_VERSION}
LANGUAGES C)
find_package(Git)
if (Git_FOUND)
# based on https://github.com/nocnokneo/cmake-git-versioning-example
execute_process(
COMMAND ${GIT_EXECUTABLE} describe --tags --dirty --match "v*"
OUTPUT_VARIABLE GIT_DESCRIBE_VERSION
RESULT_VARIABLE GIT_DESCRIBE_ERROR_CODE
OUTPUT_STRIP_TRAILING_WHITESPACE)
if(NOT GIT_DESCRIBE_ERROR_CODE)
set(MSCP_BUILD_VERSION ${GIT_DESCRIBE_VERSION})
endif()
endif()
if (NOT MSCP_BUILD_VERSION)
message(STATUS "Failed to determine version via Git. Use VERSION file instead.")
set(MSCP_BUILD_VERSION v${MSCP_VERSION})
endif()
include(GNUInstallDirs)
set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -DDEBUG")
@@ -62,9 +82,16 @@ if(BUILD_CONAN)
list(APPEND MSCP_LINK_LIBS OpenSSL::Crypto)
endif()
set(LIBMSCP_SRC src/mscp.c src/ssh.c src/path.c src/platform.c src/message.c)
# generate version header file
configure_file(
${mscp_SOURCE_DIR}/include/mscp_version.h.in
${mscp_SOURCE_DIR}/include/mscp_version.h)
# libmscp.so
set(LIBMSCP_SRC
src/mscp.c src/ssh.c src/fileops.c src/path.c src/platform.c src/message.c)
add_library(mscp-shared SHARED ${LIBMSCP_SRC})
target_include_directories(mscp-shared
PUBLIC $<BUILD_INTERFACE:${mscp_SOURCE_DIR}/include>
@@ -104,7 +131,6 @@ if (BUILD_STATIC)
target_link_options(mscp PRIVATE -static)
endif()
target_compile_options(mscp PRIVATE ${MSCP_COMPILE_OPTS})
target_compile_definitions(mscp PUBLIC _VERSION="${PROJECT_VERSION}")
install(TARGETS mscp RUNTIME DESTINATION bin)
@@ -121,9 +147,8 @@ enable_testing()
# CPACK Rules
set(CPACK_SET_DESTDIR true)
set(CPACK_PROJECT_NAME ${PROJECT_NAME})
set(CPACK_PROJECT_VERSION ${PROJECT_VERSION})
#set(CPACK_SET_DESTDIR true)
set(CPACK_PACKAGE_NAME ${PROJECT_NAME})
set(CPACK_PACKAGE_CONTACT "Ryo Nakamura <upa@haeena.net>")
set(CPACK_PACKAGE_DESCRIPTION
"mscp, copy files over multiple ssh connections")
@@ -153,6 +178,7 @@ if(UNIX AND NOT APPLE) # on linux
set(CPACK_RPM_PACKAGE_REQUIRES ${DIST_DEP})
set(CPACK_RPM_PACKAGE_HOMEPAGE "https://github.com/upa/mscp")
set(CPACK_RPM_PACKAGE_DESCRIPTION ${CPACK_PACKAGE_DESCRIPTION})
set(CPACK_RPM_PACKAGE_LICENSE "GPLv3")
endif() # on linux
include(CPack)
@@ -162,9 +188,9 @@ include(CPack)
# Custom targets to build and test mscp in docker containers.
# foreach(IN ZIP_LISTS) (cmake >= 3.17) can shorten the following lists.
# However, ubuntu 20.04 has cmake 3.16.3. So this is a roundabout trick.
list(APPEND DIST_NAMES ubuntu ubuntu centos rocky alpine)
list(APPEND DIST_VERS 20.04 22.04 8 8.6 3.17)
list(APPEND DIST_PKGS deb deb rpm rpm static)
list(APPEND DIST_NAMES ubuntu ubuntu rocky almalinux alpine)
list(APPEND DIST_VERS 20.04 22.04 8.8 8.8 3.17)
list(APPEND DIST_PKGS deb deb rpm rpm static)
list(LENGTH DIST_NAMES _DIST_LISTLEN)
math(EXPR DIST_LISTLEN "${_DIST_LISTLEN} - 1")
@@ -185,6 +211,12 @@ foreach(x RANGE ${DIST_LISTLEN})
COMMAND
docker build -t ${DOCKER_IMAGE} -f docker/${DOCKER_INDEX}.Dockerfile .)
add_custom_target(docker-build-${DOCKER_INDEX}-no-cache
COMMENT "Build mscp in ${DOCKER_IMAGE} container"
WORKING_DIRECTORY ${mscp_SOURCE_DIR}
COMMAND
docker build --no-cache -t ${DOCKER_IMAGE} -f docker/${DOCKER_INDEX}.Dockerfile .)
add_custom_target(docker-test-${DOCKER_INDEX}
COMMENT "Test mscp in ${DOCKER_IMAGE} container"
WORKING_DIRECTORY ${CMAKE_BINARY_DIR}
@@ -198,11 +230,13 @@ foreach(x RANGE ${DIST_LISTLEN})
docker run --rm -v ${CMAKE_BINARY_DIR}:/out ${DOCKER_IMAGE}
cp /mscp/build/${PKG_FILE_NAME} /out/)
list(APPEND DOCKER_BUILDS docker-build-${DOCKER_INDEX})
list(APPEND DOCKER_TESTS docker-test-${DOCKER_INDEX})
list(APPEND DOCKER_PKGS docker-pkg-${DOCKER_INDEX})
list(APPEND DOCKER_BUILDS docker-build-${DOCKER_INDEX})
list(APPEND DOCKER_BUILDS_NO_CACHE docker-build-${DOCKER_INDEX}-no-cache)
list(APPEND DOCKER_TESTS docker-test-${DOCKER_INDEX})
list(APPEND DOCKER_PKGS docker-pkg-${DOCKER_INDEX})
endforeach()
add_custom_target(docker-build-all DEPENDS ${DOCKER_BUILDS})
add_custom_target(docker-test-all DEPENDS ${DOCKER_TESTS})
add_custom_target(docker-pkg-all DEPENDS ${DOCKER_PKGS})
add_custom_target(docker-build-all DEPENDS ${DOCKER_BUILDS})
add_custom_target(docker-build-all-no-cache DEPENDS ${DOCKER_BUILDS_NO_CACHE})
add_custom_target(docker-test-all DEPENDS ${DOCKER_TESTS})
add_custom_target(docker-pkg-all DEPENDS ${DOCKER_PKGS})

View File

@@ -21,8 +21,7 @@ https://user-images.githubusercontent.com/184632/206889149-7cc6178a-6f0f-41e6-85
Differences from `scp` on usage:
- remote glob on remote shell expansion is not supported.
- remote to remote copy is not supported.
- Remote-to-remote copy is not supported.
- `-r` option is not needed to transfer directories.
- and any other differences I have not implemented and noticed.
@@ -38,19 +37,30 @@ brew install upa/tap/mscp
- Ubuntu 22.04
```console
wget https://github.com/upa/mscp/releases/download/v0.0.6/mscp_0.0.6-ubuntu-22.04-x86_64.deb
apt-get install -f ./mscp_0.0.6-ubuntu-22.04-x86_64.deb
wget https://github.com/upa/mscp/releases/latest/download/mscp_ubuntu-22.04-x86_64.deb
apt-get install -f ./mscp_ubuntu-22.04-x86_64.deb
```
- Ubuntu 20.04
```console
wget https://github.com/upa/mscp/releases/download/v0.0.6/mscp_0.0.6-ubuntu-20.04-x86_64.deb
apt-get install -f ./mscp_0.0.6-ubuntu-20.04-x86_64.deb
wget https://github.com/upa/mscp/releases/latest/download/mscp_ubuntu-20.04-x86_64.deb
apt-get install -f ./mscp_ubuntu-20.04-x86_64.deb
```
- Rocky 8.6
- Rocky 8.8
```console
yum install https://github.com/upa/mscp/releases/download/v0.0.6/mscp_0.0.6-rocky-8.6-x86_64.rpm
yum install https://github.com/upa/mscp/releases/latest/download/mscp_rocky-8.8-x86_64.rpm
```
- Alma 8.8
```console
yum install https://github.com/upa/mscp/releases/latest/download/mscp_almalinux-8.8-x86_64.rpm
```
- Linux with single binary `mscp` (x86_64 only)
```console
wget https://github.com/upa/mscp/releases/latest/download/mscp.linux.x86.static -O /usr/local/bin/mscp
chmod 755 /usr/local/bin/mscp
```
@@ -61,7 +71,7 @@ patch introduces asynchronous SFTP Write, which is derived from
https://github.com/limes-datentechnik-gmbh/libssh (see [Re: SFTP Write
async](https://archive.libssh.org/libssh/2020-06/0000004.html)).
Currently macOS and Linux (Ubuntu, CentOS, Rocky) are supported.
Currently macOS and Linux (Ubuntu, Rocky and Alma) are supported.
```console
# clone this repository
@@ -98,9 +108,9 @@ of libssh. So you can start from cmake with it.
```console
$ mscp
mscp v0.0.7: copy files over multiple ssh connections
mscp v0.0.8: copy files over multiple ssh connections
Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask]
Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask] [-u max_startups]
[-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]
[-l login_name] [-p port] [-i identity_file]
[-c cipher_spec] [-M hmac_spec] [-C compress] source ... target
@@ -124,7 +134,7 @@ $ mscp -n 5 -m 0x1f -c aes128-gcm@openssh.com /var/ram/test.img 10.0.0.1:/var/ra
- `-v` option increments verbose output level.
```console
$ mscp test 10.0.0.:
$ mscp test 10.0.0.1:
[=======================================] 100% 49B /49B 198.8B/s 00:00 ETA
```
@@ -153,15 +163,16 @@ copy done: test/testdir/asdf
```console
$ mscp -h
mscp v0.0.7: copy files over multiple ssh connections
mscp v0.0.9-11-g5802679: copy files over multiple ssh connections
Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask]
Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask] [-u max_startups]
[-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]
[-l login_name] [-p port] [-i identity_file]
[-l login_name] [-p port] [-F ssh_config] [-i identity_file]
[-c cipher_spec] [-M hmac_spec] [-C compress] source ... target
-n NR_CONNECTIONS number of connections (default: floor(log(cores)*2)+1)
-m COREMASK hex value to specify cores where threads pinned
-u MAX_STARTUPS number of concurrent outgoing connections (default: 8)
-s MIN_CHUNK_SIZE min chunk size (default: 64MB)
-S MAX_CHUNK_SIZE max chunk size (default: filesize/nr_conn)
@@ -175,6 +186,7 @@ Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask]
-l LOGIN_NAME login name
-p PORT port number
-F CONFIG path to user ssh config (default ~/.ssh/config)
-i IDENTITY identity file for public key authentication
-c CIPHER cipher spec
-M HMAC hmac spec

View File

@@ -1 +1 @@
0.0.7
0.1.0

View File

@@ -8,9 +8,7 @@ docker build -t mscp-ubuntu:20.04 -f docker/ubuntu-20.04.Dockerfile .
docker build -t mscp-ubuntu:22.04 -f docker/ubuntu-22.04.Dockerfile .
docker build -t mscp-centos:8 -f docker/centos-8.Dockerfile .
docker build -t mscp-rocky:8.6 -f docker/rocky-8.6.Dockerfile .
docker build -t mscp-rocky:8.8 -f docker/rocky-8.Dockerfile .
```
Test `mscp` in the containers.
@@ -20,25 +18,20 @@ docker run --init --rm mscp-ubuntu:20.04 /mscp/scripts/test-in-container.sh
docker run --init --rm mscp-ubuntu:22.04 /mscp/scripts/test-in-container.sh
docker run --init --rm mscp-centos:8 /mscp/scripts/test-in-container.sh
docker run --init --rm mscp-rocky:8.6 /mscp/scripts/test-in-container.sh
docker run --init --rm mscp-rocky:8.9 /mscp/scripts/test-in-container.sh
```
Retrieve deb/rpm packages.
```console
docker run --rm -v (pwd):/out mscp-ubuntu:20.04 \
cp /mscp/build/mscp_0.0.0-ubuntu-20.04-x86_64.deb /out/
cp /mscp/build/mscp_ubuntu-20.04-x86_64.deb /out/
docker run --rm -v (pwd):/out mscp-ubuntu:22.04 \
cp /mscp/build/mscp_0.0.0-ubuntu-22.04-x86_64.deb /out/
cp /mscp/build/mscp_ubuntu-22.04-x86_64.deb /out/
docker run --rm -v (pwd):/out mscp-centos:8 \
cp /mscp/build/mscp_0.0.0-centos-8-x86_64.rpm /out/
docker run --rm -v (pwd):/out mscp-rocky:8.6 \
cp /mscp/build/mscp_0.0.0-rocky-8.6-x86_64.rpm /out/
docker run --rm -v (pwd):/out mscp-rocky:8.8 \
cp /mscp/build/mscp_rocky-8.8-x86_64.rpm /out/
```
I don't know whether these are good way.

View File

@@ -1,8 +1,4 @@
FROM rockylinux:8.6
ARG mscpdir="/mscp"
COPY . ${mscpdir}
FROM almalinux:8.8
# install pytest, sshd for test, and rpm-build
RUN set -ex && yum -y install \
@@ -17,6 +13,11 @@ RUN mkdir /var/run/sshd \
&& ssh-keygen -f /root/.ssh/id_rsa -N "" \
&& mv /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
ARG mscpdir="/mscp"
COPY . ${mscpdir}
# install build dependency
RUN ${mscpdir}/scripts/install-build-deps.sh

View File

@@ -2,18 +2,26 @@ FROM alpine:3.17
# Build mscp with conan to create single binary mscp
ARG mscpdir="/mscp"
COPY . ${mscpdir}
RUN apk add --no-cache \
gcc make cmake python3 py3-pip perl linux-headers libc-dev \
openssh bash python3-dev g++
RUN pip3 install conan pytest
# preparation for sshd
RUN ssh-keygen -A
RUN mkdir /var/run/sshd \
&& ssh-keygen -f /root/.ssh/id_rsa -N "" \
&& mv /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
# Build mscp as a single binary
RUN conan profile detect --force
ARG mscpdir="/mscp"
COPY . ${mscpdir}
RUN cd ${mscpdir} \
&& rm -rf build \
&& conan install . --output-folder=build --build=missing \
@@ -25,20 +33,9 @@ RUN cd ${mscpdir} \
&& make \
&& cp mscp /usr/bin/ \
&& cp mscp /mscp/build/mscp_alpine-3.17-x86_64.static
# copy mscp to PKG FILE NAME because this build doesn't use CPACK
# preparation for sshd
RUN ssh-keygen -A
RUN mkdir /var/run/sshd \
&& ssh-keygen -f /root/.ssh/id_rsa -N "" \
&& mv /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
# install mscp python module
RUN cd ${mscpdir} \
&& python3 setup.py install --user
# Need Fix: A trick putting libmscp.so to python mscp module dir does not work on alpine,
# so install libmscp.
RUN cd ${mscpdir}/build \
&& make install

View File

@@ -1,16 +1,7 @@
FROM centos:8
ARG mscpdir="/mscp"
COPY . ${mscpdir}
# from https://stackoverflow.com/questions/70963985/error-failed-to-download-metadata-for-repo-appstream-cannot-prepare-internal
RUN cd /etc/yum.repos.d/
RUN sed -i 's/mirrorlist/#mirrorlist/g' /etc/yum.repos.d/CentOS-*
RUN sed -i 's|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g' /etc/yum.repos.d/CentOS-*
FROM rockylinux:8.8
# install pytest, sshd for test, and rpm-build
RUN set -ex && yum -y update && yum -y install \
RUN set -ex && yum -y install \
python3 python3-pip python3-devel openssh openssh-server openssh-clients rpm-build
RUN python3 -m pip install pytest
@@ -22,13 +13,17 @@ RUN mkdir /var/run/sshd \
&& ssh-keygen -f /root/.ssh/id_rsa -N "" \
&& mv /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
ARG mscpdir="/mscp"
COPY . ${mscpdir}
# install build dependency
RUN ${mscpdir}/scripts/install-build-deps.sh
# build
RUN cd ${mscpdir} \
&& rm -rf build \
&& cmake -B build \
&& cmake -B build \
&& cd ${mscpdir}/build \
&& make \
&& cpack -G RPM CPackConfig.cmake \
@@ -37,3 +32,4 @@ RUN cd ${mscpdir} \
# install mscp python module
RUN cd ${mscpdir} \
&& python3 setup.py install --user

View File

@@ -1,10 +1,6 @@
FROM ubuntu:20.04
ARG DEBIAN_FRONTEND=noninteractive
ARG mscpdir="/mscp"
COPY . ${mscpdir}
RUN set -ex && apt-get update && apt-get install -y --no-install-recommends \
ca-certificates
@@ -21,6 +17,10 @@ RUN mkdir /var/run/sshd \
&& mv /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
ARG mscpdir="/mscp"
COPY . ${mscpdir}
# install build dependency
RUN ${mscpdir}/scripts/install-build-deps.sh

View File

@@ -1,10 +1,6 @@
FROM ubuntu:22.04
ARG DEBIAN_FRONTEND=noninteractive
ARG mscpdir="/mscp"
COPY . ${mscpdir}
RUN set -ex && apt-get update && apt-get install -y --no-install-recommends \
ca-certificates
@@ -20,6 +16,9 @@ RUN mkdir /var/run/sshd \
&& ssh-keygen -f /root/.ssh/id_rsa -N "" \
&& mv /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
ARG mscpdir="/mscp"
COPY . ${mscpdir}
# install build dependency
RUN ${mscpdir}/scripts/install-build-deps.sh

3
examples/.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
simple-copy-dest
*.img
.ipynb_checkpoints

226
examples/mscp-example.ipynb Normal file
View File

@@ -0,0 +1,226 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "ccda9e3a-35de-43fc-9b6e-02475c763f6b",
"metadata": {},
"source": [
"# mscp python binding example"
]
},
{
"cell_type": "code",
"execution_count": 60,
"id": "df04d655-a082-47eb-9a1e-154ebc2a5655",
"metadata": {},
"outputs": [],
"source": [
"import glob\n",
"import time\n",
"import os\n",
"\n",
"import mscp"
]
},
{
"cell_type": "code",
"execution_count": 53,
"id": "e9ed4519-c3fd-4639-89a5-1c1cdffd9519",
"metadata": {},
"outputs": [],
"source": [
"this_dir = os.getcwd()"
]
},
{
"cell_type": "markdown",
"id": "fee75bf8-df40-45f4-81d1-113069c34f13",
"metadata": {},
"source": [
"## Simple copy"
]
},
{
"cell_type": "code",
"execution_count": 54,
"id": "2b06e6d3-30cc-47be-bd4f-af27eb141c8c",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['../src/ssh.c',\n",
" '../src/mscp.c',\n",
" '../src/platform.c',\n",
" '../src/pymscp.c',\n",
" '../src/main.c',\n",
" '../src/path.c',\n",
" '../src/message.c',\n",
" '../src/fileops.c']"
]
},
"execution_count": 54,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# preparing files to be transferred\n",
"c_sources = glob.glob(\"../src/*.c\")\n",
"c_sources"
]
},
{
"cell_type": "code",
"execution_count": 55,
"id": "89bb4558-9472-4d26-9af3-24f426b15edc",
"metadata": {},
"outputs": [],
"source": [
"# copy files using mscp\n",
"dst_dir = this_dir + \"/simple-copy-dest\"\n",
"m = mscp.mscp(\"localhost\", mscp.LOCAL2REMOTE)\n",
"m.copy(c_sources, dst_dir)"
]
},
{
"cell_type": "code",
"execution_count": 56,
"id": "6daf2c98-8905-4039-b82a-a593df3107fe",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['ssh.c',\n",
" 'mscp.c',\n",
" 'platform.c',\n",
" 'pymscp.c',\n",
" 'main.c',\n",
" 'path.c',\n",
" 'message.c',\n",
" 'fileops.c']"
]
},
"execution_count": 56,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"os.listdir(\"simple-copy-dest\")"
]
},
{
"cell_type": "markdown",
"id": "f4a3869a-878e-43b0-9758-a049eaf8b5bd",
"metadata": {},
"source": [
"## Simple Copy with Python Rich ProgressBar"
]
},
{
"cell_type": "code",
"execution_count": 64,
"id": "e7cb7cd6-b845-4d26-93ed-aee8ed3983ab",
"metadata": {},
"outputs": [],
"source": [
"# make a 256MB file\n",
"src = \"example-256MB-src.img\"\n",
"with open(src, \"wb\") as f:\n",
" f.seek(128 * 1024 * 1024 -1, 0)\n",
" f.write(b'1')"
]
},
{
"cell_type": "code",
"execution_count": 69,
"id": "878607ed-5c06-4b15-81ac-9845dad0c9c6",
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "b700e9fc00464969a22a26300404dc35",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"Output()"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<pre style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace\"></pre>\n"
],
"text/plain": []
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<pre style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace\">\n",
"</pre>\n"
],
"text/plain": [
"\n"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"# copy the 256MB file while ploting progress bar using python rich\n",
"dst = this_dir + \"/example-256MB-dst.img\"\n",
"\n",
"kw = {\"nr_threads\": 1, \"nr_ahead\": 1} # slow mscp to watch the progress bar\n",
"\n",
"m = mscp.mscp(\"localhost\", mscp.LOCAL2REMOTE, **kw)\n",
"m.copy(src, dst, nonblock = True)\n",
"\n",
"# m.stats() returns total bytes to be transferred, bytes transferred (done), and finished (bool).\n",
"total, done, finished = m.stats()\n",
"with Progress() as progress:\n",
"\n",
" task = progress.add_task(f\"[green]Copying {src}\", total = total)\n",
"\n",
" while not progress.finished:\n",
" total, done, finished = m.stats()\n",
" progress.update(task, completed = done)\n",
" time.sleep(0.5)\n",
"\n",
"m.join()\n",
"m.cleanup()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.4"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

63
examples/mscp-python.py Executable file
View File

@@ -0,0 +1,63 @@
#!/usr/bin/env python3
"""mscp.py
An example python script running mscp
"""
import argparse
import time
import sys
from rich.progress import Progress
import mscp
def main():
parser = argparse.ArgumentParser()
parser.add_argument("-f", "--from", dest = "fr",
metavar = "REMOTE", default = None,
help = "copy a file from this remote host")
parser.add_argument("-t", "--to", metavar = "REMOTE", default = None,
help = "copy a file to this remote host")
parser.add_argument("source", help = "path to source file to be copied")
parser.add_argument("destination", help = "path of copy destination")
args = parser.parse_args()
if args.fr and args.to:
print("-f and -t are exclusive", file = sys.stderr)
sys.exit(1)
elif args.fr:
d = mscp.REMOTE2LOCAL
remote = args.fr
elif args.to:
d = mscp.LOCAL2REMOTE
remote = args.to
else:
print("-f or -t must be specified", file = sys.stderr)
sys.exit(1)
m = mscp.mscp(remote, d)
m.connect()
m.add_src_path(args.source)
m.set_dst_path(args.destination)
m.scan()
m.start()
total, done, finished = m.stats()
with Progress() as progress:
task = progress.add_task("[green]Copying...", total = total)
while not progress.finished:
total, done, finished = m.stats()
progress.update(task, completed = done)
time.sleep(0.5)
m.join()
m.cleanup()
if __name__ == "__main__":
main()

View File

@@ -18,7 +18,7 @@
* 2. connect to remote host with mscp_connect()
* 3. add path to source files with mscp_add_src_path()
* 4. set path to destination with mscp_set_dst_path()
* 5. finish preparation with mscp_prepare()
* 5. start to scan source files with mscp_scan()
* 6. start copy with mscp_start()
* 7. wait for copy finished with mscp_join()
* 8. cleanup mscp instance with mscp_cleanup() and mscp_free()
@@ -43,6 +43,7 @@ struct mscp_opts {
size_t max_chunk_sz; /** maximum chunk size (default file size/nr_threads) */
size_t buf_sz; /** buffer size, default 16k. */
char coremask[MSCP_MAX_COREMASK_STR]; /** hex to specifiy usable cpu cores */
int max_startups; /* sshd MaxStartups concurrent connections */
int severity; /** messaging severity. set MSCP_SERVERITY_* */
int msg_fd; /** fd to output message. default STDOUT (0),
@@ -66,6 +67,7 @@ struct mscp_ssh_opts {
/* ssh options */
char login_name[MSCP_SSH_MAX_LOGIN_NAME]; /** ssh username */
char port[MSCP_SSH_MAX_PORT_STR]; /** ssh port */
char config[PATH_MAX]; /** path to ssh_config, default ~/.ssh/config*/
char identity[MSCP_SSH_MAX_IDENTITY_PATH]; /** path to private key */
char cipher[MSCP_SSH_MAX_CIPHER_STR]; /** cipher spec */
char hmac[MSCP_SSH_MAX_HMAC_STR]; /** hmacp spec */
@@ -109,7 +111,7 @@ struct mscp *mscp_init(const char *remote_host, int direction,
/**
* @brief Connect the first SSH connection. mscp_connect connects to
* remote host and initialize a SFTP session over the
* connection. mscp_prepare() and mscp_start() require mscp_connect()
* connection. mscp_scan() and mscp_start() require mscp_connect()
* beforehand.
*
* @param m mscp instance.
@@ -149,20 +151,31 @@ int mscp_add_src_path(struct mscp *m, const char *src_path);
*/
int mscp_set_dst_path(struct mscp *m, const char *dst_path);
/* check source files, resolve destination file paths for all source
* files, and prepare chunks for all files. */
/* scan source files, resolve destination file paths for all source
* files, and calculate chunks for all files. */
/**
* @brief Prepare for file transfer. This function checks all source
* files (recursively), resolve paths on the destination side, and
* calculate file chunks.
* @brief Scan source paths and prepare. This function checks all
* source files (recursively), resolve paths on the destination side,
* and calculate file chunks. This function is non-blocking.
*
* @param m mscp instance.
*
* @return 0 on success, < 0 if an error occured.
* mscp_get_error() can be used to retrieve error message.
*/
int mscp_prepare(struct mscp *m);
int mscp_scan(struct mscp *m);
/**
* @brief Join scna thread invoked by mscp_scan(). mscp_join()
* involves this, so that mscp_scan_join() should be called when
* mscp_scan() is called by mscp_start() is not.
*
* @param m mscp instance.
* @return 0 on success, < 0 if an error occured.
* mscp_get_error() can be used to retrieve error message.
*/
int mscp_scan_join(struct mscp *m);
/**
* @brief Start to copy files. mscp_start() returns immediately. You
@@ -172,7 +185,7 @@ int mscp_prepare(struct mscp *m);
*
* @param m mscp instance.
*
* @return 0 on success, < 0 if an error occured.
* @return number of threads on success, < 0 if an error occured.
* mscp_get_error() can be used to retrieve error message.
*
* @see mscp_join()
@@ -245,15 +258,6 @@ enum {
};
/**
* @brief Set a file descriptor for receiving messages from mscp.
* This function has the same effect with setting mscp_opts->msg_fd.
*
* @param m mscp instance.
* @param fd fd to which libmscp writes messages.
*/
void mscp_set_msg_fd(struct mscp *m, int fd);
/**
* @brief Get the recent error message from libmscp. Note that this

View File

@@ -0,0 +1,7 @@
#ifndef _MSCP_VERSION_H_
#define _MSCP_VERSION_H_
#define MSCP_VERSION "@MSCP_VERSION@"
#define MSCP_BUILD_VERSION "@MSCP_BUILD_VERSION@"
#endif /* _MSCP_VERSION_H_ */

View File

@@ -37,7 +37,7 @@ SEVERITY_DEBUG = pymscp.SEVERITY_DEBUG
STATE_INIT = 0
STATE_CONNECTED = 1
STATE_PREPARED = 2
STATE_SCANNED = 2
STATE_RUNNING = 3
STATE_STOPPED = 4
STATE_JOINED = 5
@@ -47,7 +47,7 @@ STATE_RELEASED = 7
_state_str = {
STATE_INIT: "init",
STATE_CONNECTED: "connected",
STATE_PREPARED: "prepared",
STATE_SCANNED: "scanned",
STATE_RUNNING: "running",
STATE_STOPPED: "stopped",
STATE_JOINED: "joined",
@@ -71,6 +71,9 @@ class mscp:
self.state = STATE_INIT
def __str__(self):
if not hasattr(self, "state"):
# this instance failed on mscp_init
return "mscp:{}:init-failed"
return "mscp:{}:{}".format(self.remote, self.__state2str())
def __repr__(self):
@@ -100,26 +103,32 @@ class mscp:
self.state = STATE_CONNECTED
def add_src_path(self, src_path: str):
if type(src_path) != str:
raise ValueError("src_path must be str: {}".format(src_path))
self.src_paths.append(src_path)
pymscp.mscp_add_src_path(m = self.m, src_path = src_path)
def set_dst_path(self, dst_path: str):
if type(dst_path) != str:
raise ValueError("dst_path must be str: {}".format(dst_path))
self.dst_path = dst_path
pymscp.mscp_set_dst_path(m = self.m, dst_path = dst_path);
def prepare(self):
def scan(self):
if self.state == STATE_SCANNED:
return
if self.state != STATE_CONNECTED:
raise RuntimeError("invalid mscp state: {}".format(self.__state2str()))
if not self.src_paths:
raise RuntimeError("src path list is empty")
if not self.dst_path:
if self.dst_path == None:
raise RuntimeError("dst path is not set")
pymscp.mscp_prepare(m = self.m)
self.state = STATE_PREPARED
pymscp.mscp_scan(m = self.m)
self.state = STATE_SCANNED
def start(self):
if self.state != STATE_PREPARED:
if self.state != STATE_SCANNED:
raise RuntimeError("invalid mscp state: {}".format(self.__state2str()))
pymscp.mscp_start(m = self.m)
@@ -132,6 +141,8 @@ class mscp:
self.state = STATE_STOPPED
def join(self):
if self.state == STATE_JOINED:
return
if not (self.state == STATE_RUNNING or self.state == STATE_STOPPED):
raise RuntimeError("invalid mscp state: {}".format(self.__state2str()))
pymscp.mscp_join(m = self.m)
@@ -167,7 +178,7 @@ class mscp:
self.set_dst_path(dst)
self.prepare()
self.scan()
self.start()
if nonblock:
return

View File

@@ -19,7 +19,7 @@ case $platform in
apt-get install -y \
gcc make cmake zlib1g-dev libssl-dev libkrb5-dev
;;
Linux-centos* | Linux-rhel* | Linux-rocky*)
Linux-centos* | Linux-rhel* | Linux-rocky* | Linux-almalinux)
yum install -y \
gcc make cmake zlib-devel openssl-devel rpm-build
;;

View File

@@ -14,10 +14,10 @@ case $release in
ubuntu-22.04*)
echo "libc6 (>= 2.33), libgssapi-krb5-2 (>= 1.17), libssl3 (>= 3.0.0~~alpha1), zlib1g (>= 1:1.1.4)"
;;
centos* | rhel* | rocky*)
centos* | rhel* | rocky* | almalinux*)
echo "glibc crypto-policies krb5-libs openssl-libs libcom_err"
;;
*)
echo "unsupported install dependency: $release"
echo "$(basename $0): unsupported install dependency: $release"
exit 1
esac

View File

@@ -11,6 +11,7 @@ set -x
# Run sshd
if [ ! -e /var/run/sshd.pid ]; then
/usr/sbin/sshd
sleep 1
fi
ssh-keyscan localhost >> ${HOME}/.ssh/known_hosts

View File

@@ -11,6 +11,9 @@ if sys.platform == "linux":
elif sys.platform == "darwin":
libmscp = "libmscp.dylib"
data_dir = sys.prefix + "/lib"
libmscp = "build/" + libmscp
setup(
name='mscp',
version = version,
@@ -20,7 +23,7 @@ setup(
url = "https://github.com/upa/mscp",
packages = find_packages("mscp"),
package_dir = {"": "mscp"},
data_files = [ ("", ["build/" + libmscp])],
data_files = [ (data_dir, [libmscp])],
py_modules = [ "mscp" ],
ext_modules = [
Extension(

View File

@@ -20,6 +20,8 @@ static inline refcnt refcnt_dec(refcnt *cnt)
}
/* mutex */
typedef pthread_mutex_t lock;
static inline void lock_init(lock *l)
@@ -44,12 +46,58 @@ static inline void lock_release_via_cleanup(void *l)
lock_release(l);
}
#define LOCK_ACQUIRE_THREAD(l) \
lock_acquire(l); \
pthread_cleanup_push(lock_release_via_cleanup, l)
#define LOCK_ACQUIRE(l) \
lock_acquire(l); \
pthread_cleanup_push(lock_release_via_cleanup, l)
#define LOCK_RELEASE_THREAD() \
#define LOCK_RELEASE() \
pthread_cleanup_pop(1)
/* read/write lock */
typedef pthread_rwlock_t rwlock;
static inline void rwlock_init(rwlock *rw)
{
pthread_rwlock_init(rw, NULL);
}
static inline void rwlock_read_acquire(rwlock *rw)
{
int ret = pthread_rwlock_rdlock(rw);
assert(ret == 0);
}
static inline void rwlock_write_acquire(rwlock *rw)
{
int ret = pthread_rwlock_wrlock(rw);
assert(ret == 0);
}
static inline void rwlock_release(rwlock *rw)
{
int ret = pthread_rwlock_unlock(rw);
assert(ret == 0);
}
static inline void rwlock_release_via_cleanup(void *rw)
{
rwlock_release(rw);
}
#define RWLOCK_READ_ACQUIRE(rw) \
rwlock_read_acquire(rw); \
pthread_cleanup_push(rwlock_release_via_cleanup, rw)
#define RWLOCK_WRITE_ACQUIRE(rw) \
rwlock_write_acquire(rw); \
pthread_cleanup_push(rwlock_release_via_cleanup, rw)
#define RWLOCK_RELEASE() \
pthread_cleanup_pop(1)
#endif /* _ATOMIC_H_ */

366
src/fileops.c Normal file
View File

@@ -0,0 +1,366 @@
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <dirent.h>
#include <fileops.h>
#include <ssh.h>
#include <message.h>
sftp_session __thread tls_sftp;
/* tls_sftp is used *_wrapped() functions */
void set_tls_sftp_session(sftp_session sftp)
{
tls_sftp = sftp;
}
static void sftp_err_to_errno(sftp_session sftp)
{
int sftperr = sftp_get_error(sftp);
switch (sftperr){
case SSH_FX_OK:
case SSH_FX_EOF:
errno = 0;
break;
case SSH_FX_NO_SUCH_FILE:
case SSH_FX_NO_SUCH_PATH:
errno = ENOENT;
break;
case SSH_FX_PERMISSION_DENIED:
errno = EACCES;
break;
case SSH_FX_FAILURE:
errno = EINVAL;
case SSH_FX_BAD_MESSAGE:
errno = EBADMSG;
case SSH_FX_NO_CONNECTION:
errno = ENOTCONN;
break;
case SSH_FX_CONNECTION_LOST:
errno = ENETRESET;
break;
case SSH_FX_OP_UNSUPPORTED:
errno = EOPNOTSUPP;
break;
case SSH_FX_INVALID_HANDLE:
errno = EBADF;
break;
case SSH_FX_FILE_ALREADY_EXISTS:
errno = EEXIST;
break;
case SSH_FX_WRITE_PROTECT:
errno = EPERM;
break;
case SSH_FX_NO_MEDIA:
errno = ENODEV;
break;
default:
mpr_warn(stderr, "unkown SSH_FX response %d", sftperr);
}
}
MDIR *mscp_opendir(const char *path, sftp_session sftp)
{
MDIR *md;
if (!(md = malloc(sizeof(*md))))
return NULL;
memset(md, 0, sizeof(*md));
if (sftp) {
md->remote = sftp_opendir(sftp, path);
sftp_err_to_errno(sftp);
if (!md->remote) {
goto free_out;
}
} else {
md->local = opendir(path);
if (!md->local) {
goto free_out;
}
}
return md;
free_out:
free(md);
return NULL;
}
MDIR *mscp_opendir_wrapped(const char *path)
{
return mscp_opendir(path, tls_sftp);
}
void mscp_closedir(MDIR *md)
{
int ret;
if (md->remote)
sftp_closedir(md->remote);
else
closedir(md->local);
free(md);
}
struct dirent __thread tls_dirent;
/* tls_dirent contains dirent converted from sftp_attributes returned
* from sftp_readdir(). This trick is derived from openssh's
* fudge_readdir() */
struct dirent *mscp_readdir(MDIR *md)
{
sftp_attributes attr;
struct dirent *ret = NULL;
static int inum = 1;
if (md->remote) {
attr = sftp_readdir(md->remote->sftp, md->remote);
if (!attr) {
sftp_err_to_errno(md->remote->sftp);
return NULL;
}
memset(&tls_dirent, 0, sizeof(tls_dirent));
strncpy(tls_dirent.d_name, attr->name, sizeof(tls_dirent.d_name) - 1);
tls_dirent.d_ino = inum++;
if (!inum)
inum = 1;
ret = &tls_dirent;
sftp_attributes_free(attr);
} else
ret = readdir(md->local);
return ret;
}
int mscp_mkdir(const char *path, mode_t mode, sftp_session sftp)
{
int ret;
if (sftp) {
ret = sftp_mkdir(sftp, path, mode);
sftp_err_to_errno(sftp);
} else
ret = mkdir(path, mode);
if (ret < 0 && errno == EEXIST) {
ret = 0;
}
return ret;
}
static void sftp_attr_to_stat(sftp_attributes attr, struct stat *st)
{
memset(st, 0, sizeof(*st));
st->st_size = attr->size;
st->st_uid = attr->uid;
st->st_gid = attr->gid;
st->st_mode = attr->permissions;
switch (attr->type) {
case SSH_FILEXFER_TYPE_REGULAR:
st->st_mode |= S_IFREG;
break;
case SSH_FILEXFER_TYPE_DIRECTORY:
st->st_mode |= S_IFDIR;
break;
case SSH_FILEXFER_TYPE_SYMLINK:
st->st_mode |= S_IFLNK;
break;
case SSH_FILEXFER_TYPE_SPECIAL:
st->st_mode |= S_IFCHR; /* or block? */
break;
case SSH_FILEXFER_TYPE_UNKNOWN:
st->st_mode |= S_IFIFO; /* really? */
break;
default:
mpr_warn(stderr, "unkown SSH_FILEXFER_TYPE %d", attr->type);
}
/* ToDo: convert atime, ctime, and mtime */
}
int mscp_stat(const char *path, struct stat *st, sftp_session sftp)
{
sftp_attributes attr;
int ret = 0;
if (sftp) {
attr = sftp_stat(sftp, path);
sftp_err_to_errno(sftp);
if (!attr)
return -1;
sftp_attr_to_stat(attr, st);
sftp_attributes_free(attr);
ret = 0;
} else
ret = stat(path, st);
return ret;
}
int mscp_stat_wrapped(const char *path, struct stat *st)
{
return mscp_stat(path, st, tls_sftp);
}
int mscp_lstat(const char *path, struct stat *st, sftp_session sftp)
{
sftp_attributes attr;
int ret = 0;
if (sftp) {
attr = sftp_lstat(sftp, path);
sftp_err_to_errno(sftp);
if (!attr)
return -1;
sftp_attr_to_stat(attr, st);
sftp_attributes_free(attr);
ret = 0;
} else
ret = lstat(path, st);
return ret;
}
int mscp_lstat_wrapped(const char *path, struct stat *st)
{
return mscp_lstat(path, st, tls_sftp);
}
mf *mscp_open(const char *path, int flags, mode_t mode, sftp_session sftp)
{
mf *f;
f = malloc(sizeof(*f));
if (!f)
return NULL;
memset(f, 0, sizeof(*f));
if (sftp) {
f->remote = sftp_open(sftp, path, flags, mode);
if (!f->remote) {
sftp_err_to_errno(sftp);
goto free_out;
}
} else {
f->local = open(path, flags, mode);
if (f->local < 0)
goto free_out;
}
return f;
free_out:
free(f);
return NULL;
}
void mscp_close(mf *f)
{
if (f->remote)
sftp_close(f->remote);
if (f->local > 0)
close(f->local);
free(f);
}
int mscp_lseek(mf *f, size_t off)
{
int ret;
if (f->remote) {
ret = sftp_seek64(f->remote, off);
sftp_err_to_errno(f->remote->sftp);
} else
ret = lseek(f->local, off, SEEK_SET);
return ret;
}
int mscp_setstat(const char *path, mode_t mode, size_t size, sftp_session sftp)
{
int ret;
if (sftp) {
struct sftp_attributes_struct attr;
memset(&attr, 0, sizeof(attr));
attr.permissions = mode;
attr.size = size;
attr.flags = (SSH_FILEXFER_ATTR_PERMISSIONS|SSH_FILEXFER_ATTR_SIZE);
ret = sftp_setstat(sftp, path, &attr);
sftp_err_to_errno(sftp);
} else {
if ((ret = chmod(path, mode)) < 0)
return ret;
if ((ret = truncate(path, size)) < 0)
return ret;
}
return ret;
}
int mscp_glob(const char *pattern, int flags, glob_t *pglob, sftp_session sftp)
{
int ret;
if (sftp) {
#ifndef GLOB_ALTDIRFUNC
#define GLOB_NOALTDIRMAGIC INT_MAX
/* musl does not implement GLOB_ALTDIRFUNC */
pglob->gl_pathc = 1;
pglob->gl_pathv = malloc(sizeof(char *));
pglob->gl_pathv[0] = strdup(pattern);
pglob->gl_offs = GLOB_NOALTDIRMAGIC;
return 0;
#else
flags |= GLOB_ALTDIRFUNC;
set_tls_sftp_session(sftp);
#ifdef __APPLE__
pglob->gl_opendir = (void *(*)(const char *))mscp_opendir_wrapped;
pglob->gl_readdir = (struct dirent *(*)(void *))mscp_readdir;
pglob->gl_closedir = (void (*)(void *))mscp_closedir;
pglob->gl_lstat = mscp_lstat_wrapped;
pglob->gl_stat = mscp_stat_wrapped;
#elif linux
pglob->gl_opendir = (void *(*)(const char *))mscp_opendir_wrapped;
pglob->gl_readdir = (void *(*)(void *))mscp_readdir;
pglob->gl_closedir = (void (*)(void *))mscp_closedir;
pglob->gl_lstat = (int (*)(const char *, void *))mscp_lstat_wrapped;
pglob->gl_stat = (int (*)(const char *, void *))mscp_stat_wrapped;
#else
#error unsupported platform
#endif
#endif
}
ret = glob(pattern, flags, NULL, pglob);
if (sftp)
set_tls_sftp_session(NULL);
return ret;
}
void mscp_globfree(glob_t *pglob)
{
#ifndef GLOB_ALTDIRFUNC
if (pglob->gl_offs == GLOB_NOALTDIRMAGIC) {
free(pglob->gl_pathv[0]);
free(pglob->gl_pathv);
return;
}
#endif
globfree(pglob);
}

57
src/fileops.h Normal file
View File

@@ -0,0 +1,57 @@
#include <dirent.h>
#include <sys/stat.h>
#include <glob.h>
#include <ssh.h>
void set_tls_sftp_session(sftp_session sftp);
/* sftp_session set by set_tls_sftp_session is sued in
mscp_open_wrapped(), mscp_stat_wrapped(), and
mscp_lstat_wrapped(). This _wrapped() functions exist for
sftp_glob() */
/* directory operations */
struct mdir_struct {
DIR *local;
sftp_dir remote;
};
typedef struct mdir_struct MDIR;
MDIR *mscp_opendir(const char *path, sftp_session sftp);
MDIR *mscp_opendir_wrapped(const char *path);
void mscp_closedir(MDIR *md);
struct dirent *mscp_readdir(MDIR *md);
int mscp_mkdir(const char *path, mode_t mode, sftp_session sftp);
/* stat operations */
int mscp_stat(const char *path, struct stat *st, sftp_session sftp);
int mscp_stat_wrapped(const char *path, struct stat *st);
int mscp_lstat(const char *path, struct stat *st, sftp_session sftp);
int mscp_lstat_wrapped(const char *path, struct stat *st);
/* file operations */
struct mf_struct {
sftp_file remote;
int local;
};
typedef struct mf_struct mf;
mf *mscp_open(const char *path, int flags, mode_t mode, sftp_session sftp);
void mscp_close(mf *f);
int mscp_lseek(mf *f, size_t off);
/* mscp_setstat() involves chmod and truncate. It executes both at
* once via a single SFTP command (sftp_setstat()).
*/
int mscp_setstat(const char *path, mode_t mode, size_t size, sftp_session sftp);
/* remote glob */
int mscp_glob(const char *pattern, int flags, glob_t *pglob, sftp_session sftp);
void mscp_globfree(glob_t *pglob);

View File

@@ -554,5 +554,20 @@ static inline int list_count(struct list_head *head)
}
/**
* list_free_f - free items in a list with a function
* @head the heaf for your list.
* @f function that releases an item in the list.
*/
static inline void list_free_f(struct list_head *head, void (*f)(struct list_head *))
{
struct list_head *p, *n;
list_for_each_safe(p, n, head) {
list_del(p);
f(p);
}
}
#endif

View File

@@ -11,22 +11,16 @@
#include <pthread.h>
#include <mscp.h>
#include <mscp_version.h>
#include <util.h>
#ifndef _VERSION /* passed through cmake */
#define VERSION "(unknown)"
#else
#define VERSION _VERSION
#endif
void usage(bool print_help) {
printf("mscp v" VERSION ": copy files over multiple ssh connections\n"
printf("mscp " MSCP_BUILD_VERSION ": copy files over multiple ssh connections\n"
"\n"
"Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask]\n"
"Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask] [-u max_startups]\n"
" [-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]\n"
" [-l login_name] [-p port] [-i identity_file]\n"
" [-l login_name] [-p port] [-F ssh_config] [-i identity_file]\n"
" [-c cipher_spec] [-M hmac_spec] [-C compress] source ... target\n"
"\n");
@@ -36,6 +30,8 @@ void usage(bool print_help) {
printf(" -n NR_CONNECTIONS number of connections "
"(default: floor(log(cores)*2)+1)\n"
" -m COREMASK hex value to specify cores where threads pinned\n"
" -u MAX_STARTUPS number of concurrent outgoing connections "
"(default: 8)\n"
" -s MIN_CHUNK_SIZE min chunk size (default: 64MB)\n"
" -S MAX_CHUNK_SIZE max chunk size (default: filesize/nr_conn)\n"
"\n"
@@ -49,10 +45,12 @@ void usage(bool print_help) {
"\n"
" -l LOGIN_NAME login name\n"
" -p PORT port number\n"
" -F CONFIG path to user ssh config (default ~/.ssh/config)\n"
" -i IDENTITY identity file for public key authentication\n"
" -c CIPHER cipher spec\n"
" -M HMAC hmac spec\n"
" -C COMPRESS enable compression: yes, no, zlib, zlib@openssh.com\n"
" -C COMPRESS enable compression: "
"yes, no, zlib, zlib@openssh.com\n"
" -H disable hostkey check\n"
" -d increment ssh debug output level\n"
" -N enable Nagle's algorithm (default disabled)\n"
@@ -69,7 +67,7 @@ char *split_remote_and_path(const char *string, char **remote, char **path)
*/
if (!(s = strdup(string))) {
fprintf(stderr, "strdup: %s\n", strerrno());
fprintf(stderr, "strdup: %s\n", strerror(errno));
return NULL;
}
@@ -115,7 +113,7 @@ struct target *validate_targets(char **arg, int len)
int n;
if ((t = calloc(len, sizeof(struct target))) == NULL) {
fprintf(stderr, "calloc: %s\n", strerrno());
fprintf(stderr, "calloc: %s\n", strerror(errno));
return NULL;
}
memset(t, 0, len * sizeof(struct target));
@@ -204,7 +202,7 @@ int main(int argc, char **argv)
memset(&o, 0, sizeof(o));
o.severity = MSCP_SEVERITY_WARN;
while ((ch = getopt(argc, argv, "n:m:s:S:a:b:vqDrl:p:i:c:M:C:HdNh")) != -1) {
while ((ch = getopt(argc, argv, "n:m:u:s:S:a:b:vqDrl:p:i:F:c:M:C:HdNh")) != -1) {
switch (ch) {
case 'n':
o.nr_threads = atoi(optarg);
@@ -217,6 +215,9 @@ int main(int argc, char **argv)
case 'm':
strncpy(o.coremask, optarg, sizeof(o.coremask));
break;
case 'u':
o.max_startups = atoi(optarg);
break;
case 's':
o.min_chunk_sz = atoi(optarg);
break;
@@ -255,6 +256,9 @@ int main(int argc, char **argv)
}
strncpy(s.port, optarg, MSCP_SSH_MAX_PORT_STR);
break;
case 'F':
strncpy(s.config, optarg, PATH_MAX - 1);
break;
case 'i':
if (strlen(optarg) > MSCP_SSH_MAX_IDENTITY_PATH - 1) {
fprintf(stderr, "long identity path: %s\n", optarg);
@@ -323,7 +327,7 @@ int main(int argc, char **argv)
if (!dryrun) {
if (pipe(pipe_fd) < 0) {
fprintf(stderr, "pipe: %s\n", strerrno());
fprintf(stderr, "pipe: %s\n", strerror(errno));
return -1;
}
msg_fd = pipe_fd[0];
@@ -352,33 +356,33 @@ int main(int argc, char **argv)
return -1;
}
if (mscp_prepare(m) < 0) {
fprintf(stderr, "mscp_prepare: %s\n", mscp_get_error());
if (mscp_scan(m) < 0) {
fprintf(stderr, "mscp_scan: %s\n", mscp_get_error());
return -1;
}
if (dryrun) {
ret = 0;
ret = mscp_scan_join(m);
goto out;
}
if (pthread_create(&tid_stat, NULL, print_stat_thread, NULL) < 0) {
fprintf(stderr, "pthread_create: %s\n", strerrno());
fprintf(stderr, "pthread_create: %s\n", strerror(errno));
return -1;
}
if (signal(SIGINT, sigint_handler) == SIG_ERR) {
fprintf(stderr, "signal: %s\n", strerrno());
fprintf(stderr, "signal: %s\n", strerror(errno));
return -1;
}
ret = mscp_start(m);
if (ret < 0)
fprintf(stderr, "%s\n", mscp_get_error());
fprintf(stderr, "mscp_start: %s\n", mscp_get_error());
ret = mscp_join(m);
if (ret != 0)
fprintf(stderr, "%s\n", mscp_get_error());
fprintf(stderr, "mscp_join: %s\n", mscp_get_error());
pthread_cancel(tid_stat);
pthread_join(tid_stat, NULL);
@@ -415,7 +419,8 @@ double calculate_bps(size_t diff, struct timeval *b, struct timeval *a)
return (double)diff / calculate_timedelta(b, a);
}
char *calculate_eta(size_t remain, size_t diff, struct timeval *b, struct timeval *a)
char *calculate_eta(size_t remain, size_t diff, struct timeval *b, struct timeval *a,
bool final)
{
static char buf[16];
double elapsed = calculate_timedelta(b, a);
@@ -423,7 +428,10 @@ char *calculate_eta(size_t remain, size_t diff, struct timeval *b, struct timeva
if (diff == 0)
snprintf(buf, sizeof(buf), "--:-- ETA");
else {
else if (final) {
snprintf(buf, sizeof(buf), "%02d:%02d ",
(int)(floor(elapsed / 60)), (int)round(elapsed) % 60);
} else {
eta = remain / (diff / elapsed);
snprintf(buf, sizeof(buf), "%02d:%02d ETA",
(int)floor(eta / 60), (int)round(eta) % 60);
@@ -468,7 +476,7 @@ void print_progress_bar(double percent, char *suffix)
}
void print_progress(struct timeval *b, struct timeval *a,
size_t total, size_t last, size_t done)
size_t total, size_t last, size_t done, bool final)
{
char *bps_units[] = { "B/s ", "KB/s", "MB/s", "GB/s" };
char *byte_units[] = { "B ", "KB", "MB", "GB", "TB", "PB" };
@@ -503,7 +511,8 @@ void print_progress(struct timeval *b, struct timeval *a,
snprintf(suffix, sizeof(suffix), "%4lu%s/%lu%s %6.1f%s %s",
done_round, byte_units[byte_du], total_round, byte_units[byte_tu],
bps, bps_units[bps_u], calculate_eta(total - done, done - last, b, a));
bps, bps_units[bps_u],
calculate_eta(total - done, done - last, b, a, final));
print_progress_bar(percent, suffix);
}
@@ -527,7 +536,7 @@ void print_stat_thread_cleanup(void *arg)
x.done = s.done;
/* print progress from the beginning */
print_progress(&x.start, &x.after, x.total, 0, x.done);
print_progress(&x.start, &x.after, x.total, 0, x.done, true);
print_cli("\n"); /* final output */
}
@@ -547,14 +556,14 @@ void *print_stat_thread(void *arg)
while (true) {
if (poll(&pfd, 1, 100) < 0) {
fprintf(stderr, "poll: %s\n", strerrno());
fprintf(stderr, "poll: %s\n", strerror(errno));
return NULL;
}
if (pfd.revents & POLLIN) {
memset(buf, 0, sizeof(buf));
if (read(msg_fd, buf, sizeof(buf)) < 0) {
fprintf(stderr, "read: %s\n", strerrno());
fprintf(stderr, "read: %s\n", strerror(errno));
return NULL;
}
print_cli("\r\033[K" "%s", buf);
@@ -566,7 +575,8 @@ void *print_stat_thread(void *arg)
x.total = s.total;
x.done = s.done;
print_progress(&x.before, &x.after, x.total, x.last, x.done);
print_progress(&x.before, &x.after, x.total, x.last, x.done,
false);
x.before = x.after;
x.last = x.done;
}

View File

@@ -4,10 +4,13 @@
#include <limits.h>
#include <pthread.h>
#include <util.h>
#include <message.h>
/* mscp error message buffer */
/* strerror_r wrapper */
__thread char thread_strerror[128];
/* mscp error message buffer */
#define MSCP_ERRMSG_SIZE (PATH_MAX * 2)
static char errmsg[MSCP_ERRMSG_SIZE];
@@ -30,29 +33,17 @@ const char *mscp_get_error()
/* message print functions */
static int mprint_serverity = MSCP_SEVERITY_WARN;
static pthread_mutex_t mprint_lock = PTHREAD_MUTEX_INITIALIZER;
static int mprint_severity = MSCP_SEVERITY_WARN;
void mprint_set_severity(int serverity)
{
if (serverity < 0)
mprint_serverity = -1; /* no print */
mprint_serverity = serverity;
mprint_severity = -1; /* no print */
mprint_severity = serverity;
}
void mprint(int fd, int serverity, const char *fmt, ...)
int mprint_get_severity()
{
va_list va;
int ret;
if (fd < 0)
return;
if (serverity <= mprint_serverity) {
pthread_mutex_lock(&mprint_lock);
va_start(va, fmt);
vdprintf(fd, fmt, va);
va_end(va);
pthread_mutex_unlock(&mprint_lock);
}
return mprint_severity;
}

View File

@@ -2,28 +2,53 @@
#define _MESSAGE_H_
#include <libgen.h>
#include <stdio.h>
#include <mscp.h>
/* message print. printed messages are passed to application via msg_fd */
void mprint_set_severity(int severity);
void mprint(int fd, int severity, const char *fmt, ...);
int mprint_get_severity();
#define mprint(fp, severity, fmt, ...) \
do { \
if (fp && severity <= mprint_get_severity()) { \
fprintf(fp, fmt, ##__VA_ARGS__); \
fflush(fp); \
} \
} while (0)
#define mpr_err(fp, fmt, ...) \
mprint(fp, MSCP_SEVERITY_ERR, fmt, ##__VA_ARGS__)
#define mpr_warn(fp, fmt, ...) \
mprint(fp, MSCP_SEVERITY_WARN, fmt, ##__VA_ARGS__)
#define mpr_notice(fp, fmt, ...) \
mprint(fp, MSCP_SEVERITY_NOTICE, fmt, ##__VA_ARGS__)
#define mpr_info(fp, fmt, ...) \
mprint(fp, MSCP_SEVERITY_INFO, fmt, ##__VA_ARGS__)
#define mpr_debug(fp, fmt, ...) \
mprint(fp, MSCP_SEVERITY_DEBUG, fmt, ##__VA_ARGS__)
/* errorno wrapper */
extern __thread char thread_strerror[128];
#ifdef _GNU_SOURCE
/* GNU strerror_r */
#define strerrno() \
strerror_r(errno, thread_strerror, sizeof(thread_strerror))
#else
/* this macro assumes that strerror_r never fails. any good way? */
#define strerrno() \
(strerror_r(errno, thread_strerror, sizeof(thread_strerror)) \
? thread_strerror : thread_strerror)
#endif
#define mpr_err(fd, fmt, ...) \
mprint(fd, MSCP_SEVERITY_ERR, fmt, ##__VA_ARGS__)
#define mpr_warn(fd, fmt, ...) \
mprint(fd, MSCP_SEVERITY_WARN, fmt, ##__VA_ARGS__)
#define mpr_notice(fd, fmt, ...) \
mprint(fd, MSCP_SEVERITY_NOTICE, fmt, ##__VA_ARGS__)
#define mpr_info(fd, fmt, ...) \
mprint(fd, MSCP_SEVERITY_INFO, fmt, ##__VA_ARGS__)
#define mpr_debug(fd, fmt, ...) \
mprint(fd, MSCP_SEVERITY_DEBUG, fmt, ##__VA_ARGS__)
/* error message buffer */
#define mscp_set_error(fmt, ...) \
_mscp_set_error("%s:%d:%s: " fmt, \
_mscp_set_error("%s:%d:%s: " fmt "\0", \
basename(__FILE__), __LINE__, __func__, ##__VA_ARGS__)
void _mscp_set_error(const char *fmt, ...);

View File

@@ -2,43 +2,56 @@
#include <unistd.h>
#include <math.h>
#include <pthread.h>
#include <semaphore.h>
#include <sys/time.h>
#include <list.h>
#include <util.h>
#include <ssh.h>
#include <path.h>
#include <fileops.h>
#include <atomic.h>
#include <platform.h>
#include <message.h>
#include <mscp.h>
struct mscp {
char *remote; /* remote host (and uername) */
int direction; /* copy direction */
struct mscp_opts *opts;
struct mscp_ssh_opts *ssh_opts;
int msg_fd; /* writer fd for message pipe */
FILE *msg_fp; /* writer fd for message pipe */
int *cores; /* usable cpu cores by COREMASK */
int nr_cores; /* length of array of cores */
int *cores; /* usable cpu cores by COREMASK */
int nr_cores; /* length of array of cores */
sem_t *sem; /* semaphore for concurrent
* connecting ssh sessions */
sftp_session first; /* first sftp session */
char dst_path[PATH_MAX];
struct list_head src_list;
struct list_head path_list;
struct list_head chunk_list;
lock chunk_lock;
struct chunk_pool cp;
pthread_t tid_scan; /* tid for scan thread */
int ret_scan; /* return code from scan thread */
size_t total_bytes; /* total bytes to be transferred */
struct mscp_thread *threads;
struct list_head thread_list;
rwlock thread_rwlock;
};
struct mscp_thread {
struct list_head list; /* mscp->thread_list */
struct mscp *m;
int id;
sftp_session sftp;
pthread_t tid;
int cpu;
@@ -48,7 +61,7 @@ struct mscp_thread {
};
struct src {
struct list_head list;
struct list_head list; /* mscp->src_list */
char *path;
};
@@ -62,8 +75,13 @@ struct src {
* sftp_async_read returns 0.
*/
#define DEFAULT_MAX_STARTUPS 8
#define non_null_string(s) (s[0] != '\0')
static int expand_coremask(const char *coremask, int **cores, int *nr_cores)
{
int n, *core_list, core_list_len = 0, nr_usable, nr_all;
@@ -178,6 +196,16 @@ static int validate_and_set_defaut_params(struct mscp_opts *o)
return -1;
}
if (o->max_startups == 0)
o->max_startups = DEFAULT_MAX_STARTUPS;
else if (o->max_startups < 0) {
mscp_set_error("invalid max_startups: %d", o->max_startups);
return -1;
}
if (o->msg_fd == 0)
o->msg_fd = STDOUT_FILENO;
return 0;
}
@@ -188,7 +216,7 @@ struct mscp *mscp_init(const char *remote_host, int direction,
int n;
if (!remote_host) {
mscp_set_error("empty remote host\n");
mscp_set_error("empty remote host");
return NULL;
}
@@ -198,22 +226,30 @@ struct mscp *mscp_init(const char *remote_host, int direction,
return NULL;
}
mprint_set_severity(o->severity);
if (validate_and_set_defaut_params(o) < 0) {
return NULL;
}
m = malloc(sizeof(*m));
if (!m) {
mscp_set_error("failed to allocate memory: %s", strerrno());
return NULL;
}
mprint_set_severity(o->severity);
if (validate_and_set_defaut_params(o) < 0)
goto free_out;
memset(m, 0, sizeof(*m));
INIT_LIST_HEAD(&m->src_list);
INIT_LIST_HEAD(&m->path_list);
INIT_LIST_HEAD(&m->chunk_list);
lock_init(&m->chunk_lock);
chunk_pool_init(&m->cp);
INIT_LIST_HEAD(&m->thread_list);
rwlock_init(&m->thread_rwlock);
if ((m->sem = sem_create(o->max_startups)) == NULL) {
mscp_set_error("sem_create: %s", strerrno());
goto free_out;
}
m->remote = strdup(remote_host);
if (!m->remote) {
@@ -221,15 +257,22 @@ struct mscp *mscp_init(const char *remote_host, int direction,
goto free_out;
}
m->direction = direction;
m->msg_fd = o->msg_fd;
if (o->msg_fd > -1) {
m->msg_fp = fdopen(o->msg_fd, "a");
if (!m->msg_fp) {
mscp_set_error("fdopen failed: %s", strerrno());
goto free_out;
}
} else
m->msg_fp = NULL;
if (strlen(o->coremask) > 0) {
if (expand_coremask(o->coremask, &m->cores, &m->nr_cores) < 0)
goto free_out;
mpr_notice(m->msg_fd, "usable cpu cores:");
mpr_notice(m->msg_fp, "usable cpu cores:");
for (n = 0; n < m->nr_cores; n++)
mpr_notice(m->msg_fd, " %d", m->cores[n]);
mpr_notice(m->msg_fd, "\n");
mpr_notice(m->msg_fp, " %d", m->cores[n]);
mpr_notice(m->msg_fp, "\n");
}
m->opts = o;
@@ -242,11 +285,6 @@ free_out:
return NULL;
}
void mscp_set_msg_fd(struct mscp *m, int fd)
{
m->msg_fd = fd;
}
int mscp_connect(struct mscp *m)
{
m->first = ssh_init_sftp_session(m->remote, m->ssh_opts);
@@ -293,17 +331,57 @@ int mscp_set_dst_path(struct mscp *m, const char *dst_path)
return 0;
}
int mscp_prepare(struct mscp *m)
static int get_page_mask(void)
{
long page_sz = sysconf(_SC_PAGESIZE);
size_t page_mask = 0;
int n;
for (n = 0; page_sz > 0; page_sz >>= 1, n++) {
page_mask <<= 1;
page_mask |= 1;
}
return page_mask >> 1;
}
static void mscp_stop_copy_thread(struct mscp *m)
{
struct mscp_thread *t;
RWLOCK_READ_ACQUIRE(&m->thread_rwlock);
list_for_each_entry(t, &m->thread_list, list) {
if (!t->finished)
pthread_cancel(t->tid);
}
RWLOCK_RELEASE();
}
static void mscp_stop_scan_thread(struct mscp *m)
{
if (m->tid_scan)
pthread_cancel(m->tid_scan);
}
void mscp_stop(struct mscp *m)
{
mscp_stop_scan_thread(m);
mscp_stop_copy_thread(m);
}
void *mscp_scan_thread(void *arg)
{
struct mscp *m = arg;
sftp_session src_sftp = NULL, dst_sftp = NULL;
bool src_path_is_dir, dst_path_is_dir, dst_path_should_dir;
struct path_resolve_args a;
struct list_head tmp;
struct path *p;
struct src *s;
mstat ss, ds;
struct stat ss, ds;
glob_t pglob;
int n;
src_path_is_dir = dst_path_is_dir = dst_path_should_dir = false;
m->ret_scan = 0;
switch (m->direction) {
case MSCP_DIRECTION_L2R:
@@ -316,166 +394,194 @@ int mscp_prepare(struct mscp *m)
break;
default:
mscp_set_error("invalid copy direction: %d", m->direction);
return -1;
goto err_out;
}
/* initialize path_resolve_args */
memset(&a, 0, sizeof(a));
a.msg_fp = m->msg_fp;
a.total_bytes = &m->total_bytes;
if (list_count(&m->src_list) > 1)
dst_path_should_dir = true;
a.dst_path_should_dir = true;
if (mscp_stat(m->dst_path, &ds, dst_sftp) == 0) {
if (mstat_is_dir(ds))
dst_path_is_dir = true;
mscp_stat_free(ds);
if (S_ISDIR(ds.st_mode))
a.dst_path_is_dir = true;
}
a.cp = &m->cp;
a.nr_conn = m->opts->nr_threads;
a.min_chunk_sz = m->opts->min_chunk_sz;
a.max_chunk_sz = m->opts->max_chunk_sz;
a.chunk_align = get_page_mask();
mpr_info(m->msg_fp, "start to walk source path(s)\n");
/* walk a src_path recusively, and resolve path->dst_path for each src */
list_for_each_entry(s, &m->src_list, list) {
if (mscp_stat(s->path, &ss, src_sftp) < 0) {
mscp_set_error("stat: %s", mscp_strerror(src_sftp));
return -1;
memset(&pglob, 0, sizeof(pglob));
if (mscp_glob(s->path, GLOB_NOCHECK, &pglob, src_sftp) < 0) {
mscp_set_error("mscp_glob: %s", strerrno());
goto err_out;
}
src_path_is_dir = mstat_is_dir(ss);
mscp_stat_free(ss);
INIT_LIST_HEAD(&tmp);
if (walk_src_path(src_sftp, s->path, &tmp) < 0)
return -1;
if (list_count(&tmp) > 1)
dst_path_should_dir = true;
for (n = 0; n < pglob.gl_pathc; n++) {
if (mscp_stat(pglob.gl_pathv[n], &ss, src_sftp) < 0) {
mscp_set_error("stat: %s %s", s->path, strerrno());
goto err_out;
}
if (resolve_dst_path(m->msg_fd, s->path, m->dst_path, &tmp,
src_path_is_dir, dst_path_is_dir,
dst_path_should_dir) < 0)
return -1;
if (!a.dst_path_should_dir && pglob.gl_pathc > 1)
a.dst_path_should_dir = true; /* we have over 1 src */
list_splice_tail(&tmp, m->path_list.prev);
/* set path specific args */
a.src_path = pglob.gl_pathv[n];
a.dst_path = m->dst_path;
a.src_path_is_dir = S_ISDIR(ss.st_mode);
INIT_LIST_HEAD(&tmp);
if (walk_src_path(src_sftp, pglob.gl_pathv[n], &tmp, &a) < 0)
goto err_out;
list_splice_tail(&tmp, m->path_list.prev);
}
mscp_globfree(&pglob);
}
if (resolve_chunk(&m->path_list, &m->chunk_list, m->opts->nr_threads,
m->opts->min_chunk_sz, m->opts->max_chunk_sz) < 0)
mpr_info(m->msg_fp, "walk source path(s) done\n");
chunk_pool_set_filled(&m->cp);
m->ret_scan = 0;
return NULL;
err_out:
chunk_pool_set_filled(&m->cp);
m->ret_scan = -1;
return NULL;
}
int mscp_scan(struct mscp *m)
{
int ret = pthread_create(&m->tid_scan, NULL, mscp_scan_thread, m);
if (ret < 0) {
mscp_set_error("pthread_create_error: %d", ret);
m->tid_scan = 0;
mscp_stop(m);
return -1;
/* save total bytes to be transferred */
m->total_bytes = 0;
list_for_each_entry(p, &m->path_list, list) {
m->total_bytes += p->size;
}
/* We wait for there are over nr_threads chunks to determine
* actual number of threads (and connections), or scan
* finished. If the number of chunks are smaller than
* nr_threads, we adjust nr_threads to the number of chunks.
*/
while (!chunk_pool_is_filled(&m->cp) &&
chunk_pool_size(&m->cp) < m->opts->nr_threads)
usleep(100);
return 0;
}
void mscp_stop(struct mscp *m)
int mscp_scan_join(struct mscp *m)
{
int n;
pr("stopping...\n");
for (n = 0; n < m->opts->nr_threads; n++) {
if (m->threads[n].tid && !m->threads[n].finished)
pthread_cancel(m->threads[n].tid);
}
if (m->tid_scan) {
pthread_join(m->tid_scan, NULL);
m->tid_scan = 0;
return m->ret_scan;
}
return 0;
}
static void *mscp_copy_thread(void *arg);
static struct mscp_thread *mscp_copy_thread_spawn(struct mscp *m, int id)
{
struct mscp_thread *t;
int ret;
t = malloc(sizeof(*t));
if (!t){
mscp_set_error("malloc: %s,", strerrno());
return NULL;
}
memset(t, 0, sizeof(*t));
t->m = m;
t->id = id;
if (m->cores == NULL)
t->cpu = -1; /* not pinned to cpu */
else
t->cpu = m->cores[id % m->nr_cores];
ret = pthread_create(&t->tid, NULL, mscp_copy_thread, t);
if (ret < 0) {
mscp_set_error("pthread_create error: %d", ret);
free(t);
return NULL;
}
return t;
}
int mscp_start(struct mscp *m)
{
int n, ret;
struct mscp_thread *t;
int n, ret = 0;
if ((n = list_count(&m->chunk_list)) < m->opts->nr_threads) {
mpr_notice(m->msg_fd, "we have only %d chunk(s). "
if ((n = chunk_pool_size(&m->cp)) < m->opts->nr_threads) {
mpr_notice(m->msg_fp, "we have only %d chunk(s). "
"set number of connections to %d\n", n, n);
m->opts->nr_threads = n;
}
/* prepare thread instances */
m->threads = calloc(m->opts->nr_threads, sizeof(struct mscp_thread));
memset(m->threads, 0, m->opts->nr_threads * sizeof(struct mscp_thread));
for (n = 0; n < m->opts->nr_threads; n++) {
struct mscp_thread *t = &m->threads[n];
t->m = m;
if (!m->cores)
t->cpu = -1;
else
t->cpu = m->cores[n % m->nr_cores];
if (n == 0) {
t->sftp = m->first; /* reuse first sftp session */
m->first = NULL;
t = mscp_copy_thread_spawn(m, n);
if (!t) {
mpr_err(m->msg_fp, "failed to spawn copy thread\n");
break;
}
else {
mpr_notice(m->msg_fd, "connecting to %s for a copy thread...\n",
m->remote);
t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts);
if (!t->sftp)
return -1;
}
}
/* spawn copy threads */
for (n = 0; n < m->opts->nr_threads; n++) {
struct mscp_thread *t = &m->threads[n];
ret = pthread_create(&t->tid, NULL, mscp_copy_thread, t);
if (ret < 0) {
mscp_set_error("pthread_create error: %d", ret);
mscp_stop(m);
return -1;
}
RWLOCK_WRITE_ACQUIRE(&m->thread_rwlock);
list_add_tail(&t->list, &m->thread_list);
RWLOCK_RELEASE();
}
return 0;
return n;
}
int mscp_join(struct mscp *m)
{
struct mscp_thread *t;
int n, ret = 0;
/* waiting for threads join... */
for (n = 0; n < m->opts->nr_threads; n++) {
if (m->threads[n].tid) {
pthread_join(m->threads[n].tid, NULL);
if (m->threads[n].ret < 0)
ret = m->threads[n].ret;
}
}
/* waiting for scan thread joins... */
ret = mscp_scan_join(m);
/* waiting for copy threads join... */
RWLOCK_READ_ACQUIRE(&m->thread_rwlock);
list_for_each_entry(t, &m->thread_list, list) {
pthread_join(t->tid, NULL);
if (t->ret < 0)
ret = t->ret;
if (t->sftp) {
ssh_sftp_close(t->sftp);
t->sftp = NULL;
}
}
RWLOCK_RELEASE();
if (m->first) {
ssh_sftp_close(m->first);
m->first = NULL;
}
if (m->threads) {
for (n = 0; n < m->opts->nr_threads; n++) {
struct mscp_thread *t = &m->threads[n];
if (t->ret != 0)
ret = ret;
if (t->sftp) {
ssh_sftp_close(t->sftp);
t->sftp = NULL;
}
}
}
return ret;
}
/* copy thread related functions */
struct chunk *acquire_chunk(struct list_head *chunk_list)
{
/* under the lock for chunk_list */
struct list_head *first = chunk_list->next;
struct chunk *c = NULL;
if (list_empty(chunk_list))
return NULL; /* list is empty */
c = list_entry(first, struct chunk, list);
list_del(first);
return c;
}
static void mscp_copy_thread_cleanup(void *arg)
{
struct mscp_thread *t = arg;
@@ -489,6 +595,34 @@ void *mscp_copy_thread(void *arg)
struct mscp *m = t->m;
struct chunk *c;
if (t->cpu > -1) {
if (set_thread_affinity(pthread_self(), t->cpu) < 0) {
t->ret = -1;
return NULL;
}
}
if (sem_wait(m->sem) < 0) {
mscp_set_error("sem_wait: %s", strerrno());
mpr_err(m->msg_fp, "%s", mscp_get_error());
goto err_out;
}
mpr_notice(m->msg_fp, "connecting to %s for a copy thread[%d]...\n",
m->remote, t->id);
t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts);
if (sem_post(m->sem) < 0) {
mscp_set_error("sem_post: %s", strerrno());
mpr_err(m->msg_fp, "%s", mscp_get_error());
goto err_out;
}
if (!t->sftp) {
mpr_err(m->msg_fp, "copy thread[%d]: %s\n", t->id, mscp_get_error());
goto err_out;
}
switch (m->direction) {
case MSCP_DIRECTION_L2R:
src_sftp = NULL;
@@ -502,24 +636,21 @@ void *mscp_copy_thread(void *arg)
return NULL; /* not reached */
}
if (t->cpu > -1) {
if (set_thread_affinity(pthread_self(), t->cpu) < 0)
return NULL;
}
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
pthread_cleanup_push(mscp_copy_thread_cleanup, t);
while (1) {
LOCK_ACQUIRE_THREAD(&m->chunk_lock);
c = acquire_chunk(&m->chunk_list);
LOCK_RELEASE_THREAD();
c = chunk_pool_pop(&m->cp);
if (c == CHUNK_POP_WAIT) {
usleep(100); /* XXX: hard code */
continue;
}
if (!c)
break; /* no more chunks */
if ((t->ret = copy_chunk(m->msg_fd,
if ((t->ret = copy_chunk(m->msg_fp,
c, src_sftp, dst_sftp, m->opts->nr_ahead,
m->opts->buf_sz, &t->done)) < 0)
break;
@@ -528,25 +659,20 @@ void *mscp_copy_thread(void *arg)
pthread_cleanup_pop(1);
if (t->ret < 0)
mscp_set_error("copy failed: chunk %s 0x%010lx-0x%010lx",
c->p->path, c->off, c->off + c->len);
mpr_err(m->msg_fp, "copy failed: chunk %s 0x%010lx-0x%010lx\n",
c->p->path, c->off, c->off + c->len);
return NULL;
err_out:
t->finished = true;
t->ret = -1;
return NULL;
}
/* cleanup related functions */
static void release_list(struct list_head *head, void (*f)(struct list_head*))
{
struct list_head *p, *n;
list_for_each_safe(p, n, head) {
list_del(p);
f(p);
}
}
static void free_src(struct list_head *list)
{
struct src *s;
@@ -569,6 +695,13 @@ static void free_chunk(struct list_head *list)
free(c);
}
static void free_thread(struct list_head *list)
{
struct mscp_thread *t;
t = list_entry(list, typeof(*t), list);
free(t);
}
void mscp_cleanup(struct mscp *m)
{
if (m->first) {
@@ -576,19 +709,18 @@ void mscp_cleanup(struct mscp *m)
m->first = NULL;
}
release_list(&m->src_list, free_src);
list_free_f(&m->src_list, free_src);
INIT_LIST_HEAD(&m->src_list);
release_list(&m->chunk_list, free_chunk);
INIT_LIST_HEAD(&m->chunk_list);
release_list(&m->path_list, free_path);
list_free_f(&m->path_list, free_path);
INIT_LIST_HEAD(&m->path_list);
if (m->threads) {
free(m->threads);
m->threads = NULL;
}
chunk_pool_release(&m->cp);
chunk_pool_init(&m->cp);
RWLOCK_WRITE_ACQUIRE(&m->thread_rwlock);
list_free_f(&m->thread_list, free_thread);
RWLOCK_RELEASE();
}
void mscp_free(struct mscp *m)
@@ -598,21 +730,26 @@ void mscp_free(struct mscp *m)
free(m->remote);
if (m->cores)
free(m->cores);
sem_release(m->sem);
free(m);
}
void mscp_get_stats(struct mscp *m, struct mscp_stats *s)
{
struct mscp_thread *t;
bool finished = true;
int n;
s->total = m->total_bytes;
for (s->done = 0, n = 0; n < m->opts->nr_threads; n++) {
s->done += m->threads[n].done;
s->done = 0;
if (!m->threads[n].done)
RWLOCK_READ_ACQUIRE(&m->thread_rwlock);
list_for_each_entry(t, &m->thread_list, list) {
s->done += t->done;
if (!t->finished)
finished = false;
}
RWLOCK_RELEASE();
s->finished = finished;
}

View File

@@ -7,178 +7,138 @@
#include <ssh.h>
#include <util.h>
#include <fileops.h>
#include <list.h>
#include <atomic.h>
#include <path.h>
#include <message.h>
static int append_path(sftp_session sftp, const char *path, mstat s,
struct list_head *path_list)
/* chunk pool operations */
#define CHUNK_POOL_STATE_FILLING 0
#define CHUNK_POOL_STATE_FILLED 1
void chunk_pool_init(struct chunk_pool *cp)
{
struct path *p;
if (!(p = malloc(sizeof(*p)))) {
mscp_set_error("failed to allocate memory: %s", strerrno());
return -1;
}
memset(p, 0, sizeof(*p));
INIT_LIST_HEAD(&p->list);
strncpy(p->path, path, PATH_MAX - 1);
p->size = mstat_size(s);
p->mode = mstat_mode(s);
p->state = FILE_STATE_INIT;
lock_init(&p->lock);
list_add_tail(&p->list, path_list);
return 0;
memset(cp, 0, sizeof(*cp));
INIT_LIST_HEAD(&cp->list);
lock_init(&cp->lock);
cp->state = CHUNK_POOL_STATE_FILLING;
}
static bool check_path_should_skip(const char *path)
{
int len = strlen(path);
if ((len == 1 && strncmp(path, ".", 1) == 0) ||
(len == 2 && strncmp(path, "..", 2) == 0)) {
return true;
}
return false;
static void chunk_pool_add(struct chunk_pool *cp, struct chunk *c)
{
LOCK_ACQUIRE(&cp->lock);
list_add_tail(&c->list, &cp->list);
cp->count += 1;
LOCK_RELEASE();
}
static int walk_path_recursive(sftp_session sftp, const char *path,
struct list_head *path_list)
void chunk_pool_set_filled(struct chunk_pool *cp)
{
char next_path[PATH_MAX];
mdirent *e;
mdir *d;
mstat s;
int ret;
if (mscp_stat(path, &s, sftp) < 0)
return -1;
if (mstat_is_regular(s)) {
/* this path is regular file. it is to be copied */
ret = append_path(sftp, path, s, path_list);
mscp_stat_free(s);
return ret;
}
if (!mstat_is_dir(s)) {
/* not regular file and not directory, skip it. */
mscp_stat_free(s);
return 0;
}
mscp_stat_free(s);
/* ok, this path is directory. walk it. */
if (!(d = mscp_opendir(path, sftp)))
return -1;
for (e = mscp_readdir(d); !mdirent_is_null(e); e = mscp_readdir(d)) {
if (check_path_should_skip(mdirent_name(e)))
continue;
if (strlen(path) + 1 + strlen(mdirent_name(e)) > PATH_MAX) {
mscp_set_error("too long path: %s/%s", path, mdirent_name(e));
return -1;
}
snprintf(next_path, sizeof(next_path), "%s/%s", path, mdirent_name(e));
ret = walk_path_recursive(sftp, next_path, path_list);
if (ret < 0)
return ret;
}
mscp_closedir(d);
return 0;
cp->state = CHUNK_POOL_STATE_FILLED;
}
int walk_src_path(sftp_session src_sftp, const char *src_path,
struct list_head *path_list)
bool chunk_pool_is_filled(struct chunk_pool *cp)
{
return walk_path_recursive(src_sftp, src_path, path_list);
return (cp->state == CHUNK_POOL_STATE_FILLED);
}
static int src2dst_path(int msg_fd, const char *src_path, const char *src_file_path,
const char *dst_path, char *dst_file_path, size_t len,
bool src_path_is_dir, bool dst_path_is_dir,
bool dst_path_should_dir)
size_t chunk_pool_size(struct chunk_pool *cp)
{
char copy[PATH_MAX];
char *prefix;
int offset;
return cp->count;
}
strncpy(copy, src_path, PATH_MAX - 1);
prefix = dirname(copy);
if (!prefix) {
mscp_set_error("dirname: %s", strerrno());
return -1;
}
if (strlen(prefix) == 1 && prefix[0] == '.')
offset = 0;
else
offset = strlen(prefix) + 1;
if (!src_path_is_dir && !dst_path_is_dir) {
/* src path is file. dst path is (1) file, or (2) does not exist.
* In the second case, we need to put src under the dst.
*/
if (dst_path_should_dir)
snprintf(dst_file_path, len, "%s/%s",
dst_path, src_path + offset);
struct chunk *chunk_pool_pop(struct chunk_pool *cp)
{
struct list_head *first;
struct chunk *c = NULL;
LOCK_ACQUIRE(&cp->lock);
first = cp->list.next;
if (list_empty(&cp->list)) {
if (!chunk_pool_is_filled(cp))
c = CHUNK_POP_WAIT;
else
strncpy(dst_file_path, dst_path, len);
c = NULL; /* no more chunks */
} else {
c = list_entry(first, struct chunk, list);
list_del(first);
}
LOCK_RELEASE();
/* src is file, and dst is dir */
if (!src_path_is_dir && dst_path_is_dir)
snprintf(dst_file_path, len, "%s/%s", dst_path, src_path + offset);
/* return CHUNK_POP_WAIT would be very rare case, because it
* means copying over SSH is faster than traversing
* local/remote file paths.
*/
/* both are directory */
if (src_path_is_dir && dst_path_is_dir)
snprintf(dst_file_path, len, "%s/%s", dst_path, src_file_path + offset);
/* dst path does not exist. change dir name to dst_path */
if (src_path_is_dir && !dst_path_is_dir)
snprintf(dst_file_path, len, "%s/%s",
dst_path, src_file_path + strlen(src_path) + 1);
mpr_info(msg_fd, "file: %s -> %s\n", src_file_path, dst_file_path);
return 0;
return c;
}
int resolve_dst_path(int msg_fd, const char *src_path, const char *dst_path,
struct list_head *path_list, bool src_path_is_dir,
bool dst_path_is_dir, bool dst_path_should_dir)
static void chunk_free(struct list_head *list)
{
struct path *p;
list_for_each_entry(p, path_list, list) {
if (src2dst_path(msg_fd, src_path, p->path,
dst_path, p->dst_path, PATH_MAX,
src_path_is_dir, dst_path_is_dir,
dst_path_should_dir) < 0)
return -1;
}
return 0;
struct chunk *c;
c = list_entry(list, typeof(*c), list);
free(c);
}
void path_dump(struct list_head *path_list)
void chunk_pool_release(struct chunk_pool *cp)
{
struct path *p;
list_for_each_entry(p, path_list, list) {
printf("src: %s %lu-byte\n", p->path, p->size);
printf("dst: %s\n", p->dst_path);
}
list_free_f(&cp->list, chunk_free);
}
/* paths of copy source resoltion */
static int resolve_dst_path(const char *src_file_path, char *dst_file_path,
struct path_resolve_args *a)
{
char copy[PATH_MAX];
char *prefix;
int offset;
strncpy(copy, a->src_path, PATH_MAX - 1);
prefix = dirname(copy);
if (!prefix) {
mscp_set_error("dirname: %s", strerrno());
return -1;
}
if (strlen(prefix) == 1 && prefix[0] == '.')
offset = 0;
else
offset = strlen(prefix) + 1;
if (!a->src_path_is_dir && !a->dst_path_is_dir) {
/* src path is file. dst path is (1) file, or (2) does not exist.
* In the second case, we need to put src under the dst.
*/
if (a->dst_path_should_dir)
snprintf(dst_file_path, PATH_MAX - 1, "%s/%s",
a->dst_path, a->src_path + offset);
else
strncpy(dst_file_path, a->dst_path, PATH_MAX - 1);
}
/* src is file, and dst is dir */
if (!a->src_path_is_dir && a->dst_path_is_dir)
snprintf(dst_file_path, PATH_MAX - 1, "%s/%s",
a->dst_path, a->src_path + offset);
/* both are directory */
if (a->src_path_is_dir && a->dst_path_is_dir)
snprintf(dst_file_path, PATH_MAX - 1, "%s/%s",
a->dst_path, src_file_path + offset);
/* dst path does not exist. change dir name to dst_path */
if (a->src_path_is_dir && !a->dst_path_is_dir)
snprintf(dst_file_path, PATH_MAX - 1, "%s/%s",
a->dst_path, src_file_path + strlen(a->src_path) + 1);
mpr_debug(a->msg_fp, "file: %s -> %s\n", src_file_path, dst_file_path);
return 0;
}
/* chunk preparation */
static struct chunk *alloc_chunk(struct path *p)
{
struct chunk *c;
@@ -196,71 +156,145 @@ static struct chunk *alloc_chunk(struct path *p)
return c;
}
static int get_page_mask(void)
{
long page_sz = sysconf(_SC_PAGESIZE);
size_t page_mask = 0;
int n;
for (n = 0; page_sz > 0; page_sz >>= 1, n++) {
page_mask <<= 1;
page_mask |= 1;
}
return page_mask >> 1;
}
int resolve_chunk(struct list_head *path_list, struct list_head *chunk_list,
int nr_conn, int min_chunk_sz, int max_chunk_sz)
static int resolve_chunk(struct path *p, struct path_resolve_args *a)
{
struct chunk *c;
struct path *p;
size_t page_mask;
size_t chunk_sz;
size_t size;
page_mask = get_page_mask();
list_for_each_entry(p, path_list, list) {
if (p->size <= min_chunk_sz)
chunk_sz = p->size;
else if (max_chunk_sz)
chunk_sz = max_chunk_sz;
else {
chunk_sz = (p->size - (p->size % nr_conn)) / nr_conn;
chunk_sz &= ~page_mask; /* align with page_sz */
if (chunk_sz <= min_chunk_sz)
chunk_sz = min_chunk_sz;
}
/* for (size = f->size; size > 0;) does not create a
* file (chunk) when file size is 0. This do {} while
* (size > 0) creates just open/close a 0-byte file.
*/
size = p->size;
do {
c = alloc_chunk(p);
if (!c)
return -1;
c->off = p->size - size;
c->len = size < chunk_sz ? size : chunk_sz;
size -= c->len;
list_add_tail(&c->list, chunk_list);
} while (size > 0);
if (p->size <= a->min_chunk_sz)
chunk_sz = p->size;
else if (a->max_chunk_sz)
chunk_sz = a->max_chunk_sz;
else {
chunk_sz = (p->size - (p->size % a->nr_conn)) / a->nr_conn;
chunk_sz &= ~a->chunk_align; /* align with page_sz */
if (chunk_sz <= a->min_chunk_sz)
chunk_sz = a->min_chunk_sz;
}
/* for (size = f->size; size > 0;) does not create a file
* (chunk) when file size is 0. This do {} while (size > 0)
* creates just open/close a 0-byte file.
*/
size = p->size;
do {
c = alloc_chunk(p);
if (!c)
return -1;
c->off = p->size - size;
c->len = size < chunk_sz ? size : chunk_sz;
size -= c->len;
chunk_pool_add(a->cp, c);
} while (size > 0);
return 0;
}
void chunk_dump(struct list_head *chunk_list)
static int append_path(sftp_session sftp, const char *path, struct stat st,
struct list_head *path_list, struct path_resolve_args *a)
{
struct chunk *c;
struct path *p;
list_for_each_entry(c, chunk_list, list) {
printf("chunk: %s 0x%lx-%lx bytes\n",
c->p->path, c->off, c->off + c->len);
if (!(p = malloc(sizeof(*p)))) {
mscp_set_error("failed to allocate memory: %s", strerrno());
return -1;
}
memset(p, 0, sizeof(*p));
INIT_LIST_HEAD(&p->list);
strncpy(p->path, path, PATH_MAX - 1);
p->size = st.st_size;
p->mode = st.st_mode;
p->state = FILE_STATE_INIT;
lock_init(&p->lock);
if (resolve_dst_path(p->path, p->dst_path, a) < 0)
goto free_out;
if (resolve_chunk(p, a) < 0)
return -1; /* XXX: do not free path becuase chunk(s)
* was added to chunk pool already */
list_add_tail(&p->list, path_list);
*a->total_bytes += p->size;
return 0;
free_out:
free(p);
return -1;
}
static bool check_path_should_skip(const char *path)
{
int len = strlen(path);
if ((len == 1 && strncmp(path, ".", 1) == 0) ||
(len == 2 && strncmp(path, "..", 2) == 0)) {
return true;
}
return false;
}
static int walk_path_recursive(sftp_session sftp, const char *path,
struct list_head *path_list, struct path_resolve_args *a)
{
char next_path[PATH_MAX];
struct dirent *e;
struct stat st;
MDIR *d;
int ret;
if (mscp_stat(path, &st, sftp) < 0)
return -1;
if (S_ISREG(st.st_mode)) {
/* this path is regular file. it is to be copied */
return append_path(sftp, path, st, path_list, a);
}
if (!S_ISDIR(st.st_mode))
return 0; /* not a regular file and not a directory, skip it. */
/* ok, this path is directory. walk it. */
if (!(d = mscp_opendir(path, sftp)))
return -1;
for (e = mscp_readdir(d); e; e = mscp_readdir(d)) {
if (check_path_should_skip(e->d_name))
continue;
if (strlen(path) + 1 + strlen(e->d_name) > PATH_MAX) {
mscp_set_error("too long path: %s/%s", path, e->d_name);
return -1;
}
snprintf(next_path, sizeof(next_path), "%s/%s", path, e->d_name);
ret = walk_path_recursive(sftp, next_path, path_list, a);
if (ret < 0)
return ret;
}
mscp_closedir(d);
return 0;
}
int walk_src_path(sftp_session src_sftp, const char *src_path,
struct list_head *path_list, struct path_resolve_args *a)
{
return walk_path_recursive(src_sftp, src_path, path_list, a);
}
void path_dump(struct list_head *path_list)
{
struct path *p;
list_for_each_entry(p, path_list, list) {
printf("src: %s %lu-byte\n", p->path, p->size);
printf("dst: %s\n", p->dst_path);
}
}
/* based on
@@ -269,10 +303,11 @@ static int touch_dst_path(struct path *p, sftp_session sftp)
{
/* XXX: should reflect the permission of the original directory? */
mode_t mode = S_IRWXU | S_IRWXG | S_IRWXO;
struct stat st;
char path[PATH_MAX];
char *needle;
int ret;
mfh h;
mf *f;
strncpy(path, p->dst_path, sizeof(path));
@@ -281,19 +316,17 @@ static int touch_dst_path(struct path *p, sftp_session sftp)
for (needle = strchr(path + 1, '/'); needle; needle = strchr(needle + 1, '/')) {
*needle = '\0';
mstat s;
if (mscp_stat(path, &s, sftp) == 0) {
if (mstat_is_dir(s))
if (mscp_stat(path, &st, sftp) == 0) {
if (S_ISDIR(st.st_mode))
goto next; /* directory exists. go deeper */
else
return -1; /* path exists, but not directory. */
}
if (mscp_stat_check_err_noent(sftp) == 0) {
if (errno == ENOENT) {
/* no file on the path. create directory. */
if (mscp_mkdir(path, mode, sftp) < 0) {
mscp_set_error("mkdir %s: %s", path,
mscp_strerror(sftp));
mscp_set_error("mscp_mkdir %s: %s", path, strerrno());
return -1;
}
}
@@ -301,32 +334,35 @@ static int touch_dst_path(struct path *p, sftp_session sftp)
*needle = '/';
}
/* open file with O_TRUNC to set file size 0 */
h = mscp_open(p->dst_path, O_WRONLY|O_CREAT|O_TRUNC, S_IRUSR|S_IWUSR, 0, sftp);
if (mscp_open_is_failed(h))
/* Do not set O_TRUNC here. Instead, do mscp_setstat() at the
* end. see https://bugzilla.mindrot.org/show_bug.cgi?id=3431 */
f = mscp_open(p->dst_path, O_WRONLY|O_CREAT, S_IRUSR|S_IWUSR, sftp);
if (!f) {
mscp_set_error("mscp_open %s: %s\n", p->dst_path, strerrno());
return -1;
}
mscp_close(h);
mscp_close(f);
return 0;
}
static int prepare_dst_path(int msg_fd, struct path *p, sftp_session dst_sftp)
static int prepare_dst_path(FILE *msg_fp, struct path *p, sftp_session dst_sftp)
{
int ret = 0;
LOCK_ACQUIRE_THREAD(&p->lock);
LOCK_ACQUIRE(&p->lock);
if (p->state == FILE_STATE_INIT) {
if (touch_dst_path(p, dst_sftp) < 0) {
ret = -1;
goto out;
}
p->state = FILE_STATE_OPENED;
mpr_info(msg_fd, "copy start: %s\n", p->path);
mpr_info(msg_fp, "copy start: %s\n", p->path);
}
out:
LOCK_RELEASE_THREAD();
LOCK_RELEASE();
return ret;
}
@@ -470,52 +506,65 @@ static int copy_chunk_r2l(struct chunk *c, sftp_file sf, int fd,
return 0;
}
static int _copy_chunk(struct chunk *c, mfh s, mfh d,
static int _copy_chunk(struct chunk *c, mf *s, mf *d,
int nr_ahead, int buf_sz, size_t *counter)
{
if (s.fd > 0 && d.sf) /* local to remote copy */
return copy_chunk_l2r(c, s.fd, d.sf, nr_ahead, buf_sz, counter);
else if (s.sf && d.fd > 0) /* remote to local copy */
return copy_chunk_r2l(c, s.sf, d.fd, nr_ahead, buf_sz, counter);
if (s->local && d->remote) /* local to remote copy */
return copy_chunk_l2r(c, s->local, d->remote, nr_ahead, buf_sz, counter);
else if (s->remote && d->local) /* remote to local copy */
return copy_chunk_r2l(c, s->remote, d->local, nr_ahead, buf_sz, counter);
assert(true); /* not reached */
return -1;
assert(false);
return -1; /* not reached */
}
int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp,
int copy_chunk(FILE *msg_fp, struct chunk *c,
sftp_session src_sftp, sftp_session dst_sftp,
int nr_ahead, int buf_sz, size_t *counter)
{
mode_t mode;
int flags;
mfh s, d;
mf *s, *d;
int ret;
assert((src_sftp && !dst_sftp) || (!src_sftp && dst_sftp));
if (prepare_dst_path(msg_fd, c->p, dst_sftp) < 0)
if (prepare_dst_path(msg_fp, c->p, dst_sftp) < 0)
return -1;
/* open src */
flags = O_RDONLY;
mode = S_IRUSR;
s = mscp_open(c->p->path, flags, mode, c->off, src_sftp);
if (mscp_open_is_failed(s)) {
mscp_close(d);
s = mscp_open(c->p->path, flags, mode, src_sftp);
if (!s) {
mscp_set_error("mscp_open: %s: %s", c->p->path, strerrno());
return -1;
}
if (mscp_lseek(s, c->off) < 0) {
mscp_set_error("mscp_lseek: %s: %s", c->p->path, strerrno());
return -1;
}
/* open dst */
flags = O_WRONLY;
mode = S_IRUSR|S_IWUSR;
d = mscp_open(c->p->dst_path, flags, mode, c->off, dst_sftp);
if (mscp_open_is_failed(d))
d = mscp_open(c->p->dst_path, flags, mode, dst_sftp);
if (!d) {
mscp_close(s);
mscp_set_error("mscp_open: %s: %s", c->p->dst_path, strerrno());
return -1;
}
if (mscp_lseek(d, c->off) < 0) {
mscp_set_error("mscp_lseek: %s: %s", c->p->dst_path, strerrno());
return -1;
}
mpr_debug(msg_fd, "copy chunk start: %s 0x%lx-0x%lx\n",
mpr_debug(msg_fp, "copy chunk start: %s 0x%lx-0x%lx\n",
c->p->path, c->off, c->off + c->len);
ret = _copy_chunk(c, s, d, nr_ahead, buf_sz, counter);
mpr_debug(msg_fd, "copy chunk done: %s 0x%lx-0x%lx\n",
mpr_debug(msg_fp, "copy chunk done: %s 0x%lx-0x%lx\n",
c->p->path, c->off, c->off + c->len);
@@ -526,8 +575,10 @@ int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session
if (refcnt_dec(&c->p->refcnt) == 0) {
c->p->state = FILE_STATE_DONE;
mscp_chmod(c->p->dst_path, c->p->mode, dst_sftp);
mpr_info(msg_fd, "copy done: %s\n", c->p->path);
if (mscp_setstat(c->p->dst_path, c->p->mode, c->p->size, dst_sftp) < 0)
mpr_err(msg_fp, "failed to chmod and truncate %s: %s\n",
c->p->path, strerrno());
mpr_info(msg_fp, "copy done: %s\n", c->p->path);
}
return ret;

View File

@@ -29,7 +29,7 @@ struct path {
#define FILE_STATE_DONE 2
struct chunk {
struct list_head list; /* mscp->chunk_list */
struct list_head list; /* chunk_pool->list */
struct path *p;
size_t off; /* offset of this chunk on the file on path p */
@@ -37,265 +37,65 @@ struct chunk {
size_t done; /* copied bytes for this chunk by a thread */
};
struct chunk_pool {
struct list_head list; /* list of struct chunk */
size_t count;
lock lock;
int state;
};
/* initialize chunk pool */
void chunk_pool_init(struct chunk_pool *cp);
/* acquire a chunk from pool. return value is NULL indicates no more
* chunk, GET_CHUNK_WAIT means caller should waits until a chunk is
* added, or pointer to chunk.
*/
struct chunk *chunk_pool_pop(struct chunk_pool *cp);
#define CHUNK_POP_WAIT ((void *) -1)
/* set and check fillingchunks to this pool has finished */
void chunk_pool_set_filled(struct chunk_pool *cp);
bool chunk_pool_is_filled(struct chunk_pool *cp);
/* return number of chunks in the pool */
size_t chunk_pool_size(struct chunk_pool *cp);
/* free chunks in the chunk_pool */
void chunk_pool_release(struct chunk_pool *cp);
struct path_resolve_args {
FILE *msg_fp;
size_t *total_bytes;
/* args to resolve src path to dst path */
const char *src_path;
const char *dst_path;
bool src_path_is_dir;
bool dst_path_is_dir;
bool dst_path_should_dir;
/* args to resolve chunks for a path */
struct chunk_pool *cp;
int nr_conn;
size_t min_chunk_sz;
size_t max_chunk_sz;
size_t chunk_align;
};
/* recursivly walk through src_path and fill path_list for each file */
int walk_src_path(sftp_session src_sftp, const char *src_path,
struct list_head *path_list);
/* fill path->dst_path for all files */
int resolve_dst_path(int msg_fd, const char *src_path, const char *dst_path,
struct list_head *path_list,
bool src_path_is_dir, bool dst_path_is_dir,
bool dst_path_should_dir);
/* resolve chunks from files in the path_list */
int resolve_chunk(struct list_head *path_list, struct list_head *chunk_list,
int nr_conn, int min_chunk_sz, int max_chunk_sz);
struct list_head *path_list, struct path_resolve_args *a);
/* copy a chunk. either src_sftp or dst_sftp is not null, and another is null */
int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp,
int copy_chunk(FILE *msg_fp, struct chunk *c,
sftp_session src_sftp, sftp_session dst_sftp,
int nr_ahead, int buf_sz, size_t *counter);
/* just print contents. just for debugging */
void path_dump(struct list_head *path_list);
void chunk_dump(struct list_head *chunk_list);
/* wrap DIR/dirent and sftp_dir/sftp_attribute. not thread safe */
struct mscp_dir {
DIR *l;
sftp_dir r;
sftp_session sftp;
};
typedef struct mscp_dir mdir;
struct mscp_dirent {
struct dirent *l;
sftp_attributes r;
};
typedef struct mscp_dirent mdirent;
#define mdirent_name(e) ((e->l) ? e->l->d_name : e->r->name)
#define mdirent_is_dir(e) ((e->l) ? \
(e->l->d_type == DT_DIR) : \
(e->r->type == SSH_FILEXFER_TYPE_DIRECTORY))
#define mdirent_is_null(e) (e->l == NULL && e->r == NULL)
static mdir *mscp_opendir(const char *path, sftp_session sftp)
{
mdir *d;
if (!(d = malloc(sizeof(*d))))
return NULL;
memset(d, 0, sizeof(*d));
d->sftp = sftp;
if (sftp) {
d->r = sftp_opendir(sftp, path);
if (!d->r) {
mscp_set_error("sftp_opendir '%s': %s",
path, sftp_get_ssh_error(sftp));
free(d);
return NULL;
}
} else {
d->l = opendir(path);
if (!d->l) {
mscp_set_error("opendir '%s': %s", path, strerrno());
free(d);
return NULL;
}
}
return d;
}
static int mscp_closedir(mdir *d)
{
int ret;
if (d->r)
ret = sftp_closedir(d->r);
else
ret = closedir(d->l);
free(d);
return ret;
}
static mdirent *mscp_readdir(mdir *d)
{
static mdirent e;
memset(&e, 0, sizeof(e));
if (d->r)
e.r = sftp_readdir(d->sftp, d->r);
else
e.l = readdir(d->l);
return &e;
}
/* wrap retriving error */
static const char *mscp_strerror(sftp_session sftp)
{
if (sftp)
return sftp_get_ssh_error(sftp);
return strerrno();
}
/* warp stat/sftp_stat */
struct mscp_stat {
struct stat l;
sftp_attributes r;
};
typedef struct mscp_stat mstat;
static int mscp_stat(const char *path, mstat *s, sftp_session sftp)
{
memset(s, 0, sizeof(*s));
if (sftp) {
s->r = sftp_stat(sftp, path);
if (!s->r)
return -1;
} else {
if (stat(path, &s->l) < 0)
return -1;
}
return 0;
}
static int mscp_stat_check_err_noent(sftp_session sftp)
{
if (sftp) {
if (sftp_get_error(sftp) == SSH_FX_NO_SUCH_PATH ||
sftp_get_error(sftp) == SSH_FX_NO_SUCH_FILE)
return 0;
} else {
if (errno == ENOENT)
return 0;
}
return -1;
}
static void mscp_stat_free(mstat s) {
if (s.r)
sftp_attributes_free(s.r);
}
#define mstat_size(s) ((s.r) ? s.r->size : s.l.st_size)
#define mstat_mode(s) ((s.r) ? \
s.r->permissions : \
s.l.st_mode & (S_IRWXU|S_IRWXG|S_IRWXO))
#define mstat_is_regular(s) ((s.r) ? \
(s.r->type == SSH_FILEXFER_TYPE_REGULAR) : \
S_ISREG(s.l.st_mode))
#define mstat_is_dir(s) ((s.r) ? \
(s.r->type == SSH_FILEXFER_TYPE_DIRECTORY) : \
S_ISDIR(s.l.st_mode))
/* wrap mkdir */
static int mscp_mkdir(const char *path, mode_t mode, sftp_session sftp)
{
int ret;
if (sftp) {
ret = sftp_mkdir(sftp, path, mode);
if (ret < 0 &&
sftp_get_error(sftp) != SSH_FX_FILE_ALREADY_EXISTS) {
mscp_set_error("sftp_mkdir '%s': %s",
path, sftp_get_ssh_error(sftp));
return -1;
}
} else {
if (mkdir(path, mode) == -1 && errno != EEXIST) {
mscp_set_error("mkdir '%s': %s", path, strerrno());
return -1;
}
}
return 0;
}
/* wrap open/sftp_open */
struct mscp_file_handle {
int fd;
sftp_file sf;
};
typedef struct mscp_file_handle mfh;
static mfh mscp_open(const char *path, int flags, mode_t mode, size_t off,
sftp_session sftp)
{
mfh h;
h.fd = -1;
h.sf = NULL;
if (sftp) {
h.sf = sftp_open(sftp, path, flags, mode);
if (!h.sf) {
mscp_set_error("sftp_open '%s': %s",
path, sftp_get_ssh_error(sftp));
return h;
}
if (sftp_seek64(h.sf, off) < 0) {
mscp_set_error("sftp_seek64 '%s': %s",
path, sftp_get_ssh_error(sftp));
sftp_close(h.sf);
h.sf = NULL;
return h;
}
} else {
h.fd = open(path, flags, mode);
if (h.fd < 0) {
mscp_set_error("open '%s': %s", path, strerrno());
return h;
}
if (lseek(h.fd, off, SEEK_SET) < 0) {
mscp_set_error("lseek '%s': %s", path, strerrno());
close(h.fd);
h.fd = -1;
return h;
}
}
return h;
}
#define mscp_open_is_failed(h) (h.fd < 0 && h.sf == NULL)
static void mscp_close(mfh h)
{
if (h.sf)
sftp_close(h.sf);
if (h.fd > 0)
close(h.fd);
h.sf = NULL;
h.fd = -1;
}
/* wrap chmod/sftp_chmod */
static int mscp_chmod(const char *path, mode_t mode, sftp_session sftp)
{
if (sftp) {
if (sftp_chmod(sftp, path, mode) < 0) {
mscp_set_error("sftp_chmod '%s': %s",
path, sftp_get_ssh_error(sftp));
return -1;
}
} else {
if (chmod(path, mode) < 0) {
mscp_set_error("chmod '%s': %s", path, strerrno());
return -1;
}
}
return 0;
}
#endif /* _PATH_H_ */

View File

@@ -1,9 +1,11 @@
#ifdef __APPLE__
#include <stdlib.h>
#include <sys/types.h>
#include <sys/sysctl.h>
#elif linux
#define _GNU_SOURCE
#include <sched.h>
#include <stdlib.h>
#else
#error unsupported platform
#endif
@@ -12,6 +14,7 @@
#include <platform.h>
#include <message.h>
#ifdef __APPLE__
int nr_cpus()
{
@@ -32,6 +35,38 @@ int set_thread_affinity(pthread_t tid, int core)
return 0;
}
static void random_string(char *buf, size_t size)
{
char chars[] = "abcdefhijklmnopqrstuvwxyz1234567890";
int n, x;
for (n = 0; n < size - 1; n++) {
x = arc4random() % (sizeof(chars) - 1);
buf[n] = chars[x];
}
buf[size - 1] = '\0';
}
sem_t *sem_create(int value)
{
char sem_name[30] = "mscp-";
sem_t *sem;
int n;
n = strlen(sem_name);
random_string(sem_name + n, sizeof(sem_name) - n - 1);
if ((sem = sem_open(sem_name, O_CREAT, 600, value)) == SEM_FAILED)
return NULL;
return sem;
}
int sem_release(sem_t *sem)
{
return sem_close(sem);
}
#endif
#ifdef linux
@@ -56,5 +91,27 @@ int set_thread_affinity(pthread_t tid, int core)
core, strerrno());
return ret;
}
sem_t *sem_create(int value)
{
sem_t *sem;
if ((sem = malloc(sizeof(*sem))) == NULL)
return NULL;
if (sem_init(sem, 0, value) < 0) {
free(sem);
return NULL;
}
return sem;
}
int sem_release(sem_t *sem)
{
free(sem);
return 0;
}
#endif

View File

@@ -2,8 +2,20 @@
#define _PLATFORM_H_
#include <pthread.h>
#include <semaphore.h>
int nr_cpus();
int nr_cpus(void);
int set_thread_affinity(pthread_t tid, int core);
/*
* macOS does not support sem_init(). macOS (seems to) releases the
* named semaphore when associated mscp process finished. In linux,
* program (seems to) need to release named semaphore in /dev/shm by
* sem_unlink() explicitly. So, using sem_init() (unnamed semaphore)
* in linux and using sem_open() (named semaphore) in macOS without
* sem_unlink() are reasonable (?).
*/
sem_t *sem_create(int value);
int sem_release(sem_t *sem);
#endif /* _PLATFORM_H_ */

View File

@@ -74,7 +74,7 @@ static int release_instance(struct instance *i)
/* wrapper functions */
static PyObject *wrap_mscp_init(PyObject *sef, PyObject *args, PyObject *kw)
static PyObject *wrap_mscp_init(PyObject *self, PyObject *args, PyObject *kw)
{
/*
* Initialize struct mscp with options. wrap_mscp_init
@@ -89,30 +89,37 @@ static PyObject *wrap_mscp_init(PyObject *sef, PyObject *args, PyObject *kw)
/* mscp_opts */
"nr_threads", /* int */
"nr_ahead", /* int */
"min_chunk_sz", /* unsigned long */
"max_chunk_sz", /* unsigned long */
"buf_sz", /* unsigned long */
"coremask", /* const char * */
"max_startups", /* int */
"severity", /* int, MSCP_SERVERITY_* */
"msg_fd", /* int */
/* mscp_ssh_opts */
"login_name", /* const char * */
"port", /* const char * */
"config", /* const char * */
"identity", /* const char * */
"cipher", /* const char * */
"hmac", /* const char * */
"compress", /* const char * */
"password", /* const char * */
"passphrase", /* const char * */
"debug_level", /* int */
"no_hostkey_check", /* bool */
"enable_nagle", /* bool */
NULL,
};
const char *fmt = "si" "|iikkksii" "ssssssssipp";
const char *fmt = "si" "|" "ii" "kkk" "s" "iii" "ssss" "sssss" "ipp";
char *coremask = NULL;
char *login_name = NULL, *port = NULL, *identity = NULL;
char *login_name = NULL, *port = NULL, *config = NULL, *identity = NULL;
char *cipher = NULL, *hmac = NULL, *compress = NULL;
char *password = NULL, *passphrase = NULL;
@@ -137,10 +144,12 @@ static PyObject *wrap_mscp_init(PyObject *sef, PyObject *args, PyObject *kw)
&i->mo.max_chunk_sz,
&i->mo.buf_sz,
&coremask,
&i->mo.max_startups,
&i->mo.severity,
&i->mo.msg_fd,
&login_name,
&port,
&config,
&identity,
&cipher,
&hmac,
@@ -160,6 +169,8 @@ static PyObject *wrap_mscp_init(PyObject *sef, PyObject *args, PyObject *kw)
strncpy(i->so.login_name, login_name, MSCP_SSH_MAX_LOGIN_NAME - 1);
if (port)
strncpy(i->so.port, port, MSCP_SSH_MAX_PORT_STR - 1);
if (config)
strncpy(i->so.config, config, PATH_MAX - 1);
if (identity)
strncpy(i->so.identity, identity, MSCP_SSH_MAX_IDENTITY_PATH - 1);
if (cipher)
@@ -175,6 +186,7 @@ static PyObject *wrap_mscp_init(PyObject *sef, PyObject *args, PyObject *kw)
i->m = mscp_init(remote, direction, &i->mo, &i->so);
if (!i->m) {
PyErr_Format(PyExc_RuntimeError, "%s", mscp_get_error());
free(i);
return NULL;
}
@@ -260,7 +272,7 @@ static PyObject *wrap_mscp_set_dst_path(PyObject *self, PyObject *args, PyObject
return Py_BuildValue("");
}
static PyObject *wrap_mscp_prepare(PyObject *self, PyObject *args, PyObject *kw)
static PyObject *wrap_mscp_scan(PyObject *self, PyObject *args, PyObject *kw)
{
char *keywords[] = { "m", NULL };
unsigned long long addr;
@@ -275,7 +287,7 @@ static PyObject *wrap_mscp_prepare(PyObject *self, PyObject *args, PyObject *kw)
return NULL;
}
if (mscp_prepare(m) < 0) {
if (mscp_scan(m) < 0) {
PyErr_Format(PyExc_RuntimeError, mscp_get_error());
return NULL;
}
@@ -367,7 +379,7 @@ static PyObject *wrap_mscp_get_stats(PyObject *self, PyObject *args, PyObject *k
mscp_get_stats(m, &s);
return Py_BuildValue("KKd", s.total, s.done, s.finished);
return Py_BuildValue("KKO", s.total, s.done, PyBool_FromLong(s.finished));
}
static PyObject *wrap_mscp_cleanup(PyObject *self, PyObject *args, PyObject *kw)
@@ -429,7 +441,7 @@ static PyMethodDef pymscpMethods[] = {
METH_VARARGS | METH_KEYWORDS, NULL
},
{
"mscp_prepare", (PyCFunction)wrap_mscp_prepare,
"mscp_scan", (PyCFunction)wrap_mscp_scan,
METH_VARARGS | METH_KEYWORDS, NULL
},
{

View File

@@ -73,6 +73,12 @@ static int ssh_set_opts(ssh_session ssh, struct mscp_ssh_opts *opts)
}
}
if (is_specified(opts->config) &&
ssh_options_parse_config(ssh, opts->config) < 0) {
mscp_set_error("failed to parse ssh_config: %s", opts->config);
return -1;
}
return 0;
}
@@ -149,14 +155,14 @@ static ssh_session ssh_init_session(const char *sshdst, struct mscp_ssh_opts *op
cb.userdata = opts;
ssh_set_callbacks(ssh, &cb);
if (ssh_set_opts(ssh, opts) != 0)
goto free_out;
if (ssh_options_set(ssh, SSH_OPTIONS_HOST, sshdst) != SSH_OK) {
mscp_set_error("failed to set destination host");
goto free_out;
}
if (ssh_set_opts(ssh, opts) != 0)
goto free_out;
if (ssh_connect(ssh) != SSH_OK) {
mscp_set_error("failed to connect ssh server: %s", ssh_get_error(ssh));
goto free_out;

View File

@@ -31,8 +31,6 @@
#define pr_debug(fmt, ...)
#endif
#define strerrno() strerror(errno)
#define min(a, b) (((a) > (b)) ? (b) : (a))
#define max(a, b) (((a) > (b)) ? (a) : (b))

View File

@@ -11,11 +11,15 @@ from util import File, check_same_md5sum
def run2ok(args):
check_call(list(map(str, args)))
cmd = list(map(str, args))
print("cmd: {}".format(" ".join(cmd)))
check_call(cmd)
def run2ng(args):
cmd = list(map(str, args))
print("cmd: {}".format(" ".join(cmd)))
with pytest.raises(CalledProcessError) as e:
check_call(list(map(str, args)))
check_call(cmd)
""" usage test """
@@ -56,11 +60,16 @@ param_single_copy = [
@pytest.mark.parametrize("src, dst", param_single_copy)
def test_single_copy(mscp, src_prefix, dst_prefix, src, dst):
src.make()
run2ok([mscp, "-H", src_prefix + src.path, dst_prefix + dst.path])
run2ok([mscp, "-H", "-vvv", src_prefix + src.path, dst_prefix + dst.path])
assert check_same_md5sum(src, dst)
src.cleanup()
dst.cleanup()
@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
def test_failed_to_copy_nonexistent_file(mscp, src_prefix, dst_prefix):
src = "nonexistent_src"
dst = "nonexistent_dst"
run2ng([mscp, "-H", "-vvv", src_prefix + src, dst_prefix + dst])
param_double_copy = [
(File("src1", size = 1024 * 1024), File("src2", size = 1024 * 1024),
@@ -72,7 +81,7 @@ param_double_copy = [
def test_double_copy(mscp, src_prefix, dst_prefix, s1, s2, d1, d2):
s1.make()
s2.make()
run2ok([mscp, "-H", src_prefix + s1.path, src_prefix + s2.path, dst_prefix + "dst"])
run2ok([mscp, "-H", "-vvv", src_prefix + s1.path, src_prefix + s2.path, dst_prefix + "dst"])
assert check_same_md5sum(s1, d1)
assert check_same_md5sum(s2, d2)
s1.cleanup()
@@ -109,11 +118,11 @@ def test_dir_copy(mscp, src_prefix, dst_prefix, src_dir, dst_dir, src, dst, twic
for f in src:
f.make()
run2ok([mscp, "-H", src_prefix + src_dir, dst_prefix + dst_dir])
run2ok([mscp, "-H", "-vvv", src_prefix + src_dir, dst_prefix + dst_dir])
for sf, df in zip(src, dst):
assert check_same_md5sum(sf, df)
run2ok([mscp, "-H", src_prefix + src_dir, dst_prefix + dst_dir])
run2ok([mscp, "-H", "-vvv", src_prefix + src_dir, dst_prefix + dst_dir])
for sf, df in zip(src, twice):
assert check_same_md5sum(sf, df)
@@ -122,13 +131,30 @@ def test_dir_copy(mscp, src_prefix, dst_prefix, src_dir, dst_dir, src, dst, twic
df.cleanup()
tf.cleanup()
param_dir_copy_single = [
("src_dir", "dst_dir",
File("src_dir/t1", size = 1024 * 1024),
File("dst_dir/src_dir/t1"),
)
]
@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
@pytest.mark.parametrize("src_dir, dst_dir, src, dst", param_dir_copy_single)
def test_dir_copy_single(mscp, src_prefix, dst_prefix, src_dir, dst_dir, src, dst):
src.make()
os.mkdir(dst_dir)
run2ok(["mscp", "-H", "-vvv", src_prefix + src_dir, dst_prefix + dst_dir])
assert check_same_md5sum(src, dst)
src.cleanup()
dst.cleanup()
@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
def test_override_single_file(mscp, src_prefix, dst_prefix):
src = File("src", size = 128).make()
dst = File("dst", size = 128).make()
assert not check_same_md5sum(src, dst)
run2ok([mscp, "-H", src_prefix + src.path, dst_prefix + dst.path])
run2ok([mscp, "-H", "-vvv", src_prefix + src.path, dst_prefix + dst.path])
assert check_same_md5sum(src, dst)
src.cleanup()
@@ -139,18 +165,55 @@ def test_min_chunk(mscp, src_prefix, dst_prefix):
src = File("src", size = 16 * 1024).make()
dst = File("dst")
run2ok([mscp, "-H", "-s", 8192, src_prefix + src.path, dst_prefix + dst.path])
run2ok([mscp, "-H", "-vvv", "-s", 32768, src_prefix + src.path, dst_prefix + dst.path])
assert check_same_md5sum(src, dst)
src.cleanup()
dst.cleanup()
def is_alpine():
if os.path.exists("/etc/os-release"):
with open("/etc/os-release", "r") as f:
for line in f:
if line.strip() == "ID=alpine":
return True
return False
param_glob_copy = [
(
"src*", "dstx",
[ File("src1"), File("src2"), File("src3") ],
[ File("dstx/src1"), File("dstx/src2"), File("dstx/src3") ],
),
(
"src*", "dstx",
[ File("src1/s1"), File("src2/s2"), File("src3/s3") ],
[ File("dstx/s1"), File("dstx/s2"), File("dstx/s3") ],
)
]
@pytest.mark.skipif(is_alpine(),
reason = "musl does not implement glob ALTDIRFUNC")
@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
@pytest.mark.parametrize("src_glob_path, dst_path, srcs, dsts", param_glob_copy)
def test_glob_src_path(mscp, src_prefix, dst_prefix,
src_glob_path, dst_path, srcs, dsts):
for src in srcs:
src.make(size = 1024 * 1024)
run2ok([mscp, "-H", "-vvv", src_prefix + src_glob_path, dst_prefix + dst_path])
for src, dst in zip(srcs, dsts):
assert check_same_md5sum(src, dst)
src.cleanup()
dst.cleanup()
@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
def test_thread_affinity(mscp, src_prefix, dst_prefix):
src = File("src", size = 64 * 1024).make()
dst = File("dst")
run2ok([mscp, "-H", "-n", 4, "-m", "0x01", "-s", 8192, "-S", 65536,
run2ok([mscp, "-H", "-vvv", "-n", 4, "-m", "0x01",
src_prefix + src.path, dst_prefix + dst.path])
assert check_same_md5sum(src, dst)
@@ -162,7 +225,7 @@ def test_cannot_override_file_with_dir(mscp, src_prefix, dst_prefix):
src = File("src", size = 128).make()
dst = File("dst").make()
run2ng([mscp, "-H", src_prefix + src.path, dst_prefix + "dst/src"])
run2ng([mscp, "-H", "-vvv", src_prefix + src.path, dst_prefix + "dst/src"])
src.cleanup()
dst.cleanup()
@@ -171,7 +234,7 @@ def test_cannot_override_file_with_dir(mscp, src_prefix, dst_prefix):
def test_transfer_zero_bytes(mscp, src_prefix, dst_prefix):
src = File("src", size = 0).make()
dst = File("dst")
run2ok([mscp, "-H", src_prefix + src.path, dst_prefix + "dst"])
run2ok([mscp, "-H", "-vvv", src_prefix + src.path, dst_prefix + "dst"])
assert os.path.exists("dst")
src.cleanup()
dst.cleanup()
@@ -180,18 +243,65 @@ def test_transfer_zero_bytes(mscp, src_prefix, dst_prefix):
def test_override_dst_having_larger_size(mscp, src_prefix, dst_prefix):
src = File("src", size = 1024 * 1024).make()
dst = File("dst", size = 1024 * 1024 * 2).make()
run2ok([mscp, "-H", src_prefix + src.path, dst_prefix + "dst"])
run2ok([mscp, "-H", "-vvv", src_prefix + src.path, dst_prefix + "dst"])
assert check_same_md5sum(src, dst)
src.cleanup()
dst.cleanup()
@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
def test_dont_truncate_dst(mscp, src_prefix, dst_prefix):
f = File("srcanddst", size = 1024 * 1024 * 128).make()
md5_before = f.md5sum()
run2ok([mscp, "-H", "-vvv", src_prefix + f.path, dst_prefix + f.path])
md5_after = f.md5sum()
assert md5_before == md5_after
f.cleanup()
compressions = ["yes", "no", "none"]
@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
@pytest.mark.parametrize("compress", compressions)
def test_compression(mscp, src_prefix, dst_prefix, compress):
src = File("src", size = 1024 * 1024).make()
dst = File("dst", size = 1024 * 1024 * 2).make()
run2ok([mscp, "-H", "-C", compress, src_prefix + src.path, dst_prefix + "dst"])
run2ok([mscp, "-H", "-vvv", "-C", compress, src_prefix + src.path, dst_prefix + "dst"])
assert check_same_md5sum(src, dst)
src.cleanup()
dst.cleanup()
testhost = "mscptestlocalhost"
testhost_prefix = "{}:{}/".format(testhost, os.getcwd()) # use current dir
param_testhost_prefix = [
("", testhost_prefix), (testhost_prefix, "")
]
@pytest.mark.parametrize("src_prefix, dst_prefix", param_testhost_prefix)
def test_config_ok(mscp, src_prefix, dst_prefix):
config = "/tmp/mscp_test_ssh_config"
with open(config, "w") as f:
f.write("host {}\n".format(testhost))
f.write(" hostname localhost\n")
src = File("src", size = 1024 * 1024).make()
dst = File("dst", size = 1024 * 1024 * 2).make()
run2ok([mscp, "-H", "-vvv", "-F", config,
src_prefix + src.path, dst_prefix + "dst"])
os.remove(config)
assert check_same_md5sum(src, dst)
src.cleanup()
dst.cleanup()
@pytest.mark.parametrize("src_prefix, dst_prefix", param_testhost_prefix)
def test_config_ng(mscp, src_prefix, dst_prefix):
config = "/tmp/mscp_test_ssh_config"
with open(config, "w") as f:
f.write("\n") # use empty ssh_config
src = File("src", size = 1024 * 1024).make()
dst = File("dst", size = 1024 * 1024 * 2).make()
run2ng([mscp, "-H", "-vvv", "-F", config,
src_prefix + src.path, dst_prefix + "dst"])
os.remove(config)
src.cleanup()
dst.cleanup()

View File

@@ -71,6 +71,7 @@ param_kwargs = [
{ "min_chunk_sz": 1 * 1024 * 1024 },
{ "max_chunk_sz": 64 * 1024 * 1024 },
{ "coremask": "0x0f" },
{ "max_startups": 5 },
{ "severity": mscp.SEVERITY_NONE },
{ "cipher": "aes128-gcm@openssh.com" },
{ "compress": "yes" },
@@ -102,3 +103,28 @@ def test_login_failed():
m = mscp.mscp(remote, mscp.LOCAL2REMOTE, port = "65534")
with pytest.raises(RuntimeError) as e:
m.connect()
def test_get_stat_before_copy_start():
m = mscp.mscp("localhost", mscp.LOCAL2REMOTE)
m.connect()
(total, done, finished) = m.stats()
assert total == 0 and done == 0
param_invalid_kwargs = [
{ "nr_threads": -1 },
{ "nr_ahead": -1 },
{ "min_chunk_sz": 1 },
{ "max_chunk_sz": 1 },
{ "coremask": "xxxxx" },
{ "max_startups": -1 },
{ "cipher": "invalid" },
{ "hmac": "invalid"},
{ "compress": "invalid"},
]
@pytest.mark.parametrize("kw", param_invalid_kwargs)
def test_invalid_options(kw):
with pytest.raises(RuntimeError) as e:
m = mscp.mscp("localhost", mscp.LOCAL2REMOTE, **kw)
m.connect()

View File

@@ -16,10 +16,16 @@ class File():
self.content = content
self.perm = perm
def __repr__(self):
return "<file:{} {}-bytes>".format(self.path, self.size)
def __str__(self):
return self.path
def make(self):
def make(self, size = None):
if size:
self.size = size
d = os.path.dirname(self.path)
if d:
os.makedirs(d, exist_ok = True)