c# - Turning async socket Parallel and not only Concurrent in very intensive application using TPL -
i'm writing application uses socket , intensive need use every core have in our big server. see question ( how using threadpool run socket thread parallel? ) here in stackoverflow there 1 answer point msdn sample.
but think point how make concurrent , not parallel, here asking how cpu intensive opening socket , looks intensive, here tell dont tpl taskfactory.fromasync vs tasks blocking methods , teach how here whith taskfactory.fromasync (is there pattern wrapping existing beginxxx/endxxx async methods async tasks?).
how can keep socket operations parallel , performant , if deal whith socket problems disconnections, half connected sockets , message boundaries headache in normal async way. how deal if put tpl , task.
see that:
using system; using system.collections.generic; using system.linq; using system.net; using system.net.sockets; using system.text; using system.threading; using system.threading.tasks; namespace skttool { public class stateobject { public socket worksocket = null; public const int buffersize = 1024; public byte[] buffer = new byte[buffersize]; public int bytesread = 0; public stringbuilder sb = new stringbuilder(); } public class tool { //------------------------------------------------- private manualresetevent evtconnectiondone = new manualresetevent(false); private socket skttool = null; private bool running = false; private stateobject state = null; //------------------------------------------------- toolconfig _cfg; public tool(toolconfig cfg) { _cfg = cfg; } //------------------------------------------------- public void socketlisteningset() { ipendpoint localendpoint; socket skttool; byte[] bytes = new byte[1024]; localendpoint = new ipendpoint(ipaddress.any, _cfg.addressport); skttool = new socket(addressfamily.internetwork, sockettype.stream, protocoltype.tcp); skttool.bind(localendpoint); skttool.listen(_cfg.maxqtdsockets); } //------------------------------------------------- public void start() { running = true; task t1 = task.factory.startnew(socketlisteningset); t1.continuewith(prev => { while (running) { evtconnectiondone.reset(); task<socket> accepetchunk = task<socket>.factory.fromasync( skttool.beginaccept, skttool.endaccept, accept, skttool, taskcreationoptions.attachedtoparent); accepetchunk.continuewith(accept, taskcontinuationoptions.notonfaulted | taskcreationoptions.attachedtoparent); evtconnectiondone.waitone(); } }); } //------------------------------------------------- void accept(task<socket> accepetchunk) { state = new stateobject(); evtconnectiondone.set(); state.worksocket = accepetchunk.result; task<int> readchunk = task<int>.factory.fromasync( state.worksocket.beginreceive, state.worksocket.endreceive, state.buffer, state.bytesread, state.buffer.length - state.bytesread, null, taskcreationoptions.attachedtoparent); readchunk.continuewith(read, taskcontinuationoptions.notonfaulted | taskcreationoptions.attachedtoparent); } //------------------------------------------------- void read(task<int> readchunk) { state.bytesread += readchunk.result; if (readchunk.result > 0 && state.bytesread < state.buffer.length) { read(); return; } _data = dotask(_data); task<int> sendchunk = task<int>.factory.fromasync( state.worksocket.beginsend, state.worksocket.endsend, state.buffer, state.bytesread, state.buffer.length - state.bytesread, null, taskcreationoptions.attachedtoparent); sendchunk.continuewith(send, taskcontinuationoptions.notonfaulted | taskcreationoptions.attachedtoparent); } //------------------------------------------------- void send(task<int> readchunk) { state.worksocket.shutdown(socketshutdown.both); state.worksocket.close(); } //------------------------------------------------- byte[] dotask(byte[] data) { return array.reverse(data); } //------------------------------------------------- } }
Comments
Post a Comment