1
0
mirror of https://github.com/fabianonline/telegram_backup.git synced 2025-07-17 03:16:21 +00:00

Merge branch 'feature-rewrite'

This commit is contained in:
2018-04-11 06:11:07 +02:00
28 changed files with 1177 additions and 1657 deletions

View File

@@ -18,7 +18,6 @@ package de.fabianonline.telegram_backup
import de.fabianonline.telegram_backup.UserManager
import de.fabianonline.telegram_backup.Database
import de.fabianonline.telegram_backup.StickerConverter
import de.fabianonline.telegram_backup.DownloadProgressInterface
import de.fabianonline.telegram_backup.mediafilemanager.FileManagerFactory
import de.fabianonline.telegram_backup.mediafilemanager.AbstractMediaFileManager
@@ -61,55 +60,21 @@ enum class MessageSource(val descr: String) {
SUPERGROUP("supergroup")
}
class DownloadManager(internal var client: TelegramClient?, p: DownloadProgressInterface) {
internal var user: UserManager? = null
internal var db: Database? = null
internal var prog: DownloadProgressInterface? = null
internal var has_seen_flood_wait_message = false
init {
this.user = UserManager.getInstance()
this.prog = p
this.db = Database.getInstance()
}
class DownloadManager(val client: TelegramClient, val prog: DownloadProgressInterface, val db: Database, val user_manager: UserManager, val settings: Settings, val file_base: String) {
@Throws(RpcErrorException::class, IOException::class)
fun downloadMessages(limit: Int?) {
var completed: Boolean
do {
completed = true
try {
_downloadMessages(limit)
} catch (e: RpcErrorException) {
if (e.getCode() == 420) { // FLOOD_WAIT
completed = false
Utils.obeyFloodWaitException(e)
} else {
throw e
}
} catch (e: TimeoutException) {
completed = false
System.out.println("")
System.out.println("Telegram took too long to respond to our request.")
System.out.println("I'm going to wait a minute and then try again.")
try {
TimeUnit.MINUTES.sleep(1)
} catch (e2: InterruptedException) {
}
System.out.println("")
}
} while (!completed)
}
@Throws(RpcErrorException::class, IOException::class, TimeoutException::class)
fun _downloadMessages(limit: Int?) {
logger.info("This is _downloadMessages with limit {}", limit)
logger.info("This is downloadMessages with limit {}", limit)
logger.info("Downloading the last dialogs")
System.out.println("Downloading most recent dialogs... ")
var max_message_id = 0
val chats = getChats()
var result: ChatList? = null
Utils.obeyFloodWait() {
result = getChats()
}
val chats = result!!
logger.debug("Got {} dialogs, {} supergoups, {} channels", chats.dialogs.size, chats.supergroups.size, chats.channels.size)
for (d in chats.dialogs) {
@@ -119,7 +84,7 @@ class DownloadManager(internal var client: TelegramClient?, p: DownloadProgressI
}
}
System.out.println("Top message ID is " + max_message_id)
var max_database_id = db!!.getTopMessageID()
var max_database_id = db.getTopMessageID()
System.out.println("Top message ID in database is " + max_database_id)
if (limit != null) {
System.out.println("Limit is set to " + limit)
@@ -136,7 +101,7 @@ class DownloadManager(internal var client: TelegramClient?, p: DownloadProgressI
if (max_database_id == max_message_id) {
System.out.println("No new messages to download.")
} else if (max_database_id > max_message_id) {
throw RuntimeException("max_database_id is bigger then max_message_id. This shouldn't happen. But the telegram api nonetheless does that sometimes. Just ignore this error, wait a few seconds and then try again.")
throw RuntimeException("max_database_id is bigger than max_message_id. This shouldn't happen. But the telegram api nonetheless does that sometimes. Just ignore this error, wait a few seconds and then try again.")
} else {
val start_id = max_database_id + 1
val end_id = max_message_id
@@ -147,50 +112,46 @@ class DownloadManager(internal var client: TelegramClient?, p: DownloadProgressI
logger.info("Searching for missing messages in the db")
System.out.println("Checking message database for completeness...")
val db_count = db!!.getMessageCount()
val db_max = db!!.getTopMessageID()
val db_count = db.getMessageCount()
val db_max = db.getTopMessageID()
logger.debug("db_count: {}", db_count)
logger.debug("db_max: {}", db_max)
/*if (db_count != db_max) {
if (limit != null) {
System.out.println("You are missing messages in your database. But since you're using '--limit-messages', I won't download these now.");
} else {
LinkedList<Integer> all_missing_ids = db.getMissingIDs();
LinkedList<Integer> downloadable_missing_ids = new LinkedList<Integer>();
for (Integer id : all_missing_ids) {
if (id > max_message_id - 1000000) downloadable_missing_ids.add(id);
/*if (db_count != db_max) {
if (limit != null) {
System.out.println("You are missing messages in your database. But since you're using '--limit-messages', I won't download these now.");
} else {
LinkedList<Integer> all_missing_ids = db.getMissingIDs();
LinkedList<Integer> downloadable_missing_ids = new LinkedList<Integer>();
for (Integer id : all_missing_ids) {
if (id > max_message_id - 1000000) downloadable_missing_ids.add(id);
}
count_missing = all_missing_ids.size();
System.out.println("" + all_missing_ids.size() + " messages are missing in your Database.");
System.out.println("I can (and will) download " + downloadable_missing_ids.size() + " of them.");
downloadMessages(downloadable_missing_ids, null);
}
count_missing = all_missing_ids.size();
System.out.println("" + all_missing_ids.size() + " messages are missing in your Database.");
System.out.println("I can (and will) download " + downloadable_missing_ids.size() + " of them.");
downloadMessages(downloadable_missing_ids, null);
logger.info("Logging this run");
db.logRun(Math.min(max_database_id + 1, max_message_id), max_message_id, count_missing);
}
*/
logger.info("Logging this run");
db.logRun(Math.min(max_database_id + 1, max_message_id), max_message_id, count_missing);
if (settings.download_channels) {
println("Checking channels...")
for (channel in chats.channels) { if (channel.download) downloadMessagesFromChannel(channel) }
}
*/
if (IniSettings.download_channels || IniSettings.download_supergroups) {
// TODO Add chat title (and other stuff?) to the database
if (IniSettings.download_channels) {
println("Checking channels...")
for (channel in chats.channels) { if (channel.download) downloadMessagesFromChannel(channel) }
}
if (IniSettings.download_supergroups) {
println("Checking supergroups...")
for (supergroup in chats.supergroups) { if (supergroup.download) downloadMessagesFromChannel(supergroup) }
}
if (settings.download_supergroups) {
println("Checking supergroups...")
for (supergroup in chats.supergroups) { if (supergroup.download) downloadMessagesFromChannel(supergroup) }
}
}
private fun downloadMessagesFromChannel(channel: Channel) {
val obj = channel.obj
val max_known_id = db!!.getTopMessageIDForChannel(channel.id)
val max_known_id = db.getTopMessageIDForChannel(channel.id)
if (obj.getTopMessage() > max_known_id) {
val ids = makeIdList(max_known_id + 1, obj.getTopMessage())
var channel_name = channel.title
@@ -210,7 +171,7 @@ class DownloadManager(internal var client: TelegramClient?, p: DownloadProgressI
} else {
"${source_type.descr} $source_name"
}
prog!!.onMessageDownloadStart(ids.size, source_string)
prog.onMessageDownloadStart(ids.size, source_string)
logger.debug("Entering download loop")
while (ids.size > 0) {
@@ -225,105 +186,62 @@ class DownloadManager(internal var client: TelegramClient?, p: DownloadProgressI
logger.trace("vector.size(): {}", vector.size)
logger.trace("ids.size(): {}", ids.size)
var response: TLAbsMessages
var tries = 0
while (true) {
logger.trace("Trying getMessages(), tries={}", tries)
if (tries >= 5) {
CommandLineController.show_error("Couldn't getMessages after 5 tries. Quitting.")
}
tries++
try {
var resp: TLAbsMessages? = null
try {
Utils.obeyFloodWait(max_tries=5) {
if (channel == null) {
response = client!!.messagesGetMessages(vector)
resp = client.messagesGetMessages(vector)
} else {
response = client!!.channelsGetMessages(channel, vector)
}
break
} catch (e: RpcErrorException) {
if (e.getCode() == 420) { // FLOOD_WAIT
Utils.obeyFloodWaitException(e, has_seen_flood_wait_message)
has_seen_flood_wait_message = true
} else {
throw e
resp = client.channelsGetMessages(channel, vector)
}
}
} catch (e: MaxTriesExceededException) {
CommandLineController.show_error("Couldn't getMessages after 5 tries. Quitting.")
}
val response = resp!!
logger.trace("response.getMessages().size(): {}", response.getMessages().size)
if (response.getMessages().size != vector.size) {
CommandLineController.show_error("Requested ${vector.size} messages, but got ${response.getMessages().size}. That is unexpected. Quitting.")
}
prog!!.onMessageDownloaded(response.getMessages().size)
db!!.saveMessages(response.getMessages(), Kotlogram.API_LAYER, source_type=source_type)
db!!.saveChats(response.getChats())
db!!.saveUsers(response.getUsers())
prog.onMessageDownloaded(response.getMessages().size)
db.saveMessages(response.getMessages(), Kotlogram.API_LAYER, source_type=source_type, settings=settings)
db.saveChats(response.getChats())
db.saveUsers(response.getUsers())
logger.trace("Sleeping")
try {
TimeUnit.MILLISECONDS.sleep(Config.DELAY_AFTER_GET_MESSAGES)
} catch (e: InterruptedException) {
}
try { TimeUnit.MILLISECONDS.sleep(Config.DELAY_AFTER_GET_MESSAGES) } catch (e: InterruptedException) { }
}
logger.debug("Finished.")
prog!!.onMessageDownloadFinished()
prog.onMessageDownloadFinished()
}
@Throws(RpcErrorException::class, IOException::class)
fun downloadMedia() {
download_client = client!!.getDownloaderClient()
var completed: Boolean
do {
completed = true
try {
_downloadMedia()
} catch (e: RpcErrorException) {
if (e.getCode() == 420) { // FLOOD_WAIT
completed = false
Utils.obeyFloodWaitException(e)
} else {
throw e
}
}
/*catch (TimeoutException e) {
completed = false;
System.out.println("");
System.out.println("Telegram took too long to respond to our request.");
System.out.println("I'm going to wait a minute and then try again.");
logger.warn("TimeoutException caught", e);
try { TimeUnit.MINUTES.sleep(1); } catch(InterruptedException e2) {}
System.out.println("");
}*/
} while (!completed)
}
@Throws(RpcErrorException::class, IOException::class)
private fun _downloadMedia() {
download_client = client.getDownloaderClient()
logger.info("This is _downloadMedia")
logger.info("Checking if there are messages in the DB with a too old API layer")
val ids = db!!.getIdsFromQuery("SELECT id FROM messages WHERE has_media=1 AND api_layer<" + Kotlogram.API_LAYER)
val ids = db.getIdsFromQuery("SELECT id FROM messages WHERE has_media=1 AND api_layer<" + Kotlogram.API_LAYER)
if (ids.size > 0) {
System.out.println("You have ${ids.size} messages in your db that need an update. Doing that now.")
logger.debug("Found {} messages", ids.size)
downloadMessages(ids, null, source_type=MessageSource.NORMAL)
}
val message_count = db.getMessagesWithMediaCount()
prog.onMediaDownloadStart(message_count)
var offset = 0
val limit = 1000
val message_count = this.db!!.getMessagesWithMediaCount()
prog!!.onMediaDownloadStart(message_count)
while (true) {
logger.debug("Querying messages with media, limit={}, offset={}", limit, offset)
val messages = this.db!!.getMessagesWithMedia(limit=limit, offset=offset)
val messages = db.getMessagesWithMedia(limit, offset)
if (messages.size == 0) break
offset += limit
logger.debug("Database returned {} messages with media", messages.size)
prog.onMediaDownloadStart(messages.size)
for (msg in messages) {
if (msg == null) continue
val m = FileManagerFactory.getFileManager(msg, user!!, client!!)
val m = FileManagerFactory.getFileManager(msg, user_manager, file_base, settings=settings)
logger.trace("message {}, {}, {}, {}, {}",
msg.getId(),
msg.getMedia().javaClass.getSimpleName().replace("TLMessageMedia", ""),
@@ -331,27 +249,27 @@ class DownloadManager(internal var client: TelegramClient?, p: DownloadProgressI
if (m.isEmpty) "empty" else "non-empty",
if (m.downloaded) "downloaded" else "not downloaded")
if (m.isEmpty) {
prog!!.onMediaDownloadedEmpty()
prog.onMediaDownloadedEmpty()
} else if (m.downloaded) {
prog!!.onMediaAlreadyPresent(m)
} else if (IniSettings.max_file_age!=null && (System.currentTimeMillis() / 1000) - msg.date > IniSettings.max_file_age * 24 * 60 * 60) {
prog!!.onMediaTooOld()
prog.onMediaAlreadyPresent(m)
} else if (settings.max_file_age>0 && (System.currentTimeMillis() / 1000) - msg.date > settings.max_file_age * 24 * 60 * 60) {
prog.onMediaTooOld()
} else {
try {
val result = m.download()
if (result) {
prog!!.onMediaDownloaded(m)
prog.onMediaDownloaded(m)
} else {
prog!!.onMediaSkipped()
prog.onMediaSkipped()
}
} catch (e: TimeoutException) {
// do nothing - skip this file
prog!!.onMediaSkipped()
prog.onMediaSkipped()
}
}
}
}
prog!!.onMediaDownloadFinished()
prog.onMediaDownloadFinished()
}
private fun makeIdList(start: Int, end: Int): MutableList<Int> {
@@ -363,7 +281,7 @@ class DownloadManager(internal var client: TelegramClient?, p: DownloadProgressI
fun getChats(): ChatList {
val cl = ChatList()
logger.trace("Calling messagesGetDialogs")
val dialogs = client!!.messagesGetDialogs(0, 0, TLInputPeerEmpty(), 100)
val dialogs = client.messagesGetDialogs(0, 0, TLInputPeerEmpty(), 100)
logger.trace("Got {} dialogs back", dialogs.getDialogs().size)
// Add dialogs
@@ -376,10 +294,10 @@ class DownloadManager(internal var client: TelegramClient?, p: DownloadProgressI
if (tl_peer_channel == null) continue
var download = true
if (IniSettings.whitelist_channels != null) {
download = IniSettings.whitelist_channels!!.contains(tl_channel.getId().toString())
} else if (IniSettings.blacklist_channels != null) {
download = !IniSettings.blacklist_channels!!.contains(tl_channel.getId().toString())
if (settings.whitelist_channels.isNotEmpty()) {
download = settings.whitelist_channels.contains(tl_channel.getId().toString())
} else if (settings.blacklist_channels.isNotEmpty()) {
download = !settings.blacklist_channels.contains(tl_channel.getId().toString())
}
val channel = Channel(id=tl_channel.getId(), access_hash=tl_channel.getAccessHash(), title=tl_channel.getTitle(), obj=tl_peer_channel, download=download)
if (tl_channel.getMegagroup()) {
@@ -440,38 +358,32 @@ class DownloadManager(internal var client: TelegramClient?, p: DownloadProgressI
}
logger.trace("offset before the loop is {}", offset)
fos = FileOutputStream(temp_filename, true)
var response: TLFile? = null
var try_again: Boolean
do {
try_again = false
logger.trace("offset: {} block_size: {} size: {}", offset, size, size)
val req = TLRequestUploadGetFile(loc, offset, size)
var resp: TLFile? = null
try {
response = download_client!!.executeRpcQuery(req, dcID) as TLFile
} catch (e: RpcErrorException) {
if (e.getCode() == 420) { // FLOOD_WAIT
try_again = true
Utils.obeyFloodWaitException(e)
continue // response is null since we didn't actually receive any data. Skip the rest of this iteration and try again.
} else if (e.getCode() == 400) {
//Somehow this file is broken. No idea why. Let's skip it for now
return false
} else {
throw e
Utils.obeyFloodWait() {
resp = download_client!!.executeRpcQuery(req, dcID) as TLFile
}
} catch (e: RpcErrorException) {
if (e.getCode() == 400) {
// Somehow this file is broken. No idea why. Let's skip it for now.
return false
}
throw e
}
val response = resp!!
offset += response.getBytes().getData().size
logger.trace("response: {} total size: {}", response.getBytes().getData().size, offset)
fos.write(response.getBytes().getData())
fos.flush()
try {
TimeUnit.MILLISECONDS.sleep(Config.DELAY_AFTER_GET_FILE)
} catch (e: InterruptedException) {
}
try { TimeUnit.MILLISECONDS.sleep(Config.DELAY_AFTER_GET_FILE) } catch (e: InterruptedException) { }
} while (offset < size && (try_again || response!!.getBytes().getData().size > 0))
} while (offset < size && response.getBytes().getData().size > 0)
fos.close()
if (offset < size) {
System.out.println("Requested file $target with $size bytes, but got only $offset bytes.")