C# - Observing incoming WebSocket messages with Reactive Extensions?
I want to use LINQ to process events received via a WebSocket connection. This is what I have so far:
    private static void Main()
    {
        string wsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting";

        using (WebSocket ws = new WebSocket(wsEndpoint))
        {
            ws.OnMessage += ws_OnMessage;
            ws.Connect();

            // Keep the connection open until a key is pressed.
            Console.ReadKey();
            ws.Close();
        }
    }

    private static void ws_OnMessage(object sender, MessageEventArgs e)
    {
        Console.WriteLine(e.Data);
    }
The first thing that stumps me is how to turn ws.OnMessage into some sort of event stream. I cannot find any examples online of observing an external event source with Reactive Extensions. I intend to parse the messages into JSON objects, then filter and aggregate them.
Could someone provide an example of creating an observable from the WebSocket messages, and subscribing to it?
Edit: final working code

The difference from the chosen answer is that I initialized the WebSocket before passing it to Observable.Using.
    //-------------------------------------------------------
    // Create the WebSocket connection
    //-------------------------------------------------------
    const string wsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting";
    WebSocket socket = new WebSocket(wsEndpoint);

    //-------------------------------------------------------
    // Create an observable by wrapping ws.OnMessage
    //-------------------------------------------------------
    var globalEventStream = Observable
        .Using(
            () => socket,
            ws => Observable
                .FromEventPattern<EventHandler<MessageEventArgs>, MessageEventArgs>(
                    handler => ws.OnMessage += handler,
                    handler => ws.OnMessage -= handler));

    //---------------------------------------------------------
    // Subscribe to globalEventStream
    //---------------------------------------------------------
    IDisposable subscription = globalEventStream.Subscribe(ep =>
    {
        Console.WriteLine("Event received");
        Console.WriteLine(ep.EventArgs.Data);
    });

    //----------------------------------------------------------
    // Send a message on the WebSocket
    //----------------------------------------------------------
    socket.Connect();
    socket.Send("test message");

    // When finished, close the connection.
    socket.Close();
You should set up the observable like this:
    var observable = Observable
        .Using(
            () => new WebSocket(wsEndpoint),
            ws => Observable
                .FromEventPattern<EventHandler<MessageEventArgs>, MessageEventArgs>(
                    handler => ws.OnMessage += handler,
                    handler => ws.OnMessage -= handler));
This will correctly create the socket and attach to the event when the observable is subscribed to. When the subscription is disposed, it will correctly detach from the event and dispose of the socket.
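The lifecycle described above can be checked without a network connection. The following is a minimal sketch, assuming the System.Reactive NuGet package is referenced; `TrackedResource` and `UsingDemo` are hypothetical names introduced here for illustration, with `TrackedResource` standing in for the WebSocket.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical resource that records its lifecycle; it stands in for the WebSocket.
public sealed class TrackedResource : IDisposable
{
    private readonly List<string> _log;

    public TrackedResource(List<string> log)
    {
        _log = log;
        _log.Add("created");
    }

    public void Dispose() => _log.Add("disposed");
}

public static class UsingDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Defining the observable does not run the resource factory;
        // the factory runs once per subscription.
        IObservable<long> source = Observable.Using(
            () => new TrackedResource(log),
            _ => Observable.Never<long>());

        log.Add("before subscribe");
        IDisposable subscription = source.Subscribe(_ => { });
        log.Add("subscribed");

        // Disposing the subscription also disposes the resource.
        subscription.Dispose();
        log.Add("after dispose");

        return log;
    }

    public static void Main() => Console.WriteLine(string.Join(", ", Run()));
}
```

The log should show the resource being created only after `Subscribe` is called and disposed when the subscription is disposed. The socket created inside `Observable.Using` goes through the same sequence.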
The type of observable is IObservable<EventPattern<MessageEventArgs>>. You can consume the observable in this way:
    IDisposable subscription = observable.Subscribe(ep =>
    {
        Console.WriteLine(ep.EventArgs.Data);
    });
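Since the question asks about filtering messages with LINQ, here is a rough sketch of how the standard Rx operators `Select` and `Where` might be chained onto an event-based observable before subscribing. It assumes the System.Reactive NuGet package. `FakeSocket`, `FakeMessageEventArgs`, `FilterDemo`, and the JSON payloads are hypothetical stand-ins so the example runs offline. The string `Contains` check is only a placeholder for real JSON parsing.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical stand-ins for the WebSocket and its event args,
// so the sketch runs without a network connection.
public sealed class FakeMessageEventArgs : EventArgs
{
    public FakeMessageEventArgs(string data) { Data = data; }
    public string Data { get; }
}

public sealed class FakeSocket
{
    public event EventHandler<FakeMessageEventArgs> OnMessage;

    // Simulates a message arriving from the server.
    public void Push(string data) =>
        OnMessage?.Invoke(this, new FakeMessageEventArgs(data));
}

public static class FilterDemo
{
    public static List<string> Run()
    {
        var socket = new FakeSocket();
        var received = new List<string>();

        // Project each event to its payload, then keep only matching messages.
        IObservable<string> deaths = Observable
            .FromEventPattern<EventHandler<FakeMessageEventArgs>, FakeMessageEventArgs>(
                handler => socket.OnMessage += handler,
                handler => socket.OnMessage -= handler)
            .Select(ep => ep.EventArgs.Data)
            .Where(data => data.Contains("Death")); // placeholder for JSON parsing

        using (deaths.Subscribe(message => received.Add(message)))
        {
            socket.Push("{\"event_name\":\"Death\"}");
            socket.Push("{\"event_name\":\"PlayerLogin\"}");
        }

        // The handler is detached once the subscription is disposed,
        // so this message is not observed.
        socket.Push("{\"event_name\":\"Death\"}");

        return received;
    }

    public static void Main()
    {
        foreach (string message in Run())
        {
            Console.WriteLine(message);
        }
    }
}
```

Only the first pushed message should reach the subscriber: the second is filtered out by `Where`, and the third arrives after the subscription has been disposed. With the real socket, the same `Select` and `Where` calls would be appended to the observable defined above.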
Thanks for posting the NuGet reference.
Here's the working code:
    const string wsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting";

    Console.WriteLine("Defining observable:");

    IObservable<EventPattern<WebSocketSharp.MessageEventArgs>> observable = Observable
        .Using(
            () =>
            {
                // Connect as part of resource creation, i.e. on subscription.
                var ws = new WebSocketSharp.WebSocket(wsEndpoint);
                ws.Connect();
                return ws;
            },
            ws => Observable
                .FromEventPattern<EventHandler<WebSocketSharp.MessageEventArgs>, WebSocketSharp.MessageEventArgs>(
                    handler => ws.OnMessage += handler,
                    handler => ws.OnMessage -= handler));

    Console.WriteLine("Subscribing to observable:");

    IDisposable subscription = observable.Subscribe(ep =>
    {
        Console.WriteLine("Event received");
        Console.WriteLine(ep.EventArgs.Data);
    });

    Console.WriteLine("Writing to source:");

    using (var source = new WebSocketSharp.WebSocket(wsEndpoint))
    {
        source.Connect();
        source.Send("test");
    }