193 lines
6.1 KiB
C
193 lines
6.1 KiB
C
|
// Copyright 2018 The MediaPipe Authors.
|
||
|
//
|
||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
// you may not use this file except in compliance with the License.
|
||
|
// You may obtain a copy of the License at
|
||
|
//
|
||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||
|
//
|
||
|
// Unless required by applicable law or agreed to in writing, software
|
||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
// See the License for the specific language governing permissions and
|
||
|
// limitations under the License.
|
||
|
|
||
|
#ifndef MEDIAPIPE_FRAMEWORK_PROFILER_CIRCULAR_BUFFER_H_
|
||
|
#define MEDIAPIPE_FRAMEWORK_PROFILER_CIRCULAR_BUFFER_H_
|
||
|
|
||
|
#include <atomic>
|
||
|
#include <cstddef>
|
||
|
#include <iterator>
|
||
|
#include <vector>
|
||
|
|
||
|
#include "mediapipe/framework/port/integral_types.h"
|
||
|
|
||
|
namespace mediapipe {
|
||
|
|
||
|
// A circular buffer for lock-free event logging.
|
||
|
// This class is thread-safe and writing using "push_back" is lock-free.
|
||
|
// Multiple writers and readers are supported. All writes and reads
|
||
|
// will succeed as long as the buffer does not grow by more than
|
||
|
// "buffer_margin_" during a read.
|
||
|
template <typename T>
|
||
|
class CircularBuffer {
|
||
|
public:
|
||
|
class iterator;
|
||
|
|
||
|
// Create a circular buffer to hold up to |capacity| events.
|
||
|
// Buffer writers are separated from readers by |buffer_margin|.
|
||
|
CircularBuffer(size_t capacity, double buffer_margin = 0.25);
|
||
|
|
||
|
// Appends one event to the buffer.
|
||
|
// Returns true if the buffer is free and writing succeeds.
|
||
|
inline bool push_back(const T& event);
|
||
|
|
||
|
// Returns the i-th event in the buffer from the current beginning location.
|
||
|
// Reading blocks until buffer is free.
|
||
|
inline T Get(size_t i) const;
|
||
|
|
||
|
// Returns the i-th event in the absolute buffer coordinates. Wrapping from
|
||
|
// the beginning must be implemented separately.
|
||
|
// Reading blocks until buffer is free.
|
||
|
inline T GetAbsolute(size_t i) const;
|
||
|
|
||
|
// Returns the first available index in the buffer.
|
||
|
inline iterator begin() const {
|
||
|
return iterator(this, current_ < capacity_ ? 0 : current_ - capacity_);
|
||
|
}
|
||
|
|
||
|
// Returns one past the last available index in the buffer.
|
||
|
inline iterator end() const { return iterator(this, current_); }
|
||
|
|
||
|
private:
|
||
|
// Marks an atom busy and returns its previous value.
|
||
|
static inline char AcquireForWrite(std::atomic_char& atom);
|
||
|
|
||
|
// After an atom reaches |lap|, marks it busy and returns its previous value.
|
||
|
static inline char AcquireForRead(std::atomic_char& atom, char lap);
|
||
|
|
||
|
// Marks an atom as not busy at |lap|.
|
||
|
static inline void Release(std::atomic_char& atom, char lap);
|
||
|
|
||
|
// Returns the modulo lap for a buffer index.
|
||
|
static inline char GetLap(size_t i, size_t buffer_size);
|
||
|
|
||
|
// Returns the greater of two modulo laps.
|
||
|
static inline char MaxLap(char u, char v);
|
||
|
|
||
|
private:
|
||
|
double buffer_margin_;
|
||
|
size_t capacity_;
|
||
|
size_t buffer_size_;
|
||
|
std::vector<T> buffer_;
|
||
|
mutable std::vector<std::atomic_char> lap_;
|
||
|
std::atomic<size_t> current_;
|
||
|
static constexpr char kBusy = 0xFF;
|
||
|
static constexpr char kMask = 0x7F;
|
||
|
};
|
||
|
|
||
|
template <typename T>
|
||
|
CircularBuffer<T>::CircularBuffer(size_t capacity, double buffer_margin)
|
||
|
: capacity_(capacity),
|
||
|
buffer_size_((size_t)capacity * (1 + buffer_margin)),
|
||
|
buffer_(buffer_size_),
|
||
|
lap_(buffer_size_),
|
||
|
current_(0) {}
|
||
|
|
||
|
template <typename T>
|
||
|
bool CircularBuffer<T>::push_back(const T& event) {
|
||
|
size_t i = current_++;
|
||
|
char lap = GetLap(i, buffer_size_);
|
||
|
size_t index = i % buffer_size_;
|
||
|
char prev = AcquireForWrite(lap_[index]);
|
||
|
buffer_[index] = event;
|
||
|
Release(lap_[index], MaxLap(prev, lap));
|
||
|
return true;
|
||
|
}
|
||
|
|
||
|
template <typename T>
|
||
|
T CircularBuffer<T>::GetAbsolute(size_t i) const {
|
||
|
char lap = GetLap(i, buffer_size_);
|
||
|
size_t index = i % buffer_size_;
|
||
|
char prev = AcquireForRead(lap_[index], lap);
|
||
|
T result = buffer_[index];
|
||
|
Release(lap_[index], prev);
|
||
|
return result;
|
||
|
}
|
||
|
|
||
|
template <typename T>
|
||
|
T CircularBuffer<T>::Get(size_t i) const {
|
||
|
if (current_ > capacity_) {
|
||
|
return GetAbsolute(i + current_ - capacity_);
|
||
|
} else {
|
||
|
return GetAbsolute(i);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
template <typename T>
|
||
|
char CircularBuffer<T>::AcquireForWrite(std::atomic_char& atom) {
|
||
|
char prev;
|
||
|
for (bool done = false; !done;) {
|
||
|
prev = atom;
|
||
|
if (prev != kBusy) {
|
||
|
done =
|
||
|
atom.compare_exchange_strong(prev, kBusy, std::memory_order_acquire);
|
||
|
}
|
||
|
}
|
||
|
return prev;
|
||
|
}
|
||
|
|
||
|
template <typename T>
|
||
|
char CircularBuffer<T>::AcquireForRead(std::atomic_char& atom, char lap) {
|
||
|
char prev;
|
||
|
for (bool done = false; !done;) {
|
||
|
prev = atom;
|
||
|
if (prev != kBusy && prev == MaxLap(prev, lap)) {
|
||
|
done =
|
||
|
atom.compare_exchange_strong(prev, kBusy, std::memory_order_acquire);
|
||
|
}
|
||
|
}
|
||
|
return prev;
|
||
|
}
|
||
|
|
||
|
template <typename T>
|
||
|
void CircularBuffer<T>::Release(std::atomic_char& atom, char lap) {
|
||
|
atom.store(lap, std::memory_order_release);
|
||
|
}
|
||
|
|
||
|
template <typename T>
|
||
|
char CircularBuffer<T>::GetLap(size_t index, size_t buffer_size) {
|
||
|
return (index / buffer_size + 1) & kMask;
|
||
|
}
|
||
|
|
||
|
template <typename T>
|
||
|
char CircularBuffer<T>::MaxLap(char u, char v) {
|
||
|
return ((u - v) & kMask) <= (kMask / 2) ? u : v;
|
||
|
}
|
||
|
|
||
|
template <typename T>
|
||
|
class CircularBuffer<T>::iterator
|
||
|
: public std::iterator<std::random_access_iterator_tag, T, int64> {
|
||
|
public:
|
||
|
explicit iterator(const CircularBuffer* buffer, size_t index)
|
||
|
: buffer_(buffer), index_(index) {}
|
||
|
bool operator==(iterator other) const { return index_ == other.index_; }
|
||
|
bool operator!=(iterator other) const { return !(*this == other); }
|
||
|
bool operator<(iterator other) const { return index_ < other.index_; }
|
||
|
T operator*() const { return buffer_->GetAbsolute(index_); }
|
||
|
T* operator->() const { &buffer_->GetAbsolute(index_); }
|
||
|
iterator& operator++() { return (*this) += 1; }
|
||
|
iterator& operator+=(const int64& num) { return index_ += num, *this; }
|
||
|
int64 operator-(const iterator& it) const { return index_ - it.index_; }
|
||
|
iterator& operator+(const int64& num) { return iterator(*this) += num; }
|
||
|
iterator& operator-(const int64& num) { return iterator(*this) += -num; }
|
||
|
|
||
|
private:
|
||
|
const CircularBuffer* buffer_;
|
||
|
size_t index_;
|
||
|
};
|
||
|
|
||
|
} // namespace mediapipe
|
||
|
|
||
|
#endif // MEDIAPIPE_FRAMEWORK_PROFILER_CIRCULAR_BUFFER_H_
|