18 #define DEBUG_TYPE "orc"
29 #if LLVM_ENABLE_THREADS
31 unique_function<
void()> Work) {
33 std::lock_guard<std::mutex> Lock(DispatchMutex);
39 std::thread([
this, Work =
std::move(Work)]()
mutable {
41 std::lock_guard<std::mutex> Lock(DispatchMutex);
43 OutstandingCV.notify_all();
48 std::unique_lock<std::mutex> Lock(DispatchMutex);
50 OutstandingCV.wait(Lock, [
this]() {
return Outstanding == 0; });
66 dbgs() <<
"SimpleRemoteEPCServer::handleMessage: opc = ";
70 assert(SeqNo == 0 &&
"Non-zero SeqNo for Setup?");
75 assert(SeqNo == 0 &&
"Non-zero SeqNo for Hangup?");
83 dbgs() <<
"CallWrapper";
86 dbgs() <<
", seqno = " << SeqNo
88 <<
", arg-buffer = " <<
formatv(
"{0:x}", ArgBytes.size())
92 using UT = std::underlying_type_t<SimpleRemoteEPCOpcode>;
94 return make_error<StringError>(
"Unexpected opcode",
100 return make_error<StringError>(
"Unexpected Setup opcode",
105 if (
auto Err = handleResult(SeqNo, TagAddr,
std::move(ArgBytes)))
109 handleCallWrapper(SeqNo, TagAddr,
std::move(ArgBytes));
112 return ContinueSession;
116 std::unique_lock<std::mutex> Lock(ServerStateMutex);
117 ShutdownCV.wait(Lock, [
this]() {
return RunState == ServerShutDown; });
125 std::lock_guard<std::mutex> Lock(ServerStateMutex);
126 std::swap(TmpPending, PendingJITDispatchResults);
127 RunState = ServerShuttingDown;
131 for (
auto &KV : TmpPending)
132 KV.second->set_value(
139 while (!Services.empty()) {
145 std::lock_guard<std::mutex> Lock(ServerStateMutex);
147 RunState = ServerShutDown;
148 ShutdownCV.notify_all();
156 dbgs() <<
"SimpleRemoteEPCServer::sendMessage: opc = ";
160 assert(SeqNo == 0 &&
"Non-zero SeqNo for Setup?");
165 assert(SeqNo == 0 &&
"Non-zero SeqNo for Hangup?");
166 assert(TagAddr.
getValue() == 0 &&
"Non-zero TagAddr for Hangup?");
170 assert(TagAddr.
getValue() == 0 &&
"Non-zero TagAddr for Result?");
173 dbgs() <<
"CallWrapper";
176 dbgs() <<
", seqno = " << SeqNo
178 <<
", arg-buffer = " <<
formatv(
"{0:x}", ArgBytes.
size())
181 auto Err =
T->sendMessage(OpC, SeqNo, TagAddr, ArgBytes);
184 dbgs() <<
" \\--> SimpleRemoteEPC::sendMessage failed\n";
189 Error SimpleRemoteEPCServer::sendSetupMessage(
190 StringMap<ExecutorAddr> BootstrapSymbols) {
192 using namespace SimpleRemoteEPCDefaultBootstrapSymbolNames;
194 std::vector<char> SetupPacket;
195 SimpleRemoteEPCExecutorInfo EI;
201 EI.BootstrapSymbols =
std::move(BootstrapSymbols);
204 "Dispatch context name should not be set");
206 "Dispatch function name should not be set");
211 shared::SPSArgList<shared::SPSSimpleRemoteEPCExecutorInfo>;
212 auto SetupPacketBytes =
214 shared::SPSOutputBuffer
OB(SetupPacketBytes.data(), SetupPacketBytes.size());
215 if (!SPSSerialize::serialize(
OB, EI))
216 return make_error<StringError>(
"Could not send setup packet",
220 {SetupPacketBytes.data(), SetupPacketBytes.size()});
223 Error SimpleRemoteEPCServer::handleResult(
224 uint64_t SeqNo, ExecutorAddr TagAddr,
226 std::promise<shared::WrapperFunctionResult> *
P =
nullptr;
228 std::lock_guard<std::mutex> Lock(ServerStateMutex);
229 auto I = PendingJITDispatchResults.find(SeqNo);
230 if (
I == PendingJITDispatchResults.end())
231 return make_error<StringError>(
"No call for sequence number " +
235 PendingJITDispatchResults.erase(
I);
239 memcpy(
R.data(), ArgBytes.data(), ArgBytes.size());
244 void SimpleRemoteEPCServer::handleCallWrapper(
245 uint64_t RemoteSeqNo, ExecutorAddr TagAddr,
247 D->dispatch([
this, RemoteSeqNo, TagAddr, ArgBytes =
std::move(ArgBytes)]() {
249 shared::CWrapperFunctionResult (*)(
const char *, size_t);
250 auto *Fn = TagAddr.toPtr<WrapperFnTy>();
251 shared::WrapperFunctionResult ResultBytes(
252 Fn(ArgBytes.data(), ArgBytes.size()));
255 {ResultBytes.data(), ResultBytes.size()}))
260 shared::WrapperFunctionResult
261 SimpleRemoteEPCServer::doJITDispatch(
const void *FnTag,
const char *ArgData,
264 std::promise<shared::WrapperFunctionResult> ResultP;
265 auto ResultF = ResultP.get_future();
267 std::lock_guard<std::mutex> Lock(ServerStateMutex);
268 if (RunState != ServerRunning)
270 "jit_dispatch not available (EPC server shut down)");
272 SeqNo = getNextSeqNo();
273 assert(!PendingJITDispatchResults.count(SeqNo) &&
"SeqNo already in use");
274 PendingJITDispatchResults[SeqNo] = &ResultP;
281 return ResultF.get();
284 shared::CWrapperFunctionResult
285 SimpleRemoteEPCServer::jitDispatchEntry(
void *DispatchCtx,
const void *FnTag,
286 const char *ArgData,
size_t ArgSize) {
287 return reinterpret_cast<SimpleRemoteEPCServer *
>(DispatchCtx)
288 ->doJITDispatch(FnTag, ArgData, ArgSize)