Code
Explorer
You are a developer and looking for Shopware projects?
Apply Now!
isLast example
if
(
!
$client
|| !
$wrappedResponses
)
{
return
;
}
foreach
(
$client
->
stream
(
$wrappedResponses
,
$timeout
)
as
$response
=>
$chunk
)
{
$r
=
$asyncMap
[
$response
]
;
if
(
null ===
$chunk
->
getError
(
)
)
{
if
(
$chunk
->
isFirst
(
)
)
{
// Ensure no exception is thrown on destruct for the wrapped response
$r
->response->
getStatusCode
(
)
;
}
elseif
(
0 ===
$r
->offset && null ===
$r
->content &&
$chunk
->
isLast
(
)
)
{
$r
->content =
fopen
(
'php://memory', 'w+'
)
;
}
}
if
(
!
$r
->passthru
)
{
if
(
null !==
$chunk
->
getError
(
)
||
$chunk
->
isLast
(
)
)
{
unset
(
$asyncMap
[
$response
]
)
;
}
elseif
(
null !==
$r
->content && ''
!==
(
$content
=
$chunk
->
getContent
(
)
)
&& \
strlen
(
$content
)
!==
fwrite
(
$r
->content,
$content
)
)
{
$chunk
=
new
ErrorChunk
(
$r
->offset,
new
TransportException
(
sprintf
(
'Failed writing %d bytes to the response buffer.', \
strlen
(
$content
)
)
)
)
;
$r
->info
[
'error'
]
=
$chunk
->
getError
(
)
;
$r
->response->
cancel
(
)
;
}
return
$mediaIds
;
}
private
function
processChunk
(
array &
$pool
, ResponseInterface
$response
, ChunkInterface
$chunk
, array &
$mediaIds
, array &
$subtitlesVideoIds
, array &
$subtitlesMediaIds
)
: void
{
if
(
$chunk
->
isFirst
(
)
)
{
$response
->
getStatusCode
(
)
; // skip non-2xx status codes
}
if
(
!
$chunk
->
isLast
(
)
)
{
return
;
}
if
(
400 <=
$response
->
getStatusCode
(
)
)
{
$error
=
$response
->
toArray
(
false
)
;
throw
new
TransportException
(
$error
[
'errors'
]
[
0
]
[
'message'
]
??
(
$error
[
'request'
]
.': '.
$error
[
'error'
]
)
,
$response
,
$error
[
'errors'
]
[
0
]
[
'code'
]
?? 0
)
;
}
[
$i
,
$mediaId
,
$seq
,
$h
,
$alt
,
$subtitles
]
=
$response
->
getInfo
(
'user_data'
)
;
unset
(
$pool
[
$i
]
)
;
$this
->
expectExceptionMessage
(
'Response has been canceled'
)
;
while
(
$res
)
{
if
(
$i
> 0
)
{
$res
->
cancel
(
)
;
}
foreach
(
$es
->
stream
(
$res
)
as
$chunk
)
{
if
(
$chunk
->
isTimeout
(
)
)
{
continue
;
}
if
(
$chunk
->
isLast
(
)
)
{
continue
;
}
$this
->
assertEquals
(
$expected
[
$i
++
]
,
$chunk
)
;
}
}
}
public
function
testPostServerSentEvents
(
)
{
$chunk
=
new
DataChunk
(
0, ''
)
;
if
(
null ===
$lastError
)
{
yield
$chunk
;
}
return
;
}
$rx
= '/((?:\r\n|[\r\n]){2,})/';
$content
=
$state
->buffer.
$chunk
->
getContent
(
)
;
if
(
$chunk
->
isLast
(
)
)
{
$rx
=
substr_replace
(
$rx
, '|$', -2, 0
)
;
}
$events
=
preg_split
(
$rx
,
$content
, -1, \PREG_SPLIT_DELIM_CAPTURE
)
;
$state
->buffer =
array_pop
(
$events
)
;
for
(
$i
= 0;
isset
(
$events
[
$i
]
)
;
$i
+= 2
)
{
$event
=
new
ServerSentEvent
(
$events
[
$i
]
.
$events
[
1 +
$i
]
)
;
if
(
'' !==
$event
->
getId
(
)
)
{
$context
->
setInfo
(
'last_event_id',
$state
->lastEventId =
$event
->
getId
(
)
)
;
}
return
;
}
// Body is needed to decide
if
(
null ===
$shouldRetry
)
{
$firstChunk
=
$chunk
;
$content
= '';
return
;
}
}
else
{
if
(
!
$chunk
->
isLast
(
)
)
{
$content
.=
$chunk
->
getContent
(
)
;
return
;
}
if
(
null ===
$shouldRetry
=
$this
->strategy->
shouldRetry
(
$context
,
$content
, null
)
)
{
throw
new
\
LogicException
(
sprintf
(
'The "%s::shouldRetry()" method must not return null when called with a body.',
$this
->strategy::
class
)
)
;
}
if
(
false ===
$shouldRetry
)
{
yield from
$this
->
passthru
(
$context
,
$firstChunk
,
$content
,
$chunk
)
;
$this
->
wait
(
true
)
;
}
private
function
wait
(
bool
$blocking
)
: void
{
foreach
(
$this
->client->
stream
(
$this
->responses,
$blocking
? null : 0.0
)
as
$response
=>
$chunk
)
{
try
{
if
(
$chunk
->
isTimeout
(
)
&& !
$blocking
)
{
continue
;
}
if
(
!
$chunk
->
isFirst
(
)
&& !
$chunk
->
isLast
(
)
)
{
continue
;
}
if
(
$chunk
->
isLast
(
)
)
{
$this
->responses->
detach
(
$response
)
;
}
}
catch
(
ExceptionInterface
$e
)
{
$this
->responses->
detach
(
$response
)
;
error_log
(
sprintf
(
"Could not push logs to Elasticsearch:\n%s",
(string)
$e
)
)
;
}
}
}
}
self::
initialize
(
$this
)
;
}
if
(
$throw
)
{
$this
->
checkStatusCode
(
)
;
}
if
(
null ===
$this
->content
)
{
$content
= null;
foreach
(
self::
stream
(
[
$this
]
)
as
$chunk
)
{
if
(
!
$chunk
->
isLast
(
)
)
{
$content
.=
$chunk
->
getContent
(
)
;
}
}
if
(
null !==
$content
)
{
return
$content
;
}
if
(
null ===
$this
->content
)
{
throw
new
TransportException
(
'Cannot get the content of the response twice: buffering is disabled.'
)
;
}
}
foreach
(
$this
->client->
stream
(
[
$this
->response
]
,
$this
->blocking ?
$this
->timeout : 0
)
as
$chunk
)
{
try
{
$this
->eof = true;
$this
->eof = !
$chunk
->
isTimeout
(
)
;
if
(
!
$this
->eof && !
$this
->blocking
)
{
return
'';
}
$this
->eof =
$chunk
->
isLast
(
)
;
if
(
$chunk
->
isFirst
(
)
)
{
$this
->response->
getStatusCode
(
)
; // ignore 3/4/5xx
}
if
(
'' !==
$data
=
$chunk
->
getContent
(
)
)
{
if
(
\
strlen
(
$data
)
>
$count
)
{
$this
->content ??=
substr
(
$data
,
$count
)
;
$data
=
substr
(
$data
, 0,
$count
)
;
}
$this
->offset += \
strlen
(
$data
)
;
$traceableMap
[
$r
->response
]
=
$r
;
$wrappedResponses
[
]
=
$r
->response;
if
(
$r
->event && !
$r
->event->
isStarted
(
)
)
{
$r
->event->
start
(
)
;
}
}
foreach
(
$client
->
stream
(
$wrappedResponses
,
$timeout
)
as
$r
=>
$chunk
)
{
if
(
$traceableMap
[
$r
]
->event &&
$traceableMap
[
$r
]
->event->
isStarted
(
)
)
{
try
{
if
(
$chunk
->
isTimeout
(
)
|| !
$chunk
->
isLast
(
)
)
{
$traceableMap
[
$r
]
->event->
lap
(
)
;
}
else
{
$traceableMap
[
$r
]
->event->
stop
(
)
;
}
}
catch
(
TransportExceptionInterface
$e
)
{
$traceableMap
[
$r
]
->event->
stop
(
)
;
if
(
$chunk
instanceof ErrorChunk
)
{
$chunk
->
didThrow
(
false
)
;
}
else
{
$chunk
=
new
ErrorChunk
(
$chunk
->
getOffset
(
)
,
$e
)
;
}
}
public
function
testStream
(
)
{
$client
=
$this
->
getHttpClient
(
__FUNCTION__
)
;
$response
=
$client
->
request
(
'GET', 'http://localhost:8057'
)
;
$chunks
=
$client
->
stream
(
$response
)
;
$result
=
[
]
;
foreach
(
$chunks
as
$r
=>
$chunk
)
{
if
(
$chunk
->
isTimeout
(
)
)
{
$result
[
]
= 't';
}
elseif
(
$chunk
->
isLast
(
)
)
{
$result
[
]
= 'l';
}
elseif
(
$chunk
->
isFirst
(
)
)
{
$result
[
]
= 'f';
}
}
$this
->
assertSame
(
$response
,
$r
)
;
$this
->
assertSame
(
[
'f', 'l'
]
,
$result
)
;
$chunk
= null;
$i
= 0;
$this
->
assertTrue
(
$chunk
->
isFirst
(
)
)
;
$this
->
assertSame
(
404,
$context
->
getStatusCode
(
)
)
;
$context
->
getResponse
(
)
->
cancel
(
)
;
$context
->
replaceRequest
(
'GET', 'http://localhost:8057/'
)
;
$context
->
passthru
(
)
;
}
)
;
$response
=
$client
->
request
(
'GET', 'http://localhost:8057/404'
)
;
foreach
(
$client
->
stream
(
$response
)
as
$chunk
)
{
}
$this
->
assertTrue
(
$chunk
->
isLast
(
)
)
;
$this
->
assertSame
(
200,
$response
->
getStatusCode
(
)
)
;
}
public
function
testRetry404WithThrow
(
)
{
$client
=
$this
->
getHttpClient
(
__FUNCTION__,
function
DChunkInterface
$chunk
, AsyncContext
$context
)
{
$this
->
assertTrue
(
$chunk
->
isFirst
(
)
)
;
$this
->
assertSame
(
404,
$context
->
getStatusCode
(
)
)
;
$context
->
getResponse
(
)
->
cancel
(
)
;
$context
->
replaceRequest
(
'GET', 'http://localhost:8057/404'
)
;
$context
->
passthru
(
)
;
}
)
;
foreach
(
$this
->client->
stream
(
$this
->promisePool,
$idleTimeout
)
as
$response
=>
$chunk
)
{
try
{
if
(
null !==
$maxDuration
&&
$chunk
->
isTimeout
(
)
)
{
goto
check_duration;
}
if
(
$chunk
->
isFirst
(
)
)
{
// Deactivate throwing on 3/4/5xx
$response
->
getStatusCode
(
)
;
}
if
(
!
$chunk
->
isLast
(
)
)
{
goto
check_duration;
}
if
(
[
,
$promise
]
=
$this
->promisePool
[
$response
]
?? null
)
{
unset
(
$this
->promisePool
[
$response
]
)
;
$promise
->
resolve
(
$this
->
createPsr7Response
(
$response
, true
)
)
;
}
}
catch
(
\Exception
$e
)
{
if
(
[
$request
,
$promise
]
=
$this
->promisePool
[
$response
]
?? null
)
{
unset
(
$this
->promisePool
[
$response
]
)
;