Read and copy file with Coroutines

I created the following application to illustrate some questions. My example is on GitHub.

In this example, I copy a file to another package.

My questions are as follows:

  1. When the tasks run in parallel, is it possible to return the values that were completed before the cancellation?

  2. Why does the warning “Inappropriate blocking method call” appear on contentResolver.openInputStream(uri), even though I am working in the IO context?

  3. While I read the input file and copy it to the output, I check the job status on every iteration. When the task is cancelled, the copy stops immediately, the output file that was created is deleted, and a CancellationException is thrown. Is that correct?

  4. Can I limit the number of tasks that run at the same time?

    fun onClickStartTask(view: View) {
        var listNewPath = emptyList<String>()
        CoroutineScope(Main + job).launch {
            try {
                //shows something in the UI - progressBar
                withContext(IO) {
                    listNewPath = listUri.map { uri ->
                        async {
                            //path to file temp
                            val pathFileTemp =
                                "${getExternalFilesDir("Temp").toString()}/${uri.lastPathSegment}"
                            val file = File(pathFileTemp)
                            val inputStream = contentResolver.openInputStream(uri)
                            inputStream?.use { input ->
                                FileOutputStream(file).use { output ->
                                    val buffer = ByteArray(1024)
                                    var read: Int = input.read(buffer)
                                    while (read != -1) {
                                        if (isActive) {
                                            output.write(buffer, 0, read)
                                            read = input.read(buffer)
                                        } else {
                                            input.close()
                                            output.close()
                                            file.deleteRecursively()
                                            throw CancellationException()
                                        }
                                    }
                                }
                            }
                            //If completed then it returns the new path.
                            return@async pathFileTemp
                        }
                    }.awaitAll()
                }
            } finally {
                //shows list complete in the UI
            }
        }
    }
    
    

This is the button action that performs the task.

Thanks for any help.

On question 1: you probably want to return your results differently, for example by adding each one to a list as it completes (be careful about thread safety) or by sending them out through a Channel.
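A minimal sketch of the "add to a list" idea, not the code from the question. It assumes a timeout as a stand-in for the user's cancel action, and `fakeCopy` and `copyAllCollectingPartial` are hypothetical names invented for illustration. Each child records its result as soon as it finishes, so whatever completed before the cancellation is still available afterwards.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical stand-in for the real copy: "works" for `millis`, then returns a label.
suspend fun fakeCopy(name: String, millis: Long): String {
    delay(millis)
    return name
}

// Starts one child per task and records each result when it completes.
// The timeout plays the role of the user's cancel button (an assumption of this sketch).
suspend fun copyAllCollectingPartial(tasks: Map<String, Long>, timeoutMillis: Long): List<String> {
    // Thread-safe list: the children may run on different threads.
    val done = CopyOnWriteArrayList<String>()
    withTimeoutOrNull(timeoutMillis) {
        tasks.forEach { (name, millis) ->
            launch(Dispatchers.Default) { done.add(fakeCopy(name, millis)) }
        }
    }
    // Unfinished children were cancelled; `done` holds only the completed ones.
    return done.toList()
}
```

With `awaitAll()`, by contrast, a cancellation discards the whole result list, which is why the results are collected on the side here.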

On question 2: the inspection is probably not smart enough to tell that the dispatcher is inherited. You can likely resolve the warning by moving withContext(IO) inside the async/launch.
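A rough sketch of that rearrangement, using plain `java.io.File` instead of `contentResolver` so it runs outside Android. `readFirstByte` and `readAll` are hypothetical helpers. The blocking call sits directly inside `withContext(Dispatchers.IO)`, which may be enough for the inspection to see the dispatcher; whether the warning actually disappears depends on the IDE version.

```kotlin
import kotlinx.coroutines.*
import java.io.File

// Hypothetical helper: the blocking read is wrapped directly in withContext(Dispatchers.IO).
suspend fun readFirstByte(file: File): Int = withContext(Dispatchers.IO) {
    file.inputStream().use { it.read() }
}

// The async blocks only call the suspend function; they no longer contain blocking code themselves.
suspend fun readAll(files: List<File>): List<Int> = coroutineScope {
    files.map { file -> async { readFirstByte(file) } }.awaitAll()
}
```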

On question 3: checking isActive in your loop is a good idea. I’d opt for ensureActive, which throws for you. Your cleanup code should really be in a finally block instead of an else. Consider the Closeable.use method.
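One way these three points could fit together, as a sketch under assumptions rather than a drop-in replacement: `copyCancellable` is a hypothetical function, it takes an already opened `InputStream`, and it uses a `completed` flag so that the finally block deletes only a partial output file.

```kotlin
import kotlinx.coroutines.*
import java.io.File
import java.io.InputStream

// Hypothetical helper: copies `input` to `target` and stops at the next chunk once cancelled.
suspend fun copyCancellable(input: InputStream, target: File): File = withContext(Dispatchers.IO) {
    var completed = false
    try {
        // `use` closes both streams on success, cancellation, and failure.
        input.use { source ->
            target.outputStream().use { sink ->
                val buffer = ByteArray(DEFAULT_BUFFER_SIZE)
                while (true) {
                    ensureActive() // throws CancellationException if the job was cancelled
                    val read = source.read(buffer)
                    if (read == -1) break
                    sink.write(buffer, 0, read)
                }
            }
        }
        completed = true
        target
    } finally {
        // Runs in every case; the file is deleted only if the copy did not finish.
        if (!completed) target.delete()
    }
}
```

Because the cleanup is in `finally`, an `IOException` in the middle of the copy removes the partial file in the same way a cancellation does.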

On question 4: sure. You can send the tasks into a Channel and launch N coroutines, each running a for loop that reads from the Channel and executes the tasks.
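A small sketch of that worker-pool idea. `processWithWorkers` is a hypothetical generic helper, and `task` stands in for the real per-Uri copy. At most `workers` tasks run at the same time, and the results arrive in completion order, not input order.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical helper: runs `task` over `items` with at most `workers` coroutines at once.
suspend fun <T, R> processWithWorkers(
    items: List<T>,
    workers: Int,
    task: suspend (T) -> R
): List<R> = coroutineScope {
    val queue = Channel<T>()
    val results = CopyOnWriteArrayList<R>() // thread-safe, filled by the workers
    // Producer: feeds the items, then closes the channel so the workers' loops end.
    launch {
        for (item in items) queue.send(item)
        queue.close()
    }
    // N workers, each taking the next item as soon as it is free.
    repeat(workers) {
        launch { for (item in queue) results.add(task(item)) }
    }
    // coroutineScope returns only after the producer and all workers have finished.
    results
}
```

A call such as `processWithWorkers(listUri, 4) { processUri(it) }` would then cap the copies at four in flight, assuming `processUri` is a suspend function like the one discussed in this thread.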

Also, Flow has the flatMapMerge method with a concurrency parameter. So you could probably do something like this (not tested):

listUri.asFlow()
    .flatMapMerge(N) {
        flow { emit(processUri(it)) }
    }
    .collect { handleResult(it) }

Thanks for the answer.

I ended up with the following:

Button action task:

@FlowPreview
fun onClickStartTask(view: View) {
    //show progressBar
    val listNewPath = mutableListOf<String>()
    CoroutineScope(Main + job).launch {
        try {
            listUri.asFlow()
                .flatMapMerge(4) {
                    flow { emit(processUri(it)) }
                }.collect { handleResult(it, listNewPath) }

        } catch (e: CancellationException) {
            //show message
        } finally {
           //Update ui with new list and hide progressBar
        }
    }
}

And methods:

private suspend fun processUri(uri: Uri): String = withContext(IO) {
    //path to file temp
    val pathFileTemp = "${getExternalFilesDir("Temp").toString()}/${uri.lastPathSegment}"
    val file = File(pathFileTemp)

    val inputStream = contentResolver.openInputStream(uri)
    inputStream?.use { input ->

        FileOutputStream(file).use { output ->
            val buffer = ByteArray(1024)
            var read: Int = input.read(buffer)
            while (read != -1) {
                try {
                    //check the job is active
                    yield()
                    output.write(buffer, 0, read)
                    read = input.read(buffer)
                } catch (e: CancellationException) {
                    //delete the partial file, then rethrow so the loop stops and the cancellation propagates
                    file.deleteRecursively()
                    throw e
                }
            }
        }
    }
    //If completed then it returns the new path
    return@withContext pathFileTemp
}

private fun handleResult(result: String, listNewPath: MutableList<String>) {
    listNewPath.add(result)
}

The processUri() method is wrapped in the IO context.

I used yield() to check the scope’s status, along with try/catch to cancel the task and delete the file that was created.

Clean up should go in a finally block, so if an IOException or anything else happens, it’ll still run.

Also, I’d recommend keeping a CoroutineScope as your member property instead of the job. That way you just call scope.launch. In case you are unaware, the MainScope method may be helpful to you in creating your scope.

I’d avoid yield in this scenario. Dispatchers.IO is specifically designed for blocking calls and adds threads as needed (up to a limit). yield seems more appropriate for Dispatchers.Default or Dispatchers.Main, which are intended for handling lots of coroutines running on a limited number of threads, so you don’t want any coroutine to hold on to a thread for very long. I'm not sure it harms anything (it will probably just resume right away on the same thread after posting), but it seems a little odd to me.

Keep in mind, the only thing you are gaining by building your own loop is that the copy will stop sooner when the job is cancelled. If the content is not very big, consider just using the copyTo method and not worrying about stopping ASAP. It depends on the details of your specific scenario and how long a full copy may take vs user impact.
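For comparison, a sketch of the simpler variant with the standard library's `copyTo`. `copyWhole` is a hypothetical name. There is no cancellation check inside the copy, so a cancelled job is only noticed after the call returns.

```kotlin
import java.io.File
import java.io.InputStream

// Hypothetical helper: copies the whole stream in one call and returns the number of bytes copied.
fun copyWhole(input: InputStream, target: File): Long =
    input.use { source ->
        target.outputStream().use { sink -> source.copyTo(sink) }
    }
```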


Thanks again @nickallendev.

In this scenario, as soon as the user cancels the task in progress, whatever has already completed is shown on the screen, whatever is still running is cancelled immediately, and any partially written file is deleted (for both small and large files). However, the scope itself cannot be cancelled at that point, because the user can restart the task; the scope is cancelled only when the view is destroyed.

So, based on what I understood from your suggestions and a lot of reading, I ended up with the code below.

Now I create the job and mainScope properties:

class MainActivity : AppCompatActivity() {
    private var job: Job? = null
    private val mainScope = MainScope()
    ...
}

In the button action that performs the task, I made these changes:

  • The job variable now holds the Job returned by mainScope.launch, so the task can be cancelled and restarted.
  • I added .flowOn(IO) so that the method that processes the Uri runs on the IO dispatcher.
  • In the finally block I check whether the main scope is still active; if it is, I update the UI.

@FlowPreview
fun onClickStartTask(view: View) {
    val listNewPath = mutableListOf<String>()
    job = mainScope.launch {
        try {
            listUri.asFlow()
                .flatMapMerge(4) {
                    flow { emit(processUri(it)) }
                        .flowOn(IO)
                }.collect { handleResult(it, listNewPath) }

        } catch (e: Exception) {
            //show error
        } finally {
            if (mainScope.isActive) {
                //update UI
            } else {
                //view is destroyed
            }
        }
    }
}

In the processUri() method:

  • I removed the suspend modifier.
  • I changed the try/catch block: if any exception occurs, the file that was created is deleted and the exception is rethrown.
  • Inside the try/catch block I added job?.ensureActive().

private fun processUri(uri: Uri): String {
    //path to file temp
    val pathFileTemp = "${getExternalFilesDir("Temp").toString()}/${uri.lastPathSegment}"
    val file = File(pathFileTemp)
    try {
        val inputStream = contentResolver.openInputStream(uri)
        inputStream?.use { input ->

            FileOutputStream(file).use { output ->
                val buffer = ByteArray(1024)
                var read: Int = input.read(buffer)
                while (read != -1) {
                    //check the job is active
                    job?.ensureActive()
                    output.write(buffer, 0, read)
                    read = input.read(buffer)
                }
            }
        }
    } catch (e: Exception) {
        file.deleteRecursively()
        throw e
    }
    //If completed then it returns the new path.
    return pathFileTemp
}

And finally the action to cancel:

fun onClickCancelTask(view: View) {
    job?.cancel()
    println("Cancel the Job")
}

override fun onDestroy() {
    //MainScope is cancelled
    mainScope.cancel()
    super.onDestroy()
}

I apologize for the length of this reply and for my English. I appreciate any help and suggestions.