1
0
mirror of https://github.com/MGislv/NekoX.git synced 2024-07-02 22:43:36 +00:00

Using pipe instead of signal to interrupt epoll_wait

This commit is contained in:
DrKLO 2015-09-30 01:50:53 +03:00
parent 16af1b0fa7
commit e0c9c4e6f4
4 changed files with 53 additions and 18 deletions

View File

@ -13,7 +13,6 @@
#include <algorithm> #include <algorithm>
#include <fcntl.h> #include <fcntl.h>
#include <memory.h> #include <memory.h>
#include <signal.h>
#include <openssl/rand.h> #include <openssl/rand.h>
#include <zlib.h> #include <zlib.h>
#include "ConnectionsManager.h" #include "ConnectionsManager.h"
@ -38,12 +37,8 @@ jmethodID jclass_ByteBuffer_allocateDirect = 0;
static bool done = false; static bool done = false;
void signal_handler(int param) {
}
ConnectionsManager::ConnectionsManager() { ConnectionsManager::ConnectionsManager() {
if ((epolFd = epoll_create(64)) == -1) { if ((epolFd = epoll_create(128)) == -1) {
DEBUG_E("unable to create epoll instance"); DEBUG_E("unable to create epoll instance");
exit(1); exit(1);
} }
@ -57,15 +52,46 @@ ConnectionsManager::ConnectionsManager() {
} }
} }
if ((epollEvents = new epoll_event[64]) == nullptr) { if ((epollEvents = new epoll_event[128]) == nullptr) {
DEBUG_E("unable to allocate epoll events"); DEBUG_E("unable to allocate epoll events");
exit(1); exit(1);
} }
struct sigaction action; pipeFd = new int[2];
memset(&action, 0, sizeof(action)); if (pipe(pipeFd) != 0) {
action.sa_handler = signal_handler; DEBUG_E("unable to create pipe");
sigaction(SIGRTMIN, &action, NULL); exit(1);
}
flags = fcntl(pipeFd[0], F_GETFL);
if (flags == -1) {
DEBUG_E("fcntl get pipefds[0] failed");
exit(1);
}
if (fcntl(pipeFd[0], F_SETFL, flags | O_NONBLOCK) == -1) {
DEBUG_E("fcntl set pipefds[0] failed");
exit(1);
}
flags = fcntl(pipeFd[1], F_GETFL);
if (flags == -1) {
DEBUG_E("fcntl get pipefds[1] failed");
exit(1);
}
if (fcntl(pipeFd[1], F_SETFL, flags | O_NONBLOCK) == -1) {
DEBUG_E("fcntl set pipefds[1] failed");
exit(1);
}
EventObject *eventObject = new EventObject(pipeFd, EventObjectPipe);
epoll_event eventMask = {};
eventMask.events = EPOLLIN;
eventMask.data.ptr = eventObject;
if (epoll_ctl(epolFd, EPOLL_CTL_ADD, pipeFd[0], &eventMask) != 0) {
DEBUG_E("can't add pipe to epoll");
exit(1);
}
networkBuffer = new NativeByteBuffer((uint32_t) READ_BUFFER_SIZE); networkBuffer = new NativeByteBuffer((uint32_t) READ_BUFFER_SIZE);
if (networkBuffer == nullptr) { if (networkBuffer == nullptr) {
@ -130,7 +156,7 @@ void ConnectionsManager::checkPendingTasks() {
void ConnectionsManager::select() { void ConnectionsManager::select() {
checkPendingTasks(); checkPendingTasks();
int eventsCount = epoll_wait(epolFd, epollEvents, 64, callEvents(getCurrentTimeMillis())); int eventsCount = epoll_wait(epolFd, epollEvents, 128, callEvents(getCurrentTimeMillis()));
checkPendingTasks(); checkPendingTasks();
int64_t now = getCurrentTimeMillis(); int64_t now = getCurrentTimeMillis();
callEvents(now); callEvents(now);
@ -249,9 +275,8 @@ void ConnectionsManager::removeEvent(EventObject *eventObject) {
} }
void ConnectionsManager::wakeup() { void ConnectionsManager::wakeup() {
if (threadStarted) { char ch = 'x';
pthread_kill(networkThread, SIGRTMIN); write(pipeFd[1], &ch, 1);
}
} }
void *ConnectionsManager::ThreadProc(void *data) { void *ConnectionsManager::ThreadProc(void *data) {
@ -267,7 +292,6 @@ void *ConnectionsManager::ThreadProc(void *data) {
networkManager->sendPing(datacenter, true); networkManager->sendPing(datacenter, true);
} }
} }
networkManager->threadStarted = true;
do { do {
networkManager->select(); networkManager->select();
} while (!done); } while (!done);

View File

@ -153,7 +153,7 @@ private:
bool ipv6Enabled = false; bool ipv6Enabled = false;
std::vector<ConnectionSocket *> activeConnections; std::vector<ConnectionSocket *> activeConnections;
int epolFd; int epolFd;
volatile bool threadStarted = false; int *pipeFd;
NativeByteBuffer *networkBuffer; NativeByteBuffer *networkBuffer;
requestsList requestsQueue; requestsList requestsQueue;

View File

@ -56,7 +56,8 @@ enum ConnectionState {
enum EventObjectType { enum EventObjectType {
EventObjectTypeConnection, EventObjectTypeConnection,
EventObjectTypeTimer EventObjectTypeTimer,
EventObjectPipe
}; };
typedef struct ConnectiosManagerDelegate { typedef struct ConnectiosManagerDelegate {

View File

@ -6,6 +6,7 @@
* Copyright Nikolai Kudashov, 2015. * Copyright Nikolai Kudashov, 2015.
*/ */
#include <unistd.h>
#include "EventObject.h" #include "EventObject.h"
#include "Connection.h" #include "Connection.h"
#include "Timer.h" #include "Timer.h"
@ -27,6 +28,15 @@ void EventObject::onEvent(uint32_t events) {
timer->onEvent(); timer->onEvent();
break; break;
} }
case EventObjectPipe: {
int *pipe = (int *) eventObject;
char ch;
ssize_t size = 1;
while (size > 0) {
size = read(pipe[0], &ch, 1);
}
break;
}
default: default:
break; break;
} }