This is an example of how to use the endpoint class.
#include "acceptor.h"
#include "connector.h"
#include "socket/channel.h"
#include "worker_epoll.h"
#include <algorithm>
#include <endian.h>
#include <iomanip>
#include <iostream>
#include <memory>
#include "ucxpp/context.h"
#include "ucxpp/endpoint.h"
#include "ucxpp/memory.h"
#include <ucxpp/ucxpp.h>
constexpr ucp_tag_t kTestTag = 0xFD709394UL;
constexpr ucp_tag_t kBellTag = 0xbe11be11UL;
receive_mr(std::shared_ptr<ucxpp::endpoint> ep) {
using std::cout;
using std::endl;
uint64_t remote_addr;
co_await ep->stream_recv(&remote_addr, sizeof(remote_addr));
remote_addr = ::be64toh(remote_addr);
size_t rkey_length;
co_await ep->stream_recv(&rkey_length, sizeof(rkey_length));
rkey_length = ::be64toh(rkey_length);
std::vector<char> rkey_buffer(rkey_length);
size_t rkey_recved = 0;
while (rkey_recved < rkey_length) {
auto n = co_await ep->stream_recv(&rkey_buffer[rkey_recved],
rkey_length - rkey_recved);
rkey_recved += n;
}
co_return std::make_pair(remote_addr,
}
auto ep = co_await connector.connect();
ep->print();
char buffer[6];
auto [n, sender_tag] =
co_await ep->worker_ptr()->tag_recv(buffer, sizeof(buffer), kTestTag);
std::cout << "Received " << n << " bytes from " << std::hex << sender_tag
<< std::dec << ": " << buffer << std::endl;
std::copy_n("world", 6, buffer);
co_await ep->tag_send(buffer, sizeof(buffer), kTestTag);
n = co_await ep->stream_recv(buffer, sizeof(buffer));
std::cout << "Received " << n << " bytes: " << buffer << std::endl;
std::copy_n("world", 6, buffer);
co_await ep->stream_send(buffer, sizeof(buffer));
auto local_mr = ucxpp::local_memory_handle::register_mem(
ep->worker_ptr()->context_ptr(), buffer, sizeof(buffer));
auto [remote_addr, remote_mr] = co_await receive_mr(ep);
std::cout << "Remote addr: 0x" << std::hex << remote_addr << std::dec
<< std::endl;
co_await remote_mr.get(buffer, sizeof(buffer), remote_addr);
std::cout << "Read from server: " << buffer << std::endl;
std::copy_n("world", 6, buffer);
co_await remote_mr.put(buffer, sizeof(buffer), remote_addr);
std::cout << "Wrote to server: " << buffer << std::endl;
size_t bell;
co_await ep->tag_send(&bell, sizeof(bell), kBellTag);
uint64_t local_value = 1;
uint64_t reply_value = 0;
auto [atomic_raddr, atomic_mr] = co_await receive_mr(ep);
co_await atomic_mr.atomic_fetch_add(atomic_raddr, local_value, reply_value);
std::cout << "Fetched and added on server: " << reply_value << std::endl;
co_await ep->tag_send(&bell, sizeof(bell), kBellTag);
co_await ep->worker_ptr()->tag_recv(&bell, sizeof(bell), kBellTag);
local_value = reply_value + local_value;
reply_value = 456;
co_await atomic_mr.atomic_compare_swap(atomic_raddr, local_value,
reply_value);
std::cout << "Compared and swapped on server: " << reply_value << std::endl;
co_await ep->tag_send(&bell, sizeof(bell), kBellTag);
co_await ep->worker_ptr()->tag_recv(&bell, sizeof(bell), kBellTag);
local_value = 123;
co_await atomic_mr.atomic_swap(atomic_raddr, local_value, reply_value);
std::cout << "Swapped on server: " << reply_value << std::endl;
co_await ep->tag_send(&bell, sizeof(bell), kBellTag);
co_await ep->worker_ptr()->tag_recv(&bell, sizeof(bell), kBellTag);
local_value = 0xF;
co_await atomic_mr.atomic_fetch_and(atomic_raddr, local_value, reply_value);
std::cout << "Fetched and anded on server: " << reply_value << std::endl;
co_await ep->tag_send(&bell, sizeof(bell), kBellTag);
co_await ep->worker_ptr()->tag_recv(&bell, sizeof(bell), kBellTag);
local_value = 0xF;
co_await atomic_mr.atomic_fetch_or(atomic_raddr, local_value, reply_value);
std::cout << "Fetched and ored on server: " << reply_value << std::endl;
co_await ep->tag_send(&bell, sizeof(bell), kBellTag);
co_await ep->worker_ptr()->tag_recv(&bell, sizeof(bell), kBellTag);
local_value = 0xF;
co_await atomic_mr.atomic_fetch_xor(atomic_raddr, local_value, reply_value);
std::cout << "Fetched and xored on server: " << reply_value << std::endl;
co_await ep->tag_send(&bell, sizeof(bell), kBellTag);
co_await ep->worker_ptr()->tag_recv(&bell, sizeof(bell), kBellTag);
co_await ep->flush();
co_await ep->close();
co_return;
}
auto rkey_length = ::htobe64(packed_rkey.get_length());
auto remote_addr = ::htobe64(reinterpret_cast<uint64_t>(address));
co_await ep->stream_send(&remote_addr, sizeof(remote_addr));
co_await ep->stream_send(&rkey_length, sizeof(rkey_length));
co_await ep->stream_send(packed_rkey.get_buffer(), packed_rkey.get_length());
co_return;
}
ep->print();
char buffer[6] = "Hello";
co_await ep->tag_send(buffer, sizeof(buffer), kTestTag);
auto [n, sender_tag] =
co_await ep->worker_ptr()->tag_recv(buffer, sizeof(buffer), kTestTag);
std::cout << "Received " << n << " bytes from " << std::hex << sender_tag
<< std::dec << ": " << buffer << std::endl;
std::copy_n("Hello", 6, buffer);
co_await ep->stream_send(buffer, sizeof(buffer));
n = co_await ep->stream_recv(buffer, sizeof(buffer));
std::cout << "Received " << n << " bytes: " << buffer << std::endl;
std::copy_n("Hello", 6, buffer);
auto local_mr = ucxpp::local_memory_handle::register_mem(
ep->worker_ptr()->context_ptr(), buffer, sizeof(buffer));
co_await send_mr(ep, buffer, local_mr);
size_t bell;
co_await ep->worker_ptr()->tag_recv(&bell, sizeof(bell), kBellTag);
std::cout << "Written by client: " << buffer << std::endl;
uint64_t value = 42;
auto atomic_mr = ucxpp::local_memory_handle::register_mem(
ep->worker_ptr()->context_ptr(), &value, sizeof(value));
co_await send_mr(ep, &value, atomic_mr);
co_await ep->worker_ptr()->tag_recv(&bell, sizeof(bell), kBellTag);
std::cout << "Fetched and added by client: " << value << std::endl;
co_await ep->tag_send(&bell, sizeof(bell), kBellTag);
co_await ep->worker_ptr()->tag_recv(&bell, sizeof(bell), kBellTag);
std::cout << "Compared and Swapped by client: " << value << std::endl;
co_await ep->tag_send(&bell, sizeof(bell), kBellTag);
co_await ep->worker_ptr()->tag_recv(&bell, sizeof(bell), kBellTag);
std::cout << "Swapped by client: " << value << std::endl;
co_await ep->tag_send(&bell, sizeof(bell), kBellTag);
co_await ep->worker_ptr()->tag_recv(&bell, sizeof(bell), kBellTag);
std::cout << "Fetched and Anded by client: " << value << std::endl;
co_await ep->tag_send(&bell, sizeof(bell), kBellTag);
co_await ep->worker_ptr()->tag_recv(&bell, sizeof(bell), kBellTag);
std::cout << "Fetched and Ored by client: " << value << std::endl;
co_await ep->tag_send(&bell, sizeof(bell), kBellTag);
co_await ep->worker_ptr()->tag_recv(&bell, sizeof(bell), kBellTag);
std::cout << "Fetched and Xored by client: " << value << std::endl;
co_await ep->tag_send(&bell, sizeof(bell), kBellTag);
co_await ep->flush();
co_await ep->close();
co_return;
}
while (true) {
auto ep = co_await acceptor.accept();
handle_endpoint(ep).
detach();
}
co_return;
}
int main(int argc, char *argv[]) {
auto loop = ucxpp::socket::event_loop::new_loop();
auto worker = std::make_shared<ucxpp::worker>(ctx);
ucxpp::register_loop(worker, loop);
bool close_triggered;
if (argc == 2) {
auto listener = std::make_shared<ucxpp::socket::tcp_listener>(
loop, "0.0.0.0", std::stoi(argv[1]));
auto acceptor = ucxpp::acceptor(worker, listener);
server(std::move(acceptor)).detach();
} else if (argc == 3) {
auto connector =
ucxpp::connector(worker, loop, argv[1], std::stoi(argv[2]));
client(std::move(connector)).detach();
} else {
std::cout << "Usage: " << argv[0] << " <host> <port>" << std::endl;
}
while (worker.use_count() > 1) {
loop->poll(close_triggered);
}
loop->close();
loop->poll(close_triggered);
return 0;
}
Context builder.
Definition: context.h:34
builder & enable_amo64()
Enable atomic memory operations with 64-bit operands.
Definition: context.cc:56
std::shared_ptr< context > build()
Build and return a context object.
Definition: context.cc:17
builder & enable_wakeup()
Enable the wakeup feature.
Definition: context.cc:26
builder & enable_rma()
Enable remote memory access feature.
Definition: context.cc:46
builder & enable_stream()
Enable stream-related operations.
Definition: context.cc:36
builder & enable_tag()
Enable tag-related operations.
Definition: context.cc:31
Represents a registered local memory region.
Definition: memory.h:37
packed_memory_rkey pack_rkey() const
Pack the information needed for remote access to the memory region. It is intended to be sent to the ...
Definition: memory.cc:55
Represents a remote memory region. Note that this does not contain the remote address....
Definition: memory.h:98