113 int max_retries = 30, attempts = 0;
114 double wait_seconds = 0.2, backoff_exp = 1.2;
116 int idx = addr.find(
":");
117 int port = stoi(addr.substr(idx + 1, addr.size() - idx - 1));
119 while (attempts < max_retries) {
120 std::string command =
"lsof -i :" + std::to_string(port);
121 FILE *pipe = popen(command.c_str(),
"r");
123 std::cerr <<
"Error executing command: " << std::strerror(errno)
129 std::string result =
"";
130 while (!feof(pipe)) {
131 if (fgets(buffer, 256, pipe) !=
nullptr)
136 if (result.empty()) {
140 std::cout <<
"Port is unavailable retrying! attempt: " << attempts
142 std::this_thread::sleep_for(std::chrono::duration<double>(wait_seconds));
143 wait_seconds *= backoff_exp;
146 std::cout <<
"Port is unavailable now!" << std::endl;
156 "evaluateUntyped not implemented for gRPCModelRunner; "
157 "Override gRPC method instead");
158 assert(
request !=
nullptr &&
"Request cannot be null");
160 int max_retries = 30, attempts = 0;
161 double retries_wait_secs = 0.2;
162 int deadline_time = 10000;
163 int deadline_max_retries = 30, deadline_attpts = 0;
164 double retry_wait_backoff_exponent = 1.5;
167 auto deadline = std::chrono::system_clock::now() +
168 std::chrono::milliseconds(deadline_time);
170 while (attempts < max_retries && deadline_attpts < deadline_max_retries) {
171 grpc::ClientContext grpcCtx;
174 grpcCtx.set_deadline(deadline);
178 if (status.error_code() == grpc::StatusCode::DEADLINE_EXCEEDED) {
180 int ext_deadline = 2 * deadline_time;
181 deadline_time = ext_deadline;
182 std::cout <<
"Deadline Exceeded for Request! sending the message again "
183 "with extended deadline : "
184 << deadline_time <<
"\n";
185 deadline = std::chrono::system_clock::now() +
186 std::chrono::milliseconds(deadline_time);
187 }
else if (status.error_code() == grpc::StatusCode::UNAVAILABLE) {
189 std::cout <<
"Server is unavailable retrying! attempt: " << attempts
191 std::this_thread::sleep_for(
192 std::chrono::duration<double>(retries_wait_secs));
193 retries_wait_secs *= retry_wait_backoff_exponent;
198 Ctx->emitError(
"gRPC failed: " + status.error_message());
200 std::cerr <<
"gRPC failed: " << status.error_message() << std::endl;
208 std::cout <<
"Server is unavailable now!!!\n";
222 grpc::ServerBuilder builder;
224 builder.AddListeningPort(
server_address, grpc::InsecureServerCredentials());
225 builder.RegisterService(s);
226 std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
227 std::cout <<
"Server Listening on " <<
server_address << std::endl;
228 auto serveFn = [&]() { server->Wait(); };
229 std::thread serving_thread(serveFn);
234 serving_thread.join();
235 std::cout <<
"Server Shutdowns Successfully" << std::endl;