mirror of
https://github.com/upa/mscp.git
synced 2026-02-04 03:24:58 +08:00
add (not complete) end-to-end test with pytest
This commit is contained in:
1
test/.gitignore
vendored
Normal file
1
test/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
__pycache__
|
||||
164
test/test_e2e.py
Normal file
164
test/test_e2e.py
Normal file
@@ -0,0 +1,164 @@
|
||||
|
||||
import pytest
|
||||
import numpy
|
||||
import hashlib
|
||||
import os
|
||||
|
||||
from subprocess import check_call, CalledProcessError, PIPE
|
||||
from util import File, check_same_md5sum
|
||||
|
||||
|
||||
"""
|
||||
End-to-End Test.
|
||||
|
||||
Set $MSCP_BIN_PATH env variable for path to mscp binary.
|
||||
If $MSCP_BIN_PATH is not set, use mscp in default $PATH.
|
||||
"""
|
||||
|
||||
if "MSCP_BIN_PATH" in os.environ:
|
||||
mscp = os.environ["MSCP_BIN_PATH"]
|
||||
else:
|
||||
mscp = "mscp"
|
||||
|
||||
|
||||
|
||||
|
||||
""" usage test """
|
||||
def run2ok(args):
|
||||
check_call(list(map(str, args)))
|
||||
|
||||
def run2ng(args):
|
||||
with pytest.raises(CalledProcessError) as e:
|
||||
check_call(list(map(str, args)))
|
||||
|
||||
def test_usage():
|
||||
run2ng([mscp])
|
||||
run2ok([mscp, "-h"])
|
||||
|
||||
def test_invalid_chunk_size_config():
|
||||
run2ng([mscp, "-s", 8 << 20, "-S", 4 << 20])
|
||||
|
||||
param_invalid_hostnames = [
|
||||
(["a:a", "b:b", "c:c"]), (["a:a", "b:b", "c"]), (["a:a", "b", "c:c"]),
|
||||
(["a", "b:b", "c:c"])
|
||||
]
|
||||
@pytest.mark.parametrize("args", param_invalid_hostnames)
|
||||
def test_nonidentical_hostnames(args):
|
||||
run2ng([mscp] + args)
|
||||
|
||||
|
||||
|
||||
|
||||
""" copy test """
|
||||
|
||||
remote_prefix = "localhost:{}/".format(os.getcwd()) # use current dir
|
||||
|
||||
param_single_copy = [
|
||||
(File("test1", size = 64), File("test2")),
|
||||
(File("test1", size = 4096 * 1), File("test2")),
|
||||
(File("test1", size = 128 * 1024 * 1024), File("test2")),
|
||||
]
|
||||
|
||||
@pytest.mark.parametrize("src, dst", param_single_copy)
|
||||
def test_single_copy_remote2local(src, dst):
|
||||
src.make()
|
||||
run2ok([mscp, remote_prefix + src.path, dst.path])
|
||||
assert check_same_md5sum(src, dst)
|
||||
src.cleanup()
|
||||
dst.cleanup()
|
||||
|
||||
@pytest.mark.parametrize("src, dst", param_single_copy)
|
||||
def test_single_copy_local2remote(src, dst):
|
||||
src.make()
|
||||
run2ok([mscp, src.path, remote_prefix + dst.path])
|
||||
assert check_same_md5sum(src, dst)
|
||||
src.cleanup()
|
||||
dst.cleanup()
|
||||
|
||||
|
||||
|
||||
param_dir_copy = [
|
||||
( "src_dir", "dst_dir",
|
||||
[ File("src_dir/t1", size = 64),
|
||||
File("src_dir/t2", size = 4096),
|
||||
File("src_dir/d1/t3", size = 64),
|
||||
File("src_dir/d1/d2/t4", size = 128), ],
|
||||
[ File("dst_dir/t1"),
|
||||
File("dst_dir/t2"),
|
||||
File("dst_dir/d1/t3"),
|
||||
File("dst_dir/d1/d2/t4"), ],
|
||||
[ File("dst_dir/src_dir/t1"),
|
||||
File("dst_dir/src_dir/t2"),
|
||||
File("dst_dir/src_dir/d1/t3"),
|
||||
File("dst_dir/src_dir/d1/d2/t4"), ],
|
||||
)
|
||||
]
|
||||
|
||||
"""
|
||||
`scp remote:src_dir dst_dir` renames src_dir to dst_dir if dst_dir
|
||||
does not exist. If dst_dir exists, scp copies src_dir to
|
||||
dst_dir/src_dir. So, this test checks both cases.
|
||||
"""
|
||||
|
||||
@pytest.mark.parametrize("src_dir, dst_dir, src, dst, twice", param_dir_copy)
|
||||
def test_dir_copy_remote2local(src_dir, dst_dir, src, dst, twice):
|
||||
for f in src:
|
||||
f.make()
|
||||
|
||||
run2ok([mscp, remote_prefix + src_dir, dst_dir])
|
||||
for sf, df in zip(src, dst):
|
||||
assert check_same_md5sum(sf, df)
|
||||
|
||||
run2ok([mscp, remote_prefix + src_dir, dst_dir])
|
||||
for sf, df in zip(src, twice):
|
||||
assert check_same_md5sum(sf, df)
|
||||
|
||||
for sf, df, tf in zip(src, dst, twice):
|
||||
sf.cleanup()
|
||||
df.cleanup()
|
||||
tf.cleanup()
|
||||
|
||||
@pytest.mark.parametrize("src_dir, dst_dir, src, dst, twice", param_dir_copy)
|
||||
def test_dir_copy_local2remote(src_dir, dst_dir, src, dst, twice):
|
||||
for f in src:
|
||||
f.make()
|
||||
|
||||
run2ok([mscp, src_dir, remote_prefix + dst_dir])
|
||||
for sf, df in zip(src, dst):
|
||||
assert check_same_md5sum(sf, df)
|
||||
|
||||
run2ok([mscp, src_dir, remote_prefix + dst_dir])
|
||||
for sf, df in zip(src, twice):
|
||||
assert check_same_md5sum(sf, df)
|
||||
|
||||
for sf, df, tf in zip(src, dst, twice):
|
||||
sf.cleanup()
|
||||
df.cleanup()
|
||||
tf.cleanup()
|
||||
|
||||
|
||||
param_remote_prefix = [
|
||||
("", remote_prefix), (remote_prefix, "")
|
||||
]
|
||||
@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
|
||||
def test_override_single_file(src_prefix, dst_prefix):
|
||||
src = File("src", size = 128).make()
|
||||
dst = File("dst", size = 128).make()
|
||||
assert not check_same_md5sum(src, dst)
|
||||
|
||||
run2ok([mscp, src_prefix + src.path, dst_prefix + dst.path])
|
||||
assert check_same_md5sum(src, dst)
|
||||
|
||||
src.cleanup()
|
||||
dst.cleanup()
|
||||
|
||||
@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
|
||||
def test_min_chunk(src_prefix, dst_prefix):
|
||||
src = File("src", size = 16 * 1024).make()
|
||||
dst = File("dst")
|
||||
|
||||
run2ok([mscp, "-s", 8192, src_prefix + src.path, dst_prefix + dst.path])
|
||||
assert check_same_md5sum(src, dst)
|
||||
|
||||
src.cleanup()
|
||||
dst.cleanup()
|
||||
58
test/util.py
Normal file
58
test/util.py
Normal file
@@ -0,0 +1,58 @@
|
||||
|
||||
import hashlib
|
||||
import numpy
|
||||
import os
|
||||
|
||||
|
||||
def check_same_md5sum(fa, fb):
|
||||
return (fa.md5sum() == fb.md5sum())
|
||||
|
||||
|
||||
class File():
|
||||
def __init__(self, path, size = 0, content = "random", perm = 0o664):
|
||||
if not content in ["zero", "random"]:
|
||||
raise ValueError("invalid type: {}".format(content))
|
||||
self.path = path
|
||||
self.size = size
|
||||
self.content = content
|
||||
self.perm = perm
|
||||
|
||||
def __str__(self):
|
||||
return self.path
|
||||
|
||||
def make(self):
|
||||
d = os.path.dirname(self.path)
|
||||
if d:
|
||||
os.makedirs(d, exist_ok = True)
|
||||
if self.content == "zero":
|
||||
self.make_content_zero()
|
||||
elif self.content == "random":
|
||||
self.make_content_random()
|
||||
else:
|
||||
raise ValueError("invalud content type: {}".format(self.content))
|
||||
os.chmod(self.path, self.perm)
|
||||
return self
|
||||
|
||||
def make_content_zero(self):
|
||||
with open(self.path, "wb") as f:
|
||||
f.seek(self.size, 0)
|
||||
|
||||
def make_content_random(self):
|
||||
with open(self.path, "wb") as f:
|
||||
f.write(numpy.random.bytes(self.size))
|
||||
|
||||
def cleanup(self):
|
||||
os.remove(self.path)
|
||||
tmp = os.path.dirname(self.path)
|
||||
while tmp and not tmp in [".", "/"]:
|
||||
if len(os.listdir(tmp)) == 0:
|
||||
os.rmdir(tmp)
|
||||
tmp = os.path.dirname(tmp)
|
||||
|
||||
def md5sum(self):
|
||||
m = hashlib.md5()
|
||||
with open(self.path, 'rb') as f:
|
||||
for chunk in iter(lambda: f.read(4096 * m.block_size), b''):
|
||||
m.update(chunk)
|
||||
return m.hexdigest()
|
||||
|
||||
Reference in New Issue
Block a user