Code
Explorer
Are you a developer looking for Shopware projects?
Apply Now!
isFirst example
$context
->
replaceRequest
(
$method
,
$url
,
$options
)
;
if
(
$isTimeout
)
{
yield
$chunk
;
}
else
{
$context
->
pause
(
$state
->reconnectionTime
)
;
}
}
return
;
}
if
(
$chunk
->
isFirst
(
)
)
{
if
(
preg_match
(
'/^text\/event-stream(;|$)/i',
$context
->
getHeaders
(
)
[
'content-type'
]
[
0
]
?? ''
)
)
{
$state
->buffer = '';
}
elseif
(
null !==
$lastError
||
(
null !==
$state
->buffer && 200 ===
$context
->
getStatusCode
(
)
)
)
{
throw
new
EventSourceException
(
sprintf
(
'Response content-type is "%s" while "text/event-stream" was expected for "%s".',
$context
->
getHeaders
(
)
[
'content-type'
]
[
0
]
?? '',
$context
->
getInfo
(
'url'
)
)
)
;
}
else
{
$context
->
passthru
(
)
;
}
if
(
null ===
$lastError
)
{
yield
$chunk
;
}
catch
(
ClientExceptionInterface
)
{
}
$this
->
assertSame
(
404,
$response
->
getStatusCode
(
)
)
;
$this
->
assertSame
(
[
'application/json'
]
,
$response
->
getHeaders
(
false
)
[
'content-type'
]
)
;
$this
->
assertNotEmpty
(
$response
->
getContent
(
false
)
)
;
$response
=
$client
->
request
(
'GET', 'http://localhost:8057/404'
)
;
try
{
foreach
(
$client
->
stream
(
$response
)
as
$chunk
)
{
$this
->
assertTrue
(
$chunk
->
isFirst
(
)
)
;
}
$this
->
fail
(
ClientExceptionInterface::
class
.' expected'
)
;
}
catch
(
ClientExceptionInterface
)
{
}
}
public
function
testIgnoreErrors
(
)
{
$client
=
$this
->
getHttpClient
(
__FUNCTION__
)
;
$response
=
$client
->
request
(
'GET', 'http://localhost:8057/404'
)
;
/**
 * Abstract hook with no body here: every concrete user must provide it.
 *
 * Takes no arguments and returns nothing. In this excerpt it is invoked on a
 * response from the error path of initialize(), after the error message has
 * been stored in the response info and before the exception is rethrown.
 *
 * NOTE(review): presumably implementations release whatever the response
 * holds (handles, buffers) - confirm against the concrete implementations.
 */
abstract
protected
function
close
(
)
: void;
private
static
function
initialize
(
self
$response
)
: void
{
if
(
null !==
$response
->
getInfo
(
'error'
)
)
{
throw
new
TransportException
(
$response
->
getInfo
(
'error'
)
)
;
}
try
{
if
(
(
$response
->initializer
)
(
$response
, -0.0
)
)
{
foreach
(
self::
stream
(
[
$response
]
, -0.0
)
as
$chunk
)
{
if
(
$chunk
->
isFirst
(
)
)
{
break
;
}
}
}
}
catch
(
\Throwable
$e
)
{
// Persist timeouts thrown during initialization
$response
->info
[
'error'
]
=
$e
->
getMessage
(
)
;
$response
->
close
(
)
;
throw
$e
;
}
return
false;
}
public
function
stream_read
(
int
$count
)
: string|false
{
if
(
\
is_resource
(
$this
->content
)
)
{
// Empty the internal activity list
foreach
(
$this
->client->
stream
(
[
$this
->response
]
, 0
)
as
$chunk
)
{
try
{
if
(
!
$chunk
->
isTimeout
(
)
&&
$chunk
->
isFirst
(
)
)
{
$this
->response->
getStatusCode
(
)
; // ignore 3/4/5xx
}
}
catch
(
ExceptionInterface
$e
)
{
trigger_error
(
$e
->
getMessage
(
)
, \E_USER_WARNING
)
;
return
false;
}
}
if
(
0 !==
fseek
(
$this
->content,
$this
->offset ?? 0
)
)
{
return
false;
}
if
(
HttpClient::
create
(
)
instanceof NativeHttpClient
)
{
$this
->
markTestSkipped
(
'NativeHttpClient doesn\'t support opening concurrent requests.'
)
;
}
HttpClientTestCase::
testTimeoutOnDestruct
(
)
;
}
public
function
testRetry404
(
)
{
$client
=
$this
->
getHttpClient
(
__FUNCTION__,
// Passthru callback handed to getHttpClient(): it checks that it is invoked
// with the first chunk of a 404 response, cancels that response, swaps the
// request for a GET on the server root and then calls passthru() on the context.
// NOTE(review): presumably passthru() stops this callback from being consulted
// for the replacement response - confirm against AsyncContext.
function (ChunkInterface $chunk, AsyncContext $context) {
    $this->assertTrue($chunk->isFirst());
    $this->assertSame(404, $context->getStatusCode());

    // Drop the failed response before issuing the replacement request.
    $context->getResponse()->cancel();
    $context->replaceRequest('GET', 'http://localhost:8057/');
    $context->passthru();
}
)
;
$response
=
$client
->
request
(
'GET', 'http://localhost:8057/404'
)
;
foreach
(
$client
->
stream
(
$response
)
as
$chunk
)
{
}
$this
->
assertTrue
(
$chunk
->
isLast
(
)
)
;
$this
->response =
$client
->
request
(
$method
,
$url
,
[
'buffer' => false
]
+
$options
)
;
$this
->passthru =
$passthru
;
$this
->initializer =
static
function
Dself
$response
, float
$timeout
= null
)
{
if
(
null ===
$response
->shouldBuffer
)
{
return
false;
}
while
(
true
)
{
foreach
(
self::
stream
(
[
$response
]
,
$timeout
)
as
$chunk
)
{
if
(
$chunk
->
isTimeout
(
)
&&
$response
->passthru
)
{
foreach
(
self::
passthru
(
$response
->client,
$response
,
new
ErrorChunk
(
$response
->offset,
new
TransportException
(
$chunk
->
getError
(
)
)
)
)
as
$chunk
)
{
if
(
$chunk
->
isFirst
(
)
)
{
return
false;
}
}
continue
2;
}
if
(
$chunk
->
isFirst
(
)
)
{
return
false;
}
}
$client
=
new
RetryableHttpClient
(
new
MockHttpClient
(
[
new
MockResponse
(
'',
[
'http_code' => 500
]
)
,
]
)
,
new
GenericRetryStrategy
(
[
500
]
, 0
)
,
0
)
;
$response
=
$client
->
request
(
'GET', 'http://example.com/foo-bar'
)
;
foreach
(
$client
->
stream
(
$response
)
as
$chunk
)
{
if
(
$chunk
->
isFirst
(
)
)
{
self::
assertSame
(
500,
$response
->
getStatusCode
(
)
)
;
}
}
}
public
function
testRetryWithDnsIssue
(
)
{
$client
=
new
RetryableHttpClient
(
new
NativeHttpClient
(
)
,
new
class
(
GenericRetryStrategy::DEFAULT_RETRY_STATUS_CODES, 0
)
extends
GenericRetryStrategy
{
public
function
shouldRetry
(
AsyncContext
$context
, ?string
$responseContent
, ?TransportExceptionInterface
$exception
)
: ?bool
{
$startTime
=
hrtime
(
true
)
/ 1E9;
$idleTimeout
=
max
(
0.0,
min
(
$maxDuration
/ 5,
$idleTimeout
??
$maxDuration
)
)
;
}
do
{
foreach
(
$this
->client->
stream
(
$this
->promisePool,
$idleTimeout
)
as
$response
=>
$chunk
)
{
try
{
if
(
null !==
$maxDuration
&&
$chunk
->
isTimeout
(
)
)
{
goto
check_duration;
}
if
(
$chunk
->
isFirst
(
)
)
{
// Deactivate throwing on 3/4/5xx
$response
->
getStatusCode
(
)
;
}
if
(
!
$chunk
->
isLast
(
)
)
{
goto
check_duration;
}
if
(
[
,
$promise
]
=
$this
->promisePool
[
$response
]
?? null
)
{
unset
(
$this
->promisePool
[
$response
]
)
;
$promise
->
resolve
(
$this
->
createPsr7Response
(
$response
, true
)
)
;
}
$shouldRetry
=
$this
->strategy->
shouldRetry
(
$context
, null,
$exception
)
;
if
(
null ===
$shouldRetry
)
{
throw
new
\
LogicException
(
sprintf
(
'The "%s::shouldRetry()" method must not return null when called with an exception.',
$this
->strategy::
class
)
)
;
}
if
(
false ===
$shouldRetry
)
{
yield from
$this
->
passthru
(
$context
,
$firstChunk
,
$content
,
$chunk
)
;
return
;
}
}
}
elseif
(
$chunk
->
isFirst
(
)
)
{
if
(
false ===
$shouldRetry
=
$this
->strategy->
shouldRetry
(
$context
, null, null
)
)
{
yield from
$this
->
passthru
(
$context
,
$firstChunk
,
$content
,
$chunk
)
;
return
;
}
// Body is needed to decide
if
(
null ===
$shouldRetry
)
{
$firstChunk
=
$chunk
;
$content
= '';
]
,
]
,
]
)
;
}
return
$mediaIds
;
}
private
function
processChunk
(
array &
$pool
, ResponseInterface
$response
, ChunkInterface
$chunk
, array &
$mediaIds
, array &
$subtitlesVideoIds
, array &
$subtitlesMediaIds
)
: void
{
if
(
$chunk
->
isFirst
(
)
)
{
$response
->
getStatusCode
(
)
; // skip non-2xx status codes
}
if
(
!
$chunk
->
isLast
(
)
)
{
return
;
}
if
(
400 <=
$response
->
getStatusCode
(
)
)
{
$error
=
$response
->
toArray
(
false
)
;
throw
new
TransportException
(
$error
[
'errors'
]
[
0
]
[
'message'
]
??
(
$error
[
'request'
]
.': '.
$error
[
'error'
]
)
,
$response
,
$error
[
'errors'
]
[
0
]
[
'code'
]
?? 0
)
;
}
$this
->
wait
(
true
)
;
}
/**
 * Walks the stream of tracked responses and drops the ones that are finished.
 *
 * Chunks are read from $this->client->stream() over $this->responses. When
 * $blocking is true no timeout is passed (null); otherwise a 0.0 timeout is
 * used and timeout chunks are skipped. A response is detached from
 * $this->responses when its last chunk is seen, or when inspecting one of its
 * chunks raises an ExceptionInterface; in the latter case the exception is
 * reported through error_log() instead of being propagated.
 *
 * @param bool $blocking true to stream without a timeout, false to poll with a 0.0 timeout
 */
private function wait(bool $blocking): void
{
    $timeout = $blocking ? null : 0.0;
    $pending = $this->client->stream($this->responses, $timeout);

    foreach ($pending as $response => $chunk) {
        try {
            // The chunk is always asked about the timeout first; the flag is
            // only consulted afterwards, matching the original evaluation order.
            $timedOut = $chunk->isTimeout();
            if ($timedOut && !$blocking) {
                continue;
            }

            // Chunks that are neither the first nor the last one are ignored.
            if (!($chunk->isFirst() || $chunk->isLast())) {
                continue;
            }

            if (!$chunk->isLast()) {
                continue;
            }

            // Last chunk received: this response no longer needs tracking.
            $this->responses->detach($response);
        } catch (ExceptionInterface $e) {
            $this->responses->detach($response);

            $report = sprintf("Could not push logs to Elasticsearch:\n%s", (string) $e);
            error_log($report);
        }
    }
}
}