Using OPENRNDR as a websocket client

Right now I’m using Ktor to get websocket support in Kotlin. I’d like to receive data from a preexisting server while the sketch runs, but I’ve had limited success. It seems difficult to keep the websocket session open while the sketch runs. My current goal is to print any incoming messages to the command line while the sketch runs. Instead, I’ve gotten:

  • Data is received, but the sketch window does not appear.
  • Data is received while the sketch window appears but stays black (it seems the drawer hasn’t been initialized yet).
  • Data is received and the sketch renders, but the websocket connection repeatedly closes and reopens, barraging the server.
  • The sketch renders but only receives the init data, and the connection seems to close immediately. The sketch proceeds as normal. <- I am here

I have an additional terminal showing the data the server is outputting, so I’m very sure it is not the server. Has anyone successfully done this?
I will post my code a few minutes after post time.

import org.openrndr.application
import org.openrndr.color.ColorRGBa
import org.openrndr.draw.loadFont
import org.openrndr.draw.loadImage
import org.openrndr.draw.tint
import kotlin.math.cos
import kotlin.math.sin
import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.plugins.websocket.*
import io.ktor.http.*
import io.ktor.websocket.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.channels.ClosedReceiveChannelException

fun main() = application {
    configure {
        width = 768
        height = 576
    }
    var session: WebSocketSession? = null

    val client = HttpClient(CIO) {
        install(WebSockets)
    }
    runBlocking{
            client.webSocket(method = HttpMethod.Get,host = "localhost", port=8040,path="/"){
                session = this
            }
        }
    program {
        val image = loadImage("data/images/pm5544.png")
        val font = loadFont("data/fonts/default.otf", 64.0)

        extend {
            runBlocking {
                try{
                    for(msg in session!!.incoming){
                        msg as? Frame.Text ?: continue
                        println(msg.readText())
                    }
                }catch(e:ClosedReceiveChannelException){
                }
            }
            drawer.drawStyle.colorMatrix = tint(ColorRGBa.WHITE.shade(0.2))
            drawer.image(image)

            drawer.fill = ColorRGBa.PINK
            drawer.circle(cos(seconds) * width / 2.0 + width / 2.0, sin(0.5 * seconds) * height / 2.0 + height / 2.0, 140.0)

            drawer.fontMap = font
            drawer.fill = ColorRGBa.WHITE
            drawer.text("OPENRNDR", width / 2.0, height / 2.0)
        }
    }
}

Added to the Gradle file:

implementation("io.ktor:ktor-client-core:2.2.4")
implementation("io.ktor:ktor-client-cio:2.2.4")
implementation("io.ktor:ktor-client-logging:2.2.4")
implementation("io.ktor:ktor-client-websockets:2.2.4")

Hi! Nice idea to use websockets 🙂

The errors you describe sound like threading issues to me. If ktor runs in a loop then it doesn’t get to the extend loop. Or the other way around. I think ktor should run in a separate thread (so it starts and runs there independently of the OPENRNDR program), but not sure how.

Once it does run in a separate thread, one also needs to be careful: nothing should be drawn in response to events that do not come from the default graphics thread. I think that would crash.

This example plays a MIDI file. The player runs in its own thread (which it creates automatically, I didn’t do that). The interesting thing is that I use a ConcurrentHashMap to make sure the external event doesn’t try to use OpenGL directly. The external event updates that hash map, and then I read the hash map from extend. I think receiving websocket events and MIDI file events may require a similar approach.
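To make the hand-off idea above concrete, here is a minimal sketch that uses only the JVM standard library. The names SharedState and simulateEvents are made up for illustration and are not part of OPENRNDR or the MIDI example. The event thread only writes into the map, and the draw loop only reads from it.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlin.concurrent.thread

// Hypothetical container for values shared between an event thread and the draw loop.
class SharedState {
    private val values = ConcurrentHashMap<String, Double>()

    // Called from the external (MIDI or websocket) thread: stores data, never draws.
    fun update(key: String, value: Double) {
        values[key] = value
    }

    // Called from the draw loop: returns the latest value without blocking.
    fun read(key: String, default: Double): Double = values[key] ?: default
}

// Simulates an external event source that runs on its own thread.
fun simulateEvents(state: SharedState, radii: List<Double>): Thread =
    thread(name = "events") {
        for (r in radii) state.update("radius", r)
    }
```

Inside extend one would call something like state.read("radius", 140.0) and pass the result to drawer.circle, so all drawing still happens on the graphics thread.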

This orx also uses ktor. Does it give you any ideas?

I would be happy to see a simple working example with websockets 🙂


I am very interested in the topic as well, having worked with websockets in the JavaScript/Node.js world. 🙂
I think what @abe is suggesting makes sense. Meanwhile, have you tried changing the ping interval?
I tried to reproduce the error by connecting to a websocket I have used in the past (which I know to be active), but I always get an UnresolvedAddressException.


Hm… I am not at all familiar with threading and only mildly familiar with websockets, but I think I’ll give it a shot. I agree that @abe’s logic makes sense, and I’ll do some research on how to receive from a websocket server in its own thread.
What is the ping interval and how do I change it?

Unfortunately, it’s not that easy to solve your problem without going deeper into Kotlin coroutines and OPENRNDR’s internals and architectural choices. When you call runBlocking(), it literally blocks until the block of code finishes. You will find examples like this in the Ktor documentation because it makes sense there: if you want a main() function that creates a “daemon” waiting forever in an endless loop, runBlocking() blocks the main JVM thread until the loop finishes, and the only instructions left after runBlocking() close the used resources when the application exits. OPENRNDR creates such a sequence on its own by establishing a so-called rendering thread, which is associated with its own coroutine scope. If you call launch inside extend, it uses this custom coroutine scope, which is strictly bound to the rendering thread, so your code running there competes for execution time with the thread that schedules drawing operations on the GPU through the OpenGL abstraction.

The proper way to set up your WebSocket daemon from within OPENRNDR would be to create a new coroutine scope, the equivalent of runBlocking working on the main execution thread:

// Needs: java.util.concurrent.Executors and kotlinx.coroutines.asCoroutineDispatcher
val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
launch(dispatcher) {
    // WebSocket handling code
}

Communication between coroutines (WebSocket <-> rendering thread) can be realized either through concurrent data structures, as @abe is suggesting, or through abstractions like Flow or Channel, which are designed for this purpose.
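As a rough sketch of the first option, assuming a plain JVM queue is acceptable: the listener posts every message into a thread-safe queue, and the draw loop empties it once per frame. Mailbox is a hypothetical name, and ConcurrentLinkedQueue is used here as a stand-in; a kotlinx.coroutines Channel read with tryReceive() could play the same role.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical mailbox between the websocket listener and the draw loop.
class Mailbox<T : Any> {
    private val queue = ConcurrentLinkedQueue<T>()

    // Listener side: safe to call from any thread.
    fun post(message: T) {
        queue.add(message)
    }

    // Draw-loop side: takes everything that arrived since the last call, never blocks.
    fun drain(): List<T> {
        val out = mutableListOf<T>()
        while (true) {
            val next = queue.poll() ?: break
            out += next
        }
        return out
    }
}
```

Unlike a "latest value" map, this keeps every message in arrival order, which matters if each message should trigger its own visual change.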

Dropping here a link in case it brings us a step closer:

And another one:

I got it 😭
Here are functioning websockets in OPENRNDR using Ktor. Each time a message is sent, the application catches it and prints it out. The OPENRNDR link that @abe posted helped a lot, as my issue was just getting the sketch to run while it processed websocket input, without reinitializing the websocket connection each time the extend loop executed.

import org.openrndr.application
import org.openrndr.color.ColorRGBa
import org.openrndr.draw.loadFont
import org.openrndr.draw.loadImage
import org.openrndr.draw.tint
import kotlin.math.cos
import kotlin.math.sin
import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.plugins.websocket.*
import io.ktor.http.*
import io.ktor.websocket.*
import kotlinx.coroutines.*
import org.openrndr.launch
import org.openrndr.math.IntVector2

fun main() = application {
    configure {
        width = 768
        height = 576
        position = IntVector2(300,50)
    }
    //var session: WebSocketSession
    val client = HttpClient(CIO){
        install(WebSockets)
    }


    program {
        val image = loadImage("data/images/pm5544.png")
        val font = loadFont("data/fonts/default.otf", 64.0)
        val session = client.webSocketSession(method = HttpMethod.Get, host = "localhost", port =8040,path = "/")

        extend {
          launch{
              GlobalScope.launch {
                  val msg = session.incoming.receive() as Frame.Text
                  println(msg.readText())
              }.join()
            }

            drawer.drawStyle.colorMatrix = tint(ColorRGBa.WHITE.shade(0.2))
            drawer.image(image)

            drawer.fill = ColorRGBa.PINK
            drawer.circle(cos(seconds) * width / 2.0 + width / 2.0, sin(0.5 * seconds) * height / 2.0 + height / 2.0, 140.0)

            drawer.fontMap = font
            drawer.fill = ColorRGBa.WHITE
            drawer.text("OPENRNDR", width / 2.0, height / 2.0)
        }
    }
}


Dependencies added to build gradle file

    implementation("io.ktor:ktor-client-core:2.2.4")
    implementation("io.ktor:ktor-client-cio:2.2.4")
    implementation("io.ktor:ktor-client-websockets:2.2.4")

Some notes:
client.webSocketSession has to be called in a suspend function, which program just so happens to be

Thanks @kazik for clarifying what runBlocking() does, as using it definitely led me to a dead end. Before I saw your post, I attempted your method by passing Dispatchers.IO into the launch function, but I couldn’t get the hang of it.

Quite frankly, I don’t fully understand why an error doesn’t occur here:

     extend {
           launch{
                GlobalScope.launch {
                    val msg = session.incoming.receive() as Frame.Text
                    println(msg.readText())
                }.join()
            }

           //snip
        }
    }

Wouldn’t it either throw an error due to the lack of an incoming message or constantly print the latest received message to the console?

I think you are right to wonder why it doesn’t break at that location 🙂

If you add a print("+") before session.incoming.receive(), you will see what’s happening: I think it’s creating “receivers” nonstop. I don’t know the right term, but each one runs that line and waits there for a message. More and more of those “waiters” get created, which at some point may become a problem.
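The pile-up can be estimated with a toy model. This is not Ktor code: FakeIncoming and simulate are invented for illustration, and the model only assumes that each pending receive() waits in a queue and that each message wakes exactly one waiter. Assuming 60 frames per second and one message every 2 seconds (120 frames), ten seconds of drawing would leave 595 waiters behind.

```kotlin
// Simplified model (not Ktor code): each pending receive() is a waiting callback,
// and each incoming message wakes exactly one waiter, oldest first.
class FakeIncoming {
    private val waiters = ArrayDeque<(String) -> Unit>()

    // Number of receive() calls still waiting for a message.
    val pending: Int get() = waiters.size

    // Models a coroutine suspending in receive().
    fun receive(onMessage: (String) -> Unit) {
        waiters.addLast(onMessage)
    }

    // Models the server sending one message.
    fun deliver(message: String) {
        waiters.removeFirstOrNull()?.invoke(message)
    }
}

// Runs `frames` draw-loop iterations, registering one waiter per frame and
// delivering one message every `framesPerMessage` frames.
// Returns how many waiters are still pending at the end.
fun simulate(frames: Int, framesPerMessage: Int): Int {
    val incoming = FakeIncoming()
    for (frame in 1..frames) {
        incoming.receive { /* the real code would print the message here */ }
        if (frame % framesPerMessage == 0) incoming.deliver("hello")
    }
    return incoming.pending
}
```

Calling simulate(600, 120) models those ten seconds. The number grows for as long as frames are drawn faster than messages arrive.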

So I decided to take a look and I came up with this approach, in which I think this issue doesn’t happen, but not sure if it’s the ideal one either.

Create a server/ folder with the following two files:

package.json

{
  "name": "serv",
  "version": "1.0.0",
  "description": "",
  "main": "server.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1",
    "start": "node server.js"
  },
  "author": "",
  "license": "ISC",
  "dependencies": {
    "ws": "^8.13.0"
  }
}

server.js

const WebSocket = require('ws');
const server = new WebSocket.Server({ port: 8080 });

let sockets = [];
server.on('connection', function(socket) {
  sockets.push(socket);

  console.log('user connected');

  var interval = setInterval(function() {
    socket.send('hello');
  }, 2000);

  // When you receive a message, send that message to every socket.
  socket.on('message', function(msg) {
    sockets.forEach(s => s.send(msg));
  });

  // When a socket closes, or disconnects, remove it from the array.
  socket.on('close', function() {
    console.log('user disconnected');
    sockets = sockets.filter(s => s !== socket);
  });
});

Run npm install

Run npm start

client.kt

import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.plugins.websocket.*
import io.ktor.http.*
import io.ktor.websocket.*
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.openrndr.application
import org.openrndr.color.ColorRGBa
import org.openrndr.extra.noise.Random
import org.openrndr.math.Polar
import java.util.concurrent.atomic.AtomicInteger
fun main() = application {
    val client = HttpClient(CIO) {
        install(WebSockets) {
            pingInterval = 10_000
        }
    }
    program {
        val session = client.webSocketSession(HttpMethod.Get, "localhost", 8080, "/")
        val radius = AtomicInteger(140)
        extend {
            GlobalScope.launch {
                while (!session.incoming.isEmpty) {
                    val msg = session.incoming.receive() as Frame.Text
                    radius.set(Random.int(50, 200))
                    println(msg.readText())
                }
            }
            drawer.fill = ColorRGBa.PINK
            drawer.circle(
                Polar(seconds * 50, 200.0).cartesian + drawer.bounds.center, radius.toDouble()
            )
        }
    }
}

The main difference is that it doesn’t try to receive() if there is nothing to process. I also used an AtomicInteger to trigger visual changes while avoiding concurrency issues.

This is a very minimal example and it requires node/js knowledge to build something actually interesting, but maybe it serves as a starting point?


OK, I think this is much cleaner, with the WebSocket listening outside the draw loop.

There’s no need to keep creating coroutines inside extend { }. Just create one with an infinite loop that waits for new messages to arrive. It seems to automatically sleep when nothing is received.

fun main() = application {
    val client = HttpClient(CIO) {
        install(WebSockets) {
            pingInterval = 10_000
        }
    }

    program {
        val session = client.webSocketSession(HttpMethod.Get, "localhost", 8080, "/")
        val radius = AtomicInteger(140)

        GlobalScope.launch {
            while (true) {
                val msg = session.incoming.receive() as Frame.Text
                radius.set(Random.int(50, 200))
                println(msg.readText())
            }
        }

        extend {
            drawer.fill = ColorRGBa.PINK
            drawer.circle(
                Polar(seconds * 50, 200.0).cartesian + drawer.bounds.center, radius.toDouble()
            )
        }
    }
}

This thread is great! I tried the setup, and it works quite nicely. However, I had no success with anything except localhost, i.e. with any websocket hosted online. I tried a couple of websockets I used in the past; they are still active, but I can’t establish a connection, so I must be doing something wrong.
Do you have any example of using something apart from localhost?

I assume that’s because online services will use wss, the secure version of the protocol.

Their examples don’t include wss at all. Let me know if you find such an example.

There’s this but no idea how to use it.


I realized what the problem was: the “ws://” prefix isn’t needed in the address of the websocket I was using. It works fine now. 🙂
I haven’t tried for the secure version of the protocol, though.

I asked in the Kotlin Slack and Aleksei Tirman provided this example for wss:

val session = client.webSocketSession("wss://ws.postman-echo.com/raw")