A TCP connection. When connecting, the Happy Eyeballs algorithm is used.
The following code creates a client that connects to port 8989 of the local host, writes "hello world", and listens for a response, which it then prints.
```pony
use "net"

class MyTCPConnectionNotify is TCPConnectionNotify
  let _out: OutStream

  new create(out: OutStream) =>
    _out = out

  fun ref connected(conn: TCPConnection ref) =>
    conn.write("hello world")

  fun ref received(
    conn: TCPConnection ref,
    data: Array[U8] iso,
    times: USize)
    : Bool
  =>
    _out.print("GOT:" + String.from_array(consume data))
    conn.close()
    true

  fun ref connect_failed(conn: TCPConnection ref) =>
    None

actor Main
  new create(env: Env) =>
    try
      TCPConnection(env.root as AmbientAuth,
        recover MyTCPConnectionNotify(env.out) end, "", "8989")
    end
```
Note: when writing to the connection, data will be silently discarded if the connection has not yet been established.
The TCP protocol has built-in backpressure support. This is generally experienced as the outgoing write buffer becoming full and being unable to write all requested data to the socket. In `TCPConnection`, this is hidden from the programmer. When this occurs, `TCPConnection` will buffer the extra data until such time as it is able to be sent. Left unchecked, this could result in uncontrolled queuing. To address this, `TCPConnectionNotify` implements two methods, `throttled` and `unthrottled`, that are called when backpressure is applied and released.
Upon receiving a `throttled` notification, your application has two choices on how to handle it. One is to inform the Pony runtime that it can no longer make progress and that runtime backpressure should be applied to any actors sending this one messages. For example, you might construct your application like:
```pony
// Here we have a TCPConnectionNotify that upon construction
// is given a BackpressureAuth token. This allows the notifier
// to inform the Pony runtime when to apply and release backpressure
// as the connection experiences it.
// Note the calls to
//
// Backpressure.apply(_auth)
// Backpressure.release(_auth)
//
// that apply and release backpressure as needed

use "backpressure"
use "collections"
use "net"

class SlowDown is TCPConnectionNotify
  let _auth: BackpressureAuth
  let _out: StdStream

  new iso create(auth: BackpressureAuth, out: StdStream) =>
    _auth = auth
    _out = out

  fun ref throttled(connection: TCPConnection ref) =>
    _out.print("Experiencing backpressure!")
    Backpressure.apply(_auth)

  fun ref unthrottled(connection: TCPConnection ref) =>
    _out.print("Releasing backpressure!")
    Backpressure.release(_auth)

  fun ref closed(connection: TCPConnection ref) =>
    // if backpressure has been applied, make sure we release
    // when shutting down
    _out.print("Releasing backpressure if applied!")
    Backpressure.release(_auth)

  fun ref connect_failed(conn: TCPConnection ref) =>
    None

actor Main
  new create(env: Env) =>
    try
      let auth = env.root as AmbientAuth
      let socket = TCPConnection(auth, recover SlowDown(auth, env.out) end,
        "", "7669")
    end
```
Or if you want, you could handle backpressure by shedding load, that is, dropping the extra data rather than carrying out the send. This might look like:
```pony
use "net"

class ThrowItAway is TCPConnectionNotify
  var _throttled: Bool = false

  fun ref sent(conn: TCPConnection ref, data: ByteSeq): ByteSeq =>
    if not _throttled then
      data
    else
      ""
    end

  fun ref sentv(conn: TCPConnection ref, data: ByteSeqIter): ByteSeqIter =>
    if not _throttled then
      data
    else
      recover Array[String] end
    end

  fun ref throttled(connection: TCPConnection ref) =>
    _throttled = true

  fun ref unthrottled(connection: TCPConnection ref) =>
    _throttled = false

  fun ref connect_failed(conn: TCPConnection ref) =>
    None

actor Main
  new create(env: Env) =>
    try
      TCPConnection(env.root as AmbientAuth,
        recover ThrowItAway end, "", "7669")
    end
```
In general, unless you have a very specific use case, we strongly advise that you don't implement a load shedding scheme where you drop data.
If your application is unable to keep up with data being sent to it over a `TCPConnection`, you can use the built-in read backpressure support to pause reading the socket, which will in turn start to exert backpressure on the corresponding writer on the other end of that socket.
The `mute` behavior allows any other actor in your application to request the cessation of additional reads until such time as `unmute` is called. Please note that this cessation is not guaranteed to happen immediately: it is the result of an asynchronous behavior call and as such will have to wait for existing messages in the `TCPConnection`'s mailbox to be handled.
On non-Windows platforms, your `TCPConnection` will not notice if the other end of the connection closes until you unmute it. Unix-type systems like FreeBSD, Linux and OSX learn about a closed connection upon read. On these platforms, you must call `unmute` on a muted connection to have it close. Without calling `unmute`, the `TCPConnection` actor will never exit.
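As a rough illustration of read backpressure, the sketch below mutes the connection each time a chunk arrives and lets a downstream actor unmute it once the chunk has been handled. `SlowConsumer`, its `process` behavior, and `MuteWhileBusy` are hypothetical names invented for this example; only `mute`, `unmute`, and the notifier callbacks come from the API described here.

```pony
use "net"

// Hypothetical downstream actor that handles data slowly and tells the
// connection when it is ready for more.
actor SlowConsumer
  be process(conn: TCPConnection, data: Array[U8] val) =>
    // ... expensive work on `data` would go here ...
    // Ask the connection to resume reading now that this chunk is handled.
    conn.unmute()

// Hypothetical notifier that pauses reads while the consumer is busy.
class MuteWhileBusy is TCPConnectionNotify
  let _consumer: SlowConsumer

  new iso create(consumer: SlowConsumer) =>
    _consumer = consumer

  fun ref received(
    conn: TCPConnection ref,
    data: Array[U8] iso,
    times: USize)
    : Bool
  =>
    // mute() is a behavior, so the pause takes effect asynchronously.
    conn.mute()
    _consumer.process(conn, consume data)
    true

  fun ref connect_failed(conn: TCPConnection ref) =>
    None

actor Main
  new create(env: Env) =>
    try
      TCPConnection(env.root as AmbientAuth,
        MuteWhileBusy(SlowConsumer), "", "8989")
    end
```

Because every `mute` is paired with a later `unmute`, the connection still gets a chance to observe a remote close on Unix-type platforms.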
actor tag TCPConnection
Connect via IPv4 or IPv6. If `from` is a non-empty string, the connection will be made from the specified interface.
new tag create( auth: (AmbientAuth val | NetAuth val | TCPAuth val | TCPConnectAuth val), notify: TCPConnectionNotify iso, host: String val, service: String val, from: String val = "", read_buffer_size: USize val = 16384, yield_after_reading: USize val = 16384, yield_after_writing: USize val = 16384) : TCPConnection tag^
Connect via IPv4.
new tag ip4( auth: (AmbientAuth val | NetAuth val | TCPAuth val | TCPConnectAuth val), notify: TCPConnectionNotify iso, host: String val, service: String val, from: String val = "", read_buffer_size: USize val = 16384, yield_after_reading: USize val = 16384, yield_after_writing: USize val = 16384) : TCPConnection tag^
Connect via IPv6.
new tag ip6( auth: (AmbientAuth val | NetAuth val | TCPAuth val | TCPConnectAuth val), notify: TCPConnectionNotify iso, host: String val, service: String val, from: String val = "", read_buffer_size: USize val = 16384, yield_after_reading: USize val = 16384, yield_after_writing: USize val = 16384) : TCPConnection tag^
A new connection accepted on a server.
new tag _accept( listen: TCPListener tag, notify: TCPConnectionNotify iso, fd: U32 val, read_buffer_size: USize val = 16384, yield_after_reading: USize val = 16384, yield_after_writing: USize val = 16384) : TCPConnection tag^
Write a single sequence of bytes. Data will be silently discarded if the connection has not yet been established.
be write( data: (String val | Array[U8 val] val))
Write a sequence of sequences of bytes. Data will be silently discarded if the connection has not yet been established.
be writev( data: ByteSeqIter val)
Temporarily suspend reading off this TCPConnection until such time as `unmute` is called.
be mute()
Start reading off this TCPConnection again after having been muted.
be unmute()
Change the notifier.
be set_notify( notify: TCPConnectionNotify iso)
Close the connection gracefully once all writes are sent.
be dispose()
Return the local IP address. If this TCPConnection is closed then the address returned is invalid.
fun box local_address() : NetAddress val
Return the remote IP address. If this TCPConnection is closed then the address returned is invalid.
fun box remote_address() : NetAddress val
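A small sketch of reading the peer address once the connection is established. It assumes that `NetAddress.name()` can be called with its default arguments and returns a `(host, service)` pair of strings, raising an error on failure; `PrintPeer` is a hypothetical notifier written for this example.

```pony
use "net"

// Hypothetical notifier that prints the remote endpoint on connect.
class PrintPeer is TCPConnectionNotify
  let _out: OutStream

  new iso create(out: OutStream) =>
    _out = out

  fun ref connected(conn: TCPConnection ref) =>
    try
      // Assumption: name() yields (host, service) and is partial.
      (let host: String, let service: String) =
        conn.remote_address().name()?
      _out.print("Connected to " + host + ":" + service)
    end

  fun ref connect_failed(conn: TCPConnection ref) =>
    None

actor Main
  new create(env: Env) =>
    try
      TCPConnection(env.root as AmbientAuth, PrintPeer(env.out), "", "8989")
    end
```

The address is queried inside `connected` because, as noted above, the value is invalid once the connection is closed.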
A `received` call on the notifier must contain exactly `qty` bytes. If `qty` is zero, the call can contain any amount of data. This has no effect if called in the `sent` notifier callback.
Errors if `qty` exceeds the max buffer size as indicated by the `read_buffer_size` supplied when the connection was created.
fun ref expect( qty: USize val = 0) : None val ?
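One common use of `expect` is framing. The sketch below assumes a hypothetical wire format in which every message is preceded by a 4-byte big-endian length header; the format and the `LengthPrefixed` notifier are illustrative assumptions, not part of this API.

```pony
use "net"

// Hypothetical notifier for messages framed as: 4-byte big-endian length,
// followed by that many payload bytes.
class LengthPrefixed is TCPConnectionNotify
  let _out: OutStream
  var _reading_header: Bool = true

  new iso create(out: OutStream) =>
    _out = out

  fun ref connected(conn: TCPConnection ref) =>
    // Ask for exactly 4 bytes: the assumed length header.
    try conn.expect(4)? end

  fun ref received(
    conn: TCPConnection ref,
    data: Array[U8] iso,
    times: USize)
    : Bool
  =>
    let bytes: Array[U8] val = consume data
    try
      if _reading_header then
        // Decode the header, then ask for exactly that many payload bytes.
        // Note: a length of 0 would mean "any amount" to expect().
        let len =
          (bytes(0)?.usize() << 24) or (bytes(1)?.usize() << 16) or
          (bytes(2)?.usize() << 8) or bytes(3)?.usize()
        conn.expect(len)?
        _reading_header = false
      else
        _out.print("Payload of " + bytes.size().string() + " bytes")
        conn.expect(4)?
        _reading_header = true
      end
    else
      // expect() raised an error (for example, the requested size exceeded
      // read_buffer_size) or the header was shorter than 4 bytes.
      conn.close()
    end
    true

  fun ref connect_failed(conn: TCPConnection ref) =>
    None
```

A payload larger than the `read_buffer_size` given at construction makes `expect` error, so a real protocol would either cap message sizes or create the connection with a larger buffer.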
Turn Nagle on/off. Defaults to on. This can only be set on a connected socket.
fun ref set_nodelay( state: Bool val) : None val
Sets the TCP keepalive timeout to approximately `secs` seconds. Exact timing is OS dependent. If `secs` is zero, TCP keepalive is disabled. TCP keepalive is disabled by default. This can only be set on a connected socket.
fun ref set_keepalive( secs: U32 val) : None val
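Since both options can only be set on a connected socket, the `connected` callback is a natural place to apply them. The sketch below assumes that `set_nodelay(true)` requests `TCP_NODELAY` (that is, turns Nagle off); `TunedNotify` and the 30-second value are illustrative choices.

```pony
use "net"

// Hypothetical notifier that tunes socket options as soon as the
// connection is established.
class TunedNotify is TCPConnectionNotify
  fun ref connected(conn: TCPConnection ref) =>
    // Assumption: true means "no delay", i.e. Nagle's algorithm is disabled.
    conn.set_nodelay(true)
    // Request keepalive probes after roughly 30 idle seconds (OS dependent).
    conn.set_keepalive(30)

  fun ref connect_failed(conn: TCPConnection ref) =>
    None

actor Main
  new create(env: Env) =>
    try
      TCPConnection(env.root as AmbientAuth,
        recover TunedNotify end, "", "8989")
    end
```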
Write as much as possible to the socket. Set `_writeable` to `false` if not everything was written. On an error, close the connection. This is for data that has already been transformed by the notifier. Data will be silently discarded if the connection has not yet been established.
fun ref write_final( data: (String val | Array[U8 val] val)) : None val
Attempt to perform a graceful shutdown. Don't accept new writes. If the connection isn't muted then we won't finish closing until we get a zero length read. If the connection is muted, perform a hard close and shut down immediately.
fun ref close() : None val
When an error happens, do a non-graceful close.
fun ref hard_close() : None val
General wrapper for TCP sockets to the `getsockopt(2)` system call.
The caller must provide an array that is pre-allocated to be at least as large as the largest data structure that the kernel may return for the requested option.
In case of system call success, this function returns the 2-tuple:
1. The integer `0`.
2. An `Array[U8]` of data returned by the system call's `void *` 4th argument. Its size is specified by the kernel via the system call's `sockopt_len_t *` 5th argument.
In case of system call failure, this function returns the 2-tuple:
1. The value of `errno`.
2. An undefined value that must be ignored.
Usage example:
```pony
// connected() is a callback function for class TCPConnectionNotify
// Reader comes from the "buffered" package (use "buffered")
fun ref connected(conn: TCPConnection ref) =>
  match conn.getsockopt(OSSockOpt.sol_socket(), OSSockOpt.so_rcvbuf(), 4)
  | (0, let gbytes: Array[U8] iso) =>
    try
      let br = Reader.create().>append(consume gbytes)
      ifdef littleendian then
        let buffer_size = br.u32_le()?
      else
        let buffer_size = br.u32_be()?
      end
    end
  | (let errno: U32, _) =>
    // System call failed
  end
```
fun ref getsockopt( level: I32 val, option_name: I32 val, option_max_size: USize val = 4) : (U32 val , Array[U8 val] iso^)
Wrapper for TCP sockets to the `getsockopt(2)` system call where the kernel's returned option value is a C `uint32_t` type / Pony type `U32`.
In case of system call success, this function returns the 2-tuple:
1. The integer `0`.
2. The `*option_value` returned by the kernel converted to a Pony `U32`.
In case of system call failure, this function returns the 2-tuple:
1. The value of `errno`.
2. An undefined value that must be ignored.
fun ref getsockopt_u32( level: I32 val, option_name: I32 val) : (U32 val , U32 val)
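For options whose value fits in a `U32`, this variant avoids the byte-array decoding shown for `getsockopt`. A minimal sketch, in which `ReportRcvBuf` is a hypothetical notifier and `SO_RCVBUF` is just a convenient option to query:

```pony
use "net"

// Hypothetical notifier that reports the receive buffer size on connect.
class ReportRcvBuf is TCPConnectionNotify
  let _out: OutStream

  new iso create(out: OutStream) =>
    _out = out

  fun ref connected(conn: TCPConnection ref) =>
    // First element is 0 on success or errno on failure; the second
    // element is only meaningful on success.
    (let errno: U32, let size: U32) =
      conn.getsockopt_u32(OSSockOpt.sol_socket(), OSSockOpt.so_rcvbuf())
    if errno == 0 then
      _out.print("SO_RCVBUF = " + size.string())
    else
      _out.print("getsockopt failed, errno " + errno.string())
    end

  fun ref connect_failed(conn: TCPConnection ref) =>
    None
```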
General wrapper for TCP sockets to the `setsockopt(2)` system call.
The caller is responsible for the correct size and byte contents of the `option` array for the requested `level` and `option_name`, including using the appropriate machine endian byte order.
This function returns `0` on success, else the value of `errno` on failure.
Usage example:
```pony
// connected() is a callback function for class TCPConnectionNotify
// Writer comes from the "buffered" package (use "buffered")
fun ref connected(conn: TCPConnection ref) =>
  let sb = Writer

  sb.u32_le(7744) // Our desired socket buffer size
  let sbytes = Array[U8]
  for bs in sb.done().values() do
    sbytes.append(bs)
  end
  match conn.setsockopt(OSSockOpt.sol_socket(), OSSockOpt.so_rcvbuf(), sbytes)
  | 0 =>
    // System call was successful
  | let errno: U32 =>
    // System call failed
  end
```
fun ref setsockopt( level: I32 val, option_name: I32 val, option: Array[U8 val] ref) : U32 val
General wrapper for TCP sockets to the `setsockopt(2)` system call where the kernel expects an option value of a C `uint32_t` type / Pony type `U32`.
This function returns `0` on success, else the value of `errno` on failure.
fun ref setsockopt_u32( level: I32 val, option_name: I32 val, option: U32 val) : U32 val
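When the option value is a plain `U32`, this variant removes the need to build a byte array in machine byte order. The sketch below requests the same 7744-byte receive buffer used in the `setsockopt` example; `ShrinkRcvBuf` is a hypothetical notifier, and closing the connection on failure is only one possible policy.

```pony
use "net"

// Hypothetical notifier that sets the receive buffer size on connect.
class ShrinkRcvBuf is TCPConnectionNotify
  fun ref connected(conn: TCPConnection ref) =>
    let errno = conn.setsockopt_u32(
      OSSockOpt.sol_socket(), OSSockOpt.so_rcvbuf(), 7744)
    if errno != 0 then
      // The system call failed; this sketch simply gives up.
      conn.close()
    end

  fun ref connect_failed(conn: TCPConnection ref) =>
    None
```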
Wrapper for the FFI call getsockopt(fd, SOL_SOCKET, SO_ERROR, ...)
fun ref get_so_error() : (U32 val , U32 val)
Wrapper for the FFI call getsockopt(fd, SOL_SOCKET, SO_RCVBUF, ...)
fun ref get_so_rcvbuf() : (U32 val , U32 val)
Wrapper for the FFI call getsockopt(fd, SOL_SOCKET, SO_SNDBUF, ...)
fun ref get_so_sndbuf() : (U32 val , U32 val)
Wrapper for the FFI call getsockopt(fd, SOL_SOCKET, TCP_NODELAY, ...)
fun ref get_tcp_nodelay() : (U32 val , U32 val)
Wrapper for the FFI call setsockopt(fd, SOL_SOCKET, SO_RCVBUF, ...)
fun ref set_so_rcvbuf( bufsize: U32 val) : U32 val
Wrapper for the FFI call setsockopt(fd, SOL_SOCKET, SO_SNDBUF, ...)
fun ref set_so_sndbuf( bufsize: U32 val) : U32 val
Wrapper for the FFI call setsockopt(fd, SOL_SOCKET, TCP_NODELAY, ...)
fun ref set_tcp_nodelay( state: Bool val) : U32 val
Handle socket events.
be _event_notify( event: Pointer[AsioEvent val] tag, flags: U32 val, arg: U32 val)
Resume reading.
be _read_again()
Resume writing.
be _write_again()
The OS has informed us that `len` bytes of pending writes have completed. This occurs only with IOCP on Windows.
fun ref _complete_writes( len: U32 val) : None val
Send pending data. If any data can't be sent, keep it and mark as not writeable. On an error, dispose of the connection. Returns whether it sent all pending data or not.
fun ref _pending_writes() : Bool val
Manage pending buffer for data sent. Returns a boolean of whether the pending buffer is empty or not.
fun ref _manage_pending_buffer( bytes_sent: USize val, bytes_to_send: USize val, num_to_send: USize val) : Bool val ?
The OS has informed us that `len` bytes of pending reads have completed. This occurs only with IOCP on Windows.
fun ref _complete_reads( len: U32 val) : None val
Resize the read buffer if it is empty or smaller than the next payload size.
fun ref _read_buf_size() : None val
Begin an IOCP read on Windows.
fun ref _queue_read() : None val
Unless this connection is currently muted, read while data is available, guessing the next packet length as we go. If we read 5 kb of data, send ourself a resume message and stop reading, to avoid starving other actors. Currently we can handle a varying value of _expect (greater than 0) and constant _expect of 0 but we cannot handle switching between these two cases.
fun ref _pending_reads() : None val
Inform the notifier that we're connecting.
fun ref _notify_connecting() : None val
fun ref _close() : None val
If we have closed and we have no remaining writes or pending connections, then shutdown.
fun ref _try_shutdown() : None val
fun box _is_sock_connected( fd: U32 val) : Bool val
fun ref _apply_backpressure() : None val
fun ref _release_backpressure() : None val
© 2016-2018, The Pony Developers
© 2014-2015, Causality Ltd.
Licensed under the BSD 2-Clause License.
https://stdlib.ponylang.io/net-TCPConnection