|
@@ -115,6 +115,8 @@ init(void) {
|
|
|
_last_attempt_stalled = true;
|
|
_last_attempt_stalled = true;
|
|
|
// We need to flush after every write in case we're interrupted
|
|
// We need to flush after every write in case we're interrupted
|
|
|
_dest_stream.setf(ios::unitbuf, 0);
|
|
_dest_stream.setf(ios::unitbuf, 0);
|
|
|
|
|
+ _last_attempt_stalled = false;
|
|
|
|
|
+ _current_attempt_stalled = false;
|
|
|
|
|
|
|
|
#if defined(WIN32)
|
|
#if defined(WIN32)
|
|
|
WSAData mydata;
|
|
WSAData mydata;
|
|
@@ -376,6 +378,13 @@ process_request() {
|
|
|
_token_board->_done.insert(tok);
|
|
_token_board->_done.insert(tok);
|
|
|
return_event->add_parameter(EventParameter(DS_success));
|
|
return_event->add_parameter(EventParameter(DS_success));
|
|
|
|
|
|
|
|
|
|
+ // Throw a "done" event now.
|
|
|
|
|
+ if (!tok->_event_name.empty()) {
|
|
|
|
|
+ PT_Event done = new Event(tok->_event_name);
|
|
|
|
|
+ done->add_parameter(EventParameter((int)tok->_id));
|
|
|
|
|
+ throw_event(done);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
if (downloader_cat.is_debug()) {
|
|
if (downloader_cat.is_debug()) {
|
|
|
downloader_cat.debug()
|
|
downloader_cat.debug()
|
|
|
<< "Downloader::process_request() - downloading complete for "
|
|
<< "Downloader::process_request() - downloading complete for "
|
|
@@ -440,9 +449,8 @@ safe_send(int socket, const char *data, int length, long timeout) {
|
|
|
////////////////////////////////////////////////////////////////////
|
|
////////////////////////////////////////////////////////////////////
|
|
|
int Downloader::
|
|
int Downloader::
|
|
|
safe_receive(int socket, DownloadStatus &status, int length,
|
|
safe_receive(int socket, DownloadStatus &status, int length,
|
|
|
- long timeout, int &bytes, bool &stalled) {
|
|
|
|
|
|
|
+ long timeout, int &bytes) {
|
|
|
bytes = 0;
|
|
bytes = 0;
|
|
|
- stalled = true;
|
|
|
|
|
if (length == 0) {
|
|
if (length == 0) {
|
|
|
downloader_cat.error()
|
|
downloader_cat.error()
|
|
|
<< "Downloader::safe_receive() - requested 0 length receive!" << endl;
|
|
<< "Downloader::safe_receive() - requested 0 length receive!" << endl;
|
|
@@ -475,8 +483,12 @@ safe_receive(int socket, DownloadStatus &status, int length,
|
|
|
bytes += ret;
|
|
bytes += ret;
|
|
|
status._next_in += ret;
|
|
status._next_in += ret;
|
|
|
status._bytes_in_buffer += ret;
|
|
status._bytes_in_buffer += ret;
|
|
|
- if (bytes == length)
|
|
|
|
|
- stalled = false;
|
|
|
|
|
|
|
+ if (bytes < length) {
|
|
|
|
|
+ if (downloader_cat.is_debug())
|
|
|
|
|
+ downloader_cat.debug()
|
|
|
|
|
+ << "Downloader::safe_receive() - Download stalled" << endl;
|
|
|
|
|
+ _current_attempt_stalled = true;
|
|
|
|
|
+ }
|
|
|
} else if (ret == 0) {
|
|
} else if (ret == 0) {
|
|
|
if (downloader_cat.is_debug())
|
|
if (downloader_cat.is_debug())
|
|
|
downloader_cat.debug()
|
|
downloader_cat.debug()
|
|
@@ -498,8 +510,7 @@ safe_receive(int socket, DownloadStatus &status, int length,
|
|
|
// Description:
|
|
// Description:
|
|
|
////////////////////////////////////////////////////////////////////
|
|
////////////////////////////////////////////////////////////////////
|
|
|
int Downloader::
|
|
int Downloader::
|
|
|
-attempt_read(int length, DownloadStatus &status, int &bytes_read,
|
|
|
|
|
- bool &stalled) {
|
|
|
|
|
|
|
+attempt_read(int length, DownloadStatus &status, int &bytes_read) {
|
|
|
|
|
|
|
|
bytes_read = 0;
|
|
bytes_read = 0;
|
|
|
for (int i = 0; i < downloader_timeout_retries; i++) {
|
|
for (int i = 0; i < downloader_timeout_retries; i++) {
|
|
@@ -518,7 +529,7 @@ attempt_read(int length, DownloadStatus &status, int &bytes_read,
|
|
|
// Make the request for length bytes
|
|
// Make the request for length bytes
|
|
|
int bytes;
|
|
int bytes;
|
|
|
int ans = safe_receive(_socket, status, length,
|
|
int ans = safe_receive(_socket, status, length,
|
|
|
- (long)downloader_timeout, bytes, stalled);
|
|
|
|
|
|
|
+ (long)downloader_timeout, bytes);
|
|
|
bytes_read += bytes;
|
|
bytes_read += bytes;
|
|
|
|
|
|
|
|
switch (ans) {
|
|
switch (ans) {
|
|
@@ -627,10 +638,11 @@ download(const string &file_name, Filename file_dest,
|
|
|
DownloadStatus status(_buffer->_buffer, event_name, first_byte, last_byte,
|
|
DownloadStatus status(_buffer->_buffer, event_name, first_byte, last_byte,
|
|
|
total_bytes, partial_content, id);
|
|
total_bytes, partial_content, id);
|
|
|
bool got_any_data = false;
|
|
bool got_any_data = false;
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
// Loop at the requested frequency until the download completes
|
|
// Loop at the requested frequency until the download completes
|
|
|
for (;;) {
|
|
for (;;) {
|
|
|
bool resize_buffer = false;
|
|
bool resize_buffer = false;
|
|
|
|
|
+
|
|
|
// Ensure that these don't change while we're computing read_size
|
|
// Ensure that these don't change while we're computing read_size
|
|
|
#ifdef HAVE_IPC
|
|
#ifdef HAVE_IPC
|
|
|
_buffer_lock.lock();
|
|
_buffer_lock.lock();
|
|
@@ -655,6 +667,10 @@ download(const string &file_name, Filename file_dest,
|
|
|
if (resize_buffer == true) {
|
|
if (resize_buffer == true) {
|
|
|
// Flush the write buffer before resizing it
|
|
// Flush the write buffer before resizing it
|
|
|
if (status._bytes_in_buffer > 0) {
|
|
if (status._bytes_in_buffer > 0) {
|
|
|
|
|
+ if (downloader_cat.is_debug())
|
|
|
|
|
+ downloader_cat.debug()
|
|
|
|
|
+ << "Downloader::download() - Flushing buffer" << endl;
|
|
|
|
|
+
|
|
|
if (write_to_disk(status) == false) {
|
|
if (write_to_disk(status) == false) {
|
|
|
downloader_cat.error()
|
|
downloader_cat.error()
|
|
|
<< "Downloader::download() - failed to flush buffer during "
|
|
<< "Downloader::download() - failed to flush buffer during "
|
|
@@ -671,7 +687,14 @@ download(const string &file_name, Filename file_dest,
|
|
|
<< "Downloader::download() - resizing disk buffer to: "
|
|
<< "Downloader::download() - resizing disk buffer to: "
|
|
|
<< _disk_buffer_size << endl;
|
|
<< _disk_buffer_size << endl;
|
|
|
_buffer.clear();
|
|
_buffer.clear();
|
|
|
|
|
+ downloader_cat.debug()
|
|
|
|
|
+ << "Downloader::download() - buffer cleared" << endl;
|
|
|
_buffer = new Buffer(_disk_buffer_size);
|
|
_buffer = new Buffer(_disk_buffer_size);
|
|
|
|
|
+ // Update the status with the new buffer
|
|
|
|
|
+ status._buffer = _buffer->_buffer;
|
|
|
|
|
+ status.reset();
|
|
|
|
|
+ downloader_cat.debug()
|
|
|
|
|
+ << "Downloader::download() - new buffer created" << endl;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
#ifdef HAVE_IPC
|
|
#ifdef HAVE_IPC
|
|
@@ -680,9 +703,16 @@ download(const string &file_name, Filename file_dest,
|
|
|
|
|
|
|
|
// Attempt to read
|
|
// Attempt to read
|
|
|
int bytes_read;
|
|
int bytes_read;
|
|
|
- bool stalled;
|
|
|
|
|
- int ret = attempt_read(_read_size, status, bytes_read, stalled);
|
|
|
|
|
- _last_attempt_stalled = stalled;
|
|
|
|
|
|
|
+
|
|
|
|
|
+ int ret = attempt_read(_read_size, status, bytes_read);
|
|
|
|
|
+ if (downloader_cat.is_debug())
|
|
|
|
|
+ downloader_cat.debug()
|
|
|
|
|
+ << "Downloader::download() - stalled status: " << _current_attempt_stalled
|
|
|
|
|
+ << endl;
|
|
|
|
|
+
|
|
|
|
|
+ _last_attempt_stalled = _current_attempt_stalled;
|
|
|
|
|
+ _current_attempt_stalled = false;
|
|
|
|
|
+
|
|
|
if (bytes_read > 0)
|
|
if (bytes_read > 0)
|
|
|
got_any_data = true;
|
|
got_any_data = true;
|
|
|
|
|
|