diff --git a/docs/pycti/pycti.api.opencti_api_client.rst b/docs/pycti/pycti.api.opencti_api_client.rst index 654641f65..5c6f0ef1f 100644 --- a/docs/pycti/pycti.api.opencti_api_client.rst +++ b/docs/pycti/pycti.api.opencti_api_client.rst @@ -3,6 +3,7 @@ ================================ .. automodule:: pycti.api.opencti_api_client + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.api.opencti_api_connector.rst b/docs/pycti/pycti.api.opencti_api_connector.rst index 0cf27329b..166ef3826 100644 --- a/docs/pycti/pycti.api.opencti_api_connector.rst +++ b/docs/pycti/pycti.api.opencti_api_connector.rst @@ -3,6 +3,7 @@ =================================== .. automodule:: pycti.api.opencti_api_connector + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.api.opencti_api_job.rst b/docs/pycti/pycti.api.opencti_api_job.rst index 9f0703a6b..dc1d99cbb 100644 --- a/docs/pycti/pycti.api.opencti_api_job.rst +++ b/docs/pycti/pycti.api.opencti_api_job.rst @@ -3,6 +3,7 @@ ============================= .. automodule:: pycti.api.opencti_api_job + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.api.rst b/docs/pycti/pycti.api.rst index 5c71211bc..2f21aad56 100644 --- a/docs/pycti/pycti.api.rst +++ b/docs/pycti/pycti.api.rst @@ -3,6 +3,7 @@ ============= .. automodule:: pycti.api + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.connector.opencti_connector.rst b/docs/pycti/pycti.connector.opencti_connector.rst index 39ef7df12..367ce2ba6 100644 --- a/docs/pycti/pycti.connector.opencti_connector.rst +++ b/docs/pycti/pycti.connector.opencti_connector.rst @@ -3,6 +3,7 @@ ===================================== .. automodule:: pycti.connector.opencti_connector + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.connector.opencti_connector_helper.rst b/docs/pycti/pycti.connector.opencti_connector_helper.rst index 4522c42db..69d779cfe 100644 --- a/docs/pycti/pycti.connector.opencti_connector_helper.rst +++ b/docs/pycti/pycti.connector.opencti_connector_helper.rst @@ -3,6 +3,7 @@ ============================================ .. automodule:: pycti.connector.opencti_connector_helper + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.connector.rst b/docs/pycti/pycti.connector.rst index c7abab79f..1d191c299 100644 --- a/docs/pycti/pycti.connector.rst +++ b/docs/pycti/pycti.connector.rst @@ -3,6 +3,7 @@ =================== .. automodule:: pycti.connector + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.entities.opencti_attack_pattern.rst b/docs/pycti/pycti.entities.opencti_attack_pattern.rst index 58f1ed0af..ea70f85fc 100644 --- a/docs/pycti/pycti.entities.opencti_attack_pattern.rst +++ b/docs/pycti/pycti.entities.opencti_attack_pattern.rst @@ -3,6 +3,7 @@ ========================================= .. automodule:: pycti.entities.opencti_attack_pattern + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.entities.opencti_campaign.rst b/docs/pycti/pycti.entities.opencti_campaign.rst index bfd832e6b..406f55294 100644 --- a/docs/pycti/pycti.entities.opencti_campaign.rst +++ b/docs/pycti/pycti.entities.opencti_campaign.rst @@ -3,6 +3,7 @@ =================================== .. automodule:: pycti.entities.opencti_campaign + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.entities.opencti_capability.rst b/docs/pycti/pycti.entities.opencti_capability.rst new file mode 100644 index 000000000..5abde4003 --- /dev/null +++ b/docs/pycti/pycti.entities.opencti_capability.rst @@ -0,0 +1,11 @@ +==================================== +``pycti.entities.opencti_capability`` +==================================== + +.. automodule:: pycti.entities.opencti_capability + :members: + + .. contents:: + :local: + +.. currentmodule:: pycti.entities.opencti_capability diff --git a/docs/pycti/pycti.entities.opencti_course_of_action.rst b/docs/pycti/pycti.entities.opencti_course_of_action.rst index a969db351..826867702 100644 --- a/docs/pycti/pycti.entities.opencti_course_of_action.rst +++ b/docs/pycti/pycti.entities.opencti_course_of_action.rst @@ -3,6 +3,7 @@ =========================================== .. automodule:: pycti.entities.opencti_course_of_action + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.entities.opencti_external_reference.rst b/docs/pycti/pycti.entities.opencti_external_reference.rst index 5a49d4904..06e1e3716 100644 --- a/docs/pycti/pycti.entities.opencti_external_reference.rst +++ b/docs/pycti/pycti.entities.opencti_external_reference.rst @@ -3,6 +3,7 @@ ============================================= .. automodule:: pycti.entities.opencti_external_reference + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.entities.opencti_group.rst b/docs/pycti/pycti.entities.opencti_group.rst new file mode 100644 index 000000000..58ec5dd2a --- /dev/null +++ b/docs/pycti/pycti.entities.opencti_group.rst @@ -0,0 +1,11 @@ +==================================== +``pycti.entities.opencti_group`` +==================================== + +.. automodule:: pycti.entities.opencti_group + :members: + + .. contents:: + :local: + +.. currentmodule:: pycti.entities.opencti_group diff --git a/docs/pycti/pycti.entities.opencti_identity.rst b/docs/pycti/pycti.entities.opencti_identity.rst index d75199094..0a9b281b7 100644 --- a/docs/pycti/pycti.entities.opencti_identity.rst +++ b/docs/pycti/pycti.entities.opencti_identity.rst @@ -3,6 +3,7 @@ =================================== .. automodule:: pycti.entities.opencti_identity + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.entities.opencti_incident.rst b/docs/pycti/pycti.entities.opencti_incident.rst index 28398afe4..f4e34d67c 100644 --- a/docs/pycti/pycti.entities.opencti_incident.rst +++ b/docs/pycti/pycti.entities.opencti_incident.rst @@ -3,6 +3,7 @@ =================================== .. automodule:: pycti.entities.opencti_incident + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.entities.opencti_indicator.rst b/docs/pycti/pycti.entities.opencti_indicator.rst index 491bed684..ac8feb75a 100644 --- a/docs/pycti/pycti.entities.opencti_indicator.rst +++ b/docs/pycti/pycti.entities.opencti_indicator.rst @@ -3,6 +3,7 @@ ==================================== .. automodule:: pycti.entities.opencti_indicator + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.entities.opencti_intrusion_set.rst b/docs/pycti/pycti.entities.opencti_intrusion_set.rst index 30f952862..2a731f7e9 100644 --- a/docs/pycti/pycti.entities.opencti_intrusion_set.rst +++ b/docs/pycti/pycti.entities.opencti_intrusion_set.rst @@ -3,6 +3,7 @@ ======================================== .. automodule:: pycti.entities.opencti_intrusion_set + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.entities.opencti_kill_chain_phase.rst b/docs/pycti/pycti.entities.opencti_kill_chain_phase.rst index 3702804d7..97b264cdd 100644 --- a/docs/pycti/pycti.entities.opencti_kill_chain_phase.rst +++ b/docs/pycti/pycti.entities.opencti_kill_chain_phase.rst @@ -3,6 +3,7 @@ =========================================== .. automodule:: pycti.entities.opencti_kill_chain_phase + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.entities.opencti_malware.rst b/docs/pycti/pycti.entities.opencti_malware.rst index a3c237845..fe9f9abca 100644 --- a/docs/pycti/pycti.entities.opencti_malware.rst +++ b/docs/pycti/pycti.entities.opencti_malware.rst @@ -3,6 +3,7 @@ ================================== .. automodule:: pycti.entities.opencti_malware + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.entities.opencti_marking_definition.rst b/docs/pycti/pycti.entities.opencti_marking_definition.rst index 84407f6fa..1a49845c7 100644 --- a/docs/pycti/pycti.entities.opencti_marking_definition.rst +++ b/docs/pycti/pycti.entities.opencti_marking_definition.rst @@ -3,6 +3,7 @@ ============================================= .. automodule:: pycti.entities.opencti_marking_definition + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.entities.opencti_note.rst b/docs/pycti/pycti.entities.opencti_note.rst index 407e257ad..4729defdf 100644 --- a/docs/pycti/pycti.entities.opencti_note.rst +++ b/docs/pycti/pycti.entities.opencti_note.rst @@ -3,6 +3,7 @@ =============================== .. automodule:: pycti.entities.opencti_note + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.entities.opencti_opinion.rst b/docs/pycti/pycti.entities.opencti_opinion.rst index 9d682098e..7ad46df43 100644 --- a/docs/pycti/pycti.entities.opencti_opinion.rst +++ b/docs/pycti/pycti.entities.opencti_opinion.rst @@ -3,6 +3,7 @@ ================================== .. automodule:: pycti.entities.opencti_opinion + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.entities.opencti_report.rst b/docs/pycti/pycti.entities.opencti_report.rst index 85041ed80..63ea51c1e 100644 --- a/docs/pycti/pycti.entities.opencti_report.rst +++ b/docs/pycti/pycti.entities.opencti_report.rst @@ -3,6 +3,7 @@ ================================= .. automodule:: pycti.entities.opencti_report + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.entities.opencti_role.rst b/docs/pycti/pycti.entities.opencti_role.rst new file mode 100644 index 000000000..6e34c766c --- /dev/null +++ b/docs/pycti/pycti.entities.opencti_role.rst @@ -0,0 +1,11 @@ +==================================== +``pycti.entities.opencti_role`` +==================================== + +.. automodule:: pycti.entities.opencti_role + :members: + + .. contents:: + :local: + +.. currentmodule:: pycti.entities.opencti_role diff --git a/docs/pycti/pycti.entities.opencti_settings.rst b/docs/pycti/pycti.entities.opencti_settings.rst new file mode 100644 index 000000000..7eda44768 --- /dev/null +++ b/docs/pycti/pycti.entities.opencti_settings.rst @@ -0,0 +1,11 @@ +==================================== +``pycti.entities.opencti_settings`` +==================================== + +.. automodule:: pycti.entities.opencti_settings + :members: + + .. contents:: + :local: + +.. currentmodule:: pycti.entities.opencti_settings diff --git a/docs/pycti/pycti.entities.opencti_stix_domain_entity.rst b/docs/pycti/pycti.entities.opencti_stix_domain_entity.rst index a2f9848f6..2b970281a 100644 --- a/docs/pycti/pycti.entities.opencti_stix_domain_entity.rst +++ b/docs/pycti/pycti.entities.opencti_stix_domain_entity.rst @@ -3,6 +3,7 @@ ============================================= .. automodule:: pycti.entities.opencti_stix_domain_object + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.entities.opencti_stix_entity.rst b/docs/pycti/pycti.entities.opencti_stix_entity.rst index e901dcc2a..38be5bf92 100644 --- a/docs/pycti/pycti.entities.opencti_stix_entity.rst +++ b/docs/pycti/pycti.entities.opencti_stix_entity.rst @@ -3,6 +3,7 @@ ====================================== .. automodule:: pycti.entities.opencti_stix_entity + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.entities.opencti_stix_observable.rst b/docs/pycti/pycti.entities.opencti_stix_observable.rst index 37cb95451..4b486732d 100644 --- a/docs/pycti/pycti.entities.opencti_stix_observable.rst +++ b/docs/pycti/pycti.entities.opencti_stix_observable.rst @@ -3,6 +3,7 @@ ========================================== .. automodule:: pycti.entities.opencti_stix_observable + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.entities.opencti_stix_observable_relation.rst b/docs/pycti/pycti.entities.opencti_stix_observable_relation.rst index eac81d4ed..c32e4faf8 100644 --- a/docs/pycti/pycti.entities.opencti_stix_observable_relation.rst +++ b/docs/pycti/pycti.entities.opencti_stix_observable_relation.rst @@ -3,6 +3,7 @@ =================================================== .. automodule:: pycti.entities.opencti_stix_observable_relationship + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.entities.opencti_stix_relation.rst b/docs/pycti/pycti.entities.opencti_stix_relation.rst index 2b148b37f..925160789 100644 --- a/docs/pycti/pycti.entities.opencti_stix_relation.rst +++ b/docs/pycti/pycti.entities.opencti_stix_relation.rst @@ -3,6 +3,7 @@ ======================================== .. automodule:: pycti.entities.opencti_stix_relation + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.entities.opencti_stix_sighting.rst b/docs/pycti/pycti.entities.opencti_stix_sighting.rst index 42a077f5d..fc018cee3 100644 --- a/docs/pycti/pycti.entities.opencti_stix_sighting.rst +++ b/docs/pycti/pycti.entities.opencti_stix_sighting.rst @@ -3,6 +3,7 @@ ======================================== .. automodule:: pycti.entities.opencti_stix_sighting + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.entities.opencti_tag.rst b/docs/pycti/pycti.entities.opencti_tag.rst index d9dd33116..de5f3c51a 100644 --- a/docs/pycti/pycti.entities.opencti_tag.rst +++ b/docs/pycti/pycti.entities.opencti_tag.rst @@ -3,6 +3,7 @@ ============================== .. automodule:: pycti.entities.opencti_tag + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.entities.opencti_threat_actor.rst b/docs/pycti/pycti.entities.opencti_threat_actor.rst index d2bb80186..fdc9c92af 100644 --- a/docs/pycti/pycti.entities.opencti_threat_actor.rst +++ b/docs/pycti/pycti.entities.opencti_threat_actor.rst @@ -3,6 +3,7 @@ ======================================= .. automodule:: pycti.entities.opencti_threat_actor + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.entities.opencti_tool.rst b/docs/pycti/pycti.entities.opencti_tool.rst index e546b5393..01d0c4ee8 100644 --- a/docs/pycti/pycti.entities.opencti_tool.rst +++ b/docs/pycti/pycti.entities.opencti_tool.rst @@ -3,6 +3,7 @@ =============================== .. automodule:: pycti.entities.opencti_tool + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.entities.opencti_user.rst b/docs/pycti/pycti.entities.opencti_user.rst new file mode 100644 index 000000000..0aa82efb6 --- /dev/null +++ b/docs/pycti/pycti.entities.opencti_user.rst @@ -0,0 +1,11 @@ +==================================== +``pycti.entities.opencti_user`` +==================================== + +.. automodule:: pycti.entities.opencti_user + :members: + + .. contents:: + :local: + +.. currentmodule:: pycti.entities.opencti_user diff --git a/docs/pycti/pycti.entities.opencti_vulnerability.rst b/docs/pycti/pycti.entities.opencti_vulnerability.rst index cd1bc402e..02391de05 100644 --- a/docs/pycti/pycti.entities.opencti_vulnerability.rst +++ b/docs/pycti/pycti.entities.opencti_vulnerability.rst @@ -3,6 +3,7 @@ ======================================== .. automodule:: pycti.entities.opencti_vulnerability + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.entities.rst b/docs/pycti/pycti.entities.rst index d229fdab5..c76ede085 100644 --- a/docs/pycti/pycti.entities.rst +++ b/docs/pycti/pycti.entities.rst @@ -3,6 +3,7 @@ ================== .. automodule:: pycti.entities + :members: .. contents:: :local: @@ -37,5 +38,10 @@ Submodules pycti.entities.opencti_threat_actor pycti.entities.opencti_tool pycti.entities.opencti_vulnerability + pycti.entities.opencti_capability + pycti.entities.opencti_role + pycti.entities.opencti_group + pycti.entities.opencti_user + pycti.entities.opencti_settings .. currentmodule:: pycti.entities diff --git a/docs/pycti/pycti.rst b/docs/pycti/pycti.rst index b5829609e..0a7a9ea49 100644 --- a/docs/pycti/pycti.rst +++ b/docs/pycti/pycti.rst @@ -38,11 +38,14 @@ Classes - :py:class:`CaseRft`: Undocumented. +- :py:class:`Channel`: + Undocumented. + - :py:class:`Task`: Undocumented. - :py:class:`ConnectorType`: - Create a collection of name/value pairs. + An enumeration. - :py:class:`CourseOfAction`: Undocumented. @@ -141,22 +144,22 @@ Classes Undocumented. - :py:class:`StixCyberObservable`: - Undocumented. + deprecated [>=6.2 & <6.5]` - :py:class:`StixNestedRefRelationship`: Undocumented. - :py:class:`StixCyberObservableTypes`: - Create a collection of name/value pairs. + An enumeration. - :py:class:`StixDomainObject`: Undocumented. - :py:class:`StixMetaTypes`: - Create a collection of name/value pairs. + An enumeration. - :py:class:`MultipleRefRelationship`: - Create a collection of name/value pairs. + An enumeration. - :py:class:`StixObjectOrStixRelationship`: Undocumented. @@ -185,18 +188,54 @@ Classes - :py:class:`CustomObjectTask`: Task object. +- :py:class:`CustomObjectChannel`: + Channel object. + +- :py:class:`CustomObservableCredential`: + Credential observable. + - :py:class:`CustomObservableHostname`: Hostname observable. - :py:class:`CustomObservableUserAgent`: User-Agent observable. +- :py:class:`CustomObservableBankAccount`: + Bank Account observable. + - :py:class:`CustomObservableCryptocurrencyWallet`: Cryptocurrency wallet observable. +- :py:class:`CustomObservablePaymentCard`: + Payment card observable. + +- :py:class:`CustomObservablePhoneNumber`: + Phone number observable. + +- :py:class:`CustomObservableTrackingNumber`: + Tracking number observable. + - :py:class:`CustomObservableText`: Text observable. +- :py:class:`CustomObservableMediaContent`: + Media-Content observable. + +- :py:class:`Capability`: + Represents a role capability on the OpenCTI platform + +- :py:class:`Role`: + Representation of a role in OpenCTI + +- :py:class:`Group`: + Representation of a Group in OpenCTI + +- :py:class:`User`: + Representation of a user on the OpenCTI platform + +- :py:class:`Settings`: + Represents the Settings object in OpenCTI + .. autoclass:: AttackPattern :members: @@ -233,6 +272,13 @@ Classes .. inheritance-diagram:: CaseRft :parts: 1 +.. autoclass:: Channel + :members: + + .. rubric:: Inheritance + .. inheritance-diagram:: Channel + :parts: 1 + .. autoclass:: Task :members: @@ -576,6 +622,20 @@ Classes .. inheritance-diagram:: CustomObjectTask :parts: 1 +.. autoclass:: CustomObjectChannel + :members: + + .. rubric:: Inheritance + .. inheritance-diagram:: CustomObjectChannel + :parts: 1 + +.. autoclass:: CustomObservableCredential + :members: + + .. rubric:: Inheritance + .. inheritance-diagram:: CustomObservableCredential + :parts: 1 + .. autoclass:: CustomObservableHostname :members: @@ -590,6 +650,13 @@ Classes .. inheritance-diagram:: CustomObservableUserAgent :parts: 1 +.. autoclass:: CustomObservableBankAccount + :members: + + .. rubric:: Inheritance + .. inheritance-diagram:: CustomObservableBankAccount + :parts: 1 + .. autoclass:: CustomObservableCryptocurrencyWallet :members: @@ -597,6 +664,27 @@ Classes .. inheritance-diagram:: CustomObservableCryptocurrencyWallet :parts: 1 +.. autoclass:: CustomObservablePaymentCard + :members: + + .. rubric:: Inheritance + .. inheritance-diagram:: CustomObservablePaymentCard + :parts: 1 + +.. autoclass:: CustomObservablePhoneNumber + :members: + + .. rubric:: Inheritance + .. inheritance-diagram:: CustomObservablePhoneNumber + :parts: 1 + +.. autoclass:: CustomObservableTrackingNumber + :members: + + .. rubric:: Inheritance + .. inheritance-diagram:: CustomObservableTrackingNumber + :parts: 1 + .. autoclass:: CustomObservableText :members: @@ -604,6 +692,48 @@ Classes .. inheritance-diagram:: CustomObservableText :parts: 1 +.. autoclass:: CustomObservableMediaContent + :members: + + .. rubric:: Inheritance + .. inheritance-diagram:: CustomObservableMediaContent + :parts: 1 + +.. autoclass:: Capability + :members: + + .. rubric:: Inheritance + .. inheritance-diagram:: Capability + :parts: 1 + +.. autoclass:: Role + :members: + + .. rubric:: Inheritance + .. inheritance-diagram:: Role + :parts: 1 + +.. autoclass:: Group + :members: + + .. rubric:: Inheritance + .. inheritance-diagram:: Group + :parts: 1 + +.. autoclass:: User + :members: + + .. rubric:: Inheritance + .. inheritance-diagram:: User + :parts: 1 + +.. autoclass:: Settings + :members: + + .. rubric:: Inheritance + .. inheritance-diagram:: Settings + :parts: 1 + Variables ========= diff --git a/docs/pycti/pycti.utils.constants.rst b/docs/pycti/pycti.utils.constants.rst index 130003518..015922ad8 100644 --- a/docs/pycti/pycti.utils.constants.rst +++ b/docs/pycti/pycti.utils.constants.rst @@ -3,6 +3,7 @@ ========================= .. automodule:: pycti.utils.constants + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.utils.opencti_stix2.rst b/docs/pycti/pycti.utils.opencti_stix2.rst index 172580950..5830291fa 100644 --- a/docs/pycti/pycti.utils.opencti_stix2.rst +++ b/docs/pycti/pycti.utils.opencti_stix2.rst @@ -3,6 +3,7 @@ ============================= .. automodule:: pycti.utils.opencti_stix2 + :members: .. contents:: :local: diff --git a/docs/pycti/pycti.utils.rst b/docs/pycti/pycti.utils.rst index bfbea7224..d0e4a2209 100644 --- a/docs/pycti/pycti.utils.rst +++ b/docs/pycti/pycti.utils.rst @@ -3,6 +3,7 @@ =============== .. automodule:: pycti.utils + :members: .. contents:: :local: diff --git a/pycti/__init__.py b/pycti/__init__.py index 284fcc737..7a7c9b374 100644 --- a/pycti/__init__.py +++ b/pycti/__init__.py @@ -12,6 +12,9 @@ from .connector.opencti_metric_handler import OpenCTIMetricHandler from .entities.opencti_attack_pattern import AttackPattern from .entities.opencti_campaign import Campaign + +# Administrative entities +from .entities.opencti_capability import Capability from .entities.opencti_case_incident import CaseIncident from .entities.opencti_case_rfi import CaseRfi from .entities.opencti_case_rft import CaseRft @@ -21,6 +24,7 @@ from .entities.opencti_data_source import DataSource from .entities.opencti_external_reference import ExternalReference from .entities.opencti_feedback import Feedback +from .entities.opencti_group import Group from .entities.opencti_grouping import Grouping from .entities.opencti_identity import Identity from .entities.opencti_incident import Incident @@ -37,6 +41,8 @@ from .entities.opencti_observed_data import ObservedData from .entities.opencti_opinion import Opinion from .entities.opencti_report import Report +from .entities.opencti_role import Role +from .entities.opencti_settings import Settings from .entities.opencti_stix_core_relationship import StixCoreRelationship from .entities.opencti_stix_cyber_observable import StixCyberObservable from .entities.opencti_stix_domain_object import StixDomainObject @@ -50,6 +56,7 @@ from .entities.opencti_threat_actor_group import ThreatActorGroup from .entities.opencti_threat_actor_individual import ThreatActorIndividual from .entities.opencti_tool import Tool +from .entities.opencti_user import User from .entities.opencti_vulnerability import Vulnerability from .utils.constants import ( CustomObjectCaseIncident, @@ -151,4 +158,9 @@ "STIX_EXT_MITRE", "STIX_EXT_OCTI_SCO", "STIX_EXT_OCTI", + "Capability", + "Role", + "Group", + "User", + "Settings", ] diff --git a/pycti/api/opencti_api_client.py b/pycti/api/opencti_api_client.py index 022649a01..ae57b2dfe 100644 --- a/pycti/api/opencti_api_client.py +++ b/pycti/api/opencti_api_client.py @@ -14,6 +14,7 @@ from pycti.api.opencti_api_work import OpenCTIApiWork from pycti.entities.opencti_attack_pattern import AttackPattern from pycti.entities.opencti_campaign import Campaign +from pycti.entities.opencti_capability import Capability from pycti.entities.opencti_case_incident import CaseIncident from pycti.entities.opencti_case_rfi import CaseRfi from pycti.entities.opencti_case_rft import CaseRft @@ -24,6 +25,7 @@ from pycti.entities.opencti_event import Event from pycti.entities.opencti_external_reference import ExternalReference from pycti.entities.opencti_feedback import Feedback +from pycti.entities.opencti_group import Group from pycti.entities.opencti_grouping import Grouping from pycti.entities.opencti_identity import Identity from pycti.entities.opencti_incident import Incident @@ -42,6 +44,8 @@ from pycti.entities.opencti_observed_data import ObservedData from pycti.entities.opencti_opinion import Opinion from pycti.entities.opencti_report import Report +from pycti.entities.opencti_role import Role +from pycti.entities.opencti_settings import Settings from pycti.entities.opencti_stix import Stix from pycti.entities.opencti_stix_core_object import StixCoreObject from pycti.entities.opencti_stix_core_relationship import StixCoreRelationship @@ -59,6 +63,7 @@ from pycti.entities.opencti_threat_actor_group import ThreatActorGroup from pycti.entities.opencti_threat_actor_individual import ThreatActorIndividual from pycti.entities.opencti_tool import Tool +from pycti.entities.opencti_user import User from pycti.entities.opencti_vocabulary import Vocabulary from pycti.entities.opencti_vulnerability import Vulnerability from pycti.utils.opencti_logger import logger @@ -128,6 +133,7 @@ def __init__( # Configure logger self.logger_class = logger(log_level.upper(), json_logging) self.app_logger = self.logger_class("api") + self.admin_logger = self.logger_class("admin") # Define API self.api_token = token @@ -198,6 +204,13 @@ def __init__( self.grouping = Grouping(self) self.indicator = Indicator(self) + # Admin functionality + self.capability = Capability(self) + self.role = Role(self) + self.group = Group(self) + self.user = User(self) + self.settings = Settings(self) + # Check if openCTI is available if perform_health_check and not self.health_check(): raise ValueError( @@ -626,6 +639,43 @@ def process_multiple_fields(self, data): if "importFiles" in data: data["importFiles"] = self.process_multiple(data["importFiles"]) data["importFilesIds"] = self.process_multiple_ids(data["importFiles"]) + + # Administrative data + if "groups" in data: + data["groups"] = self.process_multiple(data["groups"]) + data["groupsIds"] = self.process_multiple_ids(data["groups"]) + if "objectOrganization" in data: + data["objectOrganization"] = self.process_multiple( + data["objectOrganization"] + ) + data["objectOrganizationIds"] = self.process_multiple_ids( + data["objectOrganization"] + ) + if "roles" in data: + data["roles"] = self.process_multiple(data["roles"]) + data["rolesIds"] = self.process_multiple_ids(data["roles"]) + if "capabilities" in data: + data["capabilities"] = self.process_multiple(data["capabilities"]) + data["capabilitiesIds"] = self.process_multiple_ids(data["capabilities"]) + if "members" in data: + data["members"] = self.process_multiple(data["members"]) + data["membersIds"] = self.process_multiple_ids(data["members"]) + if "platform_messages" in data: + data["platform_messages"] = self.process_multiple(data["platform_messages"]) + data["platform_messages_ids"] = self.process_multiple_ids( + data["platform_messages"] + ) + if "messages_administration" in data: + data["messages_administration"] = self.process_multiple( + data["messages_administration"] + ) + data["messages_administration_ids"] = self.process_multiple_ids( + data["messages_administration"] + ) + if "recipients" in data: + data["recipients"] = self.process_multiple(data["recipients"]) + data["recipientsIds"] = self.process_multiple_ids(data["recipients"]) + # See aliases of GraphQL query in stix_core_object method if "name_alt" in data: data["name"] = data["name_alt"] diff --git a/pycti/entities/opencti_capability.py b/pycti/entities/opencti_capability.py new file mode 100644 index 000000000..8d41ee266 --- /dev/null +++ b/pycti/entities/opencti_capability.py @@ -0,0 +1,52 @@ +from typing import Dict, List + + +class Capability: + """Represents a role capability on the OpenCTI platform + + See the properties attribute to understand which properties are fetched by + default from the graphql queries. + """ + + def __init__(self, opencti): + self.opencti = opencti + self.properties = """ + id + entity_type + parent_types + name + description + attribute_order + + created_at + updated_at + """ + + def list(self, **kwargs) -> List[Dict]: + """Lists all capabilities available on the platform + + :param customAttributes: Custom attributes to retrieve from the GraphQL + query. + :type customAttributes: str, optional + :return: List of capabilities + :rtype: List[Dict] + """ + custom_attributes = kwargs.get("customAttributes") + self.opencti.admin_logger.info("Listing capabilities") + query = ( + """ + query CapabilityList { + capabilities { + edges { + node { + """ + + (self.properties if custom_attributes is None else custom_attributes) + + """ + } + } + } + } + """ + ) + result = self.opencti.query(query) + return self.opencti.process_multiple(result["data"]["capabilities"]) diff --git a/pycti/entities/opencti_group.py b/pycti/entities/opencti_group.py new file mode 100644 index 000000000..a4b9ee596 --- /dev/null +++ b/pycti/entities/opencti_group.py @@ -0,0 +1,718 @@ +from typing import Dict, List, Optional + + +class Group: + """Representation of a Group in OpenCTI + + Groups have members and also have assigned roles. Roles attached to a group + determine what members of the group have permissions to do according to + the capabilities the role has. + + Additionally, groups have a confidence level which informs the effective + confidence of members of the group. + + Groups also have permissions on Marking Definitions. Assigned marking + definitions allow users to apply their capabilities on objects with those + definitions. Additionally, there are default markings added to all objects + created by members of a group, and max shareable definitions which + determine which objects users can export from the platform to share. + + See the properties attribute to understand what properties are fetched by + default from GraphQL queries. + """ + + def __init__(self, opencti): + self.opencti = opencti + self.properties = """ + id + standard_id + name + description + + entity_type + parent_types + created_at + updated_at + + default_assignation + no_creators + restrict_delete + default_hidden_types + + auto_new_marking + + allowed_marking { + id, standard_id, definition_type, definition + } + + default_marking { + entity_type + values { + id, standard_id, definition_type, definition + } + } + + not_shareable_marking_types + max_shareable_marking { + id, standard_id, definition_type, definition + } + + group_confidence_level { + max_confidence + overrides { + entity_type + max_confidence + } + } + + roles { + edges { + node { + id, name + capabilities { + id, name + } + } + } + } + + members { + edges { + node { + id, individual_id, user_email, name + } + } + } + """ + + def list(self, **kwargs) -> List[Dict]: + """Lists groups based on a number of filters. + + :param first: Retrieve this number of results. If 0 + then fetches all results, defaults to 0. + :type first: int, optional + :param after: ID of the group to fetch results + after in the list of all results, defaults to None. + :type after: str, optional + :param orderBy: Field by which to order results. + Must be one of name, default_assignation, no_creators, + restrict_delete, auto_new_marking, created_at, updated_at, + group_confidence_level, and _score, defaults to "name". + :type orderBy: str, optional + :param orderMode: Direction of ordering. Must be + one of "asc" or "desc", defaults to "asc". + :type orderMode: str, optional + :param search: String to search groups for, defaults to None. + :type search: str, optional + :param filters: OpenCTI API FilterGroup object. + This is an advanced parameter. To learn more please search for + the FilterGroup object in the OpenCTI GraphQL Playground, defaults + to {}. + :type filters: dict, optional + :param customAttributes: Custom attributes to fetch from the GraphQL + query + :type customAttributes: str, optional + :param getAll: Defaults to False. Whether or not to get all results + from the search. If True then param first is ignored. + :type getAll: bool, optional + :param withPagination: Defaults to False. Whether to return pagination + info with results. + :type withPagination: bool, optional + :return: List of groups in dictionary representation. + :rtype: list[dict] + """ + first = kwargs.get("first", 500) + after = kwargs.get("after", None) + orderBy = kwargs.get("orderBy", None) + orderMode = kwargs.get("orderMode", None) + search = kwargs.get("search", None) + filters = kwargs.get("filters", None) + custom_attributes = kwargs.get("customAttributes", None) + getAll = kwargs.get("getAll", False) + withPagination = kwargs.get("withPagination", False) + + if getAll: + first = 100 + + self.opencti.admin_logger.info( + "Fetching groups with filters", {"filters": filters} + ) + query = ( + """ + query Groups($first: Int, $after: ID, $orderBy: GroupsOrdering, $orderMode: OrderingMode, $search: String, $filters: FilterGroup) { + groups(first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode, search: $search, filters: $filters) { + edges { + node { + """ + + (self.properties if custom_attributes is None else custom_attributes) + + """ + } + } + pageInfo { + startCursor + endCursor + hasNextPage + hasPreviousPage + globalCount + } + } + } + """ + ) + result = self.opencti.query( + query, + { + "first": first, + "after": after, + "orderBy": orderBy, + "orderMode": orderMode, + "search": search, + "filters": filters, + }, + ) + + if getAll: + final_data = [] + data = self.opencti.process_multiple(result["data"]["groups"]) + final_data = final_data + data + while result["data"]["groups"]["pageInfo"]["hasNextPage"]: + after = result["data"]["groups"]["pageInfo"]["endCursor"] + result = self.opencti.query( + query, + { + "first": first, + "after": after, + "orderBy": orderBy, + "orderMode": orderMode, + "search": search, + "filters": filters, + }, + ) + data = self.opencti.process_multiple(result["data"]["groups"]) + final_data = final_data + data + return final_data + else: + return self.opencti.process_multiple( + result["data"]["groups"], withPagination + ) + + def read(self, **kwargs) -> Optional[Dict]: + """Fetch a given group from OpenCTI + + One of id or filters is required. + + :param id: ID of the group to fetch + :type id: str, optional + :param filters: Filters to apply to find single group + :type filters: dict, optional + :param customAttributes: Custom attributes to fetch for the group + :type customAttributes: str + :return: Representation of a group. + :rtype: Optional[Dict] + """ + id = kwargs.get("id", None) + filters = kwargs.get("filters", None) + search = kwargs.get("search", None) + custom_attributes = kwargs.get("customAttributes", None) + if id is not None: + self.opencti.admin_logger.info("Fetching group with ID", {"id": id}) + query = ( + """ + query Group($id: String!) { + group(id: $id) { + """ + + (self.properties if custom_attributes is None else custom_attributes) + + """ + } + } + """ + ) + result = self.opencti.query(query, {"id": id}) + return self.opencti.process_multiple_fields(result["data"]["group"]) + elif filters is not None or search is not None: + results = self.list( + filters=filters, search=search, customAttributes=custom_attributes + ) + return results[0] if results else None + else: + self.opencti.admin_logger.error( + "[opencti_group] Missing parameters: id or filters" + ) + return None + + def create(self, **kwargs) -> Optional[Dict]: + """Create a group with required details + + Groups can be configured after creation using other functions. + + :param name: Name of the group to create. + :type name: str + :param id_confidence_level: Confidence-level dictionary, with a + max_confidence member between 0 and 100 (incl) and an overrides + list with max_confidence and the entity_type it applies to. + :type id_confidence_level: dict + :param description: Description of the group + :type description: str, optional + :param default_assignation: Defaults to False. Whether or not to assign + this group by default to all new users. + :type default_assignation: bool, optional + :param no_creators: Defaults to False. Whether or not to create + authors for members of this group. + :type no_creators: bool, optional + :param restrict_delete: Defaults to False. Whether or not to restrict + members deleting entities that are not their own. + :type restrict_delete: bool, optional + :param auto_new_marking: Defaults to False. Whether or not to allow + members access to new markings automatically. + :type auto_new_marking: bool, optional + :param customAttributes: Attributes to retrieve from the new group + :type customAttributes: str, optional + :return: Representation of the group. + :rtype: Optional[Dict] + """ + name = kwargs.get("name", None) + group_confidence_level = kwargs.get("group_confidence_level", None) + description = kwargs.get("description", None) + default_assignation = kwargs.get("default_assignation", False) + no_creators = kwargs.get("no_creators", False) + restrict_delete = kwargs.get("restrict_delete", False) + auto_new_marking = kwargs.get("auto_new_marking", False) + custom_attributes = kwargs.get("customAttributes", None) + + if name is None or group_confidence_level is None: + self.opencti.admin_logger.error( + "[opencti_group] Missing parameters: name and group_confidence_level" + ) + return None + + self.opencti.admin_logger.info( + "Creating new group with parameters", + { + "name": name, + "group_confidence_level": group_confidence_level, + "description": description, + "default_assignation": default_assignation, + "no_creators": no_creators, + "restrict_delete": restrict_delete, + "auto_new_marking": auto_new_marking, + }, + ) + query = ( + """ + mutation GroupAdd($input: GroupAddInput!) { + groupAdd(input: $input) { + """ + + (self.properties if custom_attributes is None else custom_attributes) + + """ + } + } + """ + ) + result = self.opencti.query( + query, + { + "input": { + "name": name, + "description": description, + "default_assignation": default_assignation, + "no_creators": no_creators, + "restrict_delete": restrict_delete, + "auto_new_marking": auto_new_marking, + "group_confidence_level": group_confidence_level, + } + }, + ) + return self.opencti.process_multiple_fields(result["data"]["groupAdd"]) + + def delete(self, id: str): + """Delete a given group from OpenCTI + + :param id: ID of the group to delete. + :type id: str + """ + self.opencti.admin_logger.info("Deleting group", {"id": id}) + query = """ + mutation GroupDelete($id: ID!) { + groupEdit(id: $id) { + delete + } + } + """ + self.opencti.query(query, {"id": id}) + + def update_field(self, **kwargs) -> Optional[Dict]: + """Update a group using fieldPatch + + :param id: ID of the group to update + :type id: str + :param input: FieldPatchInput object to edit group + :type input: List[Dict] + :param customAttributes: Custom attributes to retrieve from group + :type customAttribues: str, optional + :return: Representation of a group + :rtype: Optional[Dict] + """ + id = kwargs.get("id", None) + input = kwargs.get("input", None) + custom_attributes = kwargs.get("customAttributes", None) + + if id is None or input is None: + self.opencti.admin_logger.error( + "[opencti_group] Missing parameters: id and input" + ) + return None + + self.opencti.admin_logger.info("Editing group with input", {"input": input}) + query = ( + """ + mutation GroupEdit($id: ID!, $input:[EditInput]!) { + groupEdit(id: $id) { + fieldPatch(input: $input) { + """ + + (self.properties if custom_attributes is None else custom_attributes) + + """ + } + } + } + """ + ) + result = self.opencti.query(query, {"id": id, "input": input}) + return self.opencti.process_multiple_fields( + result["data"]["groupEdit"]["fieldPatch"] + ) + + def add_member(self, **kwargs) -> Optional[Dict]: + """Add a member to a given group. + + :param id: ID of the group to add a member to + :type id: str + :param user_id: ID to add to the group + :type user_id: str + :return: Representation of the relationship + :rtype: Optional[Dict] + """ + id = kwargs.get("id", None) + user_id = kwargs.get("user_id", None) + + if id is None or user_id is None: + self.opencti.admin_logger.error( + "[opencti_group] Missing parameters: id and user_id" + ) + return None + + self.opencti.admin_logger.info( + "Adding member to group", {"groupId": id, "userId": user_id} + ) + query = """ + mutation GroupEditMemberAdd($groupId: ID!, $userId: ID!) { + groupEdit(id: $groupId) { + relationAdd(input: { + fromId: $userId, + relationship_type: "member-of" + }) { + id, standard_id, entity_type, created_at, updated_at + from { + id, entity_type + } + + to { + id, entity_type + } + } + } + } + """ + result = self.opencti.query(query, {"groupId": id, "userId": user_id}) + return self.opencti.process_multiple_fields( + result["data"]["groupEdit"]["relationAdd"] + ) + + def delete_member(self, **kwargs) -> Optional[Dict]: + """Remove a given user from a group + + :param id: ID to remove a user from + :type id: str + :param user: ID to remove from the group + :type user: str + :return: Representation of the group after the member has been removed + :rtype: Optional[Dict] + """ + id = kwargs.get("id", None) + user_id = kwargs.get("user_id", None) + + if id is None or user_id is None: + self.opencti.admin_logger.error( + "[opencti_group] Missing parameters: id and user_id" + ) + return None + + self.opencti.admin_logger.info( + "Removing member from group", {"groupId": id, "userId": user_id} + ) + query = ( + """ + mutation GroupEditMemberDelete ($groupId: ID!, $userId: StixRef!) { + groupEdit(id: $groupId) { + relationDelete(fromId: $userId, relationship_type: "member-of") { + """ + + self.properties + + """ + } + } + } + """ + ) + result = self.opencti.query(query, {"groupId": id, "userId": user_id}) + return self.opencti.process_multiple_fields( + result["data"]["groupEdit"]["relationDelete"] + ) + + def add_role(self, **kwargs) -> Optional[Dict]: + """Add a role to a given group + + :param id: ID to add a role to + :type id: str + :param role_id: Role ID to add to the group + :type role: str + :return: Representation of the group after a role has been added + :rtype: Optional[Dict] + """ + id = kwargs.get("id", None) + role_id = kwargs.get("role_id", None) + + if id is None or role_id is None: + self.opencti.admin_logger.error( + "[opencti_group] Missing parameters: id and role_id" + ) + return None + + self.opencti.admin_logger.info( + "Adding role to group", {"groupId": id, "roleId": role_id} + ) + query = """ + mutation GroupEditRoleAdd($groupId: ID!, $roleId: ID!) { + groupEdit(id: $groupId) { + relationAdd(input: { + toId: $roleId, relationship_type: "has-role" + }) { + id, standard_id, entity_type, created_at, updated_at + from { + id, entity_type + } + + to { + id, entity_type + } + } + } + } + """ + result = self.opencti.query(query, {"groupId": id, "roleId": role_id}) + return self.opencti.process_multiple_fields( + result["data"]["groupEdit"]["relationAdd"] + ) + + def delete_role(self, **kwargs) -> Optional[Dict]: + """Removes a role from a given group + + :param id: ID to remove role from + :type id: str + :param role_id: Role ID to remove from the group + :type role_id: str + :return: Representation of the group after role is removed + :rtype: Optional[Dict] + """ + id = kwargs.get("id", None) + role_id = kwargs.get("role_id", None) + + if id is None or role_id is None: + self.opencti.admin_logger.error( + "[opencti_group] Missing parameters: id and role_id" + ) + return None + + self.opencti.admin_logger.info( + "Removing role from group", {"groupId": id, "roleId": role_id} + ) + query = ( + """ + mutation GroupEditRoleDelete($groupId: ID!, $roleId: StixRef!) { + groupEdit(id: $groupId) { + relationDelete(toId: $roleId, relationship_type: "has-role") { + """ + + self.properties + + """ + } + } + } + """ + ) + result = self.opencti.query(query, {"groupId": id, "roleId": role_id}) + return self.opencti.process_multiple_fields( + result["data"]["groupEdit"]["relationDelete"] + ) + + def edit_default_marking(self, **kwargs) -> Optional[Dict]: + """Adds a default marking to the group. + + :param id: ID of the group. + :type id: str + :param marking_ids: IDs of the markings to add, or an empty list to + remove all default markings + :type marking_ids: List[str] + :param entity: STIX entity type to add default + marking for. If set to "GLOBAL" applies to all entity types, + defaults to "GLOBAL". + :type entity: str, optional + :return: Group after adding the default marking. + :rtype: Optional[Dict] + """ + id = kwargs.get("id", None) + marking_ids = kwargs.get("marking_ids", None) + entity_type = kwargs.get("entity_type", "GLOBAL") + + if id is None or marking_ids is None: + self.opencti.admin_logger.error( + "[opencti_group] Missing parameters: id and marking_ids" + ) + return None + + self.opencti.admin_logger.info( + "Setting default markings for entity on group", + {"markings": marking_ids, "entity_type": entity_type, "groupId": id}, + ) + query = ( + """ + mutation GroupEditEditDefaultMarking($id: ID!, $entity_type: String!, $values: [String!]) { + groupEdit(id: $id) { + editDefaultMarking(input: { + entity_type: $entity_type, + values: $values + }) { + """ + + self.properties + + """ + } + } + } + """ + ) + result = self.opencti.query( + query, {"id": id, "entity_type": entity_type, "values": marking_ids} + ) + return self.opencti.process_multiple_fields( + result["data"]["groupEdit"]["editDefaultMarking"] + ) + + def add_allowed_marking(self, **kwargs) -> Optional[Dict]: + """Allow a group to access a marking + + :param id: ID of group to authorise + :type id: str + :param marking_id: ID of marking to authorise + :type marking_id: str + :return: Relationship from the group to the marking definition + :rtype: Optional[Dict] + """ + id = kwargs.get("id", None) + marking_id = kwargs.get("marking_id", None) + + if id is None or marking_id is None: + self.opencti.admin_logger.error( + "[opencti_group] Missing parameters: id and marking_id" + ) + return None + + self.opencti.admin_logger.info( + "Granting group access to marking definition", + {"groupId": id, "markingId": marking_id}, + ) + query = """ + mutation GroupEditionMarkingsMarkingDefinitionsRelationAddMutation( + $id: ID! + $input: InternalRelationshipAddInput! + ) { + groupEdit(id: $id) { + relationAdd(input: $input) { + from { + __typename + ...GroupEditionMarkings_group + id + } + id + } + } + } + + fragment GroupEditionMarkings_group on Group { + id + default_assignation + allowed_marking { + id + } + not_shareable_marking_types + max_shareable_marking { + id + definition + definition_type + x_opencti_order + } + default_marking { + entity_type + values { + id + } + } + } + """ + result = self.opencti.query( + query, + { + "id": id, + "input": {"relationship_type": "accesses-to", "toId": marking_id}, + }, + ) + return self.opencti.process_multiple_fields( + result["data"]["groupEdit"]["relationAdd"] + ) + + def delete_allowed_marking(self, **kwargs) -> Optional[Dict]: + """Removes access to a marking for a group + + :param id: ID of group to forbid + :type id: str + :param marking_id: ID of marking to deny + :type marking_id: str + :return: Group after denying access to marking definition + :rtype: Optional[Dict] + """ + id = kwargs.get("id", None) + marking_id = kwargs.get("marking_id", None) + + if id is None or marking_id is None: + self.opencti.admin_logger.error( + "[opencti_group] Missing parameters: id and marking_id" + ) + return None + + self.opencti.admin_logger.info( + "Forbidding group access to marking definition", + {"groupId": id, "markingId": marking_id}, + ) + query = ( + """ + mutation GroupEditMarkingRemove($groupId: ID!, $markingId: StixRef!) { + groupEdit(id: $groupId) { + relationDelete(toId: $markingId, relationship_type: "accesses-to") { + """ + + self.properties + + """ + } + } + } + """ + ) + result = self.opencti.query(query, {"groupId": id, "markingId": marking_id}) + return self.opencti.process_multiple_fields( + result["data"]["groupEdit"]["relationDelete"] + ) diff --git a/pycti/entities/opencti_role.py b/pycti/entities/opencti_role.py new file mode 100644 index 000000000..d2fec4a9d --- /dev/null +++ b/pycti/entities/opencti_role.py @@ -0,0 +1,405 @@ +from typing import Dict, List, Optional + + +class Role: + """Representation of a role in OpenCTI + + Roles can have capabilities. Groups have roles, and the combined + capabilities of those roles determine what a group of users can do on the + platform. + + Check the properties attribute of the class to understand what default + properties are fetched. + """ + + def __init__(self, opencti): + self.opencti = opencti + self.properties = """ + id + standard_id + entity_type + parent_types + + name + description + + created_at + updated_at + + capabilities { + id + name + description + } + can_manage_sensitive_config + """ + + def list(self, **kwargs) -> List[Dict]: + """Search or list the roles on the server. + + :param search: + Defaults to None. + :type search: str, optional + :param first: Defaults to 500 Return the first x results from ID or + beginning if $after is not specified. + :type first: int, optional + :param after: Return all results after the given ID, useful for + pagination. Ignored if returning all results, defaults to None. + :type after: str, optional + :param orderBy: Field to order by. Must be one of "name", + "created_at", "updated_at", or "_score". Defaults + to "name", defaults to "name". + :type orderBy: str, optional + :param orderMode: Direction to order in, either "asc" or "desc", + defaults to "asc". + :type orderMode: str, optional + :param customAttributes: Defaults to None. Custom attributes to return + from query. If None, defaults are used. + :type customAttributes: str, optional + :param getAll: Defaults to False. Retrieve all results. If true then + the "first" param is ignored. + :type getAll: bool, optional + :param withPagination: Defaults to False Whether to include pagination + pageInfo properties in result. + :type withPagination: bool, optional + + :return: List of Python dictionaries with the properties of the role. + :rtype: List[Dict] + """ + search = kwargs.get("search", None) + first = kwargs.get("first", 500) + after = kwargs.get("after", None) + orderBy = kwargs.get("orderBy", None) + orderMode = kwargs.get("orderMode", None) + custom_attributes = kwargs.get("customAttributes", None) + getAll = kwargs.get("getAll", False) + with_pagination = kwargs.get("withPagination", False) + + self.opencti.admin_logger.info( + "Searching roles matching search term", {"search": search} + ) + + if getAll: + first = 100 + + query = ( + """ + query RoleList($first: Int, $after: ID, $orderBy: RolesOrdering, $orderMode: OrderingMode, $search: String) { + roles(first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode, search: $search) { + edges { + node { + """ + + (self.properties if custom_attributes is None else custom_attributes) + + """ + } + } + pageInfo { + startCursor + endCursor + hasNextPage + hasPreviousPage + globalCount + } + } + } + """ + ) + result = self.opencti.query( + query, + { + "first": first, + "after": after, + "orderBy": orderBy, + "orderMode": orderMode, + "search": search, + }, + ) + if getAll: + final_data = [] + data = self.opencti.process_multiple(result["data"]["roles"]) + final_data = final_data + data + while result["data"]["roles"]["pageInfo"]["hasNextPage"]: + after = result["data"]["roles"]["pageInfo"]["endCursor"] + result = self.opencti.query( + query, + { + "first": first, + "after": after, + "orderBy": orderBy, + "orderMode": orderMode, + "search": search, + }, + ) + data = self.opencti.process_multiple(result["data"]["roles"]) + final_data = final_data + data + return final_data + else: + return self.opencti.process_multiple( + result["data"]["roles"], with_pagination + ) + + def read(self, **kwargs) -> Optional[Dict]: + """Get a role given its ID or a search term + + One of id or search must be provided. + + :param id: ID of the role on the platform + :type id: str, optional + :param search: Search term for a role, e.g. its name + :type search: str, optional + :param customAttributes: Custom attributes on the role to return + :type customAttributes: str, optional + + :return: Representation of the role + :rtype: Optional[Dict] + """ + id = kwargs.get("id", None) + search = kwargs.get("search", None) + custom_attributes = kwargs.get("customAttributes", None) + + if id is not None: + self.opencti.admin_logger.info("Reading role", {"id": id}) + query = ( + """ + query RoleRead($id: String!) { + role(id: $id) { + """ + + (self.properties if custom_attributes is None else custom_attributes) + + """ + } + } + """ + ) + result = self.opencti.query(query, {"id": id}) + return self.opencti.process_multiple_fields(result["data"]["role"]) + elif search is not None: + result = self.list(search=search) + return result[0] if len(result) > 0 else None + else: + self.opencti.admin_logger.error( + "[opencti_role] Missing parameters: id or search" + ) + return None + + def delete(self, **kwargs): + """Delete a role given its ID + + :param id: ID for the role on the platform. + :type id: str + """ + id = kwargs.get("id", None) + + if id is None: + self.opencti.admin_logger.error("[opencti_role] Missing parameter: id") + return None + + self.opencti.admin_logger.info("Deleting role", {"id": id}) + query = """ + mutation RoleDelete($id: ID!) { + roleEdit(id: $id) { + delete + } + } + """ + self.opencti.query(query, {"id": id}) + + def create(self, **kwargs) -> Optional[Dict]: + """Add a new role to OpenCTI. + + :param name: Name to assign to the role. + :type name: str + :param description: Optional. Description of the role, defaults to + None. + :type description: str, optional + :param customAttributes: Custom attributes to return on role + :type customAttributes: str, optional + :return: Representation of the role. + :rtype: Optional[Dict] + """ + name = kwargs.get("name", None) + description = kwargs.get("description", None) + custom_attributes = kwargs.get("customAttributes", None) + + if name is None: + self.opencti.admin_logger.error("[opencti_role] Missing parameter: name") + return None + + self.opencti.admin_logger.info( + "Creating new role", {"name": name, "description": description} + ) + query = ( + """ + mutation RoleCreate($input: RoleAddInput!) { + roleAdd(input: $input) { + """ + + (self.properties if custom_attributes is None else custom_attributes) + + """ + } + } + """ + ) + result = self.opencti.query( + query, {"input": {"name": name, "description": description}} + ) + return self.opencti.process_multiple_fields(result["data"]["roleAdd"]) + + def update_field(self, **kwargs) -> Optional[Dict]: + """Updates a given role with the given inputs + + Example of input:: + + [ + { + "key": "name", + "value": "NewCustomRole" + }, + { + "key": "can_manage_sensitive_config", + "value": False + } + ] + + :param id: ID for the role on the platform + :type id: str + :param input: List of EditInput objects + :type input: List[Dict] + :param customAttributes: Custom attributes to return on the role + :type customAttributes: str, optional + + :return: Representation of the role + :rtype: Optional[Dict] + """ + id = kwargs.get("id", None) + input = kwargs.get("input", None) + custom_attributes = kwargs.get("customAttributes", None) + + if id is None or input is None: + self.opencti.admin_logger.error( + "[opencti_role] Missing parameters: id and input" + ) + return None + + self.opencti.admin_logger.info( + "Editing role with input", {"id": id, "input": input} + ) + query = ( + """ + mutation RoleUpdate($id: ID!, $input: [EditInput]!) { + roleEdit(id: $id) { + fieldPatch(input: $input) { + """ + + (self.properties if custom_attributes is None else custom_attributes) + + """ + } + } + } + """ + ) + result = self.opencti.query(query, {"id": id, "input": input}) + return self.opencti.process_multiple_fields( + result["data"]["roleEdit"]["fieldPatch"] + ) + + def add_capability(self, **kwargs) -> Optional[Dict]: + """Adds a capability to a role + + :param id: ID of the role. + :type id: str + :param capability_id: ID of the capability to add. + :type capability_id: str + :return: Representation of the relationship, including the role and + capability + :rtype: Optional[Dict] + """ + id = kwargs.get("id", None) + capability_id = kwargs.get("capability_id", None) + + if id is None or capability_id is None: + self.opencti.admin_logger( + "[opencti_role] Missing parameters: id and capability_id" + ) + return None + + self.opencti.admin_logger.info( + "Adding capability to role", {"roleId": id, "capabilityId": capability_id} + ) + query = ( + """ + mutation RoleEditAddCapability($id: ID!, $input: InternalRelationshipAddInput!) { + roleEdit(id: $id) { + relationAdd(input: $input) { + id + entity_type + parent_types + created_at + updated_at + + from { + ... on Role { + """ + + self.properties + + """ + } + } + + to { + ... on Capability { + id, name, description + } + } + } + } + } + """ + ) + result = self.opencti.query( + query, + { + "id": id, + "input": {"relationship_type": "has-capability", "toId": capability_id}, + }, + ) + return self.opencti.process_multiple_fields( + result["data"]["roleEdit"]["relationAdd"] + ) + + def delete_capability(self, **kwargs) -> Optional[Dict]: + """Removes a capability from a role + + :param id: ID of the role + :type id: str + :param capability_id: ID of the capability to remove + :type capability_id: str + :return: Representation of the role after removing the capability + :rtype: Optional[Dict] + """ + id = kwargs.get("id", None) + capability_id = kwargs.get("capability_id", None) + + if id is None or capability_id is None: + self.opencti.admin_logger.error( + "[opencti_role] Missing parameters: id and capability_id" + ) + return None + + self.opencti.admin_logger.info( + "Removing capability from role", + {"roleId": id, "capabilityId": capability_id}, + ) + query = ( + """ + mutation RoleEditDeleteCapability($id: ID!, $toId: StixRef!) { + roleEdit(id: $id) { + relationDelete(toId: $toId, relationship_type: "has-capability") { + """ + + self.properties + + """ + } + } + } + """ + ) + result = self.opencti.query(query, {"id": id, "toId": capability_id}) + return self.opencti.process_multiple_fields( + result["data"]["roleEdit"]["relationDelete"] + ) diff --git a/pycti/entities/opencti_settings.py b/pycti/entities/opencti_settings.py new file mode 100644 index 000000000..6fb42ade7 --- /dev/null +++ b/pycti/entities/opencti_settings.py @@ -0,0 +1,378 @@ +from typing import Dict, Optional + + +class Settings: + """Represents the Settings object in OpenCTI + + These are the properties which are viewable in the customization and + security policies views on OpenCTI platform. This also includes all + messages on the platform. + + See the properties attribute to understand which properties are fetched by + default on graphql queries. + """ + + def __init__(self, opencti): + self.opencti = opencti + self.properties = """ + id + standard_id + entity_type + parent_types + + platform_organization { + id, name, description + } + + platform_title + platform_favicon + platform_email + platform_url + platform_language + + platform_cluster { + instances_number + } + + platform_modules { + id, enable, warning + } + + platform_providers { + name, type, strategy, provider + } + + platform_user_statuses { + status, message + } + + platform_theme + platform_theme_dark_background + platform_theme_dark_paper + platform_theme_dark_nav + platform_theme_dark_primary + platform_theme_dark_secondary + platform_theme_dark_accent + platform_theme_dark_logo + platform_theme_dark_logo_collapsed + platform_theme_dark_logo_login + platform_theme_light_background + platform_theme_light_paper + platform_theme_light_nav + platform_theme_light_primary + platform_theme_light_secondary + platform_theme_light_accent + platform_theme_light_logo + platform_theme_light_logo_collapsed + platform_theme_light_logo_login + platform_map_tile_server_dark + platform_map_tile_server_light + + platform_openbas_url + platform_openbas_disable_display + + platform_openerm_url + platform_openmtd_url + + platform_ai_enabled + platform_ai_type + platform_ai_model + platform_ai_has_token + + platform_login_message + platform_consent_message + platform_consent_confirm_text + + platform_banner_text + platform_banner_level + + platform_session_idle_timeout + platform_session_timeout + + platform_whitemark + platform_demo + platform_reference_attachment + + platform_feature_flags { + id, enable, warning + } + + platform_critical_alerts { + message, type + details { + groups { + id, name, description + } + } + } + + platform_trash_enabled + + platform_protected_sensitive_config { + enabled + markings { + enabled, protected_ids + } + groups { + enabled, protected_ids + } + roles { + enabled, protected_ids + } + rules { + enabled, protected_ids + } + ce_ee_toggle { + enabled, protected_ids + } + file_indexing { + enabled, protected_ids + } + platform_organization { + enabled, protected_ids + } + } + + created_at + updated_at + enterprise_edition + analytics_google_analytics_v4 + + activity_listeners { + id, name, entity_type + } + """ + self.messages_properties = """ + platform_messages { + id, message, activated, dismissible, updated_at, color + recipients { + id, name, entity_type + } + } + messages_administration { + id, message, activated, dismissible, updated_at, color + recipients { + id, name, entity_type + } + } + """ + self.password_policy_properties = """ + otp_mandatory + password_policy_min_length + password_policy_max_length + password_policy_min_symbols + password_policy_min_numbers + password_policy_min_words + password_policy_min_lowercase + password_policy_min_uppercase + """ + + self.editable_properties = ( + """ + id + + platform_organization { + id + } + + platform_title + platform_favicon + platform_email + platform_language + + platform_theme + platform_theme_dark_background + platform_theme_dark_paper + platform_theme_dark_nav + platform_theme_dark_primary + platform_theme_dark_secondary + platform_theme_dark_accent + platform_theme_dark_logo + platform_theme_dark_logo_collapsed + platform_theme_dark_logo_login + platform_theme_light_background + platform_theme_light_paper + platform_theme_light_nav + platform_theme_light_primary + platform_theme_light_secondary + platform_theme_light_accent + platform_theme_light_logo + platform_theme_light_logo_collapsed + platform_theme_light_logo_login + + platform_login_message + platform_consent_message + platform_consent_confirm_text + platform_banner_text + platform_banner_level + + platform_whitemark + analytics_google_analytics_v4 + enterprise_edition + """ + + self.password_policy_properties + ) + + def read(self, **kwargs) -> Dict: + """Reads settings from the platform + + :param customAttributes: Custom attribues to return from query + :type customAttributes: str, optional + :param include_password_policy: Defaults to False. Whether to include + password policy properties in response. + :type include_password_policy: bool, optional + :param include_messages: Defaults to False. Whether to include messages + in query response. + :type include_messages: bool, optional + :return: Representation of the platform settings + :rtype: Dict + """ + custom_attributes = kwargs.get("customAttributes", None) + include_password_policy = kwargs.get("include_password_policy", False) + include_messages = kwargs.get("include_messages", False) + + self.opencti.admin_logger.info("Reading platform settings") + query = ( + """ + query PlatformSettings { + settings { + """ + + (self.properties if custom_attributes is None else custom_attributes) + + (self.password_policy_properties if include_password_policy else "") + + (self.messages_properties if include_messages else "") + + """ + } + } + """ + ) + result = self.opencti.query(query) + return self.opencti.process_multiple_fields(result["data"]["settings"]) + + def update_field(self, **kwargs) -> Optional[Dict]: + """Update settings using input to fieldPatch + + :param id: ID of the settings object to update + :type id: str + :param input: List of EditInput objects + :type input: List[Dict] + :param customAttributes: Custom attribues to return from query + :type customAttributes: str, optional + :param include_password_policy: Defaults to False. Whether to include + password policy properties in response. + :type include_password_policy: bool, optional + :param include_messages: Defaults to False. Whether to include messages + in query response. + :type include_messages: bool, optional + :return: Representation of the platform settings + :rtype: Optional[Dict] + """ + id = kwargs.get("id", None) + input = kwargs.get("input", None) + custom_attributes = kwargs.get("customAttributes", None) + include_password_policy = kwargs.get("include_password_policy", False) + include_messages = kwargs.get("include_messages", False) + + if id is None or input is None: + self.opencti.admin_logger.error( + "[opencti_settings] Missing parameters: id and input" + ) + return None + + self.opencti.admin_logger.info( + "Updating settings with input", {"id": id, "input": input} + ) + query = ( + """ + mutation SettingsUpdateField($id: ID!, $input: [EditInput]!) { + settingsEdit(id: $id) { + fieldPatch(input: $input) { + """ + + (self.properties if custom_attributes is None else custom_attributes) + + (self.password_policy_properties if include_password_policy else "") + + (self.messages_properties if include_messages else "") + + """ + } + } + } + """ + ) + result = self.opencti.query(query, {"id": id, "input": input}) + return self.opencti.process_multiple_fields( + result["data"]["settingsEdit"]["fieldPatch"] + ) + + def edit_message(self, **kwargs) -> Optional[Dict]: + """Edit or add a message to the platform + + To add a message, don't include an ID in the input object. To edit a + message an ID must be provided. + + :param id: ID of the settings object on the platform + :type id: str + :param input: SettingsMessageInput object + :type input: Dict + :return: Settings ID and message objects + :rtype: Optional[Dict] + """ + id = kwargs.get("id", None) + input = kwargs.get("input", None) + if id is None or input is None: + self.opencti.admin_logger.error( + "[opencti_settings] Missing parameters: id and input" + ) + return None + self.opencti.admin_logger.info("Editing message", {"id": id, "input": input}) + + query = ( + """ + mutation SettingsEditMessage($id: ID!, $input: SettingsMessageInput!) { + settingsEdit(id: $id) { + editMessage(input: $input) { + id + """ + + self.messages_properties + + """ + } + } + } + """ + ) + result = self.opencti.query(query, {"id": id, "input": input}) + return self.opencti.process_multiple_fields( + result["data"]["settingsEdit"]["editMessage"] + ) + + def delete_message(self, **kwargs) -> Optional[Dict]: + """Delete a message from the platform + + :param id: ID of the settings object on the platform + :type id: str + :param input: ID of the message to delete + :type input: str + :return: Settings ID and message objects + :rtype: Optional[Dict] + """ + id = kwargs.get("id", None) + input = kwargs.get("input", None) + if id is None: + self.opencti.admin_logger.info("[opencti_settings] Missing parameters: id") + return None + + query = ( + """ + mutation SettingsEditDeleteMessage($id: ID!, $input: String!) { + settingsEdit(id: $id) { + deleteMessage(input: $input) { + id + """ + + self.messages_properties + + """ + } + } + } + """ + ) + result = self.opencti.query(query, {"id": id, "input": input}) + return self.opencti.process_multiple_fields( + result["data"]["settingsEdit"]["deleteMessage"] + ) diff --git a/pycti/entities/opencti_user.py b/pycti/entities/opencti_user.py new file mode 100644 index 000000000..ecec35c50 --- /dev/null +++ b/pycti/entities/opencti_user.py @@ -0,0 +1,803 @@ +import secrets +from typing import Dict, List, Optional + + +class User: + """Representation of a user on the OpenCTI platform + + Users can be member of multiple groups, from which its permissions + (capabilities) are derived. Additionally, users are part of organisations, + and sometimes administrating them (Enterprise edition). + + They have configured confidence, and an effective confidence (which might + be set by the group). + + You can view the properties, token_properties, session_properties, and + me_properties attributes of a User object to view what attributes will be + present in a User or MeUser object. + """ + + def __init__(self, opencti): + self.opencti = opencti + self.properties = """ + id + standard_id + individual_id + user_email + firstname + lastname + name + description + + language + theme + unit_system + + external + restrict_delete + + account_status + account_lock_after_date + + entity_type + parent_types + created_at + updated_at + + unit_system + submenu_show_icons + submenu_auto_collapse + monochrome_labels + + roles { + id, name, description + capabilities { + id, name + } + } + + groups { + edges { + node { + id, name, description + } + } + } + + objectOrganization { + edges { + node { + id, is_inferred, name, description + } + } + } + + administrated_organizations { + id, name, description + } + + user_confidence_level { + max_confidence + overrides { + entity_type, max_confidence + } + } + effective_confidence_level { + max_confidence + source { + type + object { + ... on Group { + id, name + } + } + } + overrides { + entity_type, max_confidence + source { + type + object { + ... on Group { + id, name + } + } + } + } + } + """ + + self.token_properties = """ + api_token + """ + + self.session_properties = """ + sessions { + id, created, ttl, originalMaxAge + } + """ + + self.me_properties = """ + id + individual_id + user_email + firstname + lastname + name + description + + theme + language + unit_system + submenu_show_icons + submenu_auto_collapse + + entity_type + parent_types + created_at + updated_at + + objectOrganization { + edges { + node { + id, name + } + } + } + + administrated_organizations { + id, name + } + + capabilities { + id, name, description + } + + groups { + edges { + node { + id, name, description + } + } + } + + effective_confidence_level { + max_confidence + source { + type + object { + ... on Group { + id, name + } + } + } + overrides { + entity_type, max_confidence + source { + type + object { + ... on Group { + id, name + } + } + } + } + } + """ + + def list(self, **kwargs) -> List[Dict]: + """Search/list users on the platform + + Searches users given some conditions. Defaults to listing all users. + + :param first: Defaults to 500. Retrieve this number of results. + :type first: int, optional + :param after: Retrieves all results after the user with this ID. + Ignored if None, empty, or if fetching all results, defaults to + None. + :type after: str, optional + :param orderBy: Orders results by this field. + Can be one of user, user_email, firstname, lastname, language, + external, created_at, updated_at, or _score, defaults to "name". + :type orderBy: str, optional + :param orderMode: Ordering direction. Must be one + of "asc" or "desc", defaults to "asc". + :type orderMode: str, optional + :param filters: OpenCTI API FilterGroup object. + This is an advanced parameter. To learn more please search for + the FilterGroup object in the OpenCTI GraphQL Playground, defaults + to {}. + :type filters: dict, optional + :param search: String to search for when listing + users, defaults to None. + :type search: str, optional + :param include_sessions: Whether or not to + include a list of sessions with results, defaults to False. + :type include_sessions: bool, optional + :param customAttributes: Custom attributes to fetch from the GraphQL + query + :type customAttributes: str, optional + :param getAll: Defaults to False. Whether or not to get all results + from the search. If True then param first is ignored. + :type getAll: bool, optional + :param withPagination: Defaults to False. Whether to return pagination + info with results. + :type withPagination: bool, optional + :return: Returns a list of users, sorted as specified. + :rtype: list[dict] + """ + first = kwargs.get("first", 500) + after = kwargs.get("after", None) + orderBy = kwargs.get("orderBy", "name") + orderMode = kwargs.get("orderMode", "asc") + filters = kwargs.get("filters", None) + search = kwargs.get("search", None) + include_sessions = kwargs.get("include_sessions", False) + custom_attributes = kwargs.get("customAttributes", None) + getAll = kwargs.get("getAll", False) + with_pagination = kwargs.get("withPagination", False) + + if getAll: + first = 100 + + self.opencti.admin_logger.info( + "Fetching users with filters", {"filters": filters} + ) + query = ( + """ + query UserList($first: Int, $after: ID, $orderBy: UsersOrdering, $orderMode: OrderingMode, $filters: FilterGroup, $search: String) { + users(first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode, filters: $filters, search: $search) { + edges { + node { + """ + + (self.properties if custom_attributes is None else custom_attributes) + + (self.session_properties if include_sessions else "") + + """ + } + } + + pageInfo { + startCursor, endCursor, hasNextPage, hasPreviousPage + globalCount + } + } + } + """ + ) + result = self.opencti.query( + query, + { + "first": first, + "after": after, + "orderBy": orderBy, + "orderMode": orderMode, + "filters": filters, + "search": search, + }, + ) + + if getAll: + final_data = [] + data = self.opencti.process_multiple(result["data"]["users"]) + final_data = final_data + data + while result["data"]["users"]["pageInfo"]["hasNextPage"]: + after = result["data"]["users"]["pageInfo"]["endCursor"] + result = self.opencti.query( + query, + { + "first": first, + "after": after, + "orderBy": orderBy, + "orderMode": orderMode, + "filters": filters, + "search": search, + }, + ) + data = self.opencti.process_multiple(result["data"]["users"]) + final_data = final_data + data + return final_data + else: + return self.opencti.process_multiple( + result["data"]["users"], with_pagination + ) + + def read(self, **kwargs) -> Optional[Dict]: + """Reads user details from the platform. + + :param id: ID of the user to fetch + :type id: str, optional + :param include_sessions: Whether or not to + include a list of sessions for the given user, defaults to False. + :type include_sessions: bool, optional + :param include_token: Whether or not to include + the user's API token, defaults to False. + :type include_token: bool, optional + :param customAttributes: Custom attributes to include instead of the + defaults + :type customAttribues: str, optional + :param filters: Filters to apply to find a single user + :type filters: dict, optional + :param search: Search term to use to find a single user + :type search: str, optional + :return: Representation of the user as a Python dictionary. + :rtype: Optional[Dict] + """ + id = kwargs.get("id", None) + include_sessions = kwargs.get("include_sessions", False) + include_token = kwargs.get("include_token", False) + custom_attributes = kwargs.get("customAttributes", None) + filters = kwargs.get("filters", None) + search = kwargs.get("search", None) + if id is not None: + self.opencti.admin_logger.info("Fetching user with ID", {"id": id}) + query = ( + """ + query UserRead($id: String!) { + user(id: $id) { + """ + + (self.properties if custom_attributes is None else custom_attributes) + + (self.token_properties if include_token else "") + + (self.session_properties if include_sessions else "") + + """ + } + } + """ + ) + result = self.opencti.query(query, {"id": id}) + return self.opencti.process_multiple_fields(result["data"]["user"]) + elif filters is not None or search is not None: + results = self.list( + filters=filters, + search=search, + include_sessions=include_sessions, + customAttributes=custom_attributes, + ) + user = results[0] if results else None + if not include_token or user is None: + return user + else: + return self.read( + id=user["id"], + include_sessions=include_sessions, + include_token=include_token, + customAttributes=custom_attributes, + ) + else: + self.opencti.admin_logger.error( + "[opencti_user] Missing paramters: id, search, or filters" + ) + return None + + def create(self, **kwargs) -> Optional[Dict]: + """Creates a new user with basic details + + Note that when SSO is connected users generally do not need to be + manually created. + + Additionally note that if there is no password passed to this function + then a random password will be created and will not be returned. This + is useful for creating service accounts and connector accounts. + + :param name: Name to assign to the user. + :type name: str + :param user_email: Email address for the user. + :type user_email: str + :param password: Password that should be assigned + to the user. If one is not provided then a random one will be + generated, defaults to None. + :type password: str, optional + :param firstname: First name of the user + :type firstname: str, optional + :param lastname: Last name of the user + :type lastname: str, optional + :param description: Description for the user + :type description: str, optional + :param language: Language the user should use + :type language: str, optional + :param theme: Theme to set for the user, either light or dark + :type theme: str, optional + :param objectOrganization: List of organization IDs to add the user to + :type objectOgranization: List[str], optional + :param account_status: The status of the account: Active, Expired, + Inactive, or Locked + :type account_status: str, optional + :param account_lock_after_date: ISO 8901 of when account should be + locked + :type account_lock_after_date: str, optional + :param unit_system: Unit system for the user, metric or imperial + :type unit_system: str, optional + :param submenu_show_icons: Defaults to False. Whether or not to show + icons in submenus on the left hand menu bar in the UI + :type submenu_show_icons: bool, optional + :param submenu_auto_collaps: Defaults to False. Whether to auto- + collapse the left hand menu bar in the UI + :type submenu_auto_collapse: bool, optional + :param monochrome_labels: Defaults to False. Whether to ignore colours + and just show entity labels in monochrome. + :type monochrome_labels: bool, optional + :param groups: List of group IDs to add the user to + :type groups: List[str], optional + :param user_confidence_level: Confidence level object to assign to the + user. This may not impact effective confidence depending on group + membership. + :type user_confidence_level: Dict + :param customAttributes: Custom attributes to return for the user + :type customAttributes: str, optional + :param include_token: Defaults to False. Whether to include the API + token for the new user in the response. + :type include_token: bool, optional + :return: Representation of the user without sessions or API token. + :rtype: Optional[Dict] + """ + name = kwargs.get("name", None) + user_email = kwargs.get("user_email", None) + password = kwargs.get("password", None) + firstname = kwargs.get("firstname", None) + lastname = kwargs.get("lastname", None) + description = kwargs.get("description", None) + language = kwargs.get("language", None) + theme = kwargs.get("theme", None) + object_organization = kwargs.get("objectOrganization", None) + account_status = kwargs.get("account_status", None) + account_lock_after_date = kwargs.get("account_lock_after_date", None) + unit_system = kwargs.get("unit_system", None) + submenu_show_icons = kwargs.get("submenu_show_icons", False) + submenu_auto_collapse = kwargs.get("submenu_auto_collapse", False) + monochrome_labels = kwargs.get("monochrome_labels", False) + groups = kwargs.get("groups", None) + user_confidence_level = kwargs.get("user_confidence_level", None) + custom_attributes = kwargs.get("customAttributes", None) + include_token = kwargs.get("include_token", False) + + if name is None or user_email is None: + self.opencti.admin_logger.error( + "[opencti_user] Missing parameters: name and user_email" + ) + return None + + self.opencti.admin_logger.info( + "Creating a new user", {"name": name, "email": user_email} + ) + if password is None: + self.opencti.admin_logger.info( + "Generating random password for user", + {"name": name, "user_email": user_email}, + ) + password = secrets.token_urlsafe(64) + query = ( + """ + mutation UserAdd($input: UserAddInput!) { + userAdd(input: $input) { + """ + + (self.properties if custom_attributes is None else custom_attributes) + + (self.token_properties if include_token else "") + + """ + } + } + """ + ) + result = self.opencti.query( + query, + { + "input": { + "user_email": user_email, + "name": name, + "password": password, + "firstname": firstname, + "lastname": lastname, + "description": description, + "language": language, + "theme": theme, + "objectOrganization": object_organization, + "account_status": account_status, + "account_lock_after_date": account_lock_after_date, + "unit_system": unit_system, + "submenu_show_icons": submenu_show_icons, + "submenu_auto_collapse": submenu_auto_collapse, + "monochrome_labels": monochrome_labels, + "groups": groups, + "user_confidence_level": user_confidence_level, + } + }, + ) + return self.opencti.process_multiple_fields(result["data"]["userAdd"]) + + def delete(self, **kwargs): + """Deletes the given user from the platform. + + :param id: ID of the user to delete. + :type id: str + """ + id = kwargs.get("id", None) + if id is None: + self.opencti.admin_logger.error("[opencti_user] Missing parameter: id") + return None + + self.opencti.admin_logger.info("Deleting user", {"id": id}) + query = """ + mutation DeleteUser($id: ID!) { + userEdit(id: $id) { + delete + } + } + """ + self.opencti.query(query, {"id": id}) + + def me(self, **kwargs) -> Dict: + """Reads the currently authenticated user. + + :param include_token: Whether to inclued the API + token of the currently authenticated user, defaults to False. + :type include_token: bool, optional + :param customAttributes: Custom attributes to return on the User + :type customAttributes: str, optional + :return: Representation of the user. + :rtype: dict + """ + include_token = kwargs.get("include_token", False) + custom_attributes = kwargs.get("customAttributes", None) + + self.opencti.admin_logger.info("Reading MeUser") + query = ( + """ + query Me { + me { + """ + + (self.me_properties if custom_attributes is None else custom_attributes) + + (self.token_properties if include_token else "") + + """ + } + } + """ + ) + result = self.opencti.query(query) + return self.opencti.process_multiple_fields(result["data"]["me"]) + + def update_field(self, **kwargs) -> Optional[Dict]: + """Update a given user using fieldPatch + + :param id: ID of the user to update. + :type id: str + :param input: FieldPatchInput objects to edit user + :type input: List[Dict] + :param customAttributes: Custom attributes to return from the mutation + :type customAttributes: str, optional + :return: Representation of the user without sessions or API token. + :rtype: Optional[Dict] + """ + id = kwargs.get("id", None) + input = kwargs.get("input", None) + custom_attributes = kwargs.get("customAttributes", None) + if id is None or input is None: + self.opencti.admin_logger.error( + "[opencti_user] Missing parameters: id and input" + ) + return None + + self.opencti.admin_logger.info( + "Editing user with input (not shown to hide password and API token" + " changes)", + {"id": id}, + ) + query = ( + """ + mutation UserEdit($id: ID!, $input: [EditInput]!) { + userEdit(id: $id) { + fieldPatch(input: $input) { + """ + + (self.properties if custom_attributes is None else custom_attributes) + + """ + } + } + } + """ + ) + result = self.opencti.query(query, {"id": id, "input": input}) + return self.opencti.process_multiple_fields( + result["data"]["userEdit"]["fieldPatch"] + ) + + def add_membership(self, **kwargs) -> Optional[Dict]: + """Adds the user to a given group. + + :param id: User ID to add to the group. + :type id: str + :param group_id: Group ID to add the user to. + :type group_id: str + :return: Representation of the InternalRelationship + :rtype: Optional[Dict] + """ + id = kwargs.get("id", None) + group_id = kwargs.get("group_id", None) + if id is None or group_id is None: + self.opencti.admin_logger.error( + "[opencti_user] Missing parameters: id and group_id" + ) + return None + + self.opencti.admin_logger.info( + "Adding user to group", {"id": id, "group_id": group_id} + ) + query = """ + mutation UserEditAddMembership($id: ID!, $group_id: ID!) { + userEdit(id: $id) { + relationAdd(input: { + relationship_type: "member-of", + toId: $group_id + }) { + id + from { + ... on User { + id, name, user_email + } + } + to { + ... on Group { + id, name, description + } + } + } + } + } + """ + result = self.opencti.query(query, {"id": id, "group_id": group_id}) + return self.opencti.process_multiple_fields( + result["data"]["userEdit"]["relationAdd"] + ) + + def delete_membership(self, **kwargs) -> Optional[Dict]: + """Removes the user from the given group. + + :param id: User ID to remove from the group. + :type id: str + :param group_id: Group ID to remove the user from. + :type group_id: str + :return: Representation of the user without sessions or API token + :rtype: Optional[Dict] + """ + id = kwargs.get("id", None) + group_id = kwargs.get("group_id", None) + if id is None or group_id is None: + self.opencti.admin_logger.error( + "[opencti_user] Missing parameters: id and group_id" + ) + return None + + self.opencti.admin_logger.info( + "Removing used from group", {"id": id, "group_id": group_id} + ) + query = ( + """ + mutation UserEditDeleteMembership($id: ID!, $group_id: StixRef!) { + userEdit(id: $id) { + relationDelete(toId: $group_id, relationship_type: "member-of") { + """ + + self.properties + + """ + } + } + } + """ + ) + result = self.opencti.query(query, {"id": id, "group_id": group_id}) + return self.opencti.process_multiple_fields( + result["data"]["userEdit"]["relationDelete"] + ) + + def add_organization(self, **kwargs) -> Optional[Dict]: + """Adds a user to an organization + + :param id: User ID to add to organization + :type id: str + :param organization_id: ID of organization to add to + :type organization_id: str + :return: Representation of user without sessions or API key + :rtype: Optional[Dict] + """ + id = kwargs.get("id", None) + organization_id = kwargs.get("organization_id", None) + if id is None or organization_id is None: + self.opencti.admin_logger.error( + "[opencti_user] Missing parameters: id and organization_id" + ) + + self.opencti.admin_logger.info( + "Adding user to organization", + {"id": id, "organization_id": organization_id}, + ) + query = ( + """ + mutation UserEditAddOrganization($id: ID!, $organization_id: ID!) { + userEdit(id: $id) { + organizationAdd(organizationId: $organization_id) { + """ + + self.properties + + """ + } + } + } + """ + ) + result = self.opencti.query( + query, {"id": id, "organization_id": organization_id} + ) + return self.opencti.process_multiple_fields( + result["data"]["userEdit"]["organizationAdd"] + ) + + def delete_organization(self, **kwargs) -> Optional[Dict]: + """Delete a user from an organization + + :param id: User ID to remove from organization + :type id: str + :param organization_id: ID of organization to remove from + :type organization_id: str + :return: Representation of user without sessions or API key + :rtype: Optional[Dict] + """ + id = kwargs.get("id", None) + organization_id = kwargs.get("organization_id", None) + if id is None or organization_id is None: + self.opencti.admin_logger.error( + "[opencti_user] Missing parameters: id and organization_id" + ) + + self.opencti.admin_logger.info( + "Removing user from organization", + {"id": id, "organization_id": organization_id}, + ) + query = ( + """ + mutation UserEditDeleteOrganization($id: ID!, $organization_id: ID!) { + userEdit(id: $id) { + organizationDelete(organizationId: $organization_id) { + """ + + self.properties + + """ + } + } + } + """ + ) + result = self.opencti.query( + query, {"id": id, "organization_id": organization_id} + ) + return self.opencti.process_multiple_fields( + result["data"]["userEdit"]["organizationDelete"] + ) + + def token_renew(self, **kwargs) -> Optional[Dict]: + """Rotates the API token for the given user + + :param user: User ID to rotate API token for. + :type user: str + :param include_token: Whether to include new API + token in response from server, defaults to False. + :type include_token: bool, optional + :return: Representation of user + :rtype: Optional[Dict] + """ + id = kwargs.get("id", None) + include_token = kwargs.get("include_token", False) + if id is None: + self.opencti.admin_logger.error("[opencti_user] Missing parameter: id") + return None + + self.opencti.admin_logger.info("Rotating API key for user", {"id": id}) + query = ( + """ + mutation UserEditRotateToken($id: ID!) { + userEdit(id: $id) { + tokenRenew { + """ + + self.properties + + (self.token_properties if include_token else "") + + """ + } + } + } + """ + ) + result = self.opencti.query(query, {"id": id}) + return self.opencti.process_multiple_fields( + result["data"]["userEdit"]["tokenRenew"] + ) diff --git a/tests/02-integration/entities/test_entity_crud.py b/tests/02-integration/entities/test_entity_crud.py index 716942b38..245488355 100644 --- a/tests/02-integration/entities/test_entity_crud.py +++ b/tests/02-integration/entities/test_entity_crud.py @@ -15,67 +15,83 @@ def entity_class(entity): def test_entity_create(entity_class): class_data = entity_class.data() test_indicator = entity_class.own_class().create(**class_data) - assert test_indicator is not None, "Response is NoneType" - assert "id" in test_indicator, "No ID on object" - - entity_class.base_class().delete(id=test_indicator["id"]) + try: + assert test_indicator is not None, "Response is NoneType" + assert "id" in test_indicator, "No ID on object" + finally: + if test_indicator and "id" in test_indicator: + entity_class.base_class().delete(id=test_indicator["id"]) def test_read(entity_class): class_data = entity_class.data() test_indicator = entity_class.own_class().create(**class_data) - assert test_indicator is not None, "Response is NoneType" - assert "id" in test_indicator, "No ID on object" - assert "standard_id" in test_indicator, "No standard_id (STIX ID) on object" - test_indicator = entity_class.own_class().read(id=test_indicator["id"]) - compare_values( - class_data, - test_indicator, - entity_class.get_compare_exception_keys(), - ) - - entity_class.base_class().delete(id=test_indicator["id"]) + try: + assert test_indicator is not None, "Response is NoneType" + assert "id" in test_indicator, "No ID on object" + assert "standard_id" in test_indicator, "No standard_id (STIX ID) on object" + test_indicator = entity_class.own_class().read(id=test_indicator["id"]) + compare_values( + class_data, + test_indicator, + entity_class.get_compare_exception_keys(), + ) + + finally: + if test_indicator and "id" in test_indicator: + entity_class.base_class().delete(id=test_indicator["id"]) def test_update(entity_class): class_data = entity_class.data() test_indicator = entity_class.own_class().create(**class_data) - assert test_indicator is not None, "Response is NoneType" - assert "id" in test_indicator, "No ID on object" - - if len(entity_class.update_data()) > 0: - function_present = getattr(entity_class.own_class(), "update_field", None) - if function_present: - for update_field, update_value in entity_class.update_data().items(): - class_data[update_field] = update_value - input = [{"key": update_field, "value": str(update_value)}] - result = entity_class.own_class().update_field( - id=test_indicator["id"], input=input - ) + try: + assert test_indicator is not None, "Response is NoneType" + assert "id" in test_indicator, "No ID on object" + + if len(entity_class.update_data()) > 0: + function_present = getattr(entity_class.own_class(), "update_field", None) + if function_present: + for update_field, update_value in entity_class.update_data().items(): + class_data[update_field] = update_value + input = [{"key": update_field, "value": update_value}] + result = entity_class.own_class().update_field( + id=test_indicator["id"], input=input + ) + else: + for update_field, update_value in entity_class.update_data().items(): + class_data[update_field] = update_value + class_data["update"] = True + result = entity_class.own_class().create(**class_data) + + result = entity_class.own_class().read(id=result["id"]) + assert ( + result["id"] == test_indicator["id"] + ), "Updated SDO does not match old ID" + compare_values( + class_data, result, entity_class.get_compare_exception_keys() + ) else: - for update_field, update_value in entity_class.update_data().items(): - class_data[update_field] = update_value - class_data["update"] = True - result = entity_class.own_class().create(**class_data) - - result = entity_class.own_class().read(id=result["id"]) - assert result["id"] == test_indicator["id"], "Updated SDO does not match old ID" - compare_values(class_data, result, entity_class.get_compare_exception_keys()) - else: - result = test_indicator + result = test_indicator - entity_class.base_class().delete(id=result["id"]) + finally: + if test_indicator and "id" in test_indicator: + entity_class.base_class().delete(id=result["id"]) def test_delete(entity_class): class_data = entity_class.data() test_indicator = entity_class.own_class().create(**class_data) - assert test_indicator is not None, "Response is NoneType" - assert "id" in test_indicator, "No ID on object" - result = entity_class.base_class().delete(id=test_indicator["id"]) - assert result is None, f"Delete returned value '{result}'" - result = entity_class.own_class().read(id=test_indicator["id"]) - assert result is None, f"Read returned value '{result}' after delete" + try: + assert test_indicator is not None, "Response is NoneType" + assert "id" in test_indicator, "No ID on object" + result = entity_class.base_class().delete(id=test_indicator["id"]) + assert result is None, f"Delete returned value '{result}'" + result = entity_class.own_class().read(id=test_indicator["id"]) + assert result is None, f"Read returned value '{result}' after delete" + except AssertionError: + if test_indicator and "id" in test_indicator: + entity_class.base_class().delete(id=test_indicator["id"]) def test_filter(entity_class): @@ -84,16 +100,40 @@ def test_filter(entity_class): class_data = entity_class.data() test_indicator = entity_class.own_class().create(**class_data) - assert test_indicator is not None, "Response is NoneType" - assert "id" in test_indicator, "No ID on object" - test_indicator = entity_class.own_class().read(filters=entity_class.get_filter()) - compare_values( - class_data, - test_indicator, - entity_class.get_compare_exception_keys(), - ) + try: + assert test_indicator is not None, "Response is NoneType" + assert "id" in test_indicator, "No ID on object" + test_indicator = entity_class.own_class().read( + filters=entity_class.get_filter() + ) + compare_values( + class_data, + test_indicator, + entity_class.get_compare_exception_keys(), + ) + finally: + if test_indicator and "id" in test_indicator: + entity_class.base_class().delete(id=test_indicator["id"]) + + +def test_search(entity_class): + if not entity_class.get_search(): + return - entity_class.base_class().delete(id=test_indicator["id"]) + class_data = entity_class.data() + test_indicator = entity_class.own_class().create(**class_data) + try: + assert test_indicator is not None, "Response is NoneType" + assert "id" in test_indicator, "No ID on object" + test_indicator = entity_class.own_class().read(search=entity_class.get_search()) + compare_values( + class_data, + test_indicator, + entity_class.get_compare_exception_keys(), + ) + finally: + if test_indicator and "id" in test_indicator: + entity_class.base_class().delete(id=test_indicator["id"]) def test_relation(entity_class): @@ -103,19 +143,23 @@ def test_relation(entity_class): class_data2 = entity_class.relation_test() test_indicator = entity_class.own_class().create(**class_data) test_indicator2 = entity_class.own_class().create(**class_data2) - assert test_indicator is not None, "Response is NoneType" - assert "id" in test_indicator, "No ID on object" - entity_class.own_class().add_stix_object_or_stix_relationship( - id=test_indicator["id"], - stixObjectOrStixRelationshipId=test_indicator2["id"], - ) - result = entity_class.own_class().read(id=test_indicator["id"]) - assert result["objectsIds"][0] == test_indicator2["id"] - entity_class.own_class().remove_stix_object_or_stix_relationship( - id=test_indicator["id"], - stixObjectOrStixRelationshipId=test_indicator2["id"], - ) - result = entity_class.own_class().read(id=test_indicator["id"]) - assert len(result["objectsIds"]) == 0 - entity_class.base_class().delete(id=test_indicator["id"]) - entity_class.base_class().delete(id=test_indicator2["id"]) + try: + assert test_indicator is not None, "Response is NoneType" + assert "id" in test_indicator, "No ID on object" + entity_class.own_class().add_stix_object_or_stix_relationship( + id=test_indicator["id"], + stixObjectOrStixRelationshipId=test_indicator2["id"], + ) + result = entity_class.own_class().read(id=test_indicator["id"]) + assert result["objectsIds"][0] == test_indicator2["id"] + entity_class.own_class().remove_stix_object_or_stix_relationship( + id=test_indicator["id"], + stixObjectOrStixRelationshipId=test_indicator2["id"], + ) + result = entity_class.own_class().read(id=test_indicator["id"]) + assert len(result["objectsIds"]) == 0 + finally: + if test_indicator and "id" in test_indicator: + entity_class.base_class().delete(id=test_indicator["id"]) + if test_indicator2 and "id" in test_indicator2: + entity_class.base_class().delete(id=test_indicator2["id"]) diff --git a/tests/02-integration/entities/test_group.py b/tests/02-integration/entities/test_group.py new file mode 100644 index 000000000..8018d3853 --- /dev/null +++ b/tests/02-integration/entities/test_group.py @@ -0,0 +1,94 @@ +from tests.cases.entities import GroupTest, MarkingDefinitionTest, RoleTest, UserTest + + +def test_group_roles(api_client): + group_data = GroupTest(api_client).data() + role_data = RoleTest(api_client).data() + test_group = api_client.group.create(**group_data) + test_role = api_client.role.create(**role_data) + assert test_group is not None, "Create group response is NoneType" + assert test_role is not None, "Create role response is NoneType" + + api_client.group.add_role(id=test_group["id"], role_id=test_role["id"]) + result = api_client.group.read(id=test_group["id"]) + assert result["rolesIds"][0] == test_role["id"] + + api_client.group.delete_role(id=test_group["id"], role_id=test_role["id"]) + result = api_client.group.read(id=test_group["id"]) + assert len(result["rolesIds"]) == 0 + + api_client.group.delete(id=test_group["id"]) + api_client.role.delete(id=test_role["id"]) + + +def test_group_default_markings(api_client): + group_data = GroupTest(api_client).data() + marking_test = MarkingDefinitionTest(api_client) + marking_data = marking_test.data() + test_group = api_client.group.create(**group_data) + test_marking = marking_test.own_class().create(**marking_data) + assert test_group is not None, "Create group response is NoneType" + assert test_marking is not None, "Create marking response is NoneType" + + api_client.group.edit_default_marking( + id=test_group["id"], marking_ids=[test_marking["id"]] + ) + result = api_client.group.read(id=test_group["id"]) + assert result["default_marking"][0]["values"][0]["id"] == test_marking["id"] + + api_client.group.edit_default_marking(id=test_group["id"], marking_ids=[]) + result = api_client.group.read(id=test_group["id"]) + assert len(result["default_marking"][0]["values"]) == 0 + + api_client.group.delete(id=test_group["id"]) + marking_test.base_class().delete(id=test_marking["id"]) + + +def test_group_membership(api_client): + group_test = GroupTest(api_client) + user_test = UserTest(api_client) + + test_group = group_test.own_class().create(**group_test.data()) + test_user = user_test.own_class().create(**user_test.data()) + + try: + assert test_group is not None, "Create group response is NoneType" + assert test_user is not None, "Create user response is NoneType" + + group_test.own_class().add_member(id=test_group["id"], user_id=test_user["id"]) + result = group_test.own_class().read(id=test_group["id"]) + assert result["membersIds"][0] == test_user["id"] + + group_test.own_class().delete_member( + id=test_group["id"], user_id=test_user["id"] + ) + result = group_test.own_class().read(id=test_group["id"]) + assert len(result["membersIds"]) == 0 + finally: + group_test.base_class().delete(id=test_group["id"]) + user_test.base_class().delete(id=test_user["id"]) + + +def test_group_allowed_markings(api_client): + group_data = GroupTest(api_client).data() + marking_test = MarkingDefinitionTest(api_client) + marking_data = marking_test.data() + test_group = api_client.group.create(**group_data) + test_marking = marking_test.own_class().create(**marking_data) + assert test_group is not None, "Create group response is NoneType" + assert test_marking is not None, "Create marking response is NoneType" + + api_client.group.add_allowed_marking( + id=test_group["id"], marking_id=test_marking["id"] + ) + result = api_client.group.read(id=test_group["id"]) + assert result["allowed_marking"][0]["id"] == test_marking["id"] + + api_client.group.delete_allowed_marking( + id=test_group["id"], marking_id=test_marking["id"] + ) + result = api_client.group.read(id=test_group["id"]) + assert len(result["allowed_marking"]) == 0 + + api_client.group.delete(id=test_group["id"]) + marking_test.base_class().delete(id=test_marking["id"]) diff --git a/tests/02-integration/entities/test_role.py b/tests/02-integration/entities/test_role.py new file mode 100644 index 000000000..e8f873aaf --- /dev/null +++ b/tests/02-integration/entities/test_role.py @@ -0,0 +1,24 @@ +from tests.cases.entities import RoleTest + + +def test_role_capabilities(api_client): + role_test = RoleTest(api_client) + role_data = role_test.data() + test_role = role_test.own_class().create(**role_data) + assert test_role is not None, "Create role response is NoneType" + + capability_id = api_client.capability.list()[0]["id"] + + role_test.own_class().add_capability( + id=test_role["id"], capability_id=capability_id + ) + result = role_test.own_class().read(id=test_role["id"]) + assert capability_id in result["capabilitiesIds"] + + role_test.own_class().delete_capability( + id=test_role["id"], capability_id=capability_id + ) + result = role_test.own_class().read(id=test_role["id"]) + assert capability_id not in result["capabilitiesIds"] + + role_test.base_class().delete(id=test_role["id"]) diff --git a/tests/02-integration/entities/test_settings.py b/tests/02-integration/entities/test_settings.py new file mode 100644 index 000000000..96f4a5341 --- /dev/null +++ b/tests/02-integration/entities/test_settings.py @@ -0,0 +1,47 @@ +from tests.cases.entities import SettingsTest + + +def test_settings_messages(api_client): + settings_test = SettingsTest(api_client) + settings_test.setup() + try: + result = settings_test.own_class().read(include_messages=True) + id = result["id"] + num_messages = len(result["platform_messages"]) + + # Add message + test_message_data = { + "message": "This is a test message", + "activated": True, + "dismissible": True, + } + result = settings_test.own_class().edit_message(id=id, input=test_message_data) + assert len(result["platform_messages"]) == num_messages + 1 + + test_message = result["platform_messages"][-1] + assert test_message["message"] == test_message_data["message"] + + # Update message + result = settings_test.own_class().edit_message( + id=id, + input={ + "id": test_message["id"], + "message": "This is an updated test message", + "activated": True, + "dismissible": False, + }, + ) + assert len(result["platform_messages"]) == num_messages + 1 + + updated_message = result["platform_messages"][-1] + assert updated_message["id"] == test_message["id"] + assert updated_message["message"] == "This is an updated test message" + + # Delete message + result = settings_test.own_class().delete_message( + id=id, input=test_message["id"] + ) + assert len(result["platform_messages"]) == num_messages + assert test_message["id"] not in result["platform_messages_ids"] + finally: + settings_test.teardown() diff --git a/tests/02-integration/entities/test_user.py b/tests/02-integration/entities/test_user.py new file mode 100644 index 000000000..6148f18e7 --- /dev/null +++ b/tests/02-integration/entities/test_user.py @@ -0,0 +1,70 @@ +from tests.cases.entities import GroupTest, IdentityOrganizationTest, UserTest + + +def test_user_membership(api_client): + user_test = UserTest(api_client) + group_test = GroupTest(api_client) + + test_user = user_test.own_class().create(**user_test.data()) + test_group = group_test.own_class().create(**group_test.data()) + + try: + assert test_user is not None, "User create response returned NoneType" + assert test_group is not None, "Group create response returned NoneType" + + user_test.own_class().add_membership( + id=test_user["id"], group_id=test_group["id"] + ) + result = user_test.own_class().read(id=test_user["id"]) + assert test_group["id"] in result["groupsIds"] + + user_test.own_class().delete_membership( + id=test_user["id"], group_id=test_group["id"] + ) + result = user_test.own_class().read(id=test_user["id"]) + assert test_group["id"] not in result["groupsIds"] + finally: + user_test.base_class().delete(id=test_user["id"]) + group_test.base_class().delete(id=test_group["id"]) + + +def test_user_organization(api_client): + user_test = UserTest(api_client) + org_test = IdentityOrganizationTest(api_client) + + test_user = user_test.own_class().create(**user_test.data()) + test_org = org_test.own_class().create(**org_test.data()) + + try: + assert test_user is not None, "User create response returned NoneType" + assert test_org is not None, "Organization create response returned NoneType" + + user_test.own_class().add_organization( + id=test_user["id"], organization_id=test_org["id"] + ) + result = user_test.own_class().read(id=test_user["id"]) + assert test_org["id"] in result["objectOrganizationIds"] + + user_test.own_class().delete_organization( + id=test_user["id"], organization_id=test_org["id"] + ) + result = user_test.own_class().read(id=test_user["id"]) + assert test_org["id"] not in result["objectOrganizationIds"] + finally: + user_test.base_class().delete(id=test_user["id"]) + org_test.base_class().delete(id=test_org["id"]) + + +def test_user_token_renew(api_client): + user_test = UserTest(api_client) + test_user = user_test.own_class().create(**user_test.data(), include_token=True) + try: + assert test_user is not None, "User create response returned NoneType" + + old_token = test_user["api_token"] + result = user_test.own_class().token_renew( + id=test_user["id"], include_token=True + ) + assert old_token != result["api_token"] + finally: + user_test.own_class().delete(id=test_user["id"]) diff --git a/tests/cases/entities.py b/tests/cases/entities.py index a45e9c7b6..7d6451c9f 100644 --- a/tests/cases/entities.py +++ b/tests/cases/entities.py @@ -2,6 +2,7 @@ from stix2 import TLP_GREEN, TLP_WHITE, AttackPattern +from pycti.entities.opencti_settings import Settings from pycti.utils.constants import ContainerTypes, IdentityTypes, LocationTypes from tests.utils import get_incident_end_date, get_incident_start_date @@ -171,6 +172,22 @@ def case_case_rft(api_client): def task(api_client): return TaskTest(api_client) + @staticmethod + def case_role(api_client): + return RoleTest(api_client) + + @staticmethod + def case_group(api_client): + return GroupTest(api_client) + + @staticmethod + def case_user(api_client): + return UserTest(api_client) + + @staticmethod + def case_settings(api_client): + return SettingsTest(api_client) + class EntityTest: def __init__(self, api_client): @@ -212,6 +229,9 @@ def get_filter(self) -> Dict[str, str]: "filterGroups": [], } + def get_search(self) -> str: + return None + def stix_class(self): pass @@ -1151,3 +1171,154 @@ def data(self) -> Dict: def own_class(self): return self.api_client.threat_actor_individual + + +class RoleTest(EntityTest): + def data(self) -> Dict: + return { + "name": "TestRole", + "description": "This is a role for testing", + } + + def own_class(self): + return self.api_client.role + + def base_class(self): + return self.own_class() + + def update_data(self) -> Dict[str, bool]: + return {"can_manage_sensitive_config": True} + + def get_filter(self) -> Dict[str, str]: + return {} + + def get_search(self) -> str: + return "TestRole" + + +class GroupTest(EntityTest): + def data(self) -> Dict: + return { + "name": "TestGroup", + "group_confidence_level": {"max_confidence": 80, "overrides": []}, + } + + def own_class(self): + return self.api_client.group + + def base_class(self): + return self.own_class() + + def update_data(self) -> Dict: + return { + "description": "This is a test group", + "no_creators": True, + "group_confidence_level": { + "max_confidence": 90, + "overrides": [{"entity_type": "Indicator", "max_confidence": 80}], + }, + } + + def get_search(self) -> str: + return "TestGroup" + + +class UserTest(EntityTest): + def data(self) -> Dict: + return { + "name": "Test User", + "user_email": "test@localhost.local", + } + + def own_class(self): + return self.api_client.user + + def base_class(self): + return self.own_class() + + def update_data(self) -> Dict: + return { + "description": "This is a test user", + "firstname": "Test", + "lastname": "User", + "user_confidence_level": { + "max_confidence": 70, + "overrides": [{"entity_type": "Indicator", "max_confidence": 80}], + }, + "account_status": "Locked", + "submenu_show_icons": True, + } + + def get_search(self): + return '"Test User"' + + +class SettingsTest(EntityTest): + + class SettingsWrapper(Settings): + def __init__(self, opencti): + self._deleted = False + super().__init__(opencti) + + def create(self, **kwargs) -> Dict: + """Stub function for tests + + :return: Settings as defined by self.read() + :rtype: Dict + """ + self.opencti.admin_logger.info( + "Settings.create called with arguments", kwargs + ) + self._deleted = False + return self.read() + + def delete(self, **kwargs): + """Stub function for tests""" + self.opencti.admin_logger.info( + "Settings.delete called with arguments", kwargs + ) + self._deleted = True + + def read(self, **kwargs): + """Stub function for tests""" + if self._deleted: + return None + return super().read(**kwargs) + + def setup(self): + self._ownclass = self.SettingsWrapper(self.api_client) + # Save current platform information + custom_attributes = self.own_class().editable_properties + self.own_class().create() + self._saved_settings = self.own_class().read(customAttributes=custom_attributes) + if self._saved_settings["platform_organization"] is not None: + self._saved_settings["platform_organization"] = self._saved_settings[ + "platform_organization" + ]["id"] + return + + def teardown(self): + # Restore platform information + id = self._saved_settings.pop("id") + input = [ + {"key": key, "value": value} + for key, value in self._saved_settings.items() + if value is not None + ] + self.own_class().update_field(id=id, input=input) + return + + def data(self) -> Dict: + return {} + + def own_class(self): + return self._ownclass + + def base_class(self): + return self.own_class() + + def update_data(self): + return {"platform_title": "This is a test platform", "platform_theme": "light"} + + def get_filter(self): + return None