# File: /opt/cloudlinux/venv/lib64/python3.11/site-packages/pydantic/_internal/_core_utils.py
from __future__ import annotations

import os
from collections import defaultdict
from typing import (
    Any,
    Callable,
    Hashable,
    TypeVar,
    Union,
    _GenericAlias,  # type: ignore
    cast,
)

from pydantic_core import CoreSchema, core_schema
from pydantic_core import validate_core_schema as _validate_core_schema
from typing_extensions import TypeAliasType, TypeGuard, get_args

from . import _repr

AnyFunctionSchema = Union[
    core_schema.AfterValidatorFunctionSchema,
    core_schema.BeforeValidatorFunctionSchema,
    core_schema.WrapValidatorFunctionSchema,
    core_schema.PlainValidatorFunctionSchema,
]


FunctionSchemaWithInnerSchema = Union[
    core_schema.AfterValidatorFunctionSchema,
    core_schema.BeforeValidatorFunctionSchema,
    core_schema.WrapValidatorFunctionSchema,
]

CoreSchemaField = Union[
    core_schema.ModelField, core_schema.DataclassField, core_schema.TypedDictField, core_schema.ComputedField
]
CoreSchemaOrField = Union[core_schema.CoreSchema, CoreSchemaField]

_CORE_SCHEMA_FIELD_TYPES = {'typed-dict-field', 'dataclass-field', 'model-field', 'computed-field'}
_FUNCTION_WITH_INNER_SCHEMA_TYPES = {'function-before', 'function-after', 'function-wrap'}
_LIST_LIKE_SCHEMA_WITH_ITEMS_TYPES = {'list', 'tuple-variable', 'set', 'frozenset'}

_DEFINITIONS_CACHE_METADATA_KEY = 'pydantic.definitions_cache'

NEEDS_APPLY_DISCRIMINATED_UNION_METADATA_KEY = 'pydantic.internal.needs_apply_discriminated_union'
"""Used to mark a schema that has a discriminated union that needs to be checked for validity at the end of
schema building because one of its members refers to a definition that was not yet defined when the union
was first encountered.
"""
HAS_INVALID_SCHEMAS_METADATA_KEY = 'pydantic.internal.invalid'
"""Used to mark a schema that is invalid because it refers to a definition that was not yet defined when the
schema was first encountered.
"""


def is_core_schema(
    schema: CoreSchemaOrField,
) -> TypeGuard[CoreSchema]:
    return schema['type'] not in _CORE_SCHEMA_FIELD_TYPES


def is_core_schema_field(
    schema: CoreSchemaOrField,
) -> TypeGuard[CoreSchemaField]:
    return schema['type'] in _CORE_SCHEMA_FIELD_TYPES


def is_function_with_inner_schema(
    schema: CoreSchemaOrField,
) -> TypeGuard[FunctionSchemaWithInnerSchema]:
    return schema['type'] in _FUNCTION_WITH_INNER_SCHEMA_TYPES


def is_list_like_schema_with_items_schema(
    schema: CoreSchema,
) -> TypeGuard[
    core_schema.ListSchema | core_schema.TupleVariableSchema | core_schema.SetSchema | core_schema.FrozenSetSchema
]:
    return schema['type'] in _LIST_LIKE_SCHEMA_WITH_ITEMS_TYPES


def get_type_ref(type_: type[Any], args_override: tuple[type[Any], ...] | None = None) -> str:
    """Produces the ref to be used for this type by pydantic_core's core schemas.

    The `args_override` argument exists so that valid recursive references can be created
    for generic models without needing to create a concrete class.
    """
    origin = type_
    args = get_args(type_) if isinstance(type_, _GenericAlias) else (args_override or ())
    generic_metadata = getattr(type_, '__pydantic_generic_metadata__', None)
    if generic_metadata:
        origin = generic_metadata['origin'] or origin
        args = generic_metadata['args'] or args

    module_name = getattr(origin, '__module__', '<No __module__>')
    if isinstance(origin, TypeAliasType):
        type_ref = f'{module_name}.{origin.__name__}'
    else:
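        try:
            # The fallback f-string formats `origin` eagerly, which can itself raise
            qualname = getattr(origin, '__qualname__', f'<No __qualname__: {origin}>')
        except Exception:
            # Retry with a fallback that does not need to format `origin`
            qualname = getattr(origin, '__qualname__', '<No __qualname__>')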
        type_ref = f'{module_name}.{qualname}:{id(origin)}'

    arg_refs: list[str] = []
    for arg in args:
        if isinstance(arg, str):
            # Handle string literals as a special case; we may be able to remove this special handling if we
            # wrap them in a ForwardRef at some point.
            arg_ref = f'{arg}:str-{id(arg)}'
        else:
            arg_ref = f'{_repr.display_as_type(arg)}:{id(arg)}'
        arg_refs.append(arg_ref)
    if arg_refs:
        type_ref = f'{type_ref}[{",".join(arg_refs)}]'
    return type_ref


def get_ref(s: core_schema.CoreSchema) -> None | str:
    """Get the ref from the schema if it has one.
    This exists just for type checking to work correctly.
    """
    return s.get('ref', None)


def collect_definitions(schema: core_schema.CoreSchema) -> dict[str, core_schema.CoreSchema]:
    """Walk `schema` and return a mapping from each `ref` found to the schema that carries it."""
    defs: dict[str, CoreSchema] = {}

    def _record_valid_refs(s: core_schema.CoreSchema, recurse: Recurse) -> core_schema.CoreSchema:
        ref = get_ref(s)
        if ref:
            defs[ref] = s
        return recurse(s, _record_valid_refs)

    walk_core_schema(schema, _record_valid_refs)

    return defs


def define_expected_missing_refs(
    schema: core_schema.CoreSchema, allowed_missing_refs: set[str]
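) -> core_schema.CoreSchema | None:
    """Add placeholder definitions for any refs in `allowed_missing_refs` that `schema` does not define.

    Returns a definitions schema wrapping `schema`, or `None` if no placeholders are needed.
    """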
    if not allowed_missing_refs:
        # in this case, there are no missing refs to potentially substitute, so there's no need to walk the schema
        # this is a common case (will be hit for all non-generic models), so it's worth optimizing for
        return None

    refs = collect_definitions(schema).keys()

    expected_missing_refs = allowed_missing_refs.difference(refs)
    if expected_missing_refs:
        definitions: list[core_schema.CoreSchema] = [
            # TODO: Replace this with a (new) CoreSchema that, if present at any level, makes validation fail
            #   Issue: https://github.com/pydantic/pydantic-core/issues/619
            core_schema.none_schema(ref=ref, metadata={HAS_INVALID_SCHEMAS_METADATA_KEY: True})
            for ref in expected_missing_refs
        ]
        return core_schema.definitions_schema(schema, definitions)
    return None


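def collect_invalid_schemas(schema: core_schema.CoreSchema) -> bool:
    """Report whether `schema` contains a sub-schema flagged with `HAS_INVALID_SCHEMAS_METADATA_KEY`."""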
    invalid = False

    def _is_schema_valid(s: core_schema.CoreSchema, recurse: Recurse) -> core_schema.CoreSchema:
        nonlocal invalid
        if 'metadata' in s:
            metadata = s['metadata']
            if HAS_INVALID_SCHEMAS_METADATA_KEY in metadata:
                invalid = metadata[HAS_INVALID_SCHEMAS_METADATA_KEY]
                return s
        return recurse(s, _is_schema_valid)

    walk_core_schema(schema, _is_schema_valid)
    return invalid


T = TypeVar('T')


Recurse = Callable[[core_schema.CoreSchema, 'Walk'], core_schema.CoreSchema]
Walk = Callable[[core_schema.CoreSchema, Recurse], core_schema.CoreSchema]

# TODO: Should we move _WalkCoreSchema into pydantic_core proper?
#   Issue: https://github.com/pydantic/pydantic-core/issues/615


class _WalkCoreSchema:
    def __init__(self):
        self._schema_type_to_method = self._build_schema_type_to_method()

    def _build_schema_type_to_method(self) -> dict[core_schema.CoreSchemaType, Recurse]:
        mapping: dict[core_schema.CoreSchemaType, Recurse] = {}
        key: core_schema.CoreSchemaType
        for key in get_args(core_schema.CoreSchemaType):
            method_name = f"handle_{key.replace('-', '_')}_schema"
            mapping[key] = getattr(self, method_name, self._handle_other_schemas)
        return mapping

    def walk(self, schema: core_schema.CoreSchema, f: Walk) -> core_schema.CoreSchema:
        return f(schema, self._walk)

    def _walk(self, schema: core_schema.CoreSchema, f: Walk) -> core_schema.CoreSchema:
        schema = self._schema_type_to_method[schema['type']](schema.copy(), f)
        ser_schema: core_schema.SerSchema | None = schema.get('serialization')  # type: ignore
        if ser_schema:
            schema['serialization'] = self._handle_ser_schemas(ser_schema, f)
        return schema

    def _handle_other_schemas(self, schema: core_schema.CoreSchema, f: Walk) -> core_schema.CoreSchema:
        sub_schema = schema.get('schema', None)
        if sub_schema is not None:
            schema['schema'] = self.walk(sub_schema, f)  # type: ignore
        return schema

    def _handle_ser_schemas(self, ser_schema: core_schema.SerSchema, f: Walk) -> core_schema.SerSchema:
        schema: core_schema.CoreSchema | None = ser_schema.get('schema', None)
        if schema is not None:
            ser_schema['schema'] = self.walk(schema, f)  # type: ignore
        return_schema: core_schema.CoreSchema | None = ser_schema.get('return_schema', None)
        if return_schema is not None:
            ser_schema['return_schema'] = self.walk(return_schema, f)  # type: ignore
        return ser_schema

    def handle_definitions_schema(self, schema: core_schema.DefinitionsSchema, f: Walk) -> core_schema.CoreSchema:
        new_definitions: list[core_schema.CoreSchema] = []
        for definition in schema['definitions']:
            updated_definition = self.walk(definition, f)
            if 'ref' in updated_definition:
                # If the updated definition schema doesn't have a 'ref', it shouldn't go in the definitions
                # This is most likely to happen due to replacing something with a definition reference, in
                # which case it should certainly not go in the definitions list
                new_definitions.append(updated_definition)
        new_inner_schema = self.walk(schema['schema'], f)

        if not new_definitions and len(schema) == 3:
            # This means we'd be returning a "trivial" definitions schema that just wrapped the inner schema
            return new_inner_schema

        new_schema = schema.copy()
        new_schema['schema'] = new_inner_schema
        new_schema['definitions'] = new_definitions
        return new_schema

    def handle_list_schema(self, schema: core_schema.ListSchema, f: Walk) -> core_schema.CoreSchema:
        items_schema = schema.get('items_schema')
        if items_schema is not None:
            schema['items_schema'] = self.walk(items_schema, f)
        return schema

    def handle_set_schema(self, schema: core_schema.SetSchema, f: Walk) -> core_schema.CoreSchema:
        items_schema = schema.get('items_schema')
        if items_schema is not None:
            schema['items_schema'] = self.walk(items_schema, f)
        return schema

    def handle_frozenset_schema(self, schema: core_schema.FrozenSetSchema, f: Walk) -> core_schema.CoreSchema:
        items_schema = schema.get('items_schema')
        if items_schema is not None:
            schema['items_schema'] = self.walk(items_schema, f)
        return schema

    def handle_generator_schema(self, schema: core_schema.GeneratorSchema, f: Walk) -> core_schema.CoreSchema:
        items_schema = schema.get('items_schema')
        if items_schema is not None:
            schema['items_schema'] = self.walk(items_schema, f)
        return schema

    def handle_tuple_variable_schema(
        self, schema: core_schema.TupleVariableSchema | core_schema.TuplePositionalSchema, f: Walk
    ) -> core_schema.CoreSchema:
        schema = cast(core_schema.TupleVariableSchema, schema)
        items_schema = schema.get('items_schema')
        if items_schema is not None:
            schema['items_schema'] = self.walk(items_schema, f)
        return schema

    def handle_tuple_positional_schema(
        self, schema: core_schema.TupleVariableSchema | core_schema.TuplePositionalSchema, f: Walk
    ) -> core_schema.CoreSchema:
        schema = cast(core_schema.TuplePositionalSchema, schema)
        schema['items_schema'] = [self.walk(v, f) for v in schema['items_schema']]
        extras_schema = schema.get('extras_schema')
        if extras_schema is not None:
            schema['extras_schema'] = self.walk(extras_schema, f)
        return schema

    def handle_dict_schema(self, schema: core_schema.DictSchema, f: Walk) -> core_schema.CoreSchema:
        keys_schema = schema.get('keys_schema')
        if keys_schema is not None:
            schema['keys_schema'] = self.walk(keys_schema, f)
        values_schema = schema.get('values_schema')
        if values_schema is not None:
            schema['values_schema'] = self.walk(values_schema, f)
        return schema

    def handle_function_schema(self, schema: AnyFunctionSchema, f: Walk) -> core_schema.CoreSchema:
        if not is_function_with_inner_schema(schema):
            return schema
        schema['schema'] = self.walk(schema['schema'], f)
        return schema

    def handle_union_schema(self, schema: core_schema.UnionSchema, f: Walk) -> core_schema.CoreSchema:
        new_choices: list[CoreSchema | tuple[CoreSchema, str]] = []
        for v in schema['choices']:
            if isinstance(v, tuple):
                new_choices.append((self.walk(v[0], f), v[1]))
            else:
                new_choices.append(self.walk(v, f))
        schema['choices'] = new_choices
        return schema

    def handle_tagged_union_schema(self, schema: core_schema.TaggedUnionSchema, f: Walk) -> core_schema.CoreSchema:
        new_choices: dict[Hashable, core_schema.CoreSchema] = {}
        for k, v in schema['choices'].items():
            new_choices[k] = v if isinstance(v, (str, int)) else self.walk(v, f)
        schema['choices'] = new_choices
        return schema

    def handle_chain_schema(self, schema: core_schema.ChainSchema, f: Walk) -> core_schema.CoreSchema:
        schema['steps'] = [self.walk(v, f) for v in schema['steps']]
        return schema

    def handle_lax_or_strict_schema(self, schema: core_schema.LaxOrStrictSchema, f: Walk) -> core_schema.CoreSchema:
        schema['lax_schema'] = self.walk(schema['lax_schema'], f)
        schema['strict_schema'] = self.walk(schema['strict_schema'], f)
        return schema

    def handle_json_or_python_schema(self, schema: core_schema.JsonOrPythonSchema, f: Walk) -> core_schema.CoreSchema:
        schema['json_schema'] = self.walk(schema['json_schema'], f)
        schema['python_schema'] = self.walk(schema['python_schema'], f)
        return schema

    def handle_model_fields_schema(self, schema: core_schema.ModelFieldsSchema, f: Walk) -> core_schema.CoreSchema:
        extras_schema = schema.get('extras_schema')
        if extras_schema is not None:
            schema['extras_schema'] = self.walk(extras_schema, f)
        replaced_fields: dict[str, core_schema.ModelField] = {}
        replaced_computed_fields: list[core_schema.ComputedField] = []
        for computed_field in schema.get('computed_fields', ()):
            replaced_field = computed_field.copy()
            replaced_field['return_schema'] = self.walk(computed_field['return_schema'], f)
            replaced_computed_fields.append(replaced_field)
        if replaced_computed_fields:
            schema['computed_fields'] = replaced_computed_fields
        for k, v in schema['fields'].items():
            replaced_field = v.copy()
            replaced_field['schema'] = self.walk(v['schema'], f)
            replaced_fields[k] = replaced_field
        schema['fields'] = replaced_fields
        return schema

    def handle_typed_dict_schema(self, schema: core_schema.TypedDictSchema, f: Walk) -> core_schema.CoreSchema:
        extras_schema = schema.get('extras_schema')
        if extras_schema is not None:
            schema['extras_schema'] = self.walk(extras_schema, f)
        replaced_computed_fields: list[core_schema.ComputedField] = []
        for computed_field in schema.get('computed_fields', ()):
            replaced_field = computed_field.copy()
            replaced_field['return_schema'] = self.walk(computed_field['return_schema'], f)
            replaced_computed_fields.append(replaced_field)
        if replaced_computed_fields:
            schema['computed_fields'] = replaced_computed_fields
        replaced_fields: dict[str, core_schema.TypedDictField] = {}
        for k, v in schema['fields'].items():
            replaced_field = v.copy()
            replaced_field['schema'] = self.walk(v['schema'], f)
            replaced_fields[k] = replaced_field
        schema['fields'] = replaced_fields
        return schema

    def handle_dataclass_args_schema(self, schema: core_schema.DataclassArgsSchema, f: Walk) -> core_schema.CoreSchema:
        replaced_fields: list[core_schema.DataclassField] = []
        replaced_computed_fields: list[core_schema.ComputedField] = []
        for computed_field in schema.get('computed_fields', ()):
            replaced_field = computed_field.copy()
            replaced_field['return_schema'] = self.walk(computed_field['return_schema'], f)
            replaced_computed_fields.append(replaced_field)
        if replaced_computed_fields:
            schema['computed_fields'] = replaced_computed_fields
        for field in schema['fields']:
            replaced_field = field.copy()
            replaced_field['schema'] = self.walk(field['schema'], f)
            replaced_fields.append(replaced_field)
        schema['fields'] = replaced_fields
        return schema

    def handle_arguments_schema(self, schema: core_schema.ArgumentsSchema, f: Walk) -> core_schema.CoreSchema:
        replaced_arguments_schema: list[core_schema.ArgumentsParameter] = []
        for param in schema['arguments_schema']:
            replaced_param = param.copy()
            replaced_param['schema'] = self.walk(param['schema'], f)
            replaced_arguments_schema.append(replaced_param)
        schema['arguments_schema'] = replaced_arguments_schema
        if 'var_args_schema' in schema:
            schema['var_args_schema'] = self.walk(schema['var_args_schema'], f)
        if 'var_kwargs_schema' in schema:
            schema['var_kwargs_schema'] = self.walk(schema['var_kwargs_schema'], f)
        return schema

    def handle_call_schema(self, schema: core_schema.CallSchema, f: Walk) -> core_schema.CoreSchema:
        schema['arguments_schema'] = self.walk(schema['arguments_schema'], f)
        if 'return_schema' in schema:
            schema['return_schema'] = self.walk(schema['return_schema'], f)
        return schema


_dispatch = _WalkCoreSchema().walk


def walk_core_schema(schema: core_schema.CoreSchema, f: Walk) -> core_schema.CoreSchema:
    """Recursively traverse a CoreSchema.

    Args:
        schema (core_schema.CoreSchema): The CoreSchema to process; it will not be modified.
        f (Walk): A function to apply. This function takes two arguments:
          1. The current CoreSchema that is being processed
             (not the same one you passed into this function, one level down).
          2. The "next" `f` to call. This lets you for example use `f=functools.partial(some_method, some_context)`
             to pass data down the recursive calls without using globals or other mutable state.

    Returns:
        core_schema.CoreSchema: A processed CoreSchema.
    """
    return f(schema.copy(), _dispatch)


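def simplify_schema_references(schema: core_schema.CoreSchema) -> core_schema.CoreSchema:  # noqa: C901
    """Inline definitions that are referenced exactly once and are not involved in recursion.

    Any definitions still referenced afterwards are kept in a top-level definitions schema.
    """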
    definitions: dict[str, core_schema.CoreSchema] = {}
    ref_counts: dict[str, int] = defaultdict(int)
    involved_in_recursion: dict[str, bool] = {}
    current_recursion_ref_count: dict[str, int] = defaultdict(int)

    def collect_refs(s: core_schema.CoreSchema, recurse: Recurse) -> core_schema.CoreSchema:
        if s['type'] == 'definitions':
            for definition in s['definitions']:
                ref = get_ref(definition)
                assert ref is not None
                if ref not in definitions:
                    definitions[ref] = definition
                recurse(definition, collect_refs)
            return recurse(s['schema'], collect_refs)
        else:
            ref = get_ref(s)
            if ref is not None:
                new = recurse(s, collect_refs)
                new_ref = get_ref(new)
                if new_ref:
                    definitions[new_ref] = new
                return core_schema.definition_reference_schema(schema_ref=ref)
            else:
                return recurse(s, collect_refs)

    schema = walk_core_schema(schema, collect_refs)

    def count_refs(s: core_schema.CoreSchema, recurse: Recurse) -> core_schema.CoreSchema:
        if s['type'] != 'definition-ref':
            return recurse(s, count_refs)
        ref = s['schema_ref']
        ref_counts[ref] += 1

        if ref_counts[ref] >= 2:
            # If this model is involved in a recursion this should be detected
            # on its second encounter, we can safely stop the walk here.
            if current_recursion_ref_count[ref] != 0:
                involved_in_recursion[ref] = True
            return s

        current_recursion_ref_count[ref] += 1
        recurse(definitions[ref], count_refs)
        current_recursion_ref_count[ref] -= 1
        return s

    schema = walk_core_schema(schema, count_refs)

    assert all(c == 0 for c in current_recursion_ref_count.values()), 'this is a bug! please report it'

    def can_be_inlined(s: core_schema.DefinitionReferenceSchema, ref: str) -> bool:
        if ref_counts[ref] > 1:
            return False
        if involved_in_recursion.get(ref, False):
            return False
        if 'serialization' in s:
            return False
        if 'metadata' in s:
            metadata = s['metadata']
            for k in (
                'pydantic_js_functions',
                'pydantic_js_annotation_functions',
                'pydantic.internal.union_discriminator',
            ):
                if k in metadata:
                    # we need to keep this as a ref
                    return False
        return True

    def inline_refs(s: core_schema.CoreSchema, recurse: Recurse) -> core_schema.CoreSchema:
        if s['type'] == 'definition-ref':
            ref = s['schema_ref']
            # Check if the reference is only used once, not involved in recursion and does not have
            # any extra keys (like 'serialization')
            if can_be_inlined(s, ref):
                # Inline the reference by replacing the reference with the actual schema
                new = definitions.pop(ref)
                ref_counts[ref] -= 1  # because we just replaced it!
                # put all other keys that were on the def-ref schema into the inlined version
                # in particular this is needed for `serialization`
                if 'serialization' in s:
                    new['serialization'] = s['serialization']
                s = recurse(new, inline_refs)
                return s
            else:
                return recurse(s, inline_refs)
        else:
            return recurse(s, inline_refs)

    schema = walk_core_schema(schema, inline_refs)

    def_values = [v for v in definitions.values() if ref_counts[v['ref']] > 0]  # type: ignore

    if def_values:
        schema = core_schema.definitions_schema(schema=schema, definitions=def_values)
    return schema


def _strip_metadata(schema: CoreSchema) -> CoreSchema:
    def strip_metadata(s: CoreSchema, recurse: Recurse) -> CoreSchema:
        s = s.copy()
        s.pop('metadata', None)
        if s['type'] == 'model-fields':
            s = s.copy()
            s['fields'] = {k: v.copy() for k, v in s['fields'].items()}
            for field_name, field_schema in s['fields'].items():
                field_schema.pop('metadata', None)
                s['fields'][field_name] = field_schema
            computed_fields = s.get('computed_fields', None)
            if computed_fields:
                # Strip metadata from the copies, not the originals, so the caller's schema is not mutated
                s['computed_fields'] = [cf.copy() for cf in computed_fields]
                for cf in s['computed_fields']:
                    cf.pop('metadata', None)
            else:
                s.pop('computed_fields', None)
        elif s['type'] == 'model':
            # remove some defaults
            if s.get('custom_init', True) is False:
                s.pop('custom_init')
            if s.get('root_model', True) is False:
                s.pop('root_model')
            if {'title'}.issuperset(s.get('config', {}).keys()):
                s.pop('config', None)

        return recurse(s, strip_metadata)

    return walk_core_schema(schema, strip_metadata)


def pretty_print_core_schema(
    schema: CoreSchema,
    include_metadata: bool = False,
) -> None:
    """Pretty print a CoreSchema using rich.
    This is intended for debugging purposes.

    Args:
        schema: The CoreSchema to print.
        include_metadata: Whether to include metadata in the output. Defaults to `False`.
    """
    from rich import print  # type: ignore  # install it manually in your dev env

    if not include_metadata:
        schema = _strip_metadata(schema)

    print(schema)


def validate_core_schema(schema: CoreSchema) -> CoreSchema:
    if 'PYDANTIC_SKIP_VALIDATING_CORE_SCHEMAS' in os.environ:
        return schema
    return _validate_core_schema(schema)
