48 Commits

Author SHA1 Message Date
Ryo Nakamura
09d7ec6a24 bump version to 0.0.7 2023-03-13 15:01:44 +09:00
Ryo Nakamura
cafbefe48c fix mscp.py 2023-03-13 14:53:54 +09:00
Ryo Nakamura
631d52b00d fix install libmscp.dylib to python package dir in macOS 2023-03-13 14:46:04 +09:00
Ryo Nakamura
6e17d0ddbc remove VERSION from package file names
This change enables downloading latest packages from URL
https://github.com/upa/mscp/releases/latest/download/PACKAGE
2023-03-12 23:01:06 +09:00
Ryo Nakamura
e2da5811ce test: add test_python.py for testing python-mscp
Dockerfiles also adapt themselvs for testing python-mscp bindings.
2023-03-12 20:37:57 +09:00
Ryo Nakamura
597a7a8cba little cleanup for python bindings 2023-03-12 17:39:51 +09:00
Ryo Nakamura
2416b5f182 fix cleanup 2023-03-12 17:06:02 +09:00
Ryo Nakamura
1028ecb53b setup.py: read version from VERSION file 2023-03-12 13:21:19 +09:00
Ryo Nakamura
d15a25d1f6 add destructor to mscp class to call mscp_free() 2023-03-12 00:17:11 +09:00
Ryo Nakamura
10812071aa mscp copy via python bindings works.
ToDo: memory for struct instance in pymscp.c is not released until
mscp.free() is called. It is memory leak in a typical pyhotn usage.
Use python extension refcnt instead.
2023-03-12 00:05:39 +09:00
Ryo Nakamura
8ea05729c2 add mscp and pymscp python modules.
pymscp is a C Python wrapper for libmscp functions. mscp module
provides simple (?) Python API.
2023-03-11 22:49:19 +09:00
Ryo Nakamura
855ee618a6 add note to mscp_cleanup() and mscp_free()
mscp_join() must be called before mscp_cleanup() and mscp_free()
are called. Need fix.
2023-03-11 22:11:44 +09:00
Ryo Nakamura
74d58e986a move direction from mscp_opts to mscp_init argument 2023-03-11 21:48:07 +09:00
Ryo Nakamura
7e7bc61ff2 start to implement pymscp.c 2023-03-11 20:54:45 +09:00
Ryo Nakamura
d22c02b793 remove numpy from test, and fix compiling single binary mscp 2023-03-10 22:07:07 +09:00
Ryo Nakamura
2477647a3b fix uninitialized dst_path_is_dir bool 2023-03-10 21:42:00 +09:00
Ryo Nakamura
e037294d3d add include GNUInstallDirs 2023-03-10 21:11:59 +09:00
Ryo Nakamura
309371ed75 now make install installs libmscp 2023-03-10 21:10:06 +09:00
Ryo Nakamura
1e92ff9e77 fix CMakeLists to build libmscp on ubnutu 22.04 2023-03-10 20:29:46 +09:00
Ryo Nakamura
b3b7299990 merge main into lib 2023-03-10 20:11:44 +09:00
Ryo Nakamura
ca94d77e45 fix typo 2023-03-10 02:09:04 +09:00
Ryo Nakamura
34a0e0c891 add alpine docker to build mscp as a single binary
The build recipe in docker/alpine-3.17.Dockerfile uses conan to
build mscp as a single binary (with statically linked musl).

Now the mscp binary is portable!
2023-03-10 00:37:06 +09:00
Ryo Nakamura
c39ab7ce62 add conanfile.txt to compile mscp as a single binary
conan cmake build with -DBUILD_CONAN=ON and -DBUILD_STATIC=ON in
alpine make mscp as a single binary with statically linked musl.
2023-03-09 23:21:58 +09:00
Ryo Nakamura
e56e1be4f6 Merge branch 'main' of github.com:upa/mscp into main 2023-03-09 22:33:04 +09:00
Ryo Nakamura
c07bdd60e5 fix cmake: remove modification to libssh CMake 2023-03-09 22:32:42 +09:00
Ryo Nakamura
d766b3a99e fix dryrun handling on main.c 2023-03-04 19:01:44 +09:00
Ryo Nakamura
d5a86292b7 add doxygen for mscp.h 2023-03-04 18:47:44 +09:00
Ryo Nakamura
cc18c74d32 remove sigalrm. integrate print messages and progress bar 2023-03-04 17:37:19 +09:00
Ryo Nakamura
205c7cf803 tiny fix for clang warning 2023-03-04 16:50:34 +09:00
Ryo Nakamura
e67b7468e5 use setitimer instead of alarm, and print message.
print_stat now prints messages per interval.
ToDo:
- realtime message printing
- use timer_create instead of setitimer (mscOS has different one)
2023-03-04 16:48:26 +09:00
Ryo Nakamura
f9c8dec389 compilable on ubuntu 2023-03-04 15:53:54 +09:00
Ryo Nakamura
9342c18f0e accidentaly swap min_chunk_sz and max_chunk_sz... 2023-03-04 15:53:42 +09:00
Ryo Nakamura
df2f922b0d remove pprint, use message (mpr_*) instead.
ToDo:
main should use pipe to receive messages from libmscp.
2023-03-04 15:44:10 +09:00
Ryo Nakamura
1e57e8fb2f implementing messaging.
ToDo: remove pprint.
mscp should use mpr_* functions, and main.c should use
just fprintf(stdout, "\r\033[K" fmt, ...) for printing progress bar.
2023-03-03 22:14:54 +09:00
Ryo Nakamura
1b9ae51974 add message.h and message.c, mscp_set|get_error()
Instead of pr_err(), libmscp uses mscp_set_error() and
applications use mscp_get_errror() to get error message.
2023-03-03 21:29:43 +09:00
Ryo Nakamura
c5aa70d9c9 tiny cleanup 2023-03-03 18:30:34 +09:00
Ryo Nakamura
a0b7482f66 add mscp_get_stats
move progress bar-related functions from mscp.c to main.c.
2023-03-03 18:27:14 +09:00
Ryo Nakamura
363296f499 add mscp_ssh_opts and change -C optarg 2023-03-03 16:50:06 +09:00
Ryo Nakamura
a8af79f9cf remove test.c 2023-02-27 10:57:10 +09:00
Ryo Nakamura
fc45fa2532 add comments to mscp.h 2023-02-26 23:56:57 +09:00
Ryo Nakamura
ca0ea3ee77 tiny fix on comment 2023-02-26 23:46:53 +09:00
Ryo Nakamura
c649742b3e fix dst path resolve 2023-02-26 23:42:25 +09:00
Ryo Nakamura
700d64b375 now mscp links libmscp 2023-02-26 23:18:39 +09:00
Ryo Nakamura
2bad21bdc2 set default params in mscp_init 2023-02-26 18:43:24 +09:00
Ryo Nakamura
89777032cd have written mscp.c 2023-02-26 18:17:58 +09:00
Ryo Nakamura
3d26cc2c18 add copy-related functions to path 2023-02-25 23:39:20 +09:00
Ryo Nakamura
1be9b70808 start to impliment mscp as a library
this commit starts to refactor file.h|c to path.h|c and
add mscp.c|h. not completed yet.
2023-02-25 22:17:29 +09:00
Ryo Nakamura
b4c021c954 README: add instructions for package install 2023-02-18 16:23:13 +09:00
38 changed files with 3316 additions and 1768 deletions

View File

@@ -24,11 +24,6 @@ jobs:
- name: install build dependency
run: sudo ./scripts/install-build-deps.sh
- name: Set variables
run: |
VER=$(cat VERSION)
echo "VERSION=$VER" >> $GITHUB_ENV
- name: Configure Cmake
run: cmake -B ${{github.workspace}}/build -DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}}
@@ -41,14 +36,19 @@ jobs:
- name: Retrieve packages from containers
run: make -C ${{github.workspace}}/build docker-pkg-all
- name: Copy mscp single binary built inside Alpine
run: cp ${{github.workspace}}/build/mscp_alpine-3.17-x86_64.static ${{github.workspace}}/build/mscp.linux.x86.static
- name: Release
uses: softprops/action-gh-release@v1
with:
files: |
${{github.workspace}}/build/mscp_${{env.VERSION}}-ubuntu-20.04-x86_64.deb
${{github.workspace}}/build/mscp_${{env.VERSION}}-ubuntu-22.04-x86_64.deb
${{github.workspace}}/build/mscp_${{env.VERSION}}-centos-8-x86_64.rpm
${{github.workspace}}/build/mscp_${{env.VERSION}}-rocky-8.6-x86_64.rpm
${{github.workspace}}/build/mscp_ubuntu-20.04-x86_64.deb
${{github.workspace}}/build/mscp_ubuntu-22.04-x86_64.deb
${{github.workspace}}/build/mscp_centos-8-x86_64.rpm
${{github.workspace}}/build/mscp_rocky-8.6-x86_64.rpm
${{github.workspace}}/build/mscp_alpine-3.17-x86_64.static
${{github.workspace}}/build/mscp.linux.x86.static
source-release:
runs-on: ubuntu-latest

6
.gitignore vendored
View File

@@ -1,3 +1,9 @@
build
html
compile_commands.json
CMakeUserPresets.json
.*.swp
dist
*.egg-info
__pycache__

View File

@@ -6,48 +6,108 @@ project(mscp
VERSION ${MSCP_VERSION}
LANGUAGES C)
include(GNUInstallDirs)
set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -DDEBUG")
list(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake/modules)
# add libssh static library
add_subdirectory(libssh EXCLUDE_FROM_ALL)
if(APPLE)
list(APPEND CMAKE_PREFIX_PATH /usr/local) # intel mac homebrew prefix
list(APPEND CMAKE_PREFIX_PATH /opt/homebrew) # arm mac homebrew prefix
endif() # APPLE
option(BUILD_CONAN OFF) # Build mscp with conan
if(BUILD_CONAN)
message(STATUS "Build mscp with conan")
endif()
option(BUILD_STATIC OFF) # Build mscp with -static LD flag
if (BUILD_STATIC)
message(STATUS "Build mscp with -static LD option")
if (NOT BUILD_CONAN)
message(WARNING
"BUILD_STATIC strongly recommended with BUILD_CONAN option")
endif()
endif()
# add libssh static library
set(CMAKE_POLICY_DEFAULT_CMP0077 NEW)
set(WITH_SERVER OFF)
set(BUILD_SHARED_LIBS OFF)
set(WITH_EXAMPLES OFF)
set(BUILD_STATIC_LIB ON)
if(BUILD_CONAN)
message(STATUS
"Disable libssh GSSAPI support because libkrb5 doesn't exist in conan")
set(WITH_GSSAPI OFF)
endif()
add_subdirectory(libssh EXCLUDE_FROM_ALL)
# setup mscp compile options
set(MSCP_COMPILE_OPTS -iquote ${CMAKE_CURRENT_BINARY_DIR}/libssh/include)
set(MSCP_BUILD_INCLUDE_DIRS
${mscp_SOURCE_DIR}/src
${CMAKE_CURRENT_BINARY_DIR}/libssh/include)
set(MSCP_LINK_LIBS ssh-static)
if(BUILD_CONAN)
find_package(ZLIB REQUIRED)
find_package(OpenSSL REQUIRED)
list(APPEND MSCP_LINK_LIBS ZLIB::ZLIB)
list(APPEND MSCP_LINK_LIBS OpenSSL::Crypto)
endif()
set(LIBMSCP_SRC src/mscp.c src/ssh.c src/path.c src/platform.c src/message.c)
# libmscp.so
add_library(mscp-shared SHARED ${LIBMSCP_SRC})
target_include_directories(mscp-shared
PUBLIC $<BUILD_INTERFACE:${mscp_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>
PRIVATE ${MSCP_BUILD_INCLUDE_DIRS})
target_compile_options(mscp-shared PRIVATE ${MSCP_COMPILE_OPTS})
target_link_libraries(mscp-shared PRIVATE ${MSCP_LINK_LIBS})
set_target_properties(mscp-shared
PROPERTIES
OUTPUT_NAME mscp
PUBLIC_HEADER ${mscp_SOURCE_DIR}/include/mscp.h)
install(TARGETS mscp-shared)
# libmscp.a
add_library(mscp-static STATIC ${LIBMSCP_SRC})
target_include_directories(mscp-static
PRIVATE ${MSCP_BUILD_INCLUDE_DIRS} ${mscp_SOURCE_DIR}/include)
target_compile_options(mscp-static PRIVATE ${MSCP_COMPILE_OPTS})
target_link_libraries(mscp-static PRIVATE ${MSCP_LINK_LIBS})
set_target_properties(mscp-static
PROPERTIES
OUTPUT_NAME mscp)
install(TARGETS mscp-static)
# mscp executable
add_executable(mscp src/main.c src/platform.c src/ssh.c src/file.c src/pprint.c)
list(APPEND MSCP_LINK_LIBS m pthread)
set(MSCP_LINK_LIBS m pthread)
set(MSCP_LINK_DIRS "")
set(MSCP_COMPILE_OPTS "")
set(MSCP_INCLUDE_DIRS ${mscp_SOURCE_DIR}/src)
list(APPEND MSCP_COMPILE_OPTS -iquote ${CMAKE_CURRENT_BINARY_DIR}/libssh/include)
list(APPEND MSCP_INCLUDE_DIRS ${CMAKE_CURRENT_BINARY_DIR}/libssh/include)
list(APPEND MSCP_LINK_LIBS ssh-static)
find_package(GSSAPI)
list(APPEND MSCP_LINK_LIBS ${GSSAPI_LIBRARIES})
find_package(OpenSSL)
list(APPEND MSCP_LINK_LIBS ${OPENSSL_LIBRARIES})
find_package(ZLIB)
list(APPEND MSCP_LINK_LIBS ${ZLIB_LIBRARIES})
target_include_directories(mscp PRIVATE ${MSCP_INCLUDE_DIRS})
target_link_directories(mscp PRIVATE ${MSCP_LINK_DIRS})
target_link_libraries(mscp PRIVATE ${MSCP_LINK_LIBS})
add_executable(mscp src/main.c)
target_include_directories(mscp
PRIVATE ${MSCP_BUILD_INCLUDE_DIRS} ${mscp_SOURCE_DIR}/include)
target_link_libraries(mscp mscp-static ${MSCP_LINK_LIBS})
if (BUILD_STATIC)
target_link_options(mscp PRIVATE -static)
endif()
target_compile_options(mscp PRIVATE ${MSCP_COMPILE_OPTS})
target_compile_definitions(mscp PUBLIC _VERSION="${PROJECT_VERSION}")
install(TARGETS mscp RUNTIME DESTINATION bin)
install(TARGETS mscp RUNTIME DESTINATION bin)
# Test
@@ -83,7 +143,7 @@ if(UNIX AND NOT APPLE) # on linux
OUTPUT_VARIABLE DIST_DEP OUTPUT_STRIP_TRAILING_WHITESPACE)
set(PACKAGE_FILE_NAME
${PROJECT_NAME}_${PROJECT_VERSION}-${DIST_NAME}-${DIST_VER}-${ARCH})
${PROJECT_NAME}_${DIST_NAME}-${DIST_VER}-${ARCH})
set(CPACK_DEBIAN_FILE_NAME ${PACKAGE_FILE_NAME}.deb)
set(CPACK_DEBIAN_PACKAGE_DEPENDS ${DIST_DEP})
@@ -102,9 +162,9 @@ include(CPack)
# Custom targets to build and test mscp in docker containers.
# foreach(IN ZIP_LISTS) (cmake >= 3.17) can shorten the following lists.
# However, ubuntu 20.04 has cmake 3.16.3. So this is a roundabout trick.
list(APPEND DIST_NAMES ubuntu ubuntu centos rocky)
list(APPEND DIST_VERS 20.04 22.04 8 8.6)
list(APPEND DIST_PKGS deb deb rpm rpm)
list(APPEND DIST_NAMES ubuntu ubuntu centos rocky alpine)
list(APPEND DIST_VERS 20.04 22.04 8 8.6 3.17)
list(APPEND DIST_PKGS deb deb rpm rpm static)
list(LENGTH DIST_NAMES _DIST_LISTLEN)
math(EXPR DIST_LISTLEN "${_DIST_LISTLEN} - 1")
@@ -117,7 +177,7 @@ foreach(x RANGE ${DIST_LISTLEN})
set(DOCKER_IMAGE mscp-${DIST_NAME}:${DIST_VER})
set(DOCKER_INDEX ${DIST_NAME}-${DIST_VER})
set(PKG_FILE_NAME
mscp_${PROJECT_VERSION}-${DIST_NAME}-${DIST_VER}-${ARCH}.${DIST_PKG})
mscp_${DIST_NAME}-${DIST_VER}-${ARCH}.${DIST_PKG})
add_custom_target(docker-build-${DOCKER_INDEX}
COMMENT "Build mscp in ${DOCKER_IMAGE} container"

13
Doxyfile Normal file
View File

@@ -0,0 +1,13 @@
PROJECT_NAME = libmscp
GENERATE_HTML = YES
GENERATE_CHI = NO
GENERATE_LATEX = NO
GENERATE_RTF = NO
GENERATE_MAN = NO
SOURCE_BROWSER = YES
INPUT = src include mscp

View File

@@ -17,27 +17,41 @@ require anything else.
https://user-images.githubusercontent.com/184632/206889149-7cc6178a-6f0f-41e6-855c-d25e15a9abc5.mp4
--------------------------------------------------------------------
Differences from `scp` on usage:
- remote glob on remote shell expansion is not supported.
- remote to remote copy is not supported.
- `-r` option is not needed.
- `-r` option is not needed to transfer directories.
- and any other differences I have not implemented and noticed.
## Install
- homebrew
- macOS
```console
brew install upa/tap/mscp
```
- Linux
- Ubuntu 22.04
```console
wget https://github.com/upa/mscp/releases/download/v0.0.6/mscp_0.0.6-ubuntu-22.04-x86_64.deb
apt-get install -f ./mscp_0.0.6-ubuntu-22.04-x86_64.deb
```
Download a package for your environment from [Releases
page](https://github.com/upa/mscp/releases).
- Ubuntu 20.04
```console
wget https://github.com/upa/mscp/releases/download/v0.0.6/mscp_0.0.6-ubuntu-20.04-x86_64.deb
apt-get install -f ./mscp_0.0.6-ubuntu-20.04-x86_64.deb
```
- Rocky 8.6
```console
yum install https://github.com/upa/mscp/releases/download/v0.0.6/mscp_0.0.6-rocky-8.6-x86_64.rpm
```
## Build
@@ -84,12 +98,12 @@ of libssh. So you can start from cmake with it.
```console
$ mscp
mscp v0.0.5: copy files over multiple ssh connections
mscp v0.0.7: copy files over multiple ssh connections
Usage: mscp [vqDCHdNh] [-n nr_conns] [-m coremask]
Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask]
[-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]
[-l login_name] [-p port] [-i identity_file]
[-c cipher_spec] [-M hmac_spec] source ... target
[-c cipher_spec] [-M hmac_spec] [-C compress] source ... target
```
- Example: copy a 15GB file on memory over a 100Gbps link
@@ -110,43 +124,41 @@ $ mscp -n 5 -m 0x1f -c aes128-gcm@openssh.com /var/ram/test.img 10.0.0.1:/var/ra
- `-v` option increments verbose output level.
```console
$ mscp test 10.0.0.1:
[======================================] 100% 26B /26B 6.3KB/s 00:00 ETA
$ mscp test 10.0.0.:
[=======================================] 100% 49B /49B 198.8B/s 00:00 ETA
```
```console
$ mscp -vv test 10.0.0.1:
number of connections: 7
connecting to 10.0.0.1 for checking destinations...
file test/testdir/asdf (local) -> ./test/testdir/asdf (remote) 9B
file test/testdir/qwer (local) -> ./test/testdir/qwer (remote) 5B
file test/test1 (local) -> ./test/test1 (remote) 6B
file test/test2 (local) -> ./test/test2 (remote) 6B
file: test/test1 -> ./test/test1
file: test/testdir/asdf -> ./test/testdir/asdf
file: test/testdir/qwer -> ./test/testdir/qwer
file: test/test2 -> ./test/test2
we have only 4 chunk(s). set number of connections to 4
connecting to 10.0.0.1 for a copy thread...
connecting to 10.0.0.1 for a copy thread...
connecting to 10.0.0.1 for a copy thread...
connecting to localhost for a copy thread...
connecting to localhost for a copy thread...
connecting to localhost for a copy thread...
copy start: test/test1
copy start: test/test2
copy done: test/test1
copy start: test/testdir/asdf
copy done: test/test2
copy start: test/testdir/qwer
copy done: test/test1
copy done: test/test2
copy done: test/testdir/qwer
copy done: test/testdir/asdf
[======================================] 100% 26B /26B 5.2KB/s 00:00 ETA
[=======================================] 100% 49B /49B 198.1B/s 00:00 ETA
```
- Full usage
```console
$ mscp -h
mscp v0.0.6: copy files over multiple ssh connections
mscp v0.0.7: copy files over multiple ssh connections
Usage: mscp [vqDCHdNh] [-n nr_conns] [-m coremask]
Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask]
[-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]
[-l login_name] [-p port] [-i identity_file]
[-c cipher_spec] [-M hmac_spec] source ... target
[-c cipher_spec] [-M hmac_spec] [-C compress] source ... target
-n NR_CONNECTIONS number of connections (default: floor(log(cores)*2)+1)
-m COREMASK hex value to specify cores where threads pinned
@@ -158,7 +170,7 @@ Usage: mscp [vqDCHdNh] [-n nr_conns] [-m coremask]
-v increment verbose output level
-q disable output
-D dry run
-D dry run. check copy destinations with -vvv
-r no effect
-l LOGIN_NAME login name
@@ -166,10 +178,10 @@ Usage: mscp [vqDCHdNh] [-n nr_conns] [-m coremask]
-i IDENTITY identity file for public key authentication
-c CIPHER cipher spec
-M HMAC hmac spec
-C enable compression on libssh
-C COMPRESS enable compression: yes, no, zlib, zlib@openssh.com
-H disable hostkey check
-d increment ssh debug output level
-N disable tcp nodelay (default on)
-N enable Nagle's algorithm (default disabled)
-h print this help
```

View File

@@ -1 +1 @@
0.0.6
0.0.7

7
conanfile.txt Normal file
View File

@@ -0,0 +1,7 @@
[requires]
zlib/1.2.11
openssl/1.1.1t
[generators]
CMakeDeps
CMakeToolchain

View File

@@ -0,0 +1,44 @@
FROM alpine:3.17
# Build mscp with conan to create single binary mscp
ARG mscpdir="/mscp"
COPY . ${mscpdir}
RUN apk add --no-cache \
gcc make cmake python3 py3-pip perl linux-headers libc-dev \
openssh bash python3-dev g++
RUN pip3 install conan pytest
# Build mscp as a single binary
RUN conan profile detect --force
RUN cd ${mscpdir} \
&& rm -rf build \
&& conan install . --output-folder=build --build=missing \
&& cd ${mscpdir}/build \
&& cmake .. \
-DCMAKE_BUILD_TYPE=Release \
-DCMAKE_TOOLCHAIN_FILE=conan_toolchain.cmake \
-DBUILD_CONAN=ON -DBUILD_STATIC=ON \
&& make \
&& cp mscp /usr/bin/ \
&& cp mscp /mscp/build/mscp_alpine-3.17-x86_64.static
# copy mscp to PKG FILE NAME because this build doesn't use CPACK
# preparation for sshd
RUN ssh-keygen -A
RUN mkdir /var/run/sshd \
&& ssh-keygen -f /root/.ssh/id_rsa -N "" \
&& mv /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
# install mscp python module
RUN cd ${mscpdir} \
&& python3 setup.py install --user
# Need Fix: A trick putting libmscp.so to python mscp module dir does not work on alpine,
# so install libmscp.
RUN cd ${mscpdir}/build \
&& make install

View File

@@ -9,11 +9,11 @@ RUN cd /etc/yum.repos.d/
RUN sed -i 's/mirrorlist/#mirrorlist/g' /etc/yum.repos.d/CentOS-*
RUN sed -i 's|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g' /etc/yum.repos.d/CentOS-*
# install numpy and pytest, sshd for test, and rpm-build
# install pytest, sshd for test, and rpm-build
RUN set -ex && yum -y update && yum -y install \
python3 python3-pip openssh openssh-server openssh-clients rpm-build
python3 python3-pip python3-devel openssh openssh-server openssh-clients rpm-build
RUN python3 -m pip install numpy pytest
RUN python3 -m pip install pytest
# preparation for sshd
@@ -34,3 +34,6 @@ RUN cd ${mscpdir} \
&& cpack -G RPM CPackConfig.cmake \
&& rpm -iv *.rpm
# install mscp python module
RUN cd ${mscpdir} \
&& python3 setup.py install --user

View File

@@ -4,11 +4,11 @@ ARG mscpdir="/mscp"
COPY . ${mscpdir}
# install numpy and pytest, sshd for test, and rpm-build
# install pytest, sshd for test, and rpm-build
RUN set -ex && yum -y install \
python3 python3-pip openssh openssh-server openssh-clients rpm-build
python3 python3-pip python3-devel openssh openssh-server openssh-clients rpm-build
RUN python3 -m pip install numpy pytest
RUN python3 -m pip install pytest
# preparation for sshd
@@ -29,3 +29,7 @@ RUN cd ${mscpdir} \
&& cpack -G RPM CPackConfig.cmake \
&& rpm -iv *.rpm
# install mscp python module
RUN cd ${mscpdir} \
&& python3 setup.py install --user

View File

@@ -8,11 +8,11 @@ COPY . ${mscpdir}
RUN set -ex && apt-get update && apt-get install -y --no-install-recommends \
ca-certificates
# install numpy and pytest, and sshd for test
# install pytest, and sshd for test
RUN apt-get install -y --no-install-recommends \
python3 python3-pip openssh-server
python3 python3-pip python3-dev openssh-server
RUN python3 -m pip install numpy pytest
RUN python3 -m pip install pytest
# preparation for sshd
@@ -33,3 +33,7 @@ RUN cd ${mscpdir} \
&& make \
&& cpack -G DEB CPackConfig.cmake \
&& dpkg -i *.deb
# install mscp python module
RUN cd ${mscpdir} \
&& python3 setup.py install --user

View File

@@ -8,11 +8,11 @@ COPY . ${mscpdir}
RUN set -ex && apt-get update && apt-get install -y --no-install-recommends \
ca-certificates
# install numpy and pytest, and sshd for test
# install pytest, and sshd for test
RUN apt-get install -y --no-install-recommends \
python3 python3-pip openssh-server
python3 python3-pip python3-dev openssh-server
RUN python3 -m pip install numpy pytest
RUN python3 -m pip install pytest
# preparation for sshd
@@ -33,3 +33,8 @@ RUN cd ${mscpdir} \
&& make \
&& cpack -G DEB CPackConfig.cmake \
&& dpkg -i *.deb
# install mscp python module
RUN cd ${mscpdir} \
&& python3 setup.py install --user

268
include/mscp.h Normal file
View File

@@ -0,0 +1,268 @@
#ifndef _MSCP_H_
#define _MSCP_H_
/**
* @file mscp.h
*
* @brief mscp library header file.
*
* @mainpage
*
* libmscp is a library for multi-threaded scp. Project page is
* https://github.com/upa/mscp.
*
* All public APIs of libmscp are defined in mscp.h. Basic usage of
* libmscp is follows:
*
* 1. create mscp instance with mscp_init()
* 2. connect to remote host with mscp_connect()
* 3. add path to source files with mscp_add_src_path()
* 4. set path to destination with mscp_set_dst_path()
* 5. finish preparation with mscp_prepare()
* 6. start copy with mscp_start()
* 7. wait for copy finished with mscp_join()
* 8. cleanup mscp instance with mscp_cleanup() and mscp_free()
*/
#include <stdbool.h>
#include <limits.h>
#define MSCP_DIRECTION_L2R 1 /** Indicates local to remote copy */
#define MSCP_DIRECTION_R2L 2 /** Indicates remote to local copy */
#define MSCP_MAX_COREMASK_STR 64
/**
* @struct mscp_opts
* @brief Structure configuring mscp.
*/
struct mscp_opts {
int nr_threads; /** number of copy threads */
int nr_ahead; /** number of SFTP commands on-the-fly */
size_t min_chunk_sz; /** minimum chunk size (default 64MB) */
size_t max_chunk_sz; /** maximum chunk size (default file size/nr_threads) */
size_t buf_sz; /** buffer size, default 16k. */
char coremask[MSCP_MAX_COREMASK_STR]; /** hex to specifiy usable cpu cores */
int severity; /** messaging severity. set MSCP_SERVERITY_* */
int msg_fd; /** fd to output message. default STDOUT (0),
* and -1 disables output */
};
#define MSCP_SSH_MAX_LOGIN_NAME 64
#define MSCP_SSH_MAX_PORT_STR 32
#define MSCP_SSH_MAX_IDENTITY_PATH PATH_MAX
#define MSCP_SSH_MAX_CIPHER_STR 32
#define MSCP_SSH_MAX_HMAC_STR 32
#define MSCP_SSH_MAX_COMP_STR 32 /* yes, no, zlib, zlib@openssh.com, none */
#define MSCP_SSH_MAX_PASSWORD 128
#define MSCP_SSH_MAX_PASSPHRASE 128
/**
* @struct mscp_ssh_opts
* @brief Structure configuring SSH connections
*/
struct mscp_ssh_opts {
/* ssh options */
char login_name[MSCP_SSH_MAX_LOGIN_NAME]; /** ssh username */
char port[MSCP_SSH_MAX_PORT_STR]; /** ssh port */
char identity[MSCP_SSH_MAX_IDENTITY_PATH]; /** path to private key */
char cipher[MSCP_SSH_MAX_CIPHER_STR]; /** cipher spec */
char hmac[MSCP_SSH_MAX_HMAC_STR]; /** hmacp spec */
char compress[MSCP_SSH_MAX_COMP_STR]; /** yes, no, zlib@openssh.com */
char password[MSCP_SSH_MAX_PASSWORD]; /** password auth passowrd */
char passphrase[MSCP_SSH_MAX_PASSPHRASE]; /** passphrase for private key */
int debug_level; /** inclirement libssh debug output level */
bool no_hostkey_check; /** do not check host keys */
bool enable_nagle; /** enable Nagle's algorithm if true */
};
/**
* @struct mscp_stats
* @brief Structure to get mscp statistics
*/
struct mscp_stats {
size_t total; /** total bytes to be transferred */
size_t done; /** total bytes transferred */
bool finished; /** true when all copy threads finished */
};
/** Structure representing mscp instance */
struct mscp;
/**
* @brief Creates a new mscp instance.
*
* @param remote_host remote host for file transer.
* @param direction copy direction, `MSCP_DIRECTION_L2R` or `MSCP_DIRECTION_R2L`
* @param o options for configuring mscp.
* @param s options for configuring ssh connections.
*
* @retrun A new mscp instance or NULL on error.
*/
struct mscp *mscp_init(const char *remote_host, int direction,
struct mscp_opts *o, struct mscp_ssh_opts *s);
/**
* @brief Connect the first SSH connection. mscp_connect connects to
* remote host and initialize a SFTP session over the
* connection. mscp_prepare() and mscp_start() require mscp_connect()
* beforehand.
*
* @param m mscp instance.
*
* @return 0 on success, < 0 if an error occured.
* mscp_get_error() can be used to retrieve error message.
*/
int mscp_connect(struct mscp *m);
/* add a source file path to be copied */
/**
* @brief Add a source file path to be copied. The path indicates
* either a file or directory. The path can be `user@host:path`
* notation. In this case, `dst_path` for mscp_set_dst_path() must
* not contain remote host notation.
*
* @param m mscp instance.
* @param src_path source file path to be copied.
*
* @return 0 on success, < 0 if an error occured.
* mscp_get_error() can be used to retrieve error message.
*/
int mscp_add_src_path(struct mscp *m, const char *src_path);
/**
* @brief Set the destination file path. The path indicates either a
* file, directory, or nonexistent path. The path can be
* `user@host:path` notation. In this case, all source paths appended
* by mscp_set_src_path() must not contain remote host notation.
*
* @param m mscp instance.
* @param dst_path destination path to which source files copied.
*
* @return 0 on success, < 0 if an error occured.
* mscp_get_error() can be used to retrieve error message.
*/
int mscp_set_dst_path(struct mscp *m, const char *dst_path);
/* check source files, resolve destination file paths for all source
* files, and prepare chunks for all files. */
/**
* @brief Prepare for file transfer. This function checks all source
* files (recursively), resolve paths on the destination side, and
* calculate file chunks.
*
* @param m mscp instance.
*
* @return 0 on success, < 0 if an error occured.
* mscp_get_error() can be used to retrieve error message.
*/
int mscp_prepare(struct mscp *m);
/**
* @brief Start to copy files. mscp_start() returns immediately. You
* can get statistics via mscp_get_stats() or messages via pipe set by
* mscp_opts.msg_fd or mscp_set_msg_fd(). mscp_stop() cancels mscp
* copy threads, and mscp_join() joins the threads.
*
* @param m mscp instance.
*
* @return 0 on success, < 0 if an error occured.
* mscp_get_error() can be used to retrieve error message.
*
* @see mscp_join()
*/
int mscp_start(struct mscp *m);
/**
* @brief Stop coping files.
*
* @param m mscp instance.
*/
void mscp_stop(struct mscp *m);
/**
* @brief Join copy threads. This function is blocking until all copy
* have done.
*
* @param m mscp instance.
*
* @return 0 on success, < 0 if an error occured.
* mscp_get_error() can be used to retrieve error message.
*/
int mscp_join(struct mscp *m);
/**
* @brief Get statistics of copy.
*
* @param m mscp instance.
* @param s[out] statistics.
*/
void mscp_get_stats(struct mscp *m, struct mscp_stats *s);
/**
* @brief Cleanup the mscp instance. Before calling mscp_cleanup(),
* must call mscp_join(). After mscp_cleanup() called, the mscp
* instance can restart from mscp_connect(). Note that do not call
* mscp_cleanup() before callign mscp_join(). It causes crash (ToDo:
* check status of copy threads and return error when they are
* running).
*
* @param m mscp instance.
*/
void mscp_cleanup(struct mscp *m);
/**
* @brief Release the mscp instance. Note that do not call *
mscp_free() before calling mscp_join(). It causes crash (ToDo: check
* status of copy threads and return error when they are running).
*
* @param m mscp instance.
*/
void mscp_free(struct mscp *m);
/* messaging with mscp */
/**
* @enum mscp_serverity
* @brief Filter messages from libmscp with severity level.
*/
enum {
MSCP_SEVERITY_NONE = -1,
MSCP_SEVERITY_ERR = 0,
MSCP_SEVERITY_WARN = 1,
MSCP_SEVERITY_NOTICE = 2,
MSCP_SEVERITY_INFO = 3,
MSCP_SEVERITY_DEBUG = 4,
};
/**
* @brief Set a file descriptor for receiving messages from mscp.
* This function has the same effect with setting mscp_opts->msg_fd.
*
* @param m mscp instance.
* @param fd fd to which libmscp writes messages.
*/
void mscp_set_msg_fd(struct mscp *m, int fd);
/**
* @brief Get the recent error message from libmscp. Note that this
* function is not thread-safe.
*
* @return pointer to the message.
*/
const char *mscp_get_error(void);
#endif /* _MSCP_H_ */

1
mscp/__init__.py Normal file
View File

@@ -0,0 +1 @@
from mscp.mscp import *

176
mscp/mscp.py Normal file
View File

@@ -0,0 +1,176 @@
_retry_import_pymscp = False
try:
import pymscp
except ImportError:
_retry_import_pymscp = True
if _retry_import_pymscp:
""" libmscp.so is not installed on system library paths. So retry
to import libmscp.so installed on the mscp python module
directory.
"""
import os
import sys
import ctypes
if sys.platform == "linux":
libmscp = "libmscp.so"
elif sys.platform == "darwin":
libmscp = "libmscp.dylib"
mscp_dir = os.path.dirname(__file__)
ctypes.cdll.LoadLibrary("{}/{}".format(mscp_dir, libmscp))
import pymscp
# inherit static values from pymscp
LOCAL2REMOTE = pymscp.LOCAL2REMOTE
REMOTE2LOCAL = pymscp.REMOTE2LOCAL
SEVERITY_NONE = pymscp.SEVERITY_NONE
SEVERITY_ERR = pymscp.SEVERITY_ERR
SEVERITY_WARN = pymscp.SEVERITY_WARN
SEVERITY_NOTICE = pymscp.SEVERITY_NOTICE
SEVERITY_INFO = pymscp.SEVERITY_INFO
SEVERITY_DEBUG = pymscp.SEVERITY_DEBUG
STATE_INIT = 0
STATE_CONNECTED = 1
STATE_PREPARED = 2
STATE_RUNNING = 3
STATE_STOPPED = 4
STATE_JOINED = 5
STATE_CLEANED = 6
STATE_RELEASED = 7
_state_str = {
STATE_INIT: "init",
STATE_CONNECTED: "connected",
STATE_PREPARED: "prepared",
STATE_RUNNING: "running",
STATE_STOPPED: "stopped",
STATE_JOINED: "joined",
STATE_CLEANED: "cleaned",
STATE_RELEASED: "released",
}
class mscp:
def __init__(self, remote: str, direction: int, **kwargs):
self.remote = remote
self.direction = direction
kwargs["remote"] = remote
kwargs["direction"] = direction
self.m = pymscp.mscp_init(**kwargs)
self.src_paths = []
self.dst_path = None
self.state = STATE_INIT
def __str__(self):
return "mscp:{}:{}".format(self.remote, self.__state2str())
def __repr__(self):
return "<{}>".format(str(self))
def __del__(self):
if not hasattr(self, "state"):
return # this instance failed on mscp_init
if self.state == STATE_RUNNING:
self.stop()
if self.state == STATE_STOPPED:
self.join()
self.cleanup()
self.release()
def __state2str(self):
return _state_str[self.state]
def connect(self):
if not (self.state == STATE_INIT or state.state == STATE_CLEANED):
raise RuntimeError("invalid mscp state: {}".format(self.__state2str()))
pymscp.mscp_connect(m = self.m)
self.state = STATE_CONNECTED
def add_src_path(self, src_path: str):
self.src_paths.append(src_path)
pymscp.mscp_add_src_path(m = self.m, src_path = src_path)
def set_dst_path(self, dst_path: str):
self.dst_path = dst_path
pymscp.mscp_set_dst_path(m = self.m, dst_path = dst_path);
def prepare(self):
if self.state != STATE_CONNECTED:
raise RuntimeError("invalid mscp state: {}".format(self.__state2str()))
if not self.src_paths:
raise RuntimeError("src path list is empty")
if not self.dst_path:
raise RuntimeError("dst path is not set")
pymscp.mscp_prepare(m = self.m)
self.state = STATE_PREPARED
def start(self):
if self.state != STATE_PREPARED:
raise RuntimeError("invalid mscp state: {}".format(self.__state2str()))
pymscp.mscp_start(m = self.m)
self.state = STATE_RUNNING
def stop(self):
if self.state != STATE_RUNNING:
raise RuntimeError("invalid mscp state: {}".format(self.__state2str()))
pymscp.mscp_stop(m = self.m)
self.state = STATE_STOPPED
def join(self):
if not (self.state == STATE_RUNNING or self.state == STATE_STOPPED):
raise RuntimeError("invalid mscp state: {}".format(self.__state2str()))
pymscp.mscp_join(m = self.m)
self.state = STATE_JOINED
def stats(self):
return pymscp.mscp_get_stats(m = self.m)
def cleanup(self):
if self.state == STATE_RUNNING:
raise RuntimeError("invalid mscp state: {}".format(self.__state2str()))
pymscp.mscp_cleanup(m = self.m)
self.state = STATE_CLEANED
def release(self):
if self.state != STATE_CLEANED:
raise RuntimeError("invalid mscp state: {}".format(self.__state2str()))
pymscp.mscp_free(m = self.m)
self.state = STATE_RELEASED
# Simple interface: mscp.copy(src, dst)
def copy(self, src, dst, nonblock = False):
if self.state < STATE_CONNECTED:
self.connect()
if type(src) == list:
for path in src:
self.add_src_path(path)
elif type(src) == str:
self.add_src_path(src)
else:
raise ValueError("src must be str of list: '{}'".format(src))
self.set_dst_path(dst)
self.prepare()
self.start()
if nonblock:
return
self.join()
self.cleanup()

View File

@@ -1,48 +1,3 @@
diff --git a/DefineOptions.cmake b/DefineOptions.cmake
index 068db988..5fc3c8fc 100644
--- a/DefineOptions.cmake
+++ b/DefineOptions.cmake
@@ -1,7 +1,7 @@
option(WITH_GSSAPI "Build with GSSAPI support" ON)
option(WITH_ZLIB "Build with ZLIB support" ON)
option(WITH_SFTP "Build with SFTP support" ON)
-option(WITH_SERVER "Build with SSH server support" ON)
+option(WITH_SERVER "Build with SSH server support" OFF)
option(WITH_DEBUG_CRYPTO "Build with cryto debug output" OFF)
option(WITH_DEBUG_PACKET "Build with packet debug output" OFF)
option(WITH_DEBUG_CALLTRACE "Build with calltrace debug output" ON)
@@ -11,13 +11,13 @@ option(WITH_MBEDTLS "Compile against libmbedtls" OFF)
option(WITH_BLOWFISH_CIPHER "Compile with blowfish support" OFF)
option(WITH_PCAP "Compile with Pcap generation support" ON)
option(WITH_INTERNAL_DOC "Compile doxygen internal documentation" OFF)
-option(BUILD_SHARED_LIBS "Build shared libraries" ON)
+option(BUILD_SHARED_LIBS "Build shared libraries" OFF)
option(WITH_PKCS11_URI "Build with PKCS#11 URI support" OFF)
option(UNIT_TESTING "Build with unit tests" OFF)
option(CLIENT_TESTING "Build with client tests; requires openssh" OFF)
option(SERVER_TESTING "Build with server tests; requires openssh and dropbear" OFF)
option(WITH_BENCHMARKS "Build benchmarks tools" OFF)
-option(WITH_EXAMPLES "Build examples" ON)
+option(WITH_EXAMPLES "Build examples" OFF)
option(WITH_NACL "Build with libnacl (curve25519)" ON)
option(WITH_SYMBOL_VERSIONING "Build with symbol versioning" ON)
option(WITH_ABI_BREAK "Allow ABI break" OFF)
@@ -25,6 +25,7 @@ option(WITH_GEX "Enable DH Group exchange mechanisms" ON)
option(WITH_INSECURE_NONE "Enable insecure none cipher and MAC algorithms (not suitable for production!)" OFF)
option(FUZZ_TESTING "Build with fuzzer for the server and client (automatically enables none cipher!)" OFF)
option(PICKY_DEVELOPER "Build with picky developer flags" OFF)
+option(WITH_STATIC_LIB "Build static library" ON)
if (WITH_ZLIB)
set(WITH_LIBZ ON)
@@ -60,3 +61,7 @@ endif (NOT GLOBAL_CLIENT_CONFIG)
if (FUZZ_TESTING)
set(WITH_INSECURE_NONE ON)
endif (FUZZ_TESTING)
+
+if (WITH_STATIC_LIB)
+ set(BUILD_STATIC_LIB ON)
+endif()
diff --git a/include/libssh/buffer.h b/include/libssh/buffer.h
index a55a1b40..e34e075c 100644
--- a/include/libssh/buffer.h

View File

@@ -13,5 +13,7 @@ if [ ! -e /var/run/sshd.pid ]; then
/usr/sbin/sshd
fi
ssh-keyscan localhost >> ${HOME}/.ssh/known_hosts
# Run test
python3 -m pytest ../test -v

34
setup.py Normal file
View File

@@ -0,0 +1,34 @@
from setuptools import setup, Extension, find_packages
import sys
import os
mypackage_root_dir = os.path.dirname(__file__)
with open(os.path.join(mypackage_root_dir, 'VERSION')) as version_file:
version = version_file.read().strip()
if sys.platform == "linux":
libmscp = "libmscp.so"
elif sys.platform == "darwin":
libmscp = "libmscp.dylib"
setup(
name='mscp',
version = version,
description = "libmscp python binding",
author = "Ryo Nakamura",
author_email = "upa@haeena.net",
url = "https://github.com/upa/mscp",
packages = find_packages("mscp"),
package_dir = {"": "mscp"},
data_files = [ ("", ["build/" + libmscp])],
py_modules = [ "mscp" ],
ext_modules = [
Extension(
'pymscp',
['src/pymscp.c'],
library_dirs = ['build'],
libraries = ['mscp'],
include_dirs = ['include']
)
]
)

View File

@@ -2,8 +2,10 @@
#define _ATOMIC_H_
#include <stdlib.h>
#include <assert.h>
#include <pthread.h>
#include <util.h>
#include <message.h>
typedef int refcnt;
@@ -28,31 +30,13 @@ static inline void lock_init(lock *l)
static inline void lock_acquire(lock *l)
{
int ret = pthread_mutex_lock(l);
if (ret < 0) {
switch (ret) {
case EINVAL:
pr_err("invalid mutex\n");
exit(1);
case EDEADLK:
pr_err("a deadlock would occur\n");
exit(1);
}
}
assert(ret == 0);
}
static inline void lock_release(lock *l)
{
int ret = pthread_mutex_unlock(l);
if (ret < 0) {
switch (ret) {
case EINVAL:
pr_err("invalid mutex\n");
exit(1);
case EPERM:
pr_err("this thread does not hold this mutex\n");
exit(1);
}
}
assert(ret == 0);
}
static inline void lock_release_via_cleanup(void *l)
@@ -65,7 +49,7 @@ static inline void lock_release_via_cleanup(void *l)
pthread_cleanup_push(lock_release_via_cleanup, l)
#define LOCK_RELEASE_THREAD(l) \
#define LOCK_RELEASE_THREAD() \
pthread_cleanup_pop(1)
#endif /* _ATOMIC_H_ */

View File

@@ -1,903 +0,0 @@
#include <stdlib.h>
#include <stdbool.h>
#include <sys/stat.h>
#include <dirent.h>
#include <fcntl.h>
#include <libgen.h>
#include <ssh.h>
#include <util.h>
#include <file.h>
#include <pprint.h>
#include <platform.h>
bool file_has_hostname(char *path)
{
char *p;
p = strchr(path, ':');
if (p) {
if (p == path || ((p > path) && *(p - 1) == '\\'))
return false; /* first byte is colon or escaped colon, skip */
else
return true;
}
return false;
}
char *file_find_hostname(char *path)
{
char *dup, *p;
dup = strdup(path);
if (!dup) {
pr_err("%s", strerrno());
return NULL;
}
p = strchr(dup, ':');
if (p) {
if (p == dup || ((p > dup) && *(p - 1) == '\\')) {
/* first byte is colon or escaped colon, skip */
free(dup);
} else {
/* handle this as remote hostname (with username) */
*p = '\0';
return dup;
}
}
return NULL;
}
static char *file_find_path(char *path)
{
char *p;
p = strchr(path, ':');
if (p) {
if (p == path || ((p > path) && *(p - 1) == '\\')) {
/* first byte is colon or escaped colon, skip */
return path;
} else {
return p + 1;
}
}
return path;
}
/* return 1 when path is directory, 0 is not directory, and -1 on error */
static int file_is_directory(char *path, sftp_session sftp, bool print_error)
{
int ret = 0;
if (sftp) {
char *remote_path = file_find_path(path);
sftp_attributes attr;
char *p = *remote_path == '\0' ? "." : remote_path;
attr = sftp_stat(sftp, p);
if (!attr) {
if (print_error)
pr_err("sftp_stat %s: %s\n",
path, sftp_get_ssh_error(sftp));
ret = -1;
} else if (attr->type == SSH_FILEXFER_TYPE_DIRECTORY)
ret = 1;
sftp_attributes_free(attr);
} else {
struct stat statbuf;
if (stat(path, &statbuf) < 0) {
if (print_error)
pr_err("stat %s: %s\n", path, strerrno());
ret = -1;
} else if (S_ISDIR(statbuf.st_mode))
ret = 1;
}
return ret;
}
/* return 1 when directory exists, 0 not exists, and -1 on error */
int file_directory_exists(char *path, sftp_session sftp)
{
int ret = 0;
if (sftp) {
sftp_attributes attr;
attr = sftp_stat(sftp, path);
if (!attr) {
if (sftp_get_error(sftp) == SSH_FX_NO_SUCH_PATH ||
sftp_get_error(sftp) == SSH_FX_NO_SUCH_FILE)
ret = 0;
else {
pr_err("%s: %s\n", path, sftp_get_ssh_error(sftp));
ret = -1;
}
} else if (attr->type == SSH_FILEXFER_TYPE_DIRECTORY)
ret = 1;
sftp_attributes_free(attr);
} else {
struct stat statbuf;
if (stat(path, &statbuf) < 0) {
if (errno == ENOENT)
ret = 0;
else {
pr_err("%s: %s\n", path, strerrno());
ret = -1;
}
} else if ((statbuf.st_mode & S_IFMT) == S_IFDIR)
ret = 1;
}
return ret;
}
static struct file *file_alloc(char *src_path, size_t size, bool src_is_remote)
{
struct file *f;
f = malloc(sizeof(*f));
if (!f) {
pr_err("%s\n", strerrno());
return NULL;
}
memset(f, 0, sizeof(*f));
strncpy(f->src_path, src_path, PATH_MAX - 1);
f->size = size;
f->src_is_remote = src_is_remote;
f->dst_is_remote = !src_is_remote;
lock_init(&f->lock);
return f;
}
static bool check_file_should_skip(char *path)
{
int len = strlen(path);
if ((len == 1 && strncmp(path, ".", 1) == 0) ||
(len == 2 && strncmp(path, "..", 2) == 0)) {
return true;
}
return false;
}
/* return -1 when error, 0 when should skip, and 1 when should be copied */
static int check_file_tobe_copied(char *path, sftp_session sftp, size_t *size)
{
struct stat statbuf;
sftp_attributes attr;
int ret = 0;
if (!sftp) {
/* local */
if (stat(path, &statbuf) < 0) {
pr_err("stat %s: %s\n", path, strerrno());
return -1;
}
if (S_ISREG(statbuf.st_mode)) {
*size = statbuf.st_size;
return 1;
}
return 0;
}
/* remote */
attr = sftp_stat(sftp, path);
if (!attr) {
pr_err("sftp_stat %s: %s\n", path, sftp_get_ssh_error(sftp));
return -1;
}
if (attr->type == SSH_FILEXFER_TYPE_REGULAR ||
attr->type == SSH_FILEXFER_TYPE_SYMLINK) {
*size = attr->size;
ret = 1;
}
sftp_attributes_free(attr);
return ret;
}
static int check_pathlen(const char *src, const char *dst)
{
if ((strlen(src) + strlen(dst) + 1) > PATH_MAX) {
pr_err("too long path: %s/%s\n", src, dst);
return -1;
}
return 0;
}
static int file_fill_recursive(struct list_head *file_list,
bool dst_is_remote, sftp_session sftp, char *src_path,
char *rel_path, char *dst_path,
bool dst_should_dir, bool replace_dir_name)
{
char next_src_path[PATH_MAX], next_rel_path[PATH_MAX];
struct file *f;
size_t size;
int ret;
ret = file_is_directory(src_path, dst_is_remote ? NULL : sftp, true);
if (ret < 0)
return -1;
if (ret == 0) {
/* src_path is file */
ret = check_file_tobe_copied(src_path, dst_is_remote ? NULL : sftp,
&size);
if (ret <= 0)
return ret; /* error or skip */
if ((f = file_alloc(src_path, size, !dst_is_remote)) == NULL) {
pr_err("%s\n", strerrno());
return -1;
}
if (dst_should_dir)
snprintf(f->dst_path, PATH_MAX, "%s/%s%s",
dst_path, rel_path, basename(src_path));
else
snprintf(f->dst_path, PATH_MAX, "%s%s", rel_path, dst_path);
list_add_tail(&f->list, file_list);
pprint2("file %s %s -> %s %s %luB\n",
f->src_path, dst_is_remote ? "(local)" : "(remote)",
f->dst_path, dst_is_remote ? "(remote)" : "(local)",
f->size);
return 0;
}
/* src_path is directory */
if (dst_is_remote) {
/* src_path is local directory */
struct dirent *de;
DIR *dir;
if ((dir = opendir(src_path)) == NULL) {
pr_err("opendir '%s': %s\n", src_path, strerrno());
return -1;
}
while ((de = readdir(dir)) != NULL) {
if (check_file_should_skip(de->d_name))
continue;
if (check_pathlen(src_path, de->d_name) < 0 ||
check_pathlen(rel_path, basename(src_path)) < 0)
return -1;
snprintf(next_src_path, sizeof(next_src_path),
"%s/%s", src_path, de->d_name);
if (replace_dir_name)
memset(next_rel_path, 0, sizeof(next_rel_path));
else
snprintf(next_rel_path, sizeof(next_rel_path),
"%s%s/", rel_path, basename(src_path));
ret = file_fill_recursive(file_list, dst_is_remote, sftp,
next_src_path, next_rel_path,
dst_path, dst_should_dir, false);
if (ret < 0)
return ret;
}
} else {
/* src_path is remote directory */
sftp_attributes attr;
sftp_dir dir;
if ((dir = sftp_opendir(sftp, src_path)) == NULL) {
pr_err("sftp_opendir: '%s': %s\n", src_path,
sftp_get_ssh_error(sftp));
return -1;
}
while ((attr = sftp_readdir(sftp, dir)) != NULL) {
if (check_file_should_skip(attr->name))
continue;
if (check_pathlen(src_path, attr->name) < 0 ||
check_pathlen(rel_path, basename(src_path)) < 0)
return -1;
snprintf(next_src_path, sizeof(next_src_path),
"%s/%s", src_path, attr->name);
if (replace_dir_name)
memset(next_rel_path, 0, sizeof(next_rel_path));
else
snprintf(next_rel_path, sizeof(next_rel_path),
"%s%s/", rel_path, basename(src_path));
ret = file_fill_recursive(file_list, dst_is_remote, sftp,
next_src_path, next_rel_path,
dst_path, dst_should_dir, false);
if (ret < 0)
return ret;
}
}
return 0;
}
int file_fill(sftp_session sftp, struct list_head *file_list, char **src_array, int cnt,
char *dst)
{
bool dst_is_remote, dst_is_dir, dst_dir_no_exist, dst_should_dir, dst_must_dir;
char *dst_path, *src_path;
int n, ret;
dst_path = file_find_path(dst);
dst_path = *dst_path == '\0' ? "." : dst_path;
dst_is_remote = file_find_hostname(dst) ? true : false;
dst_must_dir = cnt > 1 ? true : false;
if (file_is_directory(dst_path, dst_is_remote ? sftp : NULL, false) > 0)
dst_is_dir = true;
else
dst_is_dir = false;
dst_dir_no_exist = !dst_is_dir;
for (n = 0; n < cnt; n++) {
src_path = file_find_path(src_array[n]);
if (file_is_directory(src_path, dst_is_remote ? NULL : sftp, false) > 0)
dst_should_dir = true;
else
dst_should_dir = false;
ret = file_fill_recursive(file_list, dst_is_remote, sftp,
src_path, "", dst_path,
dst_should_dir | dst_must_dir | dst_is_dir,
dst_dir_no_exist);
if (ret < 0)
return ret;
}
return 0;
}
/* based on
* https://stackoverflow.com/questions/2336242/recursive-mkdir-system-call-on-unix */
static int file_dst_prepare(struct file *f, sftp_session sftp)
{
/* XXX: should reflect the permission of the original directory? */
mode_t mode = S_IRWXU | S_IRWXG | S_IRWXO;
char path[PATH_MAX];
char *p;
int ret;
strncpy(path, f->dst_path, sizeof(path));
pr_debug("prepare for %s\n", path);
/* mkdir -p */
for (p = strchr(path + 1, '/'); p; p = strchr(p + 1, '/')) {
*p = '\0';
ret = file_directory_exists(path, sftp);
pr_debug("check %s ret=%d\n", path, ret);
if (ret < -1)
return -1;
if (ret == 1)
goto next;
pr_debug("mkdir %s\n", path);
if (sftp) {
ret = sftp_mkdir(sftp, path, mode);
if (ret < 0 &&
sftp_get_error(sftp) != SSH_FX_FILE_ALREADY_EXISTS) {
pr_err("failed to create %s: %s\n",
path, sftp_get_ssh_error(sftp));
return -1;
}
} else {
if (mkdir(path, mode) == -1 && errno != EEXIST) {
pr_err("failed to create %s: %s\n",
path, strerrno());
return -1;
}
}
next:
*p = '/';
}
/* open file with O_TRUNC to set file size 0 */
mode = O_WRONLY|O_CREAT|O_TRUNC;
if (sftp) {
sftp_file sf;
if ((sf = sftp_open(sftp, f->dst_path, mode, S_IRUSR|S_IWUSR)) == NULL) {
pr_err("sftp_open: %s\n", sftp_get_ssh_error(sftp));
return -1;
}
if (sftp_close(sf) < 0) {
pr_err("sftp_close: %s\n", sftp_get_ssh_error(sftp));
return -1;
}
} else {
int fd;
if ((fd = open(f->dst_path, mode, S_IRUSR|S_IWUSR)) < 0) {
pr_err("open: %s\n", strerrno());
return -1;
}
if (close(fd) < 0) {
pr_err("close: %s\n", strerrno());
return -1;
}
}
return 0;
}
#ifdef DEBUG
void file_dump(struct list_head *file_list)
{
struct file *f;
list_for_each_entry(f, file_list, list) {
pr_debug("%s %s -> %s %s %lu-byte\n",
f->src_path, strloc(f->src_is_remote),
f->dst_path, strloc(f->dst_is_remote),
f->size);
}
}
#endif
static void *chunk_alloc(struct file *f)
{
struct chunk *c;
c = malloc(sizeof(*c));
if (!c) {
pr_err("%s\n", strerrno());
return NULL;
}
memset(c, 0, sizeof(*c));
c->f = f;
c->off = 0;
c->len = 0;
refcnt_inc(&f->refcnt);
return c;
}
static int get_page_mask(void)
{
int n;
long page_sz = sysconf(_SC_PAGESIZE);
size_t page_mask = 0;
for (n = 0; page_sz > 0; page_sz >>= 1, n++) {
page_mask <<= 1;
page_mask |= 1;
}
return page_mask >> 1;
}
int chunk_fill(struct list_head *file_list, struct list_head *chunk_list,
int nr_conn, int min_chunk_sz, int max_chunk_sz)
{
struct chunk *c;
struct file *f;
size_t page_mask;
size_t chunk_sz;
size_t size;
page_mask = get_page_mask();
list_for_each_entry(f, file_list, list) {
if (f->size <= min_chunk_sz)
chunk_sz = f->size;
else if (max_chunk_sz)
chunk_sz = max_chunk_sz;
else {
chunk_sz = (f->size - (f->size % nr_conn)) / nr_conn;
chunk_sz &= ~page_mask; /* align with page_sz */
if (chunk_sz <= min_chunk_sz)
chunk_sz = min_chunk_sz;
}
pr_debug("%s chunk_sz %lu-byte\n", f->src_path, chunk_sz);
/* for (size = f->size; size > 0;) does not create a
* file (chunk) when file size is 0. This do {} while
* (size > 0) creates just open/close a 0-byte file.
*/
size = f->size;
do {
c = chunk_alloc(f);
if (!c)
return -1;
c->off = f->size - size;
c->len = size < chunk_sz ? size : chunk_sz;
size -= c->len;
list_add_tail(&c->list, chunk_list);
pprint4("chunk %s 0x%010lx-0x%010lx %luB\n",
c->f->src_path, c->off, c->off + c->len, c->len);
} while (size > 0);
}
return 0;
}
#ifdef DEBUG
void chunk_dump(struct list_head *chunk_list)
{
struct chunk *c;
list_for_each_entry(c, chunk_list, list) {
pr_debug("%s %s 0x%010lx-0x%010lx %lu-byte\n",
c->f->src_path, strloc(c->f->src_is_remote),
c->off, c->off + c->len, c->len);
}
}
#endif
struct chunk *chunk_acquire(struct list_head *chunk_list)
{
/* under the lock for chunk_list */
struct list_head *first = chunk_list->next;
struct chunk *c = NULL;
if (list_empty(chunk_list))
return NULL; /* list is empty */
c = list_entry(first, struct chunk, list);
list_del(first);
return c;
}
int chunk_prepare(struct chunk *c, sftp_session sftp)
{
struct file *f = c->f;
int ret = 0;
LOCK_ACQUIRE_THREAD(&f->lock);
if (f->state == FILE_STATE_INIT) {
if (file_dst_prepare(f, f->dst_is_remote ? sftp : NULL) < 0) {
ret = -1;
goto out;
}
f->state = FILE_STATE_OPENED;
pprint2("copy start: %s\n", f->src_path);
}
out:
LOCK_RELEASE_THREAD();
return ret;
}
static mode_t file_get_mode(const char *path, sftp_session sftp)
{
mode_t mode;
if (sftp) {
sftp_attributes attr = sftp_stat(sftp, path);
if (!attr) {
pr_err("sftp_stat %s: %s\n", path, sftp_get_ssh_error(sftp));
return -1;
}
mode = attr->permissions;
sftp_attributes_free(attr);
} else {
struct stat statbuf;
if (stat(path, &statbuf) < 0) {
pr_err("stat %s: %s\n", path, strerrno());
return -1;
}
mode = statbuf.st_mode & (S_IRWXU|S_IRWXG|S_IRWXO);
}
return mode;
}
static int file_set_mode(const char *path, mode_t mode, sftp_session sftp)
{
if (sftp) {
if (sftp_chmod(sftp, path, mode) < 0) {
pr_err("sftp_chmod %s: %s\n", path, sftp_get_ssh_error(sftp));
return -1;
}
} else {
if (chmod(path, mode) < 0) {
pr_err("chmod %s: %s\n", path, strerrno());
return -1;
}
}
return 0;
}
static int chunk_open_local(const char *path, int flags, mode_t mode, size_t off)
{
int fd;
fd = open(path, flags, mode);
if (fd < 0) {
pr_err("open failed for %s: %s\n", path, strerrno());
return -1;
}
if (lseek(fd, off, SEEK_SET) < 0) {
pr_err("seek error for %s: %s\n", path, strerrno());
close(fd);
return -1;
}
return fd;
}
static sftp_file chunk_open_remote(const char *path, int flags, mode_t mode, size_t off,
sftp_session sftp)
{
sftp_file sf;
sf = sftp_open(sftp, path, flags, mode);
if (!sf) {
pr_err("sftp_open %s: %s\n", path, sftp_get_ssh_error(sftp));
return NULL;
}
if (sftp_seek64(sf, off) < 0) {
pr_err("sftp_seek64 %s: %s\n", path, sftp_get_ssh_error(sftp));
return NULL;
}
return sf;
}
static ssize_t read_to_buf(void *ptr, size_t len, void *userdata)
{
int fd = *((int *)userdata);
return read(fd, ptr, len);
}
static int chunk_copy_local_to_remote_async(struct chunk *c, int fd, sftp_file sf,
int nr_ahead, int buf_sz, size_t *counter)
{
ssize_t read_bytes, remaind, thrown;
int idx, ret;
struct {
uint32_t id;
ssize_t len;
} reqs[nr_ahead];
if (c->len == 0)
return 0;
remaind = thrown = c->len;
for (idx = 0; idx < nr_ahead && thrown > 0; idx++) {
reqs[idx].len = min(thrown, buf_sz);
reqs[idx].len = sftp_async_write(sf, read_to_buf, reqs[idx].len, &fd,
&reqs[idx].id);
if (reqs[idx].len < 0) {
pr_err("sftp_async_write: %d or %s\n",
sftp_get_error(sf->sftp), strerrno());
return -1;
}
thrown -= reqs[idx].len;
}
for (idx = 0; remaind > 0; idx = (idx + 1) % nr_ahead) {
ret = sftp_async_write_end(sf, reqs[idx].id, 1);
if (ret != SSH_OK) {
pr_err("sftp_async_write_end: %d\n", sftp_get_error(sf->sftp));
return -1;
}
*counter += reqs[idx].len;
remaind -= reqs[idx].len;
if (remaind <= 0)
break;
if (thrown <= 0)
continue;
reqs[idx].len = min(thrown, buf_sz);
reqs[idx].len = sftp_async_write(sf, read_to_buf, reqs[idx].len, &fd,
&reqs[idx].id);
if (reqs[idx].len < 0) {
pr_err("sftp_async_write: %d or %s\n",
sftp_get_error(sf->sftp), strerrno());
return -1;
}
thrown -= reqs[idx].len;
}
if (remaind < 0) {
pr_err("invalid remaind bytes %ld. last async_write_end bytes %lu.",
remaind, reqs[idx].len);
return -1;
}
return 0;
}
static int chunk_copy_remote_to_local_async(struct chunk *c, int fd, sftp_file sf,
int nr_ahead, int buf_sz, size_t *counter)
{
ssize_t read_bytes, write_bytes, remaind, thrown;
char buf[buf_sz];
int idx;
struct {
int id;
ssize_t len;
} reqs[nr_ahead];
if (c->len == 0)
return 0;
remaind = thrown = c->len;
for (idx = 0; idx < nr_ahead && thrown > 0; idx++) {
reqs[idx].len = min(thrown, sizeof(buf));
reqs[idx].id = sftp_async_read_begin(sf, reqs[idx].len);
if (reqs[idx].id < 0) {
pr_err("sftp_async_read_begin: %d\n",
sftp_get_error(sf->sftp));
return -1;
}
thrown -= reqs[idx].len;
}
for (idx = 0; remaind > 0; idx = (idx + 1) % nr_ahead) {
read_bytes = sftp_async_read(sf, buf, reqs[idx].len, reqs[idx].id);
if (read_bytes == SSH_ERROR) {
pr_err("sftp_async_read: %d\n", sftp_get_error(sf->sftp));
return -1;
}
if (thrown > 0) {
reqs[idx].len = min(thrown, sizeof(buf));
reqs[idx].id = sftp_async_read_begin(sf, reqs[idx].len);
thrown -= reqs[idx].len;
}
write_bytes = write(fd, buf, read_bytes);
if (write_bytes < 0) {
pr_err("write: %s\n", strerrno());
return -1;
}
if (write_bytes < read_bytes) {
pr_err("failed to write full bytes\n");
return -1;
}
*counter += write_bytes;
remaind -= read_bytes;
}
if (remaind < 0) {
pr_err("invalid remaind bytes %ld. last async_read bytes %ld. "
"last write bytes %ld\n",
remaind, read_bytes, write_bytes);
return -1;
}
return 0;
}
static int chunk_copy_local_to_remote(struct chunk *c, sftp_session sftp,
int nr_ahead, int buf_sz, size_t *counter)
{
struct file *f = c->f;
sftp_file sf = NULL;
mode_t mode;
int ret = 0;
int fd = 0;
int flags;
flags = O_RDONLY;
mode = S_IRUSR;
if ((fd = chunk_open_local(f->src_path, flags, mode, c->off)) < 0) {
ret = -1;
goto out;
}
flags = O_WRONLY;
mode = S_IRUSR|S_IWUSR;
if (!(sf = chunk_open_remote(f->dst_path, flags, mode, c->off, sftp))) {
ret = -1;
goto out;
}
ret = chunk_copy_local_to_remote_async(c, fd, sf, nr_ahead, buf_sz, counter);
if (ret < 0)
goto out;
out:
if (fd > 0)
close(fd);
if (sf)
sftp_close(sf);
return ret;
}
static int chunk_copy_remote_to_local(struct chunk *c, sftp_session sftp,
int nr_ahead, int buf_sz, size_t *counter)
{
struct file *f = c->f;
sftp_file sf = NULL;
mode_t mode;
int flags;
int fd = 0;
int ret = 0;
flags = O_WRONLY;
mode = S_IRUSR|S_IWUSR;
if ((fd = chunk_open_local(f->dst_path, flags, mode, c->off)) < 0) {
ret = -1;
goto out;
}
flags = O_RDONLY;
mode = S_IRUSR;
if (!(sf = chunk_open_remote(f->src_path, flags, mode, c->off, sftp))) {
ret = -1;
goto out;
}
ret = chunk_copy_remote_to_local_async(c, fd, sf, nr_ahead, buf_sz, counter);
if (ret< 0)
goto out;
out:
if (fd > 0)
close(fd);
if (sf)
sftp_close(sf);
return ret;
}
static int file_cleanup(struct file *f, sftp_session sftp)
{
sftp_session s, d;
mode_t mode;
if (f->dst_is_remote) {
s = NULL;
d = sftp;
} else {
s = sftp;
d = NULL;
}
if ((mode = file_get_mode(f->src_path, s)) < 0)
return -1;
if (file_set_mode(f->dst_path, mode, d) < 0)
return -1;
return 0;
}
int chunk_copy(struct chunk *c, sftp_session sftp, int nr_ahead, int buf_sz,
size_t *counter)
{
struct file *f = c->f;
int ret = 0;
pprint4("copy start: chunk %s 0x%010lx-0x%010lx %luB\n",
c->f->src_path, c->off, c->off + c->len, c->len);
if (f->dst_is_remote)
ret = chunk_copy_local_to_remote(c, sftp, nr_ahead, buf_sz, counter);
else
ret = chunk_copy_remote_to_local(c, sftp, nr_ahead, buf_sz, counter);
if (ret < 0)
return ret;
pprint4("copy done: chunk %s 0x%010lx-0x%010lx %luB\n",
c->f->src_path, c->off, c->off + c->len, c->len);
if (refcnt_dec(&f->refcnt) == 0) {
f->state = FILE_STATE_DONE;
pprint2("copy done: %s\n", f->src_path);
ret = file_cleanup(f, sftp);
}
return ret;
}

View File

@@ -1,85 +0,0 @@
#ifndef _FILE_H_
#define _FILE_H_
#include <limits.h>
#include <pthread.h>
#include "libssh/libssh.h"
#include "libssh/sftp.h"
#include <list.h>
#include <atomic.h>
struct file {
struct list_head list; /* mscp->file_list */
char src_path[PATH_MAX]; /* copy source path */
bool src_is_remote; /* source is remote */
size_t size; /* size of this file */
char dst_path[PATH_MAX]; /* copy destination path */
bool dst_is_remote; /* destination is remote */
int state; /* destination file state */
lock lock; /* mutex to protect state */
refcnt refcnt; /* chunks referencing this file */
};
#define FILE_STATE_INIT 0
#define FILE_STATE_OPENED 1
#define FILE_STATE_DONE 2
#define strloc(is_remote) is_remote ? "(remote)" : "(local)"
/* Allocating chunk increments refcnt of the associating file.
* Multiple threads copying files follows:
*
* acquire a chunk (inside a global lock)
*
* if the file state of the chunk is INIT:
* acquire the file lock
* * if file state is INIT:
* create directory if necessary
* open file with O_TRUNC and close.
* set file state OPENED.
* // only the first thread in the lock open the destination file
* release the file lock
* endif
*
* copy the chunk to the destination.
* decrement the refcnt of the file.
*
* if refcnt == 0:
* all chunks are copied.
* set the file state DONE, print something useful output.
* endif
*/
struct chunk {
struct list_head list; /* mscp->chunk_list */
struct file *f;
size_t off; /* offset of this chunk on the file f */
size_t len; /* length of this chunk */
size_t done; /* copied bytes for this chunk by a thread */
};
char *file_find_hostname(char *path);
bool file_has_hostname(char *path);
int file_fill(sftp_session sftp, struct list_head *file_list, char **src_array, int cnt,
char *dst);
int chunk_fill(struct list_head *file_list, struct list_head *chunk_list,
int nr_conn, int min_chunk_sz, int max_chunk_sz);
struct chunk *chunk_acquire(struct list_head *chunk_list);
int chunk_prepare(struct chunk *c, sftp_session sftp);
int chunk_copy(struct chunk *c, sftp_session sftp, int nr_ahead, int buf_sz,
size_t *counter);
#ifdef DEBUG
void file_dump(struct list_head *file_list);
void chunk_dump(struct list_head *chunk_list);
#endif
#endif /* _FILE_H_ */

View File

@@ -208,6 +208,32 @@ static inline void list_splice(struct list_head *list, struct list_head *head)
__list_splice(list, head);
}
static inline void __list_splice_tail(struct list_head *list,
struct list_head *head)
{
struct list_head *first = list->next;
struct list_head *last = list->prev;
struct list_head *at = head->prev;
first->prev = at;
at->next = first;
last->next = head;
at->prev = last;
}
/**
* list_splice_tail - join two lists
* @list: the new list to add.
* @head: the place to add it in the first list.
*/
static inline void list_splice_tail(struct list_head *list, struct list_head *head)
{
if (!list_empty(list))
__list_splice_tail(list, head);
}
/**
* list_splice_init - join two lists and reinitialise the emptied list.
* @list: the new list to add.

File diff suppressed because it is too large Load Diff

58
src/message.c Normal file
View File

@@ -0,0 +1,58 @@
#include <stdio.h>
#include <stdarg.h>
#include <string.h>
#include <limits.h>
#include <pthread.h>
#include <message.h>
/* mscp error message buffer */
#define MSCP_ERRMSG_SIZE (PATH_MAX * 2)
static char errmsg[MSCP_ERRMSG_SIZE];
void _mscp_set_error(const char *fmt, ...)
{
va_list va;
memset(errmsg, 0, sizeof(errmsg));
va_start(va, fmt);
vsnprintf(errmsg, sizeof(errmsg) - 1, fmt, va);
va_end(va);
}
const char *mscp_get_error()
{
return errmsg;
}
/* message print functions */
static int mprint_serverity = MSCP_SEVERITY_WARN;
static pthread_mutex_t mprint_lock = PTHREAD_MUTEX_INITIALIZER;
void mprint_set_severity(int serverity)
{
if (serverity < 0)
mprint_serverity = -1; /* no print */
mprint_serverity = serverity;
}
void mprint(int fd, int serverity, const char *fmt, ...)
{
va_list va;
int ret;
if (fd < 0)
return;
if (serverity <= mprint_serverity) {
pthread_mutex_lock(&mprint_lock);
va_start(va, fmt);
vdprintf(fd, fmt, va);
va_end(va);
pthread_mutex_unlock(&mprint_lock);
}
}

31
src/message.h Normal file
View File

@@ -0,0 +1,31 @@
#ifndef _MESSAGE_H_
#define _MESSAGE_H_
#include <libgen.h>
#include <mscp.h>
/* message print. printed messages are passed to application via msg_fd */
void mprint_set_severity(int severity);
void mprint(int fd, int severity, const char *fmt, ...);
#define mpr_err(fd, fmt, ...) \
mprint(fd, MSCP_SEVERITY_ERR, fmt, ##__VA_ARGS__)
#define mpr_warn(fd, fmt, ...) \
mprint(fd, MSCP_SEVERITY_WARN, fmt, ##__VA_ARGS__)
#define mpr_notice(fd, fmt, ...) \
mprint(fd, MSCP_SEVERITY_NOTICE, fmt, ##__VA_ARGS__)
#define mpr_info(fd, fmt, ...) \
mprint(fd, MSCP_SEVERITY_INFO, fmt, ##__VA_ARGS__)
#define mpr_debug(fd, fmt, ...) \
mprint(fd, MSCP_SEVERITY_DEBUG, fmt, ##__VA_ARGS__)
/* error message buffer */
#define mscp_set_error(fmt, ...) \
_mscp_set_error("%s:%d:%s: " fmt, \
basename(__FILE__), __LINE__, __func__, ##__VA_ARGS__)
void _mscp_set_error(const char *fmt, ...);
#endif /* _MESSAGE_H_ */

618
src/mscp.c Normal file
View File

@@ -0,0 +1,618 @@
#include <stdbool.h>
#include <unistd.h>
#include <math.h>
#include <pthread.h>
#include <list.h>
#include <util.h>
#include <ssh.h>
#include <path.h>
#include <atomic.h>
#include <platform.h>
#include <message.h>
#include <mscp.h>
struct mscp {
char *remote; /* remote host (and uername) */
int direction; /* copy direction */
struct mscp_opts *opts;
struct mscp_ssh_opts *ssh_opts;
int msg_fd; /* writer fd for message pipe */
int *cores; /* usable cpu cores by COREMASK */
int nr_cores; /* length of array of cores */
sftp_session first; /* first sftp session */
char dst_path[PATH_MAX];
struct list_head src_list;
struct list_head path_list;
struct list_head chunk_list;
lock chunk_lock;
size_t total_bytes; /* total bytes to be transferred */
struct mscp_thread *threads;
};
struct mscp_thread {
struct mscp *m;
sftp_session sftp;
pthread_t tid;
int cpu;
size_t done;
bool finished;
int ret;
};
struct src {
struct list_head list;
char *path;
};
#define DEFAULT_MIN_CHUNK_SZ (64 << 20) /* 64MB */
#define DEFAULT_NR_AHEAD 32
#define DEFAULT_BUF_SZ 16384
/* XXX: we use 16384 byte buffer pointed by
* https://api.libssh.org/stable/libssh_tutor_sftp.html. The larget
* read length from sftp_async_read is 65536 byte. Read sizes larger
* than 65536 cause a situation where data remainds but
* sftp_async_read returns 0.
*/
#define non_null_string(s) (s[0] != '\0')
static int expand_coremask(const char *coremask, int **cores, int *nr_cores)
{
int n, *core_list, core_list_len = 0, nr_usable, nr_all;
char c[2] = { 'x', '\0' };
const char *_coremask;
long v, needle;
int ncores = nr_cpus();
/*
* This function returns array of usable cores in `cores` and
* returns the number of usable cores (array length) through
* nr_cores.
*/
if (strncmp(coremask, "0x", 2) == 0)
_coremask = coremask + 2;
else
_coremask = coremask;
core_list = realloc(NULL, sizeof(int) * 64);
if (!core_list) {
mscp_set_error("failed to realloc: %s", strerrno());
return -1;
}
nr_usable = 0;
nr_all = 0;
for (n = strlen(_coremask) - 1; n >=0; n--) {
c[0] = _coremask[n];
v = strtol(c, NULL, 16);
if (v == LONG_MIN || v == LONG_MAX) {
mscp_set_error("invalid coremask: %s", coremask);
return -1;
}
for (needle = 0x01; needle < 0x10; needle <<= 1) {
nr_all++;
if (nr_all > ncores)
break; /* too long coremask */
if (v & needle) {
nr_usable++;
core_list = realloc(core_list, sizeof(int) * nr_usable);
if (!core_list) {
mscp_set_error("realloc: %s", strerrno());
return -1;
}
core_list[nr_usable - 1] = nr_all - 1;
}
}
}
if (nr_usable < 1) {
mscp_set_error("invalid core mask: %s", coremask);
return -1;
}
*cores = core_list;
*nr_cores = nr_usable;
return 0;
}
static int default_nr_threads()
{
return (int)(floor(log(nr_cpus()) * 2) + 1);
}
static int validate_and_set_defaut_params(struct mscp_opts *o)
{
if (o->nr_threads < 0) {
mscp_set_error("invalid nr_threads: %d", o->nr_threads);
return -1;
} else if (o->nr_threads == 0)
o->nr_threads = default_nr_threads();
if (o->nr_ahead < 0) {
mscp_set_error("invalid nr_ahead: %d", o->nr_ahead);
return -1;
} else if (o->nr_ahead == 0)
o->nr_ahead = DEFAULT_NR_AHEAD;
if (o->min_chunk_sz == 0)
o->min_chunk_sz = DEFAULT_MIN_CHUNK_SZ;
else {
if (o->min_chunk_sz < getpagesize() ||
o->min_chunk_sz % getpagesize() != 0) {
mscp_set_error("min chunk size must be "
"larget than and multiple of page size %d: %lu",
getpagesize(), o->min_chunk_sz);
return -1;
}
}
if (o->max_chunk_sz) {
if (o->max_chunk_sz < getpagesize() ||
o->max_chunk_sz % getpagesize() != 0) {
mscp_set_error("min chunk size must be larget than and "
"multiple of page size %d: %lu",
getpagesize(), o->max_chunk_sz);
}
if (o->min_chunk_sz > o->max_chunk_sz) {
mscp_set_error("smaller max chunk size than "
"min chunk size: %lu < %lu",
o->max_chunk_sz, o->min_chunk_sz);
return -1;
}
}
if (o->buf_sz == 0)
o->buf_sz = DEFAULT_BUF_SZ;
else if (o->buf_sz == 0) {
mscp_set_error("invalid buf size: %lu", o->buf_sz);
return -1;
}
return 0;
}
struct mscp *mscp_init(const char *remote_host, int direction,
struct mscp_opts *o, struct mscp_ssh_opts *s)
{
struct mscp *m;
int n;
if (!remote_host) {
mscp_set_error("empty remote host\n");
return NULL;
}
if (!(direction == MSCP_DIRECTION_L2R ||
direction == MSCP_DIRECTION_R2L)) {
mscp_set_error("invalid copy direction: %d", direction);
return NULL;
}
m = malloc(sizeof(*m));
if (!m) {
mscp_set_error("failed to allocate memory: %s", strerrno());
return NULL;
}
mprint_set_severity(o->severity);
if (validate_and_set_defaut_params(o) < 0)
goto free_out;
memset(m, 0, sizeof(*m));
INIT_LIST_HEAD(&m->src_list);
INIT_LIST_HEAD(&m->path_list);
INIT_LIST_HEAD(&m->chunk_list);
lock_init(&m->chunk_lock);
m->remote = strdup(remote_host);
if (!m->remote) {
mscp_set_error("failed to allocate memory: %s", strerrno());
goto free_out;
}
m->direction = direction;
m->msg_fd = o->msg_fd;
if (strlen(o->coremask) > 0) {
if (expand_coremask(o->coremask, &m->cores, &m->nr_cores) < 0)
goto free_out;
mpr_notice(m->msg_fd, "usable cpu cores:");
for (n = 0; n < m->nr_cores; n++)
mpr_notice(m->msg_fd, " %d", m->cores[n]);
mpr_notice(m->msg_fd, "\n");
}
m->opts = o;
m->ssh_opts = s;
return m;
free_out:
free(m);
return NULL;
}
void mscp_set_msg_fd(struct mscp *m, int fd)
{
m->msg_fd = fd;
}
int mscp_connect(struct mscp *m)
{
m->first = ssh_init_sftp_session(m->remote, m->ssh_opts);
if (!m->first)
return -1;
return 0;
}
int mscp_add_src_path(struct mscp *m, const char *src_path)
{
struct src *s;
s = malloc(sizeof(*s));
if (!s) {
mscp_set_error("failed to allocate memory: %s", strerrno());
return -1;
}
memset(s, 0, sizeof(*s));
s->path = strdup(src_path);
if (!s->path) {
mscp_set_error("failed to allocate memory: %s", strerrno());
free(s);
return -1;
}
list_add_tail(&s->list, &m->src_list);
return 0;
}
int mscp_set_dst_path(struct mscp *m, const char *dst_path)
{
if (strlen(dst_path) + 1 >= PATH_MAX) {
mscp_set_error("too long dst path: %s", dst_path);
return -1;
}
if (!non_null_string(dst_path))
strncpy(m->dst_path, ".", 1);
else
strncpy(m->dst_path, dst_path, PATH_MAX);
return 0;
}
int mscp_prepare(struct mscp *m)
{
sftp_session src_sftp = NULL, dst_sftp = NULL;
bool src_path_is_dir, dst_path_is_dir, dst_path_should_dir;
struct list_head tmp;
struct path *p;
struct src *s;
mstat ss, ds;
src_path_is_dir = dst_path_is_dir = dst_path_should_dir = false;
switch (m->direction) {
case MSCP_DIRECTION_L2R:
src_sftp = NULL;
dst_sftp = m->first;
break;
case MSCP_DIRECTION_R2L:
src_sftp = m->first;
dst_sftp = NULL;
break;
default:
mscp_set_error("invalid copy direction: %d", m->direction);
return -1;
}
if (list_count(&m->src_list) > 1)
dst_path_should_dir = true;
if (mscp_stat(m->dst_path, &ds, dst_sftp) == 0) {
if (mstat_is_dir(ds))
dst_path_is_dir = true;
mscp_stat_free(ds);
}
/* walk a src_path recusively, and resolve path->dst_path for each src */
list_for_each_entry(s, &m->src_list, list) {
if (mscp_stat(s->path, &ss, src_sftp) < 0) {
mscp_set_error("stat: %s", mscp_strerror(src_sftp));
return -1;
}
src_path_is_dir = mstat_is_dir(ss);
mscp_stat_free(ss);
INIT_LIST_HEAD(&tmp);
if (walk_src_path(src_sftp, s->path, &tmp) < 0)
return -1;
if (list_count(&tmp) > 1)
dst_path_should_dir = true;
if (resolve_dst_path(m->msg_fd, s->path, m->dst_path, &tmp,
src_path_is_dir, dst_path_is_dir,
dst_path_should_dir) < 0)
return -1;
list_splice_tail(&tmp, m->path_list.prev);
}
if (resolve_chunk(&m->path_list, &m->chunk_list, m->opts->nr_threads,
m->opts->min_chunk_sz, m->opts->max_chunk_sz) < 0)
return -1;
/* save total bytes to be transferred */
m->total_bytes = 0;
list_for_each_entry(p, &m->path_list, list) {
m->total_bytes += p->size;
}
return 0;
}
void mscp_stop(struct mscp *m)
{
int n;
pr("stopping...\n");
for (n = 0; n < m->opts->nr_threads; n++) {
if (m->threads[n].tid && !m->threads[n].finished)
pthread_cancel(m->threads[n].tid);
}
}
static void *mscp_copy_thread(void *arg);
int mscp_start(struct mscp *m)
{
int n, ret;
if ((n = list_count(&m->chunk_list)) < m->opts->nr_threads) {
mpr_notice(m->msg_fd, "we have only %d chunk(s). "
"set number of connections to %d\n", n, n);
m->opts->nr_threads = n;
}
/* prepare thread instances */
m->threads = calloc(m->opts->nr_threads, sizeof(struct mscp_thread));
memset(m->threads, 0, m->opts->nr_threads * sizeof(struct mscp_thread));
for (n = 0; n < m->opts->nr_threads; n++) {
struct mscp_thread *t = &m->threads[n];
t->m = m;
if (!m->cores)
t->cpu = -1;
else
t->cpu = m->cores[n % m->nr_cores];
if (n == 0) {
t->sftp = m->first; /* reuse first sftp session */
m->first = NULL;
}
else {
mpr_notice(m->msg_fd, "connecting to %s for a copy thread...\n",
m->remote);
t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts);
if (!t->sftp)
return -1;
}
}
/* spawn copy threads */
for (n = 0; n < m->opts->nr_threads; n++) {
struct mscp_thread *t = &m->threads[n];
ret = pthread_create(&t->tid, NULL, mscp_copy_thread, t);
if (ret < 0) {
mscp_set_error("pthread_create error: %d", ret);
mscp_stop(m);
return -1;
}
}
return 0;
}
int mscp_join(struct mscp *m)
{
int n, ret = 0;
/* waiting for threads join... */
for (n = 0; n < m->opts->nr_threads; n++) {
if (m->threads[n].tid) {
pthread_join(m->threads[n].tid, NULL);
if (m->threads[n].ret < 0)
ret = m->threads[n].ret;
}
}
if (m->first) {
ssh_sftp_close(m->first);
m->first = NULL;
}
if (m->threads) {
for (n = 0; n < m->opts->nr_threads; n++) {
struct mscp_thread *t = &m->threads[n];
if (t->ret != 0)
ret = ret;
if (t->sftp) {
ssh_sftp_close(t->sftp);
t->sftp = NULL;
}
}
}
return ret;
}
/* copy thread related functions */
struct chunk *acquire_chunk(struct list_head *chunk_list)
{
/* under the lock for chunk_list */
struct list_head *first = chunk_list->next;
struct chunk *c = NULL;
if (list_empty(chunk_list))
return NULL; /* list is empty */
c = list_entry(first, struct chunk, list);
list_del(first);
return c;
}
static void mscp_copy_thread_cleanup(void *arg)
{
struct mscp_thread *t = arg;
t->finished = true;
}
void *mscp_copy_thread(void *arg)
{
sftp_session src_sftp, dst_sftp;
struct mscp_thread *t = arg;
struct mscp *m = t->m;
struct chunk *c;
switch (m->direction) {
case MSCP_DIRECTION_L2R:
src_sftp = NULL;
dst_sftp = t->sftp;
break;
case MSCP_DIRECTION_R2L:
src_sftp = t->sftp;
dst_sftp = NULL;
break;
default:
return NULL; /* not reached */
}
if (t->cpu > -1) {
if (set_thread_affinity(pthread_self(), t->cpu) < 0)
return NULL;
}
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
pthread_cleanup_push(mscp_copy_thread_cleanup, t);
while (1) {
LOCK_ACQUIRE_THREAD(&m->chunk_lock);
c = acquire_chunk(&m->chunk_list);
LOCK_RELEASE_THREAD();
if (!c)
break; /* no more chunks */
if ((t->ret = copy_chunk(m->msg_fd,
c, src_sftp, dst_sftp, m->opts->nr_ahead,
m->opts->buf_sz, &t->done)) < 0)
break;
}
pthread_cleanup_pop(1);
if (t->ret < 0)
mscp_set_error("copy failed: chunk %s 0x%010lx-0x%010lx",
c->p->path, c->off, c->off + c->len);
return NULL;
}
/* cleanup related functions */
static void release_list(struct list_head *head, void (*f)(struct list_head*))
{
struct list_head *p, *n;
list_for_each_safe(p, n, head) {
list_del(p);
f(p);
}
}
static void free_src(struct list_head *list)
{
struct src *s;
s = list_entry(list, typeof(*s), list);
free(s->path);
free(s);
}
static void free_path(struct list_head *list)
{
struct path *p;
p = list_entry(list, typeof(*p), list);
free(p);
}
static void free_chunk(struct list_head *list)
{
struct chunk *c;
c = list_entry(list, typeof(*c), list);
free(c);
}
void mscp_cleanup(struct mscp *m)
{
if (m->first) {
ssh_sftp_close(m->first);
m->first = NULL;
}
release_list(&m->src_list, free_src);
INIT_LIST_HEAD(&m->src_list);
release_list(&m->chunk_list, free_chunk);
INIT_LIST_HEAD(&m->chunk_list);
release_list(&m->path_list, free_path);
INIT_LIST_HEAD(&m->path_list);
if (m->threads) {
free(m->threads);
m->threads = NULL;
}
}
void mscp_free(struct mscp *m)
{
mscp_cleanup(m);
if (m->remote)
free(m->remote);
if (m->cores)
free(m->cores);
free(m);
}
void mscp_get_stats(struct mscp *m, struct mscp_stats *s)
{
bool finished = true;
int n;
s->total = m->total_bytes;
for (s->done = 0, n = 0; n < m->opts->nr_threads; n++) {
s->done += m->threads[n].done;
if (!m->threads[n].done)
finished = false;
}
s->finished = finished;
}

534
src/path.c Normal file
View File

@@ -0,0 +1,534 @@
#include <string.h>
#include <unistd.h>
#include <dirent.h>
#include <sys/stat.h>
#include <libgen.h>
#include <assert.h>
#include <ssh.h>
#include <util.h>
#include <list.h>
#include <atomic.h>
#include <path.h>
#include <message.h>
static int append_path(sftp_session sftp, const char *path, mstat s,
struct list_head *path_list)
{
struct path *p;
if (!(p = malloc(sizeof(*p)))) {
mscp_set_error("failed to allocate memory: %s", strerrno());
return -1;
}
memset(p, 0, sizeof(*p));
INIT_LIST_HEAD(&p->list);
strncpy(p->path, path, PATH_MAX - 1);
p->size = mstat_size(s);
p->mode = mstat_mode(s);
p->state = FILE_STATE_INIT;
lock_init(&p->lock);
list_add_tail(&p->list, path_list);
return 0;
}
static bool check_path_should_skip(const char *path)
{
int len = strlen(path);
if ((len == 1 && strncmp(path, ".", 1) == 0) ||
(len == 2 && strncmp(path, "..", 2) == 0)) {
return true;
}
return false;
}
static int walk_path_recursive(sftp_session sftp, const char *path,
struct list_head *path_list)
{
char next_path[PATH_MAX];
mdirent *e;
mdir *d;
mstat s;
int ret;
if (mscp_stat(path, &s, sftp) < 0)
return -1;
if (mstat_is_regular(s)) {
/* this path is regular file. it is to be copied */
ret = append_path(sftp, path, s, path_list);
mscp_stat_free(s);
return ret;
}
if (!mstat_is_dir(s)) {
/* not regular file and not directory, skip it. */
mscp_stat_free(s);
return 0;
}
mscp_stat_free(s);
/* ok, this path is directory. walk it. */
if (!(d = mscp_opendir(path, sftp)))
return -1;
for (e = mscp_readdir(d); !mdirent_is_null(e); e = mscp_readdir(d)) {
if (check_path_should_skip(mdirent_name(e)))
continue;
if (strlen(path) + 1 + strlen(mdirent_name(e)) > PATH_MAX) {
mscp_set_error("too long path: %s/%s", path, mdirent_name(e));
return -1;
}
snprintf(next_path, sizeof(next_path), "%s/%s", path, mdirent_name(e));
ret = walk_path_recursive(sftp, next_path, path_list);
if (ret < 0)
return ret;
}
mscp_closedir(d);
return 0;
}
int walk_src_path(sftp_session src_sftp, const char *src_path,
struct list_head *path_list)
{
return walk_path_recursive(src_sftp, src_path, path_list);
}
static int src2dst_path(int msg_fd, const char *src_path, const char *src_file_path,
const char *dst_path, char *dst_file_path, size_t len,
bool src_path_is_dir, bool dst_path_is_dir,
bool dst_path_should_dir)
{
char copy[PATH_MAX];
char *prefix;
int offset;
strncpy(copy, src_path, PATH_MAX - 1);
prefix = dirname(copy);
if (!prefix) {
mscp_set_error("dirname: %s", strerrno());
return -1;
}
if (strlen(prefix) == 1 && prefix[0] == '.')
offset = 0;
else
offset = strlen(prefix) + 1;
if (!src_path_is_dir && !dst_path_is_dir) {
/* src path is file. dst path is (1) file, or (2) does not exist.
* In the second case, we need to put src under the dst.
*/
if (dst_path_should_dir)
snprintf(dst_file_path, len, "%s/%s",
dst_path, src_path + offset);
else
strncpy(dst_file_path, dst_path, len);
}
/* src is file, and dst is dir */
if (!src_path_is_dir && dst_path_is_dir)
snprintf(dst_file_path, len, "%s/%s", dst_path, src_path + offset);
/* both are directory */
if (src_path_is_dir && dst_path_is_dir)
snprintf(dst_file_path, len, "%s/%s", dst_path, src_file_path + offset);
/* dst path does not exist. change dir name to dst_path */
if (src_path_is_dir && !dst_path_is_dir)
snprintf(dst_file_path, len, "%s/%s",
dst_path, src_file_path + strlen(src_path) + 1);
mpr_info(msg_fd, "file: %s -> %s\n", src_file_path, dst_file_path);
return 0;
}
int resolve_dst_path(int msg_fd, const char *src_path, const char *dst_path,
struct list_head *path_list, bool src_path_is_dir,
bool dst_path_is_dir, bool dst_path_should_dir)
{
struct path *p;
list_for_each_entry(p, path_list, list) {
if (src2dst_path(msg_fd, src_path, p->path,
dst_path, p->dst_path, PATH_MAX,
src_path_is_dir, dst_path_is_dir,
dst_path_should_dir) < 0)
return -1;
}
return 0;
}
void path_dump(struct list_head *path_list)
{
struct path *p;
list_for_each_entry(p, path_list, list) {
printf("src: %s %lu-byte\n", p->path, p->size);
printf("dst: %s\n", p->dst_path);
}
}
/* chunk preparation */
static struct chunk *alloc_chunk(struct path *p)
{
struct chunk *c;
if (!(c = malloc(sizeof(*c)))) {
mscp_set_error("malloc %s", strerrno());
return NULL;
}
memset(c, 0, sizeof(*c));
c->p = p;
c->off = 0;
c->len = 0;
refcnt_inc(&p->refcnt);
return c;
}
static int get_page_mask(void)
{
long page_sz = sysconf(_SC_PAGESIZE);
size_t page_mask = 0;
int n;
for (n = 0; page_sz > 0; page_sz >>= 1, n++) {
page_mask <<= 1;
page_mask |= 1;
}
return page_mask >> 1;
}
int resolve_chunk(struct list_head *path_list, struct list_head *chunk_list,
int nr_conn, int min_chunk_sz, int max_chunk_sz)
{
struct chunk *c;
struct path *p;
size_t page_mask;
size_t chunk_sz;
size_t size;
page_mask = get_page_mask();
list_for_each_entry(p, path_list, list) {
if (p->size <= min_chunk_sz)
chunk_sz = p->size;
else if (max_chunk_sz)
chunk_sz = max_chunk_sz;
else {
chunk_sz = (p->size - (p->size % nr_conn)) / nr_conn;
chunk_sz &= ~page_mask; /* align with page_sz */
if (chunk_sz <= min_chunk_sz)
chunk_sz = min_chunk_sz;
}
/* for (size = f->size; size > 0;) does not create a
* file (chunk) when file size is 0. This do {} while
* (size > 0) creates just open/close a 0-byte file.
*/
size = p->size;
do {
c = alloc_chunk(p);
if (!c)
return -1;
c->off = p->size - size;
c->len = size < chunk_sz ? size : chunk_sz;
size -= c->len;
list_add_tail(&c->list, chunk_list);
} while (size > 0);
}
return 0;
}
void chunk_dump(struct list_head *chunk_list)
{
struct chunk *c;
list_for_each_entry(c, chunk_list, list) {
printf("chunk: %s 0x%lx-%lx bytes\n",
c->p->path, c->off, c->off + c->len);
}
}
/* based on
* https://stackoverflow.com/questions/2336242/recursive-mkdir-system-call-on-unix */
static int touch_dst_path(struct path *p, sftp_session sftp)
{
/* XXX: should reflect the permission of the original directory? */
mode_t mode = S_IRWXU | S_IRWXG | S_IRWXO;
char path[PATH_MAX];
char *needle;
int ret;
mfh h;
strncpy(path, p->dst_path, sizeof(path));
/* mkdir -p.
* XXX: this may be slow when dst is the remote side. need speed-up. */
for (needle = strchr(path + 1, '/'); needle; needle = strchr(needle + 1, '/')) {
*needle = '\0';
mstat s;
if (mscp_stat(path, &s, sftp) == 0) {
if (mstat_is_dir(s))
goto next; /* directory exists. go deeper */
else
return -1; /* path exists, but not directory. */
}
if (mscp_stat_check_err_noent(sftp) == 0) {
/* no file on the path. create directory. */
if (mscp_mkdir(path, mode, sftp) < 0) {
mscp_set_error("mkdir %s: %s", path,
mscp_strerror(sftp));
return -1;
}
}
next:
*needle = '/';
}
/* open file with O_TRUNC to set file size 0 */
h = mscp_open(p->dst_path, O_WRONLY|O_CREAT|O_TRUNC, S_IRUSR|S_IWUSR, 0, sftp);
if (mscp_open_is_failed(h))
return -1;
mscp_close(h);
return 0;
}
static int prepare_dst_path(int msg_fd, struct path *p, sftp_session dst_sftp)
{
int ret = 0;
LOCK_ACQUIRE_THREAD(&p->lock);
if (p->state == FILE_STATE_INIT) {
if (touch_dst_path(p, dst_sftp) < 0) {
ret = -1;
goto out;
}
p->state = FILE_STATE_OPENED;
mpr_info(msg_fd, "copy start: %s\n", p->path);
}
out:
LOCK_RELEASE_THREAD();
return ret;
}
/* functions for copy */
static ssize_t read_to_buf(void *ptr, size_t len, void *userdata)
{
int fd = *((int *)userdata);
return read(fd, ptr, len);
}
static int copy_chunk_l2r(struct chunk *c, int fd, sftp_file sf,
int nr_ahead, int buf_sz, size_t *counter)
{
ssize_t read_bytes, remaind, thrown;
int idx, ret;
struct {
uint32_t id;
ssize_t len;
} reqs[nr_ahead];
if (c->len == 0)
return 0;
remaind = thrown = c->len;
for (idx = 0; idx < nr_ahead && thrown > 0; idx++) {
reqs[idx].len = min(thrown, buf_sz);
reqs[idx].len = sftp_async_write(sf, read_to_buf, reqs[idx].len, &fd,
&reqs[idx].id);
if (reqs[idx].len < 0) {
mscp_set_error("sftp_async_write: %s or %s",
sftp_get_ssh_error(sf->sftp), strerrno());
return -1;
}
thrown -= reqs[idx].len;
}
for (idx = 0; remaind > 0; idx = (idx + 1) % nr_ahead) {
ret = sftp_async_write_end(sf, reqs[idx].id, 1);
if (ret != SSH_OK) {
mscp_set_error("sftp_async_write_end: %s",
sftp_get_ssh_error(sf->sftp));
return -1;
}
*counter += reqs[idx].len;
remaind -= reqs[idx].len;
if (remaind <= 0)
break;
if (thrown <= 0)
continue;
reqs[idx].len = min(thrown, buf_sz);
reqs[idx].len = sftp_async_write(sf, read_to_buf, reqs[idx].len, &fd,
&reqs[idx].id);
if (reqs[idx].len < 0) {
mscp_set_error("sftp_async_write: %s or %s",
sftp_get_ssh_error(sf->sftp), strerrno());
return -1;
}
thrown -= reqs[idx].len;
}
if (remaind < 0) {
mscp_set_error("invalid remaind bytes %ld. "
"last async_write_end bytes %lu.",
remaind, reqs[idx].len);
return -1;
}
return 0;
}
static int copy_chunk_r2l(struct chunk *c, sftp_file sf, int fd,
int nr_ahead, int buf_sz, size_t *counter)
{
ssize_t read_bytes, write_bytes, remaind, thrown;
char buf[buf_sz];
int idx;
struct {
int id;
ssize_t len;
} reqs[nr_ahead];
if (c->len == 0)
return 0;
remaind = thrown = c->len;
for (idx = 0; idx < nr_ahead && thrown > 0; idx++) {
reqs[idx].len = min(thrown, sizeof(buf));
reqs[idx].id = sftp_async_read_begin(sf, reqs[idx].len);
if (reqs[idx].id < 0) {
mscp_set_error("sftp_async_read_begin: %d",
sftp_get_error(sf->sftp));
return -1;
}
thrown -= reqs[idx].len;
}
for (idx = 0; remaind > 0; idx = (idx + 1) % nr_ahead) {
read_bytes = sftp_async_read(sf, buf, reqs[idx].len, reqs[idx].id);
if (read_bytes == SSH_ERROR) {
mscp_set_error("sftp_async_read: %d",
sftp_get_error(sf->sftp));
return -1;
}
if (thrown > 0) {
reqs[idx].len = min(thrown, sizeof(buf));
reqs[idx].id = sftp_async_read_begin(sf, reqs[idx].len);
thrown -= reqs[idx].len;
}
write_bytes = write(fd, buf, read_bytes);
if (write_bytes < 0) {
mscp_set_error("write: %s", strerrno());
return -1;
}
if (write_bytes < read_bytes) {
mscp_set_error("failed to write full bytes");
return -1;
}
*counter += write_bytes;
remaind -= read_bytes;
}
if (remaind < 0) {
mscp_set_error("invalid remaind bytes %ld. last async_read bytes %ld. "
"last write bytes %ld",
remaind, read_bytes, write_bytes);
return -1;
}
return 0;
}
static int _copy_chunk(struct chunk *c, mfh s, mfh d,
int nr_ahead, int buf_sz, size_t *counter)
{
if (s.fd > 0 && d.sf) /* local to remote copy */
return copy_chunk_l2r(c, s.fd, d.sf, nr_ahead, buf_sz, counter);
else if (s.sf && d.fd > 0) /* remote to local copy */
return copy_chunk_r2l(c, s.sf, d.fd, nr_ahead, buf_sz, counter);
assert(true); /* not reached */
return -1;
}
int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp,
int nr_ahead, int buf_sz, size_t *counter)
{
mode_t mode;
int flags;
mfh s, d;
int ret;
assert((src_sftp && !dst_sftp) || (!src_sftp && dst_sftp));
if (prepare_dst_path(msg_fd, c->p, dst_sftp) < 0)
return -1;
/* open src */
flags = O_RDONLY;
mode = S_IRUSR;
s = mscp_open(c->p->path, flags, mode, c->off, src_sftp);
if (mscp_open_is_failed(s)) {
mscp_close(d);
return -1;
}
/* open dst */
flags = O_WRONLY;
mode = S_IRUSR|S_IWUSR;
d = mscp_open(c->p->dst_path, flags, mode, c->off, dst_sftp);
if (mscp_open_is_failed(d))
return -1;
mpr_debug(msg_fd, "copy chunk start: %s 0x%lx-0x%lx\n",
c->p->path, c->off, c->off + c->len);
ret = _copy_chunk(c, s, d, nr_ahead, buf_sz, counter);
mpr_debug(msg_fd, "copy chunk done: %s 0x%lx-0x%lx\n",
c->p->path, c->off, c->off + c->len);
mscp_close(d);
mscp_close(s);
if (ret < 0)
return ret;
if (refcnt_dec(&c->p->refcnt) == 0) {
c->p->state = FILE_STATE_DONE;
mscp_chmod(c->p->dst_path, c->p->mode, dst_sftp);
mpr_info(msg_fd, "copy done: %s\n", c->p->path);
}
return ret;
}

301
src/path.h Normal file
View File

@@ -0,0 +1,301 @@
#ifndef _PATH_H_
#define _PATH_H_
#include <limits.h>
#include <fcntl.h>
#include <dirent.h>
#include <sys/stat.h>
#include <list.h>
#include <atomic.h>
#include <ssh.h>
#include <message.h>
struct path {
struct list_head list; /* mscp->path_list */
char path[PATH_MAX]; /* file path */
size_t size; /* size of file on this path */
mode_t mode; /* permission */
char dst_path[PATH_MAX]; /* copy dst path */
int state;
lock lock;
refcnt refcnt;
};
#define FILE_STATE_INIT 0
#define FILE_STATE_OPENED 1
#define FILE_STATE_DONE 2
struct chunk {
struct list_head list; /* mscp->chunk_list */
struct path *p;
size_t off; /* offset of this chunk on the file on path p */
size_t len; /* length of this chunk */
size_t done; /* copied bytes for this chunk by a thread */
};
/* recursivly walk through src_path and fill path_list for each file */
int walk_src_path(sftp_session src_sftp, const char *src_path,
struct list_head *path_list);
/* fill path->dst_path for all files */
int resolve_dst_path(int msg_fd, const char *src_path, const char *dst_path,
struct list_head *path_list,
bool src_path_is_dir, bool dst_path_is_dir,
bool dst_path_should_dir);
/* resolve chunks from files in the path_list */
int resolve_chunk(struct list_head *path_list, struct list_head *chunk_list,
int nr_conn, int min_chunk_sz, int max_chunk_sz);
/* copy a chunk. either src_sftp or dst_sftp is not null, and another is null */
int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp,
int nr_ahead, int buf_sz, size_t *counter);
/* just print contents. just for debugging */
void path_dump(struct list_head *path_list);
void chunk_dump(struct list_head *chunk_list);
/* wrap DIR/dirent and sftp_dir/sftp_attribute. not thread safe */
struct mscp_dir {
DIR *l;
sftp_dir r;
sftp_session sftp;
};
typedef struct mscp_dir mdir;
struct mscp_dirent {
struct dirent *l;
sftp_attributes r;
};
typedef struct mscp_dirent mdirent;
#define mdirent_name(e) ((e->l) ? e->l->d_name : e->r->name)
#define mdirent_is_dir(e) ((e->l) ? \
(e->l->d_type == DT_DIR) : \
(e->r->type == SSH_FILEXFER_TYPE_DIRECTORY))
#define mdirent_is_null(e) (e->l == NULL && e->r == NULL)
static mdir *mscp_opendir(const char *path, sftp_session sftp)
{
mdir *d;
if (!(d = malloc(sizeof(*d))))
return NULL;
memset(d, 0, sizeof(*d));
d->sftp = sftp;
if (sftp) {
d->r = sftp_opendir(sftp, path);
if (!d->r) {
mscp_set_error("sftp_opendir '%s': %s",
path, sftp_get_ssh_error(sftp));
free(d);
return NULL;
}
} else {
d->l = opendir(path);
if (!d->l) {
mscp_set_error("opendir '%s': %s", path, strerrno());
free(d);
return NULL;
}
}
return d;
}
static int mscp_closedir(mdir *d)
{
int ret;
if (d->r)
ret = sftp_closedir(d->r);
else
ret = closedir(d->l);
free(d);
return ret;
}
static mdirent *mscp_readdir(mdir *d)
{
static mdirent e;
memset(&e, 0, sizeof(e));
if (d->r)
e.r = sftp_readdir(d->sftp, d->r);
else
e.l = readdir(d->l);
return &e;
}
/* wrap retriving error */
static const char *mscp_strerror(sftp_session sftp)
{
if (sftp)
return sftp_get_ssh_error(sftp);
return strerrno();
}
/* warp stat/sftp_stat */
struct mscp_stat {
struct stat l;
sftp_attributes r;
};
typedef struct mscp_stat mstat;
static int mscp_stat(const char *path, mstat *s, sftp_session sftp)
{
memset(s, 0, sizeof(*s));
if (sftp) {
s->r = sftp_stat(sftp, path);
if (!s->r)
return -1;
} else {
if (stat(path, &s->l) < 0)
return -1;
}
return 0;
}
static int mscp_stat_check_err_noent(sftp_session sftp)
{
if (sftp) {
if (sftp_get_error(sftp) == SSH_FX_NO_SUCH_PATH ||
sftp_get_error(sftp) == SSH_FX_NO_SUCH_FILE)
return 0;
} else {
if (errno == ENOENT)
return 0;
}
return -1;
}
static void mscp_stat_free(mstat s) {
if (s.r)
sftp_attributes_free(s.r);
}
#define mstat_size(s) ((s.r) ? s.r->size : s.l.st_size)
#define mstat_mode(s) ((s.r) ? \
s.r->permissions : \
s.l.st_mode & (S_IRWXU|S_IRWXG|S_IRWXO))
#define mstat_is_regular(s) ((s.r) ? \
(s.r->type == SSH_FILEXFER_TYPE_REGULAR) : \
S_ISREG(s.l.st_mode))
#define mstat_is_dir(s) ((s.r) ? \
(s.r->type == SSH_FILEXFER_TYPE_DIRECTORY) : \
S_ISDIR(s.l.st_mode))
/* wrap mkdir */
static int mscp_mkdir(const char *path, mode_t mode, sftp_session sftp)
{
int ret;
if (sftp) {
ret = sftp_mkdir(sftp, path, mode);
if (ret < 0 &&
sftp_get_error(sftp) != SSH_FX_FILE_ALREADY_EXISTS) {
mscp_set_error("sftp_mkdir '%s': %s",
path, sftp_get_ssh_error(sftp));
return -1;
}
} else {
if (mkdir(path, mode) == -1 && errno != EEXIST) {
mscp_set_error("mkdir '%s': %s", path, strerrno());
return -1;
}
}
return 0;
}
/* wrap open/sftp_open */
struct mscp_file_handle {
int fd;
sftp_file sf;
};
typedef struct mscp_file_handle mfh;
static mfh mscp_open(const char *path, int flags, mode_t mode, size_t off,
sftp_session sftp)
{
mfh h;
h.fd = -1;
h.sf = NULL;
if (sftp) {
h.sf = sftp_open(sftp, path, flags, mode);
if (!h.sf) {
mscp_set_error("sftp_open '%s': %s",
path, sftp_get_ssh_error(sftp));
return h;
}
if (sftp_seek64(h.sf, off) < 0) {
mscp_set_error("sftp_seek64 '%s': %s",
path, sftp_get_ssh_error(sftp));
sftp_close(h.sf);
h.sf = NULL;
return h;
}
} else {
h.fd = open(path, flags, mode);
if (h.fd < 0) {
mscp_set_error("open '%s': %s", path, strerrno());
return h;
}
if (lseek(h.fd, off, SEEK_SET) < 0) {
mscp_set_error("lseek '%s': %s", path, strerrno());
close(h.fd);
h.fd = -1;
return h;
}
}
return h;
}
#define mscp_open_is_failed(h) (h.fd < 0 && h.sf == NULL)
static void mscp_close(mfh h)
{
if (h.sf)
sftp_close(h.sf);
if (h.fd > 0)
close(h.fd);
h.sf = NULL;
h.fd = -1;
}
/* wrap chmod/sftp_chmod */
static int mscp_chmod(const char *path, mode_t mode, sftp_session sftp)
{
if (sftp) {
if (sftp_chmod(sftp, path, mode) < 0) {
mscp_set_error("sftp_chmod '%s': %s",
path, sftp_get_ssh_error(sftp));
return -1;
}
} else {
if (chmod(path, mode) < 0) {
mscp_set_error("chmod '%s': %s", path, strerrno());
return -1;
}
}
return 0;
}
#endif /* _PATH_H_ */

View File

@@ -10,6 +10,7 @@
#include <util.h>
#include <platform.h>
#include <message.h>
#ifdef __APPLE__
int nr_cpus()
@@ -18,7 +19,7 @@ int nr_cpus()
size_t size = sizeof(n);
if (sysctlbyname("machdep.cpu.core_count", &n, &size, NULL, 0) != 0) {
pr_err("failed to get number of cpu cores: %s\n", strerrno());
mscp_set_error("failed to get number of cpu cores: %s", strerrno());
return -1;
}
@@ -51,8 +52,8 @@ int set_thread_affinity(pthread_t tid, int core)
CPU_SET(core, &target_cpu_set);
ret = pthread_setaffinity_np(tid, sizeof(target_cpu_set), &target_cpu_set);
if (ret < 0)
pr_err("failed to set thread/cpu affinity for core %d: %s",
core, strerrno());
mscp_set_error("failed to set thread/cpu affinity for core %d: %s",
core, strerrno());
return ret;
}
#endif

View File

@@ -1,27 +0,0 @@
#include <stdio.h>
#include <stdarg.h>
#include <pthread.h>
static int pprint_level = 1;
static pthread_mutex_t pprint_lock = PTHREAD_MUTEX_INITIALIZER;
void pprint_set_level(int level)
{
pprint_level = level;
}
void pprint(int level, const char *fmt, ...)
{
va_list va;
if (level <= pprint_level) {
pthread_mutex_lock(&pprint_lock);
va_start(va, fmt);
vfprintf(stdout, fmt, va);
fflush(stdout);
va_end(va);
pthread_mutex_unlock(&pprint_lock);
}
}

View File

@@ -1,20 +0,0 @@
#ifndef _PPRINT_H_
#define _PPRINT_H_
/* progress print functions */
/* level 1: print progress bar only.
* level 2: print copy start/done messages.
* level 3: print ssh connection establishment/disconnection.
* level 4: print chunk information.
*/
void pprint_set_level(int level);
void pprint(int level, const char *fmt, ...);
#define pprint1(fmt, ...) pprint(1, "\r\033[K" fmt, ##__VA_ARGS__)
#define pprint2(fmt, ...) pprint(2, "\r\033[K" fmt, ##__VA_ARGS__)
#define pprint3(fmt, ...) pprint(3, "\r\033[K" fmt, ##__VA_ARGS__)
#define pprint4(fmt, ...) pprint(4, "\r\033[K" fmt, ##__VA_ARGS__)
#endif /* _PPRRINT_H_ */

480
src/pymscp.c Normal file
View File

@@ -0,0 +1,480 @@
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <mscp.h>
/*
* This is a wrapper for python binding of libmscp. setup.py builds
* pymscp.c after libmscp was built, and setup.py installs pymscp
* modlue and mscp python module (mscp/mscp.py), which is a warpper
* for pymscp.
*/
#define MAX_MSCP_INSTS 64
/* XXX: cut corners */
struct instance {
struct mscp_opts mo;
struct mscp_ssh_opts so;
struct mscp *m;
};
struct instance *insts[MAX_MSCP_INSTS];
static int add_instance(struct instance *i)
{
int n;
for (n = 0; n < MAX_MSCP_INSTS; n++) {
if (insts[n] == NULL) {
insts[n] = i;
return 0;
}
}
return -1; /* full of mscp instances */
}
static struct instance *get_instance(unsigned long long addr)
{
int n;
for (n = 0; n < MAX_MSCP_INSTS; n++) {
if (insts[n] == (void *)addr)
return insts[n];
}
return NULL;
}
static struct mscp *get_mscp(unsigned long long addr)
{
struct instance *i = get_instance(addr);
if (!i)
return NULL;
return i->m;
}
static int release_instance(struct instance *i)
{
int n;
for (n = 0; n < MAX_MSCP_INSTS; n++) {
if (insts[n] == i) {
insts[n] = NULL;
return 0;
}
}
free(i);
return -1;
}
/* wrapper functions */
static PyObject *wrap_mscp_init(PyObject *sef, PyObject *args, PyObject *kw)
{
/*
* Initialize struct mscp with options. wrap_mscp_init
* receives all the arguments with keywords.
*/
char *remote;
char *keywords[] = {
"remote", /* const char * */
"direction", /* int, MSCP_DIRECTION_L2R or MSCP_DIRECTION_R2L */
/* mscp_opts */
"nr_threads", /* int */
"nr_ahead", /* int */
"min_chunk_sz", /* unsigned long */
"max_chunk_sz", /* unsigned long */
"buf_sz", /* unsigned long */
"coremask", /* const char * */
"severity", /* int, MSCP_SERVERITY_* */
"msg_fd", /* int */
/* mscp_ssh_opts */
"login_name", /* const char * */
"port", /* const char * */
"identity", /* const char * */
"cipher", /* const char * */
"hmac", /* const char * */
"compress", /* const char * */
"password", /* const char * */
"passphrase", /* const char * */
"debug_level", /* int */
"no_hostkey_check", /* bool */
"enable_nagle", /* bool */
NULL,
};
const char *fmt = "si" "|iikkksii" "ssssssssipp";
char *coremask = NULL;
char *login_name = NULL, *port = NULL, *identity = NULL;
char *cipher = NULL, *hmac = NULL, *compress = NULL;
char *password = NULL, *passphrase = NULL;
struct instance *i;
int direction;
int ret;
i = malloc(sizeof(*i));
if (!i) {
PyErr_Format(PyExc_RuntimeError, strerror(errno));
return NULL;
}
memset(i, 0, sizeof(*i));
ret = PyArg_ParseTupleAndKeywords(args, kw, fmt, keywords,
&remote,
&direction,
&i->mo.nr_threads,
&i->mo.nr_ahead,
&i->mo.min_chunk_sz,
&i->mo.max_chunk_sz,
&i->mo.buf_sz,
&coremask,
&i->mo.severity,
&i->mo.msg_fd,
&login_name,
&port,
&identity,
&cipher,
&hmac,
&compress,
&password,
&passphrase,
&i->so.debug_level,
&i->so.no_hostkey_check,
&i->so.enable_nagle);
if (!ret)
return NULL;
if (coremask)
strncpy(i->mo.coremask, coremask, MSCP_MAX_COREMASK_STR - 1);
if (login_name)
strncpy(i->so.login_name, login_name, MSCP_SSH_MAX_LOGIN_NAME - 1);
if (port)
strncpy(i->so.port, port, MSCP_SSH_MAX_PORT_STR - 1);
if (identity)
strncpy(i->so.identity, identity, MSCP_SSH_MAX_IDENTITY_PATH - 1);
if (cipher)
strncpy(i->so.cipher, cipher, MSCP_SSH_MAX_CIPHER_STR - 1);
if (hmac)
strncpy(i->so.hmac, hmac, MSCP_SSH_MAX_HMAC_STR - 1);
if (compress)
strncpy(i->so.compress, compress, MSCP_SSH_MAX_COMP_STR - 1);
if (password)
strncpy(i->so.password, password, MSCP_SSH_MAX_PASSWORD - 1);
if (passphrase)
strncpy(i->so.passphrase, passphrase, MSCP_SSH_MAX_PASSPHRASE - 1);
i->m = mscp_init(remote, direction, &i->mo, &i->so);
if (!i->m) {
free(i);
return NULL;
}
if (add_instance(i) < 0) {
PyErr_Format(PyExc_RuntimeError, "too many mscp isntances");
mscp_free(i->m);
free(i);
return NULL;
}
return Py_BuildValue("K", (unsigned long long)i);
}
static PyObject *wrap_mscp_connect(PyObject *self, PyObject *args, PyObject *kw)
{
char *keywords[] = { "m", NULL };
unsigned long long addr;
struct mscp *m;
if (!PyArg_ParseTupleAndKeywords(args, kw, "K", keywords, &addr))
return NULL;
m = get_mscp(addr);
if (!m) {
PyErr_Format(PyExc_RuntimeError, "invalid mscp instance address");
return NULL;
}
if (mscp_connect(m) < 0) {
PyErr_Format(PyExc_RuntimeError, mscp_get_error());
return NULL;
}
return Py_BuildValue("");
}
static PyObject *wrap_mscp_add_src_path(PyObject *self, PyObject *args, PyObject *kw)
{
char *keywords[] = { "m", "src_path", NULL };
unsigned long long addr;
char *src_path;
struct mscp *m;
if (!PyArg_ParseTupleAndKeywords(args, kw, "Ks", keywords, &addr, &src_path))
return NULL;
m = get_mscp(addr);
if (!m) {
PyErr_Format(PyExc_RuntimeError, "invalid mscp instance address");
return NULL;
}
if (mscp_add_src_path(m, src_path) < 0) {
PyErr_Format(PyExc_RuntimeError, mscp_get_error());
return NULL;
}
return Py_BuildValue("");
}
static PyObject *wrap_mscp_set_dst_path(PyObject *self, PyObject *args, PyObject *kw)
{
char *keywords[] = { "m", "dst_path", NULL };
unsigned long long addr;
char *dst_path;
struct mscp *m;
if (!PyArg_ParseTupleAndKeywords(args, kw, "Ks", keywords, &addr, &dst_path))
return NULL;
m = get_mscp(addr);
if (!m) {
PyErr_Format(PyExc_RuntimeError, "invalid mscp instance address");
return NULL;
}
if (mscp_set_dst_path(m, dst_path) < 0) {
PyErr_Format(PyExc_RuntimeError, mscp_get_error());
return NULL;
}
return Py_BuildValue("");
}
static PyObject *wrap_mscp_prepare(PyObject *self, PyObject *args, PyObject *kw)
{
char *keywords[] = { "m", NULL };
unsigned long long addr;
struct mscp *m;
if (!PyArg_ParseTupleAndKeywords(args, kw, "K", keywords, &addr))
return NULL;
m = get_mscp(addr);
if (!m) {
PyErr_Format(PyExc_RuntimeError, "invalid mscp instance address");
return NULL;
}
if (mscp_prepare(m) < 0) {
PyErr_Format(PyExc_RuntimeError, mscp_get_error());
return NULL;
}
return Py_BuildValue("");
}
static PyObject *wrap_mscp_start(PyObject *self, PyObject *args, PyObject *kw)
{
char *keywords[] = { "m", NULL };
unsigned long long addr;
struct mscp *m;
if (!PyArg_ParseTupleAndKeywords(args, kw, "K", keywords, &addr))
return NULL;
m = get_mscp(addr);
if (!m) {
PyErr_Format(PyExc_RuntimeError, "invalid mscp instance address");
return NULL;
}
if (mscp_start(m) < 0) {
PyErr_Format(PyExc_RuntimeError, mscp_get_error());
return NULL;
}
return Py_BuildValue("");
}
static PyObject *wrap_mscp_stop(PyObject *self, PyObject *args, PyObject *kw)
{
char *keywords[] = { "m", NULL };
unsigned long long addr;
struct mscp *m;
if (!PyArg_ParseTupleAndKeywords(args, kw, "K", keywords, &addr))
return NULL;
m = get_mscp(addr);
if (!m) {
PyErr_Format(PyExc_RuntimeError, "invalid mscp instance address");
return NULL;
}
mscp_stop(m);
return Py_BuildValue("");
}
static PyObject *wrap_mscp_join(PyObject *self, PyObject *args, PyObject *kw)
{
char *keywords[] = { "m", NULL };
unsigned long long addr;
struct mscp *m;
if (!PyArg_ParseTupleAndKeywords(args, kw, "K", keywords, &addr))
return NULL;
m = get_mscp(addr);
if (!m) {
PyErr_Format(PyExc_RuntimeError, "invalid mscp instance address");
return NULL;
}
if (mscp_join(m) < 0) {
PyErr_Format(PyExc_RuntimeError, mscp_get_error());
return NULL;
}
return Py_BuildValue("");
}
static PyObject *wrap_mscp_get_stats(PyObject *self, PyObject *args, PyObject *kw)
{
char *keywords[] = { "m", NULL };
unsigned long long addr;
struct mscp_stats s;
struct mscp *m;
if (!PyArg_ParseTupleAndKeywords(args, kw, "K", keywords, &addr))
return NULL;
m = get_mscp(addr);
if (!m) {
PyErr_Format(PyExc_RuntimeError, "invalid mscp instance address");
return NULL;
}
mscp_get_stats(m, &s);
return Py_BuildValue("KKd", s.total, s.done, s.finished);
}
static PyObject *wrap_mscp_cleanup(PyObject *self, PyObject *args, PyObject *kw)
{
char *keywords[] = { "m", NULL };
unsigned long long addr;
struct mscp *m;
if (!PyArg_ParseTupleAndKeywords(args, kw, "K", keywords, &addr))
return NULL;
m = get_mscp(addr);
if (!m) {
PyErr_Format(PyExc_RuntimeError, "invalid mscp instance address");
return NULL;
}
mscp_cleanup(m);
return Py_BuildValue("");
}
static PyObject *wrap_mscp_free(PyObject *self, PyObject *args, PyObject *kw)
{
char *keywords[] = { "m", NULL };
unsigned long long addr;
struct instance *i;
if (!PyArg_ParseTupleAndKeywords(args, kw, "K", keywords, &addr))
return NULL;
i = get_instance(addr);
if (!i) {
PyErr_Format(PyExc_RuntimeError, "invalid mscp instance address");
return NULL;
}
mscp_free(i->m);
release_instance(i);
return Py_BuildValue("");
}
static PyMethodDef pymscpMethods[] = {
{
"mscp_init", (PyCFunction)wrap_mscp_init,
METH_VARARGS | METH_KEYWORDS, NULL
},
{
"mscp_connect", (PyCFunction)wrap_mscp_connect,
METH_VARARGS | METH_KEYWORDS, NULL
},
{
"mscp_add_src_path", (PyCFunction)wrap_mscp_add_src_path,
METH_VARARGS | METH_KEYWORDS, NULL
},
{
"mscp_set_dst_path", (PyCFunction)wrap_mscp_set_dst_path,
METH_VARARGS | METH_KEYWORDS, NULL
},
{
"mscp_prepare", (PyCFunction)wrap_mscp_prepare,
METH_VARARGS | METH_KEYWORDS, NULL
},
{
"mscp_start", (PyCFunction)wrap_mscp_start,
METH_VARARGS | METH_KEYWORDS, NULL
},
{
"mscp_stop", (PyCFunction)wrap_mscp_stop,
METH_VARARGS | METH_KEYWORDS, NULL
},
{
"mscp_join", (PyCFunction)wrap_mscp_join,
METH_VARARGS | METH_KEYWORDS, NULL
},
{
"mscp_get_stats", (PyCFunction)wrap_mscp_get_stats,
METH_VARARGS | METH_KEYWORDS, NULL
},
{
"mscp_cleanup", (PyCFunction)wrap_mscp_cleanup,
METH_VARARGS | METH_KEYWORDS, NULL
},
{
"mscp_free", (PyCFunction)wrap_mscp_free,
METH_VARARGS | METH_KEYWORDS, NULL
},
{ NULL, NULL, 0, NULL },
};
static PyModuleDef pymscpModule = {
PyModuleDef_HEAD_INIT, "pymscp", NULL, -1, pymscpMethods,
};
PyMODINIT_FUNC PyInit_pymscp(void) {
PyObject *mod = PyModule_Create(&pymscpModule);
PyModule_AddIntConstant(mod, "LOCAL2REMOTE", MSCP_DIRECTION_L2R);
PyModule_AddIntConstant(mod, "REMOTE2LOCAL", MSCP_DIRECTION_R2L);
PyModule_AddIntConstant(mod, "SEVERITY_NONE", MSCP_SEVERITY_NONE);
PyModule_AddIntConstant(mod, "SEVERITY_ERR", MSCP_SEVERITY_ERR);
PyModule_AddIntConstant(mod, "SEVERITY_WARN", MSCP_SEVERITY_WARN);
PyModule_AddIntConstant(mod, "SEVERITY_NOTICE", MSCP_SEVERITY_NOTICE);
PyModule_AddIntConstant(mod, "SEVERITY_INFO", MSCP_SEVERITY_INFO);
PyModule_AddIntConstant(mod, "SEVERITY_DEBUG", MSCP_SEVERITY_DEBUG);
return mod;
}

106
src/ssh.c
View File

@@ -7,70 +7,76 @@
#include <ssh.h>
#include <util.h>
#include <message.h>
static int ssh_verify_known_hosts(ssh_session session);
static int ssh_set_opts(ssh_session ssh, struct ssh_opts *opts)
#define is_specified(s) (strlen(s) > 0)
static int ssh_set_opts(ssh_session ssh, struct mscp_ssh_opts *opts)
{
ssh_set_log_level(opts->debuglevel);
ssh_set_log_level(opts->debug_level);
if (opts->login_name &&
if (is_specified(opts->login_name) &&
ssh_options_set(ssh, SSH_OPTIONS_USER, opts->login_name) < 0) {
pr_err("failed to set login name\n");
mscp_set_error("failed to set login name");
return -1;
}
if (opts->port &&
if (is_specified(opts->port) &&
ssh_options_set(ssh, SSH_OPTIONS_PORT_STR, opts->port) < 0) {
pr_err("failed to set port number\n");
mscp_set_error("failed to set port number");
return -1;
}
if (opts->identity &&
if (is_specified(opts->identity) &&
ssh_options_set(ssh, SSH_OPTIONS_IDENTITY, opts->identity) < 0) {
pr_err("failed to set identity\n");
mscp_set_error("failed to set identity");
return -1;
}
if (opts->cipher) {
if (is_specified(opts->cipher)) {
if (ssh_options_set(ssh, SSH_OPTIONS_CIPHERS_C_S, opts->cipher) < 0) {
pr_err("failed to set cipher for client to server\n");
mscp_set_error("failed to set cipher for client to server");
return -1;
}
if (ssh_options_set(ssh, SSH_OPTIONS_CIPHERS_S_C, opts->cipher) < 0) {
pr_err("failed to set cipher for server to client\n");
mscp_set_error("failed to set cipher for server to client");
return -1;
}
}
if (opts->hmac) {
if (is_specified(opts->hmac)) {
if (ssh_options_set(ssh, SSH_OPTIONS_HMAC_C_S, opts->hmac) < 0) {
pr_err("failed to set hmac for client to server\n");
mscp_set_error("failed to set hmac for client to server");
return -1;
}
if (ssh_options_set(ssh, SSH_OPTIONS_HMAC_S_C, opts->hmac) < 0) {
pr_err("failed to set hmac for server to client\n");
mscp_set_error("failed to set hmac for server to client");
return -1;
}
}
if (opts->compress &&
ssh_options_set(ssh, SSH_OPTIONS_COMPRESSION, "yes") < 0) {
pr_err("failed to enable ssh compression\n");
if (is_specified(opts->compress) &&
ssh_options_set(ssh, SSH_OPTIONS_COMPRESSION, opts->compress) < 0) {
mscp_set_error("failed to enable ssh compression");
return -1;
}
if (opts->nodelay &&
ssh_options_set(ssh, SSH_OPTIONS_NODELAY, &opts->nodelay) < 0) {
pr_err("failed to set nodelay\n");
return -1;
/* if NOT specified to enable Nagle's algorithm, disable it (set TCP_NODELAY) */
if (!opts->enable_nagle) {
int v = 1;
if (ssh_options_set(ssh, SSH_OPTIONS_NODELAY, &v) < 0) {
mscp_set_error("failed to set TCP_NODELAY");
return -1;
}
}
return 0;
}
static int ssh_authenticate(ssh_session ssh, struct ssh_opts *opts)
static int ssh_authenticate(ssh_session ssh, struct mscp_ssh_opts *opts)
{
int auth_bit_mask;
int ret;
@@ -86,21 +92,16 @@ static int ssh_authenticate(ssh_session ssh, struct ssh_opts *opts)
ssh_userauth_none(ssh, NULL) == SSH_AUTH_SUCCESS)
return 0;
if (auth_bit_mask & SSH_AUTH_METHOD_PUBLICKEY &&
ssh_userauth_publickey_auto(ssh, NULL, opts->passphrase) == SSH_AUTH_SUCCESS)
return 0;
if (auth_bit_mask & SSH_AUTH_METHOD_PUBLICKEY) {
char *p = is_specified(opts->passphrase) ? opts->passphrase : NULL;
if (ssh_userauth_publickey_auto(ssh, NULL, p) == SSH_AUTH_SUCCESS)
return 0;
}
if (auth_bit_mask & SSH_AUTH_METHOD_PASSWORD) {
if (!opts->password) {
opts->password = malloc(PASSWORD_BUF_SZ);
if (!opts->password) {
pr_err("malloc: %s\n", strerrno());
return -1;
}
memset(opts->password, 0, PASSWORD_BUF_SZ);
if (ssh_getpass("Password: ", opts->password, PASSWORD_BUF_SZ,
0, 0) < 0) {
if (!is_specified(opts->password)) {
if (ssh_getpass("Password: ", opts->password,
MSCP_SSH_MAX_PASSWORD, 0, 0) < 0) {
return -1;
}
}
@@ -115,30 +116,22 @@ static int ssh_authenticate(ssh_session ssh, struct ssh_opts *opts)
static int ssh_cache_passphrase(const char *prompt, char *buf, size_t len, int echo,
int verify, void *userdata)
{
struct ssh_opts *opts = userdata;
struct mscp_ssh_opts *opts = userdata;
/* This function is called on the first time for importing
* priv key file with passphrase. It is not called on the
* second time or after because cached passphrase is passed
* to ssh_userauth_publickey_auto(). */
if (opts->passphrase) {
/* passphrase is cached, but this function is called.
* maybe it was an invalid passphrase? */
free(opts->passphrase);
opts->passphrase = NULL;
}
if (ssh_getpass("Passphrase: ", buf, len, echo, verify) < 0)
return -1;
/* cache the passphrase */
opts->passphrase = malloc(len);
if (!opts->passphrase) {
pr_err("malloc: %s\n", strerrno());
return -1;
if (strlen(buf) > MSCP_SSH_MAX_PASSPHRASE - 1) {
pr_warn("sorry, passphrase is too long to cache...\n");
return 0;
}
memcpy(opts->passphrase, buf, len);
strncpy(opts->passphrase, buf, MSCP_SSH_MAX_PASSPHRASE);
return 0;
}
@@ -148,7 +141,7 @@ static struct ssh_callbacks_struct cb = {
.userdata = NULL,
};
static ssh_session ssh_init_session(char *sshdst, struct ssh_opts *opts)
static ssh_session ssh_init_session(const char *sshdst, struct mscp_ssh_opts *opts)
{
ssh_session ssh = ssh_new();
@@ -160,17 +153,17 @@ static ssh_session ssh_init_session(char *sshdst, struct ssh_opts *opts)
goto free_out;
if (ssh_options_set(ssh, SSH_OPTIONS_HOST, sshdst) != SSH_OK) {
pr_err("failed to set destination host\n");
mscp_set_error("failed to set destination host");
goto free_out;
}
if (ssh_connect(ssh) != SSH_OK) {
pr_err("failed to connect ssh server: %s\n", ssh_get_error(ssh));
mscp_set_error("failed to connect ssh server: %s", ssh_get_error(ssh));
goto free_out;
}
if (ssh_authenticate(ssh, opts) != 0) {
pr_err("authentication failed: %s\n", ssh_get_error(ssh));
mscp_set_error("authentication failed: %s", ssh_get_error(ssh));
goto disconnect_out;
}
@@ -187,7 +180,7 @@ free_out:
return NULL;
}
sftp_session ssh_init_sftp_session(char *sshdst, struct ssh_opts *opts)
sftp_session ssh_init_sftp_session(const char *sshdst, struct mscp_ssh_opts *opts)
{
sftp_session sftp;
ssh_session ssh = ssh_init_session(sshdst, opts);
@@ -198,13 +191,14 @@ sftp_session ssh_init_sftp_session(char *sshdst, struct ssh_opts *opts)
sftp = sftp_new(ssh);
if (!sftp) {
pr_err("failed to allocate sftp session: %s\n", ssh_get_error(ssh));
mscp_set_error("failed to allocate sftp session: %s",
ssh_get_error(ssh));
goto err_out;
}
if (sftp_init(sftp) != SSH_OK) {
pr_err("failed to initialize sftp session: err code %d\n",
sftp_get_error(sftp));
mscp_set_error("failed to initialize sftp session: err code %d",
sftp_get_error(sftp));
goto err_out;
}

View File

@@ -5,27 +5,12 @@
#include "libssh/libssh.h"
#include "libssh/sftp.h"
struct ssh_opts {
char *login_name; /* -l */
char *port; /* -p */
char *identity; /* -i */
char *cipher; /* -c */
char *hmac; /* -M */
int compress; /* -C */
int nodelay; /* -N */
int debuglevel; /* -v */
bool no_hostkey_check; /* -H */
#define PASSWORD_BUF_SZ 128
char *password; /* password for password auth */
char *passphrase; /* passphrase for private key */
};
#include <mscp.h>
/* ssh_init_sftp_session() creates sftp_session. sshdst accpets
* user@hostname and hostname notations (by libssh).
*/
sftp_session ssh_init_sftp_session(char *sshdst, struct ssh_opts *opts);
sftp_session ssh_init_sftp_session(const char *sshdst, struct mscp_ssh_opts *opts);
void ssh_sftp_close(sftp_session sftp);
#define sftp_ssh(sftp) (sftp)->session

View File

@@ -4,6 +4,7 @@
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <libgen.h>
#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)
@@ -19,8 +20,8 @@
__func__, ##__VA_ARGS__)
#define pr_err(fmt, ...) fprintf(stderr, "\x1b[1m\x1b[31m" \
"ERR:%s():\x1b[0m " fmt, \
__func__, ##__VA_ARGS__)
"ERR:%s:%d:%s():\x1b[0m " fmt, \
basename(__FILE__), __LINE__, __func__, ##__VA_ARGS__)
#ifdef DEBUG
#define pr_debug(fmt, ...) fprintf(stderr, "\x1b[1m\x1b[33m" \

View File

@@ -1,7 +1,9 @@
"""
test_e2e.py: End-to-End test for mscp executable.
"""
import pytest
import numpy
import hashlib
import os
from subprocess import check_call, CalledProcessError, PIPE
@@ -182,3 +184,14 @@ def test_override_dst_having_larger_size(mscp, src_prefix, dst_prefix):
assert check_same_md5sum(src, dst)
src.cleanup()
dst.cleanup()
compressions = ["yes", "no", "none"]
@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
@pytest.mark.parametrize("compress", compressions)
def test_compression(mscp, src_prefix, dst_prefix, compress):
src = File("src", size = 1024 * 1024).make()
dst = File("dst", size = 1024 * 1024 * 2).make()
run2ok([mscp, "-H", "-C", compress, src_prefix + src.path, dst_prefix + "dst"])
assert check_same_md5sum(src, dst)
src.cleanup()
dst.cleanup()

104
test/test_python.py Normal file
View File

@@ -0,0 +1,104 @@
"""
test_python.py: Testing libmscp through the mscp python binding.
"""
import pytest
import mscp
import os
from util import File, check_same_md5sum
def test_create_and_release():
m = mscp.mscp("localhost", mscp.LOCAL2REMOTE)
m.cleanup()
""" copy test """
remote = "localhost"
remote_prefix = "{}/".format(os.getcwd()) # use current dir
param_remote_prefix_and_direction = [
("", remote_prefix, mscp.LOCAL2REMOTE), (remote_prefix, "", mscp.REMOTE2LOCAL)
]
param_single_copy = [
(File("src", size = 64), File("dst")),
(File("src", size = 4096 * 1), File("dst")),
(File("src", size = 128 * 1024 * 1024), File("dst")),
]
@pytest.mark.parametrize("src_prefix, dst_prefix, direction",
param_remote_prefix_and_direction)
@pytest.mark.parametrize("src, dst", param_single_copy)
def test_single_copy(src_prefix, dst_prefix, direction, src, dst):
src.make()
m = mscp.mscp(remote, direction)
m.copy(src_prefix + src.path, dst_prefix + dst.path)
assert check_same_md5sum(src, dst)
src.cleanup()
dst.cleanup()
param_double_copy = [
(File("src1", size = 1024 * 1024), File("src2", size = 1024 * 1024),
File("dst/src1"), File("dst/src2")
)
]
@pytest.mark.parametrize("src_prefix, dst_prefix, direction",
param_remote_prefix_and_direction)
@pytest.mark.parametrize("s1, s2, d1, d2", param_double_copy)
def test_double_copy(src_prefix, dst_prefix, direction, s1, s2, d1, d2):
s1.make()
s2.make()
mscp.mscp(remote, direction).copy([src_prefix + s1.path, src_prefix + s2.path],
dst_prefix + "dst")
assert check_same_md5sum(s1, d1)
assert check_same_md5sum(s2, d2)
s1.cleanup()
s2.cleanup()
d1.cleanup()
d2.cleanup()
param_single_copy = [
(File("src", size = 1024 * 1024 * 4), File("dst")),
]
param_kwargs = [
{ "nr_threads": 6 },
{ "nr_ahead": 64 },
{ "min_chunk_sz": 1 * 1024 * 1024 },
{ "max_chunk_sz": 64 * 1024 * 1024 },
{ "coremask": "0x0f" },
{ "severity": mscp.SEVERITY_NONE },
{ "cipher": "aes128-gcm@openssh.com" },
{ "compress": "yes" },
{ "no_hostkey_check": True },
{ "enable_nagle": True },
]
@pytest.mark.parametrize("src_prefix, dst_prefix, direction",
param_remote_prefix_and_direction)
@pytest.mark.parametrize("src, dst", param_single_copy)
@pytest.mark.parametrize("kw", param_kwargs)
def test_kwargs(src_prefix, dst_prefix, direction, src, dst, kw):
src.make()
m = mscp.mscp(remote, direction, **kw)
m.copy(src_prefix + src.path, dst_prefix + dst.path)
assert check_same_md5sum(src, dst)
src.cleanup()
dst.cleanup()
def test_login_failed():
m = mscp.mscp("asdfasdf@" + remote, mscp.LOCAL2REMOTE)
with pytest.raises(RuntimeError) as e:
m.connect()
m = mscp.mscp(remote, mscp.LOCAL2REMOTE, login_name = "asdfadsf")
with pytest.raises(RuntimeError) as e:
m.connect()
m = mscp.mscp(remote, mscp.LOCAL2REMOTE, port = "65534")
with pytest.raises(RuntimeError) as e:
m.connect()

View File

@@ -1,6 +1,5 @@
import hashlib
import numpy
import os
@@ -39,7 +38,7 @@ class File():
def make_content_random(self):
with open(self.path, "wb") as f:
f.write(numpy.random.bytes(self.size))
f.write(os.urandom(self.size))
def cleanup(self):
os.remove(self.path)