diff --git a/src/stream.cpp b/src/stream.cpp
new file mode 100644
--- /dev/null
+++ b/src/stream.cpp
@@ -0,0 +1,248 @@
+/*
+ Copyright © 2018 Hasan Yavuz Özderya
+
+ This file is part of serialplot.
+
+ serialplot is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ serialplot is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with serialplot. If not, see .
+*/
+
+#include "stream.h"
+#include "ringbuffer.h"
+#include "indexbuffer.h"
+
+Stream::Stream(unsigned nc, bool x, unsigned ns) :
+ _infoModel(nc)
+{
+ _numSamples = ns;
+ _paused = false;
+
+ // create xdata buffer
+ _hasx = x;
+ if (x)
+ {
+ xData = new RingBuffer(ns);
+ }
+ else
+ {
+ xData = new IndexBuffer(ns);
+ }
+
+ // create channels
+ for (unsigned i = 0; i < nc; i++)
+ {
+ auto c = new StreamChannel(i, xData, new RingBuffer(ns), &_infoModel);
+ channels.append(c);
+ }
+}
+
+Stream::~Stream()
+{
+ for (auto ch : channels)
+ {
+ delete ch;
+ }
+ delete xData;
+}
+
+bool Stream::hasX() const
+{
+ return _hasx;
+}
+
+unsigned Stream::numChannels() const
+{
+ return channels.length();
+}
+
+unsigned Stream::numSamples() const
+{
+ return _numSamples;
+}
+
+const StreamChannel* Stream::channel(unsigned index) const
+{
+ Q_ASSERT(index < numChannels());
+ return channels[index];
+}
+
+StreamChannel* Stream::channel(unsigned index)
+{
+ return const_cast(static_cast(*this).channel(index));
+}
+
+const ChannelInfoModel* Stream::infoModel() const
+{
+ return &_infoModel;
+}
+
+ChannelInfoModel* Stream::infoModel()
+{
+ return const_cast(static_cast(*this).infoModel());
+}
+
+void Stream::setNumChannels(unsigned nc, bool x)
+{
+ unsigned oldNum = numChannels();
+ if (oldNum == nc && x == _hasx) return;
+
+ // adjust the number of channels
+ if (nc > oldNum)
+ {
+ for (unsigned i = oldNum; i < nc; i++)
+ {
+ auto c = new StreamChannel(i, xData, new RingBuffer(_numSamples), &_infoModel);
+ channels.append(c);
+ }
+ }
+ else if (nc < oldNum)
+ {
+ for (unsigned i = oldNum-1; i > nc-1; i--)
+ {
+ delete channels.takeLast();
+ }
+ }
+
+ // change the xdata
+ if (x != _hasx)
+ {
+ if (x)
+ {
+ xData = new RingBuffer(_numSamples);
+ }
+ else
+ {
+ xData = new IndexBuffer(_numSamples);
+ }
+
+ for (auto c : channels)
+ {
+ c->setX(xData);
+ }
+ _hasx = x;
+ }
+
+ if (nc != oldNum)
+ {
+ _infoModel.setNumOfChannels(nc);
+ // TODO: how about X change?
+ emit numChannelsChanged(nc);
+ }
+
+ Sink::setNumChannels(nc, x);
+}
+
+const SamplePack* Stream::applyGainOffset(const SamplePack& pack) const
+{
+ Q_ASSERT(infoModel()->gainOrOffsetEn());
+
+ SamplePack* mPack = new SamplePack(pack);
+ unsigned ns = pack.numSamples();
+
+ for (unsigned ci = 0; ci < numChannels(); ci++)
+ {
+ // TODO: we could use some kind of map (int32, int64 would suffice) to speed things up
+ bool gainEn = infoModel()->gainEn(ci);
+ bool offsetEn = infoModel()->offsetEn(ci);
+ if (gainEn || offsetEn)
+ {
+ double* mdata = mPack->data(ci);
+
+ double gain = infoModel()->gain(ci);
+ double offset = infoModel()->offset(ci);
+
+ if (gainEn)
+ {
+ for (unsigned i = 0; i < ns; i++)
+ {
+ mdata[i] *= gain;
+ }
+ }
+ if (offsetEn)
+ {
+ for (unsigned i = 0; i < ns; i++)
+ {
+ mdata[i] += offset;
+ }
+ }
+ }
+ }
+
+ return mPack;
+}
+
+void Stream::feedIn(const SamplePack& pack)
+{
+ Q_ASSERT(pack.numChannels() == numChannels() &&
+ pack.hasX() == hasX());
+
+ if (_paused) return;
+
+ unsigned ns = pack.numSamples();
+ if (_hasx)
+ {
+ static_cast(xData)->addSamples(pack.xData(), ns);
+ }
+
+ // modified pack that gain and offset is applied to
+ const SamplePack* mPack = nullptr;
+ if (infoModel()->gainOrOffsetEn())
+ mPack = applyGainOffset(pack);
+
+ for (unsigned ci = 0; ci < numChannels(); ci++)
+ {
+ auto buf = static_cast(channels[ci]->yData());
+ double* data = (mPack == nullptr) ? pack.data(ci) : mPack->data(ci);
+ buf->addSamples(data, ns);
+ }
+
+ Sink::feedIn((mPack == nullptr) ? pack : *mPack);
+
+ if (mPack != nullptr) delete mPack;
+ emit dataAdded();
+}
+
+void Stream::pause(bool paused)
+{
+ _paused = paused;
+}
+
+void Stream::clear()
+{
+ for (auto c : channels)
+ {
+ static_cast(c->yData())->clear();
+ }
+}
+
+void Stream::setNumSamples(unsigned value)
+{
+ if (value == _numSamples) return;
+ _numSamples = value;
+
+ xData->resize(value);
+ for (auto c : channels)
+ {
+ static_cast(c->yData())->resize(value);
+ }
+}
+
+void Stream::saveSettings(QSettings* settings) const
+{
+ _infoModel.saveSettings(settings);
+}
+
+void Stream::loadSettings(QSettings* settings)
+{
+ _infoModel.loadSettings(settings);
+}