From 47c812ae5d1125598f9f7fdc1fe0591bc06e7072 Mon Sep 17 00:00:00 2001 From: pedroSG94 Date: Tue, 20 Jan 2026 00:24:43 +0100 Subject: [PATCH] add get rtt and send shouldsendpings in rtmp --- .../main/java/com/pedro/common/TimeUtils.kt | 3 +++ .../util/streamclient/RtmpStreamClient.kt | 16 +++++++++++++ .../com/pedro/rtmp/rtmp/CommandsManager.kt | 10 ++++++++ .../java/com/pedro/rtmp/rtmp/RtmpClient.kt | 24 +++++++++++++++++++ 4 files changed, 53 insertions(+) diff --git a/common/src/main/java/com/pedro/common/TimeUtils.kt b/common/src/main/java/com/pedro/common/TimeUtils.kt index ef5bcb3ba3..9476541d16 100644 --- a/common/src/main/java/com/pedro/common/TimeUtils.kt +++ b/common/src/main/java/com/pedro/common/TimeUtils.kt @@ -30,6 +30,9 @@ object TimeUtils { @JvmStatic fun getCurrentTimeMillis(): Long = SystemClock.elapsedRealtime() + @JvmStatic + fun getCurrentTimeSeconds(): Int = (getCurrentTimeMillis() / 1000).toInt() + @JvmStatic fun getCurrentTimeNano(): Long { return if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.JELLY_BEAN_MR1) { diff --git a/library/src/main/java/com/pedro/library/util/streamclient/RtmpStreamClient.kt b/library/src/main/java/com/pedro/library/util/streamclient/RtmpStreamClient.kt index e924b4ebb7..d8866ee183 100644 --- a/library/src/main/java/com/pedro/library/util/streamclient/RtmpStreamClient.kt +++ b/library/src/main/java/com/pedro/library/util/streamclient/RtmpStreamClient.kt @@ -78,6 +78,22 @@ class RtmpStreamClient( rtmpClient.setWriteChunkSize(chunkSize) } + /** + * RTT in micro seconds reported by ping-pong commands. + * shouldSendPings must be enabled to work properly. + */ + fun getRtt() = rtmpClient.rtt + + /** + * Send ping commands each second to server. + * This allow get a RTT and keep alive the read channel in servers that close it due to inactivity. + * + * Could be useful in combination with shouldFailOnRead to detect connection closed in few servers. + */ + fun shouldSendPings(enabled: Boolean) { + rtmpClient.shouldSendPings(enabled) + } + override fun reTry(delay: Long, reason: String, backupUrl: String?): Boolean { val result = rtmpClient.shouldRetry(reason) if (result) { diff --git a/rtmp/src/main/java/com/pedro/rtmp/rtmp/CommandsManager.kt b/rtmp/src/main/java/com/pedro/rtmp/rtmp/CommandsManager.kt index ac32b761dd..7557b7cf3b 100644 --- a/rtmp/src/main/java/com/pedro/rtmp/rtmp/CommandsManager.kt +++ b/rtmp/src/main/java/com/pedro/rtmp/rtmp/CommandsManager.kt @@ -172,6 +172,16 @@ abstract class CommandsManager { } } + suspend fun sendPing(socket: RtmpSocket) { + writeSync.withLock { + val ping = UserControl(Type.PING_REQUEST, Event(TimeUtils.getCurrentTimeSeconds())) + ping.writeHeader(socket) + ping.writeBody(socket) + socket.flush() + Log.i(TAG, "send ping") + } + } + @Throws(IOException::class) suspend fun sendClose(socket: RtmpSocket) { writeSync.withLock { diff --git a/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpClient.kt b/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpClient.kt index 8d2ec21047..bc6fad5c32 100644 --- a/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpClient.kt +++ b/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpClient.kt @@ -59,6 +59,7 @@ import kotlinx.coroutines.withTimeoutOrNull import java.io.IOException import java.net.URISyntaxException import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicLong import javax.net.ssl.TrustManager /** @@ -109,6 +110,10 @@ class RtmpClient(private val connectChecker: ConnectChecker) { var socketType = SocketType.KTOR var socketTimeout = StreamSocket.DEFAULT_TIMEOUT var shouldFailOnRead = false + var shouldSendPings = false + var rtt = 0 //in micro + private set + private val pingTs = AtomicLong(0) /** * Add certificates for TLS connection @@ -145,6 +150,10 @@ class RtmpClient(private val connectChecker: ConnectChecker) { checkServerAlive = enabled } + fun shouldSendPings(enabled: Boolean) { + shouldSendPings = enabled + } + /** * Must be called before connect */ @@ -286,6 +295,7 @@ class RtmpClient(private val connectChecker: ConnectChecker) { //Handle all command received and send response for it. handleMessages() } + if (shouldSendPings) commandsManager.sendPing(socket) //read packet because maybe server want send you something while streaming handleServerPackets() }.exceptionOrNull() @@ -387,6 +397,19 @@ class RtmpClient(private val connectChecker: ConnectChecker) { Type.PING_REQUEST -> { commandsManager.sendPong(userControl.event, socket) } + Type.PONG_REPLY -> { + Log.i(TAG, "pong received: ${userControl.event.data}") + if (shouldSendPings) { + rtt = (TimeUtils.getCurrentTimeMicro() - pingTs.get()).toInt() + CoroutineScope(Dispatchers.IO).launch { + delay(1000) + if (isStreaming) { + pingTs.set(TimeUtils.getCurrentTimeMicro()) + commandsManager.sendPing(socket) + } + } + } + } else -> { Log.i(TAG, "user control command $type ignored") } @@ -563,6 +586,7 @@ class RtmpClient(private val connectChecker: ConnectChecker) { scope = CoroutineScope(Dispatchers.IO) publishPermitted = false commandsManager.reset() + rtt = 0 } fun sendVideo(videoBuffer: ByteBuffer, info: MediaCodec.BufferInfo) {