diff --git a/src/main/kotlin/de/fabianonline/telegram_backup/DownloadManager.kt b/src/main/kotlin/de/fabianonline/telegram_backup/DownloadManager.kt index 8cda106..3a41b86 100644 --- a/src/main/kotlin/de/fabianonline/telegram_backup/DownloadManager.kt +++ b/src/main/kotlin/de/fabianonline/telegram_backup/DownloadManager.kt @@ -61,36 +61,11 @@ enum class MessageSource(val descr: String) { } class DownloadManager(val client: TelegramClient, val prog: DownloadProgressInterface, val db: Database, val user_manager: UserManager, val settings: Settings, val file_base: String) { - internal var has_seen_flood_wait_message = false - @Throws(RpcErrorException::class, IOException::class) fun downloadMessages(limit: Int?) { - var completed: Boolean - do { - completed = true - try { - _downloadMessages(limit) - } catch (e: RpcErrorException) { - if (e.getCode() == 420) { // FLOOD_WAIT - completed = false - Utils.obeyFloodWaitException(e) - } else { - throw e - } - } catch (e: TimeoutException) { - completed = false - System.out.println("") - System.out.println("Telegram took too long to respond to our request.") - System.out.println("I'm going to wait a minute and then try again.") - try { - TimeUnit.MINUTES.sleep(1) - } catch (e2: InterruptedException) { - } - - System.out.println("") - } - - } while (!completed) + Utils.obeyFloodWait() { + _downloadMessages(limit) + } } @Throws(RpcErrorException::class, IOException::class, TimeoutException::class) @@ -211,31 +186,19 @@ class DownloadManager(val client: TelegramClient, val prog: DownloadProgressInte logger.trace("vector.size(): {}", vector.size) logger.trace("ids.size(): {}", ids.size) - var response: TLAbsMessages - var tries = 0 - while (true) { - logger.trace("Trying getMessages(), tries={}", tries) - if (tries >= 5) { - CommandLineController.show_error("Couldn't getMessages after 5 tries. Quitting.") - } - tries++ - try { + var resp: TLAbsMessages? = null + try { + Utils.obeyFloodWait(max_tries=5) { if (channel == null) { - response = client.messagesGetMessages(vector) + resp = client.messagesGetMessages(vector) } else { - response = client.channelsGetMessages(channel, vector) - } - break - } catch (e: RpcErrorException) { - if (e.getCode() == 420) { // FLOOD_WAIT - Utils.obeyFloodWaitException(e, has_seen_flood_wait_message) - has_seen_flood_wait_message = true - } else { - throw e + resp = client.channelsGetMessages(channel, vector) } } - + } catch (e: MaxTriesExceededException) { + CommandLineController.show_error("Couldn't getMessages after 5 tries. Quitting.") } + val response = resp!! logger.trace("response.getMessages().size(): {}", response.getMessages().size) if (response.getMessages().size != vector.size) { CommandLineController.show_error("Requested ${vector.size} messages, but got ${response.getMessages().size}. That is unexpected. Quitting.") @@ -260,29 +223,9 @@ class DownloadManager(val client: TelegramClient, val prog: DownloadProgressInte @Throws(RpcErrorException::class, IOException::class) fun downloadMedia() { download_client = client.getDownloaderClient() - var completed: Boolean - do { - completed = true - try { - _downloadMedia() - } catch (e: RpcErrorException) { - if (e.getCode() == 420) { // FLOOD_WAIT - completed = false - Utils.obeyFloodWaitException(e) - } else { - throw e - } - } - /*catch (TimeoutException e) { - completed = false; - System.out.println(""); - System.out.println("Telegram took too long to respond to our request."); - System.out.println("I'm going to wait a minute and then try again."); - logger.warn("TimeoutException caught", e); - try { TimeUnit.MINUTES.sleep(1); } catch(InterruptedException e2) {} - System.out.println(""); - }*/ - } while (!completed) + Utils.obeyFloodWait() { + _downloadMedia() + } } @Throws(RpcErrorException::class, IOException::class) @@ -423,31 +366,30 @@ class DownloadManager(val client: TelegramClient, val prog: DownloadProgressInte logger.trace("offset: {} block_size: {} size: {}", offset, size, size) val req = TLRequestUploadGetFile(loc, offset, size) try { - response = download_client!!.executeRpcQuery(req, dcID) as TLFile - } catch (e: RpcErrorException) { - if (e.getCode() == 420) { // FLOOD_WAIT - try_again = true - Utils.obeyFloodWaitException(e) - continue // response is null since we didn't actually receive any data. Skip the rest of this iteration and try again. - } else if (e.getCode() == 400) { - //Somehow this file is broken. No idea why. Let's skip it for now - return false - } else { - throw e + Utils.obeyFloodWait() { + response = download_client!!.executeRpcQuery(req, dcID) as TLFile } + } catch (e: RpcErrorException) { + if (e.getCode() == 400) { + // Somehow this file is broken. No idea why. Let's skip it for now. + return false + } + throw e } + + val resp = response!! + + offset += resp.getBytes().getData().size + logger.trace("response: {} total size: {}", resp.getBytes().getData().size, offset) - offset += response.getBytes().getData().size - logger.trace("response: {} total size: {}", response.getBytes().getData().size, offset) - - fos.write(response.getBytes().getData()) + fos.write(resp.getBytes().getData()) fos.flush() try { TimeUnit.MILLISECONDS.sleep(Config.DELAY_AFTER_GET_FILE) } catch (e: InterruptedException) { } - } while (offset < size && (try_again || response!!.getBytes().getData().size > 0)) + } while (offset < size && (try_again || resp.getBytes().getData().size > 0)) fos.close() if (offset < size) { System.out.println("Requested file $target with $size bytes, but got only $offset bytes.") diff --git a/src/main/kotlin/de/fabianonline/telegram_backup/Utils.kt b/src/main/kotlin/de/fabianonline/telegram_backup/Utils.kt index d1e2a31..2381427 100644 --- a/src/main/kotlin/de/fabianonline/telegram_backup/Utils.kt +++ b/src/main/kotlin/de/fabianonline/telegram_backup/Utils.kt @@ -20,6 +20,7 @@ import com.github.badoualy.telegram.tl.exception.RpcErrorException import java.io.File import java.util.Vector import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException import com.google.gson.* import java.net.URL import org.apache.commons.io.IOUtils @@ -31,6 +32,8 @@ object Utils { @JvmField public val VERSIONS_EQUAL = 0 @JvmField public val VERSION_1_NEWER = 1 @JvmField public val VERSION_2_NEWER = 2 + + var hasSeenFloodWaitMessage = false var anonymize = false @@ -96,30 +99,48 @@ object Utils { } } - - @Throws(RpcErrorException::class) - @JvmOverloads internal fun obeyFloodWaitException(e: RpcErrorException?, silent: Boolean = false) { - if (e == null || e.getCode() != 420) return - - val delay: Long = e.getTagInteger()!!.toLong() - if (!silent) { - System.out.println("") - System.out.println( - "Telegram complained about us (okay, me) making too many requests in too short time by\n" + - "sending us \"" + e.getTag() + "\" as an error. So we now have to wait a bit. Telegram\n" + - "asked us to wait for " + delay + " seconds.\n" + + + fun obeyFloodWait(max_tries: Int = -1, method: () -> Unit) { + var tries = 0 + while (true) { + tries++ + if (max_tries>0 && tries>max_tries) throw MaxTriesExceededException() + logger.trace("This is try ${tries}.") + try { + method.invoke() + // If we reach this, the method has returned successfully -> we are done + return + } catch(e: RpcErrorException) { + // If we got something else than a FLOOD_WAIT error, we just rethrow it + if (e.getCode() != 420) throw e + + val delay = e.getTagInteger()!!.toLong() + + if (!hasSeenFloodWaitMessage) { + println( + "\n" + + "Telegram complained about us (okay, me) making too many requests in too short time by\n" + + "sending us \"${e.getTag()}\" as an error. So we now have to wait a bit. Telegram\n" + + "asked us to wait for ${delay} seconds.\n" + + "\n" + + "So I'm going to do just that for now. If you don't want to wait, you can quit by pressing\n" + + "Ctrl+C. You can restart me at any time and I will just continue to download your\n" + + "messages and media. But be advised that just restarting me is not going to change\n" + + "the fact that Telegram won't talk to me until then." + + "\n") + } + hasSeenFloodWaitMessage = true + + try { TimeUnit.SECONDS.sleep(delay + 1) } catch (e: InterruptedException) { } + } catch (e: TimeoutException) { + println( "\n" + - "So I'm going to do just that for now. If you don't want to wait, you can quit by pressing\n" + - "Ctrl+C. You can restart me at any time and I will just continue to download your\n" + - "messages and media. But be advised that just restarting me is not going to change\n" + - "the fact that Telegram won't talk to me until then.") - System.out.println("") + "Telegram took too long to respond to our request.\n" + + "I'm going to wait a minute and then try again." + + "\n") + try { TimeUnit.MINUTES.sleep(1) } catch (e: InterruptedException) { } + } } - try { - TimeUnit.SECONDS.sleep(delay + 1) - } catch (e2: InterruptedException) { - } - } @JvmStatic @@ -200,3 +221,5 @@ fun String.anonymize(): String { fun Any.toJson(): String = Gson().toJson(this) fun Any.toPrettyJson(): String = GsonBuilder().setPrettyPrinting().create().toJson(this) + +class MaxTriesExceededException(): RuntimeException("Max tries exceeded") {}