|
@@ -88,7 +88,7 @@ void SctpTransport::Cleanup() {
|
|
|
SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
|
|
|
message_callback recvCallback, amount_callback bufferedAmountCallback,
|
|
|
state_callback stateChangeCallback)
|
|
|
- : Transport(lower, std::move(stateChangeCallback)), mPort(port),
|
|
|
+ : Transport(lower, std::move(stateChangeCallback)), mPort(port), mPendingRecvCount(0),
|
|
|
mSendQueue(0, message_size_func), mBufferedAmountCallback(std::move(bufferedAmountCallback)) {
|
|
|
onRecv(recvCallback);
|
|
|
|
|
@@ -327,6 +327,7 @@ void SctpTransport::incoming(message_ptr message) {
|
|
|
|
|
|
void SctpTransport::doRecv() {
|
|
|
std::lock_guard lock(mRecvMutex);
|
|
|
+ --mPendingRecvCount;
|
|
|
try {
|
|
|
while (true) {
|
|
|
const size_t bufferSize = 65536;
|
|
@@ -532,15 +533,17 @@ bool SctpTransport::safeFlush() {
|
|
|
}
|
|
|
|
|
|
void SctpTransport::handleUpcall() {
|
|
|
- if(!mSock)
|
|
|
+ if (!mSock)
|
|
|
return;
|
|
|
|
|
|
PLOG_VERBOSE << "Handle upcall";
|
|
|
|
|
|
int events = usrsctp_get_events(mSock);
|
|
|
|
|
|
- if (events & SCTP_EVENT_READ)
|
|
|
+ if (events & SCTP_EVENT_READ && mPendingRecvCount == 0) {
|
|
|
+ ++mPendingRecvCount;
|
|
|
mProcessor.enqueue(&SctpTransport::doRecv, this);
|
|
|
+ }
|
|
|
|
|
|
if (events & SCTP_EVENT_WRITE)
|
|
|
mProcessor.enqueue(&SctpTransport::safeFlush, this);
|