Deprecated: array_key_exists(): Using array_key_exists() on objects is deprecated. Use isset() or property_exists() instead in phar:///Users/jonathanstegall/Sites/phpDocumentor.phar/vendor/twig/twig/lib/Twig/Template.php on line 527 wpdb = $wpdb; $this->version = $version; $this->login_credentials = $login_credentials; $this->slug = $slug; $this->option_prefix = isset( $option_prefix ) ? $option_prefix : 'object_sync_for_salesforce_'; $this->wordpress = $wordpress; $this->salesforce = $salesforce; $this->mappings = $mappings; $this->logging = $logging; $this->schedulable_classes = $schedulable_classes; $this->queue = $queue; $this->schedule_name = 'salesforce_pull'; // To be clear: we should only ever set this to true if Salesforce actually starts to reliably support it instead of generally ignoring it. $this->batch_soql_queries = $this->batch_soql_queries( false ); // Maximum offset size for a SOQL query to Salesforce // See: https://developer.salesforce.com/docs/atlas.en-us.soql_sosl.meta/soql_sosl/sforce_api_calls_soql_select_offset.htm // "The maximum offset is 2,000 rows. Requesting an offset greater than 2,000 results in a NUMBER_OUTSIDE_VALID_RANGE error." $this->min_soql_batch_size = 200; // batches cannot be smaller than 200 records $this->max_soql_size = 2000; $this->mergeable_record_types = array( 'Lead', 'Contact', 'Account' ); // Create action hooks for WordPress objects. We run this after plugins are loaded in case something depends on another plugin. add_action( 'plugins_loaded', array( $this, 'add_actions' ) ); $this->debug = get_option( $this->option_prefix . 
'debug_mode', false ); }

	/**
	 * Decide whether SOQL queries should be sent with a batchSize option.
	 *
	 * The requested value is only honored when the configured REST API version
	 * is 34.0 or newer; for older versions the answer is always false.
	 *
	 * @param bool $batch_soql_queries The value the plugin would like to use.
	 * @return bool False when the API version is below 34.0, otherwise the value passed in.
	 */
	private function batch_soql_queries( $batch_soql_queries ) {
		$api_version = $this->login_credentials['rest_api_version'];

		// The batchSize option is only treated as available from API version 34.0 on.
		if ( version_compare( $api_version, '34.0', '<' ) ) {
			return false;
		}

		// Otherwise the caller's preference stands, so the plugin default decides.
		return $batch_soql_queries;
	}

	/**
	 * Register the WordPress hooks used by the pull process.
	 *
	 * Adds the admin-ajax hook for the pull webhook and the two option-prefixed
	 * hooks used by the scheduler: one that checks for records and one that
	 * processes them. The earlier note on this method mentioned the route
	 * http://example.com/wp-json/salesforce-rest-api/pull/; no REST route is
	 * registered inside this method itself.
	 */
	public function add_actions() {
		$prefix = $this->option_prefix;

		// Admin-ajax entry point for the webhook.
		add_action( 'wp_ajax_salesforce_pull_webhook', array( $this, 'salesforce_pull_webhook' ) );

		// The scheduler needs one hook to look for records and one to process them.
		$check_callback   = array( $this, 'salesforce_pull' );
		$process_callback = array( $this, 'salesforce_pull_process_records' );
		add_action( $prefix . 'pull_check_records', $check_callback, 10 );
		add_action( $prefix . 'pull_process_records', $process_callback, 10, 3 );
	}

	/**
	 * REST API callback for salesforce pull. Returns status of 201 for successful
	 * attempt or 403 for a failed pull attempt (SF not authorized, threshold
	 * reached, etc.
*
	 * @param object|string|null $request
	 *   The request passed by the caller. It is not read by this method. The
	 *   parameter is deliberately untyped and optional: this method is also
	 *   registered on a wp_ajax_ hook, where WordPress passes an empty string
	 *   instead of a request object.
	 * @return array
	 *   code: '201' when the pull ran, '403' when it did not
	 *   data:
	 *     success : the boolean returned by salesforce_pull()
	 *
	 */
	public function salesforce_pull_webhook( $request = null ) {
		// run a pull request and then run the schedule if anything is in there
		$data = $this->salesforce_pull();

		// salesforce_pull currently returns true if it runs successfully
		if ( true === $data ) {
			$code = '201';
			// check to see if anything is in the queue and handle it if it is
			// single task for action-scheduler to check for data
			$this->queue->add(
				$this->schedulable_classes[ $this->schedule_name ]['initializer'],
				array(),
				$this->schedule_name
			);
		} else {
			$code = '403';
		}

		$result = array(
			'code' => $code,
			'data' => array(
				'success' => $data,
			),
		);

		return $result;
	}

	/**
	 * Callback for the standard pull process used by webhooks and cron.
	 *
	 * When Salesforce is authorized and the throttle allows it, this fetches
	 * updated, merged and deleted records and then stores the current time as
	 * the last pull time used by the throttle check.
	 *
	 * @return bool True when a pull ran, false when it was skipped.
	 */
	public function salesforce_pull() {
		if ( true !== $this->salesforce['is_authorized'] || true !== $this->check_throttle() ) {
			// No pull happened.
			return false;
		}

		$this->get_updated_records();
		$this->get_merged_records();
		$this->get_deleted_records();

		// Store this request time for the throttle check.
		update_option( $this->option_prefix . 'pull_last_sync', time() );

		return true;
	}

	/**
	 * Determines if the Salesforce pull should be allowed, or throttled.
	 *
	 * Prevents too many pull processes from running at once.
	 *
	 * @return bool
	 *   Returns false if the time elapsed between recent pulls is too short.
	 */
	private function check_throttle() {
		// Stored option values can come back as strings, possibly empty or
		// non-numeric. Cast them so the addition below is always integer math
		// instead of raising an error on a bad stored value.
		$pull_throttle = (int) get_option( $this->option_prefix . 'pull_throttle', 5 );
		$last_sync     = (int) get_option( $this->option_prefix . 'pull_last_sync', 0 );

		return time() > ( $last_sync + $pull_throttle );
	}

	/**
	 * Pull updated records from Salesforce and place them in the queue.
* * Executes a SOQL query based on defined mappings, loops through the results, * and places each updated SF object into the queue for later processing. * * We copy the convention from the Drupal module here, and run a separate SOQL query for each type of object in SF * * If we return something here, it's because there is an error. * */ private function get_updated_records() { $sfapi = $this->salesforce['sfapi']; foreach ( $this->mappings->get_fieldmaps() as $salesforce_mapping ) { $map_sync_triggers = $salesforce_mapping['sync_triggers']; // this sets which Salesforce triggers are allowed for the mapping $type = $salesforce_mapping['salesforce_object']; // this sets the Salesforce object type for the SOQL query $soql = $this->get_pull_query( $type, $salesforce_mapping ); // get_pull_query returns null if it has no matching fields if ( null === $soql ) { continue; } $query_options = array( 'cache' => false, ); // if we are batching soql queries, let's do it if ( true === $this->batch_soql_queries ) { // pull query batch size option name if ( '' !== get_option( $this->option_prefix . 'pull_query_batch_size', '' ) ) { $batch_size = filter_var( get_option( $this->option_prefix . 'pull_query_batch_size', $this->min_soql_batch_size ), FILTER_VALIDATE_INT ); } else { // old limit value $batch_size = filter_var( get_option( $this->option_prefix . 'pull_query_limit', $this->min_soql_batch_size ), FILTER_VALIDATE_INT ); } $batch_size = filter_var( $batch_size, FILTER_VALIDATE_INT, array( 'options' => array( 'min_range' => $this->min_soql_batch_size, 'max_range' => $this->max_soql_size, ), ) ); if ( false !== $batch_size ) { // the Sforce-Query-Options header is a comma delimited string $query_options['headers']['Sforce-Query-Options'] = 'batchSize=' . 
$batch_size; } } if ( 1 === (int) $this->debug ) { // create log entry for the attempted query $status = 'debug'; $title = sprintf( // translators: placeholders are: 1) the log status esc_html__( '%1$s: SOQL query to get updated records from Salesforce (it has not yet run)', 'object-sync-for-salesforce' ), ucfirst( esc_attr( $status ) ) ); if ( isset( $this->logging ) ) { $logging = $this->logging; } elseif ( class_exists( 'Object_Sync_Sf_Logging' ) ) { $logging = new Object_Sync_Sf_Logging( $this->wpdb, $this->version ); } $debug = array( 'title' => $title, 'message' => esc_html( (string) $soql ), 'trigger' => 0, 'parent' => '', 'status' => $status, ); $logging->setup( $debug ); } // Execute query // have to cast it to string to make sure it uses the magic method // we don't want to cache this because timestamps $results = $sfapi->query( (string) $soql, $query_options ); $response = $results['data']; $version_path = wp_parse_url( $sfapi->get_api_endpoint(), PHP_URL_PATH ); $sf_last_sync = get_option( $this->option_prefix . 'pull_last_sync_' . $type, null ); $last_sync = gmdate( 'Y-m-d\TH:i:s\Z', $sf_last_sync ); if ( ! isset( $response['errorCode'] ) && 0 < count( $response['records'] ) ) { // Write items to the queue. foreach ( $response['records'] as $key => $result ) { // if we've already pulled, or tried to pull, the current ID, don't do it again. if ( get_option( $this->option_prefix . 
'last_pull_id', '' ) === $result['Id'] ) { if ( 1 === (int) $this->debug ) { // create log entry for failed pull $status = 'debug'; $title = sprintf( // translators: placeholders are: 1) the log status, 2) the Salesforce ID esc_html__( '%1$s: Salesforce ID %2$s has already been attempted.', 'object-sync-for-salesforce' ), ucfirst( esc_attr( $status ) ), esc_attr( $result['Id'] ) ); if ( isset( $this->logging ) ) { $logging = $this->logging; } elseif ( class_exists( 'Object_Sync_Sf_Logging' ) ) { $logging = new Object_Sync_Sf_Logging( $this->wpdb, $this->version ); } $debug = array( 'title' => $title, 'message' => esc_html__( 'This ID has already been attempted so it was not pulled again.', 'object-sync-for-salesforce' ), 'trigger' => 0, 'parent' => '', 'status' => $status, ); $logging->setup( $debug ); } continue; } // if this record is new as of the last sync, use the create trigger if ( isset( $result['CreatedDate'] ) && $result['CreatedDate'] > $last_sync ) { $sf_sync_trigger = $this->mappings->sync_sf_create; } else { $sf_sync_trigger = $this->mappings->sync_sf_update; } // Only queue when the record's trigger is configured for the mapping // these are bit operators, so we leave out the strict if ( isset( $map_sync_triggers ) && isset( $sf_sync_trigger ) && in_array( $sf_sync_trigger, $map_sync_triggers ) ) { // wp or sf crud event $data = array( 'object_type' => $type, 'object' => $result, 'mapping' => $salesforce_mapping, 'sf_sync_trigger' => $sf_sync_trigger, // use the appropriate trigger based on when this was created ); $pull_allowed = $this->is_pull_allowed( $type, $result, $sf_sync_trigger, $salesforce_mapping, $map_sync_triggers ); if ( false === $pull_allowed ) { // update the current state so we don't end up on the same record again if the loop fails update_option( $this->option_prefix . 
'last_pull_id', $result['Id'] ); if ( 1 === (int) $this->debug ) { // create log entry for failed pull $status = 'debug'; $title = sprintf( // translators: placeholders are: 1) the log status, 2) the Salesforce ID esc_html__( '%1$s: Salesforce ID %2$s is not allowed.', 'object-sync-for-salesforce' ), ucfirst( esc_attr( $status ) ), esc_attr( $result['Id'] ) ); if ( isset( $this->logging ) ) { $logging = $this->logging; } elseif ( class_exists( 'Object_Sync_Sf_Logging' ) ) { $logging = new Object_Sync_Sf_Logging( $this->wpdb, $this->version ); } $debug = array( 'title' => $title, 'message' => esc_html__( 'This ID is not pullable so it was skipped.', 'object-sync-for-salesforce' ), 'trigger' => $sf_sync_trigger, 'parent' => '', 'status' => $status, ); $logging->setup( $debug ); } continue; } if ( 1 === (int) $this->debug ) { // create log entry for queue addition $status = 'debug'; $title = sprintf( // translators: placeholders are: 1) the log status, 2) the Salesforce ID esc_html__( '%1$s: Add Salesforce ID %2$s to the queue', 'object-sync-for-salesforce' ), ucfirst( esc_attr( $status ) ), esc_attr( $result['Id'] ) ); if ( isset( $this->logging ) ) { $logging = $this->logging; } elseif ( class_exists( 'Object_Sync_Sf_Logging' ) ) { $logging = new Object_Sync_Sf_Logging( $this->wpdb, $this->version ); } $message = sprintf( // translators: 1) is the name of the hook that was called, 2) is the Salesforce object type, 3) is the ID for the object map, 4) is the event trigger that is running, and 5) is the name of the schedule that is running. esc_html__( 'This record is being sent to the queue. The hook name is %1$s. The arguments for the hook are: object type %2$s, object map ID %3$s, sync trigger %4$s. 
The schedule name is %5$s.', 'object-sync-for-salesforce' ), esc_attr( $this->schedulable_classes[ $this->schedule_name ]['callback'] ), esc_attr( $type ), absint( $salesforce_mapping['id'] ), $sf_sync_trigger, $this->schedule_name ); $debug = array( 'title' => $title, 'message' => $message, 'trigger' => $sf_sync_trigger, 'parent' => '', 'status' => $status, ); $logging->setup( $debug ); } // add a queue action to save data from salesforce $this->queue->add( $this->schedulable_classes[ $this->schedule_name ]['callback'], array( 'object_type' => $type, 'object' => $result['Id'], 'sf_sync_trigger' => $sf_sync_trigger, ), $this->schedule_name ); // update the current state so we don't end up on the same record again if the loop fails update_option( $this->option_prefix . 'last_pull_id', $result['Id'] ); if ( 1 === (int) $this->debug ) { // create log entry for successful pull $status = 'debug'; $title = sprintf( // translators: placeholders are: 1) the log status, 2) the Salesforce ID esc_html__( '%1$s: Salesforce ID %2$s has been successfully pulled.', 'object-sync-for-salesforce' ), ucfirst( esc_attr( $status ) ), esc_attr( $result['Id'] ) ); if ( isset( $this->logging ) ) { $logging = $this->logging; } elseif ( class_exists( 'Object_Sync_Sf_Logging' ) ) { $logging = new Object_Sync_Sf_Logging( $this->wpdb, $this->version ); } $debug = array( 'title' => $title, 'message' => esc_html__( 'This ID has been successfully pulled and added to the queue for processing. It cannot be pulled again without being modified again.', 'object-sync-for-salesforce' ), 'trigger' => $sf_sync_trigger, 'parent' => '', 'status' => $status, ); $logging->setup( $debug ); } // end of debug } // end if } // end foreach // we're done with the foreach. store the LastModifiedDate of the last item processed, or the current time if it isn't there. $last_date_for_query = isset( $result['LastModifiedDate'] ) ? 
$result['LastModifiedDate'] : ''; $this->increment_current_type_datetime( $type, $last_date_for_query ); if ( true === $this->batch_soql_queries ) { // if applicable, process the next batch of records $this->get_next_record_batch( $last_sync, $salesforce_mapping, $map_sync_triggers, $type, $version_path, $query_options, $response ); } else { // Here, we check and see if the query has results with an additional offset. // If it does, we regenerate the query so it will have an offset next time it runs. // If it does not, we clear the query if we've just processed the last row. // this allows us to run an offset on the stored query instead of clearing it. $does_next_offset_have_results = $this->check_offset_query( $type, $salesforce_mapping, $query_options ); end( $response['records'] ); $last_record_key = key( $response['records'] ); if ( true === $does_next_offset_have_results ) { // increment SOQL query to run $soql = $this->get_pull_query( $type, $salesforce_mapping ); } elseif ( $last_record_key === $key ) { // clear the stored query. we don't need to offset and we've finished the loop. $this->clear_current_type_query( $type ); } } // end if } elseif ( ! isset( $response['errorCode'] ) && 0 === count( $response['records'] ) && false === $this->batch_soql_queries ) { // only update/clear these option values if we are currently still processing a query if ( '' !== get_option( $this->option_prefix . 'currently_pulling_query_' . $type, '' ) ) { $this->clear_current_type_query( $type ); } // try to check for specific error codes from Salesforce } elseif ( isset( $response['errorCode'] ) && 'INVALID_FIELD' === $response['errorCode'] ) { // set up log entry $status = 'error'; $log_title = sprintf( // translators: placeholders are: 1) the log status, 2) the server error code, and 3) the name of the Salesforce object esc_html__( '%1$s: %2$s when pulling %3$s data from Salesforce. 
Check and resave the fieldmap.', 'object-sync-for-salesforce' ), ucfirst( esc_attr( $status ) ), esc_attr( $response['errorCode'] ), esc_attr( $salesforce_mapping['salesforce_object'] ) ); $log_body = '
' . esc_html__( 'A field may have been deleted from Salesforce, or it has otherwise become invalid. You may need to check and resave your fieldmap.', 'object-sync-for-salesforce' ) . '
'; // if it's an invalid field, try to clear the cached query so it can try again next time if ( '' !== get_option( $this->option_prefix . 'currently_pulling_query_' . $type, '' ) ) { $this->clear_current_type_query( $type ); $log_title .= esc_html__( ' The stored query has been cleared.', 'object-sync-for-salesforce' ); $log_body .= '' . esc_html__( 'The currently stored query for this object type has been deleted.', 'object-sync-for-salesforce' ) . '
'; } $log_body .= sprintf( // translators: placeholders are: 1) the Salesforce API response message '' . esc_html__( 'Salesforce API Response: %1$s', 'object-sync-for-salesforce' ) . '
', $response['message'] ); if ( isset( $this->logging ) ) { $logging = $this->logging; } elseif ( class_exists( 'Object_Sync_Sf_Logging' ) ) { $logging = new Object_Sync_Sf_Logging( $this->wpdb, $this->version ); } $result = array( 'title' => $log_title, 'message' => $log_body, 'trigger' => 0, 'parent' => '', 'status' => $status, ); $logging->setup( $result ); } elseif ( isset( $response['errorCode'] ) ) { // create log entry for failed pull $status = 'error'; $title = sprintf( // translators: placeholders are: 1) the log status, 2) the server error code, and 3) the name of the Salesforce object esc_html__( '%1$s: %2$s when pulling %3$s data from Salesforce', 'object-sync-for-salesforce' ), ucfirst( esc_attr( $status ) ), esc_attr( $response['errorCode'] ), esc_attr( $salesforce_mapping['salesforce_object'] ) ); if ( isset( $this->logging ) ) { $logging = $this->logging; } elseif ( class_exists( 'Object_Sync_Sf_Logging' ) ) { $logging = new Object_Sync_Sf_Logging( $this->wpdb, $this->version ); } $result = array( 'title' => $title, 'message' => $response['message'], 'trigger' => 0, 'parent' => '', 'status' => $status, ); $logging->setup( $result ); return $result; } // End if(). } // End foreach(). }

	/**
	 * Pull the next batch of records from the Salesforce API, if applicable
	 *
	 * Follows the nextRecordsUrl of the previous response for as long as one is
	 * returned, and adds a queue item for every record whose trigger (create or
	 * update) is enabled for the mapping.
	 *
	 * @param string $last_sync          Formatted date string; a record whose CreatedDate is greater than it uses the create trigger.
	 * @param array  $salesforce_mapping The fieldmap; its pull_trigger_field entry is read here.
	 * @param array  $map_sync_triggers  The triggers enabled for the mapping.
	 * @param string $type               The Salesforce object type, stored on each queue item.
	 * @param string $version_path       Path prefix that is stripped from each nextRecordsUrl.
	 * @param array  $query_options      Options passed through to the API call.
	 * @param array  $response           The previous response data; only nextRecordsUrl is read.
	 *
	 */
	private function get_next_record_batch( $last_sync, $salesforce_mapping, $map_sync_triggers, $type, $version_path, $query_options, $response ) {
		// Handle next batch of records if it exists
		$next_records_url = isset( $response['nextRecordsUrl'] ) ? str_replace( $version_path, '', $response['nextRecordsUrl'] ) : false;

		// The API client has to be fetched here: it is neither a parameter nor
		// otherwise defined in this method, so the call below would fail without it.
		$sfapi = $this->salesforce['sfapi'];

		while ( $next_records_url ) {
			// shouldn't cache this either. it's going into the queue if it exists anyway.
			$new_results  = $sfapi->api_call( $next_records_url, array(), 'GET', $query_options );
			$new_response = $new_results['data'];

			if ( ! isset( $new_response['errorCode'] ) ) {
				// Write items to the queue.
				foreach ( $new_response['records'] as $result ) {
					// if this record is new as of the last sync, use the create trigger
					if ( isset( $result['CreatedDate'] ) && $result['CreatedDate'] > $last_sync ) {
						$sf_sync_trigger = $this->mappings->sync_sf_create;
					} else {
						$sf_sync_trigger = $this->mappings->sync_sf_update;
					}

					// Only queue when the record's trigger is configured for the mapping
					// these are bit operators, so we leave out the strict
					if ( isset( $map_sync_triggers ) && isset( $sf_sync_trigger ) && in_array( $sf_sync_trigger, $map_sync_triggers ) ) {
						// add a queue action to save data from salesforce
						$this->queue->add(
							$this->schedulable_classes[ $this->schedule_name ]['callback'],
							array(
								'object_type'     => $type,
								'object'          => $result['Id'],
								'sf_sync_trigger' => $sf_sync_trigger,
							),
							$this->schedule_name
						);

						// Update the last pull sync timestamp for this record type to avoid re-processing in case of error
						// NOTE(review): this value is computed but never stored or read again in this method - confirm whether an update_option() call is missing here.
						$last_sync_pull_trigger = DateTime::createFromFormat( 'Y-m-d\TH:i:s+', $result[ $salesforce_mapping['pull_trigger_field'] ], new DateTimeZone( 'UTC' ) );
					}
				}
			}

			// An error response carries no nextRecordsUrl, so the loop ends there as well.
			$next_records_url = isset( $new_response['nextRecordsUrl'] ) ? str_replace( $version_path, '', $new_response['nextRecordsUrl'] ) : false;
		} // end while loop
	}

	/**
	 * Get the next offset query. If check is true, only see if that query would have results. Otherwise, return the SOQL object.
*
	 * When batchSize is not in use, run a check with an offset.
	 *
	 * This method takes no $check argument and always returns a boolean. It
	 * builds the query through get_pull_query(), which also stores the query
	 * and recalculates its offset, and then runs it.
	 *
	 * @param string $type the Salesforce object type
	 * @param array $salesforce_mapping the map between object types
	 * @param array $query_options the options for the SOQL query
	 * @return bool $does_next_offset_have_results true when the query returned no errorCode and at least one record
	 *
	 */
	private function check_offset_query( $type, $salesforce_mapping, $query_options ) {
		$soql  = $this->get_pull_query( $type, $salesforce_mapping );
		$sfapi = $this->salesforce['sfapi'];

		// Execute query
		// have to cast it to string to make sure it uses the magic method
		// we don't want to cache this because timestamps
		$results  = $sfapi->query( (string) $soql, $query_options );
		$response = $results['data'];

		// empty() also covers a response without a records key, which count() would not accept.
		return ! isset( $response['errorCode'] ) && ! empty( $response['records'] );
	}

	/**
	 * Given a SObject type name, build an SOQL query to include all fields for all
	 * SalesforceMappings mapped to that SObject.
	 *
	 * The resulting query is serialized and stored in an option for this type
	 * before it is returned.
	 *
	 * @param string $type
	 *   e.g. "Contact", "Account", etc.
	 * @param array $salesforce_mapping
	 *   the fieldmap that maps the two object types. It is overwritten by the loop
	 *   over the fieldmaps for this type, so the last of those is the one used.
	 *
	 * @return Object_Sync_Sf_Salesforce_Select_Query or null if no mappings or no mapped fields
	 *   were found.
	 *
	 * @see Object_Sync_Sf_Mapping::get_mapped_fields
	 * @see Object_Sync_Sf_Mapping::get_mapped_record_types
	 */
	private function get_pull_query( $type, $salesforce_mapping = array() ) {
		// we need to determine what to do with saved queries. this is what we currently do but it doesn't work.
		// check if we have a stored next query to run for this type. if so, unserialize it so we have an object.
		$pull_query_running = get_option( $this->option_prefix . 'currently_pulling_query_' . $type, '' );
		if ( '' !== $pull_query_running ) {
			$saved_query = maybe_unserialize( $pull_query_running );
		}

		$mapped_fields       = array();
		$mapped_record_types = array();

		$mappings = $this->mappings->get_fieldmaps(
			null,
			array(
				'salesforce_object' => $type,
			)
		);

		// Iterate over each field mapping to determine our query parameters.
		foreach ( $mappings as $salesforce_mapping ) {

			// only use fields that come from Salesforce to WordPress, or that sync
			$mapped_fields = array_merge(
				$mapped_fields,
				$this->mappings->get_mapped_fields(
					$salesforce_mapping,
					array( $this->mappings->direction_sync, $this->mappings->direction_sf_wordpress )
				)
			);

			// If Record Type is specified, restrict query.
			$mapping_record_types = $this->mappings->get_mapped_record_types( $salesforce_mapping );

			// If Record Type is not specified for a given mapping, ensure query is unrestricted.
			// NOTE(review): $mapped_record_types is not read again after this loop in this method.
			if ( empty( $mapping_record_types ) ) {
				$mapped_record_types = false;
			} elseif ( is_array( $mapped_record_types ) ) {
				$mapped_record_types = array_merge( $mapped_record_types, $mapping_record_types );
			}
		} // End foreach().

		// There are no field mappings configured to pull data from Salesforce so
		// move on to the next mapped object. Prevents querying unmapped data.
		if ( empty( $mapped_fields ) ) {
			return null;
		}

		if ( ! isset( $saved_query ) ) {
			$soql = new Object_Sync_Sf_Salesforce_Select_Query( $type );

			// Convert field mappings to SOQL.
			$soql->fields = array_merge(
				$mapped_fields,
				array(
					'Id' => 'Id',
					$salesforce_mapping['pull_trigger_field'] => $salesforce_mapping['pull_trigger_field'],
				)
			);

			// these are bit operators, so we leave out the strict
			if ( in_array( $this->mappings->sync_sf_create, $salesforce_mapping['sync_triggers'] ) ) {
				$soql->fields['CreatedDate'] = 'CreatedDate';
			}

			// Order by the trigger field, requesting the oldest records first
			$soql->order = array(
				$salesforce_mapping['pull_trigger_field'] => 'ASC',
			);

			// Set a limit on the number of records that can be retrieved from the API at one time.
			$soql->limit = filter_var( get_option( $this->option_prefix . 'pull_query_limit', 25 ), FILTER_VALIDATE_INT );
		} else {
			$soql = $saved_query;
		}

		// Get the value for the pull trigger field. Often this will LastModifiedDate. It needs to change when the query gets regenerated after the max offset has been reached.
		$pull_trigger_field_value = $this->get_pull_date_value( $type, $soql );

		// we check to see if the stored date is the same as the new one. if it is not, we will want to reset the offset
		$reset_offset = false;
		$has_date     = false;
		$key          = array_search( $salesforce_mapping['pull_trigger_field'], array_column( $soql->conditions, 'field' ) );
		if ( false !== $key ) {
			$has_date = true;
			if ( $soql->conditions[ $key ]['value'] !== $pull_trigger_field_value ) {
				$reset_offset = true;
			}
		}

		if ( false === $has_date ) {
			$reset_offset = true;
			$soql->add_condition( $salesforce_mapping['pull_trigger_field'], $pull_trigger_field_value, '>' );
		} else {
			$soql->conditions[ $key ]['value'] = $pull_trigger_field_value;
		}

		// Get the value for the SOQL offset. If max has already been reached, it is zero.
		$soql->offset = $this->get_pull_offset( $type, $soql, $reset_offset );

		// add a filter here to modify the query
		// Hook to allow other plugins to modify the SOQL query before it is sent to Salesforce
		$soql = apply_filters( $this->option_prefix . 'pull_query_modify', $soql, $type, $salesforce_mapping, $mapped_fields );

		// quick example to change the order to descending
		/*
		add_filter( 'object_sync_for_salesforce_pull_query_modify', 'change_pull_query', 10, 4 );
		// can always reduce this number if all the arguments are not necessary
		function change_pull_query( $soql, $type, $salesforce_mapping, $mapped_fields ) {
			$soql->order = 'DESC';
			return $soql;
		}
		*/

		// Make sure our SOQL object properties that are arrays are unique. This prevents values added via developer hook from being added repeatedly when a query is cached.
		if ( version_compare( PHP_VERSION, '7.0.8', '>=' ) ) {
			$soql->fields     = array_unique( $soql->fields, SORT_REGULAR );
			$soql->order      = array_unique( $soql->order, SORT_REGULAR );
			$soql->conditions = array_unique( $soql->conditions, SORT_REGULAR );
		} else {
			$soql->fields     = array_map( 'unserialize', array_unique( array_map( 'serialize', $soql->fields ) ) );
			$soql->order      = array_map( 'unserialize', array_unique( array_map( 'serialize', $soql->order ) ) );
			$soql->conditions = array_map( 'unserialize', array_unique( array_map( 'serialize', $soql->conditions ) ) );
		}

		// serialize the currently running SOQL query and store it for this type
		$serialized_current_query = maybe_serialize( $soql );
		update_option( $this->option_prefix . 'currently_pulling_query_' . $type, $serialized_current_query, false );
		return $soql;
	}

	/**
	 * Determine the offset for the SOQL query to run
	 *
	 * @param string $type
	 *   e.g. "Contact", "Account", etc. Not read by this method.
	 * @param object $soql
	 *   the SOQL object; its offset and limit properties are read
	 * @param bool $reset
	 *   whether to reset the offset
	 *
	 * @return int the current offset plus the limit, or the limit alone when no offset is set; zero when $reset is true or the result is larger than the maximum SOQL size
	 *
	 */
	private function get_pull_offset( $type, $soql, $reset = false ) {
		// set an offset. if there is a saved offset, add the limit to it and move on. otherwise, use the limit.
		$offset = isset( $soql->offset ) ? $soql->offset + $soql->limit : $soql->limit;
		if ( true === $reset || $offset > $this->max_soql_size ) {
			$offset = 0;
		}
		return $offset;
	}

	/**
	 * Given a SObject type name, determine the datetime value the SOQL object should use to filter results. Often this will be LastModifiedDate.
	 *
	 * @param string $type
	 *   e.g. "Contact", "Account", etc.
	 * @param object $soql
	 *   the SOQL object. Not read by this method.
	 *
	 * @return string $pull_trigger_field_value
	 *   a UTC date formatted as Y-m-d\TH:i:s\Z
	 *
	 */
	private function get_pull_date_value( $type, $soql ) {
		// If no lastupdate, get all records, else get records since last pull.
		// this should be what keeps it from getting all the records, whether or not they've ever been updated
		// we also use the option for when the plugin was installed, and don't go back further than that by default
		$sf_activate_time = get_option( $this->option_prefix . 'activate_time', '' );
		$sf_last_sync     = get_option( $this->option_prefix . 'pull_last_sync_' . $type, null );

		// Prefer the last sync for this type; otherwise fall back to the activation time.
		$timestamp = $sf_last_sync ? $sf_last_sync : $sf_activate_time;

		// Cast before formatting: when neither option is stored the value is an
		// empty string, which gmdate() does not accept as a timestamp. The cast
		// turns it into 0, so the filter starts at the Unix epoch.
		$pull_trigger_field_value = gmdate( 'Y-m-d\TH:i:s\Z', (int) $timestamp );

		// todo: put a hook in here to let devs go retroactive if they want, and sync data from before plugin was activated
		return $pull_trigger_field_value;
	}

/** * Get merged records from Salesforce. * Note that merges can currently only work if the Soap API is enabled. * */ private function get_merged_records() { $sfapi = $this->salesforce['sfapi']; $use_soap = $this->salesforce['soap_loaded']; if ( true === $use_soap ) { $wsdl = get_option( 'object_sync_for_salesforce_soap_wsdl_path', plugin_dir_path( __FILE__ ) . '../vendor/developerforce/force.com-toolkit-for-php/soapclient/partner.wsdl.xml' ); $soap = new Object_Sync_Sf_Salesforce_Soap_Partner( $sfapi, $wsdl ); } $seconds = 60; $merged_records = array(); // Load fieldmaps for mergeable types foreach ( $this->mergeable_record_types as $type ) { $mappings = $this->mappings->get_fieldmaps( null, array( 'salesforce_object' => $type, ) ); // Iterate over each field mapping to determine our query parameters. foreach ( $mappings as $salesforce_mapping ) { $last_merge_sync = get_option( $this->option_prefix . 'pull_merge_last_' . $salesforce_mapping['salesforce_object'], time() ); $now = time(); update_option( $this->option_prefix . 'pull_merge_last_' . $salesforce_mapping['salesforce_object'], $now ); // get_deleted() constraint: startDate cannot be more than 30 days ago // (using an incompatible date may lead to exceptions). 
$last_merge_sync = $last_merge_sync > ( time() - 2505600 ) ? $last_merge_sync : ( time() - 2505600 ); // get_deleted() constraint: startDate must be at least one minute greater // than endDate. $now = $now > ( $last_merge_sync + 60 ) ? $now : $now + 60; // need to be using gmdate for Salesforce call $last_merge_sync_sf = gmdate( 'Y-m-d\TH:i:s\Z', $last_merge_sync ); // we want to add something like this eventually, to the query: AND SystemModstamp > 2006-01-01T23:01:01+01:00 $merged = array(); // there doesn't appear to be a way to do this in the rest api; for now we'll do soap if ( true === $use_soap ) { $type = $salesforce_mapping['salesforce_object']; $query = "SELECT Id, isDeleted, masterRecordId FROM $type WHERE masterRecordId != '' AND SystemModStamp > $last_merge_sync_sf"; $merged = $soap->try_soap( 'queryAll', $query ); if ( ! empty( $merged->records ) ) { $merged = json_decode( wp_json_encode( $merged->records ), true ); } else { continue; } foreach ( $merged as $result ) { $record = array(); if ( is_array( array_unique( $result['Id'] ) ) ) { $record['Id'] = array_unique( $result['Id'] )[0]; } else { $record['Id'] = $result['Id']; } if ( isset( $result['any'] ) ) { libxml_use_internal_errors( true ); $any = simplexml_load_string( '' . esc_html__( 'Object: %1$s with %2$s of %3$s', 'object-sync-for-salesforce' ) . '
' . esc_html__( 'Message: ', 'object-sync-for-salesforce' ) . '%4$s' . '
',
				esc_attr( $salesforce_mapping['wordpress_object'] ),
				esc_attr( $wordpress_id_field_name ),
				esc_attr( $wordpress_id ),
				print_r( $result['errors'], true ) // if we get this error, we need to know whatever we have
			);
			$result = array(
				'title' => $title,
				'message' => $body,
				'trigger' => $sf_sync_trigger,
				'parent' => $wordpress_id,
				'status' => $status,
			);
			$logging->setup( $result );
			$results[] = $result;
			// hook for pull fail
			do_action( $this->option_prefix . 'pull_fail', $op, $result, $synced_object );
		} // End if().
		return $results;
	}

	/**
	 * Update records in WordPress from a Salesforce pull
	 *
	 * Runs the WordPress update for a record that already has a mapping object,
	 * writes a log entry for the outcome, fires the pull_success or pull_fail
	 * hook, and then saves the mapping object. A WordpressException raised by
	 * the update is logged and recorded on the mapping object; it is not
	 * rethrown, so the mapping object is always saved.
	 *
	 * @param string $sf_sync_trigger
	 *   The current operation's trigger
	 * @param array $synced_object
	 *   Combined data for fieldmap, mapping object, and Salesforce object data
	 * @param array $params
	 *   Array of mapped key value pairs between WordPress and Salesforce fields.
	 * @param string $wordpress_id_field_name
	 *   The name of the ID field for this particular WordPress object type
	 * @param int $seconds
	 *   Timeout for the transient value to determine the direction for a sync. This method does not read it.
	 * @return array $results
	 *   Currently this contains an array of log entries for each attempt.
	 *
	 */
	private function update_called_from_salesforce( $sf_sync_trigger, $synced_object, $params, $wordpress_id_field_name, $seconds ) {

		$salesforce_mapping = $synced_object['mapping'];
		$mapping_object = $synced_object['mapping_object'];
		$object = $synced_object['salesforce_object'];

		// methods to run the wp update operations
		$results = array();
		$op = '';

		// stamp the mapping object with the time this update is being processed
		$mapping_object['object_updated'] = current_time( 'mysql' );

		// hook to allow other plugins to do something right before WordPress data is saved
		// ex: run outside methods on an object if it exists, or do something in preparation for it if it doesn't
		do_action( $this->option_prefix . 'pre_pull', $mapping_object['wordpress_id'], $salesforce_mapping, $object, $wordpress_id_field_name, $params );

		// the same logger is used for the success and the failure entry, so resolve it once
		if ( isset( $this->logging ) ) {
			$logging = $this->logging;
		} elseif ( class_exists( 'Object_Sync_Sf_Logging' ) ) {
			$logging = new Object_Sync_Sf_Logging( $this->wpdb, $this->version );
		}

		try {
			$op = 'Update';
			$this->wordpress->object_update( $salesforce_mapping['wordpress_object'], $mapping_object['wordpress_id'], $params );

			$mapping_object['last_sync_status'] = $this->mappings->status_success;
			$mapping_object['last_sync_message'] = esc_html__( 'Mapping object updated via function: ', 'object-sync-for-salesforce' ) . __FUNCTION__;

			$status = 'success';

			$title = sprintf(
				// translators: placeholders are: 1) the log status, 2) what operation is happening, 3) the name of the WordPress object type, 4) the WordPress id field name, 5) the WordPress object id value, 6) the name of the Salesforce object, 7) the Salesforce Id value
				esc_html__( '%1$s: %2$s WordPress %3$s with %4$s of %5$s (Salesforce %6$s Id of %7$s)', 'object-sync-for-salesforce' ),
				ucfirst( esc_attr( $status ) ),
				esc_attr( $op ),
				esc_attr( $salesforce_mapping['wordpress_object'] ),
				esc_attr( $wordpress_id_field_name ),
				esc_attr( $mapping_object['wordpress_id'] ),
				esc_attr( $salesforce_mapping['salesforce_object'] ),
				esc_attr( $object['Id'] )
			);

			$result = array(
				'title' => $title,
				'message' => '',
				'trigger' => $sf_sync_trigger,
				'parent' => $mapping_object['wordpress_id'],
				'status' => $status,
			);
			$logging->setup( $result );
			$results[] = $result;

			// hook for pull success
			do_action( $this->option_prefix . 'pull_success', $op, $result, $synced_object );

		} catch ( WordpressException $e ) {
			// create log entry for failed update
			$status = 'error';

			// plain assignment: $title is either undefined here (the update itself threw)
			// or still holds the success title, so it must not be appended to
			$title = sprintf(
				// translators: placeholders are: 1) the log status, 2) what operation is happening, 3) the name of the WordPress object, 4) the WordPress id field name, 5) the WordPress object id value, 6) the name of the Salesforce object, 7) the Salesforce Id value
				esc_html__( '%1$s: %2$s WordPress %3$s with %4$s of %5$s (Salesforce %6$s with Id of %7$s)', 'object-sync-for-salesforce' ),
				ucfirst( esc_attr( $status ) ),
				esc_attr( $op ),
				esc_attr( $salesforce_mapping['wordpress_object'] ),
				esc_attr( $wordpress_id_field_name ),
				esc_attr( $mapping_object['wordpress_id'] ),
				esc_attr( $salesforce_mapping['salesforce_object'] ),
				esc_attr( $object['Id'] )
			);

			$result = array(
				'title' => $title,
				'message' => $e->getMessage(),
				'trigger' => $sf_sync_trigger,
				'parent' => $mapping_object['wordpress_id'],
				'status' => $status,
			);
			$logging->setup( $result );
			$results[] = $result;

			$mapping_object['last_sync_status'] = $this->mappings->status_error;
			$mapping_object['last_sync_message'] = $e->getMessage();

			// the exception is deliberately not rethrown: the failure is logged above and
			// the mapping object below still gets saved with the error status and message

			// hook for pull fail
			do_action( $this->option_prefix . 'pull_fail', $op, $result, $synced_object );

		} // End try().

		// need to move these into the success check
		// maybe can check to see if we actually updated anything in WordPress
		// tell the mapping object - whether it is new or already existed - how we just used it
		$mapping_object['last_sync_action'] = 'pull';
		$mapping_object['last_sync'] = current_time( 'mysql' );

		// update that mapping object. the Salesforce data version will be set here as well because we set it earlier
		$this->mappings->update_object_map( $mapping_object, $mapping_object['id'] );

		return $results;
	}

	/**
	 * Delete records in WordPress from a Salesforce pull
	 *
	 * @param string $sf_sync_trigger
	 *   The current operation's trigger
	 * @param array $synced_object
	 *   Combined data for fieldmap, mapping object, and Salesforce object data
	 * @param string $wordpress_id_field_name
	 *   The name of the ID field for this particular WordPress object type
	 * @param int $seconds
	 *   Timeout for the transient value to determine the direction for a sync.
	 * @param array $mapping_objects
	 *   The data for the mapping objects between the individual Salesforce and WordPress items. We only pass this because of the need to count before deleting records.
	 * @return array $results
	 *   Currently this contains an array of log entries for each attempt.
	 *
	 */
	private function delete_called_from_salesforce( $sf_sync_trigger, $synced_object, $wordpress_id_field_name, $seconds, $mapping_objects ) {

		$salesforce_mapping = $synced_object['mapping'];
		$mapping_object = $synced_object['mapping_object'];

		// methods to run the wp delete operations
		$results = array();
		$op = '';

		// deleting mapped objects
		if ( $sf_sync_trigger == $this->mappings->sync_sf_delete ) { // trigger is a bit operator
			if ( isset( $mapping_object['id'] ) ) {

				$op = 'Delete';

				// only delete if there are no additional mapping objects for this record
				if ( 1 === count( $mapping_objects ) ) {

					set_transient( 'salesforce_pulling_' .
$mapping_object['salesforce_id'], 1, $seconds ); set_transient( 'salesforce_pulling_object_id', $mapping_object['salesforce_id'] ); try { $result = $this->wordpress->object_delete( $salesforce_mapping['wordpress_object'], $mapping_object['wordpress_id'] ); } catch ( WordpressException $e ) { $status = 'error'; // create log entry for failed delete if ( isset( $this->logging ) ) { $logging = $this->logging; } elseif ( class_exists( 'Object_Sync_Sf_Logging' ) ) { $logging = new Object_Sync_Sf_Logging( $this->wpdb, $this->version ); } $title = sprintf( // translators: placeholders are: 1) the log status, 2) what operation is happening, 3) the name of the WordPress object type, 4) the WordPress id field name, 5) the WordPress object id value, 6) the name of the Salesforce object, 7) the Salesforce Id value esc_html__( '%1$s: %2$s WordPress %3$s with %4$s of %5$s (%6$s %7$s)', 'object-sync-for-salesforce' ), ucfirst( esc_attr( $status ) ), esc_attr( $op ), esc_attr( $salesforce_mapping['wordpress_object'] ), esc_attr( $wordpress_id_field_name ), esc_attr( $mapping_object['wordpress_id'] ), esc_attr( $salesforce_mapping['salesforce_object'] ), esc_attr( $mapping_object['salesforce_id'] ) ); $result = array( 'title' => $title, 'message' => $e->getMessage(), 'trigger' => $sf_sync_trigger, 'parent' => $mapping_object['wordpress_id'], 'status' => $status, ); $logging->setup( $result ); $results[] = $result; if ( false === $hold_exceptions ) { throw $e; } if ( empty( $exception ) ) { $exception = $e; } else { $my_class = get_class( $e ); $exception = new $my_class( $e->getMessage(), $e->getCode(), $exception ); } // hook for pull fail do_action( $this->option_prefix . 'pull_fail', $op, $result, $synced_object ); } // End try(). if ( ! 
isset( $e ) ) { // create log entry for successful delete if the result had no errors $status = 'success'; if ( isset( $this->logging ) ) { $logging = $this->logging; } elseif ( class_exists( 'Object_Sync_Sf_Logging' ) ) { $logging = new Object_Sync_Sf_Logging( $this->wpdb, $this->version ); } $title = sprintf( // translators: placeholders are: 1) the log status, 2) what operation is happening, 3) the name of the WordPress object type, 4) the WordPress id field name, 5) the WordPress object id value, 6) the name of the Salesforce object, 7) the Salesforce Id value esc_html__( '%1$s: %2$s WordPress %3$s with %4$s of %5$s (%6$s %7$s)', 'object-sync-for-salesforce' ), ucfirst( esc_attr( $status ) ), esc_attr( $op ), esc_attr( $salesforce_mapping['wordpress_object'] ), esc_attr( $wordpress_id_field_name ), esc_attr( $mapping_object['wordpress_id'] ), esc_attr( $salesforce_mapping['salesforce_object'] ), esc_attr( $mapping_object['salesforce_id'] ) ); $result = array( 'title' => $title, 'message' => '', 'trigger' => $sf_sync_trigger, 'parent' => $mapping_object['wordpress_id'], 'status' => $status, ); $logging->setup( $result ); $results[] = $result; // hook for pull success do_action( $this->option_prefix . 'pull_success', $op, $result, $synced_object ); } // End if() successful } else { // create log entry for additional mapped items $more_ids = sprintf( // translators: parameter is the name of the WordPress id field name '' . esc_html__( 'The WordPress record was not deleted because there are multiple Salesforce IDs that match this WordPress %1$s.) They are:', 'object-sync-for-salesforce' ) . '
', esc_attr( $wordpress_id_field_name ) ); $more_ids .= 'The map row between this Salesforce object and the WordPress object, as stored in the WordPress database, will be deleted, and this Salesforce object has been deleted, but WordPress object data will remain untouched.
', 'object-sync-for-salesforce' ); $status = 'notice'; if ( isset( $this->logging ) ) { $logging = $this->logging; } elseif ( class_exists( 'Object_Sync_Sf_Logging' ) ) { $logging = new Object_Sync_Sf_Logging( $this->wpdb, $this->version ); } $title = sprintf( // translators: placeholders are: 1) the operation that is happening, 2) the name of the WordPress object type, 3) the WordPress id field name, 4) the WordPress object id value, 5) the name of the Salesforce object type, 6) the Salesforce Id esc_html__( '%1$s: %2$s on WordPress %3$s with %4$s of %5$s was stopped because there are other WordPress records mapped to Salesforce %6$s of %7$s', 'object-sync-for-salesforce' ), ucfirst( esc_attr( $status ) ), esc_attr( $op ), esc_attr( $salesforce_mapping['wordpress_object'] ), esc_attr( $wordpress_id_field_name ), esc_attr( $mapping_object['wordpress_id'] ), esc_attr( $salesforce_mapping['salesforce_object'] ), esc_attr( $mapping_object['salesforce_id'] ) ); $notice = array( 'title' => $title, 'message' => $more_ids, 'trigger' => $sf_sync_trigger, 'parent' => 0, 'status' => $status, ); $logging->setup( $notice ); } // End if() on count // delete the map row from WordPress after the WordPress row has been deleted // we delete the map row even if the WordPress delete failed, because the Salesforce object is gone $this->mappings->delete_object_map( $mapping_object['id'] ); // there is no map row if we end this if statement } // End if(). } // End if(). return $results; } /** * Clear the currently stored query for the specified content type * * @param string $type * e.g. "Contact", "Account", etc. * */ public function clear_current_type_query( $type ) { // update the last sync timestamp for this content type $this->increment_current_type_datetime( $type ); // delete the option value for the currently pulling query for this type delete_option( $this->option_prefix . 'currently_pulling_query_' . 
$type );
		// delete the option value for the last pull record id
		delete_option( $this->option_prefix . 'last_pull_id' );
	}

	/**
	 * Increment the currently running query's datetime
	 *
	 * Stores a Unix timestamp in the "pull_last_sync_<type>" option. With no
	 * date given the current time is stored; otherwise the date is parsed with
	 * strtotime(). If the date cannot be parsed, the stored value is left as it
	 * is instead of being overwritten with false.
	 *
	 * @param string $type
	 *   e.g. "Contact", "Account", etc.
	 * @param string $next_query_modified_date
	 *   the last record's modified datetime, in a format strtotime() can parse, or '' to use the current time
	 *
	 */
	private function increment_current_type_datetime( $type, $next_query_modified_date = '' ) {
		// update the last sync timestamp for this content type
		if ( '' === $next_query_modified_date ) {
			$next_query_modified_date = time();
		} else {
			$parsed_date = strtotime( $next_query_modified_date );
			if ( false === $parsed_date ) {
				// strtotime() could not read the value; keep the previously stored timestamp
				return;
			}
			$next_query_modified_date = $parsed_date;
		}
		update_option( $this->option_prefix . 'pull_last_sync_' . $type, $next_query_modified_date );
	}

	/**
	 * Create an object map between a Salesforce object and a WordPress object
	 *
	 * @param array $salesforce_object
	 *   Array of the salesforce object's data
	 * @param string $wordpress_id
	 *   Unique identifier for the WordPress object
	 * @param array $field_mapping
	 *   The row that maps the object types together, including which fields match which other fields
	 *
	 * @return int $wpdb->insert_id
	 *   This is the database row for the map object
	 *
	 */
	private function create_object_map( $salesforce_object, $wordpress_id, $field_mapping ) {
		// Create object map and save it
		$mapping_object = $this->mappings->create_object_map(
			array(
				'wordpress_id' => $wordpress_id, // wordpress unique id
				'salesforce_id' => $salesforce_object['Id'], // salesforce unique id. we don't care what kind of object it is at this point
				'wordpress_object' => $field_mapping['wordpress_object'], // keep track of what kind of wp object this is
				'last_sync' => current_time( 'mysql' ),
				'last_sync_action' => 'pull',
				'last_sync_status' => $this->mappings->status_success,
				'last_sync_message' => esc_html__( 'Mapping object created via function: ', 'object-sync-for-salesforce' ) . __FUNCTION__,
				'action' => 'created',
			)
		);

		return $mapping_object;

	}

	/**
	 * Find out if pull is allowed for this record
	 *
	 * Pull is allowed by default. It is refused when the fieldmap's triggers do
	 * not include "Salesforce create" and no object map exists yet for the
	 * Salesforce Id. The result is then passed through the
	 * "pull_object_allowed" filter, which has the final say.
	 *
	 * @param string $object_type
	 *   Salesforce object type
	 * @param array $object
	 *   Array of the salesforce object's data
	 * @param string $sf_sync_trigger
	 *   The current operation's trigger
	 * @param array $salesforce_mapping
	 *   the fieldmap that maps the two object types
	 * @param array $map_sync_triggers
	 *   the sync triggers the current fieldmap allows
	 *
	 * @return bool $pull_allowed
	 *   Whether all this stuff allows the $result to be pulled into WordPress
	 *
	 */
	private function is_pull_allowed( $object_type, $object, $sf_sync_trigger, $salesforce_mapping, $map_sync_triggers ) {

		// default is pull is allowed
		$pull_allowed = true;

		// if the current fieldmap does not allow create, we need to check if there is an object map for the Salesforce object Id. if not, set pull_allowed to false.
		// NOTE(review): the comparison is intentionally left loose; the stored trigger values may be strings - confirm before making it strict
		if ( ! in_array( $this->mappings->sync_sf_create, $map_sync_triggers ) ) {
			$object_map = $this->mappings->load_all_by_salesforce( $object['Id'] );
			if ( empty( $object_map ) ) {
				$pull_allowed = false;
			}
		}

		// Hook to allow other plugins to prevent a pull per-mapping.
		// Putting the pull_allowed hook here will keep the queue from storing data when it is not supposed to store it
		$pull_allowed = apply_filters( $this->option_prefix . 'pull_object_allowed', $pull_allowed, $object_type, $object, $sf_sync_trigger, $salesforce_mapping );

		// example to keep from pulling the Contact with id of abcdef
		/*
		add_filter( 'object_sync_for_salesforce_pull_object_allowed', 'check_user', 10, 5 );
		// can always reduce this number if all the arguments are not necessary
		function check_user( $pull_allowed, $object_type, $object, $sf_sync_trigger, $salesforce_mapping ) {
			if ( 'Contact' === $object_type && 'abcdef' === $object['Id'] ) {
				$pull_allowed = false;
			}
			return $pull_allowed;
		}
		*/

		return $pull_allowed;
	}

}