From 6014f92e6355b712b82069f04b821b2329b9f874 Mon Sep 17 00:00:00 2001 From: Doron Adler Date: Mon, 30 Nov 2015 13:31:52 +0200 Subject: [PATCH 1/3] * SampleBroadcaster project now depends on the products of Pods.xcodeproj * Added Pods (Pods), URIParser-cpp (Pods) and VideoCore (Pods) into SampleBroadcaster's "Target Dependencies" under Build Phases * SampleBroadcaster will wait for and link against the build products of Pods directly * Removed the Hard-Coded DevelopmentTeam ID from the Target's attributes * The minimum iOS deployment target has been set to 8.0 (Minimum target with VideoToolBox API support) * Removed the Hard-Coded Provisioning Profile UID * Commented out m_socketJob.thisThreadInQueue related assertions because this member is not available --- .../project.pbxproj | 137 ++++++++++++++++-- stream/AsyncStreamSession.cpp | 20 +-- 2 files changed, 134 insertions(+), 23 deletions(-) diff --git a/sample/SampleBroadcaster/SampleBroadcaster.xcodeproj/project.pbxproj b/sample/SampleBroadcaster/SampleBroadcaster.xcodeproj/project.pbxproj index 95e0a8c6..96e29116 100644 --- a/sample/SampleBroadcaster/SampleBroadcaster.xcodeproj/project.pbxproj +++ b/sample/SampleBroadcaster/SampleBroadcaster.xcodeproj/project.pbxproj @@ -8,7 +8,7 @@ /* Begin PBXBuildFile section */ 1A7B35651942542400AB5FD6 /* GLKit.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 1A7B35641942542400AB5FD6 /* GLKit.framework */; }; - 1A7B35671942564600AB5FD6 /* libPods.a in Frameworks */ = {isa = PBXBuildFile; fileRef = 1A7B35661942564600AB5FD6 /* libPods.a */; }; + 6F614C4C0DC4B51E132F6628 /* libPods.a in Frameworks */ = {isa = PBXBuildFile; fileRef = D811EF3F24704EBFB916513B /* libPods.a */; }; 7265D5E0191994B80042DC3B /* Foundation.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 7265D5DF191994B80042DC3B /* Foundation.framework */; }; 7265D5E2191994B80042DC3B /* CoreGraphics.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 7265D5E1191994B80042DC3B /* CoreGraphics.framework */; }; 7265D5E4191994B80042DC3B /* UIKit.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 7265D5E3191994B80042DC3B /* UIKit.framework */; }; @@ -24,11 +24,58 @@ 7265D66B1919978E0042DC3B /* CoreMedia.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 7265D66A1919978E0042DC3B /* CoreMedia.framework */; }; 7265D66D191997930042DC3B /* CoreVideo.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 7265D66C191997930042DC3B /* CoreVideo.framework */; }; 7265D66F191997970042DC3B /* OpenGLES.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 7265D66E191997970042DC3B /* OpenGLES.framework */; }; + E804065F1C0B6AC0007A8C35 /* libPods.a in Frameworks */ = {isa = PBXBuildFile; fileRef = E80406541C0B6A9A007A8C35 /* libPods.a */; }; + E80406601C0B6AC0007A8C35 /* libUriParser-cpp.a in Frameworks */ = {isa = PBXBuildFile; fileRef = E80406561C0B6A9A007A8C35 /* libUriParser-cpp.a */; }; + E80406611C0B6AC0007A8C35 /* libVideoCore.a in Frameworks */ = {isa = PBXBuildFile; fileRef = E80406581C0B6A9A007A8C35 /* libVideoCore.a */; }; /* End PBXBuildFile section */ +/* Begin PBXContainerItemProxy section */ + E80406531C0B6A9A007A8C35 /* PBXContainerItemProxy */ = { + isa = PBXContainerItemProxy; + containerPortal = E804064D1C0B6A9A007A8C35 /* Pods.xcodeproj */; + proxyType = 2; + remoteGlobalIDString = 2D0B02FEFD13E275DCDA0A3FA4A91128; + remoteInfo = Pods; + }; + E80406551C0B6A9A007A8C35 /* PBXContainerItemProxy */ = { + isa = PBXContainerItemProxy; + containerPortal = E804064D1C0B6A9A007A8C35 /* Pods.xcodeproj */; + proxyType = 2; + remoteGlobalIDString = 3ABC171208B669CAAC83360EC099312B; + remoteInfo = "UriParser-cpp"; + }; + E80406571C0B6A9A007A8C35 /* PBXContainerItemProxy */ = { + isa = PBXContainerItemProxy; + containerPortal = E804064D1C0B6A9A007A8C35 /* Pods.xcodeproj */; + proxyType = 2; + remoteGlobalIDString = C776C7A3542DE0824B02E9B374516A98; + remoteInfo = VideoCore; + }; + E80406591C0B6AA7007A8C35 /* PBXContainerItemProxy */ = { + isa = PBXContainerItemProxy; + containerPortal = E804064D1C0B6A9A007A8C35 /* Pods.xcodeproj */; + proxyType = 1; + remoteGlobalIDString = 3F9F12C8E3E76B4D733D743B811D4356; + remoteInfo = Pods; + }; + E804065B1C0B6AA7007A8C35 /* PBXContainerItemProxy */ = { + isa = PBXContainerItemProxy; + containerPortal = E804064D1C0B6A9A007A8C35 /* Pods.xcodeproj */; + proxyType = 1; + remoteGlobalIDString = 065E820B76BD075F05D7A93F1708BBFB; + remoteInfo = "UriParser-cpp"; + }; + E804065D1C0B6AA7007A8C35 /* PBXContainerItemProxy */ = { + isa = PBXContainerItemProxy; + containerPortal = E804064D1C0B6A9A007A8C35 /* Pods.xcodeproj */; + proxyType = 1; + remoteGlobalIDString = F112B488AE08212B62435D436F6675E1; + remoteInfo = VideoCore; + }; +/* End PBXContainerItemProxy section */ + /* Begin PBXFileReference section */ 1A7B35641942542400AB5FD6 /* GLKit.framework */ = {isa = PBXFileReference; lastKnownFileType = wrapper.framework; name = GLKit.framework; path = System/Library/Frameworks/GLKit.framework; sourceTree = SDKROOT; }; - 1A7B35661942564600AB5FD6 /* libPods.a */ = {isa = PBXFileReference; lastKnownFileType = archive.ar; name = libPods.a; path = "Pods/build/Debug-iphoneos/libPods.a"; sourceTree = ""; }; 7265D5DC191994B80042DC3B /* SampleBroadcaster.app */ = {isa = PBXFileReference; explicitFileType = wrapper.application; includeInIndex = 0; path = SampleBroadcaster.app; sourceTree = BUILT_PRODUCTS_DIR; }; 7265D5DF191994B80042DC3B /* Foundation.framework */ = {isa = PBXFileReference; lastKnownFileType = wrapper.framework; name = Foundation.framework; path = System/Library/Frameworks/Foundation.framework; sourceTree = SDKROOT; }; 7265D5E1191994B80042DC3B /* CoreGraphics.framework */ = {isa = PBXFileReference; lastKnownFileType = wrapper.framework; name = CoreGraphics.framework; path = System/Library/Frameworks/CoreGraphics.framework; sourceTree = SDKROOT; }; @@ -53,6 +100,9 @@ AC034311A4E2CCE0CCDA6C72 /* Pods.release.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = Pods.release.xcconfig; path = "Pods/Target Support Files/Pods/Pods.release.xcconfig"; sourceTree = ""; }; D811EF3F24704EBFB916513B /* libPods.a */ = {isa = PBXFileReference; explicitFileType = archive.ar; includeInIndex = 0; path = libPods.a; sourceTree = BUILT_PRODUCTS_DIR; }; DA5713A4139B758B1B834C67 /* Pods.debug.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = Pods.debug.xcconfig; path = "Pods/Target Support Files/Pods/Pods.debug.xcconfig"; sourceTree = ""; }; + E80406491C0B69FD007A8C35 /* libUriParser-cpp.a */ = {isa = PBXFileReference; lastKnownFileType = archive.ar; name = "libUriParser-cpp.a"; path = "Pods/../build/Debug-iphoneos/libUriParser-cpp.a"; sourceTree = ""; }; + E804064A1C0B69FD007A8C35 /* libVideoCore.a */ = {isa = PBXFileReference; lastKnownFileType = archive.ar; name = libVideoCore.a; path = "Pods/../build/Debug-iphoneos/libVideoCore.a"; sourceTree = ""; }; + E804064D1C0B6A9A007A8C35 /* Pods.xcodeproj */ = {isa = PBXFileReference; lastKnownFileType = "wrapper.pb-project"; name = Pods.xcodeproj; path = Pods/Pods.xcodeproj; sourceTree = ""; }; /* End PBXFileReference section */ /* Begin PBXFrameworksBuildPhase section */ @@ -61,7 +111,6 @@ buildActionMask = 2147483647; files = ( 1A7B35651942542400AB5FD6 /* GLKit.framework in Frameworks */, - 1A7B35671942564600AB5FD6 /* libPods.a in Frameworks */, 7265D66F191997970042DC3B /* OpenGLES.framework in Frameworks */, 7265D66D191997930042DC3B /* CoreVideo.framework in Frameworks */, 7265D66B1919978E0042DC3B /* CoreMedia.framework in Frameworks */, @@ -71,6 +120,10 @@ 7265D5E2191994B80042DC3B /* CoreGraphics.framework in Frameworks */, 7265D5E4191994B80042DC3B /* UIKit.framework in Frameworks */, 7265D5E0191994B80042DC3B /* Foundation.framework in Frameworks */, + E804065F1C0B6AC0007A8C35 /* libPods.a in Frameworks */, + E80406601C0B6AC0007A8C35 /* libUriParser-cpp.a in Frameworks */, + E80406611C0B6AC0007A8C35 /* libVideoCore.a in Frameworks */, + 6F614C4C0DC4B51E132F6628 /* libPods.a in Frameworks */, ); runOnlyForDeploymentPostprocessing = 0; }; @@ -80,6 +133,7 @@ 7265D5D3191994B80042DC3B = { isa = PBXGroup; children = ( + E804064D1C0B6A9A007A8C35 /* Pods.xcodeproj */, 7265D5E5191994B80042DC3B /* SampleBroadcaster */, 7265D5DE191994B80042DC3B /* Frameworks */, 7265D5DD191994B80042DC3B /* Products */, @@ -98,7 +152,8 @@ 7265D5DE191994B80042DC3B /* Frameworks */ = { isa = PBXGroup; children = ( - 1A7B35661942564600AB5FD6 /* libPods.a */, + E80406491C0B69FD007A8C35 /* libUriParser-cpp.a */, + E804064A1C0B69FD007A8C35 /* libVideoCore.a */, 1A7B35641942542400AB5FD6 /* GLKit.framework */, 7265D66E191997970042DC3B /* OpenGLES.framework */, 7265D66C191997930042DC3B /* CoreVideo.framework */, @@ -149,6 +204,16 @@ name = Pods; sourceTree = ""; }; + E804064E1C0B6A9A007A8C35 /* Products */ = { + isa = PBXGroup; + children = ( + E80406541C0B6A9A007A8C35 /* libPods.a */, + E80406561C0B6A9A007A8C35 /* libUriParser-cpp.a */, + E80406581C0B6A9A007A8C35 /* libVideoCore.a */, + ); + name = Products; + sourceTree = ""; + }; /* End PBXGroup section */ /* Begin PBXNativeTarget section */ @@ -165,6 +230,9 @@ buildRules = ( ); dependencies = ( + E804065A1C0B6AA7007A8C35 /* PBXTargetDependency */, + E804065C1C0B6AA7007A8C35 /* PBXTargetDependency */, + E804065E1C0B6AA7007A8C35 /* PBXTargetDependency */, ); name = SampleBroadcaster; productName = SampleBroadcaster; @@ -179,11 +247,6 @@ attributes = { LastUpgradeCheck = 0700; ORGANIZATIONNAME = videocore; - TargetAttributes = { - 7265D5DB191994B80042DC3B = { - DevelopmentTeam = 263A46GSYZ; - }; - }; }; buildConfigurationList = 7265D5D7191994B80042DC3B /* Build configuration list for PBXProject "SampleBroadcaster" */; compatibilityVersion = "Xcode 3.2"; @@ -196,6 +259,12 @@ mainGroup = 7265D5D3191994B80042DC3B; productRefGroup = 7265D5DD191994B80042DC3B /* Products */; projectDirPath = ""; + projectReferences = ( + { + ProductGroup = E804064E1C0B6A9A007A8C35 /* Products */; + ProjectRef = E804064D1C0B6A9A007A8C35 /* Pods.xcodeproj */; + }, + ); projectRoot = ""; targets = ( 7265D5DB191994B80042DC3B /* SampleBroadcaster */, @@ -203,6 +272,30 @@ }; /* End PBXProject section */ +/* Begin PBXReferenceProxy section */ + E80406541C0B6A9A007A8C35 /* libPods.a */ = { + isa = PBXReferenceProxy; + fileType = archive.ar; + path = libPods.a; + remoteRef = E80406531C0B6A9A007A8C35 /* PBXContainerItemProxy */; + sourceTree = BUILT_PRODUCTS_DIR; + }; + E80406561C0B6A9A007A8C35 /* libUriParser-cpp.a */ = { + isa = PBXReferenceProxy; + fileType = archive.ar; + path = "libUriParser-cpp.a"; + remoteRef = E80406551C0B6A9A007A8C35 /* PBXContainerItemProxy */; + sourceTree = BUILT_PRODUCTS_DIR; + }; + E80406581C0B6A9A007A8C35 /* libVideoCore.a */ = { + isa = PBXReferenceProxy; + fileType = archive.ar; + path = libVideoCore.a; + remoteRef = E80406571C0B6A9A007A8C35 /* PBXContainerItemProxy */; + sourceTree = BUILT_PRODUCTS_DIR; + }; +/* End PBXReferenceProxy section */ + /* Begin PBXResourcesBuildPhase section */ 7265D5DA191994B80042DC3B /* Resources */ = { isa = PBXResourcesBuildPhase; @@ -262,6 +355,24 @@ }; /* End PBXSourcesBuildPhase section */ +/* Begin PBXTargetDependency section */ + E804065A1C0B6AA7007A8C35 /* PBXTargetDependency */ = { + isa = PBXTargetDependency; + name = Pods; + targetProxy = E80406591C0B6AA7007A8C35 /* PBXContainerItemProxy */; + }; + E804065C1C0B6AA7007A8C35 /* PBXTargetDependency */ = { + isa = PBXTargetDependency; + name = "UriParser-cpp"; + targetProxy = E804065B1C0B6AA7007A8C35 /* PBXContainerItemProxy */; + }; + E804065E1C0B6AA7007A8C35 /* PBXTargetDependency */ = { + isa = PBXTargetDependency; + name = VideoCore; + targetProxy = E804065D1C0B6AA7007A8C35 /* PBXContainerItemProxy */; + }; +/* End PBXTargetDependency section */ + /* Begin PBXVariantGroup section */ 7265D5E8191994B80042DC3B /* InfoPlist.strings */ = { isa = PBXVariantGroup; @@ -321,7 +432,7 @@ "$(inherited)", /Applications/Xcode.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/include, ); - IPHONEOS_DEPLOYMENT_TARGET = 7.1; + IPHONEOS_DEPLOYMENT_TARGET = 8.0; ONLY_ACTIVE_ARCH = YES; SDKROOT = iphoneos; TARGETED_DEVICE_FAMILY = "1,2"; @@ -360,7 +471,7 @@ "$(inherited)", /Applications/Xcode.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/include, ); - IPHONEOS_DEPLOYMENT_TARGET = 7.1; + IPHONEOS_DEPLOYMENT_TARGET = 8.0; SDKROOT = iphoneos; TARGETED_DEVICE_FAMILY = "1,2"; VALIDATE_PRODUCT = YES; @@ -415,7 +526,7 @@ LIBRARY_SEARCH_PATHS = "$(inherited)"; PRODUCT_BUNDLE_IDENTIFIER = "videocore.${PRODUCT_NAME:rfc1034identifier}"; PRODUCT_NAME = "$(TARGET_NAME)"; - PROVISIONING_PROFILE = "802ea4ec-e810-451f-9144-bc91bd34dd7b"; + PROVISIONING_PROFILE = ""; TARGETED_DEVICE_FAMILY = 1; WRAPPER_EXTENSION = app; }; @@ -469,7 +580,7 @@ LIBRARY_SEARCH_PATHS = "$(inherited)"; PRODUCT_BUNDLE_IDENTIFIER = "videocore.${PRODUCT_NAME:rfc1034identifier}"; PRODUCT_NAME = "$(TARGET_NAME)"; - PROVISIONING_PROFILE = "802ea4ec-e810-451f-9144-bc91bd34dd7b"; + PROVISIONING_PROFILE = ""; TARGETED_DEVICE_FAMILY = 1; WRAPPER_EXTENSION = app; }; diff --git a/stream/AsyncStreamSession.cpp b/stream/AsyncStreamSession.cpp index 128cd375..03dc9627 100644 --- a/stream/AsyncStreamSession.cpp +++ b/stream/AsyncStreamSession.cpp @@ -232,7 +232,7 @@ namespace videocore { } void AsyncStreamSession::readLength(size_t length, AsyncStreamBufferSP orgbuf, size_t offset, SSAnsyncReadCallBack_T readcb){ - assert(!m_socketJob.thisThreadInQueue()); + //assert(!m_socketJob.thisThreadInQueue()); m_socketJob.enqueue([=]{ DLogVerbose("Queue read request length:%zd\n", length); @@ -246,7 +246,7 @@ namespace videocore { #pragma mark - #pragma mark - Private void AsyncStreamSession::setState(AsyncStreamState_T state) { - assert(m_socketJob.thisThreadInQueue()); + // assert(m_socketJob.thisThreadInQueue()); m_state = state; m_eventTriggerJob.enqueue([=]{ @@ -257,7 +257,7 @@ namespace videocore { #pragma mark - #pragma mark - Reading std::shared_ptr AsyncStreamSession::getCurrentReader() { - assert(m_socketJob.thisThreadInQueue()); + // assert(m_socketJob.thisThreadInQueue()); if (m_readerQueue.size() > 0) { return m_readerQueue.front(); @@ -266,7 +266,7 @@ namespace videocore { } void AsyncStreamSession::doReadData(){ - assert(m_socketJob.thisThreadInQueue()); + // assert(m_socketJob.thisThreadInQueue()); // Just in case logical error if (m_doReadingData) { @@ -283,7 +283,7 @@ namespace videocore { } bool AsyncStreamSession::innnerReadData(){ - assert(m_socketJob.thisThreadInQueue()); + // assert(m_socketJob.thisThreadInQueue()); auto currentReader = getCurrentReader(); if (currentReader) { @@ -340,7 +340,7 @@ namespace videocore { } void AsyncStreamSession::finishCurrentReader(){ - assert(m_socketJob.thisThreadInQueue()); + // assert(m_socketJob.thisThreadInQueue()); auto currentReader = getCurrentReader(); assert(currentReader); @@ -358,14 +358,14 @@ namespace videocore { #pragma mark - #pragma mark - Writing std::shared_ptr AsyncStreamSession::getCurrentWriter() { - assert(m_socketJob.thisThreadInQueue()); + // assert(m_socketJob.thisThreadInQueue()); if (m_writerQueue.size() > 0) { return m_writerQueue.front(); } return nullptr; } void AsyncStreamSession::doWriteData(){ - assert(m_socketJob.thisThreadInQueue()); + // assert(m_socketJob.thisThreadInQueue()); DLogVerbose("Do write data\n"); while (m_stream->status() & kStreamStatusWriteBufferHasSpace && m_writerQueue.size() > 0) { @@ -373,7 +373,7 @@ namespace videocore { } } void AsyncStreamSession::innerWriteData(){ - assert(m_socketJob.thisThreadInQueue()); + // assert(m_socketJob.thisThreadInQueue()); if (m_stream->status() & kStreamStatusWriteBufferHasSpace) { auto writer = getCurrentWriter(); @@ -397,7 +397,7 @@ namespace videocore { } void AsyncStreamSession::finishCurrentWriter() { - assert(m_socketJob.thisThreadInQueue()); + // assert(m_socketJob.thisThreadInQueue()); auto currentWriter = getCurrentWriter(); assert(currentWriter); From 6e26be86bef4b788fd3efc4c7c0553ed837c926f Mon Sep 17 00:00:00 2001 From: Doron Adler Date: Mon, 30 Nov 2015 16:05:24 +0200 Subject: [PATCH 2/3] Only noticed this now: // If >= iOS 8.0 use the VideoToolbox encoder that does not write to disk. --- .../SampleBroadcaster.xcodeproj/project.pbxproj | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sample/SampleBroadcaster/SampleBroadcaster.xcodeproj/project.pbxproj b/sample/SampleBroadcaster/SampleBroadcaster.xcodeproj/project.pbxproj index 96e29116..d7d2421d 100644 --- a/sample/SampleBroadcaster/SampleBroadcaster.xcodeproj/project.pbxproj +++ b/sample/SampleBroadcaster/SampleBroadcaster.xcodeproj/project.pbxproj @@ -432,7 +432,7 @@ "$(inherited)", /Applications/Xcode.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/include, ); - IPHONEOS_DEPLOYMENT_TARGET = 8.0; + IPHONEOS_DEPLOYMENT_TARGET = 7.1; ONLY_ACTIVE_ARCH = YES; SDKROOT = iphoneos; TARGETED_DEVICE_FAMILY = "1,2"; @@ -471,7 +471,7 @@ "$(inherited)", /Applications/Xcode.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/include, ); - IPHONEOS_DEPLOYMENT_TARGET = 8.0; + IPHONEOS_DEPLOYMENT_TARGET = 7.1; SDKROOT = iphoneos; TARGETED_DEVICE_FAMILY = "1,2"; VALIDATE_PRODUCT = YES; From d6b9106ca3feb6d0d59919ed150dd8a98307276d Mon Sep 17 00:00:00 2001 From: Doron Adler Date: Wed, 30 Dec 2015 16:28:47 +0200 Subject: [PATCH 3/3] Removed unused AsyncStreamSession.cpp --- stream/AsyncStreamSession.cpp | 413 ---------------------------------- 1 file changed, 413 deletions(-) delete mode 100644 stream/AsyncStreamSession.cpp diff --git a/stream/AsyncStreamSession.cpp b/stream/AsyncStreamSession.cpp deleted file mode 100644 index 03dc9627..00000000 --- a/stream/AsyncStreamSession.cpp +++ /dev/null @@ -1,413 +0,0 @@ -// -// AnsyncStreamSession.cpp -// Pods -// -// Created by jinchu darwin on 15/9/29. -// -// - -#include - -#ifndef DLOG_LEVEL_DEF -#define DLOG_LEVEL_DEF DLOG_LEVEL_VERBOSE -#endif -#include - -#include -#include - -namespace videocore { -#pragma mark - -#pragma mark AsyncStreamReader - /** - * Reader object - * Reader object represent a read request for specail length or other (TODO) - * Caller can provide buffer, read object maybe adjust the buffer size for read - * If not provite, reader object will create the buffer, and manager it. - */ - class AsyncStreamReader { - public: - /** - * Ctor read specail length - * Read is asynchronous, when the the data read enough, it is trigger the event callback - */ - AsyncStreamReader(size_t length, AsyncStreamBufferSP buf, size_t offset, SSAnsyncReadCallBack_T eventcb){ - m_bytesDone = 0; - - if (buf) { - m_buffer = buf; - m_startOffset = offset; - m_bufferOwner = true; - } - else { - m_buffer = std::make_shared(length); - m_startOffset = 0; - m_bufferOwner = false; - } - - m_readLength = length; - m_eventCallback = eventcb; - } - - AsyncStreamReader() { - DLogVerbose("AsyncReadConsumer::~AsyncReadConsumer\n"); - } - - /** - * Read data from pre-allocated buffer - * - * @param abuff When the buffer was read, it will change it's status. - */ - void readFromBuffer(PreallocBuffer &abuff) { - size_t availableBytes = abuff.availableBytes(); - size_t bytesToCopy = readLengthIfAvailable(availableBytes); - resizeForRead(bytesToCopy); - memcpy(&(*m_buffer)[m_startOffset+m_bytesDone], abuff.readBuffer(), bytesToCopy); - abuff.didRead(bytesToCopy); - - m_bytesDone += bytesToCopy; - } - - /** - * Read from stream - * - * @return Result of stream read method. - */ - ssize_t readFromStream(IStreamSession *stream) { - size_t bytesToRead = readLengthRemains(); - resizeForRead(bytesToRead); - ssize_t result = stream->read(&(*m_buffer)[m_startOffset+m_bytesDone], bytesToRead); - if (result > 0) { - m_bytesDone += result; - } - return result; - } - - bool done() { - return m_readLength == m_bytesDone; - } - - void triggerEvent() { - if (m_eventCallback) { - // TODO: check the diference owned buffer or not? - m_eventCallback(*m_buffer); - } - } - - private: - size_t readLengthIfAvailable(size_t bytesAvailable) { - return std::min(bytesAvailable, readLengthRemains()); - } - - size_t readLengthRemains() { - return m_readLength - m_bytesDone; - } - - void resizeForRead(size_t bytesToRead) { - size_t buffSize = m_buffer->size(); - size_t buffUsed = m_startOffset + m_bytesDone; - - size_t buffSpace = buffSize - buffUsed; - - if (bytesToRead > buffSpace) - { - size_t buffInc = bytesToRead - buffSpace; - m_buffer->resize(buffInc); - } - } - - private: - AsyncStreamBufferSP m_buffer; - size_t m_startOffset; - size_t m_bytesDone; - size_t m_readLength; - bool m_bufferOwner; - SSAnsyncReadCallBack_T m_eventCallback; - }; -#pragma mark - -#pragma mark AsyncStreamWriter - class AsyncStreamWriter { - public: - AsyncStreamWriter(AsyncStreamBufferSP buf, SSAnsyncWriteCallBack_T writecb = nullptr) { - m_bytesDone = 0; - m_buffer = buf; - m_eventCallback = writecb; - } - AsyncStreamWriter() { - DLog("AnsyncStreamWriter::~AnsyncStreamWriter\n"); - } - - ssize_t writeToStream(IStreamSession *stream) { - size_t bytesToWrite = m_buffer->size() - m_bytesDone; - assert(bytesToWrite > 0); - ssize_t result = stream->write(&(*m_buffer)[m_bytesDone], bytesToWrite); - if (result > 0) { - m_bytesDone += result; - } - return result; - } - bool done() { - return m_bytesDone == m_buffer->size(); - } - void triggerEvent() { - if (m_eventCallback) { - m_eventCallback(); - } - } - - private: - AsyncStreamBufferSP m_buffer; - size_t m_bytesDone; - SSAnsyncWriteCallBack_T m_eventCallback; - }; - -#pragma mark - -#pragma mark AnsyncStreamSession - AsyncStreamSession::AsyncStreamSession(IStreamSession *stream) - : m_state(kAsyncStreamStateNone) - , m_inputBuffer(4*1024) - , m_socketJob("AnsyncStreamSession Socket") - , m_eventTriggerJob("AnsyncStreamSession Event Trigger") - , m_doReadingData(false) - { - m_stream.reset(stream); - } - AsyncStreamSession::~AsyncStreamSession(){ - m_socketJob.mark_exiting(); - m_socketJob.enqueue_sync([=]{}); - DLogVerbose("AsyncStreamSession::~AsyncStreamSession\n"); - } - - void AsyncStreamSession::connect(const std::string& host, int port, SSConnectionStatus_T statuscb) { - m_connectionStatusCB = statuscb; - m_state = kAsyncStreamStateConnecting; - m_stream->connect(host, port, [=](IStreamSession& session, StreamStatus_T status){ - m_socketJob.enqueue([=]{ - // make sure set connected status only one time - if (status & kStreamStatusConnected && m_state < kAsyncStreamStateConnected) { - DLogDebug("AnsyncStreamSession connected\n"); - setState(kAsyncStreamStateConnected); - doWriteData(); - doReadData(); - } - if (status & kStreamStatusWriteBufferHasSpace) { - DLogDebug("AnsyncStreamSession Write ready\n"); - doWriteData(); - } - if (status & kStreamStatusReadBufferHasBytes) { - DLogDebug("AnsyncStreamSession Read ready\n"); - doReadData(); - } - if(status & kStreamStatusErrorEncountered) { - DLogDebug("AnsyncStreamSession Error!\n"); - setState(kAsyncStreamStateError); - } - if (status & kStreamStatusEndStream) { - DLogDebug("AnsyncStreamSession end.\n"); - setState(kAsyncStreamStateDisconnected); - } - }); - }); - } - - void AsyncStreamSession::disconnect(){ - DLogDebug("Disconnecting\n"); - setState(kAsyncStreamStateDisconnecting); - m_stream->disconnect(); - } - - void AsyncStreamSession::write(uint8_t *buffer, size_t length, SSAnsyncWriteCallBack_T writecb){ - // make a reference for the lamdba function - auto bufref = std::make_shared(buffer, buffer+length); - m_socketJob.enqueue([=]{ - auto writer = std::make_shared(bufref, writecb); - m_writerQueue.push(writer); - DLogVerbose("Queue write request length:%zd\n", length); - doWriteData(); - }); - } - - void AsyncStreamSession::readLength(size_t length, SSAnsyncReadCallBack_T readcb){ - this->readLength(length, nullptr, 0, readcb); - } - - void AsyncStreamSession::readLength(size_t length, AsyncStreamBufferSP orgbuf, size_t offset, SSAnsyncReadCallBack_T readcb){ - //assert(!m_socketJob.thisThreadInQueue()); - - m_socketJob.enqueue([=]{ - DLogVerbose("Queue read request length:%zd\n", length); - auto reader = std::make_shared(length, orgbuf, offset, readcb); - m_readerQueue.push(reader); - doReadData(); - }); - - } - -#pragma mark - -#pragma mark - Private - void AsyncStreamSession::setState(AsyncStreamState_T state) { - // assert(m_socketJob.thisThreadInQueue()); - - m_state = state; - m_eventTriggerJob.enqueue([=]{ - m_connectionStatusCB(m_state); - }); - } - -#pragma mark - -#pragma mark - Reading - std::shared_ptr AsyncStreamSession::getCurrentReader() { - // assert(m_socketJob.thisThreadInQueue()); - - if (m_readerQueue.size() > 0) { - return m_readerQueue.front(); - } - return nullptr; - } - - void AsyncStreamSession::doReadData(){ - // assert(m_socketJob.thisThreadInQueue()); - - // Just in case logical error - if (m_doReadingData) { - DLogError("Already reading\n"); - return ; - } - m_doReadingData = true; - - DLogVerbose("Do read data\n"); - while (innnerReadData()) { - ; - } - m_doReadingData = false; - } - - bool AsyncStreamSession::innnerReadData(){ - // assert(m_socketJob.thisThreadInQueue()); - - auto currentReader = getCurrentReader(); - if (currentReader) { - // 从已读buffer中读取 - if (m_inputBuffer.availableBytes() > 0) { - DLogVerbose("Read from prebuffer\n"); - currentReader->readFromBuffer(m_inputBuffer); - }else { - DLogVerbose("No data in prebuffer\n"); - } - if (currentReader->done()) { - finishCurrentReader(); - return true; - } - if (m_stream->status() & kStreamStatusReadBufferHasBytes) { - // 从SOCKET读取 - DLogVerbose("Reading from stream\n"); - ssize_t result = currentReader->readFromStream(m_stream.get()); - if (result > 0) { - DLogVerbose("Read %ld bytes from stream\n", result); - } - else { - DLogVerbose("ERROR! Read from stream error:%ld\n", result); - } - } - else { - DLogVerbose("No data in current stream\n"); - } - if (currentReader->done()) { - finishCurrentReader(); - return true; - } - } - else { - DLogVerbose("No reader currently\n"); - if (m_stream->status() & kStreamStatusReadBufferHasBytes) { - // read the SOCKET into pre buffer ? - size_t availibleSize = m_inputBuffer.availableSpace(); - DLogVerbose("Try reading from stream\n"); - ssize_t result = m_stream->read(m_inputBuffer.writeBuffer(), availibleSize); - if (result > 0) { - m_inputBuffer.didWrite(result); - DLogVerbose("Read %ld bytes into prebuffer\n", result); - } - else { - DLogVerbose("ERROR! read from stream error:%ld\n", result); - } - } - else { - DLogVerbose("No data in current stream\n"); - } - } - return false; - } - - void AsyncStreamSession::finishCurrentReader(){ - // assert(m_socketJob.thisThreadInQueue()); - - auto currentReader = getCurrentReader(); - assert(currentReader); - - if (currentReader) { - DLogVerbose("Reader done\n"); - m_readerQueue.pop(); - m_eventTriggerJob.enqueue([=]{ - currentReader->triggerEvent(); - }); - } - } - - -#pragma mark - -#pragma mark - Writing - std::shared_ptr AsyncStreamSession::getCurrentWriter() { - // assert(m_socketJob.thisThreadInQueue()); - if (m_writerQueue.size() > 0) { - return m_writerQueue.front(); - } - return nullptr; - } - void AsyncStreamSession::doWriteData(){ - // assert(m_socketJob.thisThreadInQueue()); - - DLogVerbose("Do write data\n"); - while (m_stream->status() & kStreamStatusWriteBufferHasSpace && m_writerQueue.size() > 0) { - innerWriteData(); - } - } - void AsyncStreamSession::innerWriteData(){ - // assert(m_socketJob.thisThreadInQueue()); - - if (m_stream->status() & kStreamStatusWriteBufferHasSpace) { - auto writer = getCurrentWriter(); - if (writer) { - DLogVerbose("Writing data to stream\n"); - ssize_t result = writer->writeToStream(m_stream.get()); - if (result <= 0) { - DLogError("ERROR! Write to stream error:%ld\n", result); - } - else { - DLogVerbose("Wrote %ld bytes to stream\n", result); - } - if (writer->done()) { - finishCurrentWriter(); - } - } - } - else { - DLogVerbose("Stream can't write now\n"); - } - } - - void AsyncStreamSession::finishCurrentWriter() { - // assert(m_socketJob.thisThreadInQueue()); - - auto currentWriter = getCurrentWriter(); - assert(currentWriter); - - if (currentWriter) { - m_writerQueue.pop(); - DLogVerbose("Writer done\n"); - m_eventTriggerJob.enqueue([=]{ - currentWriter->triggerEvent(); - }); - } - } -} \ No newline at end of file