#include "multiverso/table/array_table.h" #include "multiverso/io/io.h" #include "multiverso/multiverso.h" #include "multiverso/util/log.h" #include "multiverso/updater/updater.h" namespace multiverso { template ArrayWorker::ArrayWorker(size_t size) : WorkerTable(), size_(size) { num_server_ = MV_NumServers(); server_offsets_.push_back(0); CHECK(size_ > MV_NumServers()); integer_t length = static_cast(size_) / MV_NumServers(); for (auto i = 1; i < MV_NumServers(); ++i) { server_offsets_.push_back(i * length); // may not balance } server_offsets_.push_back(size_); Log::Debug("worker %d create arrayTable with %d elements.\n", MV_Rank(), size); } template ArrayWorker::ArrayWorker(const ArrayTableOption &option) : ArrayWorker(option.size) { } template void ArrayWorker::Get(T* data, size_t size) { CHECK(size == size_); data_ = data; integer_t all_key = -1; Blob whole_table(&all_key, sizeof(integer_t)); WorkerTable::Get(whole_table); Log::Debug("worker %d getting all parameters.\n", MV_Rank()); } template int ArrayWorker::GetAsync(T* data, size_t size) { CHECK(size == size_); data_ = data; integer_t all_key = -1; Blob whole_table(&all_key, sizeof(integer_t)); return WorkerTable::GetAsync(whole_table); } template void ArrayWorker::Add(T* data, size_t size, const AddOption* option) { CHECK(size == size_); integer_t all_key = -1; Blob key(&all_key, sizeof(integer_t)); Blob val(data, sizeof(T) * size); WorkerTable::Add(key, val, option); Log::Debug("worker %d adding parameters with size of %d.\n", MV_Rank(), size); } template int ArrayWorker::AddAsync(T* data, size_t size, const AddOption* option) { CHECK(size == size_); integer_t all_key = -1; Blob key(&all_key, sizeof(integer_t)); Blob val(data, sizeof(T) * size); return WorkerTable::AddAsync(key, val, option); } template int ArrayWorker::Partition(const std::vector& kv, MsgType, std::unordered_map >* out) { CHECK(kv.size() == 1 || kv.size() == 2 || kv.size() == 3); for (int i = 0; i < num_server_; ++i) (*out)[i].push_back(kv[0]); if (kv.size() >= 2) { CHECK(kv[1].size() == size_ * sizeof(T)); for (int i = 0; i < num_server_; ++i) { Blob blob(kv[1].data() + server_offsets_[i] * sizeof(T), (server_offsets_[i + 1] - server_offsets_[i]) * sizeof(T)); (*out)[i].push_back(blob); if (kv.size() == 3) {// update option blob (*out)[i].push_back(kv[2]); } } } return num_server_; } template void ArrayWorker::ProcessReplyGet(std::vector& reply_data) { CHECK(reply_data.size() == 2); int id = (reply_data[0]).As(); CHECK(reply_data[1].size() == (server_offsets_[id + 1] - server_offsets_[id])); memcpy(data_ + server_offsets_[id], reply_data[1].data(), reply_data[1].size()); } template ArrayServer::ArrayServer(size_t size) : ServerTable() { server_id_ = MV_Rank(); size_ = size / MV_NumServers(); if (server_id_ == MV_NumServers() - 1) { // last server size_ += size % MV_NumServers(); } storage_.resize(size_); updater_ = Updater::GetUpdater(size_); Log::Debug("server %d create arrayTable with %d elements of %d elements.\n", server_id_, size_, size); } template ArrayServer::ArrayServer(const ArrayTableOption &option) : ArrayServer(option.size) { } template void ArrayServer::ProcessAdd(const std::vector& data) { Blob keys = data[0], values = data[1]; AddOption* option = nullptr; if (data.size() == 3) option = new AddOption(data[2].data(), data[2].size()); // Always request whole table CHECK(keys.size() == 1 && keys.As() == -1); CHECK(values.size() == size_ * sizeof(T)); T* pvalues = reinterpret_cast(values.data()); updater_->Update(size_, storage_.data(), pvalues, option); delete option; } template void ArrayServer::ProcessGet(const std::vector& data, std::vector* result) { size_t key_size = data[0].size(); CHECK(key_size == 1 && data[0].As() == -1); // Always request the whole table Blob key(sizeof(integer_t)); key.As() = server_id_; Blob values(sizeof(T) * size_); T* pvalues = reinterpret_cast(values.data()); updater_->Access(size_, storage_.data(), pvalues); result->push_back(key); result->push_back(values); } template void ArrayServer::Store(Stream* s) { s->Write(storage_.data(), storage_.size() * sizeof(T)); } template void ArrayServer::Load(Stream* s) { s->Read(storage_.data(), storage_.size() * sizeof(T)); } MV_INSTANTIATE_CLASS_WITH_BASE_TYPE(ArrayWorker); MV_INSTANTIATE_CLASS_WITH_BASE_TYPE(ArrayServer); }