Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions common/src/main/java/com/pedro/common/TimeUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ object TimeUtils {
@JvmStatic
fun getCurrentTimeMillis(): Long = SystemClock.elapsedRealtime()

@JvmStatic
fun getCurrentTimeSeconds(): Int = (getCurrentTimeMillis() / 1000).toInt()

@JvmStatic
fun getCurrentTimeNano(): Long {
return if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.JELLY_BEAN_MR1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,22 @@ class RtmpStreamClient(
rtmpClient.setWriteChunkSize(chunkSize)
}

/**
* RTT in micro seconds reported by ping-pong commands.
* shouldSendPings must be enabled to work properly.
*/
fun getRtt() = rtmpClient.rtt

/**
* Send ping commands each second to server.
* This allow get a RTT and keep alive the read channel in servers that close it due to inactivity.
*
* Could be useful in combination with shouldFailOnRead to detect connection closed in few servers.
*/
fun shouldSendPings(enabled: Boolean) {
rtmpClient.shouldSendPings(enabled)
}

override fun reTry(delay: Long, reason: String, backupUrl: String?): Boolean {
val result = rtmpClient.shouldRetry(reason)
if (result) {
Expand Down
10 changes: 10 additions & 0 deletions rtmp/src/main/java/com/pedro/rtmp/rtmp/CommandsManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,16 @@ abstract class CommandsManager {
}
}

suspend fun sendPing(socket: RtmpSocket) {
writeSync.withLock {
val ping = UserControl(Type.PING_REQUEST, Event(TimeUtils.getCurrentTimeSeconds()))
ping.writeHeader(socket)
ping.writeBody(socket)
socket.flush()
Log.i(TAG, "send ping")
}
}

@Throws(IOException::class)
suspend fun sendClose(socket: RtmpSocket) {
writeSync.withLock {
Expand Down
24 changes: 24 additions & 0 deletions rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import kotlinx.coroutines.withTimeoutOrNull
import java.io.IOException
import java.net.URISyntaxException
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong
import javax.net.ssl.TrustManager

/**
Expand Down Expand Up @@ -109,6 +110,10 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
var socketType = SocketType.KTOR
var socketTimeout = StreamSocket.DEFAULT_TIMEOUT
var shouldFailOnRead = false
var shouldSendPings = false
var rtt = 0 //in micro
private set
private val pingTs = AtomicLong(0)

/**
* Add certificates for TLS connection
Expand Down Expand Up @@ -145,6 +150,10 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
checkServerAlive = enabled
}

fun shouldSendPings(enabled: Boolean) {
shouldSendPings = enabled
}

/**
* Must be called before connect
*/
Expand Down Expand Up @@ -286,6 +295,7 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
//Handle all command received and send response for it.
handleMessages()
}
if (shouldSendPings) commandsManager.sendPing(socket)
//read packet because maybe server want send you something while streaming
handleServerPackets()
}.exceptionOrNull()
Expand Down Expand Up @@ -387,6 +397,19 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
Type.PING_REQUEST -> {
commandsManager.sendPong(userControl.event, socket)
}
Type.PONG_REPLY -> {
Log.i(TAG, "pong received: ${userControl.event.data}")
if (shouldSendPings) {
rtt = (TimeUtils.getCurrentTimeMicro() - pingTs.get()).toInt()
CoroutineScope(Dispatchers.IO).launch {
delay(1000)
if (isStreaming) {
pingTs.set(TimeUtils.getCurrentTimeMicro())
commandsManager.sendPing(socket)
}
}
}
}
else -> {
Log.i(TAG, "user control command $type ignored")
}
Expand Down Expand Up @@ -563,6 +586,7 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
scope = CoroutineScope(Dispatchers.IO)
publishPermitted = false
commandsManager.reset()
rtt = 0
}

fun sendVideo(videoBuffer: ByteBuffer, info: MediaCodec.BufferInfo) {
Expand Down