Browse Source

Added std.socket and std.serversocket.

Mark Sibly 9 years ago
parent
commit
b0f68808fd

+ 132 - 0
bananas/echoserver/echoserver.monkey2

@@ -0,0 +1,132 @@
+
+#Import "<mojox>"
+#Import "<mojo>"
+#Import "<std>"
+
+Using mojox..
+Using mojo..
+Using std..
+
+Class MyWindow Extends Window
+
+	Method New()
+	
+		New Fiber( Server )
+		
+		New Fiber( Client )
+		
+'		New Timer( 60,App.RequestRender )
+	End
+	
+	Method Server()
+	
+		Local server:=SocketServer.Listen( 12345 )
+		If Not server print "Server:Failed to create server" ; Return
+
+		Print "Server:Listening"
+		
+		Repeat
+		
+			Local socket:=server.Accept()
+			If Not socket Exit
+			
+			Print "Server:Accepted"
+			
+			New Fiber( Lambda()
+			
+				Repeat
+				
+					Local line:=ReadLine( socket )
+					If Not line Exit
+					
+					WriteLine( socket,line )
+				Forever
+				
+				socket.Close()
+				
+			End )
+		
+		Forever
+		
+		'Never gets here!
+		'
+		Print "Server:Bye"
+		
+		server.Close()
+	
+	End
+	
+	Method Client()
+	
+		Fiber.Sleep( .1 )	'wait a bit for server to start!
+		
+		Local socket:=Socket.Connect( "localhost",12345 )
+		If Not socket Print "Client:Couldn't connect to server" ; Return
+		
+		Print "Client:Connected"
+
+		For Local i:=0 Until 100
+		
+			WriteLine( socket,"This is a number:"+i )
+			
+			Print "Reply:"+ReadLine( socket )
+		Next
+		
+		Print "Client:Bye"
+		
+		socket.Close()
+	End
+	
+	'Returns empty line at EOF.
+	'
+	Method ReadLine:String( socket:Socket )
+	
+		Local buf:=New DataBuffer( 1024 )
+		
+		Local pos:=0
+		Repeat
+			Assert( pos<1024 )
+			Local n:=socket.Read( buf,pos,1024-pos )
+			If Not n Exit
+			pos+=n
+			If buf.PeekByte( pos-1 )=10 Exit
+		Forever
+		
+		Return String.FromCString( buf.Data,pos )
+	End
+	
+	Method WriteLine( socket:Socket,line:String )
+	
+		Assert( line.Length<1024 )
+	
+		Local buf:=New DataBuffer( 1024 )
+		
+		Local pos:=0
+		For pos=0 Until line.Length
+			If line[pos]=10 Exit
+			buf.PokeByte( pos,line[pos] )
+		Next
+		
+		buf.PokeByte( pos,10 )
+		socket.Write( buf,0,pos+1 )
+	End
+	
+	Method OnRender( canvas:Canvas ) Override
+	
+'		App.RequestRender()
+		
+		Global ticks:=0
+		ticks+=1
+		
+		canvas.DrawText( ticks,0,0 )
+	End
+	
+End
+
+Function Main()
+
+	New AppInstance
+	New MyWindow
+	App.Run()
+
+End

+ 487 - 0
modules/std/socket/native/socket.cpp

@@ -0,0 +1,487 @@
+
+#include "socket.h"
+
+#include "../../async/native/async.h"
+#include "../../fiber/native/fiber.h"
+
+#if _WIN32
+
+#include <Ws2tcpip.h>
+
+typedef int socklen_t;
+
+#else
+
+#include <netdb.h>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/ioctl.h>
+#include <arpa/inet.h>
+#include <netinet/tcp.h>
+
+#define closesocket close
+#define ioctlsocket ioctl
+
+#endif
+
+namespace bbSocket{
+
+	struct Future : public bbAsync::Event{
+		
+		int fiber;
+		int result=-1;
+				
+		Future():fiber( bbFiber::getCurrentFiber() ){}
+				
+		void dispatch(){
+				
+			bbFiber::resumeFiber( fiber );
+		}
+		
+		void set( int result ){
+		
+			this->result=result;
+			
+			post();
+		}
+		
+		int get(){
+		
+			bbFiber::suspendCurrentFiber();
+			
+			return result;
+		}
+	};
+		
+	void init(){
+		static bool done;
+		if( done ) return;
+		done=true;
+#if _WIN32	
+		WSADATA wsa;
+		WSAStartup( MAKEWORD(2,2),&wsa );
+#endif
+	}
+
+	void dontBlock( int sock ){
+		//make non-blocking
+		u_long cmd=1;
+		ioctlsocket( sock,FIONBIO,&cmd );
+	}
+   
+	bool wouldBlock(){
+#if _WIN32
+		return WSAGetLastError()==WSAEWOULDBLOCK;
+#else
+		return errno==EAGAIN || errno==EWOULDBLOCK;
+#endif
+	}
+	
+	int _connect( const char *hostname,const char *service ){
+	
+		init();
+	
+		addrinfo hints;
+		memset( &hints,0,sizeof( hints ) );
+
+		hints.ai_family=AF_UNSPEC;
+		hints.ai_socktype=SOCK_STREAM;
+
+		addrinfo *res=0;
+		if( getaddrinfo( hostname,service,&hints,&res ) ) return -1;
+		
+		addrinfo *pres=res;
+		
+		int sock=-1;
+		
+		while( res ){
+			
+			sock=socket( res->ai_family,res->ai_socktype,res->ai_protocol );
+
+			if( sock>=0 ){
+			
+				if( !connect( sock,res->ai_addr,res->ai_addrlen ) ) break;
+				
+				::closesocket( sock );
+				sock=-1;
+			}
+
+			res=res->ai_next;
+		}
+		
+		freeaddrinfo( pres );
+		
+		if( sock<0 ) return -1;
+		
+		return sock;
+	}
+	
+	int _listen( const char *service,int queue ){
+	
+		init();
+	
+		addrinfo hints;
+		memset( &hints,0,sizeof( hints ) );
+		
+		hints.ai_family=AF_UNSPEC;
+		hints.ai_socktype=SOCK_STREAM;
+		hints.ai_flags=AI_PASSIVE;
+		
+		addrinfo *res=0;
+		if( getaddrinfo( 0,service,&hints,&res ) ) return -1;
+		
+		addrinfo *pres=res;
+		
+		int sock=-1;
+		
+		while( res ){
+		
+			sock=socket( res->ai_family,res->ai_socktype,res->ai_protocol );
+			
+			if( sock>=0 ){
+			
+				if( !bind( sock,res->ai_addr,res->ai_addrlen ) ) break;
+				
+				::closesocket( sock );
+				sock=-1;
+			}
+			
+			res=res->ai_next;
+		}
+		
+		freeaddrinfo( pres );
+		
+		if( sock<0 ) return -1;
+		
+// So server ports can be quickly reused...
+//
+#if __APPLE__ || __linux
+		int flag=1;
+		setsockopt( sock,SOL_SOCKET,SO_REUSEADDR,&flag,sizeof(flag) );
+#endif
+		::listen( sock,queue );
+		
+		return sock;
+	}
+	
+	int connect( bbString hostname,bbString service ){
+	
+		if( hostname.length()>1023 || service.length()>79 ) return -1;
+
+		char _hostname[1024];
+		char _service[80];
+		
+		strcpy( _hostname,hostname.c_str() );
+		strcpy( _service,service.c_str() );
+		
+		int result=-1;
+		
+		if( bbFiber::getCurrentFiber() ){
+
+			Future future;
+			
+			std::thread thread( [=,&future](){
+
+				future.set( _connect( _hostname,_service ) );
+	
+			} );
+			
+			result=future.get();
+			
+			thread.join();
+
+		}else{
+		
+			result=_connect( _hostname,_service );
+		}
+		
+		return result;
+	}
+	
+	int listen( bbString service,int queue ){
+	
+		if( service.length()>79 ) return -1;
+	
+		char _service[80];
+		strcpy( _service,service.c_str() );
+		
+		int result=-1;
+		
+		if( bbFiber::getCurrentFiber() ){
+
+			Future future;
+			
+			std::thread thread( [=,&future](){
+			
+				future.set( _listen( _service,queue ) );
+	
+			} );
+			
+			result=future.get();
+			
+			thread.join();
+			
+		}else{
+		
+			result=_listen( _service,queue);
+		}
+		
+		return result;
+	}
+	
+	int accept( int socket ){
+	
+		sockaddr_storage clientaddr;
+		socklen_t addrlen=sizeof( clientaddr );
+		
+		int newsock=-1;
+		
+		if( bbFiber::getCurrentFiber() ){
+		
+			Future future;
+			
+			std::thread thread( [&,socket](){
+			
+				future.set( ::accept( socket,(sockaddr*)&clientaddr,&addrlen ) );
+			} );
+			
+			newsock=future.get();
+			
+			thread.join();
+		
+		}else{
+			
+			newsock=::accept( socket,(struct sockaddr*)&clientaddr,&addrlen );
+		}
+		
+		return newsock;
+	}
+	
+	void close( int socket ){
+	
+		if( bbFiber::getCurrentFiber() ){
+		
+			Future future;
+			
+			std::thread thread( [=,&future](){
+			
+				future.set( ::closesocket( socket ) );
+				
+			} );
+			
+			future.get();
+			
+			thread.join();
+			
+		}else{
+
+			::closesocket( socket );
+		}
+	}
+	
+	int send( int socket,void *data,int size ){
+	
+		int sent=0;
+		
+		while( size>0 ){
+	
+			int n=-1;
+		
+			if( bbFiber::getCurrentFiber() ){
+			
+				Future future;
+				
+				std::thread thread( [=,&future](){
+				
+					future.set( ::send( socket,(const char*)data,size,0 ) );
+					
+				} );
+				
+				n=future.get();
+				
+				thread.join();
+				
+			}else{
+	
+				n=::send( socket,(const char*)data,size,0 );
+			}
+			
+			if( !n ) return sent;
+			
+			if( n<0 ){
+				printf( "socket_send error!\n" );fflush( stdout );
+				return sent;
+			}
+			
+			data=(char*)data+n;
+			size-=n;
+			sent+=n;
+		}
+		
+		return sent;
+	}
+	
+	int recv( int socket,void *data,int size ){
+	
+		int n=-1;
+	
+		if( bbFiber::getCurrentFiber() ){
+		
+			Future future;
+			
+			std::thread thread( [=,&future](){
+			
+				future.set( ::recv( socket,(char*)data,size,0 ) );
+				
+			} );
+			
+			n=future.get();
+			
+			thread.join();
+			
+		}else{
+
+			n=::recv( socket,(char*)data,size,0 );
+		}
+		
+		if( !n ) return 0;
+		
+		if( n<0 ){
+			printf( "socket_recv error!\n" );fflush( stdout );
+			return 0;
+		}
+		
+		return n;
+	}
+	
+	void setopt( int socket,bbString name,int value ){
+	
+		if( name=="TCP_NODELAY" ){
+			setsockopt( socket,IPPROTO_TCP,TCP_NODELAY,(const char*)&value,sizeof(value) );
+		}
+	}
+	
+	int getopt( int socket,bbString name ){
+
+		int value=-1;
+		socklen_t optlen=sizeof(value);
+		
+		if( name=="TCP_NODELAY" ){
+			getsockopt( socket,IPPROTO_TCP,TCP_NODELAY,(char*)&value,&optlen );
+		}
+		
+		return value;
+	}
+	
+
+	/*	***** EXPERIMENTAL *****
+	
+	int send( int socket,void *data,int size ){
+	
+		if( !size ) return 0;
+	
+		int sent=0;
+		
+		while( size ){
+		
+			int n=::send( socket,(const char*)data,size,0 );
+			
+			if( !n ) return sent;
+			
+			if( n<0 ){
+
+				if( !wouldBlock() ){
+					printf( "socket_send error!\n",fflush( stdout ) );
+					return sent;
+				}
+				
+				Future future;
+	
+				std::thread thread( [=,&future](){
+					
+					fd_set writeset;
+						
+					FD_ZERO( &writeset );
+					FD_SET( socket,&writeset );
+					
+					if( ::select( socket+1,0,&writeset,0,0 )==1 ){
+						
+						future.set( ::send( socket,(const char*)data,size,0 ) );
+						
+					}else{
+					
+						future.set( -1 );
+					}
+					
+				} );
+					
+				n=future.get();
+				
+				thread.join();
+				
+				if( n<0 ){
+					printf( "socket_send error!\n",fflush( stdout ) );
+					return sent;
+				}
+			}
+
+			data=(char*)data+n;
+			size-=n;
+			sent+=n;
+		}
+		
+		return sent;
+	}
+	*/
+
+	/*	***** EXPERIMENTAL *****
+	
+	int recv( int socket,void *data,int size ){
+		
+		if( !size ) return 0;
+		
+		for( ;; ){
+	
+			int n=::recv( socket,(char*)data,size,0 );
+				
+			if( !n ) return 0;
+				
+			if( n<0 ){
+				
+				if( !wouldBlock() ){
+					printf( "socket_recv error!\n" );fflush( stdout );
+					return 0;
+				}
+				
+				Future future;
+						
+				std::thread thread( [=,&future](){
+						
+					fd_set readset;
+							
+					FD_ZERO( &readset );
+					FD_SET( socket,&readset );
+							
+					if( ::select( socket+1,&readset,0,0,0 )==1 ){
+						future.set( ::recv( socket,(char*)data,size,0 ) );
+					}else{
+						future.set( -1 );
+					}
+				} );
+						
+				n=future.get();
+					
+				thread.join();
+					
+				if( n<0 ){
+					printf( "socket_recv error!\n" );fflush( stdout );
+					return 0;
+				}
+			}
+				
+			return n;
+		}
+	}
+	*/
+	
+}

+ 26 - 0
modules/std/socket/native/socket.h

@@ -0,0 +1,26 @@
+
+#ifndef BB_STD_SOCKET_H
+#define BB_STD_SOCKET_H
+
+#include <bbmonkey.h>
+
+namespace bbSocket{
+
+	int connect( bbString hostname,bbString service );
+
+	int listen( bbString service,int queue );
+	
+	int accept( int socket );
+	
+	void close( int socket );
+	
+	int send( int socket,void *data,int size );
+	
+	int recv( int socket,void *data,int size );
+
+	void setopt( int socket,bbString name,int value );
+	
+	int getopt( int socket,bbString name );
+}
+
+#endif

+ 202 - 0
modules/std/socket/socket.monkey2

@@ -0,0 +1,202 @@
+
+Namespace std.socket
+
+#If __TARGET__="windows"
+#Import "<libWs2_32.a>"
+#Endif
+
+#Import "native/socket.cpp"
+#Import "native/socket.h"
+
+Extern
+
+#rem monkeydoc @hidden
+#end
+Function socket_connect:Int( hostname:String,service:String )="bbSocket::connect"
+
+#rem monkeydoc @hidden
+#end
+Function socket_listen:Int( service:String,queue:Int=128 )="bbSocket::listen"
+
+#rem monkeydoc @hidden
+#end
+Function socket_accept:Int( socket:Int )="bbSocket::accept"
+
+#rem monkeydoc @hidden
+#end
+Function socket_close( socket:Int )="bbSocket::close"
+
+#rem monkeydoc @hidden
+#end
+Function socket_send:Int( socket:Int,data:Void Ptr,size:Int )="bbSocket::send"
+
+#rem monkeydoc @hidden
+#end
+Function socket_recv:Int( socket:Int,data:Void Ptr,size:Int )="bbSocket::recv"
+
+#rem monkeydoc @hidden
+#end
+Function socket_setopt( socket:Int,opt:String,value:Int )="bbSocket::setopt"
+
+#rem monkeydoc @hidden
+#end
+Function socket_getopt:Int( socket:Int,opt:String )="bbSocket::getopt"
+
+Public
+
+Class Socket Extends std.stream.Stream
+
+	#rem monkeydoc True if socket has been closed.
+	#end
+	Property Eof:Bool() Override
+		Return _socket<0
+	End
+
+	#rem monkeydoc Always 0.
+	#end
+	Property Position:Int() Override
+	
+		Return 0
+	End
+
+	#rem monkeydoc Always 0.
+	#end
+	Property Length:Int() Override
+	
+		Return 0
+	End
+
+	#rem monkeydoc Closes the socket.
+	#end
+	Method Close:Void() Override
+		If _socket<0 Return
+	
+		socket_close( _socket )
+		_socket=0
+	End
+
+	#rem monkeydoc No operation.
+	#end
+	Method Seek( position:Int ) Override
+	End
+
+	#rem monkeydoc Reads data from the socket.
+	
+	Reads at most `count` bytes from the socket.
+	
+	Returns 0 if the socket has been closed by the peer.
+	
+	Can return less than `count`, in which case you may have to read again if you know there's more data coming.
+
+	#end
+	Method Read:Int( buf:Void Ptr,count:Int ) Override
+		If _socket<0 Return 0
+	
+		Return socket_recv( _socket,buf,count )
+	End
+	
+	#rem monkeydoc Writes data to the socket.
+	
+	Writes `count` bytes to the socket.
+	
+	Can return less than `count` if the socket has been closed by the peer or if an error occured.
+	
+	#end
+	Method Write:Int( buf:Void Ptr,count:Int ) Override
+		If _socket<0 Return 0
+	
+		Return socket_send( _socket,buf,count )
+	End
+
+	#rem monkeydoc Sets a socket option.
+	
+	Currently, only "TCP_NODELAY" is supported, which should be 1 to enable, 0 to disable.
+	
+	#end	
+	Method SetOption( name:String,value:Int )
+		If _socket<0 Return
+	
+		socket_setopt( _socket,name,value )
+	End
+	
+	#rem monkeydoc Gets a socket option.
+	
+	Currently, only "TCP_NODELAY" is supported.
+	
+	#end	
+	Method GetOption:Int( name:String )
+		If _socket<0 Return -1
+	
+		Return socket_getopt( _socket,name )
+	End
+
+	#rem monkeydoc Connects to a host/service.
+	
+	Returns a new socket if successful, else null.
+	
+	`service` can be an integer port number.
+	
+	#end	
+	Function Connect:Socket( hostname:String,service:String )
+	
+		Local socket:=socket_connect( hostname,service )
+		If socket<0 Return Null
+		
+		Return New Socket( socket )
+	End
+	
+	Private
+	
+	Field _socket:Int 
+	
+	Method New( socket:Int )
+		_socket=socket
+	End
+
+End
+
+Class SocketServer
+
+	#rem monkeydoc Closes the server.
+	#end
+	Method Close()
+		If _socket<0 Return
+	
+		socket_close( _socket )
+	End
+
+	#rem monkeydoc Accepts a new connection.
+	#end
+	Method Accept:Socket()
+		If _socket<0 Return Null
+	
+		Local socket:=socket_accept( _socket )	
+		If socket<0 Return Null
+		
+		Return New Socket( socket )
+	End
+
+	#rem monkeydoc Creates a server and starts it listening.
+	
+	Returns a new server if successful, else null.
+	
+	`service` can be an integer port number.
+	
+	#end
+	Function Listen:SocketServer( service:String,queue:Int=128 )
+	
+		Local socket:=socket_listen( service,queue )
+		If socket<0 Return Null
+		
+		Return New SocketServer( socket )
+	End
+	
+	Private
+	
+	Field _socket:Int
+	
+	Method New( socket:Int )
+		_socket=socket
+	End
+
+End

+ 2 - 0
modules/std/std.monkey2

@@ -56,6 +56,8 @@ Namespace std
 #Import "misc/jsonify"
 #Import "misc/jsonify"
 #Import "misc/zipfile"
 #Import "misc/zipfile"
 
 
+#Import "socket/socket"
+
 Private
 Private
 
 
 Function Main()
 Function Main()