RDMA++
Loading...
Searching...
No Matches
helloworld.cc

This is an example of how to create a Queue Pair connected with a remote peer and perform send/recv/read/write/atomic operations on the QP.

#include "acceptor.h"
#include "connector.h"
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <exception>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <rdmapp/rdmapp.h>
using namespace std::literals::chrono_literals;
rdmapp::task<void> handle_qp(std::shared_ptr<rdmapp::qp> qp) {
/* Send/Recv */
char buffer[6] = "hello";
co_await qp->send(buffer, sizeof(buffer));
std::cout << "Sent to client: " << buffer << std::endl;
co_await qp->recv(buffer, sizeof(buffer));
std::cout << "Received from client: " << buffer << std::endl;
/* Read/Write */
std::copy_n("hello", sizeof(buffer), buffer);
auto local_mr = std::make_shared<rdmapp::local_mr>(
qp->pd_ptr()->reg_mr(&buffer[0], sizeof(buffer)));
auto local_mr_serialized = local_mr->serialize();
co_await qp->send(local_mr_serialized.data(), local_mr_serialized.size());
std::cout << "Sent mr addr=" << local_mr->addr()
<< " length=" << local_mr->length() << " rkey=" << local_mr->rkey()
<< " to client" << std::endl;
auto [_, imm] = co_await qp->recv(local_mr);
assert(imm.has_value());
std::cout << "Written by client (imm=" << imm.value() << "): " << buffer
<< std::endl;
/* Atomic */
uint64_t counter = 42;
auto counter_mr = std::make_shared<rdmapp::local_mr>(
qp->pd_ptr()->reg_mr(&counter, sizeof(counter)));
auto counter_mr_serialized = counter_mr->serialize();
co_await qp->send(counter_mr_serialized.data(), counter_mr_serialized.size());
std::cout << "Sent mr addr=" << counter_mr->addr()
<< " length=" << counter_mr->length()
<< " rkey=" << counter_mr->rkey() << " to client" << std::endl;
imm = (co_await qp->recv(local_mr)).second;
assert(imm.has_value());
std::cout << "Fetched and added by client: " << counter << std::endl;
imm = (co_await qp->recv(local_mr)).second;
assert(imm.has_value());
std::cout << "Compared and swapped by client: " << counter << std::endl;
co_return;
}
rdmapp::task<void> server(rdmapp::acceptor &acceptor) {
while (true) {
auto qp = co_await acceptor.accept();
handle_qp(qp).detach();
}
co_return;
}
rdmapp::task<void> client(rdmapp::connector &connector) {
auto qp = co_await connector.connect();
char buffer[6];
/* Send/Recv */
auto [n, _] = co_await qp->recv(buffer, sizeof(buffer));
std::cout << "Received " << n << " bytes from server: " << buffer
<< std::endl;
std::copy_n("world", sizeof(buffer), buffer);
co_await qp->send(buffer, sizeof(buffer));
std::cout << "Sent to server: " << buffer << std::endl;
/* Read/Write */
char remote_mr_serialized[rdmapp::remote_mr::kSerializedSize];
co_await qp->recv(remote_mr_serialized, sizeof(remote_mr_serialized));
auto remote_mr = rdmapp::remote_mr::deserialize(remote_mr_serialized);
std::cout << "Received mr addr=" << remote_mr.addr()
<< " length=" << remote_mr.length() << " rkey=" << remote_mr.rkey()
<< " from server" << std::endl;
n = co_await qp->read(remote_mr, buffer, sizeof(buffer));
std::cout << "Read " << n << " bytes from server: " << buffer << std::endl;
std::copy_n("world", sizeof(buffer), buffer);
co_await qp->write_with_imm(remote_mr, buffer, sizeof(buffer), 1);
/* Atomic Fetch-and-Add (FA)/Compare-and-Swap (CS) */
char counter_mr_serialized[rdmapp::remote_mr::kSerializedSize];
co_await qp->recv(counter_mr_serialized, sizeof(counter_mr_serialized));
auto counter_mr = rdmapp::remote_mr::deserialize(counter_mr_serialized);
std::cout << "Received mr addr=" << counter_mr.addr()
<< " length=" << counter_mr.length()
<< " rkey=" << counter_mr.rkey() << " from server" << std::endl;
uint64_t counter = 0;
co_await qp->fetch_and_add(counter_mr, &counter, sizeof(counter), 1);
std::cout << "Fetched and added from server: " << counter << std::endl;
co_await qp->write_with_imm(remote_mr, buffer, sizeof(buffer), 1);
co_await qp->compare_and_swap(counter_mr, &counter, sizeof(counter), 43,
4422);
std::cout << "Compared and swapped from server: " << counter << std::endl;
co_await qp->write_with_imm(remote_mr, buffer, sizeof(buffer), 1);
co_return;
}
int main(int argc, char *argv[]) {
auto device = std::make_shared<rdmapp::device>(0, 1);
auto pd = std::make_shared<rdmapp::pd>(device);
auto cq = std::make_shared<rdmapp::cq>(device);
auto cq_poller = std::make_shared<rdmapp::cq_poller>(cq);
auto loop = rdmapp::socket::event_loop::new_loop();
auto looper = std::thread([loop]() { loop->loop(); });
if (argc == 2) {
rdmapp::acceptor acceptor(loop, std::stoi(argv[1]), pd, cq);
server(acceptor);
} else if (argc == 3) {
rdmapp::connector connector(loop, argv[1], std::stoi(argv[2]), pd, cq);
client(connector);
} else {
std::cout << "Usage: " << argv[0] << " [port] for server and " << argv[0]
<< " [server_ip] [port] for client" << std::endl;
}
loop->close();
looper.join();
return 0;
}
void * addr() const
Get the address of the memory region.
Definition mr.cc:50
std::vector< uint8_t > serialize() const
Serialize the memory region handle to be sent to a remote peer.
Definition mr.cc:41
uint32_t rkey() const
Get the remote key of the memory region.
Definition mr.cc:54
size_t length() const
Get the length of the memory region.
Definition mr.cc:52
uint32_t rkey()
Get the remote key of the memory region.
Definition mr.cc:65
void * addr()
Get the address of the remote memory region.
Definition mr.cc:61
static constexpr size_t kSerializedSize
The size of a serialized remote memory region.
Definition mr.h:118
uint32_t length()
Get the length of the remote memory region.
Definition mr.cc:63
static mr< tags::mr::remote > deserialize(It it)
Deserialize a remote memory region handle.
Definition mr.h:167
Definition task.h:50