diff --git a/tensorflow/core/common_runtime/direct_session.cc b/tensorflow/core/common_runtime/direct_session.cc index 094f84cd781..2b365769adf 100644 --- a/tensorflow/core/common_runtime/direct_session.cc +++ b/tensorflow/core/common_runtime/direct_session.cc @@ -489,6 +489,10 @@ class DirectSessionFactory : public SessionFactory { ResourceMgr* gpu_shared_rmgr = nullptr; #if GOOGLE_CUDA + bool use_per_session_host_allocator = false; + TF_CHECK_OK(tensorflow::ReadBoolFromEnvVar("PER_SESSION_HOSTALLOC", + /*default_val=*/false, + &use_per_session_host_allocator)); if (use_multi_stream) { // Create shared resource for gpu devices gpu_shared_rmgr = new ResourceMgr("localhost"); @@ -496,7 +500,7 @@ class DirectSessionFactory : public SessionFactory { for (int i = 0; i < session_num; ++i) { dev_rmgr_map.device_rmgr_map[gpu_dev_prefix+std::to_string(base_index+i)] = gpu_shared_rmgr; - if (i > 0) { + if (use_per_session_host_allocator && i > 0) { dev_rmgr_map.device_rmgr_map[dev_prefix+"/device:CPU:"+std::to_string(i)] = shared_rmgr; dev_rmgr_map.device_rmgr_map[dev_prefix+"/device:cpu:"+std::to_string(i)] = shared_rmgr; dev_rmgr_map.device_rmgr_map["/device:CPU:"+std::to_string(i)] = shared_rmgr; @@ -571,8 +575,13 @@ class DirectSessionFactory : public SessionFactory { follower_options.config.add_per_session_devices( "/job:localhost/replica:0/task:0/device:GPU:" + std::to_string(base_index+i)); - follower_options.config.add_per_session_devices( - "/job:localhost/replica:0/task:0/device:CPU:"+std::to_string(i)); + if (use_per_session_host_allocator) { + follower_options.config.add_per_session_devices( + "/job:localhost/replica:0/task:0/device:CPU:"+std::to_string(i)); + } else { + follower_options.config.add_per_session_devices( + "/job:localhost/replica:0/task:0/device:CPU:0"); + } } #endif // GOOGLE_CUDA diff --git a/tensorflow/core/common_runtime/gpu/gpu_device_factory.cc b/tensorflow/core/common_runtime/gpu/gpu_device_factory.cc index bb4c510253f..11421ad0999 100644 --- a/tensorflow/core/common_runtime/gpu/gpu_device_factory.cc +++ b/tensorflow/core/common_runtime/gpu/gpu_device_factory.cc @@ -199,8 +199,12 @@ class GPUCompatibleCPUDeviceFactory : public DeviceFactory { int num_numa_nodes = options.config.experimental().use_numa_affinity() ? port::NUMANumNodes() : 1; + bool use_per_session_host_allocator = false; + TF_CHECK_OK(tensorflow::ReadBoolFromEnvVar("PER_SESSION_HOSTALLOC", + /*default_val=*/false, + &use_per_session_host_allocator)); int sess_num = 1; - if (dev_rmgr_map) { + if (use_per_session_host_allocator && dev_rmgr_map) { for (auto& item : dev_rmgr_map->device_rmgr_map) { int sess_idx = std::stoi(item.first.substr(item.first.rfind(":")+1)); if (sess_idx >= sess_num) { diff --git a/tensorflow/core/common_runtime/graph_execution_state.cc b/tensorflow/core/common_runtime/graph_execution_state.cc index f8faf15f7d6..222c7408652 100644 --- a/tensorflow/core/common_runtime/graph_execution_state.cc +++ b/tensorflow/core/common_runtime/graph_execution_state.cc @@ -737,11 +737,13 @@ Status GraphExecutionState::InitBaseGraph(std::unique_ptr&& new_graph) { break; } } - const auto& dname1 = session_options_->config.per_session_devices(1); - for (auto& d : device_set_->devices()) { - if (d->name() == dname1) { - devices.AddDevice(d); - break; + if (session_options_->config.per_session_devices_size() > 1) { + const auto& dname1 = session_options_->config.per_session_devices(1); + for (auto& d : device_set_->devices()) { + if (d->name() == dname1) { + devices.AddDevice(d); + break; + } } } } diff --git a/tensorflow/core/common_runtime/optimization_registry.h b/tensorflow/core/common_runtime/optimization_registry.h index 0e31f389aa7..8a0fdf80963 100644 --- a/tensorflow/core/common_runtime/optimization_registry.h +++ b/tensorflow/core/common_runtime/optimization_registry.h @@ -26,6 +26,7 @@ limitations under the License. #include "tensorflow/core/framework/function.h" #include "tensorflow/core/graph/costmodel.h" #include "tensorflow/core/graph/graph.h" +#include "tensorflow/core/util/env_var.h" namespace tensorflow { struct SessionOptions; @@ -123,10 +124,38 @@ class OptimizationPassRegistration { int phase, std::unique_ptr pass, string optimization_pass_name) { - pass->set_name(optimization_pass_name); - OptimizationPassRegistry::Global()->Register(grouping, phase, - std::move(pass)); + bool is_disable_current_pass = false; + + // Disable XLA pass or not, default is false. + if (xla_pass_list.find(optimization_pass_name) != xla_pass_list.end()) { + TF_CHECK_OK(ReadBoolFromEnvVar("TF_DISABLE_XLA", false, &is_disable_current_pass)); + } + + if (!is_disable_current_pass) { + pass->set_name(optimization_pass_name); + OptimizationPassRegistry::Global()->Register(grouping, phase, + std::move(pass)); + } } + + private: + std::unordered_set xla_pass_list = { + "AsyncIoConversionPass", + "BuildCgmodeOpsPass", + "BuildXlaOpsPass", + "CloneConstantsForBetterClusteringPass", + "ClusterScopingPass", + "EncapsulateCGModeSubgraphsPass", + "EncapsulateSubgraphsPass", + "EncapsulateXlaComputationsPass", + "FunctionalizeControlFlowPass", + "IncreaseDynamismForAutoJitPass", + "IntroduceFloatingPointJitterPass", + "MarkForCompilationPass", + "MarkForCudaGraphModePass", + "PartiallyDeclusterPass", + "ReportClusteringInfoPass" + }; }; } // namespace optimization_registration diff --git a/tensorflow/core/framework/embedding/dram_leveldb_storage.h b/tensorflow/core/framework/embedding/dram_leveldb_storage.h index 728f5d75b6f..7ebb0ff2e7e 100644 --- a/tensorflow/core/framework/embedding/dram_leveldb_storage.h +++ b/tensorflow/core/framework/embedding/dram_leveldb_storage.h @@ -35,16 +35,43 @@ class DramLevelDBStore : public MultiTierStorage { MultiTierStorage(sc, name) { dram_kv_ = new LocklessHashMap(); leveldb_ = new LevelDBKV(sc.path); + if (sc.embedding_config.steps_to_live != 0) { + dram_policy_ = new GlobalStepShrinkPolicy(dram_kv_, alloc_, + sc.embedding_config.slot_num + 1); + leveldb_policy_ = new GlobalStepShrinkPolicy(leveldb_, alloc_, + sc.embedding_config.slot_num + 1); + } else if (sc.embedding_config.l2_weight_threshold != -1.0) { + dram_policy_ = + new L2WeightShrinkPolicy( + sc.embedding_config.l2_weight_threshold, + sc.embedding_config.primary_emb_index, + Storage::GetOffset(sc.embedding_config.primary_emb_index), + dram_kv_, alloc_, + sc.embedding_config.slot_num + 1); + leveldb_policy_ = + new L2WeightShrinkPolicy( + sc.embedding_config.l2_weight_threshold, + sc.embedding_config.primary_emb_index, + Storage::GetOffset(sc.embedding_config.primary_emb_index), + leveldb_, alloc_, + sc.embedding_config.slot_num + 1); + } else { + dram_policy_ = nullptr; + leveldb_policy_ = nullptr; + } MultiTierStorage::kvs_.emplace_back( - KVInterfaceDescriptor(dram_kv_, alloc_, dram_mu_)); + KVInterfaceDescriptor(dram_kv_, alloc_, dram_mu_, dram_policy_)); MultiTierStorage::kvs_.emplace_back( - KVInterfaceDescriptor(leveldb_, alloc_, leveldb_mu_)); + KVInterfaceDescriptor(leveldb_, alloc_, + leveldb_mu_, leveldb_policy_)); } ~DramLevelDBStore() override { MultiTierStorage::ReleaseValues( {std::make_pair(dram_kv_, alloc_)}); + if (dram_policy_ != nullptr) delete dram_policy_; + if (leveldb_policy_ != nullptr) delete leveldb_policy_; } TF_DISALLOW_COPY_AND_ASSIGN(DramLevelDBStore); @@ -169,6 +196,8 @@ class DramLevelDBStore : public MultiTierStorage { KVInterface* dram_kv_; KVInterface* leveldb_; Allocator* alloc_; + ShrinkPolicy* dram_policy_; + ShrinkPolicy* leveldb_policy_; LayoutCreator* layout_creator_; mutex dram_mu_; //must be locked before leveldb_mu_ is locked mutex leveldb_mu_; diff --git a/tensorflow/core/framework/embedding/dram_pmem_storage.h b/tensorflow/core/framework/embedding/dram_pmem_storage.h index 25053e56b03..8acf12c4507 100644 --- a/tensorflow/core/framework/embedding/dram_pmem_storage.h +++ b/tensorflow/core/framework/embedding/dram_pmem_storage.h @@ -36,17 +36,43 @@ class DramPmemStorage : public MultiTierStorage { layout_creator_(lc), MultiTierStorage(sc, name) { dram_kv_ = new LocklessHashMap(); pmem_kv_ = new LocklessHashMap(); + if (sc.embedding_config.steps_to_live != 0) { + dram_policy_ = new GlobalStepShrinkPolicy(dram_kv_, dram_alloc_, + sc.embedding_config.slot_num + 1); + pmem_policy_ = new GlobalStepShrinkPolicy(pmem_kv_, pmem_alloc_, + sc.embedding_config.slot_num + 1); + } else if (sc.embedding_config.l2_weight_threshold != -1.0) { + dram_policy_ = + new L2WeightShrinkPolicy( + sc.embedding_config.l2_weight_threshold, + sc.embedding_config.primary_emb_index, + Storage::GetOffset(sc.embedding_config.primary_emb_index), + dram_kv_, dram_alloc_, + sc.embedding_config.slot_num + 1); + pmem_policy_ = + new L2WeightShrinkPolicy( + sc.embedding_config.l2_weight_threshold, + sc.embedding_config.primary_emb_index, + Storage::GetOffset(sc.embedding_config.primary_emb_index), + pmem_kv_, pmem_alloc_, + sc.embedding_config.slot_num + 1); + } else { + dram_policy_ = nullptr; + pmem_policy_ = nullptr; + } MultiTierStorage::kvs_.emplace_back( - KVInterfaceDescriptor(dram_kv_, dram_alloc_, dram_mu_)); + KVInterfaceDescriptor(dram_kv_, dram_alloc_, dram_mu_, dram_policy_)); MultiTierStorage::kvs_.emplace_back( - KVInterfaceDescriptor(pmem_kv_, pmem_alloc_, pmem_mu_)); + KVInterfaceDescriptor(pmem_kv_, pmem_alloc_, pmem_mu_, pmem_policy_)); } ~DramPmemStorage() override { MultiTierStorage::ReleaseValues( {std::make_pair(dram_kv_, dram_alloc_), std::make_pair(pmem_kv_, pmem_alloc_)}); + if (dram_policy_ != nullptr) delete dram_policy_; + if (pmem_policy_ != nullptr) delete pmem_policy_; } TF_DISALLOW_COPY_AND_ASSIGN(DramPmemStorage); @@ -174,6 +200,8 @@ class DramPmemStorage : public MultiTierStorage { KVInterface* pmem_kv_; Allocator* dram_alloc_; Allocator* pmem_alloc_; + ShrinkPolicy* dram_policy_; + ShrinkPolicy* pmem_policy_; LayoutCreator* layout_creator_; mutex dram_mu_; // must be locked before pmem_mu_ is locked mutex pmem_mu_; diff --git a/tensorflow/core/framework/embedding/dram_ssd_storage.h b/tensorflow/core/framework/embedding/dram_ssd_storage.h index df2dfcee733..e4e1b8da5e6 100644 --- a/tensorflow/core/framework/embedding/dram_ssd_storage.h +++ b/tensorflow/core/framework/embedding/dram_ssd_storage.h @@ -35,16 +35,42 @@ class DramSsdHashStorage : public MultiTierStorage { MultiTierStorage(sc, name) { dram_kv_ = new LocklessHashMap(); ssd_kv_ = new SSDHashKV(sc.path, alloc_); + if (sc.embedding_config.steps_to_live != 0) { + dram_policy_ = new GlobalStepShrinkPolicy(dram_kv_, alloc_, + sc.embedding_config.slot_num + 1); + ssd_policy_ = new GlobalStepShrinkPolicy(ssd_kv_, alloc_, + sc.embedding_config.slot_num + 1); + } else if (sc.embedding_config.l2_weight_threshold != -1.0) { + dram_policy_ = + new L2WeightShrinkPolicy( + sc.embedding_config.l2_weight_threshold, + sc.embedding_config.primary_emb_index, + Storage::GetOffset(sc.embedding_config.primary_emb_index), + dram_kv_, alloc_, + sc.embedding_config.slot_num + 1); + ssd_policy_ = + new L2WeightShrinkPolicy( + sc.embedding_config.l2_weight_threshold, + sc.embedding_config.primary_emb_index, + Storage::GetOffset(sc.embedding_config.primary_emb_index), + ssd_kv_, alloc_, + sc.embedding_config.slot_num + 1); + } else { + dram_policy_ = nullptr; + ssd_policy_ = nullptr; + } MultiTierStorage::kvs_.emplace_back( - KVInterfaceDescriptor(dram_kv_, alloc_, dram_mu_)); + KVInterfaceDescriptor(dram_kv_, alloc_, dram_mu_, dram_policy_)); MultiTierStorage::kvs_.emplace_back( - KVInterfaceDescriptor(ssd_kv_, alloc_, ssd_mu_)); + KVInterfaceDescriptor(ssd_kv_, alloc_, ssd_mu_, ssd_policy_)); } ~DramSsdHashStorage() override { MultiTierStorage::ReleaseValues( {std::make_pair(dram_kv_, alloc_)}); + if (dram_policy_ != nullptr) delete dram_policy_; + if (ssd_policy_ != nullptr) delete ssd_policy_; } TF_DISALLOW_COPY_AND_ASSIGN(DramSsdHashStorage); @@ -211,6 +237,8 @@ class DramSsdHashStorage : public MultiTierStorage { KVInterface* dram_kv_; SSDHashKV* ssd_kv_; Allocator* alloc_; + ShrinkPolicy* dram_policy_; + ShrinkPolicy* ssd_policy_; LayoutCreator* layout_creator_; mutex dram_mu_; // must be locked before ssd_mu_ is locked mutex ssd_mu_; diff --git a/tensorflow/core/framework/embedding/embedding_var.h b/tensorflow/core/framework/embedding/embedding_var.h index a39285b960f..e1c1b58b372 100644 --- a/tensorflow/core/framework/embedding/embedding_var.h +++ b/tensorflow/core/framework/embedding/embedding_var.h @@ -568,6 +568,11 @@ class EmbeddingVar : public ResourceBase { freq_list, emb_config_, filter_, it); } + void GetSnapshot(K* keys, V* values, const Eigen::GpuDevice& device) { + return storage_manager_->GetSnapshot( + keys, values, device); + } + int64 GetSnapshotWithoutFetchPersistentEmb( std::vector* key_list, std::vector* value_list, @@ -589,7 +594,7 @@ class EmbeddingVar : public ResourceBase { } Status Shrink() { - return storage_manager_->Shrink(emb_config_, value_len_); + return storage_manager_->Shrink(value_len_); } Status Shrink(int64 gs) { diff --git a/tensorflow/core/framework/embedding/globalstep_shrink_policy.h b/tensorflow/core/framework/embedding/globalstep_shrink_policy.h index c0bc9728a35..cd0c6b028ec 100644 --- a/tensorflow/core/framework/embedding/globalstep_shrink_policy.h +++ b/tensorflow/core/framework/embedding/globalstep_shrink_policy.h @@ -26,15 +26,18 @@ namespace embedding { template class GlobalStepShrinkPolicy : public ShrinkPolicy { public: - GlobalStepShrinkPolicy(KVInterface* kv, Allocator* alloc) - : ShrinkPolicy(kv, alloc) {} + GlobalStepShrinkPolicy( + KVInterface* kv, + Allocator* alloc, + int slot_num) + : ShrinkPolicy(kv, alloc, slot_num) {} TF_DISALLOW_COPY_AND_ASSIGN(GlobalStepShrinkPolicy); void Shrink(int64 global_step, int64 steps_to_live) { + ShrinkPolicy::ReleaseDeleteValues(); ShrinkPolicy::GetSnapshot(); FilterToDelete(global_step, steps_to_live); - ShrinkPolicy::ReleaseDeleteValues(); } private: @@ -45,12 +48,14 @@ class GlobalStepShrinkPolicy : public ShrinkPolicy { ShrinkPolicy::value_list_[i]->SetStep(global_step); } else { if (global_step - version > steps_to_live) { + ShrinkPolicy::kv_->Remove(ShrinkPolicy::key_list_[i]); ShrinkPolicy::to_delete_.emplace_back( - ShrinkPolicy::key_list_[i], ShrinkPolicy::value_list_[i]); } } } + ShrinkPolicy::key_list_.clear(); + ShrinkPolicy::value_list_.clear(); } }; } // embedding diff --git a/tensorflow/core/framework/embedding/gpu_hash_map_kv.h b/tensorflow/core/framework/embedding/gpu_hash_map_kv.h index fbaee03adc1..2d0be38c382 100644 --- a/tensorflow/core/framework/embedding/gpu_hash_map_kv.h +++ b/tensorflow/core/framework/embedding/gpu_hash_map_kv.h @@ -199,7 +199,7 @@ class GPUHashMapKV : public KVInterface { } int64 Size() const override { - return 0; + return hash_table_->Size(); } void SetTotalDims(int total_dims) override { diff --git a/tensorflow/core/framework/embedding/hbm_dram_ssd_storage.h b/tensorflow/core/framework/embedding/hbm_dram_ssd_storage.h index a3b6052522c..3f915ae4d60 100644 --- a/tensorflow/core/framework/embedding/hbm_dram_ssd_storage.h +++ b/tensorflow/core/framework/embedding/hbm_dram_ssd_storage.h @@ -21,19 +21,56 @@ class HbmDramSsdStorage : public MultiTierStorage { hbm_kv_ = new LocklessHashMap(); dram_kv_ = new LocklessHashMapCPU(gpu_alloc); ssd_kv_ = new SSDHashKV(sc.path, cpu_alloc); + if (sc.embedding_config.steps_to_live != 0) { + hbm_policy_ = new GlobalStepShrinkPolicy(hbm_kv_, gpu_alloc_, + sc.embedding_config.slot_num + 1); + dram_policy_ = new GlobalStepShrinkPolicy(dram_kv_, cpu_alloc_, + sc.embedding_config.slot_num + 1); + ssd_policy_ = new GlobalStepShrinkPolicy(ssd_kv_, cpu_alloc_, + sc.embedding_config.slot_num + 1); + } else if (sc.embedding_config.l2_weight_threshold != -1.0) { + hbm_policy_ = + new L2WeightShrinkPolicy( + sc.embedding_config.l2_weight_threshold, + sc.embedding_config.primary_emb_index, + Storage::GetOffset(sc.embedding_config.primary_emb_index), + hbm_kv_, gpu_alloc_, + sc.embedding_config.slot_num + 1); + dram_policy_ = + new L2WeightShrinkPolicy( + sc.embedding_config.l2_weight_threshold, + sc.embedding_config.primary_emb_index, + Storage::GetOffset(sc.embedding_config.primary_emb_index), + dram_kv_, cpu_alloc_, + sc.embedding_config.slot_num + 1); + ssd_policy_ = + new L2WeightShrinkPolicy( + sc.embedding_config.l2_weight_threshold, + sc.embedding_config.primary_emb_index, + Storage::GetOffset(sc.embedding_config.primary_emb_index), + ssd_kv_, cpu_alloc_, + sc.embedding_config.slot_num + 1); + } else { + hbm_policy_ = nullptr; + dram_policy_ = nullptr; + ssd_policy_ = nullptr; + } MultiTierStorage::kvs_.emplace_back( - KVInterfaceDescriptor(hbm_kv_, gpu_alloc_, hbm_mu_)); + KVInterfaceDescriptor(hbm_kv_, gpu_alloc_, hbm_mu_, hbm_policy_)); MultiTierStorage::kvs_.emplace_back( - KVInterfaceDescriptor(dram_kv_, cpu_alloc_, dram_mu_)); + KVInterfaceDescriptor(dram_kv_, cpu_alloc_, dram_mu_, dram_policy_)); MultiTierStorage::kvs_.emplace_back( - KVInterfaceDescriptor(ssd_kv_, cpu_alloc_, ssd_mu_)); + KVInterfaceDescriptor(ssd_kv_, cpu_alloc_, ssd_mu_, ssd_policy_)); } ~HbmDramSsdStorage() override { ReleaseValues({std::make_pair(hbm_kv_, gpu_alloc_), std::make_pair(dram_kv_, cpu_alloc_)}); delete dram_cache_; + if (hbm_policy_ != nullptr) delete hbm_policy_; + if (dram_policy_ != nullptr) delete dram_policy_; + if (ssd_policy_ != nullptr) delete ssd_policy_; } TF_DISALLOW_COPY_AND_ASSIGN(HbmDramSsdStorage); @@ -380,6 +417,9 @@ class HbmDramSsdStorage : public MultiTierStorage { EmbeddingMemoryPool* embedding_mem_pool_; Allocator* gpu_alloc_; Allocator* cpu_alloc_; + ShrinkPolicy* hbm_policy_; + ShrinkPolicy* dram_policy_; + ShrinkPolicy* ssd_policy_; LayoutCreator* layout_creator_; BatchCache* dram_cache_; int64 dram_capacity_; diff --git a/tensorflow/core/framework/embedding/hbm_dram_storage.h b/tensorflow/core/framework/embedding/hbm_dram_storage.h index 9468ac723ce..0b84ae004c3 100644 --- a/tensorflow/core/framework/embedding/hbm_dram_storage.h +++ b/tensorflow/core/framework/embedding/hbm_dram_storage.h @@ -33,17 +33,43 @@ class HbmDramStorage : public MultiTierStorage { layout_creator_(lc), MultiTierStorage(sc, name) { hbm_kv_ = new LocklessHashMap(); dram_kv_ = new LocklessHashMapCPU(gpu_alloc); + if (sc.embedding_config.steps_to_live != 0) { + hbm_policy_ = new GlobalStepShrinkPolicy(hbm_kv_, gpu_alloc_, + sc.embedding_config.slot_num + 1); + dram_policy_ = new GlobalStepShrinkPolicy(dram_kv_, cpu_alloc_, + sc.embedding_config.slot_num + 1); + } else if (sc.embedding_config.l2_weight_threshold != -1.0) { + hbm_policy_ = + new L2WeightShrinkPolicy( + sc.embedding_config.l2_weight_threshold, + sc.embedding_config.primary_emb_index, + Storage::GetOffset(sc.embedding_config.primary_emb_index), + hbm_kv_, gpu_alloc_, + sc.embedding_config.slot_num + 1); + dram_policy_ = + new L2WeightShrinkPolicy( + sc.embedding_config.l2_weight_threshold, + sc.embedding_config.primary_emb_index, + Storage::GetOffset(sc.embedding_config.primary_emb_index), + dram_kv_, cpu_alloc_, + sc.embedding_config.slot_num + 1); + } else { + hbm_policy_ = nullptr; + dram_policy_ = nullptr; + } MultiTierStorage::kvs_.emplace_back( - KVInterfaceDescriptor(hbm_kv_, gpu_alloc_, hbm_mu_)); + KVInterfaceDescriptor(hbm_kv_, gpu_alloc_, hbm_mu_, hbm_policy_)); MultiTierStorage::kvs_.emplace_back( - KVInterfaceDescriptor(dram_kv_, cpu_alloc, dram_mu_)); + KVInterfaceDescriptor(dram_kv_, cpu_alloc, dram_mu_, dram_policy_)); } ~HbmDramStorage() override { ReleaseValues({std::make_pair(hbm_kv_, gpu_alloc_), std::make_pair(dram_kv_, cpu_alloc_)}); + if (hbm_policy_ != nullptr) delete hbm_policy_; + if (dram_policy_ != nullptr) delete dram_policy_; } TF_DISALLOW_COPY_AND_ASSIGN(HbmDramStorage); @@ -294,6 +320,8 @@ class HbmDramStorage : public MultiTierStorage { EmbeddingMemoryPool* embedding_mem_pool_; Allocator* gpu_alloc_; Allocator* cpu_alloc_; + ShrinkPolicy* hbm_policy_; + ShrinkPolicy* dram_policy_; LayoutCreator* layout_creator_; mutex hbm_mu_; //must be locked before dram_mu_ is locked; mutex dram_mu_; diff --git a/tensorflow/core/framework/embedding/l2weight_shrink_policy.h b/tensorflow/core/framework/embedding/l2weight_shrink_policy.h index 6370147b771..540f180df09 100644 --- a/tensorflow/core/framework/embedding/l2weight_shrink_policy.h +++ b/tensorflow/core/framework/embedding/l2weight_shrink_policy.h @@ -26,21 +26,24 @@ namespace embedding { template class L2WeightShrinkPolicy : public ShrinkPolicy { public: - L2WeightShrinkPolicy(int64 primary_index, int64 primary_offset, - KVInterface* kv, Allocator* alloc) - : primary_index_(primary_index), primary_offset_(primary_offset), - ShrinkPolicy(kv, alloc) {} + L2WeightShrinkPolicy(float l2_weight_threshold, + int64 primary_index, int64 primary_offset, + KVInterface* kv, Allocator* alloc, + int slot_num) + : l2_weight_threshold_(l2_weight_threshold), + primary_index_(primary_index), primary_offset_(primary_offset), + ShrinkPolicy(kv, alloc, slot_num) {} TF_DISALLOW_COPY_AND_ASSIGN(L2WeightShrinkPolicy); - void Shrink(int64 value_len, V l2_weight_threshold) { - ShrinkPolicy::GetSnapshot(); - FilterToDelete(value_len, l2_weight_threshold); + void Shrink(int64 value_len) { ShrinkPolicy::ReleaseDeleteValues(); + ShrinkPolicy::GetSnapshot(); + FilterToDelete(value_len); } private: - void FilterToDelete(int64 value_len, V l2_weight_threshold) { + void FilterToDelete(int64 value_len) { for (int64 i = 0; i < ShrinkPolicy::key_list_.size(); ++i) { V* val = ShrinkPolicy::value_list_[i]->GetValue( primary_index_, primary_offset_); @@ -50,17 +53,21 @@ class L2WeightShrinkPolicy : public ShrinkPolicy { l2_weight += val[j] * val[j]; } l2_weight *= (V)0.5; - if (l2_weight < l2_weight_threshold) { + if (l2_weight < (V)l2_weight_threshold_) { + ShrinkPolicy::kv_->Remove(ShrinkPolicy::key_list_[i]); ShrinkPolicy::to_delete_.emplace_back( - ShrinkPolicy::key_list_[i], ShrinkPolicy::value_list_[i]); + ShrinkPolicy::value_list_[i]); } } } + ShrinkPolicy::key_list_.clear(); + ShrinkPolicy::value_list_.clear(); } private: int64 primary_index_; // Shrink only handle primary slot int64 primary_offset_; + float l2_weight_threshold_; }; } // embedding } // tensorflow diff --git a/tensorflow/core/framework/embedding/multi_tier_storage.h b/tensorflow/core/framework/embedding/multi_tier_storage.h index cad5163c1d1..577955bcc8b 100644 --- a/tensorflow/core/framework/embedding/multi_tier_storage.h +++ b/tensorflow/core/framework/embedding/multi_tier_storage.h @@ -187,14 +187,12 @@ class MultiTierStorage : public Storage { <<" RestoreSsdHashmap yet"; } - Status Shrink(const EmbeddingConfig& emb_config, - int64 value_len) override { + Status Shrink(int64 value_len) override { for (auto kv : kvs_) { mutex_lock l(kv.mu_); - L2WeightShrinkPolicy policy(emb_config.primary_emb_index, - Storage::GetOffset(emb_config.primary_emb_index), - kv.kv_, kv.allocator_); - policy.Shrink(value_len, (V)emb_config.l2_weight_threshold); + L2WeightShrinkPolicy* l2_weight_shrink = + reinterpret_cast*>(kv.shrink_policy_); + l2_weight_shrink->Shrink(value_len); } return Status::OK(); } @@ -202,8 +200,9 @@ class MultiTierStorage : public Storage { Status Shrink(int64 global_step, int64 steps_to_live) override { for (auto kv : kvs_) { mutex_lock l(kv.mu_); - GlobalStepShrinkPolicy policy(kv.kv_, kv.allocator_); - policy.Shrink(global_step, steps_to_live); + GlobalStepShrinkPolicy* global_step_shrink = + reinterpret_cast*>(kv.shrink_policy_); + global_step_shrink->Shrink(global_step, steps_to_live); } return Status::OK(); } diff --git a/tensorflow/core/framework/embedding/shrink_policy.h b/tensorflow/core/framework/embedding/shrink_policy.h index d76b36a0a00..6b61a48fbd1 100644 --- a/tensorflow/core/framework/embedding/shrink_policy.h +++ b/tensorflow/core/framework/embedding/shrink_policy.h @@ -27,37 +27,36 @@ namespace embedding { template class ShrinkPolicy { public: - ShrinkPolicy(KVInterface* kv, Allocator* alloc) - : kv_(kv), alloc_(alloc) {} + ShrinkPolicy(KVInterface* kv, Allocator* alloc, int slot_num) + : kv_(kv), alloc_(alloc), + slot_num_(slot_num), shrink_count_(0) {} TF_DISALLOW_COPY_AND_ASSIGN(ShrinkPolicy); inline Status GetSnapshot() { + shrink_count_ = (shrink_count_ + 1) % slot_num_; return kv_->GetSnapshot(&key_list_, &value_list_); } void ReleaseDeleteValues() { - for (auto it : to_delete_) { - (it.value_ptr)->Destroy(alloc_); - delete it.value_ptr; - kv_->Remove(it.key); + if (shrink_count_ == 0) { + for (auto it : to_delete_) { + it->Destroy(alloc_); + delete it; + } + to_delete_.clear(); } } - struct KeyValuePair { - KeyValuePair(const K& k, ValuePtr* v) : key(k), value_ptr(v) {} - - K key; - ValuePtr* value_ptr; - }; - protected: std::vector key_list_; std::vector*> value_list_; - std::vector to_delete_; + std::vector*> to_delete_; KVInterface* kv_; Allocator* alloc_; + int slot_num_; + int shrink_count_; }; } // embedding } // tensorflow diff --git a/tensorflow/core/framework/embedding/single_tier_storage.h b/tensorflow/core/framework/embedding/single_tier_storage.h index 9747c407b68..d3083fbe788 100644 --- a/tensorflow/core/framework/embedding/single_tier_storage.h +++ b/tensorflow/core/framework/embedding/single_tier_storage.h @@ -49,6 +49,19 @@ class SingleTierStorage : public Storage { KVInterface* kv, LayoutCreator* lc) : kv_(kv), alloc_(alloc), layout_creator_(lc), Storage(sc) { + if (sc.embedding_config.steps_to_live != 0) { + shrink_policy_ = new GlobalStepShrinkPolicy( + kv_, alloc_, sc.embedding_config.slot_num + 1); + } else if (sc.embedding_config.l2_weight_threshold != -1.0) { + shrink_policy_ = + new L2WeightShrinkPolicy( + sc.embedding_config.l2_weight_threshold, + sc.embedding_config.primary_emb_index, + Storage::GetOffset(sc.embedding_config.primary_emb_index), + kv_, alloc_, sc.embedding_config.slot_num + 1); + } else { + shrink_policy_ = nullptr; + } } ~SingleTierStorage() override { @@ -61,6 +74,7 @@ class SingleTierStorage : public Storage { delete value_ptr; } delete kv_; + if (shrink_policy_ != nullptr) delete shrink_policy_; } TF_DISALLOW_COPY_AND_ASSIGN(SingleTierStorage); @@ -259,20 +273,19 @@ class SingleTierStorage : public Storage { LOG(FATAL)<<"The Storage dosen't have ssd storage."; } - Status Shrink(const EmbeddingConfig& emb_config, - int64 value_len) override { + Status Shrink(int64 value_len) override { mutex_lock l(Storage::mu_); - L2WeightShrinkPolicy policy(emb_config.primary_emb_index, - Storage::GetOffset(emb_config.primary_emb_index), - kv_, alloc_); - policy.Shrink(value_len, (V)emb_config.l2_weight_threshold); + L2WeightShrinkPolicy* l2_weight_shrink = + reinterpret_cast*>(shrink_policy_); + l2_weight_shrink->Shrink(value_len); return Status::OK(); } Status Shrink(int64 global_step, int64 steps_to_live) override { mutex_lock l(Storage::mu_); - GlobalStepShrinkPolicy policy(kv_, alloc_); - policy.Shrink(global_step, steps_to_live); + GlobalStepShrinkPolicy* global_step_shrink = + reinterpret_cast*>(shrink_policy_); + global_step_shrink->Shrink(global_step, steps_to_live); return Status::OK(); } @@ -323,6 +336,7 @@ class SingleTierStorage : public Storage { protected: KVInterface* kv_; + ShrinkPolicy* shrink_policy_; Allocator* alloc_; LayoutCreator* layout_creator_; }; diff --git a/tensorflow/core/framework/embedding/storage.h b/tensorflow/core/framework/embedding/storage.h index b8264f392a0..1361e32b27e 100644 --- a/tensorflow/core/framework/embedding/storage.h +++ b/tensorflow/core/framework/embedding/storage.h @@ -18,6 +18,7 @@ limitations under the License. #include "tensorflow/core/framework/embedding/cache.h" #include "tensorflow/core/framework/embedding/config.pb.h" #include "tensorflow/core/framework/embedding/kv_interface.h" +#include "tensorflow/core/framework/embedding/shrink_policy.h" #include "tensorflow/core/framework/embedding/storage_config.h" #include "tensorflow/core/lib/core/status.h" #include "tensorflow/core/framework/embedding/embedding_memory_pool.h" @@ -44,13 +45,17 @@ template struct KVInterfaceDescriptor { KVInterfaceDescriptor(KVInterface* kv, Allocator* allocator, - mutex mu) : kv_(kv), - allocator_(allocator), - mu_(mu) {} + mutex mu, + ShrinkPolicy* shrink_policy) + : kv_(kv), + allocator_(allocator), + mu_(mu), + shrink_policy_(shrink_policy) {} ~KVInterfaceDescriptor() {} KVInterface* kv_; Allocator* allocator_; + ShrinkPolicy* shrink_policy_; mutex mu_; }; @@ -85,6 +90,7 @@ class Storage { const EmbeddingConfig& emb_config, FilterPolicy>* filter, embedding::Iterator** it) = 0; + virtual void GetSnapshot(K* keys, V* values, const Eigen::GpuDevice& device) {}; virtual int64 GetSnapshotWithoutFetchPersistentEmb( std::vector* key_list, std::vector* value_list, @@ -98,8 +104,7 @@ class Storage { int64* file_list, int64* invalid_record_count_list, int64* record_count_list, int64 num_of_files, const std::string& ssd_emb_file_name) = 0; - virtual Status Shrink(const EmbeddingConfig& emb_config, - int64 value_len) = 0; + virtual Status Shrink(int64 value_len) = 0; virtual Status Shrink(int64 gs, int64 steps_to_live) = 0; virtual Status BatchCommit(const std::vector& keys, diff --git a/tensorflow/core/framework/embedding/storage_manager.h b/tensorflow/core/framework/embedding/storage_manager.h index 1653f3b56cb..d2eaeb3e4a6 100644 --- a/tensorflow/core/framework/embedding/storage_manager.h +++ b/tensorflow/core/framework/embedding/storage_manager.h @@ -206,6 +206,11 @@ class StorageManager { freq_list, emb_config, filter, it); } + void GetSnapshot(K* keys, V* values, const Eigen::GpuDevice& device) { + return storage_->GetSnapshot( + keys, values, device); + } + int64 GetSnapshotWithoutFetchPersistentEmb( std::vector* key_list, std::vector* value_list, @@ -233,8 +238,8 @@ class StorageManager { ssd_emb_file_name); } - Status Shrink(const EmbeddingConfig& emb_config, int64 value_len) { - return storage_->Shrink(emb_config, value_len); + Status Shrink(int64 value_len) { + return storage_->Shrink(value_len); } Status Shrink(int64 gs, int64 steps_to_live) { diff --git a/tensorflow/core/kernels/kv_variable_ops.cc b/tensorflow/core/kernels/kv_variable_ops.cc index 0e4c26c353d..545b2a691dc 100644 --- a/tensorflow/core/kernels/kv_variable_ops.cc +++ b/tensorflow/core/kernels/kv_variable_ops.cc @@ -1605,15 +1605,65 @@ class KvResourceExportOp : public OpKernel { #define REGISTER_KERNELS_CPU(type) REGISTER_KERNELS_ALL(CPU, type) TF_CALL_REAL_NUMBER_TYPES(REGISTER_KERNELS_CPU) #undef REGISTER_KERNELS_CPU +#undef REGISTER_KERNELS_ALL +#undef REGISTER_KERNELS #if GOOGLE_CUDA +template +class KvResourceExportGPUOp : public OpKernel { + public: + explicit KvResourceExportGPUOp(OpKernelConstruction *ctx) : OpKernel(ctx) {} + + void Compute(OpKernelContext *ctx) override { + EmbeddingVar *ev = nullptr; + OP_REQUIRES_OK(ctx, LookupResource(ctx, HandleFromInput(ctx, 0), &ev)); + core::ScopedUnref unref_me(ev); + + int64 total_size = ev->Size(); + + // Create an output tensor + Tensor *keys_output_tensor = NULL; + Tensor *values_output_tensor = NULL; + Tensor *versions_output_tensor = NULL; + Tensor *freq_output_tensor = NULL; + + OP_REQUIRES_OK(ctx, ctx->allocate_output( + 0, TensorShape({total_size}), &keys_output_tensor)); + OP_REQUIRES_OK(ctx, ctx->allocate_output( + 1, TensorShape({total_size, ev->ValueLen()}), + &values_output_tensor)); + OP_REQUIRES_OK(ctx, ctx->allocate_output( + 2, TensorShape({0}), + &versions_output_tensor)); + OP_REQUIRES_OK(ctx, ctx->allocate_output( + 3, TensorShape({0}), + &freq_output_tensor)); + + auto keys_flat = keys_output_tensor->flat(); + TKey* key_base = &keys_flat(0); + auto values_flat = values_output_tensor->flat(); + TValue* value_base = &values_flat(0); + + auto device = ctx->eigen_device(); + ev->GetSnapshot(key_base, value_base, device); + } +}; + +#define REGISTER_KERNELS(dev, ktype, vtype) \ + REGISTER_KERNEL_BUILDER(Name("KvResourceExport") \ + .Device(DEVICE_##dev) \ + .TypeConstraint("Tkeys") \ + .TypeConstraint("Tvalues"), \ + KvResourceExportGPUOp); +#define REGISTER_KERNELS_ALL(dev, type) \ + REGISTER_KERNELS(dev, int32, type) \ + REGISTER_KERNELS(dev, int64, type) #define REGISTER_KERNELS_GPU(type) REGISTER_KERNELS_ALL(GPU, type) TF_CALL_GPU_NUMBER_TYPES(REGISTER_KERNELS_GPU) #undef REGISTER_KERNELS_GPU -#endif // GOOGLE_CUDA - #undef REGISTER_KERNELS_ALL #undef REGISTER_KERNELS +#endif // GOOGLE_CUDA template class KvResourceGeneratePartitionedTensorOp : public OpKernel { diff --git a/tensorflow/core/kernels/kv_variable_ops.h b/tensorflow/core/kernels/kv_variable_ops.h index 5831edd0e24..03f19b48b59 100644 --- a/tensorflow/core/kernels/kv_variable_ops.h +++ b/tensorflow/core/kernels/kv_variable_ops.h @@ -1280,7 +1280,6 @@ int64 ReadRecord( reader->LookupSegment( record_key, sizeof(K) * shape.dim_size(0), (char*)*buffer, bytes_read); - delete[] buffer; return shape.dim_size(0); } diff --git a/tensorflow/core/kernels/save_restore_v2_ops.cc b/tensorflow/core/kernels/save_restore_v2_ops.cc index b2c651b03b8..2dbb05bad13 100644 --- a/tensorflow/core/kernels/save_restore_v2_ops.cc +++ b/tensorflow/core/kernels/save_restore_v2_ops.cc @@ -171,7 +171,8 @@ class SaveV2 : public OpKernel { const string& tensor_name = tensor_names_flat(i); if (tensor_types_[i] == DT_RESOURCE) { auto& handle = HandleFromInput(context, i + kFixedInputs); - if (IsHandle>(handle)) { + if (IsHandle>(handle) || + IsHandle>(handle)) { if (ev_key_types_[start_ev_key_index] == DT_INT32) { DumpEvWithGlobalStep(context, i + kFixedInputs, tensor_name, writer, tensor_types_[0]); diff --git a/tensorflow/python/keras/BUILD b/tensorflow/python/keras/BUILD index 388e9d0be19..f1b06e2496a 100755 --- a/tensorflow/python/keras/BUILD +++ b/tensorflow/python/keras/BUILD @@ -943,7 +943,7 @@ tf_py_test( tf_py_test( name = "normalization_test", - size = "large", + size = "medium", srcs = ["layers/normalization_test.py"], additional_deps = [ ":keras", @@ -951,7 +951,7 @@ tf_py_test( "//third_party/py/numpy", "//tensorflow/python:client_testlib", ], - shard_count = 8, + shard_count = 4, tags = [ "no_rocm", "notsan", diff --git a/tensorflow/python/keras/layers/normalization.py b/tensorflow/python/keras/layers/normalization.py index 24fa9308302..ebaef47fcbe 100644 --- a/tensorflow/python/keras/layers/normalization.py +++ b/tensorflow/python/keras/layers/normalization.py @@ -19,7 +19,6 @@ from __future__ import print_function from tensorflow.python.distribute import distribution_strategy_context -from tensorflow.python.framework import constant_op from tensorflow.python.framework import dtypes from tensorflow.python.framework import ops from tensorflow.python.framework import tensor_shape @@ -187,8 +186,10 @@ def __init__(self, if self._USE_V2_BEHAVIOR: if fused: self._raise_if_fused_cannot_be_used() - elif fused is None: - fused = self._fused_can_be_used() + # We leave fused as None if self._fused_can_be_used()==True, since we + # still may set it to False in self.build() if the input rank is not 4. + elif fused is None and not self._fused_can_be_used(): + fused = False elif fused is None: fused = True self.supports_masking = True @@ -209,16 +210,22 @@ def __init__(self, def _raise_if_fused_cannot_be_used(self): """Raises a ValueError if fused implementation cannot be used. + + In addition to the checks done in this function, the input tensors rank must + be 4. The input rank check can only be done once the input shape is known. """ # Currently fused batch norm doesn't support renorm. It also only supports a - # single axis, when no virtual batch size or adjustment is used. + # channel dimension on axis 1 or 3, when no virtual batch size or adjustment + # is used. if self.renorm: raise ValueError('Passing both fused=True and renorm=True is ' 'unsupported') axis = [self.axis] if isinstance(self.axis, int) else self.axis - if len(axis) > 1: - raise ValueError('Passing fused=True is only supported when operating ' - 'over a single axis.') + # Axis -3 is equivalent to 1, and axis -1 is equivalent to 3, because the + # input rank is required to be 4 (which is checked later). + if len(axis) > 1 or axis[0] not in (-3, -1, 1, 3): + raise ValueError('Passing fused=True is only supported when axis is 1 ' + 'or 3') if self.virtual_batch_size is not None: raise ValueError('Passing fused=True is unsupported when ' 'virtual_batch_size is specified.') @@ -262,62 +269,6 @@ def _support_zero_size_input(self): distribution_strategy_context.get_strategy().extended, 'experimental_enable_get_next_as_optional', False) - def _get_shape_and_axis_for_fused(self, nd_shape, nd_axis): - """Compute an equivalent shape and axis that are compatible with the fused - implementation. - - The input/output of the layer can be reshaped to/from the shape returned by - this function without affecting the correctness of the computation. - - Arguments: - nd_shape: Tensor. The original shape of the operation. - nd_axis: Integer. The original axis of the operation. - - Returns: - shape: Tensor. A 4D shape. - axis: Integer. An axis (always 1 or 3). - """ - assert(isinstance(nd_axis, int)) - ndims = nd_shape.shape[0] - shape = nd_shape[:] - axis = nd_shape + nd_axis if nd_axis < 0 else nd_axis - # First check if the axis needs to be moved. - if axis not in (1, ndims - 1): - # Move axis to dim 1. - if axis == 0: - # Transform [C, ...] to [1, C, ...]. - shape = array_ops.concat([constant_op.constant([1]), shape], axis=0) - ndims += 1 - else: - # Merge excess pre-axis dims into first dim. - # Transform [N, ..., C, ...] to [product(N, ...), C, ...]. - product = math_ops.reduce_prod(shape[:axis], keepdims=True) - shape = array_ops.concat([product, shape[axis:]], axis=0) - ndims -= (axis - 1) - axis = 1 - # Now change shape to 4D. - is_channels_last = axis == ndims - 1 - if ndims < 4: - # Insert new dims after existing spatial dim or before channel dim. - new_dims = constant_op.constant([1] * (4 - ndims)) - if is_channels_last: - # Transform [..., C] to [..., 1..., C] (ndims=4). - shape = array_ops.concat([shape[:-1], new_dims, shape[-1:]], axis=0) - else: - # Transform [N, C, ...] to [N, C, ..., 1...] (ndims=4). - shape = array_ops.concat([shape, new_dims], axis=0) - elif ndims > 4: - # Merge excess spatial dims into the second spatial dim. - # Transform [N, C, H, W, ...] to [N, C, H, product(W, ...)]. - # Or [N, H, W, ..., C] to [N, H, product(W, ...), C]. - merge_dim = 2 if is_channels_last else 3 - product = math_ops.reduce_prod( - shape[merge_dim:merge_dim + 1 + (ndims - 4)], keepdims=True) - shape = array_ops.concat([shape[:merge_dim], product, - shape[merge_dim + 1 + (ndims - 4):]], axis=0) - axis = 3 if is_channels_last else 1 - return shape, axis - def build(self, input_shape): input_shape = tensor_shape.TensorShape(input_shape) if not input_shape.ndims: @@ -352,8 +303,33 @@ def build(self, input_shape): raise ValueError('When using virtual_batch_size, adjustment cannot ' 'be specified') - if self.fused and not self._USE_V2_BEHAVIOR: - self.fused = self._fused_can_be_used() + if self.fused in (None, True): + # TODO(yaozhang): if input is not 4D, reshape it to 4D and reshape the + # output back to its original shape accordingly. + if self._USE_V2_BEHAVIOR: + if self.fused is None: + self.fused = (ndims == 4) + elif self.fused and ndims != 4: + raise ValueError('Batch normalization layers with fused=True only ' + 'support 4D input tensors.') + else: + assert self.fused is not None + self.fused = (ndims == 4 and self._fused_can_be_used()) + # TODO(chrisying): fused batch norm is currently not supported for + # multi-axis batch norm and by extension virtual batches. In some cases, + # it might be possible to use fused batch norm but would require reshaping + # the Tensor to 4D with the axis in 1 or 3 (preferred 1) which is + # particularly tricky. A compromise might be to just support the most + # common use case (turning 5D w/ virtual batch to NCHW) + + if self.fused: + if self.axis == [1]: + self._data_format = 'NCHW' + elif self.axis == [3]: + self._data_format = 'NHWC' + else: + raise ValueError('Unsupported axis, fused batch norm only supports ' + 'axis == [1] or axis == [3]') axis_to_dim = {x: input_shape.dims[x].value for x in self.axis} for x in axis_to_dim: @@ -548,7 +524,7 @@ def _fused_batch_norm_training(): gamma, beta, epsilon=self.epsilon, - data_format=data_format) + data_format=self._data_format) def _fused_batch_norm_inference(): return nn.fused_batch_norm( @@ -559,7 +535,7 @@ def _fused_batch_norm_inference(): variance=self.moving_variance, epsilon=self.epsilon, is_training=False, - data_format=data_format) + data_format=self._data_format) output, mean, variance = tf_utils.smart_cond( training, _fused_batch_norm_training, _fused_batch_norm_inference) @@ -572,9 +548,6 @@ def _fused_batch_norm_inference(): factor = (sample_size - math_ops.cast(1.0, variance.dtype)) / sample_size variance *= factor - if original_shape is not None: - output = array_ops.reshape(output, original_shape) - training_value = tf_utils.constant_value(training) if training_value is None: momentum = tf_utils.smart_cond(training, diff --git a/tensorflow/python/keras/layers/normalization_test.py b/tensorflow/python/keras/layers/normalization_test.py index 511ee29baac..3ade7d1cbf8 100644 --- a/tensorflow/python/keras/layers/normalization_test.py +++ b/tensorflow/python/keras/layers/normalization_test.py @@ -147,134 +147,6 @@ def test_batchnorm_convnet_channel_last(self): np.testing.assert_allclose(np.mean(out, axis=(0, 1, 2)), 0.0, atol=1e-1) np.testing.assert_allclose(np.std(out, axis=(0, 1, 2)), 1.0, atol=1e-1) - @keras_parameterized.run_all_keras_modes - def test_batchnorm_convnet_channel_last_3d_fused(self): - model = keras.models.Sequential() - norm = keras.layers.BatchNormalization( - axis=-1, input_shape=(4, 4, 4, 3), momentum=0.8, fused=True) - model.add(norm) - model.compile( - loss='mse', - optimizer=gradient_descent.GradientDescentOptimizer(0.01), - run_eagerly=testing_utils.should_run_eagerly()) - - # centered on 5.0, variance 10.0 - x = np.random.normal(loc=5.0, scale=10.0, size=(1000, 4, 4, 4, 3)) - model.fit(x, x, epochs=4, verbose=0) - out = model.predict(x) - out -= np.reshape(keras.backend.eval(norm.beta), (1, 1, 1, 1, 3)) - out /= np.reshape(keras.backend.eval(norm.gamma), (1, 1, 1, 1, 3)) - - np.testing.assert_allclose(np.mean(out, axis=(0, 1, 2, 3)), 0.0, atol=1e-1) - np.testing.assert_allclose(np.std(out, axis=(0, 1, 2, 3)), 1.0, atol=1e-1) - - @keras_parameterized.run_all_keras_modes - def test_batchnorm_convnet_channel_first_3d_fused(self): - model = keras.models.Sequential() - norm = keras.layers.BatchNormalization( - axis=1, input_shape=(3, 4, 4, 4), momentum=0.8, fused=True) - model.add(norm) - model.compile( - loss='mse', - optimizer=gradient_descent.GradientDescentOptimizer(0.01), - run_eagerly=testing_utils.should_run_eagerly()) - - # centered on 5.0, variance 10.0 - x = np.random.normal(loc=5.0, scale=10.0, size=(1000, 3, 4, 4, 4)) - model.fit(x, x, epochs=4, verbose=0) - out = model.predict(x) - out -= np.reshape(keras.backend.eval(norm.beta), (1, 3, 1, 1, 1)) - out /= np.reshape(keras.backend.eval(norm.gamma), (1, 3, 1, 1, 1)) - - np.testing.assert_allclose(np.mean(out, axis=(0, 2, 3, 4)), 0.0, atol=1e-1) - np.testing.assert_allclose(np.std(out, axis=(0, 2, 3, 4)), 1.0, atol=1e-1) - - @keras_parameterized.run_all_keras_modes - def test_batchnorm_convnet_channel_last_3d_fused_correctness(self): - model = keras.models.Sequential() - norm = keras.layers.BatchNormalization(axis=-1, momentum=0.8, fused=True) - model.add(norm) - model.compile( - loss='mse', - optimizer=gradient_descent.GradientDescentOptimizer(0.01)) - - # Sequential values ensure the result is axis-dependent. - x = np.arange(5 * 5 * 5 * 5 * 3).reshape([5, 5, 5, 5, 3]) - x = x.astype(np.float32) - model.fit(x, x, epochs=1000, verbose=0) - moving_mean = keras.backend.eval(norm.moving_mean) - moving_variance = keras.backend.eval(norm.moving_variance) - x_mean = x.mean(axis=(0, 1, 2, 3)) - moving_mean_target = x_mean - moving_variance_target = x.var(axis=(0, 1, 2, 3)) - np.testing.assert_allclose( - moving_mean, moving_mean_target, rtol=1e-5) - np.testing.assert_allclose( - moving_variance, moving_variance_target, rtol=1e-2) - - beta = np.reshape(keras.backend.eval(norm.beta), (1, 1, 1, 1, 3)) - gamma = np.reshape(keras.backend.eval(norm.gamma), (1, 1, 1, 1, 3)) - - out = (model.predict(x) - beta) / gamma - np.testing.assert_allclose( - np.mean(out, axis=(0, 1, 2, 3)), 0.0, atol=1e-2) - np.testing.assert_allclose(np.std(out, axis=(0, 1, 2, 3)), 1.0, atol=1e-2) - - # Test with changed input shape. - y = np.arange(7 * 7 * 7 * 7 * 3).reshape([7, 7, 7, 7, 3]) - y = y.astype(np.float32) - out = (model.predict(y) - beta) / gamma - x_std = x.std(axis=(0, 1, 2, 3)) - out_mean_target = (y.mean(axis=(0, 1, 2, 3)) - x_mean) / x_std - out_std_target = y.std(axis=(0, 1, 2, 3)) / x_std - np.testing.assert_allclose( - np.mean(out, axis=(0, 1, 2, 3)), out_mean_target, atol=1e-2) - np.testing.assert_allclose( - np.std(out, axis=(0, 1, 2, 3)), out_std_target, atol=1e-2) - - @keras_parameterized.run_all_keras_modes - def test_batchnorm_convnet_channel_first_3d_fused_correctness(self): - model = keras.models.Sequential() - norm = keras.layers.BatchNormalization(axis=1, momentum=0.8, fused=True) - model.add(norm) - model.compile( - loss='mse', - optimizer=gradient_descent.GradientDescentOptimizer(0.01)) - - # Sequential values ensure the result is axis-dependent. - x = np.arange(5 * 3 * 5 * 5 * 5).reshape([5, 3, 5, 5, 5]) - x = x.astype(np.float32) - model.fit(x, x, epochs=1000, verbose=0) - moving_mean = keras.backend.eval(norm.moving_mean) - moving_variance = keras.backend.eval(norm.moving_variance) - x_mean = x.mean(axis=(0, 2, 3, 4)) - moving_mean_target = x_mean - moving_variance_target = x.var(axis=(0, 2, 3, 4)) - np.testing.assert_allclose( - moving_mean, moving_mean_target, rtol=1e-5) - np.testing.assert_allclose( - moving_variance, moving_variance_target, rtol=1e-2) - - beta = np.reshape(keras.backend.eval(norm.beta), (1, 3, 1, 1, 1)) - gamma = np.reshape(keras.backend.eval(norm.gamma), (1, 3, 1, 1, 1)) - - out = (model.predict(x) - beta) / gamma - np.testing.assert_allclose( - np.mean(out, axis=(0, 2, 3, 4)), 0.0, atol=1e-2) - np.testing.assert_allclose(np.std(out, axis=(0, 2, 3, 4)), 1.0, atol=1e-2) - - # Test with changed input shape. - y = np.arange(7 * 3 * 7 * 7 * 7).reshape([7, 3, 7, 7, 7]) - y = y.astype(np.float32) - out = (model.predict(y) - beta) / gamma - x_std = x.std(axis=(0, 2, 3, 4)) - out_mean_target = (y.mean(axis=(0, 2, 3, 4)) - x_mean) / x_std - out_std_target = y.std(axis=(0, 2, 3, 4)) / x_std - np.testing.assert_allclose( - np.mean(out, axis=(0, 2, 3, 4)), out_mean_target, atol=1e-2) - np.testing.assert_allclose( - np.std(out, axis=(0, 2, 3, 4)), out_std_target, atol=1e-2) - @keras_parameterized.run_all_keras_modes def test_batchnorm_correctness(self): _run_batchnorm_correctness_test( @@ -352,11 +224,10 @@ def test_step(x, y): loss = loss_fn(y, y_pred) return loss - train_step(np.random.random((100, 3)).astype(np.float32), - np.random.random((100, 3)).astype(np.float32)) + train_step(np.random.random((100, 3)), np.random.random((100, 3))) - test_data = np.random.random((10, 3)).astype(np.float32) - test_targets = np.random.random((10, 3)).astype(np.float32) + test_data = np.random.random((10, 3)) + test_targets = np.random.random((10, 3)) test_loss = test_step(test_data, test_targets) bn.trainable = False @@ -380,7 +251,7 @@ def call(self, x, training): model = MyModel() for _ in range(10): - x = constant_op.constant(0.5, shape=[2, 1]) + x = constant_op.constant(0.5, shape=[1, 1]) model(x, training=True) # Make sure the moving mean and variance have been updated @@ -422,28 +293,20 @@ def test_basic_batchnorm_v2(self): normalization_v2.BatchNormalization, kwargs={'fused': None}, input_shape=(3, 3, 3)) - testing_utils.layer_test( - normalization_v2.BatchNormalization, - kwargs={'fused': True}, - input_shape=(3, 3, 3, 3, 3)) - testing_utils.layer_test( - normalization_v2.BatchNormalization, - kwargs={'fused': True}, - input_shape=(3, 3)) @tf_test_util.run_in_graph_and_eager_modes def test_v2_fused_attribute(self): norm = normalization_v2.BatchNormalization() - self.assertEqual(norm.fused, True) + self.assertEqual(norm.fused, None) inp = keras.layers.Input(shape=(4, 4, 4)) norm(inp) self.assertEqual(norm.fused, True) norm = normalization_v2.BatchNormalization() - self.assertEqual(norm.fused, True) + self.assertEqual(norm.fused, None) inp = keras.layers.Input(shape=(4, 4)) norm(inp) - self.assertEqual(norm.fused, True) + self.assertEqual(norm.fused, False) norm = normalization_v2.BatchNormalization(virtual_batch_size=2) self.assertEqual(norm.fused, False) @@ -466,7 +329,10 @@ def test_v2_fused_attribute(self): with self.assertRaisesRegexp(ValueError, 'fused.*renorm'): normalization_v2.BatchNormalization(fused=True, renorm=True) - with self.assertRaisesRegexp(ValueError, 'fused.*over a single axis'): + with self.assertRaisesRegexp(ValueError, 'fused.*when axis is 1 or 3'): + normalization_v2.BatchNormalization(fused=True, axis=2) + + with self.assertRaisesRegexp(ValueError, 'fused.*when axis is 1 or 3'): normalization_v2.BatchNormalization(fused=True, axis=[1, 3]) with self.assertRaisesRegexp(ValueError, 'fused.*virtual_batch_size'): @@ -476,6 +342,12 @@ def test_v2_fused_attribute(self): normalization_v2.BatchNormalization(fused=True, adjustment=lambda _: (1, 0)) + norm = normalization_v2.BatchNormalization(fused=True) + self.assertEqual(norm.fused, True) + inp = keras.layers.Input(shape=(4, 4)) + with self.assertRaisesRegexp(ValueError, '4D input tensors'): + norm(inp) + def test_updates_in_wrap_function(self): with context.eager_mode(): layer = keras.layers.BatchNormalization() @@ -522,23 +394,6 @@ def _run_batchnorm_correctness_test(layer, dtype='float32', fused=False): class NormalizationLayersGraphModeOnlyTest( test.TestCase, parameterized.TestCase): - def test_unknown_shape_batchnorm(self, layer): - """Test that a BN layer supports unknown input shapes in graph mode.""" - with self.cached_session(): - def run_model(input_shape): - bn = layer() - x = keras.layers.Input(shape=input_shape) - y = bn(x) - model = keras.models.Model(x, y) - model.compile(gradient_descent.GradientDescentOptimizer(0.01), 'mse') - known_shape = [2] + [5 if dim is None else dim for dim in input_shape] - val_a = np.random.random(known_shape) - _ = model.predict(val_a) - - run_model((None, 10)) - run_model((None, None, 10)) - run_model((None, None, None, 10)) - def test_shared_batchnorm(self, layer): """Test that a BN layer can be shared across different data streams.""" with self.cached_session(): diff --git a/tensorflow/python/ops/embedding_variable_ops_gpu_test.py b/tensorflow/python/ops/embedding_variable_ops_gpu_test.py index dfd5ff80293..23fa9ae7c80 100644 --- a/tensorflow/python/ops/embedding_variable_ops_gpu_test.py +++ b/tensorflow/python/ops/embedding_variable_ops_gpu_test.py @@ -203,7 +203,7 @@ def testEmbeddingVariableForGetShape(self): sess.run([init]) sess.run([emb]) # Unimplement GPUHashMapKV::Size() {return 0;} - self.assertAllEqual([0, 3], sess.run(shape)) + self.assertAllEqual([6, 3], sess.run(shape)) def testEmbeddingVariableForSparseColumnSharedEmbeddingCol(self): columns_list=[] @@ -482,5 +482,30 @@ def runTest(self, var, g): for j in range(0, 6): self.assertAllClose(emb1.tolist()[i][j], emb2.tolist()[i][j]) + def testEmbeddingVariableForSaver(self): + print("testEmbeddingVariableForSaver") + checkpoint_directory = self.get_temp_dir() + with ops.device("/gpu:0"): + var = variable_scope.get_embedding_variable("var_1", + embedding_dim = 3, + initializer=init_ops.ones_initializer(dtypes.float32), + ev_option = variables.EmbeddingVariableOption(storage_option=variables.StorageOption(storage_type=config_pb2.StorageType.HBM)), + partitioner=partitioned_variables.fixed_size_partitioner(num_shards=4)) + emb = embedding_ops.embedding_lookup(var, math_ops.cast([0,1,2,5,6,-7], dtypes.int64)) + fun = math_ops.multiply(emb, 2.0, name='multiply') + loss = math_ops.reduce_sum(fun, name='reduce_sum') + opt = ftrl.FtrlOptimizer(0.1, l1_regularization_strength=2.0, l2_regularization_strength=0.00001) + g_v = opt.compute_gradients(loss) + train_op = opt.apply_gradients(g_v) + saver = saver_module.Saver() + init = variables.global_variables_initializer() + with self.test_session(force_gpu=True) as sess: + sess.run(ops.get_collection(ops.GraphKeys.EV_INIT_VAR_OPS)) + sess.run(ops.get_collection(ops.GraphKeys.EV_INIT_SLOT_OPS)) + sess.run([init]) + print(sess.run([emb, train_op,loss])) + model_path = os.path.join(checkpoint_directory, "model.ckpt") + print(saver.save(sess, model_path)) + if __name__ == "__main__": googletest.main() diff --git a/tensorflow/tools/pip_package/setup.py b/tensorflow/tools/pip_package/setup.py index c2f9d45cfde..b50a94cf34a 100644 --- a/tensorflow/tools/pip_package/setup.py +++ b/tensorflow/tools/pip_package/setup.py @@ -47,7 +47,7 @@ # result for pip. # Also update tensorflow/tensorflow.bzl and # tensorflow/core/public/version.h -_VERSION = '1.15.5+deeprec2212' +_VERSION = '1.15.5+deeprec2212u1' REQUIRED_PACKAGES = [ 'absl-py >= 0.9.0',