c# - Turning async socket Parallel and not only Concurrent in very intensive application using TPL -


i'm writing application uses socket , intensive need use every core have in our big server. see question ( how using threadpool run socket thread parallel? ) here in stackoverflow there 1 answer point msdn sample.

but think point how make concurrent , not parallel, here asking how cpu intensive opening socket , looks intensive, here tell dont tpl taskfactory.fromasync vs tasks blocking methods , teach how here whith taskfactory.fromasync (is there pattern wrapping existing beginxxx/endxxx async methods async tasks?).

how can keep socket operations parallel , performant , if deal whith socket problems disconnections, half connected sockets , message boundaries headache in normal async way. how deal if put tpl , task.

see that:

using system; using system.collections.generic; using system.linq; using system.net; using system.net.sockets; using system.text; using system.threading; using system.threading.tasks;  namespace skttool {     public class stateobject     {         public socket worksocket = null;         public const int buffersize = 1024;         public byte[] buffer = new byte[buffersize];         public int bytesread = 0;         public stringbuilder sb = new stringbuilder();     }      public class tool     {         //-------------------------------------------------         private manualresetevent evtconnectiondone = new manualresetevent(false);         private socket skttool = null;         private bool running = false;         private stateobject state = null;         //-------------------------------------------------         toolconfig _cfg;         public tool(toolconfig cfg)         {             _cfg = cfg;         }         //-------------------------------------------------         public void socketlisteningset()         {             ipendpoint localendpoint;             socket skttool;             byte[] bytes = new byte[1024];             localendpoint = new ipendpoint(ipaddress.any, _cfg.addressport);             skttool = new socket(addressfamily.internetwork, sockettype.stream, protocoltype.tcp);             skttool.bind(localendpoint);             skttool.listen(_cfg.maxqtdsockets);         }         //-------------------------------------------------         public void start()         {             running = true;             task t1 = task.factory.startnew(socketlisteningset);             t1.continuewith(prev =>             {                 while (running)                 {                     evtconnectiondone.reset();                     task<socket> accepetchunk = task<socket>.factory.fromasync(                                                                        skttool.beginaccept,                                                                        skttool.endaccept,                                                                        accept,                                                                        skttool,                                                                        taskcreationoptions.attachedtoparent);                     accepetchunk.continuewith(accept, taskcontinuationoptions.notonfaulted | taskcreationoptions.attachedtoparent);                     evtconnectiondone.waitone();                 }             });         }         //-------------------------------------------------         void accept(task<socket> accepetchunk)         {             state = new stateobject();             evtconnectiondone.set();             state.worksocket = accepetchunk.result;             task<int> readchunk = task<int>.factory.fromasync(                                                        state.worksocket.beginreceive,                                                        state.worksocket.endreceive,                                                        state.buffer,                                                        state.bytesread,                                                        state.buffer.length - state.bytesread,                                                        null,                                                        taskcreationoptions.attachedtoparent);             readchunk.continuewith(read, taskcontinuationoptions.notonfaulted | taskcreationoptions.attachedtoparent);         }         //-------------------------------------------------         void read(task<int> readchunk)         {             state.bytesread += readchunk.result;             if (readchunk.result > 0 && state.bytesread < state.buffer.length)             {                 read();                 return;             }             _data = dotask(_data);             task<int> sendchunk = task<int>.factory.fromasync(                                                        state.worksocket.beginsend,                                                        state.worksocket.endsend,                                                        state.buffer,                                                        state.bytesread,                                                        state.buffer.length - state.bytesread,                                                        null,                                                        taskcreationoptions.attachedtoparent);             sendchunk.continuewith(send, taskcontinuationoptions.notonfaulted | taskcreationoptions.attachedtoparent);         }         //-------------------------------------------------         void send(task<int> readchunk)         {             state.worksocket.shutdown(socketshutdown.both);             state.worksocket.close();         }         //-------------------------------------------------         byte[] dotask(byte[] data)         {             return array.reverse(data);         }         //-------------------------------------------------     } } 

Comments