fix async handling

This commit is contained in:
Ryo Nakamura
2022-11-17 23:46:51 +09:00
parent 2d66f4ca14
commit 5ede4dc122

View File

@@ -663,12 +663,12 @@ static int chunk_copy_internal_local_to_remote_async(struct chunk *c, int fd,
sftp_file sf, int nr_ahead, sftp_file sf, int nr_ahead,
size_t *counter) size_t *counter)
{ {
size_t read_bytes, remaind, thrown; ssize_t read_bytes, remaind, thrown;
char buf[XFER_BUF_SIZE]; char buf[XFER_BUF_SIZE];
int idx, ret; int idx, ret;
struct { struct {
int id; int id;
size_t len; ssize_t len;
} reqs[nr_ahead]; } reqs[nr_ahead];
if (c->len == 0) if (c->len == 0)
@@ -688,10 +688,10 @@ static int chunk_copy_internal_local_to_remote_async(struct chunk *c, int fd,
c->f->dst_path, sftp_get_error(sf->sftp)); c->f->dst_path, sftp_get_error(sf->sftp));
return -1; return -1;
} }
thrown -= reqs[idx].len;
} }
for (idx = 0; remaind > 0;) { for (idx = 0; remaind > 0; idx = (idx + 1) % nr_ahead) {
/* TODO: should be non-blocking */
ret = sftp_async_write_end(sf, reqs[idx].id, 1); ret = sftp_async_write_end(sf, reqs[idx].id, 1);
if (ret != SSH_OK) { if (ret != SSH_OK) {
pr_err("sftp_async_write_end failed for %s: %d\n", pr_err("sftp_async_write_end failed for %s: %d\n",
@@ -699,13 +699,16 @@ static int chunk_copy_internal_local_to_remote_async(struct chunk *c, int fd,
return -1; return -1;
} }
remaind -= reqs[idx].len;
*counter += reqs[idx].len; *counter += reqs[idx].len;
remaind -= reqs[idx].len;
if (remaind == 0) if (remaind <= 0)
break; break;
reqs[idx].len = min(remaind, sizeof(buf)); if (thrown <= 0)
continue;
reqs[idx].len = min(thrown, sizeof(buf));
read_bytes = read(fd, buf, reqs[idx].len); read_bytes = read(fd, buf, reqs[idx].len);
if (read_bytes < 0) { if (read_bytes < 0) {
pr_err("read from %s failed: %s\n", c->f->src_path, strerrno()); pr_err("read from %s failed: %s\n", c->f->src_path, strerrno());
@@ -718,8 +721,14 @@ static int chunk_copy_internal_local_to_remote_async(struct chunk *c, int fd,
c->f->dst_path, sftp_get_error(sf->sftp)); c->f->dst_path, sftp_get_error(sf->sftp));
return -1; return -1;
} }
thrown -= reqs[idx].len;
}
idx = (idx + 1) % nr_ahead; if (remaind < 0) {
pr_err("invalid remaind bytes %ld. last async_write_end bytes %lu. "
"last read bytes %ld\n",
remaind, reqs[idx].len, read_bytes);
return -1;
} }
return 0; return 0;
@@ -734,7 +743,7 @@ static int chunk_copy_internal_remote_to_local(struct chunk *c, int fd, sftp_fil
int idx; int idx;
struct { struct {
int id; int id;
size_t len; ssize_t len;
} reqs[nr_ahead]; } reqs[nr_ahead];
/* TODO: sftp_buf_sz has no effect on remote to local copy. we /* TODO: sftp_buf_sz has no effect on remote to local copy. we
@@ -761,19 +770,17 @@ static int chunk_copy_internal_remote_to_local(struct chunk *c, int fd, sftp_fil
thrown -= reqs[idx].len; thrown -= reqs[idx].len;
} }
for (idx = 0; remaind > 0;) { for (idx = 0; remaind > 0; idx = (idx + 1) % nr_ahead) {
read_bytes = sftp_async_read(sf, buf, reqs[idx].len, reqs[idx].id); read_bytes = sftp_async_read(sf, buf, reqs[idx].len, reqs[idx].id);
if (read_bytes == SSH_ERROR) { if (read_bytes == SSH_ERROR) {
pr_err("sftp_async_read failed: %d\n", sftp_get_error(sf->sftp)); pr_err("sftp_async_read failed: %d\n", sftp_get_error(sf->sftp));
return -1; return -1;
} }
remaind -= read_bytes; if (thrown > 0) {
reqs[idx].len = min(thrown, sizeof(buf));
if (remaind > 0) {
reqs[idx].len = min(remaind, sizeof(buf));
reqs[idx].id = sftp_async_read_begin(sf, reqs[idx].len); reqs[idx].id = sftp_async_read_begin(sf, reqs[idx].len);
thrown -= reqs[idx].len;
} }
write_bytes = write(fd, buf, read_bytes); write_bytes = write(fd, buf, read_bytes);
@@ -788,7 +795,14 @@ static int chunk_copy_internal_remote_to_local(struct chunk *c, int fd, sftp_fil
} }
*counter += write_bytes; *counter += write_bytes;
idx = (idx + 1) % nr_ahead; remaind -= read_bytes;
}
if (remaind < 0) {
pr_err("invalid remaind bytes %ld. last async_read bytes %ld. "
"last write bytes %ld\n",
remaind, read_bytes, write_bytes);
return -1;
} }
return 0; return 0;