Skip to content

Run I/O on separate thread #62

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Jan 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Package: websocket
Version: 1.1.0.9000
Version: 1.1.0.9001
Title: 'WebSocket' Client Library
Description: Provides a 'WebSocket' client interface for R.
'WebSocket' is a protocol for low-overhead real-time communication:
Expand All @@ -21,15 +21,18 @@ LazyData: true
ByteCompile: true
Imports: Rcpp,
R6,
later
LinkingTo: Rcpp, BH, AsioHeaders
later (>= 1.0.0.9002)
LinkingTo: Rcpp, BH, AsioHeaders, later
SystemRequirements: GNU make, OpenSSL >= 1.0.1
RoxygenNote: 7.0.2
Collate:
'RcppExports.R'
'websocket.R'
Suggests:
httpuv,
testthat,
knitr,
rmarkdown
Remotes:
r-lib/later
VignetteBuilder: knitr
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Generated by roxygen2: do not edit by hand

export(WebSocket)
import(later)
importFrom(Rcpp,sourceCpp)
useDynLib(websocket, .registration = TRUE)
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
1.1.0.9000
==========

* Websocket I/O now runs on a separate thread, so Websocket no longer uses polling. This should also reduce latency for handling incoming messages. ([#62](https://github.com/rstudio/websocket/pull/62))

1.1.0
=====
Expand Down
52 changes: 18 additions & 34 deletions R/RcppExports.R
Original file line number Diff line number Diff line change
@@ -1,55 +1,39 @@
# Generated by using Rcpp::compileAttributes() -> do not edit by hand
# Generator token: 10BE3573-1514-4C36-9D1C-5A225CD40393

wsCreate <- function(uri, robjPublic, robjPrivate, accessLogChannels, errorLogChannels, maxMessageSize) {
.Call(`_websocket_wsCreate`, uri, robjPublic, robjPrivate, accessLogChannels, errorLogChannels, maxMessageSize)
wsCreate <- function(uri, loop_id, robjPublic, robjPrivate, accessLogChannels, errorLogChannels, maxMessageSize) {
.Call(`_websocket_wsCreate`, uri, loop_id, robjPublic, robjPrivate, accessLogChannels, errorLogChannels, maxMessageSize)
}

wsAppendHeader <- function(client_xptr, key, value) {
invisible(.Call(`_websocket_wsAppendHeader`, client_xptr, key, value))
wsAppendHeader <- function(wsc_xptr, key, value) {
invisible(.Call(`_websocket_wsAppendHeader`, wsc_xptr, key, value))
}

wsAddProtocols <- function(client_xptr, protocols) {
invisible(.Call(`_websocket_wsAddProtocols`, client_xptr, protocols))
wsAddProtocols <- function(wsc_xptr, protocols) {
invisible(.Call(`_websocket_wsAddProtocols`, wsc_xptr, protocols))
}

wsConnect <- function(client_xptr) {
invisible(.Call(`_websocket_wsConnect`, client_xptr))
wsConnect <- function(wsc_xptr) {
invisible(.Call(`_websocket_wsConnect`, wsc_xptr))
}

wsRestart <- function(client_xptr) {
invisible(.Call(`_websocket_wsRestart`, client_xptr))
wsSend <- function(wsc_xptr, msg) {
invisible(.Call(`_websocket_wsSend`, wsc_xptr, msg))
}

wsPoll <- function(client_xptr) {
invisible(.Call(`_websocket_wsPoll`, client_xptr))
wsClose <- function(wsc_xptr, code, reason) {
invisible(.Call(`_websocket_wsClose`, wsc_xptr, code, reason))
}

wsSend <- function(client_xptr, msg) {
invisible(.Call(`_websocket_wsSend`, client_xptr, msg))
wsProtocol <- function(wsc_xptr) {
.Call(`_websocket_wsProtocol`, wsc_xptr)
}

wsReset <- function(client_xptr) {
invisible(.Call(`_websocket_wsReset`, client_xptr))
wsState <- function(wsc_xptr) {
.Call(`_websocket_wsState`, wsc_xptr)
}

wsClose <- function(client_xptr, code, reason) {
invisible(.Call(`_websocket_wsClose`, client_xptr, code, reason))
}

wsStopped <- function(client_xptr) {
.Call(`_websocket_wsStopped`, client_xptr)
}

wsProtocol <- function(client_xptr) {
.Call(`_websocket_wsProtocol`, client_xptr)
}

wsState <- function(client_xptr) {
.Call(`_websocket_wsState`, client_xptr)
}

wsUpdateLogChannels <- function(client_xptr, accessOrError, setOrClear, logChannels) {
invisible(.Call(`_websocket_wsUpdateLogChannels`, client_xptr, accessOrError, setOrClear, logChannels))
wsUpdateLogChannels <- function(wsc_xptr, accessOrError, setOrClear, logChannels) {
invisible(.Call(`_websocket_wsUpdateLogChannels`, wsc_xptr, accessOrError, setOrClear, logChannels))
}

17 changes: 4 additions & 13 deletions R/websocket.R
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#' @useDynLib websocket, .registration = TRUE
#' @import later
#' @importFrom Rcpp sourceCpp
#' @include RcppExports.R
NULL
Expand Down Expand Up @@ -150,7 +151,8 @@ WebSocket <- R6::R6Class("WebSocket",
autoConnect = TRUE,
accessLogChannels = c("none"),
errorLogChannels = NULL,
maxMessageSize = 32 * 1024 * 1024
maxMessageSize = 32 * 1024 * 1024,
loop = later::current_loop()
) {
private$callbacks <- new.env(parent = emptyenv())
private$callbacks$open <- Callbacks$new()
Expand All @@ -163,7 +165,7 @@ WebSocket <- R6::R6Class("WebSocket",
}

private$wsObj <- wsCreate(
url, self, private,
url, loop$id, self, private,
private$accessLogChannels(accessLogChannels, "none"),
private$errorLogChannels(errorLogChannels, "none"),
maxMessageSize
Expand All @@ -183,7 +185,6 @@ WebSocket <- R6::R6Class("WebSocket",
if (private$pendingConnect) {
private$pendingConnect <- FALSE
wsConnect(private$wsObj)
private$scheduleIncoming()
} else {
warning("Ignoring extraneous connect() call (did you mean to have autoConnect=FALSE in the constructor?)")
}
Expand Down Expand Up @@ -244,16 +245,6 @@ WebSocket <- R6::R6Class("WebSocket",
wsObj = NULL,
callbacks = NULL,
pendingConnect = TRUE,
scheduleIncoming = function() {
later::later(private$handleIncoming, 0.01)
},
handleIncoming = function() {
wsPoll(private$wsObj)
if (self$readyState() == 3L) {
return()
}
private$scheduleIncoming()
},
getInvoker = function(eventName) {
callbacks <- private$callbacks[[eventName]]
stopifnot(!is.null(callbacks))
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
[![CRAN status](https://www.r-pkg.org/badges/version/websocket)](https://cran.r-project.org/package=websocket)
[![Travis Build Status](https://travis-ci.org/rstudio/websocket.svg?branch=master)](https://travis-ci.org/rstudio/websocket)

This is an R WebSocket client library backed by the [websocketpp](https://github.com/zaphoyd/websocketpp) C++ library.
This is an R WebSocket client library backed by the [websocketpp](https://github.com/zaphoyd/websocketpp) C++ library. WebSocket I/O is handled on a separate thread from R.

## Usage examples

Expand Down
7 changes: 7 additions & 0 deletions src/Makevars.in
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Use C++11 if available
CXX_STD=CXX11

PKG_CFLAGS = -DSTRICT_R_HEADERS
PKG_CXXFLAGS = -DSTRICT_R_HEADERS
PKG_CPPFLAGS = -I./lib @cflags@
PKG_LIBS += @libs@

#### Debugging flags ####
# Uncomment to enable thread assertions
# PKG_CPPFLAGS += -pthread -DDEBUG_THREAD -UNDEBUG
# PKG_LIBS += -pthread
8 changes: 8 additions & 0 deletions src/Makevars.win
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
OPENSSL_VERSION=1.0.2o

PKG_CFLAGS = -DSTRICT_R_HEADERS
PKG_CXXFLAGS = -DSTRICT_R_HEADERS

# Some settings from https://github.com/zaphoyd/websocketpp/issues/478
PKG_CPPFLAGS=-I./lib -I../windows/openssl-$(OPENSSL_VERSION)/include -D_WEBSOCKETPP_CPP11_THREAD_
PKG_LIBS= -L../windows/openssl-$(OPENSSL_VERSION)/lib${R_ARCH} -lssl -lcrypto -lws2_32 -lgdi32

CXX_STD=CXX11

#### Debugging flags ####
# Uncomment to enable thread assertions
# PKG_CPPFLAGS += -DDEBUG_THREAD -UNDEBUG

.PHONY: all winlibs

all: $(SHLIB)
Expand Down
Loading