Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions plugins/slice/Config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Config::~Config()
if (nullptr != m_regex) {
delete m_regex;
}
prefetchCleanup();
}

int64_t
Expand Down Expand Up @@ -387,3 +388,36 @@ Config::sizeCacheRemove(std::string_view url)
m_oscache->remove(url);
}
}

std::pair<bool, BgBlockFetch *>
Config::prefetchAcquire(const std::string &key)
{
std::lock_guard<std::mutex> const guard(m_prefetch_mutex);
auto [it, inserted] = m_prefetch_active.insert(key);

if (!inserted) {
return {false, nullptr};
}

BgBlockFetch *bg = nullptr;

if (!m_prefetch_freelist.empty()) {
bg = m_prefetch_freelist.back();
m_prefetch_freelist.pop_back();
}

return {true, bg};
}

#if defined(UNITTEST)
// Stubs for unit tests that don't link prefetch.cc
void
Config::prefetchRelease(BgBlockFetch *)
{
}

void
Config::prefetchCleanup()
{
}
#endif
13 changes: 13 additions & 0 deletions plugins/slice/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@

#include <string>
#include <mutex>
#include <unordered_set>
#include <vector>

struct BgBlockFetch;

// Data Structures and Classes
struct Config {
Expand Down Expand Up @@ -79,6 +83,10 @@ struct Config {
// Did we cache this internally as a small object?
bool isKnownLargeObj(std::string_view url);

// Prefetch dedup and freelist
std::pair<bool, BgBlockFetch *> prefetchAcquire(const std::string &key);
void prefetchRelease(BgBlockFetch *bg);

// Metadata cache stats
std::string stat_prefix{};
int stat_TP{0}, stat_TN{0}, stat_FP{0}, stat_FN{0}, stat_no_cl{0}, stat_bad_cl{0}, stat_no_url{0};
Expand All @@ -89,4 +97,9 @@ struct Config {
std::mutex m_mutex;
std::optional<ObjectSizeCache> m_oscache;
void setCacheSize(size_t entries);

std::mutex m_prefetch_mutex;
std::unordered_set<std::string> m_prefetch_active;
std::vector<BgBlockFetch *> m_prefetch_freelist;
void prefetchCleanup();
};
59 changes: 52 additions & 7 deletions plugins/slice/prefetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,61 @@
#include "prefetch.h"

bool
BgBlockFetch::schedule(Data *const data, int blocknum)
BgBlockFetch::schedule(Data *const data, int blocknum, std::string_view url)
{
bool ret = false;
BgBlockFetch *bg = new BgBlockFetch(blocknum);
std::string key = std::string(url) + ':' + std::to_string(blocknum);
auto [acquired, bg] = data->m_config->prefetchAcquire(key);

if (!acquired) {
DEBUG_LOG("Prefetch already in flight for block %d, skipping", blocknum);
return false;
}

// Nothing on the freelist, so make a new object
if (!bg) {
bg = new BgBlockFetch();
}

bg->m_blocknum = blocknum;
bg->m_config = data->m_config;
bg->m_key = std::move(key);

if (bg->fetch(data)) {
ret = true;
return true;
} else {
bg->m_config->prefetchRelease(bg);
return false;
}
}

void
BgBlockFetch::clear()
{
m_blocknum = 0;
m_cont = nullptr;
m_config = nullptr;
m_key.clear();
}

void
Config::prefetchRelease(BgBlockFetch *bg)
{
std::lock_guard<std::mutex> const guard(m_prefetch_mutex);

m_prefetch_active.erase(bg->m_key);
bg->clear();
m_prefetch_freelist.push_back(bg);
}

void
Config::prefetchCleanup()
{
std::lock_guard<std::mutex> const guard(m_prefetch_mutex);

for (auto *bg : m_prefetch_freelist) {
delete bg;
}
return ret;
m_prefetch_freelist.clear();
}

/**
Expand Down Expand Up @@ -132,15 +177,15 @@ BgBlockFetch::handler(TSCont contp, TSEvent event, void * /* edata ATS_UNUSED */
case TS_EVENT_ERROR:
bg->m_stream.abort();
TSContDataSet(contp, nullptr);
delete bg;
TSContDestroy(contp);
bg->m_config->prefetchRelease(bg);
break;
case TS_EVENT_VCONN_READ_COMPLETE:
case TS_EVENT_VCONN_EOS:
bg->m_stream.close();
TSContDataSet(contp, nullptr);
delete bg;
TSContDestroy(contp);
bg->m_config->prefetchRelease(bg);
break;
default:
DEBUG_LOG("Unhandled bg fetch event:%s (%d)", TSHttpEventNameLookup(event), event);
Expand Down
16 changes: 10 additions & 6 deletions plugins/slice/prefetch.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@

#pragma once

#include <map>
#include <string>
#include <string_view>

#include "ts/ts.h"
#include "Data.h"
Expand All @@ -33,15 +34,18 @@
* @brief Represents a single background fetch.
*/
struct BgBlockFetch {
static bool schedule(Data *const data, int blocknum);
static bool schedule(Data *const data, int blocknum, std::string_view url);

explicit BgBlockFetch(int blocknum) : m_blocknum(blocknum) {}
BgBlockFetch() = default;

bool fetch(Data *const data);
static int handler(TSCont contp, TSEvent event, void * /* edata ATS_UNUSED */);
void clear();

/* This is for the actual background fetch / NetVC */
Stage m_stream;
int m_blocknum;
TSCont m_cont = nullptr;
Stage m_stream;
int m_blocknum{0};
TSCont m_cont{nullptr};
Config *m_config{nullptr};
std::string m_key;
};
2 changes: 2 additions & 0 deletions plugins/slice/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,8 @@ handle_server_resp(TSCont contp, TSEvent event, Data *const data)
data->m_blockskip = data->m_req_range.skipBytesForBlock(data->m_config->m_blockbytes, data->m_blocknum);
} break;
}

schedule_prefetch(data);
}

transfer_content_bytes(data);
Expand Down
20 changes: 20 additions & 0 deletions plugins/slice/unit-tests/test_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,26 @@ TEST_CASE("config bytesfrom invalid parsing", "[AWS][slice][utility]")
}
}

TEST_CASE("prefetchAcquire deduplication", "[AWS][slice][utility]")
{
Config config;

// Acquiring a new key should succeed with no freelist item.
auto [acquired1, bg1] = config.prefetchAcquire("http://example.com/file:0");
CHECK(acquired1 == true);
CHECK(bg1 == nullptr);

// Acquiring the same key again should fail (dedup).
auto [acquired2, bg2] = config.prefetchAcquire("http://example.com/file:0");
CHECK(acquired2 == false);
CHECK(bg2 == nullptr);

// A distinct key should succeed independently.
auto [acquired3, bg3] = config.prefetchAcquire("http://example.com/file:1");
CHECK(acquired3 == true);
CHECK(bg3 == nullptr);
}

TEST_CASE("config fromargs validate sizes", "[AWS][slice][utility]")
{
char const *const appname = "slice.so";
Expand Down
54 changes: 39 additions & 15 deletions plugins/slice/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,41 @@ abort(TSCont const contp, Data *const data)
TSContDestroy(contp);
}

void
schedule_prefetch(Data *const data)
{
if (!data->m_prefetchable || data->m_config->m_prefetchcount <= 0) {
return;
}

int urllen = 0;
char *const urlstr = TSUrlStringGet(data->m_urlbuf, data->m_urlloc, &urllen);

if (urlstr == nullptr || urllen <= 0) {
TSfree(urlstr);
return;
}

std::string_view const url(urlstr, urllen);
int nextblocknum = data->m_blocknum + 1;

if (data->m_blocknum > data->m_req_range.firstBlockFor(data->m_config->m_blockbytes) + 1) {
nextblocknum = data->m_blocknum + data->m_config->m_prefetchcount;
}

for (int i = nextblocknum; i <= data->m_blocknum + data->m_config->m_prefetchcount; i++) {
if (data->m_req_range.blockIsInside(data->m_config->m_blockbytes, i)) {
if (BgBlockFetch::schedule(data, i, url)) {
DEBUG_LOG("Background fetch requested");
} else {
DEBUG_LOG("Background fetch not requested");
}
}
}

TSfree(urlstr);
}

// create and issue a block request
bool
request_block(TSCont contp, Data *const data)
Expand Down Expand Up @@ -151,22 +186,11 @@ request_block(TSCont contp, Data *const data)
DEBUG_LOG("Headers\n%s", headerstr.c_str());
}

// if prefetch config set, schedule next block requests in background
if (data->m_prefetchable && data->m_config->m_prefetchcount > 0) {
int nextblocknum = data->m_blocknum + 1;
if (data->m_blocknum > data->m_req_range.firstBlockFor(data->m_config->m_blockbytes) + 1) {
nextblocknum = data->m_blocknum + data->m_config->m_prefetchcount;
}
for (int i = nextblocknum; i <= data->m_blocknum + data->m_config->m_prefetchcount; i++) {
if (data->m_req_range.blockIsInside(data->m_config->m_blockbytes, i)) {
if (BgBlockFetch::schedule(data, i)) {
DEBUG_LOG("Background fetch requested");
} else {
DEBUG_LOG("Background fetch not requested");
}
}
}
// Extend prefetch sliding window past the initial burst
if (data->m_blocknum > data->m_req_range.firstBlockFor(data->m_config->m_blockbytes) + 1) {
schedule_prefetch(data);
}

// get ready for data back from the server
data->m_upstream.setupVioRead(contp, INT64_MAX);

Expand Down
2 changes: 2 additions & 0 deletions plugins/slice/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,6 @@ void abort(TSCont const contp, Data *const data);

bool request_block(TSCont contp, Data *const data);

void schedule_prefetch(Data *const data);

bool reader_avail_more_than(TSIOBufferReader const reader, int64_t bytes);