Skip to content

Commit

Permalink
Refactor: How Streams and Streamclient are handled and how data is send
Browse files Browse the repository at this point in the history
  • Loading branch information
Barracuda09 committed Oct 30, 2023
1 parent b8852ac commit dda93e4
Show file tree
Hide file tree
Showing 101 changed files with 1,894 additions and 2,978 deletions.
13 changes: 4 additions & 9 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ SOURCES = Version.cpp \
main.cpp \
Satpi.cpp \
Stream.cpp \
StreamClient.cpp \
StreamManager.cpp \
StringConverter.cpp \
TransportParamVector.cpp \
Expand Down Expand Up @@ -108,14 +107,10 @@ SOURCES = Version.cpp \
mpegts/PMT.cpp \
mpegts/SDT.cpp \
mpegts/TableData.cpp \
output/StreamThreadBase.cpp \
output/StreamThreadHttp.cpp \
output/StreamThreadRtcpBase.cpp \
output/StreamThreadRtcp.cpp \
output/StreamThreadRtcpTcp.cpp \
output/StreamThreadRtp.cpp \
output/StreamThreadRtpTcp.cpp \
output/StreamThreadTSWriter.cpp \
output/StreamClient.cpp \
output/StreamClientOutputHttp.cpp \
output/StreamClientOutputRtp.cpp \
output/StreamClientOutputRtpTcp.cpp \
socket/HttpcSocket.cpp \
socket/TcpSocket.cpp \
socket/SocketAttr.cpp \
Expand Down
58 changes: 19 additions & 39 deletions SatPI.plantuml
Original file line number Diff line number Diff line change
Expand Up @@ -103,22 +103,16 @@ class HttpcServer {
processStreamingRequest(..)
}

class StreamInterface #aaaaff {
getInputDevice() = 0
getStreamClient(..) = 0
getDecryptDevice() = 0
}

Stream --|> StreamInterface
Stream *-- StreamClient : " _client[..] "
Stream *-- output::StreamClient : " _streamClientVector[..] "
Stream *-- mpegts::PacketBuffer : " _tsBuffer[..] "
Stream *-- input::Device
Stream *-- output::StreamThreadBase
decrypt::dvbapi::Client ..* Stream : " _decrypt "
class Stream #aaaaff {
XMLSupport
--
** Thread DataReader**
** Thread Monitor/RTCP**
--
enabled
StreamingType (RSTP/RTP/HTTP/..)
streamInUse
--
getFrontendDecryptInterface()
Expand Down Expand Up @@ -149,10 +143,10 @@ class StreamManager #aaaaff {
getFrontendDecryptInterface(..)
}

StreamClient *-- socket::SocketAttr : " _rtp "
StreamClient *-- socket::SocketAttr : " _rtcp "
StreamClient *.. socket::SocketClient : " _httpStream "
class StreamClient #aaaaff {
output::StreamClient *-- socket::SocketAttr : " _rtp "
output::StreamClient *-- socket::SocketAttr : " _rtcp "
output::StreamClient *.. socket::SocketClient : " _httpStream "
class output::StreamClient #ccffff {
Mutex _mutex
--
SocketClient *_httpStream
Expand All @@ -171,33 +165,19 @@ class mpegts::PMT #11ff11
class mpegts::PCR #11ff11
class mpegts::SDT #11ff11

output::StreamThreadBase *-- mpegts::PacketBuffer : " _tsBuffer[..] "
output::StreamThreadBase *.. StreamInterface
class output::StreamThreadBase #ccffff {
** Thread **
--
}

class output::StreamThreadRtcp #ccffff
class output::StreamThreadHttp #ccffff

output::StreamThreadRtcp --* output::StreamThreadRtp

output::StreamThreadRtp --|> output::StreamThreadBase
class output::StreamThreadRtp #ccffff {
cseq
}

output::StreamThreadHttp --|> output::StreamThreadBase

class input::stream::Streamer #55ccff
input::Device <|-- input::stream::Streamer
class output::StreamClientOutputHttp #ccffff
class output::StreamClientOutputRtp #ccffff
class output::StreamClientOutputRtpTcp #ccffff
output::StreamClientOutputHttp --|> output::StreamClient
output::StreamClientOutputRtp --|> output::StreamClient
output::StreamClientOutputRtpTcp --|> output::StreamClient

class input::file::TSReader #99cc11
class TSReaderData #99cc11
class input::childpipe::TSReader #99cc11
class TSReaderData #99cc11

input::Device <|-- input::file::TSReader
input::file::TSReader *-- TSReaderData
input::Device <|-- input::childpipe::TSReader
input::childpipe::TSReader *-- TSReaderData

TSReaderData --|> input::DeviceData

Expand Down
Binary file modified SatPI.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion SatPI.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 3 additions & 0 deletions src/Defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
#ifndef DEFS_H_INCLUDE
#define DEFS_H_INCLUDE DEFS_H_INCLUDE

#include <memory>
#include <string>
#include <vector>

using StringVector = std::vector<std::string>;

using PacketPtr = std::unique_ptr<uint8_t[]>;

class TypeID {
public:
TypeID(int id) : _id(id) {}
Expand Down
4 changes: 2 additions & 2 deletions src/HeaderVector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
// =============================================================================

std::string HeaderVector::getFieldParameter(const std::string_view reqHeader) const {
for (const std::string &header : _vector) {
for (const std::string& header : _vector) {
const auto b = header.find(reqHeader, 0);
const auto e = header.find_first_not_of(reqHeader, b);
if (b != std::string::npos && reqHeader.size() == e && header[e] == ':') {
Expand All @@ -43,7 +43,7 @@ std::string HeaderVector::getStringFieldParameter(std::string_view header,
return std::string();
}
StringVector params = StringConverter::split(field, ";\r\n");
for (const std::string &param : params) {
for (const std::string& param : params) {
const auto p = param.find(parameter, 0);
if (p != std::string::npos) {
StringVector r = StringConverter::split(param, "=");
Expand Down
16 changes: 8 additions & 8 deletions src/HttpServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ bool HttpServer::methodPost(SocketClient &client) {
getHtmlBodyWithContent(htmlBody, HTML_NO_RESPONSE, "", CONTENT_TYPE_HTML, 0, 0);

// send 'htmlBody' to client
if (!client.sendData(htmlBody.c_str(), htmlBody.size(), 0)) {
if (!client.sendData(htmlBody.data(), htmlBody.size(), 0)) {
SI_LOG_ERROR("Send htmlBody failed");
return false;
}
Expand Down Expand Up @@ -147,7 +147,7 @@ bool HttpServer::methodGet(SocketClient &client, bool headOnly) {
} else if (file == "STOP") {
exitRequest = true;
getHtmlBodyWithContent(htmlBody, HTML_NO_RESPONSE, "", CONTENT_TYPE_HTML, 0, 0);
} else if ((docTypeSize = readFile(filePath.c_str(), docType))) {
} else if ((docTypeSize = readFile(filePath.data(), docType))) {
if (file.find(".xml") != std::string::npos) {
// check if the request is the SAT>IP description xml then fill in the server version, UUID,
// XSatipM3U, presentationURL and tuner string
Expand All @@ -160,7 +160,7 @@ bool HttpServer::methodGet(SocketClient &client, bool headOnly) {
_bindIPAddress,
std::to_string(_properties.getHttpPort()));
const std::string modelName = StringConverter::stringFormat("SatPI Server (@#1)", _bindIPAddress);
const std::string newDocType = StringConverter::stringFormat(docType.c_str(),
const std::string newDocType = StringConverter::stringFormat(docType.data(),
modelName, _properties.getUPnPVersion(), _properties.getUUID(), presentationURL,
_streamManager.getXMLDeliveryString(), _properties.getXSatipM3U());
docType = newDocType;
Expand Down Expand Up @@ -193,9 +193,9 @@ bool HttpServer::methodGet(SocketClient &client, bool headOnly) {
continue;
}
if (line.find("rtsp://") != std::string::npos) {
docType += StringConverter::stringFormat(line.c_str(), rtsp);
docType += StringConverter::stringFormat(line.data(), rtsp);
} else if (line.find("http://") != std::string::npos) {
docType += StringConverter::stringFormat(line.c_str(), http);
docType += StringConverter::stringFormat(line.data(), http);
}
}
docTypeSize = docType.size();
Expand All @@ -209,21 +209,21 @@ bool HttpServer::methodGet(SocketClient &client, bool headOnly) {
}
} else {
file = _properties.getWebPath() + "/" + "404.html";
docTypeSize = readFile(file.c_str(), docType);
docTypeSize = readFile(file.data(), docType);
getHtmlBodyWithContent(htmlBody, HTML_NOT_FOUND, file, CONTENT_TYPE_HTML, docTypeSize, 0);
}
}
}
// send something?
if (htmlBody.size() > 0) {
// send 'htmlBody' to client
if (!client.sendData(htmlBody.c_str(), htmlBody.size(), 0)) {
if (!client.sendData(htmlBody.data(), htmlBody.size(), 0)) {
SI_LOG_ERROR("Send htmlBody failed");
return false;
}
// send 'docType' to client if needed
if (!headOnly && docTypeSize > 0) {
if (!client.sendData(docType.c_str(), docTypeSize, 0)) {
if (!client.sendData(docType.data(), docTypeSize, 0)) {
SI_LOG_ERROR("Send docType failed");
return false;
}
Expand Down
41 changes: 23 additions & 18 deletions src/HttpcServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <Log.h>
#include <Properties.h>
#include <Stream.h>
#include <output/StreamClient.h>
#include <StreamManager.h>
#include <StringConverter.h>
#include <socket/SocketClient.h>
Expand All @@ -32,6 +33,8 @@
#include <cstdlib>
#include <fcntl.h>

extern const char* const satpi_version;

const char *HttpcServer::HTML_BODY_WITH_CONTENT =
"@#1 @#2\r\n" \
"Server: SatPI WebServer v0.1\r\n" \
Expand Down Expand Up @@ -82,8 +85,6 @@ HttpcServer::HttpcServer(
_streamManager(streamManager),
_bindIPAddress(bindIPAddress) {}

HttpcServer::~HttpcServer() {}

void HttpcServer::initialize(
int port,
bool nonblock) {
Expand Down Expand Up @@ -164,18 +165,23 @@ void HttpcServer::processStreamingRequest(SocketClient &client) {
const std::string method = client.getMethod();
std::string httpcReply;
if (sessionID.empty() && method == "OPTIONS") {
methodOptions("", cseq, httpcReply);
static const char *RTSP_OPTIONS_OK =
"RTSP/1.0 200 OK\r\n" \
"Server: satpi/@#1\r\n" \
"CSeq: @#2\r\n" \
"Public: OPTIONS, DESCRIBE, SETUP, PLAY, TEARDOWN\r\n" \
"\r\n";
httpcReply = StringConverter::stringFormat(RTSP_OPTIONS_OK, satpi_version, cseq);
} else if (sessionID.empty() && method == "DESCRIBE") {
methodDescribe("", cseq, feIndex, httpcReply);
} else {
int clientID;
SpStream stream = _streamManager.findStreamAndClientIDFor(client, clientID);
const auto [stream, streamClient] = _streamManager.findStreamAndClientFor(client);
if (stream != nullptr) {
stream->processStreamingRequest(client, clientID);
stream->processStreamingRequest(client, streamClient);

// Check the Method
if (method == "GET") {
stream->update(clientID);
stream->update(streamClient);
const std::string multicast = params.getParameter("multicast");
if (multicast.empty()) {
getHtmlBodyNoContent(httpcReply, HTML_OK, "", CONTENT_TYPE_VIDEO, 0);
Expand All @@ -185,27 +191,26 @@ void HttpcServer::processStreamingRequest(SocketClient &client) {
httpcReply += content;
}
} else if (method == "SETUP") {
methodSetup(*stream, clientID, httpcReply);
httpcReply = streamClient->getSetupMethodReply(stream->getStreamID());

if (!stream->update(clientID)) {
if (!stream->update(streamClient)) {
// something wrong here... send 408 error
getHtmlBodyNoContent(httpcReply, HTML_REQUEST_TIMEOUT, "", CONTENT_TYPE_VIDEO, cseq);
stream->teardown(clientID);
stream->teardown(streamClient);
}
} else if (method == "PLAY") {
methodPlay(*stream, clientID, httpcReply);
httpcReply = streamClient->getPlayMethodReply(stream->getStreamID(), _bindIPAddress);

if (!stream->update(clientID)) {
if (!stream->update(streamClient)) {
// something wrong here... send 408 error
getHtmlBodyNoContent(httpcReply, HTML_REQUEST_TIMEOUT, "", CONTENT_TYPE_VIDEO, cseq);
stream->teardown(clientID);
stream->teardown(streamClient);
}
} else if (method == "TEARDOWN") {
methodTeardown(sessionID, cseq, httpcReply);

stream->teardown(clientID);
httpcReply = streamClient->getTeardownMethodReply();
stream->teardown(streamClient);
} else if (method == "OPTIONS") {
methodOptions(sessionID, cseq, httpcReply);
httpcReply = streamClient->getOptionsMethodReply();
} else if (method == "DESCRIBE") {
methodDescribe(sessionID, cseq, feIndex, httpcReply);
} else {
Expand All @@ -221,7 +226,7 @@ void HttpcServer::processStreamingRequest(SocketClient &client) {
}
const unsigned long time = sw.getIntervalMS();
SI_LOG_DEBUG("Send reply in @#1 ms\r\n@#2", time, httpcReply);
if (!client.sendData(httpcReply.c_str(), httpcReply.size(), MSG_NOSIGNAL)) {
if (!client.sendData(httpcReply.data(), httpcReply.size(), MSG_NOSIGNAL)) {
SI_LOG_ERROR("Send Streaming reply failed");
}
}
Expand Down
16 changes: 3 additions & 13 deletions src/HttpcServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
FW_DECL_NS0(Stream);
FW_DECL_NS0(StreamManager);

FW_DECL_SP_NS1(output, StreamClient);

/// HTTP Client Server
class HttpcServer :
public TcpSocket {
Expand Down Expand Up @@ -66,7 +68,7 @@ class HttpcServer :
StreamManager &streamManager,
const std::string &bindIPAddress);

virtual ~HttpcServer();
virtual ~HttpcServer() = default;

/// Call this to initialize, setup and start this server
virtual void initialize(
Expand Down Expand Up @@ -94,18 +96,6 @@ class HttpcServer :
return false;
}

/// RTSP Method
virtual void methodSetup(const Stream &UNUSED(stream), int UNUSED(clientID), std::string &UNUSED(htmlBody)) {}

/// RTSP Method
virtual void methodPlay(const Stream &UNUSED(stream), int UNUSED(clientID), std::string &UNUSED(htmlBody)) {}

/// RTSP Method
virtual void methodTeardown(const std::string &UNUSED(sessionID), int UNUSED(cseq), std::string &UNUSED(htmlBody)) {}

/// RTSP Method
virtual void methodOptions(const std::string &UNUSED(sessionID), int UNUSED(cseq), std::string &UNUSED(htmlBody)) {}

/// RTSP Method
virtual void methodDescribe(const std::string &UNUSED(sessionID), int UNUSED(cseq), FeIndex UNUSED(index), std::string &UNUSED(htmlBody)) {}

Expand Down
9 changes: 7 additions & 2 deletions src/Log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ static base::Mutex logMutex;

bool Log::_syslogOn = false;
bool Log::_coutLog = true;
bool Log::_logDebug = true;

Log::LogBuffer Log::_appLogBuffer;

void Log::openAppLog(const char *deamonName, const bool daemonize) {
Expand Down Expand Up @@ -64,6 +66,9 @@ void Log::log(const int priority, const std::string &msg) {
if ((priority & MPEGTS_TABLES) == MPEGTS_TABLES) {
return;
}
if (priority == LOG_DEBUG && !_logDebug) {
return;
}
// set timestamp
struct timespec timeStamp;
struct tm result;
Expand All @@ -87,7 +92,7 @@ void Log::log(const int priority, const std::string &msg) {

// log to syslog
if (_syslogOn) {
syslog(priority, "%s", line.c_str());
syslog(priority, "%s", line.data());
}

#ifdef DEBUG
Expand All @@ -112,7 +117,7 @@ std::string Log::makeJSON() {
{
base::MutexLock lock(logMutex);
if (!_appLogBuffer.empty()) {
for (const LogElem &elem : _appLogBuffer) {
for (const LogElem& elem : _appLogBuffer) {
json.startObject();
json.addValueString("timestamp", elem.timestamp);
json.addValueString("msg", elem.msg);
Expand Down
Loading

0 comments on commit dda93e4

Please sign in to comment.