Reworked the obeyFloodLimit-Stuff to now use lambdas.

This commit is contained in:
Fabian Schlenz 2018-04-10 06:26:46 +02:00
parent be1cf8ba91
commit 9affb47130
2 changed files with 74 additions and 109 deletions

View File

@ -61,36 +61,11 @@ enum class MessageSource(val descr: String) {
}
class DownloadManager(val client: TelegramClient, val prog: DownloadProgressInterface, val db: Database, val user_manager: UserManager, val settings: Settings, val file_base: String) {
internal var has_seen_flood_wait_message = false
@Throws(RpcErrorException::class, IOException::class)
fun downloadMessages(limit: Int?) {
var completed: Boolean
do {
completed = true
try {
_downloadMessages(limit)
} catch (e: RpcErrorException) {
if (e.getCode() == 420) { // FLOOD_WAIT
completed = false
Utils.obeyFloodWaitException(e)
} else {
throw e
}
} catch (e: TimeoutException) {
completed = false
System.out.println("")
System.out.println("Telegram took too long to respond to our request.")
System.out.println("I'm going to wait a minute and then try again.")
try {
TimeUnit.MINUTES.sleep(1)
} catch (e2: InterruptedException) {
}
System.out.println("")
}
} while (!completed)
Utils.obeyFloodWait() {
_downloadMessages(limit)
}
}
@Throws(RpcErrorException::class, IOException::class, TimeoutException::class)
@ -211,31 +186,19 @@ class DownloadManager(val client: TelegramClient, val prog: DownloadProgressInte
logger.trace("vector.size(): {}", vector.size)
logger.trace("ids.size(): {}", ids.size)
var response: TLAbsMessages
var tries = 0
while (true) {
logger.trace("Trying getMessages(), tries={}", tries)
if (tries >= 5) {
CommandLineController.show_error("Couldn't getMessages after 5 tries. Quitting.")
}
tries++
try {
var resp: TLAbsMessages? = null
try {
Utils.obeyFloodWait(max_tries=5) {
if (channel == null) {
response = client.messagesGetMessages(vector)
resp = client.messagesGetMessages(vector)
} else {
response = client.channelsGetMessages(channel, vector)
}
break
} catch (e: RpcErrorException) {
if (e.getCode() == 420) { // FLOOD_WAIT
Utils.obeyFloodWaitException(e, has_seen_flood_wait_message)
has_seen_flood_wait_message = true
} else {
throw e
resp = client.channelsGetMessages(channel, vector)
}
}
} catch (e: MaxTriesExceededException) {
CommandLineController.show_error("Couldn't getMessages after 5 tries. Quitting.")
}
val response = resp!!
logger.trace("response.getMessages().size(): {}", response.getMessages().size)
if (response.getMessages().size != vector.size) {
CommandLineController.show_error("Requested ${vector.size} messages, but got ${response.getMessages().size}. That is unexpected. Quitting.")
@ -260,29 +223,9 @@ class DownloadManager(val client: TelegramClient, val prog: DownloadProgressInte
@Throws(RpcErrorException::class, IOException::class)
fun downloadMedia() {
download_client = client.getDownloaderClient()
var completed: Boolean
do {
completed = true
try {
_downloadMedia()
} catch (e: RpcErrorException) {
if (e.getCode() == 420) { // FLOOD_WAIT
completed = false
Utils.obeyFloodWaitException(e)
} else {
throw e
}
}
/*catch (TimeoutException e) {
completed = false;
System.out.println("");
System.out.println("Telegram took too long to respond to our request.");
System.out.println("I'm going to wait a minute and then try again.");
logger.warn("TimeoutException caught", e);
try { TimeUnit.MINUTES.sleep(1); } catch(InterruptedException e2) {}
System.out.println("");
}*/
} while (!completed)
Utils.obeyFloodWait() {
_downloadMedia()
}
}
@Throws(RpcErrorException::class, IOException::class)
@ -423,31 +366,30 @@ class DownloadManager(val client: TelegramClient, val prog: DownloadProgressInte
logger.trace("offset: {} block_size: {} size: {}", offset, size, size)
val req = TLRequestUploadGetFile(loc, offset, size)
try {
response = download_client!!.executeRpcQuery(req, dcID) as TLFile
} catch (e: RpcErrorException) {
if (e.getCode() == 420) { // FLOOD_WAIT
try_again = true
Utils.obeyFloodWaitException(e)
continue // response is null since we didn't actually receive any data. Skip the rest of this iteration and try again.
} else if (e.getCode() == 400) {
//Somehow this file is broken. No idea why. Let's skip it for now
return false
} else {
throw e
Utils.obeyFloodWait() {
response = download_client!!.executeRpcQuery(req, dcID) as TLFile
}
} catch (e: RpcErrorException) {
if (e.getCode() == 400) {
// Somehow this file is broken. No idea why. Let's skip it for now.
return false
}
throw e
}
val resp = response!!
offset += resp.getBytes().getData().size
logger.trace("response: {} total size: {}", resp.getBytes().getData().size, offset)
offset += response.getBytes().getData().size
logger.trace("response: {} total size: {}", response.getBytes().getData().size, offset)
fos.write(response.getBytes().getData())
fos.write(resp.getBytes().getData())
fos.flush()
try {
TimeUnit.MILLISECONDS.sleep(Config.DELAY_AFTER_GET_FILE)
} catch (e: InterruptedException) {
}
} while (offset < size && (try_again || response!!.getBytes().getData().size > 0))
} while (offset < size && (try_again || resp.getBytes().getData().size > 0))
fos.close()
if (offset < size) {
System.out.println("Requested file $target with $size bytes, but got only $offset bytes.")

View File

@ -20,6 +20,7 @@ import com.github.badoualy.telegram.tl.exception.RpcErrorException
import java.io.File
import java.util.Vector
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import com.google.gson.*
import java.net.URL
import org.apache.commons.io.IOUtils
@ -31,6 +32,8 @@ object Utils {
@JvmField public val VERSIONS_EQUAL = 0
@JvmField public val VERSION_1_NEWER = 1
@JvmField public val VERSION_2_NEWER = 2
var hasSeenFloodWaitMessage = false
var anonymize = false
@ -96,30 +99,48 @@ object Utils {
}
}
@Throws(RpcErrorException::class)
@JvmOverloads internal fun obeyFloodWaitException(e: RpcErrorException?, silent: Boolean = false) {
if (e == null || e.getCode() != 420) return
val delay: Long = e.getTagInteger()!!.toLong()
if (!silent) {
System.out.println("")
System.out.println(
"Telegram complained about us (okay, me) making too many requests in too short time by\n" +
"sending us \"" + e.getTag() + "\" as an error. So we now have to wait a bit. Telegram\n" +
"asked us to wait for " + delay + " seconds.\n" +
fun obeyFloodWait(max_tries: Int = -1, method: () -> Unit) {
var tries = 0
while (true) {
tries++
if (max_tries>0 && tries>max_tries) throw MaxTriesExceededException()
logger.trace("This is try ${tries}.")
try {
method.invoke()
// If we reach this, the method has returned successfully -> we are done
return
} catch(e: RpcErrorException) {
// If we got something else than a FLOOD_WAIT error, we just rethrow it
if (e.getCode() != 420) throw e
val delay = e.getTagInteger()!!.toLong()
if (!hasSeenFloodWaitMessage) {
println(
"\n" +
"Telegram complained about us (okay, me) making too many requests in too short time by\n" +
"sending us \"${e.getTag()}\" as an error. So we now have to wait a bit. Telegram\n" +
"asked us to wait for ${delay} seconds.\n" +
"\n" +
"So I'm going to do just that for now. If you don't want to wait, you can quit by pressing\n" +
"Ctrl+C. You can restart me at any time and I will just continue to download your\n" +
"messages and media. But be advised that just restarting me is not going to change\n" +
"the fact that Telegram won't talk to me until then." +
"\n")
}
hasSeenFloodWaitMessage = true
try { TimeUnit.SECONDS.sleep(delay + 1) } catch (e: InterruptedException) { }
} catch (e: TimeoutException) {
println(
"\n" +
"So I'm going to do just that for now. If you don't want to wait, you can quit by pressing\n" +
"Ctrl+C. You can restart me at any time and I will just continue to download your\n" +
"messages and media. But be advised that just restarting me is not going to change\n" +
"the fact that Telegram won't talk to me until then.")
System.out.println("")
"Telegram took too long to respond to our request.\n" +
"I'm going to wait a minute and then try again." +
"\n")
try { TimeUnit.MINUTES.sleep(1) } catch (e: InterruptedException) { }
}
}
try {
TimeUnit.SECONDS.sleep(delay + 1)
} catch (e2: InterruptedException) {
}
}
@JvmStatic
@ -200,3 +221,5 @@ fun String.anonymize(): String {
fun Any.toJson(): String = Gson().toJson(this)
fun Any.toPrettyJson(): String = GsonBuilder().setPrettyPrinting().create().toJson(this)
class MaxTriesExceededException(): RuntimeException("Max tries exceeded") {}